系统主题与客户端事件订阅
EMQX Cloud 会通过 $SYS/ 系统主题发布 Broker 的客户端上下线和订阅事件等信息,例如:
$SYS/brokers/${node}/clients/${clientid}/connected$SYS 系统主题通常包含 Broker 节点相关信息,适用于 Broker 内部观测、兼容性适配以及运维诊断等场景。如果需要在 EMQX Cloud 中订阅系统主题,需要在默认 ACL 规则中添加相应授权,例如允许用户名 emqx 订阅 $SYS/# 主题。
生产业务集成建议使用 EMQX Cloud 数据集成捕获客户端事件,再通过"消息重新发布"转发到业务主题。EMQX Cloud 支持通过规则 SQL 筛选和处理数据,并将处理结果重新发布到指定 MQTT 主题。
本文介绍 EMQX Cloud 系统主题的使用方式,以及如何通过数据集成将客户端事件转发到业务主题,供业务系统订阅使用。
使用场景对比
系统主题适合用于 Broker 观测、兼容性适配和运维诊断。如果您的目标是在生产业务中捕获客户端上下线、订阅等事件并集成到业务系统,建议使用数据集成。
| 对比项 | $SYS 系统主题 | 数据集成 + 消息重新发布 |
|---|---|---|
| 使用场景 | Broker 观测、诊断、兼容性 | 业务集成、事件转发 |
| 主题结构 | 包含 ${node},与 Broker 内部节点名称绑定 | 可重发布到自定义业务主题 |
| 数据处理 | 客户端自行解析原始消息 | 可在规则中统一过滤、转换、脱敏 |
| 安全边界 | 可能暴露 Broker 内部节点信息 | 可对业务主题单独设置访问控制 |
| 扩展能力 | 仅用于订阅观察 | 可同时转发到 MQTT 主题或外部系统 |
系统主题
系统主题格式为 $SYS/brokers/${node}/...,其中 ${node} 是 Broker 节点名称(例如 emqx@127.0.0.1)。
客户端事件主题
以下是与客户端状态相关的系统主题:
| 事件类型 | 系统主题 |
|---|---|
| 客户端上线 | $SYS/brokers/${node}/clients/${clientid}/connected |
| 客户端下线 | $SYS/brokers/${node}/clients/${clientid}/disconnected |
| 客户端订阅 | $SYS/brokers/${node}/clients/${clientid}/subscribed |
| 客户端取消订阅 | $SYS/brokers/${node}/clients/${clientid}/unsubscribed |
提示
以上四类事件的系统主题默认开启。客户端如需订阅,需要在默认 ACL 规则中添加相应授权,例如允许指定用户名订阅 $SYS/# 主题。
通过数据集成捕获客户端事件(推荐)
数据集成提供了内置的事件主题,以 $events/ 开头,比系统主题更稳定,不与节点名称绑定,且支持在规则中处理数据后转发到自定义业务主题。默认情况下,客户端无法直接订阅这些事件消息,需要通过数据集成规则来处理和转发。各事件可用字段的完整说明,参阅 SQL 数据源和字段 - 客户端事件。
系统主题与数据集成事件主题对照
| 事件类型 | $SYS 系统主题 | 数据集成事件主题 |
|---|---|---|
| 客户端上线 | $SYS/brokers/${node}/clients/${clientid}/connected | $events/client/connected |
| 客户端下线 | $SYS/brokers/${node}/clients/${clientid}/disconnected | $events/client/disconnected |
| 客户端订阅 | $SYS/brokers/${node}/clients/${clientid}/subscribed | $events/session/subscribed |
| 客户端取消订阅 | $SYS/brokers/${node}/clients/${clientid}/unsubscribed | $events/session/unsubscribed |
提示
新配置建议使用新的命名空间事件主题,例如 $events/client/connected、$events/session/subscribed。旧事件主题仍兼容,但不建议在新规则中继续使用。
推荐架构
将事件通过数据集成规则重新发布到业务主题,业务客户端订阅业务主题,不直接依赖系统主题:
$events/client/# 或 $events/session/#
→ 数据集成规则 SQL(过滤、转换)
→ 消息重新发布动作
→ 业务主题(例如 iot/events/client/connected)
→ 业务客户端订阅配置数据集成转发客户端事件
下面以四种常见的客户端事件为例,说明如何配置规则和消息重新发布动作。
客户端上线事件
事件主题
$events/client/connected规则 SQL
SELECT
clientid,
username,
peername,
proto_name,
proto_ver,
keepalive,
clean_start,
expiry_interval,
connected_at,
timestamp,
node
FROM
"$events/client/connected"建议重发布主题
iot/events/client/connectedPayload 模板
{
"event": "client.connected",
"clientid": "${clientid}",
"username": "${username}",
"peername": "${peername}",
"proto_name": "${proto_name}",
"proto_ver": ${proto_ver},
"keepalive": ${keepalive},
"clean_start": ${clean_start},
"expiry_interval": ${expiry_interval},
"connected_at": ${connected_at},
"timestamp": ${timestamp},
"node": "${node}"
}客户端下线事件
事件主题
$events/client/disconnected规则 SQL
SELECT
clientid,
username,
reason,
peername,
connected_at,
disconnected_at,
timestamp,
node
FROM
"$events/client/disconnected"建议重发布主题
iot/events/client/disconnectedPayload 模板
{
"event": "client.disconnected",
"clientid": "${clientid}",
"username": "${username}",
"reason": "${reason}",
"peername": "${peername}",
"connected_at": ${connected_at},
"disconnected_at": ${disconnected_at},
"timestamp": ${timestamp},
"node": "${node}"
}客户端订阅事件
事件主题
$events/session/subscribed规则 SQL
SELECT
clientid,
username,
topic,
qos,
sub_props,
timestamp,
node
FROM
"$events/session/subscribed"建议重发布主题
iot/events/session/subscribedPayload 模板
{
"event": "session.subscribed",
"clientid": "${clientid}",
"username": "${username}",
"topic": "${topic}",
"qos": ${qos},
"sub_props": ${sub_props},
"timestamp": ${timestamp},
"node": "${node}"
}客户端取消订阅事件
事件主题
$events/session/unsubscribed规则 SQL
SELECT
clientid,
username,
topic,
qos,
unsub_props,
timestamp,
node
FROM
"$events/session/unsubscribed"建议重发布主题
iot/events/session/unsubscribedPayload 模板
{
"event": "session.unsubscribed",
"clientid": "${clientid}",
"username": "${username}",
"topic": "${topic}",
"qos": ${qos},
"unsub_props": ${unsub_props},
"timestamp": ${timestamp},
"node": "${node}"
}创建数据集成规则
在 EMQX Cloud 控制台中按以下步骤创建规则并添加消息重新发布动作:
- 进入部署的数据集成页面。
- 在数据转发分类下选择消息重新发布,点击创建。
- 在规则 SQL 输入框中填写对应的规则 SQL。
- 点击下一步,添加消息重新发布动作。
- 填写目标业务主题、Payload 模板,以及 QoS、Retain 等参数。
- 点击确认保存,并使用 SQL 测试验证规则是否按预期运行。
TIP
如果需要避免重发布的消息再次触发同一条规则,可以在动作配置中启用直接派发。启用后,消息会直接发送给订阅者,不经过规则引擎,从而避免循环触发。
最佳实践
- 使用
$events/client/#和$events/session/#作为客户端连接、断开、订阅、取消订阅等事件的统一来源。 - 通过数据集成规则对事件进行过滤、转换和脱敏处理。
- 将事件重发布到稳定的业务主题,例如
iot/events/client/+、iot/events/session/+,避免业务系统直接依赖$SYS/brokers/${node}/...等内部主题结构。 - 业务客户端仅订阅业务主题,并为其配置独立 ACL 规则,实现访问隔离与权限控制。
- 按需将事件写入外部系统,用于审计、查询、监控和告警等场景。