SQL Statement Fields
The fields available in the SELECT and WHERE clauses are related to the type of event. Among them, clientid
, username
and event
are common fields that are contained by each type of event.
Message Publishing with Rule Engine
The SQL statement of the rules engine can handle the message publishing. In a rule statement, the user can specify one or more topics with the FROM
clause, and the rule will be triggered when any message is published to the specified topic.
Field | Explanation |
---|---|
id | MQTT message ID |
clientid | Client ID of the sender |
username | Username of the sender |
payload | MQTT payload |
peerhost | Client IPAddress |
topic | MQTT topic |
qos | Enumeration of message QoS 0,1,2 |
flags | Flags |
headers | Internal data related to the message processing |
pub_props | The PUBLISH Properties (MQTT 5.0 only) |
timestamp | Timestamp (ms) |
publish_received_at | Time when PUBLISH message reaches Broker (ms) |
node | Node name of the trigger event |
example
SELECT
payload.msg as msg,
clientid,
username,
payload,
topic,
qos
FROM
"t/#"
output
{
"username": "u_emqx",
"topic": "t/a",
"qos": 1,
"payload": "{\"msg\":\"hello\"}",
"msg": "hello",
"clientid": "c_emqx"
}
Events Handling with Rule Engine
The SQL statements of the rule engine can handle both messages (message publishing) and events (client online and offline, client subscription, etc.). For messages, the FROM clause is directly followed by the topic name; for events, the FROM clause is followed by the event topic.
The topic of the event message starts with "$events/"
, such as "$events/client_connected",
"$events/session_subscribed"
. If you want emqx to publish the event message, you can configure it in the emqx_rule_engine.conf
file.
Event Topics for FROM
Clause
Event Topic Name | Explanation |
---|---|
$events/message_delivered | Message delivery |
$events/message_acked | Message acknowledged |
$events/message_dropped | Message dropped when routing |
$events/delivery_dropped | Message dropped when delivering |
$events/client_connected | Connection complete |
$events/client_disconnected | Disconnect |
$events/session_subscribed | Subscribe |
$events/session_unsubscribed | Unsubcribe |
$events/message_delivered
Trigger the rule when a message is put into the underlying socket
Field | Explanation |
---|---|
id | MQTT message ID |
from_clientid | Client ID of the sender |
from_username | Username of the sender |
clientid | Client ID of the receiver |
username | Username of the receiver |
payload | MQTT payload |
peerhost | Client IPAddress |
topic | MQTT topic |
qos | Enumeration of message QoS 0,1,2 |
flags | Flags |
pub_props | The PUBLISH Properties (MQTT 5.0 only) |
timestamp | Event trigger time(millisecond) |
publish_received_at | Time when PUBLISH message reaches Broker (ms) |
node | Node name of the trigger event |
example
SELECT
from_clientid,
from_username,
topic,
qos,
node,
timestamp
FROM
"$events/message_delivered"
output
{
"topic": "t/a",
"timestamp": 1645002753259,
"qos": 1,
"node": "emqx@127.0.0.1",
"from_username": "u_emqx_1",
"from_clientid": "c_emqx_1"
}
$events/message_acked
The rule is triggered when the message is sent to the client and an ack is received from the client. Only QOS1 and QOS2 messages will be triggered
Field | Explanation |
---|---|
id | MQTT message id |
from_clientid | Client ID of the sender |
from_username | Username of the sender |
clientid | Client ID of the receiver |
username | Username of the receiver |
payload | MQTT payload |
peerhost | client IPAddress |
topic | MQTT topic |
qos | Enumeration of message QoS 0,1,2 |
flags | Flags |
pub_props | The PUBLISH Properties (MQTT 5.0 only) |
puback_props | The PUBACK Properties (MQTT 5.0 only) |
timestamp | Event trigger time(millisecond) |
publish_received_at | Time when PUBLISH message reaches Broker (ms) |
node | Node name of the trigger event |
example
SELECT
from_clientid,
from_username,
topic,
qos,
node,
timestamp
FROM
"$events/message_acked"
output
{
"topic": "t/a",
"timestamp": 1645002965664,
"qos": 1,
"node": "emqx@127.0.0.1",
"from_username": "u_emqx_1",
"from_clientid": "c_emqx_1"
}
$events/message_dropped
Trigger rule when a message has no subscribers
Field | Explanation |
---|---|
id | MQTT message id |
reason | Reasons of dropping, possible reasons: no_subscribers: no clients subscribes the topic receive_maximum_exceeded: awaiting_rel queue is full packet_identifier_inuse: send a qos2 message with unreleased packet ID |
clientid | Client ID of the sender |
username | Username of the sender |
payload | MQTT payload |
peerhost | Client IPAddress |
topic | MQTT topic |
qos | Enumeration of message QoS 0,1,2 |
flags | Flags |
pub_props | The PUBLISH Properties (MQTT 5.0 only) |
timestamp | Event trigger time(millisecond) |
publish_received_at | Time when PUBLISH message reaches Broker (ms) |
node | Node name of the trigger event |
example
SELECT
reason,
topic,
qos,
node,
timestamp
FROM
"$events/message_dropped"
output
{
"topic": "t/a",
"timestamp": 1645003103004,
"reason": "no_subscribers",
"qos": 1,
"node": "emqx@127.0.0.1"
}
$events/delivery_dropped
Trigger rule when subscriber's message queue is full
Field | Explanation |
---|---|
id | MQTT message ID |
reason | Reasons of dropping, possible reasons: queue_full: the message queue is full(QoS>0) no_local: it's not allowed for the client to received messages published by themselves expired: the message or the session is expired qos0_msg: the message queue is full(QoS0) |
from_clientid | Client ID of the sender |
from_username | Username of the sender |
clientid | Client ID of the receiver |
username | Username of the receiver |
payload | MQTT payload |
peerhost | Client IPAddress |
topic | MQTT topic |
qos | Enumeration of message QoS 0,1,2 |
flags | Flags |
pub_props | The PUBLISH Properties (MQTT 5.0 only) |
timestamp | Event trigger time(millisecond) |
publish_received_at | Time when PUBLISH message reaches Broker (ms) |
node | Node name of the trigger event |
example
SELECT
from_clientid,
from_username,
reason,
topic,
qos
FROM "$events/delivery_dropped"
output
{
"topic": "t/a",
"reason": "queue_full",
"qos": 1,
"from_username": "u_emqx_1",
"from_clientid": "c_emqx_1"
}
$events/client_connected
Trigger the rule when the terminal is connected successfully
Field | Explanation |
---|---|
clientid | Client ID |
username | Current MQTT username |
mountpoint | Mountpoint for bridging messages |
peername | IPAddress and Port of terminal |
sockname | IPAddress and Port listened by emqx |
proto_name | Protocol name |
proto_ver | Protocol version |
keepalive | MQTT keepalive interval |
clean_start | MQTT clean_start |
expiry_interval | MQTT Session Expiration time |
is_bridge | Whether it is MQTT bridge connection |
connected_at | Client connected timestamp (millisecond) |
conn_props | The CONNECT Properties (MQTT 5.0 only) |
timestamp | Event trigger time(millisecond) |
node | Node name of the trigger event |
example
SELECT
clientid,
username,
keepalive,
is_bridge
FROM
"$events/client_connected"
output
{
"username": "u_emqx",
"keepalive": 60,
"is_bridge": false,
"clientid": "c_emqx"
}
$events/client_disconnected
Trigger rule when terminal connection is lost
Field | Explanation |
---|---|
reason | Reasons of disconnection of terminal normal:the client is actively disconnected kicked:the server kicks out, and it is kicked out through REST API keepalive_timeout: keepalive timeout not_authorized: auth failed,or acl_nomatch = disconnect , Pub/Sub without permission will disconnect the clienttcp_closed: the peer has closed the network connection discarded: another client connected with the same ClientID and set clean_start = true takeovered: another client connected with the same ClientID and set clean_start = false internal_error: malformed message or other unknown errors |
clientid | Client ID |
username | Current MQTT username |
peername | IPAddress and Port of terminal |
sockname | IPAddress and Port listened by emqx |
disconnected_at | Client disconnected timestamp (millisecond) |
disconn_props | The DISCONNECT Properties (MQTT 5.0 only) |
timestamp | Event trigger time(millisecond) |
node | Node name of the trigger event |
example
SELECT
clientid,
username,
reason,
disconnected_at,
node
FROM
"$events/client_disconnected"
output
{
"username": "u_emqx",
"reason": "normal",
"node": "emqx@127.0.0.1",
"disconnected_at": 1645003578536,
"clientid": "c_emqx"
}
$events/session_subscribed
Trigger the rule when the terminal subscribes successfully
Field | Explanation |
---|---|
clientid | Client ID |
username | Current MQTT username |
peerhost | Client IPAddress |
topic | MQTT topic |
qos | Enumeration of message QoS 0,1,2 |
sub_props | The SUBSCRIBE Properties (MQTT 5.0 only) |
timestamp | Event trigger time(millisecond) |
node | Node name of the trigger event |
example
SELECT
clientid,
username,
topic,
qos
FROM
"$events/session_subscribed"
output
{
"username": "u_emqx",
"topic": "t/a",
"qos": 1,
"clientid": "c_emqx"
}
$events/session_unsubscribed
Triggered when the terminal subscription is cancelled successfully
Field | Explanation |
---|---|
clientid | Client ID |
username | Current MQTT username |
peerhost | Client IPAddress |
topic | MQTT topic |
qos | Enumeration of message QoS 0,1,2 |
unsub_props | The UNSUBSCRIBE Properties (MQTT 5.0 only) |
timestamp | Event trigger time(millisecond) |
node | Node name of the trigger event |
example
SELECT
clientid,
username,
topic,
qos
FROM
"$events/session_unsubscribed"
output
{
"username": "u_emqx",
"topic": "t/a",
"qos": 1,
"clientid": "c_emqx"
}