# SQL 数据源和字段

规则的 SQL 语句可以处理的数据源有：MQTT 消息、客户端事件，或是连接外部数据系统的 Source。

SQL 语句使用 `FROM` 来指定数据源，在 `SELECT` 和 `WHERE` 子句中可以引用相应的字段。 数据源类型不同，可以使用的字段也不同。

## MQTT 消息

规则的 SQL 语句可以处理消息发布。 在一个规则语句中，用户可以用 FROM 子句指定一个或者多个主题， 当任何消息发布到指定的主题时都会触发该规则。

| 字段                | 解释                                                         |
| :------------------ | :----------------------------------------------------------- |
| id                  | MQTT 消息 ID                                                 |
| clientid            | 消息来源 Client ID                                           |
| username            | 消息来源用户名                                               |
| payload             | MQTT 消息体                                                  |
| peerhost            | 客户端的 IPAddress                                           |
| topic               | MQTT 主题                                                    |
| qos                 | MQTT 消息的 QoS                                              |
| flags               | MQTT 消息的 Flags                                            |
| pub_props           | PUBLISH Properties (仅适用于 MQTT 5.0)                       |
| timestamp           | 事件触发时间 (单位：毫秒)                                    |
| publish_received_at | PUBLISH 消息到达 Broker 的时间 (单位：毫秒)                  |
| node                | 事件触发所在节点                                             |
| client_attrs        | [客户端属性](https://docs.emqx.com/zh/emqx/latest/client-attributes/client-attributes.html) |

SQL 示例：

```sql
SELECT
  *
FROM
  "t/#"
```

输出：

```json
{
  "clientid": "c_emqx",
  "event": "message.publish",
  "event_type": "message_publish",
  "flags": {},
  "id": "0005E27C1D24E44FF440000017520000",
  "metadata": {
    "rule_id": "sql_tester:099ddfa9c466d1ca"
  },
  "node": "emqx@127.0.0.1",
  "payload": "abc",
  "peerhost": "192.168.0.10",
  "pub_props": {
    "Message-Expiry-Interval": 30,
    "Payload-Format-Indicator": 0,
    "User-Property": {
      "foo": "bar"
    },
    "User-Property-Pairs": [
      {
        "key": "foo"
      },
      {
        "value": "bar"
      }
    ]
  },
  "publish_received_at": 1656397576334,
  "qos": 1,
  "timestamp": 1656397576334,
  "topic": "t/a",
  "username": "u_emqx"
}
```

## 客户端事件

规则的 SQL 语句既可以处理消息（消息发布），也可以处理事件（客户端上下线、客户端订阅等）。对于消息，`FROM` 子句后面直接跟主题名；对于事件，`FROM` 子句后面跟事件主题。事件主题以 `$events/` 开头，比如 `$events/client_connected`，`$events/session_subscribed`。

默认情况下，客户端无法直接订阅客户端事件消息。本节介绍如何使用规则来订阅这些事件主题消息，并介绍各个字段的含义。

EMQX Cloud 数据集成提供了以下可用的事件主题。

| 事件名称                                                     | 事件主题名                          | 释义                     |
| ------------------------------------------------------------ | ----------------------------------- | ------------------------ |
| [消息投递事件](#消息投递事件-events-message-delivered)       | $events/message_delivered           | 消息投递                 |
| [消息确认事件](#消息确认事件-events-message-acked)           | $events/message_acked               | 消息确认                 |
| [消息在转发的过程中被丢弃事件](#消息在转发的过程中被丢弃事件-events-message-dropped) | $events/message_dropped             | 消息在转发的过程中被丢弃 |
| [消息在投递的过程中被丢弃事件](#消息在投递的过程中被丢弃事件-events-delivery-dropped) | $events/delivery_dropped            | 消息在投递的过程中被丢弃 |
| [客户端连接成功事件](#客户端连接成功事件-events-client-connected) | $events/client_connected            | 连接完成                 |
| [客户端连接断开事件](#客户端连接断开事件-events-client-disconnected) | $events/client_disconnected         | 连接断开                 |
| [连接确认事件](#连接确认事件-events-client-connack)          | $events/client_connack              | 连接确认                 |
| [鉴权完成事件](#鉴权完成事件-events-client-check-authz-complete) | $events/client_check_authz_complete | 鉴权完成                 |
| [认证完成事件](#认证完成事件-events-client-check-authn-complete) | $events/client_check_authn_complete | 认证完成                 |
| [客户端订阅成功事件](#客户端订阅成功事件-events-session-subscribed) | $events/session_subscribed          | 订阅                     |
| [客户端取消订阅成功事件](#客户端取消订阅成功事件-events-session-unsubscribed) | $events/session_unsubscribed        | 取消订阅                 |

## 消息投递事件 ("$events/message_delivered")

当消息被放入底层socket时触发规则。

| 字段                  | 解释                                        |
| :-------------------- | :------------------------------------------ |
| id                    | MQTT 消息 ID                                |
| from\_clientid        | 消息来源 Client ID                          |
| from\_username        | 消息来源用户名                              |
| clientid              | 消息目的 Client ID                          |
| username              | 消息目的用户名                              |
| payload               | MQTT 消息体                                 |
| peerhost              | 客户端的 IPAddress                          |
| topic                 | MQTT 主题                                   |
| qos                   | MQTT 消息的 QoS                             |
| flags                 | MQTT 消息的 Flags                           |
| pub\_props            | PUBLISH Properties (仅适用于 MQTT 5.0)      |
| timestamp             | 事件触发时间 (单位：毫秒)                   |
| publish\_received\_at | PUBLISH 消息到达 Broker 的时间 (单位：毫秒) |
| node                  | 事件触发所在节点                            |

示例

```sql
SELECT
  from_clientid,
  from_username,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_delivered"
```

输出

```json
{
  "topic": "t/a",
  "timestamp": 1645002753259,
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}
```

## 消息确认事件 ("$events/message_acked")

当消息发送到客户端，并收到客户端回复的ack时触发规则，仅QOS1，QOS2会触发。

| 字段                  | 解释                                        |
| :-------------------- | :------------------------------------------ |
| id                    | MQTT 消息 ID                                |
| from\_clientid        | 消息来源 Client ID                          |
| from\_username        | 消息来源用户名                              |
| clientid              | 消息目的 Client ID                          |
| username              | 消息目的用户名                              |
| payload               | MQTT 消息体                                 |
| peerhost              | 客户端的 IPAddress                          |
| topic                 | MQTT 主题                                   |
| qos                   | MQTT 消息的 QoS                             |
| flags                 | MQTT 消息的 Flags                           |
| pub\_props            | PUBLISH Properties (仅适用于 MQTT 5.0)      |
| puback\_props         | PUBACK Properties (仅适用于 MQTT 5.0)       |
| timestamp             | 事件触发时间 (单位：毫秒)                   |
| publish\_received\_at | PUBLISH 消息到达 Broker 的时间 (单位：毫秒) |
| node                  | 事件触发所在节点                            |

示例

```sql
SELECT
  from_clientid,
  from_username,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_acked"
```

输出

```json
{
  "topic": "t/a",
  "timestamp": 1645002965664,
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}
```

## 消息在转发的过程中被丢弃事件 ("$events/message_dropped")

当一条消息无任何订阅者时触发规则。

| 字段                  | 解释                                                         |
| :-------------------- | :----------------------------------------------------------- |
| id                    | MQTT 消息 ID                                                 |
| reason                | 消息丢弃原因，可能的原因：<br/>no\_subscribers：没有订阅者<br/>receive\_maximum\_exceeded: awaiting\_rel 队列已满<br/>packet\_identifier\_inuse: 消息 ID 已被使用 |
| clientid              | 消息来源 Client ID                                           |
| username              | 消息来源用户名                                               |
| payload               | MQTT 消息体                                                  |
| peerhost              | 客户端的 IPAddress                                           |
| topic                 | MQTT 主题                                                    |
| qos                   | MQTT 消息的 QoS                                              |
| flags                 | MQTT 消息的 Flags                                            |
| pub\_props            | PUBLISH Properties (仅适用于 MQTT 5.0)                       |
| timestamp             | 事件触发时间 (单位：毫秒)                                    |
| publish\_received\_at | PUBLISH 消息到达 Broker 的时间 (单位：毫秒)                  |
| node                  | 事件触发所在节点                                             |

示例


```sql
SELECT
  reason,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_dropped"
```

输出

```json
{
  "topic": "t/a",
  "timestamp": 1645003103004,
  "reason": "no_subscribers",
  "qos": 1,
  "node": "emqx@127.0.0.1"
}
```

## 消息在投递的过程中被丢弃事件 ("$events/delivery_dropped")

当订阅者的消息队列已满时触发规则。

| 字段                  | 解释                                                         |
| :-------------------- | :----------------------------------------------------------- |
| id                    | MQTT 消息 ID                                                 |
| reason                | 消息丢弃原因，可能的原因：<br/>queue\_full：消息队列已满(QoS>0)<br/>no\_local：不允许客户端接收自己发布的消息<br/>expired：消息或者会话过期<br/>qos0\_msg：QoS 0 的消息因为消息队列已满被丢弃 |
| from\_clientid        | 消息来源 Client ID                                           |
| from\_username        | 消息来源用户名                                               |
| clientid              | 消息目的 Client ID                                           |
| username              | 消息目的用户名                                               |
| payload               | MQTT 消息体                                                  |
| peerhost              | 客户端的 IPAddress                                           |
| topic                 | MQTT 主题                                                    |
| qos                   | MQTT 消息的 QoS                                              |
| flags                 | MQTT 消息的 Flags                                            |
| pub\_props            | PUBLISH Properties (仅适用于 MQTT 5.0)                       |
| timestamp             | 事件触发时间 (单位：毫秒)                                    |
| publish\_received\_at | PUBLISH 消息到达 Broker 的时间 (单位：毫秒)                  |
| node                  | 事件触发所在节点                                             |

示例

```sql
SELECT
  from_clientid,
  from_username,
  reason,
  topic,
  qos
FROM "$events/delivery_dropped"
```

输出

```json
{
  "topic": "t/a",
  "reason": "queue_full",
  "qos": 1,
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}
```

## 客户端连接成功事件 ("$events/client_connected")

当客户端连接成功时触发规则。

| 字段             | 解释                                                         |
| :--------------- | :----------------------------------------------------------- |
| clientid         | 消息目的 Client ID                                           |
| username         | 消息目的用户名                                               |
| mountpoint       | 主题挂载点(主题前缀)                                         |
| peername         | 客户端的 IPAddress 和 Port                                   |
| sockname         | emqx 监听的 IPAddress 和 Port                                |
| proto\_name      | 协议名字                                                     |
| proto\_ver       | 协议版本                                                     |
| keepalive        | MQTT 保活间隔                                                |
| clean\_start     | MQTT clean\_start                                            |
| expiry\_interval | MQTT Session 过期时间                                        |
| is\_bridge       | 是否为 MQTT bridge 连接                                      |
| connected\_at    | 客户端连接完成时间 (单位：毫秒)                              |
| conn\_props      | CONNECT Properties (仅适用于 MQTT 5.0)                       |
| timestamp        | 事件触发时间 (单位：毫秒)                                    |
| node             | 事件触发所在节点                                             |
| client_attrs     | [客户端属性](https://docs.emqx.com/zh/emqx/latest/client-attributes/client-attributes.html) |

示例

```sql
SELECT
  clientid,
  username,
  keepalive,
  is_bridge
FROM
  "$events/client_connected"
```

输出

```json
{
  "username": "u_emqx",
  "keepalive": 60,
  "is_bridge": false,
  "clientid": "c_emqx"
}
```

## 客户端连接断开事件 ("$events/client_disconnected")

当客户端连接断开时触发规则。

| 字段             | 解释                                                         |
| :--------------- | :----------------------------------------------------------- |
| reason           | 客户端连接断开原因：<br/>normal：客户端主动断开<br/>kicked：服务端踢出，通过 REST API<br/>keepalive\_timeout：keepalive 超时<br/>not\_authorized：认证失败，或者 acl\_nomatch = disconnect 时没有权限的 Pub/Sub 会主动断开客户端<br/>tcp\_closed：对端关闭了网络连接<br/>discarded: 另一个客户端使用相同的 ClientID 连接并设置 `clean_start = true`<br/>takenover: 另一个客户端使用相同的 ClientID 连接并设置 `clean_start = false`<br/>internal\_error：畸形报文或其他未知错误<br/> |
| clientid         | 消息目的 Client ID                                           |
| username         | 消息目的用户名                                               |
| peername         | 客户端的 IPAddress 和 Port                                   |
| sockname         | emqx 监听的 IPAddress 和 Port                                |
| connected\_at    | 客户端连接开始时间（单位：毫秒）。该时间戳表示当前会话建立的时间，有助于识别断开事件所属的连接会话。<br />此字段可确保延迟的断开事件不会覆盖较新的连接状态。 |
| disconnected\_at | 客户端连接断开时间 (单位：毫秒)                              |
| disconn\_props   | DISCONNECT Properties (仅适用于 MQTT 5.0)                    |
| timestamp        | 事件触发时间 (单位：毫秒)                                    |
| node             | 事件触发所在节点                                             |
| client_attrs     | [客户端属性](https://docs.emqx.com/zh/emqx/latest/client-attributes/client-attributes.html) |

示例

```sql
SELECT
  clientid,
  username,
  reason,
  connected_at,
  disconnected_at,
  node
FROM
  "$events/client_disconnected"
```

输出

```json
{
  "username": "u_emqx",
  "reason": "normal",
  "node": "emqx@127.0.0.1",
  "connected_at": 1645003578036,
  "disconnected_at": 1645003578536,
  "clientid": "c_emqx"
}
```

## 连接确认事件 ("$events/client_connack")

当服务端向客户端发送CONNACK报文时触发规则，reason_code 包含各种错误原因代码。

| 字段             | 解释                                   |
| ---------------- | :------------------------------------- |
| reason\_code     | 各种原因代码                           |
| clientid         | 消息目的 Client ID                     |
| username         | 消息目的用户名                         |
| peername         | 客户端的 IPAddress 和 Port             |
| sockname         | emqx 监听的 IPAddress 和 Port          |
| proto\_name      | 协议名字                               |
| proto\_ver       | 协议版本                               |
| keepalive        | MQTT 保活间隔                          |
| clean\_start     | MQTT clean\_start                      |
| expiry\_interval | MQTT Session 过期时间                  |
| conn\_props      | CONNECT Properties (仅适用于 MQTT 5.0) |
| timestamp        | 事件触发时间 (ms)                      |
| node             | 事件触发所在节点                       |

MQTT v5.0 协议将返回码重命名为原因码，增加了一个原因码来指示更多类型的错误（[Reason code and ACK - MQTT 5.0 new features](https://www.emqx.com/zh/blog/mqtt5-new-features-reason-code-and-ack)）。
因此，reason_code 在 MQTT v3.1.1 与 MQTT v5.0中有很大的不同。

MQTT v3.1.1

| reason\_code                      | 描述                                            |
| --------------------------------- | ----------------------------------------------- |
| connection\_accepted              | 已接受连接                                      |
| unacceptable\_protocol\_version   | 服务器不支持客户端请求的 MQTT 协议              |
| client\_identifier\_not\_valid    | 客户端 ID 是正确的 UTF-8 字符串，但服务器不允许 |
| server\_unavaliable               | 网络连接已建立，但 MQTT 服务不可用              |
| malformed\_username\_or\_password | 用户名或密码中的数据格式错误                    |
| unauthorized\_client              | 客户端连接未授权                                |

MQTT v5.0

| reason\_code                    | 描述               |
| ------------------------------- | ------------------ |
| success                         | 连接成功           |
| unspecified\_error              | 未指定的错误       |
| malformed\_packet               | 畸形数据包         |
| protocol\_error                 | 协议错误           |
| implementation\_specific\_error | 实现特定错误       |
| unsupported\_protocol\_version  | 不支持的协议版本   |
| client\_identifier\_not\_valid  | 客户端标识符无效   |
| bad\_username\_or\_password     | 错误的用户名或密码 |
| not\_authorized                 | 未经授权           |
| server\_unavailable             | 服务器无法使用     |
| server\_busy                    | 服务器繁忙         |
| banned                          | 禁止访问           |
| bad\_authentication\_method     | 错误的身份验证方法 |
| topic\_name\_invalid            | 主题名称无效       |
| packet\_too\_large              | 数据包太大         |
| quota\_exceeded                 | 超出配额           |
| retain\_not\_supported          | 不支持的 retain    |
| qos\_not\_supported             | 不支持的 qos       |
| use\_another\_server            | 使用另一台服务器   |
| server\_moved                   | 服务器迁移了       |
| connection\_rate\_exceeded      | 超出连接速率       |

示例

```sql
SELECT
  clientid,
  username,
  reason_code,
  node
FROM
  "$events/client_connack"
```

输出

```json
{
  "username": "u_emqx",
  "reason_code": "success",
  "node": "emqx@127.0.0.1",
  "connected_at": 1645003578536,
  "clientid": "c_emqx"
}
```

## 鉴权完成事件 ("$events/client_check_authz_complete")

当客户端鉴权结束时触发规则。

| 字段          | 解释                                                         |
| ------------- | :----------------------------------------------------------- |
| clientid      | 消息目的 Client ID                                           |
| username      | 消息目的用户名                                               |
| peerhost      | 客户端的 IPAddress                                           |
| topic         | MQTT 主题                                                    |
| action        | publish or subscribe，发布或者订阅事件                       |
| result        | allow or deny，鉴权完成                                      |
| authz\_source | 认证源                                                       |
| timestamp     | 事件触发时间 (ms)                                            |
| node          | 事件触发所在节点                                             |
| client_attrs  | [客户端属性](https://docs.emqx.com/zh/emqx/latest/client-attributes/client-attributes.html) |

示例

```sql
SELECT
  clientid,
  username,
  topic,
  action,
  result,
  authz_source,
  node
FROM
  "$events/client_check_authz_complete"
```

输出

```json
{
  "username": "u_emqx",
  "topic": "t/a",
  "action": "publish",
  "result": "allow",
  "authz_source": "cache",
  "node": "emqx@127.0.0.1",
  "clientid": "c_emqx"
}
```

## 认证完成事件 ("$events/client_check_authn_complete")

当客户端认证结束时触发规则。

| 字段           | 解释                                                         |
| -------------- | :----------------------------------------------------------- |
| clientid       | 消息目的 Client ID                                           |
| username       | 消息目的用户名                                               |
| peername       | 客户端的 IPAddress                                           |
| `reason_code`  | 认证结果                                                     |
| `is_superuser` | 是否是超级用户                                               |
| `is_anonymous` | 是否是匿名用户                                               |
| `client_attrs` | [客户端属性](https://docs.emqx.com/zh/emqx/latest/client-attributes/client-attributes.html) |

示例

```sql
SELECT
  clientid,
  username,
  reason_code,
  is_superuser,
  is_anonymous
FROM
  "$events/client_check_authn_complete"
```

输出

```json
{
  "clientid": "c_emqx",
  "username": "u_emqx",
  "reason_code": "success",
  "is_superuser": true,
  "is_anonymous": false
}
```

## 客户端订阅成功事件 ("$events/session_subscribed")

当客户端订阅成功时触发规则。

| 字段         | 解释                                                         |
| :----------- | :----------------------------------------------------------- |
| clientid     | 消息目的 Client ID                                           |
| username     | 消息目的用户名                                               |
| peerhost     | 客户端的 IPAddress                                           |
| topic        | MQTT 主题                                                    |
| qos          | MQTT 消息的 QoS                                              |
| sub_props    | SUBSCRIBE Properties (仅适用于 5.0)                          |
| timestamp    | 事件触发时间 (单位：毫秒)                                    |
| node         | 事件触发所在节点                                             |
| client_attrs | [客户端属性](https://docs.emqx.com/zh/emqx/latest/client-attributes/client-attributes.html) |

示例

```sql
SELECT
  clientid,
  username,
  topic,
  qos
FROM
  "$events/session_subscribed"
```

输出

```json
{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "clientid": "c_emqx"
}
```

## 客户端取消订阅成功事件 ("$events/session_unsubscribed")

当客户端取消订阅成功时触发规则。

| 字段         | 解释                                                         |
| :----------- | :----------------------------------------------------------- |
| clientid     | 消息目的 Client ID                                           |
| username     | 消息目的用户名                                               |
| peerhost     | 客户端的 IPAddress                                           |
| topic        | MQTT 主题                                                    |
| qos          | MQTT 消息的 QoS                                              |
| unsub_props  | UNSUBSCRIBE Properties (仅适用于 5.0)                        |
| timestamp    | 事件触发时间 (单位：毫秒)                                    |
| node         | 事件触发所在节点                                             |
| client_attrs | [客户端属性](https://docs.emqx.com/zh/emqx/latest/client-attributes/client-attributes.html) |

示例

```sql
SELECT
  clientid,
  username,
  topic,
  qos
FROM
  "$events/session_unsubscribed"
```

输出

```json
{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "clientid": "c_emqx"
}
```

## 输入动作

规则使用 `$bridges/` 开头的主题来表示输入动作的消息或事件。格式为：`$bridges/<type>:<name>`。

其中 `<type>:<name>` 部分是输入动作的 ID，`<type>` 是输入动作的类型，`<name>` 是输入动作的名字。 比如 `$bridges/mqtt:my_mqtt_bridge`。

### MQTT 订阅者事件 ("$bridges/mqtt:*")

当该 MQTT 订阅者从外部 MQTT Broker 接收到消息时触发规则。

| 字段                | 解释                                                |
| :------------------ | :-------------------------------------------------- |
| id                  | MQTT 消息 ID                                        |
| server              | 远程 MQTT Broker 的地址，例如 "broker.emqx.io:1883" |
| payload             | MQTT 消息体                                         |
| topic               | MQTT 主题                                           |
| qos                 | MQTT 消息的 QoS                                     |
| dup                 | MQTT 消息的 DUP Flag                                |
| retain              | MQTT 消息的 Retain Flag                             |
| pub_props           | PUBLISH Properties (仅适用于 MQTT 5.0)              |
| message_received_at | PUBLISH 消息到达 Broker 的时间 (单位：毫秒)         |

示例:

```sql
SELECT
  *
FROM
  "$bridges/mqtt:my_mqtt_bridge"
```

输出:

```json
{
  "id": "0005E27C1D24E44FF440000017520000",
  "server": "broker.emqx.io:1883",
  "payload": "hello",
  "topic": "t/a",
  "qos": 1,
  "dup": false,
  "retain": false,
  "pub_props": {
    "Message-Expiry-Interval": 30,
    "Payload-Format-Indicator": 0,
    "User-Property": {
      "foo": "bar"
    },
    "User-Property-Pairs": [
      {
        "key": "foo"
      },
      {
        "value": "bar"
      }
    ]
  },
  "message_received_at": 1645002753259,
}
```
