Skip to content

使用数据集成获取消息发布与丢弃事件主题消息

消息发布与丢弃事件主题是 EMQX 提供的系统事件主题,用于实时监控消息在 Broker 内部的生命周期,包括消息发布、投递、确认等过程。这些事件对于分析消息流转情况、排查消息丢失问题、统计消息吞吐量具有重要意义。普通客户端无法直接订阅这些系统主题,但可以通过规则引擎捕获它们,用于进一步处理,例如写入数据库、转发至其他主题,或进行实时监控与分析。

由于 EMQX Broker 作为 MQTT Broker 而非存储服务,并受安全隐私限制,默认不会保存任何消息传递的历史信息记录或丢弃记录,因此,建议在部署中使用消息发布与丢弃事件主题配置数据集成规则,可以很方便的实时存储和分析消息生命周期、消息可靠性及系统传输质量。这对于后续的性能优化、问题排查和审计分析都具有重要意义。

本页将介绍消息发布与丢弃事件主题的使用场景,演示如何通过规则引擎捕获事件主题消息并进行处理,再使用 MQTTX Desktop 模拟客户端接收事件主题消息。

应用场景

消息发布与订阅事件主题可用于多种业务与运维场景,包括:

  • 消息流追踪与链路监控:追踪消息从客户端发布到成功投递的全过程,帮助发现消息堆积、延迟、或投递失败的原因。
  • 业务统计与吞吐分析:统计每个客户端、每个主题的消息发布与投递数量,用于评估业务系统负载与用户行为。
  • 异常监测与告警:当出现消息未投递丢弃、消息投递过程中丢弃等异常时,可通过事件主题捕获信息并获取丢弃详情。
  • 安全审计与访问日志:记录哪个客户端发布了什么消息,消息是否被正常消费,帮助企业进行审计和合规分析。

触发条件与关键字段

消息在转发的过程中被丢弃事件 ("$events/message_dropped")

  • 触发条件:当一条消息无任何订阅者时触发。
  • 关键字段
字段解释
clientid发布消息的 Client ID
username发布消息的用户名
topic消息主题
reason消息丢弃原因
qos消息 QoS 等级
timestamp丢弃事件时间戳(单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
  • 常见丢弃原因
原因码解释
no_subscribers消息主题无任何订阅者。
receive_maximum_exceeded对于 QoS 2 信息的 awaiting_rel 队列已满。
packet_identifier_inuse收到了一个 QoS 2 消息,但它使用的 Packet ID(报文标识符)还没有被释放,仍在使用中。
  • 典型应用场景:监控消息未订阅被丢弃的情况。

消息在投递的过程中被丢弃事件 ("$events/delivery_dropped")

  • 触发条件:当订阅者的消息队列已满时触发规则。
  • 关键字段
字段解释
from_clientid发布消息的 Client ID
from_username发布消息的用户名
topic消息主题
reason消息丢弃原因
qos消息 QoS 等级
timestamp丢弃事件时间戳(单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
  • 常见丢弃原因
原因码解释
no_local不允许客户端接收自己发布的消息 (在 MQTT v5 中,若订阅时将 No Local 选项设置为 1,服务端将不会把该客户端发布的消息转发回给它自己)。
expired消息或者会话过期。
queue_full消息队列已满(QoS>0),通常因为订阅端消费能力不足,或订阅端 clean_start=false且离线时间过长。
qos0_msgQoS 0 的消息因为消息队列已满被丢弃。
  • 典型应用场景
    • 监控订阅端消费能力与离线情况
    • 辅助分析消息滞留或下发失败原因
    • 与 $events/message_dropped 搭配使用,完整追踪消息在上下行阶段的丢弃情况

消息投递成功事件 ("$events/message_delivered")

  • 触发条件:当 EMQX 成功将消息发送到订阅客户端时触发(仅表示 Broker 已发出消息,不代表客户端确认接收)。
  • 关键字段
字段解释
clientid消息目的 ID
from_clientid消息来源 Client ID
username消息目的 username
topic消息主题
qos消息 QoS 等级
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
  • 典型应用场景
    • 统计消息投递成功率
    • 监控消息下发延迟
    • 可以与 $events/message_acked 结合分析消息确认时延(仅适用于 QoS1 和 QoS2 消息)

消息确认事件 ("$events/message_acked")

  • 触发条件:当客户端向 EMQX 发送 PUBACK 或 PUBCOMP 报文确认收到消息时触发,仅适用于 QoS 1 与 QoS 2 消息。
  • 关键字段
字段解释
idMQTT 消息 ID,有助于排查时延
clientid消息目的 ID
from_clientid消息来源 Client ID
username消息目的 username
topic消息主题
qos消息 QoS 等级
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
  • 典型应用场景
    1. 计算消息从投递到确认的延迟时间
    2. 分析 QoS1/QoS2 消息的确认率
    3. 保障关键业务场景中消息的可靠传输

TIP

事件主题的完整字段信息,请参阅官方文档:客户端事件

配置数据集成获取丢弃事件主题消息

在实际使用中,事件主题通常有以下两种处理方式:

  1. 消息重发布:将事件主题消息重新发布到其他 MQTT 主题。这种方式轻量、实时性强,并且与 MQTT 原生生态兼容,适合在系统内部快速消费和处理事件。
  2. 转发至外部服务:将事件主题消息发送至外部系统,如数据库、消息队列或 HTTP 服务。该方式支持与各种外部系统集成,便于实现实时响应和持久化处理。

本节仅演示消息重发布的方法。如何使用转发至外部服务,参考将消息投递成功事件主题消息转发至 HTTP 服务

其他转发至不同数据库或外部服务的方式可参考官方文档:EMQX Cloud 数据集成

创建消息重发布规则和动作

本节将演示如何将消息在转发的过程中被丢弃事件主题消息重新发布至其他主题。

  1. 数据集成页面中的数据转发服务分类下点击消息重新发布。如果已经创建过其他的连接器,则点击新建连接器,然后在数据转发服务分类下选择消息重新发布
  2. SQL 编辑器中定义规则 SQL。如需排查客户端连接失败事件,可参考以下 SQL 示例:
SQL
SELECT
    client_id,
    reason,
    topic,
    qos,
    timestamp
FROM
    "$events/message_dropped"
  1. 点击下一步,添加动作。
  2. 创建动作步骤页中,配置以下信息:
    • 使用连接器:使用默认选项 消息重新发布。
    • 主题:设置目标主题为 message_dropped
    • PayloadQoSRetain: 使用默认值。
  3. 点击确定完成配置。

测试规则和动作

推荐使用 MQTTX 模拟客户端连接与消息上报,同时您也可以使用其他任意客户端完成。

  1. 使用 MQTTX 创建两个连接到部署,分别将 ClientID 设置为 pubsub
  2. 使用客户端 sub 订阅接收重发布消息的主题 message_dropped
  3. 使用客户端 pub 发布一条消息到主题 test。此时客户端 sub 应该能收到以下格式的事件主题消息:
json
{
    "topic": "test",
    "timestamp": 1761309999897,
    "reason": "no_subscribers",
    "qos": 1,
    "clientid":"pub"
}

这个事件的 payload 意味着由于没有对应的订阅者,所以这信息在到达 EMQX 后就直接被丢弃了,并没有进一步投递。

将消息投递成功事件主题消息转发至 HTTP 服务

本节将演示如何将消息投递成功事件主题消息转发至 HTTP 服务。开始之前,您需要创建 VPC 对等连接以通过内部网络 IP 访问目标连接器。或者开通 NAT 网关,通过公网 IP 访问目标连接器。

创建 HTTP 服务连接器

  1. 在部署菜单中选择数据集成,在 Web 服务分类下选择 HTTP 服务。如果已经创建过其他连接器,选择新建连接器,然后在 Web 服务 分类下选择 HTTP 服务
  2. 创建连接器页面中填写 URL,其他配置项可按需调整。URL 应指向将接收事件消息的目标 HTTP 服务。连接器会按照规则中定义的 payload 向该 URL 发送 POST 请求。
  3. 点击测试按钮测试连接,如果 HTTP 服务能够正常访问,则会返回成功提示。
  4. 点击新建按钮完成配置。

创建规则和动作

  1. 点击新建规则进入新建规则步骤页。
  2. SQL 编辑器 中定义规则 SQL。如需排查客户端断连事件,可参考以下 SQL 示例:
sql
SELECT
    from_clientid,
    clientid,
    username,
    payload,
    topic,
    qos
FROM
    "$events/message_delivered"
  1. 点击下一步开始创建动作。

  2. 使用连接器下拉框中选择之前创建的 HTTP Server 连接器。剩下的配置保持默认值。

  3. 点击确认按钮完成规则创建。

TIP

关于将数据转发至 HTTP 服务的完整流程与配置,可参考官方文档:将 MQTT 数据发送到 HTTP 服务

测试规则和动作

推荐使用 MQTTX 模拟客户端连接与消息上报,同时您也可以使用其他任意客户端完成。

  1. 使用 MQTTX 创建两个连接到部署,分别将 ClientID 设置为 pubsub
  2. 使用客户端 sub 订阅主题 test
  3. 使用客户端 pub 发布一条消息到主题 test
  4. 此时 HTTP 服务应该能收到 POST 请求,请求体原始内容的格式为:
json
{
    "username": "test",
    "topic": "test",
    "qos": 0,
    "payload": "Hello Event",
    "from_clientid": "pub",
    "clientid": "sub"
}

最佳实践建议

  • 对高频事件(如 message_delivered)启用合适的限流或通过规则 SQL 的 WHEN 子句进行条件过滤,防止产生过多事件记录。或者使用 Kafka 等外接系统用以承接。
  • 为每个事件主题/业务系统使用独立的规则和连接器,便于运维和隔离分析。
  • 配合客户端上下线事件主题一起使用,可完整监控设备在线与消息行为全流程。