Skip to content

系统主题与客户端事件订阅

EMQX Cloud 会通过 $SYS/ 系统主题发布 Broker 的客户端上下线和订阅事件等信息,例如:

$SYS/brokers/${node}/clients/${clientid}/connected

$SYS 系统主题通常包含 Broker 节点相关信息,适用于 Broker 内部观测、兼容性适配以及运维诊断等场景。如果需要在 EMQX Cloud 中订阅系统主题,需要在默认 ACL 规则中添加相应授权,例如允许用户名 emqx 订阅 $SYS/# 主题。

生产业务集成建议使用 EMQX Cloud 数据集成捕获客户端事件,再通过"消息重新发布"转发到业务主题。EMQX Cloud 支持通过规则 SQL 筛选和处理数据,并将处理结果重新发布到指定 MQTT 主题。

本文介绍 EMQX Cloud 系统主题的使用方式,以及如何通过数据集成将客户端事件转发到业务主题,供业务系统订阅使用。

使用场景对比

系统主题适合用于 Broker 观测、兼容性适配和运维诊断。如果您的目标是在生产业务中捕获客户端上下线、订阅等事件并集成到业务系统,建议使用数据集成。

对比项$SYS 系统主题数据集成 + 消息重新发布
使用场景Broker 观测、诊断、兼容性业务集成、事件转发
主题结构包含 ${node},与 Broker 内部节点名称绑定可重发布到自定义业务主题
数据处理客户端自行解析原始消息可在规则中统一过滤、转换、脱敏
安全边界可能暴露 Broker 内部节点信息可对业务主题单独设置访问控制
扩展能力仅用于订阅观察可同时转发到 MQTT 主题或外部系统

系统主题

系统主题格式为 $SYS/brokers/${node}/...,其中 ${node} 是 Broker 节点名称(例如 emqx@127.0.0.1)。

客户端事件主题

以下是与客户端状态相关的系统主题:

事件类型系统主题
客户端上线$SYS/brokers/${node}/clients/${clientid}/connected
客户端下线$SYS/brokers/${node}/clients/${clientid}/disconnected
客户端订阅$SYS/brokers/${node}/clients/${clientid}/subscribed
客户端取消订阅$SYS/brokers/${node}/clients/${clientid}/unsubscribed

提示

以上四类事件的系统主题默认开启。客户端如需订阅,需要在默认 ACL 规则中添加相应授权,例如允许指定用户名订阅 $SYS/# 主题。

通过数据集成捕获客户端事件(推荐)

数据集成提供了内置的事件主题,以 $events/ 开头,比系统主题更稳定,不与节点名称绑定,且支持在规则中处理数据后转发到自定义业务主题。默认情况下,客户端无法直接订阅这些事件消息,需要通过数据集成规则来处理和转发。各事件可用字段的完整说明,参阅 SQL 数据源和字段 - 客户端事件

系统主题与数据集成事件主题对照

事件类型$SYS 系统主题数据集成事件主题
客户端上线$SYS/brokers/${node}/clients/${clientid}/connected$events/client/connected
客户端下线$SYS/brokers/${node}/clients/${clientid}/disconnected$events/client/disconnected
客户端订阅$SYS/brokers/${node}/clients/${clientid}/subscribed$events/session/subscribed
客户端取消订阅$SYS/brokers/${node}/clients/${clientid}/unsubscribed$events/session/unsubscribed

提示

新配置建议使用新的命名空间事件主题,例如 $events/client/connected$events/session/subscribed。旧事件主题仍兼容,但不建议在新规则中继续使用。

推荐架构

将事件通过数据集成规则重新发布到业务主题,业务客户端订阅业务主题,不直接依赖系统主题:

$events/client/#  或  $events/session/#
  → 数据集成规则 SQL(过滤、转换)
  → 消息重新发布动作
  → 业务主题(例如 iot/events/client/connected)
  → 业务客户端订阅

配置数据集成转发客户端事件

下面以四种常见的客户端事件为例,说明如何配置规则和消息重新发布动作。

客户端上线事件

事件主题

$events/client/connected

规则 SQL

sql
SELECT
  clientid,
  username,
  peername,
  proto_name,
  proto_ver,
  keepalive,
  clean_start,
  expiry_interval,
  connected_at,
  timestamp,
  node
FROM
  "$events/client/connected"

建议重发布主题

iot/events/client/connected

Payload 模板

json
{
  "event": "client.connected",
  "clientid": "${clientid}",
  "username": "${username}",
  "peername": "${peername}",
  "proto_name": "${proto_name}",
  "proto_ver": ${proto_ver},
  "keepalive": ${keepalive},
  "clean_start": ${clean_start},
  "expiry_interval": ${expiry_interval},
  "connected_at": ${connected_at},
  "timestamp": ${timestamp},
  "node": "${node}"
}

客户端下线事件

事件主题

$events/client/disconnected

规则 SQL

sql
SELECT
  clientid,
  username,
  reason,
  peername,
  connected_at,
  disconnected_at,
  timestamp,
  node
FROM
  "$events/client/disconnected"

建议重发布主题

iot/events/client/disconnected

Payload 模板

json
{
  "event": "client.disconnected",
  "clientid": "${clientid}",
  "username": "${username}",
  "reason": "${reason}",
  "peername": "${peername}",
  "connected_at": ${connected_at},
  "disconnected_at": ${disconnected_at},
  "timestamp": ${timestamp},
  "node": "${node}"
}

客户端订阅事件

事件主题

$events/session/subscribed

规则 SQL

sql
SELECT
  clientid,
  username,
  topic,
  qos,
  sub_props,
  timestamp,
  node
FROM
  "$events/session/subscribed"

建议重发布主题

iot/events/session/subscribed

Payload 模板

json
{
  "event": "session.subscribed",
  "clientid": "${clientid}",
  "username": "${username}",
  "topic": "${topic}",
  "qos": ${qos},
  "sub_props": ${sub_props},
  "timestamp": ${timestamp},
  "node": "${node}"
}

客户端取消订阅事件

事件主题

$events/session/unsubscribed

规则 SQL

sql
SELECT
  clientid,
  username,
  topic,
  qos,
  unsub_props,
  timestamp,
  node
FROM
  "$events/session/unsubscribed"

建议重发布主题

iot/events/session/unsubscribed

Payload 模板

json
{
  "event": "session.unsubscribed",
  "clientid": "${clientid}",
  "username": "${username}",
  "topic": "${topic}",
  "qos": ${qos},
  "unsub_props": ${unsub_props},
  "timestamp": ${timestamp},
  "node": "${node}"
}

创建数据集成规则

在 EMQX Cloud 控制台中按以下步骤创建规则并添加消息重新发布动作:

  1. 进入部署的数据集成页面。
  2. 数据转发分类下选择消息重新发布,点击创建
  3. 在规则 SQL 输入框中填写对应的规则 SQL。
  4. 点击下一步,添加消息重新发布动作。
  5. 填写目标业务主题、Payload 模板,以及 QoS、Retain 等参数。
  6. 点击确认保存,并使用 SQL 测试验证规则是否按预期运行。

TIP

如果需要避免重发布的消息再次触发同一条规则,可以在动作配置中启用直接派发。启用后,消息会直接发送给订阅者,不经过规则引擎,从而避免循环触发。

最佳实践

  • 使用 $events/client/#$events/session/# 作为客户端连接、断开、订阅、取消订阅等事件的统一来源。
  • 通过数据集成规则对事件进行过滤、转换和脱敏处理。
  • 将事件重发布到稳定的业务主题,例如 iot/events/client/+iot/events/session/+,避免业务系统直接依赖 $SYS/brokers/${node}/... 等内部主题结构。
  • 业务客户端仅订阅业务主题,并为其配置独立 ACL 规则,实现访问隔离与权限控制。
  • 按需将事件写入外部系统,用于审计、查询、监控和告警等场景。