Skip to content

MQTT Bridge with Disk Queue

使用该插件可将本地 MQTT 消息转发到另一个 MQTT Broker,并通过磁盘缓冲提升系统韧性。

功能特性

  • 按桥接实例提供磁盘缓冲。
  • 当远端 Broker 不可用时自动重试。
  • 支持使用 ${topic} 进行主题改写。
  • 单个插件支持配置多个桥接。
  • 配置更新按桥接粒度生效(未变更的桥接保持运行)。

工作原理

  1. 使用每个桥接的 filter_topic 匹配本地发布消息。
  2. 将匹配消息追加到磁盘队列分区。
  3. 将队列中的消息发布到远端 Broker。
  4. 如果因网络/连接问题导致发布失败,则自动重试。
  5. 如果某个队列分区超过 queue.max_total_bytes,则会丢弃该分区中最旧的记录。

配置

推荐通过 EMQX Dashboard 配置,也可以直接修改插件配置文件。

在生产环境中,建议先从一个桥接开始,验证流量和行为后再扩展。

配置文件位置

有两个相关的配置文件位置:

  • 安装后插件包内自带的默认配置文件:

    • Docker 安装示例(0.2.0): /opt/emqx/plugins/emqx_bridge_mqtt_dq-0.2.0/emqx_bridge_mqtt_dq-0.2.0/priv/config.hocon
    • deb/rpm 安装示例(0.2.0): /usr/lib/emqx/plugins/emqx_bridge_mqtt_dq-0.2.0/emqx_bridge_mqtt_dq-0.2.0/priv/config.hocon
  • 通过 Dashboard 或 API 保存后,由 EMQX 管理的持久化插件配置文件:

    • Docker: /opt/emqx/data/plugins/emqx_bridge_mqtt_dq/config.hocon
    • deb/rpm: /var/lib/emqx/plugins/emqx_bridge_mqtt_dq/config.hocon

priv/config.hocon 是插件包自带的默认模板。 data/plugins/.../config.hocon 是 EMQX 保存配置变更后实际使用的持久化配置文件。

快速开始(Dashboard)

  1. 启用插件。
  2. remotes 下添加一个可复用的远端配置。
  3. bridges 下添加一个桥接配置。
  4. 设置 remotefilter_topicremote_topic
  5. 保存并验证远端消息投递。
  6. 只有在基线验证完成后再调整队列和连接池参数。

示例

bridges {
  to-cloud {
    enable = true
    remote = cloud
    proto_ver = "v4"
    keepalive_s = 60
    pool_size = 4
    filter_topic = "devices/#"
    remote_topic = "fwd/${topic}"
    remote_qos = "${qos}"
    remote_retain = "${retain}"
    queue {
      seg_bytes = "100MB"
      max_total_bytes = "1GB"
    }
  }
}

remotes {
  cloud {
    server = "cloud-broker.example.com:8883"
    username = "bridge_user"
    password = "secret"
    ssl {
      enable = true
      verify = verify_none
      # cacertfile = "/path/to/ca.pem"
      # certfile = "/path/to/client-cert.pem"
      # keyfile = "/path/to/client-key.pem"
    }
  }
}

环境变量替换

配置文件中的任意字符串值都可以使用 ${EMQXDQ_*} 语法引用操作系统环境变量。只有带有 EMQXDQ_ 前缀的变量会被解析;其他 ${...} 模式(例如 remote_topic 中的 ${topic})不会被替换。整个值必须就是占位符本身;不支持部分插值(例如 "prefix-${EMQXDQ_VAR}-suffix")。

限制: ${EMQXDQ_*} 替换仅适用于接收字符串值的配置字段(例如 serverusernamepassword)。它不能用于布尔字段(enable)或整数字段(pool_sizekeepalive_s)。

示例:

remotes {
  cloud {
    server = "${EMQXDQ_REMOTE_SERVER}"
    username = "${EMQXDQ_REMOTE_USER}"
    password = "${EMQXDQ_REMOTE_PASSWORD}"
  }
}

如果环境变量未设置,插件会记录错误日志,并将原始 ${EMQXDQ_...} 字符串保留为字面值。这通常会导致连接失败(例如尝试连接到 "${EMQXDQ_REMOTE_SERVER}"),从而使配置错误在日志和状态 API 中都能被明显发现。

警告:动态配置更新与节点本地环境变量

环境变量会在节点解析配置时进行替换。当您通过 EMQX Dashboard、REST API 或 CLI 更新插件配置时,原始配置文本会被持久化,然后在集群中每个节点上重新解析。如果各节点上的相关环境变量值不同(或有的节点缺失),每个节点解析出的生效配置也会不同。

因此,除非您能确保集群中每个节点都设置了完全一致的环境变量,否则不要在通过 Dashboard、API 或 CLI 下发的配置中使用 ${EMQXDQ_...} 替换。如果需要使用节点本地 secret,建议直接编辑配置文件并重载插件,或者使用统一的 secret 注入方式(例如在 Kubernetes 中以相同方式挂载 ConfigMap/Secret)。

配置参考

顶层

字段类型默认值说明
bridgesmap{}桥接名称到桥接配置的映射。
remotesmap{}可复用远端 Broker 定义的映射。

Bridge(bridges.<name>

字段类型默认值说明
enablebooleantrue启用或禁用该桥接。
remotestring对应 remotes 中远端定义的名称。
proto_verstring"v4"MQTT 协议版本:v3v4v5
clientid_prefixstring"emqx-dq-<name>-"自动生成 MQTT Client ID 的前缀。每个连接会追加唯一索引(例如 emqx-dq-mybridge-0)。可选;置空则使用默认值。
keepalive_sinteger60MQTT keep-alive 秒数。
pool_sizeinteger4到远端 Broker 的 MQTT 连接数。
buffer_pool_sizeinteger4每个桥接的磁盘队列 buffer worker 数量。见下方告警说明。
filter_topicstring本地主题过滤模式。支持 +# 通配符。
remote_topicstring目标主题模板。使用 ${topic} 表示原始主题。
enqueue_timeout_msinteger5000等待磁盘队列确认的最大阻塞时间(毫秒)。仅对 QoS > 0 生效;QoS 0 始终异步。
max_inflightinteger32每个到远端 Broker 连接的最大未确认消息数。控制从磁盘队列取批大小以及 emqtt 发送窗口。
remote_qosstring"${qos}"向远端 Broker 发布时使用的 QoS("0""1""2")。默认 "${qos}" 表示保持原消息 QoS。
remote_retainstring"${retain}"向远端 Broker 发布时使用的 retain 标志("true""false")。默认 "${retain}" 表示保持原消息 retain 标志。
max_publish_retriesinteger-1每条消息在被丢弃前的发布重试次数。-1 表示无限重试。每次 PUBACK 失败或连接丢失都会消耗一次重试额度。

Remote(remotes.<name>

字段类型默认值说明
serverstring远端 MQTT Broker 地址(host:port)。
usernamestring""连接远端 Broker 使用的用户名。
passwordstring""连接远端 Broker 使用的密码。
ssl.enablebooleanfalse是否启用到远端 Broker 的 SSL/TLS 连接。
ssl.verifystringverify_noneTLS 校验模式。支持:verify_noneverify_peer
ssl.snistringserver hostnameTLS SNI。默认使用 server 主机名。设为 "disable" 可关闭 SNI。
ssl.cacertfilestring用于校验远端 Broker 证书的 CA 证书文件。
ssl.certfilestring双向 TLS 认证使用的客户端证书文件。
ssl.keyfilestring双向 TLS 认证使用的客户端私钥文件。

Queue

字段类型默认值说明
queue.base_dirstring"emqx_bridge_mqtt_dq"磁盘队列段文件的基础目录。桥接名与分区索引会自动追加在后面(即 <base_dir>/<bridge_name>/<index>)。相对路径基于 EMQX data_dir 解析;绝对路径则按原样使用。
queue_seg_bytesstring"100MB"单个队列段文件的最大大小。
queue.max_total_bytesstring"1GB"每个分区允许使用的最大磁盘队列大小。每个桥接使用 buffer_pool_size 个分区(默认 4),因此最坏情况下总磁盘使用量为 buffer_pool_size x 该值。超出后会丢弃最旧消息。

主题模板

remote_topic 字段支持 ${topic} 占位符,转发时会被替换为原始发布主题。

示例:

  • remote_topic = "${topic}":保持原始主题不变直接转发。
  • remote_topic = "forwarded/${topic}":在原始主题前增加前缀。
  • remote_topic = "region1/${topic}":添加区域命名空间。

remote_topic 在消息从队列发送出去时生效。修改该字段后,受影响桥接在重启后,队列中的消息会使用新的模板。

REST API

该插件在 EMQX plugin API 基础路径下暴露四个端点:

  • GET /api/v5/plugin_api/emqx_bridge_mqtt_dq/metrics:Prometheus 文本格式
  • GET /api/v5/plugin_api/emqx_bridge_mqtt_dq/stats:JSON 仪表盘快照
  • GET /api/v5/plugin_api/emqx_bridge_mqtt_dq/stats/<bridge>:单个桥接视图
  • GET /api/v5/plugin_api/emqx_bridge_mqtt_dq/status:插件/集群健康状态摘要

所有 JSON 端点都返回 application/json; charset=utf-8

这些 JSON API 是集群聚合的。如果有节点不可用或聚合超时,API 仍会返回尽力而为的数据,但响应中会明确给出集群完整性元数据。

示例:

bash
curl -u admin:public \
  http://127.0.0.1:18083/api/v5/plugin_api/emqx_bridge_mqtt_dq/metrics
bash
curl -u admin:public \
  http://127.0.0.1:18083/api/v5/plugin_api/emqx_bridge_mqtt_dq/stats

/stats 响应结构

/stats 响应体包含:

  • cluster:集群完整性与失败节点信息
  • uptime_seconds:响应节点中观察到的最大插件运行时长
  • summary:所有已配置桥接的汇总统计
  • bridges:每个已配置桥接对应一项

示例:

json
{
  "cluster": {
    "complete": true,
    "responded_nodes": ["emqx@127.0.0.1"],
    "failed_nodes": [],
    "timeout_ms": 5000
  },
  "uptime_seconds": 123,
  "summary": {
    "bridge_count": 1,
    "running_bridge_count": 1,
    "buffered": 12,
    "backlog": 3,
    "inflight": 8,
    "enqueue": 1000,
    "dequeue": 995,
    "publish": 990,
    "drop": 5
  },
  "bridges": [
    {
      "name": "to-cloud",
      "config_state": "enabled",
      "runtime_state": "running",
      "status": "ok",
      "status_reason": null,
      "enqueue": 1000,
      "dequeue": 995,
      "publish": 990,
      "drop": 5,
      "retried_by_reason": {
        "connect_failed": 2,
        "reason_code": 3
      },
      "buffered": 12,
      "backlog": 3,
      "inflight": 8,
      "buffers": [
        {
          "bridge": "to-cloud",
          "index": 0,
          "status": "running",
          "buffered": 12
        }
      ],
      "connectors": [
        {
          "bridge": "to-cloud",
          "index": 0,
          "status": "connected",
          "backlog": 3,
          "inflight": 8
        }
      ]
    }
  ]
}

GET /stats/<bridge> 返回:

json
{
  "cluster": {
    "complete": true,
    "responded_nodes": ["emqx@127.0.0.1"],
    "failed_nodes": [],
    "timeout_ms": 5000
  },
  "bridge": {
    "name": "to-cloud",
    "config_state": "enabled",
    "runtime_state": "running",
    "status": "ok"
  }
}

如果当前配置中不存在该桥接,则 API 返回 404

GET /status 返回精简健康视图:

json
{
  "plugin": "emqx_bridge_mqtt_dq",
  "cluster": {
    "complete": true,
    "responded_nodes": ["emqx@127.0.0.1"],
    "failed_nodes": [],
    "timeout_ms": 5000
  },
  "status": "ok",
  "bridge_count": 1
}

/metrics 端点以 Prometheus 文本格式返回集群聚合指标,例如:

  • emqx_bridge_mqtt_dq_uptime_seconds
  • emqx_bridge_mqtt_dq_bridge_enqueue_total{bridge="..."}
  • emqx_bridge_mqtt_dq_bridge_dequeue_total{bridge="..."}
  • emqx_bridge_mqtt_dq_bridge_publish_total{bridge="..."}
  • emqx_bridge_mqtt_dq_bridge_drop_total{bridge="..."}
  • emqx_bridge_mqtt_dq_bridge_status{bridge="...",status="..."}
  • emqx_bridge_mqtt_dq_bridge_retry_reason_total{bridge="...",reason="..."}
  • emqx_bridge_mqtt_dq_buffer_buffered{bridge="...",index="..."}
  • emqx_bridge_mqtt_dq_connector_backlog{bridge="...",index="..."}
  • emqx_bridge_mqtt_dq_connector_inflight{bridge="...",index="..."}

指标语义

Bridge 指标

  • enqueue:进入桥接入队路径的本地消息数
  • dequeue:从本地队列中持久化移除的消息数
  • publish:成功发布到远端 Broker 的消息数
  • drop:最终被判定为丢弃的排队消息数
  • retried_by_reason:按原因分类的重试次数
  • config_state:配置中期望的桥接状态(enableddisabled
  • runtime_state:观测到的 worker/storage 运行状态(runningdegradedpurged
  • status:面向运维的桥接健康状态(okpartialdisconnecteddisablederror

当前重试原因包括:

  • reason_code:远端 Broker 返回了非成功 MQTT reason code,因此消息被重试
  • connect_failed:连接或发布失败触发重试
  • timeout:超时类重试
  • connection_lost:关联客户端进程退出,导致 inflight 消息被回收后重试
  • other:无法归类时的兜底原因

当桥接完全清空后,计数器满足:

  • enqueue = dequeue = publish + drop

Buffer 指标

  • buffered:当前该持久化队列分区中存储的消息数量
  • buffer 行中的 status:worker 存在时为 running,否则为 missing

该 gauge 会在 replayq:open/1 之后立即刷新,因此即使尚无新流量,磁盘上已持久化的消息也能被观测到。

Connector 指标

  • backlog:connector backlog 队列中等待分发给 emqtt 的消息数
  • inflight:已经交给 emqtt 但仍在等待完成的消息数
  • connector 行中的 statusconnecteddisconnectedpartialmissingunknown

配置变更行为

配置更新按桥接粒度生效:

  • 已变更的桥接会重启。
  • 已删除的桥接会停止。
  • 被禁用的桥接会停止并清空其队列目录。
  • 新桥接会启动。
  • 未变更的桥接会继续运行。

插件整体不会因为每次配置更新而重启。 但每个被重启的桥接都会有一个短暂切换窗口,在此期间匹配的消息可能被丢弃。建议在低流量时段进行影响桥接的配置调整。

修改配置前建议

  1. 确认哪些桥接会受到影响。
  2. 选择低流量窗口执行。
  3. 通过 Dashboard 状态和日志观察重启/重连错误。
  4. 对关键链路,在变更后验证端到端消息投递。

修改 queue.base_dir

修改已启用桥接的 queue.base_dir 会使该桥接使用新目录重启。实际队列路径为 <base_dir>/<bridge_name>/<index>。旧目录不会被自动清理:它会作为孤立数据继续保留在磁盘中。如果旧目录不再需要,请在确认桥接已在新路径上正常运行后手动删除。

修改 buffer_pool_size

buffer_pool_size 控制每个桥接拥有多少个磁盘队列分区。消息会通过 erlang:phash2(Topic, buffer_pool_size) 被分配到各分区。修改该值会带来以下影响:

  1. 缩小分区池(例如 8 -> 4):索引 >= 新大小的分区将不再被消费。其旧文件仍保留在 queue.base_dir 下,需要手动清理。

  2. 扩大分区池(例如 4 -> 8):哈希空间会发生变化,因此此前映射到分区 N 的主题,之后可能映射到分区 M。旧分区中已排队的消息仍会继续投递(且在该分区内保持顺序),但同一主题的新消息可能进入新分区。这会打破变更前后的端到端逐主题顺序:某些旧消息可能会晚于新消息送达。

  3. 桥接级掉消息窗口:修改 buffer_pool_size 会导致该桥接重启,因此切换期间匹配中的消息可能丢失。

消息投递保证

在正常运行情况下,该插件提供至少一次投递;在持续故障情况下,则退化为尽力而为。以下场景可能发生消息丢失:

磁盘队列溢出

当某个队列分区超过 queue.max_total_bytes 时,该分区中最旧的消息会被静默丢弃,以便为新数据腾出空间。插件会周期性输出警告日志(mqtt_dq_buffer_overflow),而不是为每条消息单独记日志。

缓解方式: 增加 queue.max_total_bytes、增加 buffer_pool_size 以分散负载到更多分区,或降低消息吞吐量。

远端 Broker 拒绝发布

当远端 Broker 在 PUBACK(QoS 1)或 PUBREC(QoS 2)中返回非成功 reason code 时,connector 最多会重试 3 次。如果重试全部耗尽,消息会被丢弃,并输出警告日志(mqtt_dq_publish_dropped)。

常见拒绝 reason code 包括:

Code含义(MQTT 5.0)
16No matching subscribers
128Unspecified error
131Implementation specific error
135Not authorized
144Topic Name invalid
145Packet identifier in use
151Quota exceeded

注意:reason code 0(Success)和 16(No matching subscribers)会被视为成功投递,不会触发重试。

缓解方式: 检查远端 Broker 的 ACL 与主题策略,并结合日志定位具体 reason code。

连接反复失败

每当到远端 Broker 的连接断开时,所有待确认(尚未 ACK)的消息都会损失一次重试机会。若在没有一次成功投递的情况下累计发生 3 次连接失败,则该消息会被丢弃。

例如,一条在网络故障期间发布的消息:

  1. 本地入队(重试计数 = 3)。
  2. 远端重连成功,消息被发送,但在 ACK 返回前再次断连(重试计数 = 2)。
  3. 再次重连并发送,连接又断开(重试计数 = 1)。
  4. 再次重连并发送,若再次被拒绝或连接断开(重试计数 = 0)。
  5. 消息被丢弃,并记录警告日志。

缓解方式: 排查远端 Broker 持续不可达的原因。短暂网络抖动会被透明处理;只有持续不稳定才会触发该场景。

入队背压(QoS > 0 本地发布)

当 QoS 1 或 2 客户端发布了一条匹配桥接的消息时,插件会先将该消息发送到 buffer worker 的邮箱,然后最多阻塞发布会话进程 enqueue_timeout_ms(默认 5000 ms),等待磁盘写入确认。

当该超时触发时,消息本身不会丢失:它已经进入了 buffer worker 的 Erlang 邮箱,最终仍会被写入磁盘队列。该超时仅用于控制本地发布路径最多阻塞多久。

其重要性在于:message.publish Hook 运行在 MQTT 会话进程内部。当 Hook 阻塞时,该会话无法处理来自同一客户端的其他消息。如果 buffer worker 很慢(例如磁盘 IO 卡顿或邮箱堆积严重),这个超时机制可以防止单个慢桥接无限期阻塞客户端会话。

当超时触发时:

  1. 会话进程停止等待并继续正常执行。
  2. 客户端照常收到 PUBACK/PUBREC:不会显式收到发布错误。
  3. 插件会记录告警日志(mqtt_dq_enqueue_timeout)。
  4. 消息继续保留在 buffer worker 的邮箱中,待 worker 追上处理进度后写入磁盘队列。

真正的风险是间接的:如果 buffer worker 长时间跟不上,邮箱会无限增长,从而增加内存占用。这意味着桥接处理能力已经落后于输入流量。

缓解方式: 增加 buffer_pool_size 分散负载,为 queue.base_dir 使用更快的存储,或降低匹配主题的消息速率。

注意:QoS 0 本地发布从不阻塞:它们始终异步入队,不对发布会话施加背压。

Bridge 重启窗口

当桥接因配置变更、插件重载或启停操作而重启时,会存在一个短暂窗口,在此期间匹配消息可能无法被捕获。

缓解方式: 在低流量时段执行配置变更。

QoS 0 的 TCP 级投递

对于发送到远端 Broker 的 QoS 0 消息,connector 在消息进入本地 TCP 发送缓冲区后即认为投递成功。如果远端 Broker 在 TCP 栈接收数据之后、Broker 进程真正处理该消息之前发生崩溃,则消息可能丢失,而且 connector 不会收到任何错误反馈。

这是 MQTT QoS 0 本身的特性,并非该插件特有问题。

运维说明

持久化

缓冲消息在以下场景下依然可以保留:

  • EMQX 节点重启。
  • 插件重载或升级。
  • 到远端 Broker 的临时网络中断。

队列上限

当某个分区的队列使用量超过 queue.max_total_bytes 时,该分区中最旧的消息会被丢弃,以便为新数据腾出空间。插件会输出警告日志。

Pool 大小规划

每个 buffer worker 会通过 BufferIndex rem pool_size 被分配给一个 connector。为了让负载分布更均匀:

  • buffer_pool_size 应当大于或等于 pool_size
  • buffer_pool_size 最好是 pool_size整数倍 (即 buffer_pool_size mod pool_size = 0)。

较好的例子:pool_size = 4, buffer_pool_size = 4(1:1), pool_size = 4, buffer_pool_size = 8(2:1)。

较差的例子:pool_size = 4, buffer_pool_size = 5:connector 0 需要服务两个 buffer,而其他 connector 只服务一个,会导致吞吐分布不均。

如果某个 connector 断开,分配给它的 buffer worker 会暂停;待该 connector 重连后会自动恢复。

顺序性

在桥接配置稳定不变的情况下,可以保持逐主题顺序。如果修改了 buffer_pool_size,则可能按前文所述暂时影响顺序性。

发布者 ACK 行为(QoS 1/2)

对于匹配桥接的消息:

  • 返回给发布客户端的 PUBACK(QoS 1)与 PUBREC(QoS 2)可能会被延迟,直到 EMQX 等到磁盘队列入队确认(enqueue_timeout_ms)。
  • 即使该等待最终超时,EMQX 仍会完成客户端的发布流程。客户端不会因为磁盘队列入队超时而收到发布错误。