使用数据集成获取客户端上下线事件主题消息
客户端上下线事件主题是 EMQX 用于通知客户端连接状态变化的系统主题,包括 $events/client_connected 和 $events/client_disconnected。普通客户端无法直接订阅这些主题消息,但可以通过规则引擎捕获它们,用于进一步处理,例如写入数据库、转发至其他主题,或进行实时监控与分析。
由于 EMQX Broker 作为 MQTT Broker 而非存储服务,并受安全隐私限制,默认不会保存客户端的历史状态记录或调试级别事件日志信息。因此,建议在部署中使用客户端上下线事件主题配置数据集成规则,以便实时掌握设备连接情况并保留必要的行为记录,这对于后续的问题排查和审计分析都具有重要意义。
本页将介绍客户端上下线事件主题的使用场景,演示如何通过规则引擎捕获事件主题消息并进行处理,再使用 MQTTX Desktop 模拟客户端接收事件主题消息。
应用场景
客户端上下线事件主题可用于多种场景,包括:
- 记录设备上下线历史:将客户端上下线事件存入外部数据库,方便追踪每台设备的连接历史和行为模式,为后续分析与统计提供支持。
- 监控设备在线状态与活跃度:实时掌握设备的在线情况,了解客户端活跃度,从而帮助业务系统进行动态调度或策略调整。
- 排查异常断连:通过分析断连事件中的断开原因和时间,辅助技术支持团队快速定位异常断连原因,从而及时解决问题。
- 安全审计与访问行为追踪:将上下线事件中的信息存储在日志分析平台,用于访问追踪、安全排查和合规审计。
触发条件与关键字段
客户端连接成功事件 ("$events/client_connected")
- 触发条件:客户端连接成功时触发。
- 关键字段:
| 字段 | 解释 |
|---|---|
| clientid | 成功连接的 Client ID |
| username | 成功连接的用户名 |
| peername | 客户端连接的 IP 地址和端口 |
| keepalive | MQTT 保活间隔 (客户端向 Broker 定期发送心跳消息的时间间隔) |
| clean_start | MQTT clean_start (客户端是否复用已存在的会话) |
| expiry_interval | MQTT Session 过期时间 (客户端会话在断开连接后应保留的最长时间) |
| connected_at | 客户端完成连接的时间戳 (单位:毫秒) |
客户端连接断开事件 ("$events/client_disconnected")
- 触发条件:客户端连接断开时触发。
- 关键字段:
| 字段 | 解释 |
|---|---|
| reason | 客户端连接断开原因 |
| clientid | 成功连接的 Client ID |
| username | 成功连接的用户名 |
| peername | 客户端连接的 IP 地址和端口 |
| disconnected_at | 客户端断开连接的时间戳 (单位:毫秒) |
TIP
事件主题的完整字段信息,请参阅官方文档:客户端事件。
连接断开场景分析
客户端连接断开事件主题中的 reason 字段提供了断连的具体原因,有助于深入理解连接问题并优化客户端连接逻辑。以下是所有断连原因的详细分析:
| 原因码 | 解释 |
|---|---|
| normal | 客户端主动断开连接,是最常规的断开场景,客户端通过正常的 MQTT 协议流程结束会话。 |
| kicked | 客户端被服务端踢出,通常是由于管理员通过 API 或控制台主动断开客户端会话连接。 |
| keepalive_timeout | 当客户端未能在规定的 Keepalive 时间内发送心跳(Pingreq)消息到 EMQX 时,EMQX 会主动断开客户端连接。常见原因包括: 1. 网络抖动:客户端与 EMQX 之间的网络不稳定,导致心跳消息丢包,抖动,延迟,未能及时到达EMQX。 2. 客户端心跳机制问题:客户端的心跳机制存在缺陷,未按规定定时发送 MQTT 心跳消息。 更多关于 Keepalive 的详细信息,请参阅 Keepalive 机制。 |
| not_authorized | 当客户端未通过认证或未获得足够的权限时,EMQX 会主动断开客户端连接。常见原因包括: 1. 认证失败:客户端提供了无效的 MQTT 认证信息,导致认证失败。 2. 权限不足:当配置了 acl_nomatch = disconnect 时(默认配置,即未匹配到访问权限时断开连接),客户端尝试进行没有权限的发布/订阅操作。 |
| tcp_closed | 对端关闭了网络连接。通常发生在客户端崩溃或网络异常时,客户端跳过 MQTT 协议交互:没有发送 MQTT DISCONNECT 报文,而是直接从底层主动断开 TCP 连接。比如,直接强制关闭客户端工具或设备断开网络时,运营商网络故障中断等,都会触发此原因。 |
| internal_error | 由于收到畸形报文或其他未知错误引起的客户端断开。这种情况一般需要通过检查报文内容、查看服务端日志,或联系 EMQX 技术支持团队来进一步定位和解决问题。 |
| discarded | 两个客户端使用相同的 ClientID 连接,且设置了 clean_start = true 时,EMQX 会丢弃已有会话并建立新的会话,并断开旧客户端的连接。clean_start = true 表示客户端希望每次连接都从零开始,不保留之前的会话状态。此时,当一个客户端使用相同 ClientID 连接时,EMQX 会认为这是一个全新的会话,从而删除客户端的历史订阅和消息队列,并用新连接替代旧会话。 |
| takenover | 两个客户端使用相同的 ClientID 连接,且设置了 clean_start = false 时,EMQX 会让新客户端沿用原来的会话,并断开旧客户端的连接。clean_start = false 表示客户端希望复用已有的会话,保留之前的会话状态。此时,当一个新的客户端使用相同 ClientID 连接时,EMQX 会将旧客户端的会话交给新客户端继续使用,同时断开旧客户端。 |
TIP
下面列出了两类常见的 ClientID 冲突断连的场景:
ClientID 冲突互挤:当多个不同的客户端使用相同的 ClientID 并配置了自动重连时,新的连接会不断挤掉旧会话,形成循环重连。此时客户端的 IP 和端口通常会变化。
建议:为每个客户端分配唯一的 ClientID。
客户端反复重连:如果客户端会话仍然存在,但客户端未识别到会话在线,客户端进程不断发起新的 MQTT CONNECT 报文,也会覆盖旧会话。此时 IP 通常不变,端口可能变化;在某些物联网网卡设备上,IP 和端口都可能变化。
建议:检查客户端连接逻辑。
配置数据集成获取事件主题消息
在实际使用中,事件主题通常有以下两种处理方式:
- 消息重发布:将事件主题消息重新发布到其他 MQTT 主题。这种方式轻量、实时性强,并且与 MQTT 原生生态兼容,适合在系统内部快速消费和处理事件。
- 转发至外部服务:将事件主题消息发送至外部系统,如数据库、消息队列或 HTTP 服务。该方式支持与各种外部系统集成,便于实现实时响应和持久化处理。
本页仅演示消息重发布和转发至 HTTP 服务的方法,其他转发至不同数据库或外部服务的方式可参考官方文档:EMQX Cloud 数据集成。
消息重发布
本节将演示如何将客户端上下线事件主题消息重新发布至其他主题。
创建规则和动作
- 在数据集成页面中的数据转发服务分类下点击消息重新发布。如果已经创建过其他的连接器,则点击新建连接器,然后在数据转发服务分类下选择消息重新发布。
- 在 SQL 编辑器中定义规则 SQL。如需排查客户端断连事件,可参考以下 SQL 示例:
SELECT
clientid,
username,
reason,
disconnected_at
FROM
"$events/client_disconnected"- 点击下一步,添加动作。
- 在创建动作步骤页中,配置以下信息:
- 使用连接器:使用默认选项 消息重新发布。
- 主题:设置目标主题为
client_disconnected。 - Payload、QoS 和 Retain: 使用默认值。
- 点击确定完成配置。
测试规则和动作
推荐使用 MQTTX 模拟客户端连接与消息上报,同时您也可以使用其他任意客户端完成。
- 使用 MQTTX 创建两个连接到部署,分别将 ClientID 设置为
conn和sub。 - 使用客户端
sub订阅接收重发布消息的主题client_disconnected。 - 将客户端
conn断开连接,此时客户端sub应该能收到以下格式的事件主题消息:
{
"clientid": "conn",
"username": "u_emqx",
"reason": "normal",
"disconnected_at": 1645003578536
}将事件主题消息转发至 HTTP 服务
本节将演示如何将客户端上下线事件主题消息转发至 HTTP 服务。开始之前,您需要创建 VPC 对等连接以通过内部网络 IP 访问目标连接器。或者开通 NAT 网关,通过公网 IP 访问目标连接器。
创建 HTTP Server 连接器
- 在部署菜单中选择数据集成,在 Web 服务分类下选择 HTTP 服务。如果已经创建过其他连接器,选择新建连接器,然后在 Web 服务分类下选择 HTTP 服务。
- 在创建连接器页面中填写 URL,其他配置项可按需调整。URL 应指向将接收事件消息的目标 HTTP 服务。连接器会按照规则中定义的 payload 向该 URL 发送 POST 请求。
- 点击测试按钮测试连接,如果 HTTP 服务能够正常访问,则会返回成功提示。
- 点击新建按钮完成配置。
创建规则和动作
- 点击新建规则进入新建规则步骤页。
- 在 SQL 编辑器中定义规则 SQL。如需排查客户端断连事件,可参考以下 SQL 示例:
SELECT
clientid,
username,
reason,
disconnected_at
FROM
"$events/client_disconnected"点击下一步开始创建动作。
从使用连接器下拉框中选择之前创建的 HTTP Server 连接器。剩下的配置保持默认值。
点击确认按钮完成规则创建。
TIP
关于将数据转发至 HTTP 服务的完整流程与配置,可参考官方文档:将 MQTT 数据发送到 HTTP 服务。
测试规则和动作
推荐使用 MQTTX 模拟客户端连接与消息上报,同时您也可以使用其他任意客户端完成。
- 使用 MQTTX 连接到部署,将 ClientID 设置为
conn。 - 将客户端
conn断开连接,此时 HTTP 服务应该能收到 POST 请求,请求体原始内容的格式为:
{
"clientid": "conn",
"username": "u_emqx",
"reason": "normal",
"disconnected_at": 1645003578536
}