Skip to content

规则引擎 SQL 语句中可用的字段

SELECT 和 WHERE 子句可用的字段与事件的类型相关。其中 clientid, usernameevent 是通用字段,每种事件类型都有。

使用规则引擎 SQL 语句处理消息发布

规则引擎的 SQL 语句可以处理消息发布。 在一个规则语句中,用户可以用 FROM 子句指定一个或者多个主题,当任何消息发布到指定的主题时都会触发该规则。

字段解释
idMQTT 消息 ID
clientid消息来源 Client ID
username消息来源用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
headersMQTT 消息内部与流程处理相关的额外数据
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
publish_received_atPUBLISH 消息到达 Broker 的时间 (ms)
node事件触发所在节点

示例

sql
SELECT
  payload.msg as msg,
  clientid,
  username,
  payload,
  topic,
  qos
FROM
  "t/#"

输出

json
{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "payload": "{\"msg\":\"hello\"}",
  "msg": "hello",
  "clientid": "c_emqx"
}

使用规则引擎 SQL 语句处理事件

规则引擎的 SQL 语句既可以处理消息(消息发布),也可以处理事件(客户端上下线、客户端订阅等)。对于消息,FROM 子句后面直接跟主题名;对于事件,FROM 子句后面跟事件主题。

事件消息的主题以 "$events/" 开头,比如 "$events/client_connected", "$events/session_subscribed"。 如果想让 emqx 将事件消息发布出来,可以在 emqx_rule_engine.conf 文件中配置。

FROM 子句可用的事件主题

事件主题名释义
$events/message_delivered消息投递
$events/message_acked消息确认
$events/message_dropped消息在转发的过程中被丢弃
$events/delivery_dropped消息在投递的过程中被丢弃
$events/client_connected连接完成
$events/client_disconnected连接断开
$events/session_subscribed订阅
$events/session_unsubscribed取消订阅

$events/message_delivered (消息投递)

当消息被放入底层socket时触发规则

字段解释
idMQTT 消息 ID
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
publish_received_atPUBLISH 消息到达 Broker 的时间 (ms)
node事件触发所在节点

示例

sql
SELECT
  from_clientid,
  from_username,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_delivered"

输出

json
{
  "topic": "t/a",
  "timestamp": 1645002753259,
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}

$events/message_acked (消息确认)

当消息发送到客户端,并收到客户端回复的ack时触发规则,仅QOS1,QOS2会触发

字段解释
idMQTT 消息 ID
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
puback_propsPUBACK Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
publish_received_atPUBLISH 消息到达 Broker 的时间 (ms)
node事件触发所在节点

示例

sql
SELECT
  from_clientid,
  from_username,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_acked"

输出

json
{
  "topic": "t/a",
  "timestamp": 1645002965664,
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}

$events/message_dropped (消息在转发的过程中被丢弃)

当一条消息被丢弃时触发规则

字段解释
idMQTT 消息 ID
reason消息丢弃原因,可能的原因:
no_subscribers: 没有订阅者
receive_maximum_exceeded: awaiting_rel 队列已满
packet_identifier_inuse: 消息 ID 已被使用
clientid消息来源 Client ID
username消息来源用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
publish_received_atPUBLISH 消息到达 Broker 的时间 (ms)
node事件触发所在节点

示例

sql
SELECT
  reason,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_dropped"

输出

json
{
  "topic": "t/a",
  "timestamp": 1645003103004,
  "reason": "no_subscribers",
  "qos": 1,
  "node": "emqx@127.0.0.1"
}

$events/delivery_dropped (消息在投递的过程中被丢弃)

当订阅者的消息队列已满时触发规则

字段解释
idMQTT 消息 ID
reason消息丢弃原因,可能的原因:
queue_full: 消息队列已满(QoS>0)
no_local: 不允许客户端接收自己发布的消息
expired: 消息或者会话过期
qos0_msg: QoS0 的消息因为消息队列已满被丢弃
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
publish_received_atPUBLISH 消息到达 Broker 的时间 (ms)
node事件触发所在节点

示例

sql
SELECT
  from_clientid,
  from_username,
  reason,
  topic,
  qos
FROM "$events/delivery_dropped"

输出

json
{
  "topic": "t/a",
  "reason": "queue_full",
  "qos": 1,
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}

$events/client_connected (终端连接成功)

当终端连接成功时触发规则

字段解释
clientid消息目的 Client ID
username消息目的用户名
mountpoint主题挂载点(主题前缀)
peername终端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
proto_name协议名字
proto_ver协议版本
keepaliveMQTT 保活间隔
clean_startMQTT clean_start
expiry_intervalMQTT Session 过期时间
is_bridge是否为 MQTT bridge 连接
connected_at终端连接完成时间 (ms)
conn_propsCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
node事件触发所在节点

示例

sql
SELECT
  clientid,
  username,
  keepalive,
  is_bridge
FROM
  "$events/client_connected"

输出

json
{
  "username": "u_emqx",
  "keepalive": 60,
  "is_bridge": false,
  "clientid": "c_emqx"
}

$events/client_disconnected (终端连接断开)

当终端连接断开时触发规则

字段解释
reason终端连接断开原因:
normal:客户端主动断开
kicked:服务端踢出,通过 REST API
keepalive_timeout: keepalive 超时
not_authorized: 认证失败,或者 acl_nomatch = disconnect 时没有权限的 Pub/Sub 会主动断开客户端
tcp_closed: 对端关闭了网络连接
discarded: 因为相同 ClientID 的客户端上线且设置 clean_start = true
takeovered: 因为相同 ClientID 的客户端上线且设置 clean_start = false
internal_error: 畸形报文或其他未知错误
clientid消息目的 Client ID
username消息目的用户名
peername终端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
disconnected_at终端连接断开时间 (ms)
disconn_propsDISCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
node事件触发所在节点

示例

sql
SELECT
  clientid,
  username,
  reason,
  disconnected_at,
  node
FROM
  "$events/client_disconnected"

输出

json
{
  "username": "u_emqx",
  "reason": "normal",
  "node": "emqx@127.0.0.1",
  "disconnected_at": 1645003578536,
  "clientid": "c_emqx"
}

$events/session_subscribed (终端订阅成功)

当终端订阅成功时触发规则

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
sub_propsSUBSCRIBE Properties (仅适用于 5.0)
timestamp事件触发时间 (ms)
node事件触发所在节点

示例

sql
SELECT
  clientid,
  username,
  topic,
  qos
FROM
  "$events/session_subscribed"

输出

json
{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "clientid": "c_emqx"
}

$events/session_unsubscribed (取消终端订阅成功)

当取消终端订阅成功时触发规则

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
unsub_propsUNSUBSCRIBE Properties (仅适用于 5.0)
timestamp事件触发时间 (ms)
node事件触发所在节点

示例

sql
SELECT
  clientid,
  username,
  topic,
  qos
FROM
  "$events/session_unsubscribed"

输出

json
{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "clientid": "c_emqx"
}

下一部分,规则引擎内置函数