Skip to content

SQL 数据源和字段

规则的 SQL 语句可以处理的数据源有:MQTT 消息、客户端事件,或是连接外部数据系统的 Source。

SQL 语句使用 FROM 来指定数据源,在 SELECTWHERE 子句中可以引用相应的字段。 数据源类型不同,可以使用的字段也不同。

MQTT 消息

规则的 SQL 语句可以处理消息发布。 在一个规则语句中,用户可以用 FROM 子句指定一个或者多个主题, 当任何消息发布到指定的主题时都会触发该规则。

字段解释
idMQTT 消息 ID
clientid消息来源 Client ID
username消息来源用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

SQL 示例

sql
SELECT
  *
FROM
  "t/#"

输出

json
{
  "clientid": "c_emqx",
  "event": "message.publish",
  "event_type": "message_publish",
  "flags": {},
  "id": "0005E27C1D24E44FF440000017520000",
  "metadata": {
    "rule_id": "sql_tester:099ddfa9c466d1ca"
  },
  "node": "emqx@127.0.0.1",
  "payload": "abc",
  "peerhost": "192.168.0.10",
  "pub_props": {
    "Message-Expiry-Interval": 30,
    "Payload-Format-Indicator": 0,
    "User-Property": {
      "foo": "bar"
    },
    "User-Property-Pairs": [
      {
        "key": "foo"
      },
      {
        "value": "bar"
      }
    ]
  },
  "publish_received_at": 1656397576334,
  "qos": 1,
  "timestamp": 1656397576334,
  "topic": "t/a",
  "username": "u_emqx"
}

客户端事件

规则的 SQL 语句既可以处理消息(消息发布),也可以处理事件(客户端上下线、客户端订阅等)。对于消息,FROM 子句后面直接跟主题名;对于事件,FROM 子句后面跟事件主题。

事件主题以 $events/ 开头,比如 $events/client_connected$events/session_subscribed

TIP

默认情况下,客户端无法直接订阅客户端事件消息。 本节介绍了如何使用规则来订阅这些消息,您也可以通过订阅系统主题直接获取客户端事件消息。

事件名称事件主题名释义
消息投递事件$events/message_delivered消息投递
消息确认事件$events/message_acked消息确认
消息在转发的过程中被丢弃事件$events/message_dropped消息在转发的过程中被丢弃
消息在投递的过程中被丢弃事件$events/delivery_dropped消息在投递的过程中被丢弃
客户端连接成功事件$events/client_connected连接完成
客户端连接断开事件$events/client_disconnected连接断开
连接确认事件$events/client_connack连接确认
鉴权完成事件$events/client_check_authz_complete鉴权完成
客户端订阅成功事件$events/session_subscribed订阅
客户端取消订阅成功事件$events/session_unsubscribed取消订阅

消息投递事件 ("$events/message_delivered")

当消息被放入底层socket时触发规则。

字段解释
idMQTT 消息 ID
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

示例

sql
SELECT
  from_clientid,
  from_username,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_delivered"

输出

json
{
  "topic": "t/a",
  "timestamp": 1645002753259,
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}

消息确认事件 ("$events/message_acked")

当消息发送到客户端,并收到客户端回复的ack时触发规则,仅QOS1,QOS2会触发。

字段解释
idMQTT 消息 ID
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
puback_propsPUBACK Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

示例

sql
SELECT
  from_clientid,
  from_username,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_acked"

输出

json
{
  "topic": "t/a",
  "timestamp": 1645002965664,
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}

消息在转发的过程中被丢弃事件 ("$events/message_dropped")

当一条消息无任何订阅者时触发规则。

字段解释
idMQTT 消息 ID
reason消息丢弃原因,可能的原因:
no_subscribers:没有订阅者
receive_maximum_exceeded: awaiting_rel 队列已满
packet_identifier_inuse: 消息 ID 已被使用
clientid消息来源 Client ID
username消息来源用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点
示例
sql
SELECT
  reason,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_dropped"

输出

json
{
  "topic": "t/a",
  "timestamp": 1645003103004,
  "reason": "no_subscribers",
  "qos": 1,
  "node": "emqx@127.0.0.1"
}

消息在投递的过程中被丢弃事件 ("$events/delivery_dropped")

当订阅者的消息队列已满时触发规则。

字段解释
idMQTT 消息 ID
reason消息丢弃原因,可能的原因:
queue_full:消息队列已满(QoS>0)
no_local:不允许客户端接收自己发布的消息
expired:消息或者会话过期
qos0_msg:QoS 0 的消息因为消息队列已满被丢弃
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

示例

sql
SELECT
  from_clientid,
  from_username,
  reason,
  topic,
  qos
FROM "$events/delivery_dropped"

输出

json
{
  "topic": "t/a",
  "reason": "queue_full",
  "qos": 1,
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}

客户端连接成功事件 ("$events/client_connected")

当客户端连接成功时触发规则。

字段解释
clientid消息目的 Client ID
username消息目的用户名
mountpoint主题挂载点(主题前缀)
peername客户端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
proto_name协议名字
proto_ver协议版本
keepaliveMQTT 保活间隔
clean_startMQTT clean_start
expiry_intervalMQTT Session 过期时间
is_bridge是否为 MQTT bridge 连接
connected_at客户端连接完成时间 (单位:毫秒)
conn_propsCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
node事件触发所在节点

示例

sql
SELECT
  clientid,
  username,
  keepalive,
  is_bridge
FROM
  "$events/client_connected"

输出

json
{
  "username": "u_emqx",
  "keepalive": 60,
  "is_bridge": false,
  "clientid": "c_emqx"
}

客户端连接断开事件 ("$events/client_disconnected")

当客户端连接断开时触发规则。

字段解释
reason客户端连接断开原因:
normal:客户端主动断开
kicked:服务端踢出,通过 REST API
keepalive_timeout:keepalive 超时
not_authorized:认证失败,或者 acl_nomatch = disconnect 时没有权限的 Pub/Sub 会主动断开客户端
tcp_closed:对端关闭了网络连接
discarded: 另一个客户端使用相同的 ClientID 连接并设置 clean_start = true
takenover: 另一个客户端使用相同的 ClientID 连接并设置 clean_start = false
internal_error:畸形报文或其他未知错误
clientid消息目的 Client ID
username消息目的用户名
peername客户端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
disconnected_at客户端连接断开时间 (单位:毫秒)
disconn_propsDISCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
node事件触发所在节点

示例

sql
SELECT
  clientid,
  username,
  reason,
  disconnected_at,
  node
FROM
  "$events/client_disconnected"

输出

json
{
  "username": "u_emqx",
  "reason": "normal",
  "node": "emqx@127.0.0.1",
  "disconnected_at": 1645003578536,
  "clientid": "c_emqx"
}

连接确认事件 ("$events/client_connack")

当服务端向客户端发送CONNACK报文时触发规则,reason_code 包含各种错误原因代码。

字段解释
reason_code各种原因代码
clientid消息目的 Client ID
username消息目的用户名
peername客户端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
proto_name协议名字
proto_ver协议版本
keepaliveMQTT 保活间隔
clean_startMQTT clean_start
expiry_intervalMQTT Session 过期时间
conn_propsCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
node事件触发所在节点

MQTT v5.0 协议将返回码重命名为原因码,增加了一个原因码来指示更多类型的错误(Reason code and ACK - MQTT 5.0 new features)。 因此reason_code 在MQTT v3.1.1与MQTT v5.0中有很大的不同。

MQTT v3.1.1

reason_code描述
connection_accepted已接受连接
unacceptable_protocol_version服务器不支持客户端请求的 MQTT 协议
client_identifier_not_valid客户端 ID 是正确的 UTF-8 字符串,但服务器不允许
server_unavaliable网络连接已建立,但 MQTT 服务不可用
malformed_username_or_password用户名或密码中的数据格式错误
unauthorized_client客户端连接未授权

MQTT v5.0

reason_code描述
success连接成功
unspecified_error未指定的错误
malformed_packet畸形数据包
protocol_error协议错误
implementation_specific_error实现特定错误
unsupported_protocol_version不支持的协议版本
client_identifier_not_valid客户端标识符无效
bad_username_or_password错误的用户名或密码
not_authorized未经授权
server_unavailable服务器无法使用
server_busy服务器繁忙
banned禁止访问
bad_authentication_method错误的身份验证方法
topic_name_invalid主题名称无效
packet_too_large数据包太大
quota_exceeded超出配额
retain_not_supported不支持的retain
qos_not_supported不支持的qos
use_another_server使用另一台服务器
server_moved服务器迁移了
connection_rate_exceeded超出连接速率

示例

sql
SELECT
  clientid,
  username,
  reason_code,
  node
FROM
  "$events/client_connack"

输出

json
{
  "username": "u_emqx",
  "reason_code": "success",
  "node": "emqx@127.0.0.1",
  "connected_at": 1645003578536,
  "clientid": "c_emqx"
}

鉴权完成事件 ("$events/client_check_authz_complete")

当客户端鉴权结束时触发规则。

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
actionpublish or subscribe,发布或者订阅事件
resultallow or deny,鉴权完成
authz_source认证源
timestamp事件触发时间 (ms)
node事件触发所在节点

示例

sql
SELECT
  clientid,
  username,
  topic,
  action,
  result,
  authz_source,
  node
FROM
  "$events/client_check_authz_complete"

输出

json
{
  "username": "u_emqx",
  "topic": "t/a",
  "action": "publish",
  "result": "allow",
  "authz_source": "cache",
  "node": "emqx@127.0.0.1",
  "clientid": "c_emqx"
}

客户端订阅成功事件 ("$events/session_subscribed")

当客户端订阅成功时触发规则。

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
sub_propsSUBSCRIBE Properties (仅适用于 5.0)
timestamp事件触发时间 (单位:毫秒)
node事件触发所在节点

示例

sql
SELECT
  clientid,
  username,
  topic,
  qos
FROM
  "$events/session_subscribed"

输出

json
{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "clientid": "c_emqx"
}

客户端取消订阅成功事件 ("$events/session_unsubscribed")

当客户端取消订阅成功时触发规则。

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
unsub_propsUNSUBSCRIBE Properties (仅适用于 5.0)
timestamp事件触发时间 (单位:毫秒)
node事件触发所在节点

示例

sql
SELECT
  clientid,
  username,
  topic,
  qos
FROM
  "$events/session_unsubscribed"

输出

json
{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "clientid": "c_emqx"
}

Source

规则使用 $bridges/ 开头的主题来表示 Source 的消息或事件。格式为:$bridges/<type>:<name>

其中 <type>:<name> 部分是 Source 的 ID,<type> 是 Source 的类型,<name> 是 Source 的名字。 比如 $bridges/mqtt:my_mqtt_bridge

MQTT 订阅者事件 ("$bridges/mqtt:*")

当该 MQTT 订阅者从外部 MQTT Broker 接收到消息时触发规则

字段解释
idMQTT 消息 ID
server远程 MQTT Broker 的地址,例如 "broker.emqx.io:1883"
payloadMQTT 消息体
topicMQTT 主题
qosMQTT 消息的 QoS
dupMQTT 消息的 DUP Flag
retainMQTT 消息的 Retain Flag
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
message_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)

示例

sql
SELECT
  *
FROM
  "$bridges/mqtt:my_mqtt_bridge"

输出:

json
{
  "id": "0005E27C1D24E44FF440000017520000",
  "server": "broker.emqx.io:1883",
  "payload": "hello",
  "topic": "t/a",
  "qos": 1,
  "dup": false,
  "retain": false,
  "pub_props": {
    "Message-Expiry-Interval": 30,
    "Payload-Format-Indicator": 0,
    "User-Property": {
      "foo": "bar"
    },
    "User-Property-Pairs": [
      {
        "key": "foo"
      },
      {
        "value": "bar"
      }
    ]
  },
  "message_received_at": 1645002753259,
}