Skip to content

使用数据集成获取客户端上下线事件主题消息

客户端上下线事件主题是 EMQX 用于通知客户端连接状态变化的系统主题,包括 $events/client_connected$events/client_disconnected。普通客户端无法直接订阅这些主题消息,但可以通过规则引擎捕获它们,用于进一步处理,例如写入数据库、转发至其他主题,或进行实时监控与分析。

由于 EMQX Broker 作为 MQTT Broker 而非存储服务,并受安全隐私限制,默认不会保存客户端的历史状态记录或调试级别事件日志信息。因此,建议在部署中使用客户端上下线事件主题配置数据集成规则,以便实时掌握设备连接情况并保留必要的行为记录,这对于后续的问题排查和审计分析都具有重要意义。

本页将介绍客户端上下线事件主题的使用场景,演示如何通过规则引擎捕获事件主题消息并进行处理,再使用 MQTTX Desktop 模拟客户端接收事件主题消息。

应用场景

客户端上下线事件主题可用于多种场景,包括:

  • 记录设备上下线历史:将客户端上下线事件存入外部数据库,方便追踪每台设备的连接历史和行为模式,为后续分析与统计提供支持。
  • 监控设备在线状态与活跃度:实时掌握设备的在线情况,了解客户端活跃度,从而帮助业务系统进行动态调度或策略调整。
  • 排查异常断连:通过分析断连事件中的断开原因和时间,辅助技术支持团队快速定位异常断连原因,从而及时解决问题。
  • 安全审计与访问行为追踪:将上下线事件中的信息存储在日志分析平台,用于访问追踪、安全排查和合规审计。

触发条件与关键字段

客户端连接成功事件 ("$events/client_connected")

  • 触发条件:客户端连接成功时触发。
  • 关键字段
字段解释
clientid成功连接的 Client ID
username成功连接的用户名
peername客户端连接的 IP 地址和端口
keepaliveMQTT 保活间隔 (客户端向 Broker 定期发送心跳消息的时间间隔)
clean_startMQTT clean_start (客户端是否复用已存在的会话)
expiry_intervalMQTT Session 过期时间 (客户端会话在断开连接后应保留的最长时间)
connected_at客户端完成连接的时间戳 (单位:毫秒)

客户端连接断开事件 ("$events/client_disconnected")

  • 触发条件:客户端连接断开时触发。
  • 关键字段
字段解释
reason客户端连接断开原因
clientid成功连接的 Client ID
username成功连接的用户名
peername客户端连接的 IP 地址和端口
disconnected_at客户端断开连接的时间戳 (单位:毫秒)

TIP

事件主题的完整字段信息,请参阅官方文档:客户端事件

连接断开场景分析

客户端连接断开事件主题中的 reason 字段提供了断连的具体原因,有助于深入理解连接问题并优化客户端连接逻辑。以下是所有断连原因的详细分析:

原因码解释
normal客户端主动断开连接,是最常规的断开场景,客户端通过正常的 MQTT 协议流程结束会话。
kicked客户端被服务端踢出,通常是由于管理员通过 API 或控制台主动断开客户端会话连接。
keepalive_timeout当客户端未能在规定的 Keepalive 时间内发送心跳(Pingreq)消息到 EMQX 时,EMQX 会主动断开客户端连接。常见原因包括:
1. 网络抖动:客户端与 EMQX 之间的网络不稳定,导致心跳消息丢包,抖动,延迟,未能及时到达EMQX。
2. 客户端心跳机制问题:客户端的心跳机制存在缺陷,未按规定定时发送 MQTT 心跳消息。
更多关于 Keepalive 的详细信息,请参阅 Keepalive 机制
not_authorized当客户端未通过认证或未获得足够的权限时,EMQX 会主动断开客户端连接。常见原因包括:
1. 认证失败:客户端提供了无效的 MQTT 认证信息,导致认证失败。
2. 权限不足:当配置了 acl_nomatch = disconnect 时(默认配置,即未匹配到访问权限时断开连接),客户端尝试进行没有权限的发布/订阅操作。
tcp_closed对端关闭了网络连接。通常发生在客户端崩溃或网络异常时,客户端跳过 MQTT 协议交互:没有发送 MQTT DISCONNECT 报文,而是直接从底层主动断开 TCP 连接。比如,直接强制关闭客户端工具或设备断开网络时,运营商网络故障中断等,都会触发此原因。
internal_error由于收到畸形报文或其他未知错误引起的客户端断开。这种情况一般需要通过检查报文内容、查看服务端日志,或联系 EMQX 技术支持团队来进一步定位和解决问题。
discarded两个客户端使用相同的 ClientID 连接,且设置了 clean_start = true 时,EMQX 会丢弃已有会话并建立新的会话,并断开旧客户端的连接。
clean_start = true 表示客户端希望每次连接都从零开始,不保留之前的会话状态。此时,当一个客户端使用相同 ClientID 连接时,EMQX 会认为这是一个全新的会话,从而删除客户端的历史订阅和消息队列,并用新连接替代旧会话。
takenover两个客户端使用相同的 ClientID 连接,且设置了 clean_start = false 时,EMQX 会让新客户端沿用原来的会话,并断开旧客户端的连接。
clean_start = false 表示客户端希望复用已有的会话,保留之前的会话状态。此时,当一个新的客户端使用相同 ClientID 连接时,EMQX 会将旧客户端的会话交给新客户端继续使用,同时断开旧客户端。

TIP

下面列出了两类常见的 ClientID 冲突断连的场景:

  1. ClientID 冲突互挤:当多个不同的客户端使用相同的 ClientID 并配置了自动重连时,新的连接会不断挤掉旧会话,形成循环重连。此时客户端的 IP 和端口通常会变化。

    建议:为每个客户端分配唯一的 ClientID。

  2. 客户端反复重连:如果客户端会话仍然存在,但客户端未识别到会话在线,客户端进程不断发起新的 MQTT CONNECT 报文,也会覆盖旧会话。此时 IP 通常不变,端口可能变化;在某些物联网网卡设备上,IP 和端口都可能变化。

    建议:检查客户端连接逻辑。

配置数据集成获取事件主题消息

在实际使用中,事件主题通常有以下两种处理方式:

  1. 消息重发布:将事件主题消息重新发布到其他 MQTT 主题。这种方式轻量、实时性强,并且与 MQTT 原生生态兼容,适合在系统内部快速消费和处理事件。
  2. 转发至外部服务:将事件主题消息发送至外部系统,如数据库、消息队列或 HTTP 服务。该方式支持与各种外部系统集成,便于实现实时响应和持久化处理。

本页仅演示消息重发布和转发至 HTTP 服务的方法,其他转发至不同数据库或外部服务的方式可参考官方文档:EMQX Cloud 数据集成

消息重发布

本节将演示如何将客户端上下线事件主题消息重新发布至其他主题。

创建规则和动作

  1. 数据集成页面中的数据转发服务分类下点击消息重新发布。如果已经创建过其他的连接器,则点击新建连接器,然后在数据转发服务分类下选择消息重新发布
  2. SQL 编辑器中定义规则 SQL。如需排查客户端断连事件,可参考以下 SQL 示例:
SQL
SELECT
    clientid, 
    username, 
    reason, 
    disconnected_at
FROM
    "$events/client_disconnected"
  1. 点击下一步,添加动作。
  2. 创建动作步骤页中,配置以下信息:
    • 使用连接器:使用默认选项 消息重新发布。
    • 主题:设置目标主题为 client_disconnected
    • PayloadQoSRetain: 使用默认值。
  3. 点击确定完成配置。

测试规则和动作

推荐使用 MQTTX 模拟客户端连接与消息上报,同时您也可以使用其他任意客户端完成。

  1. 使用 MQTTX 创建两个连接到部署,分别将 ClientID 设置为 connsub
  2. 使用客户端 sub 订阅接收重发布消息的主题 client_disconnected
  3. 将客户端 conn 断开连接,此时客户端 sub 应该能收到以下格式的事件主题消息:
json
{ 
    "clientid": "conn", 
    "username": "u_emqx", 
    "reason": "normal", 
    "disconnected_at": 1645003578536
}

将事件主题消息转发至 HTTP 服务

本节将演示如何将客户端上下线事件主题消息转发至 HTTP 服务。开始之前,您需要创建 VPC 对等连接以通过内部网络 IP 访问目标连接器。或者开通 NAT 网关,通过公网 IP 访问目标连接器。

创建 HTTP Server 连接器

  1. 在部署菜单中选择数据集成,在 Web 服务分类下选择 HTTP 服务。如果已经创建过其他连接器,选择新建连接器,然后在 Web 服务分类下选择 HTTP 服务
  2. 创建连接器页面中填写 URL,其他配置项可按需调整。URL 应指向将接收事件消息的目标 HTTP 服务。连接器会按照规则中定义的 payload 向该 URL 发送 POST 请求。
  3. 点击测试按钮测试连接,如果 HTTP 服务能够正常访问,则会返回成功提示。
  4. 点击新建按钮完成配置。

创建规则和动作

  1. 点击新建规则进入新建规则步骤页。
  2. SQL 编辑器中定义规则 SQL。如需排查客户端断连事件,可参考以下 SQL 示例:
sql
SELECT
    clientid, 
    username, 
    reason, 
    disconnected_at
FROM 
    "$events/client_disconnected"
  1. 点击下一步开始创建动作。

  2. 使用连接器下拉框中选择之前创建的 HTTP Server 连接器。剩下的配置保持默认值。

  3. 点击确认按钮完成规则创建。

TIP

关于将数据转发至 HTTP 服务的完整流程与配置,可参考官方文档:将 MQTT 数据发送到 HTTP 服务

测试规则和动作

推荐使用 MQTTX 模拟客户端连接与消息上报,同时您也可以使用其他任意客户端完成。

  1. 使用 MQTTX 连接到部署,将 ClientID 设置为 conn
  2. 将客户端 conn 断开连接,此时 HTTP 服务应该能收到 POST 请求,请求体原始内容的格式为:
json
{ 
    "clientid": "conn", 
    "username": "u_emqx", 
    "reason": "normal", 
    "disconnected_at": 1645003578536
}