# 使用数据集成获取客户端上下线事件主题消息

**客户端上下线事件主题**是 EMQX 用于通知客户端连接状态变化的系统主题，包括 `$events/client_connected` 和 `$events/client_disconnected`。普通客户端无法直接订阅这些主题消息，但可以通过规则引擎捕获它们，用于进一步处理，例如写入数据库、转发至其他主题，或进行实时监控与分析。

由于 EMQX Broker 作为 MQTT Broker 而非存储服务，并受安全隐私限制，默认不会保存客户端的历史状态记录或调试级别事件日志信息。因此，建议在部署中使用客户端上下线事件主题配置**数据集成规则**，以便实时掌握设备连接情况并保留必要的行为记录，这对于后续的问题排查和审计分析都具有重要意义。

本页将介绍客户端上下线事件主题的使用场景，演示如何通过规则引擎捕获事件主题消息并进行处理，再使用 [MQTTX Desktop](https://mqttx.app/zh) 模拟客户端接收事件主题消息。

## 应用场景

客户端上下线事件主题可用于多种场景，包括：
- **记录设备上下线历史**：将客户端上下线事件存入外部数据库，方便追踪每台设备的连接历史和行为模式，为后续分析与统计提供支持。
- **监控设备在线状态与活跃度**：实时掌握设备的在线情况，了解客户端活跃度，从而帮助业务系统进行动态调度或策略调整。
- **排查异常断连**：通过分析断连事件中的断开原因和时间，辅助技术支持团队快速定位异常断连原因，从而及时解决问题。
- **安全审计与访问行为追踪**：将上下线事件中的信息存储在日志分析平台，用于访问追踪、安全排查和合规审计。

## 触发条件与关键字段

### 客户端连接成功事件 ("$events/client\_connected")

- **触发条件**：客户端连接成功时触发。
- **关键字段**：

| 字段            | 解释                                                           |
|-----------------|----------------------------------------------------------------|
| clientid        | 成功连接的 Client ID                                           |
| username        | 成功连接的用户名                                               |
| peername        | 客户端连接的 IP 地址和端口                                     |
| keepalive       | MQTT 保活间隔 (客户端向 Broker 定期发送心跳消息的时间间隔)     |
| clean_start     | MQTT clean_start (客户端是否复用已存在的会话)                  |
| expiry_interval | MQTT Session 过期时间 (客户端会话在断开连接后应保留的最长时间) |
| connected_at    | 客户端完成连接的时间戳 (单位：毫秒)                            |

### 客户端连接断开事件 ("$events/client\_disconnected")

- **触发条件**：客户端连接断开时触发。
- **关键字段**：

| 字段            | 解释                                                           |
|-----------------|----------------------------------------------------------------|
| reason          | 客户端连接断开原因                                    |
| clientid        | 成功连接的 Client ID                                           |
| username        | 成功连接的用户名                                               |
| peername        | 客户端连接的 IP 地址和端口                                     |
| disconnected_at | 客户端断开连接的时间戳 (单位：毫秒)                            |

::: tip

事件主题的完整字段信息，请参阅官方文档：[客户端事件](../rule_engine/rule_engine_events.md#客户端事件)。

:::

## 连接断开场景分析

客户端连接断开事件主题中的 `reason` 字段提供了断连的具体原因，有助于深入理解连接问题并优化客户端连接逻辑。以下是所有断连原因的详细分析：

| 原因码  | 解释                                                            |
| ------ | -------------------------------------------------------------- |
| normal | 客户端主动断开连接，是最常规的断开场景，客户端通过正常的 MQTT 协议流程结束会话。 |
| kicked | 客户端被服务端踢出，通常是由于管理员通过 API 或控制台主动断开客户端会话连接。 |
| keepalive_timeout | 当客户端未能在规定的 Keepalive 时间内发送心跳（Pingreq）消息到 EMQX 时，EMQX 会主动断开客户端连接。常见原因包括：<br>1. **网络抖动**：客户端与 EMQX 之间的网络不稳定，导致心跳消息丢包，抖动，延迟，未能及时到达EMQX。<br>2. **客户端心跳机制问题**：客户端的心跳机制存在缺陷，未按规定定时发送 MQTT 心跳消息。<br>更多关于 Keepalive 的详细信息，请参阅 [Keepalive 机制](https://www.emqx.com/zh/blog/mqtt-keep-alive)。 |
| not_authorized | 当客户端未通过认证或未获得足够的权限时，EMQX 会主动断开客户端连接。常见原因包括：<br>1. **认证失败**：客户端提供了无效的 MQTT 认证信息，导致认证失败。<br>2. **权限不足**：当配置了 `acl_nomatch = disconnect` 时（默认配置，即未匹配到访问权限时断开连接），客户端尝试进行没有权限的发布/订阅操作。 |
| tcp_closed | 对端关闭了网络连接。通常发生在客户端崩溃或网络异常时，客户端跳过 MQTT 协议交互：没有发送 MQTT DISCONNECT 报文，而是直接从底层主动断开 TCP 连接。比如，直接强制关闭客户端工具或设备断开网络时，运营商网络故障中断等，都会触发此原因。 |
| internal_error | 由于收到畸形报文或其他未知错误引起的客户端断开。这种情况一般需要通过检查报文内容、查看服务端日志，或联系 EMQX 技术支持团队来进一步定位和解决问题。 |
| discarded | 两个客户端使用相同的 ClientID 连接，且设置了 `clean_start = true` 时，EMQX 会丢弃已有会话并建立新的会话，并断开旧客户端的连接。<br> `clean_start = true` 表示客户端希望每次连接都从零开始，不保留之前的会话状态。此时，当一个客户端使用相同 ClientID 连接时，EMQX 会认为这是一个全新的会话，从而删除客户端的历史订阅和消息队列，并用新连接替代旧会话。 |
| takenover | 两个客户端使用相同的 ClientID 连接，且设置了 `clean_start = false` 时，EMQX 会让新客户端沿用原来的会话，并断开旧客户端的连接。<br> `clean_start = false` 表示客户端希望复用已有的会话，保留之前的会话状态。此时，当一个新的客户端使用相同 ClientID 连接时，EMQX 会将旧客户端的会话交给新客户端继续使用，同时断开旧客户端。 |

::: tip

下面列出了两类常见的 ClientID 冲突断连的场景：

1. **ClientID 冲突互挤**：当多个不同的客户端使用相同的 ClientID 并配置了自动重连时，新的连接会不断挤掉旧会话，形成循环重连。此时客户端的 IP 和端口通常会变化。

    **建议**：为每个客户端分配唯一的 ClientID。
    
2. **客户端反复重连**：如果客户端会话仍然存在，但客户端未识别到会话在线，客户端进程不断发起新的 MQTT CONNECT 报文，也会覆盖旧会话。此时 IP 通常不变，端口可能变化；在某些物联网网卡设备上，IP 和端口都可能变化。

    **建议**：检查客户端连接逻辑。
:::

## 配置数据集成获取事件主题消息

在实际使用中，事件主题通常有以下两种处理方式：

1. **消息重发布**：将事件主题消息重新发布到其他 MQTT 主题。这种方式轻量、实时性强，并且与 MQTT 原生生态兼容，适合在系统内部快速消费和处理事件。
2. **转发至外部服务**：将事件主题消息发送至外部系统，如数据库、消息队列或 HTTP 服务。该方式支持与各种外部系统集成，便于实现实时响应和持久化处理。

本页仅演示消息重发布和转发至 HTTP 服务的方法，其他转发至不同数据库或外部服务的方式可参考官方文档：[EMQX Cloud 数据集成](../data_integration/introduction.md)。

### 消息重发布

本节将演示如何将客户端上下线事件主题消息重新发布至其他主题。

#### 创建规则和动作

1. 在**数据集成**页面中的**数据转发**服务分类下点击**消息重新发布**。如果已经创建过其他的连接器，则点击**新建连接器**，然后在**数据转发**服务分类下选择**消息重新发布**。
2. 在 **SQL 编辑器**中定义规则 SQL。如需排查客户端断连事件，可参考以下 SQL 示例：
``` SQL
SELECT
    clientid, 
    username, 
    reason, 
    disconnected_at
FROM
    "$events/client_disconnected"
```
3. 点击**下一步**，添加动作。 
4. 在**创建动作**步骤页中，配置以下信息：
    - **使用连接器**：使用默认选项 消息重新发布。
    - **主题**：设置目标主题为 `client_disconnected`。
    - **Payload**、**QoS** 和 **Retain**： 使用默认值。
5. 点击**确定**完成配置。
   
#### 测试规则和动作

推荐使用 [MQTTX](https://mqttx.app/) 模拟客户端连接与消息上报，同时您也可以使用其他任意客户端完成。

1. 使用 MQTTX 创建两个连接到部署，分别将 ClientID 设置为 `conn` 和 `sub`。
2. 使用客户端 `sub` 订阅接收重发布消息的主题 `client_disconnected`。
3. 将客户端 `conn` 断开连接，此时客户端 `sub` 应该能收到以下格式的事件主题消息：
```json
{ 
    "clientid": "conn", 
    "username": "u_emqx", 
    "reason": "normal", 
    "disconnected_at": 1645003578536
}
```

### 将事件主题消息转发至 HTTP 服务

本节将演示如何将客户端上下线事件主题消息转发至 HTTP 服务。开始之前，您需要创建 [VPC 对等连接](../deployments/vpc_peering.md)以通过内部网络 IP 访问目标连接器。或者开通 [NAT 网关](../vas/nat-gateway.md)，通过公网 IP 访问目标连接器。

#### 创建 HTTP Server 连接器

1. 在部署菜单中选择**数据集成**，在 **Web 服务**分类下选择 **HTTP 服务**。如果已经创建过其他连接器，选择**新建连接器**，然后在 **Web 服务**分类下选择 **HTTP 服务**。
2. 在**创建连接器**页面中填写 URL，其他配置项可按需调整。URL 应指向将接收事件消息的目标 HTTP 服务。连接器会按照规则中定义的 payload 向该 URL 发送 POST 请求。
3. 点击**测试**按钮测试连接，如果 HTTP 服务能够正常访问，则会返回成功提示。
4. 点击**新建**按钮完成配置。
   
#### 创建规则和动作

1. 点击**新建规则**进入新建规则步骤页。
2. 在 **SQL 编辑器**中定义规则 SQL。如需排查客户端断连事件，可参考以下 SQL 示例：
   
```sql
SELECT
    clientid, 
    username, 
    reason, 
    disconnected_at
FROM 
    "$events/client_disconnected"
```
3. 点击**下一步**开始创建动作。 

4. 从**使用连接器**下拉框中选择之前创建的 HTTP Server 连接器。剩下的配置保持默认值。

5. 点击**确认**按钮完成规则创建。

::: tip

  关于将数据转发至 HTTP 服务的完整流程与配置，可参考官方文档：[将 MQTT 数据发送到 HTTP 服务](../data_integration/http_server.md)。

:::    

#### 测试规则和动作

推荐使用 [MQTTX](https://mqttx.app/) 模拟客户端连接与消息上报，同时您也可以使用其他任意客户端完成。

1. 使用 MQTTX 连接到部署，将 ClientID 设置为 `conn`。 
2. 将客户端 `conn` 断开连接，此时 HTTP 服务应该能收到 POST 请求，请求体原始内容的格式为：
```json
{ 
    "clientid": "conn", 
    "username": "u_emqx", 
    "reason": "normal", 
    "disconnected_at": 1645003578536
}
```