MQTT Bridge with Disk Queue
使用该插件可将本地 MQTT 消息转发到另一个 MQTT Broker,并通过磁盘缓冲提升系统韧性。
功能特性
- 按桥接实例提供磁盘缓冲。
- 当远端 Broker 不可用时自动重试。
- 支持使用
${topic}进行主题改写。 - 单个插件支持配置多个桥接。
- 配置更新按桥接粒度生效(未变更的桥接保持运行)。
工作原理
- 使用每个桥接的
filter_topic匹配本地发布消息。 - 将匹配消息追加到磁盘队列分区。
- 将队列中的消息发布到远端 Broker。
- 如果因网络/连接问题导致发布失败,则自动重试。
- 如果某个队列分区超过
queue.max_total_bytes,则会丢弃该分区中最旧的记录。
配置
推荐通过 EMQX Dashboard 配置,也可以直接修改插件配置文件。
在生产环境中,建议先从一个桥接开始,验证流量和行为后再扩展。
配置文件位置
有两个相关的配置文件位置:
安装后插件包内自带的默认配置文件:
- Docker 安装示例(
0.2.0):/opt/emqx/plugins/emqx_bridge_mqtt_dq-0.2.0/emqx_bridge_mqtt_dq-0.2.0/priv/config.hocon - deb/rpm 安装示例(
0.2.0):/usr/lib/emqx/plugins/emqx_bridge_mqtt_dq-0.2.0/emqx_bridge_mqtt_dq-0.2.0/priv/config.hocon
- Docker 安装示例(
通过 Dashboard 或 API 保存后,由 EMQX 管理的持久化插件配置文件:
- Docker:
/opt/emqx/data/plugins/emqx_bridge_mqtt_dq/config.hocon - deb/rpm:
/var/lib/emqx/plugins/emqx_bridge_mqtt_dq/config.hocon
- Docker:
priv/config.hocon 是插件包自带的默认模板。 data/plugins/.../config.hocon 是 EMQX 保存配置变更后实际使用的持久化配置文件。
快速开始(Dashboard)
- 启用插件。
- 在
remotes下添加一个可复用的远端配置。 - 在
bridges下添加一个桥接配置。 - 设置
remote、filter_topic和remote_topic。 - 保存并验证远端消息投递。
- 只有在基线验证完成后再调整队列和连接池参数。
示例
bridges {
to-cloud {
enable = true
remote = cloud
proto_ver = "v4"
keepalive_s = 60
pool_size = 4
filter_topic = "devices/#"
remote_topic = "fwd/${topic}"
remote_qos = "${qos}"
remote_retain = "${retain}"
queue {
seg_bytes = "100MB"
max_total_bytes = "1GB"
}
}
}
remotes {
cloud {
server = "cloud-broker.example.com:8883"
username = "bridge_user"
password = "secret"
ssl {
enable = true
verify = verify_none
# cacertfile = "/path/to/ca.pem"
# certfile = "/path/to/client-cert.pem"
# keyfile = "/path/to/client-key.pem"
}
}
}环境变量替换
配置文件中的任意字符串值都可以使用 ${EMQXDQ_*} 语法引用操作系统环境变量。只有带有 EMQXDQ_ 前缀的变量会被解析;其他 ${...} 模式(例如 remote_topic 中的 ${topic})不会被替换。整个值必须就是占位符本身;不支持部分插值(例如 "prefix-${EMQXDQ_VAR}-suffix")。
限制: ${EMQXDQ_*} 替换仅适用于接收字符串值的配置字段(例如 server、username、password)。它不能用于布尔字段(enable)或整数字段(pool_size、keepalive_s)。
示例:
remotes {
cloud {
server = "${EMQXDQ_REMOTE_SERVER}"
username = "${EMQXDQ_REMOTE_USER}"
password = "${EMQXDQ_REMOTE_PASSWORD}"
}
}如果环境变量未设置,插件会记录错误日志,并将原始 ${EMQXDQ_...} 字符串保留为字面值。这通常会导致连接失败(例如尝试连接到 "${EMQXDQ_REMOTE_SERVER}"),从而使配置错误在日志和状态 API 中都能被明显发现。
警告:动态配置更新与节点本地环境变量
环境变量会在节点解析配置时进行替换。当您通过 EMQX Dashboard、REST API 或 CLI 更新插件配置时,原始配置文本会被持久化,然后在集群中每个节点上重新解析。如果各节点上的相关环境变量值不同(或有的节点缺失),每个节点解析出的生效配置也会不同。
因此,除非您能确保集群中每个节点都设置了完全一致的环境变量,否则不要在通过 Dashboard、API 或 CLI 下发的配置中使用
${EMQXDQ_...}替换。如果需要使用节点本地 secret,建议直接编辑配置文件并重载插件,或者使用统一的 secret 注入方式(例如在 Kubernetes 中以相同方式挂载 ConfigMap/Secret)。
配置参考
顶层
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
bridges | map | {} | 桥接名称到桥接配置的映射。 |
remotes | map | {} | 可复用远端 Broker 定义的映射。 |
Bridge(bridges.<name>)
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
enable | boolean | true | 启用或禁用该桥接。 |
remote | string | — | 对应 remotes 中远端定义的名称。 |
proto_ver | string | "v4" | MQTT 协议版本:v3、v4 或 v5。 |
clientid_prefix | string | "emqx-dq-<name>-" | 自动生成 MQTT Client ID 的前缀。每个连接会追加唯一索引(例如 emqx-dq-mybridge-0)。可选;置空则使用默认值。 |
keepalive_s | integer | 60 | MQTT keep-alive 秒数。 |
pool_size | integer | 4 | 到远端 Broker 的 MQTT 连接数。 |
buffer_pool_size | integer | 4 | 每个桥接的磁盘队列 buffer worker 数量。见下方告警说明。 |
filter_topic | string | — | 本地主题过滤模式。支持 + 和 # 通配符。 |
remote_topic | string | — | 目标主题模板。使用 ${topic} 表示原始主题。 |
enqueue_timeout_ms | integer | 5000 | 等待磁盘队列确认的最大阻塞时间(毫秒)。仅对 QoS > 0 生效;QoS 0 始终异步。 |
max_inflight | integer | 32 | 每个到远端 Broker 连接的最大未确认消息数。控制从磁盘队列取批大小以及 emqtt 发送窗口。 |
remote_qos | string | "${qos}" | 向远端 Broker 发布时使用的 QoS("0"、"1"、"2")。默认 "${qos}" 表示保持原消息 QoS。 |
remote_retain | string | "${retain}" | 向远端 Broker 发布时使用的 retain 标志("true"、"false")。默认 "${retain}" 表示保持原消息 retain 标志。 |
max_publish_retries | integer | -1 | 每条消息在被丢弃前的发布重试次数。-1 表示无限重试。每次 PUBACK 失败或连接丢失都会消耗一次重试额度。 |
Remote(remotes.<name>)
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
server | string | — | 远端 MQTT Broker 地址(host:port)。 |
username | string | "" | 连接远端 Broker 使用的用户名。 |
password | string | "" | 连接远端 Broker 使用的密码。 |
ssl.enable | boolean | false | 是否启用到远端 Broker 的 SSL/TLS 连接。 |
ssl.verify | string | verify_none | TLS 校验模式。支持:verify_none、verify_peer。 |
ssl.sni | string | server hostname | TLS SNI。默认使用 server 主机名。设为 "disable" 可关闭 SNI。 |
ssl.cacertfile | string | — | 用于校验远端 Broker 证书的 CA 证书文件。 |
ssl.certfile | string | — | 双向 TLS 认证使用的客户端证书文件。 |
ssl.keyfile | string | — | 双向 TLS 认证使用的客户端私钥文件。 |
Queue
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
queue.base_dir | string | "emqx_bridge_mqtt_dq" | 磁盘队列段文件的基础目录。桥接名与分区索引会自动追加在后面(即 <base_dir>/<bridge_name>/<index>)。相对路径基于 EMQX data_dir 解析;绝对路径则按原样使用。 |
queue_seg_bytes | string | "100MB" | 单个队列段文件的最大大小。 |
queue.max_total_bytes | string | "1GB" | 每个分区允许使用的最大磁盘队列大小。每个桥接使用 buffer_pool_size 个分区(默认 4),因此最坏情况下总磁盘使用量为 buffer_pool_size x 该值。超出后会丢弃最旧消息。 |
主题模板
remote_topic 字段支持 ${topic} 占位符,转发时会被替换为原始发布主题。
示例:
remote_topic = "${topic}":保持原始主题不变直接转发。remote_topic = "forwarded/${topic}":在原始主题前增加前缀。remote_topic = "region1/${topic}":添加区域命名空间。
remote_topic 在消息从队列发送出去时生效。修改该字段后,受影响桥接在重启后,队列中的消息会使用新的模板。
REST API
该插件在 EMQX plugin API 基础路径下暴露四个端点:
GET /api/v5/plugin_api/emqx_bridge_mqtt_dq/metrics:Prometheus 文本格式GET /api/v5/plugin_api/emqx_bridge_mqtt_dq/stats:JSON 仪表盘快照GET /api/v5/plugin_api/emqx_bridge_mqtt_dq/stats/<bridge>:单个桥接视图GET /api/v5/plugin_api/emqx_bridge_mqtt_dq/status:插件/集群健康状态摘要
所有 JSON 端点都返回 application/json; charset=utf-8。
这些 JSON API 是集群聚合的。如果有节点不可用或聚合超时,API 仍会返回尽力而为的数据,但响应中会明确给出集群完整性元数据。
示例:
curl -u admin:public \
http://127.0.0.1:18083/api/v5/plugin_api/emqx_bridge_mqtt_dq/metricscurl -u admin:public \
http://127.0.0.1:18083/api/v5/plugin_api/emqx_bridge_mqtt_dq/stats/stats 响应结构
/stats 响应体包含:
cluster:集群完整性与失败节点信息uptime_seconds:响应节点中观察到的最大插件运行时长summary:所有已配置桥接的汇总统计bridges:每个已配置桥接对应一项
示例:
{
"cluster": {
"complete": true,
"responded_nodes": ["emqx@127.0.0.1"],
"failed_nodes": [],
"timeout_ms": 5000
},
"uptime_seconds": 123,
"summary": {
"bridge_count": 1,
"running_bridge_count": 1,
"buffered": 12,
"backlog": 3,
"inflight": 8,
"enqueue": 1000,
"dequeue": 995,
"publish": 990,
"drop": 5
},
"bridges": [
{
"name": "to-cloud",
"config_state": "enabled",
"runtime_state": "running",
"status": "ok",
"status_reason": null,
"enqueue": 1000,
"dequeue": 995,
"publish": 990,
"drop": 5,
"retried_by_reason": {
"connect_failed": 2,
"reason_code": 3
},
"buffered": 12,
"backlog": 3,
"inflight": 8,
"buffers": [
{
"bridge": "to-cloud",
"index": 0,
"status": "running",
"buffered": 12
}
],
"connectors": [
{
"bridge": "to-cloud",
"index": 0,
"status": "connected",
"backlog": 3,
"inflight": 8
}
]
}
]
}GET /stats/<bridge> 返回:
{
"cluster": {
"complete": true,
"responded_nodes": ["emqx@127.0.0.1"],
"failed_nodes": [],
"timeout_ms": 5000
},
"bridge": {
"name": "to-cloud",
"config_state": "enabled",
"runtime_state": "running",
"status": "ok"
}
}如果当前配置中不存在该桥接,则 API 返回 404。
GET /status 返回精简健康视图:
{
"plugin": "emqx_bridge_mqtt_dq",
"cluster": {
"complete": true,
"responded_nodes": ["emqx@127.0.0.1"],
"failed_nodes": [],
"timeout_ms": 5000
},
"status": "ok",
"bridge_count": 1
}/metrics 端点以 Prometheus 文本格式返回集群聚合指标,例如:
emqx_bridge_mqtt_dq_uptime_secondsemqx_bridge_mqtt_dq_bridge_enqueue_total{bridge="..."}emqx_bridge_mqtt_dq_bridge_dequeue_total{bridge="..."}emqx_bridge_mqtt_dq_bridge_publish_total{bridge="..."}emqx_bridge_mqtt_dq_bridge_drop_total{bridge="..."}emqx_bridge_mqtt_dq_bridge_status{bridge="...",status="..."}emqx_bridge_mqtt_dq_bridge_retry_reason_total{bridge="...",reason="..."}emqx_bridge_mqtt_dq_buffer_buffered{bridge="...",index="..."}emqx_bridge_mqtt_dq_connector_backlog{bridge="...",index="..."}emqx_bridge_mqtt_dq_connector_inflight{bridge="...",index="..."}
指标语义
Bridge 指标
enqueue:进入桥接入队路径的本地消息数dequeue:从本地队列中持久化移除的消息数publish:成功发布到远端 Broker 的消息数drop:最终被判定为丢弃的排队消息数retried_by_reason:按原因分类的重试次数config_state:配置中期望的桥接状态(enabled或disabled)runtime_state:观测到的 worker/storage 运行状态(running、degraded或purged)status:面向运维的桥接健康状态(ok、partial、disconnected、disabled、error)
当前重试原因包括:
reason_code:远端 Broker 返回了非成功 MQTT reason code,因此消息被重试connect_failed:连接或发布失败触发重试timeout:超时类重试connection_lost:关联客户端进程退出,导致 inflight 消息被回收后重试other:无法归类时的兜底原因
当桥接完全清空后,计数器满足:
enqueue = dequeue = publish + drop
Buffer 指标
buffered:当前该持久化队列分区中存储的消息数量- buffer 行中的
status:worker 存在时为running,否则为missing
该 gauge 会在 replayq:open/1 之后立即刷新,因此即使尚无新流量,磁盘上已持久化的消息也能被观测到。
Connector 指标
backlog:connector backlog 队列中等待分发给emqtt的消息数inflight:已经交给emqtt但仍在等待完成的消息数- connector 行中的
status:connected、disconnected、partial、missing或unknown
配置变更行为
配置更新按桥接粒度生效:
- 已变更的桥接会重启。
- 已删除的桥接会停止。
- 被禁用的桥接会停止并清空其队列目录。
- 新桥接会启动。
- 未变更的桥接会继续运行。
插件整体不会因为每次配置更新而重启。 但每个被重启的桥接都会有一个短暂切换窗口,在此期间匹配的消息可能被丢弃。建议在低流量时段进行影响桥接的配置调整。
修改配置前建议
- 确认哪些桥接会受到影响。
- 选择低流量窗口执行。
- 通过 Dashboard 状态和日志观察重启/重连错误。
- 对关键链路,在变更后验证端到端消息投递。
修改 queue.base_dir
修改已启用桥接的 queue.base_dir 会使该桥接使用新目录重启。实际队列路径为 <base_dir>/<bridge_name>/<index>。旧目录不会被自动清理:它会作为孤立数据继续保留在磁盘中。如果旧目录不再需要,请在确认桥接已在新路径上正常运行后手动删除。
修改 buffer_pool_size
buffer_pool_size 控制每个桥接拥有多少个磁盘队列分区。消息会通过 erlang:phash2(Topic, buffer_pool_size) 被分配到各分区。修改该值会带来以下影响:
缩小分区池(例如 8 -> 4):索引 >= 新大小的分区将不再被消费。其旧文件仍保留在
queue.base_dir下,需要手动清理。扩大分区池(例如 4 -> 8):哈希空间会发生变化,因此此前映射到分区 N 的主题,之后可能映射到分区 M。旧分区中已排队的消息仍会继续投递(且在该分区内保持顺序),但同一主题的新消息可能进入新分区。这会打破变更前后的端到端逐主题顺序:某些旧消息可能会晚于新消息送达。
桥接级掉消息窗口:修改
buffer_pool_size会导致该桥接重启,因此切换期间匹配中的消息可能丢失。
消息投递保证
在正常运行情况下,该插件提供至少一次投递;在持续故障情况下,则退化为尽力而为。以下场景可能发生消息丢失:
磁盘队列溢出
当某个队列分区超过 queue.max_total_bytes 时,该分区中最旧的消息会被静默丢弃,以便为新数据腾出空间。插件会周期性输出警告日志(mqtt_dq_buffer_overflow),而不是为每条消息单独记日志。
缓解方式: 增加 queue.max_total_bytes、增加 buffer_pool_size 以分散负载到更多分区,或降低消息吞吐量。
远端 Broker 拒绝发布
当远端 Broker 在 PUBACK(QoS 1)或 PUBREC(QoS 2)中返回非成功 reason code 时,connector 最多会重试 3 次。如果重试全部耗尽,消息会被丢弃,并输出警告日志(mqtt_dq_publish_dropped)。
常见拒绝 reason code 包括:
| Code | 含义(MQTT 5.0) |
|---|---|
| 16 | No matching subscribers |
| 128 | Unspecified error |
| 131 | Implementation specific error |
| 135 | Not authorized |
| 144 | Topic Name invalid |
| 145 | Packet identifier in use |
| 151 | Quota exceeded |
注意:reason code 0(Success)和 16(No matching subscribers)会被视为成功投递,不会触发重试。
缓解方式: 检查远端 Broker 的 ACL 与主题策略,并结合日志定位具体 reason code。
连接反复失败
每当到远端 Broker 的连接断开时,所有待确认(尚未 ACK)的消息都会损失一次重试机会。若在没有一次成功投递的情况下累计发生 3 次连接失败,则该消息会被丢弃。
例如,一条在网络故障期间发布的消息:
- 本地入队(重试计数 = 3)。
- 远端重连成功,消息被发送,但在 ACK 返回前再次断连(重试计数 = 2)。
- 再次重连并发送,连接又断开(重试计数 = 1)。
- 再次重连并发送,若再次被拒绝或连接断开(重试计数 = 0)。
- 消息被丢弃,并记录警告日志。
缓解方式: 排查远端 Broker 持续不可达的原因。短暂网络抖动会被透明处理;只有持续不稳定才会触发该场景。
入队背压(QoS > 0 本地发布)
当 QoS 1 或 2 客户端发布了一条匹配桥接的消息时,插件会先将该消息发送到 buffer worker 的邮箱,然后最多阻塞发布会话进程 enqueue_timeout_ms(默认 5000 ms),等待磁盘写入确认。
当该超时触发时,消息本身不会丢失:它已经进入了 buffer worker 的 Erlang 邮箱,最终仍会被写入磁盘队列。该超时仅用于控制本地发布路径最多阻塞多久。
其重要性在于:message.publish Hook 运行在 MQTT 会话进程内部。当 Hook 阻塞时,该会话无法处理来自同一客户端的其他消息。如果 buffer worker 很慢(例如磁盘 IO 卡顿或邮箱堆积严重),这个超时机制可以防止单个慢桥接无限期阻塞客户端会话。
当超时触发时:
- 会话进程停止等待并继续正常执行。
- 客户端照常收到 PUBACK/PUBREC:不会显式收到发布错误。
- 插件会记录告警日志(
mqtt_dq_enqueue_timeout)。 - 消息继续保留在 buffer worker 的邮箱中,待 worker 追上处理进度后写入磁盘队列。
真正的风险是间接的:如果 buffer worker 长时间跟不上,邮箱会无限增长,从而增加内存占用。这意味着桥接处理能力已经落后于输入流量。
缓解方式: 增加 buffer_pool_size 分散负载,为 queue.base_dir 使用更快的存储,或降低匹配主题的消息速率。
注意:QoS 0 本地发布从不阻塞:它们始终异步入队,不对发布会话施加背压。
Bridge 重启窗口
当桥接因配置变更、插件重载或启停操作而重启时,会存在一个短暂窗口,在此期间匹配消息可能无法被捕获。
缓解方式: 在低流量时段执行配置变更。
QoS 0 的 TCP 级投递
对于发送到远端 Broker 的 QoS 0 消息,connector 在消息进入本地 TCP 发送缓冲区后即认为投递成功。如果远端 Broker 在 TCP 栈接收数据之后、Broker 进程真正处理该消息之前发生崩溃,则消息可能丢失,而且 connector 不会收到任何错误反馈。
这是 MQTT QoS 0 本身的特性,并非该插件特有问题。
运维说明
持久化
缓冲消息在以下场景下依然可以保留:
- EMQX 节点重启。
- 插件重载或升级。
- 到远端 Broker 的临时网络中断。
队列上限
当某个分区的队列使用量超过 queue.max_total_bytes 时,该分区中最旧的消息会被丢弃,以便为新数据腾出空间。插件会输出警告日志。
Pool 大小规划
每个 buffer worker 会通过 BufferIndex rem pool_size 被分配给一个 connector。为了让负载分布更均匀:
buffer_pool_size应当大于或等于pool_size。buffer_pool_size最好是pool_size的整数倍 (即buffer_pool_size mod pool_size = 0)。
较好的例子:pool_size = 4, buffer_pool_size = 4(1:1), pool_size = 4, buffer_pool_size = 8(2:1)。
较差的例子:pool_size = 4, buffer_pool_size = 5:connector 0 需要服务两个 buffer,而其他 connector 只服务一个,会导致吞吐分布不均。
如果某个 connector 断开,分配给它的 buffer worker 会暂停;待该 connector 重连后会自动恢复。
顺序性
在桥接配置稳定不变的情况下,可以保持逐主题顺序。如果修改了 buffer_pool_size,则可能按前文所述暂时影响顺序性。
发布者 ACK 行为(QoS 1/2)
对于匹配桥接的消息:
- 返回给发布客户端的
PUBACK(QoS 1)与PUBREC(QoS 2)可能会被延迟,直到 EMQX 等到磁盘队列入队确认(enqueue_timeout_ms)。 - 即使该等待最终超时,EMQX 仍会完成客户端的发布流程。客户端不会因为磁盘队列入队超时而收到发布错误。