Build MQTT full-link observability: An event audit solution based on EMQX Event Topics and Tables
In production environments, IoT teams often face the following issues:
- Frequent device disconnections
- Client authentication failures
- Topic ACL denials
- Messages not delivered successfully
- Abnormal message drops
Traditional logs are usually distributed across the broker, authentication system, and business system. When troubleshooting issues, teams need to correlate data across multiple systems, which is inefficient.
With EMQX Event Topics, Rule Engine, and Tables, you can store client connection, authentication, authorization, subscription, and message flow events in a unified way. This enables MQTT full-link event auditing and issue traceability.
Solution objectives
Build a unified MQTT full-link event logging platform based on EMQX Event Topics, Rule Engine, and Tables.
This solution implements:
- Client lifecycle tracking
- MQTT authentication auditing
- MQTT authorization auditing
- MQTT message flow tracing
- Long-term event storage
- SQL-based query and analysis
Architecture design
```
EMQX Event Topics
|
v
Rule Engine
|
v
Single Action
|
v
EMQX Tables
|
v
SQL Query
```
Characteristics:
- Single Rule
- Single Action
- Single wide table
- Long-term storage
- SQL query
Tables schema
Table name: emqx_full_link_events

Create table statement:
```sql
CREATE TABLE emqx_full_link_events (
  "timestamp" TIMESTAMP TIME INDEX,
  "event" STRING,
  "clientid" STRING SKIPPING INDEX,
  "username" STRING SKIPPING INDEX,
  "peername" STRING SKIPPING INDEX,
  "sockname" STRING,
  "node" STRING,
  "topic" STRING,
  "id" STRING,
  "from_clientid" STRING,
  "from_username" STRING,
  "qos" BIGINT,
  "payload" STRING,
  "reason" STRING,
  "reason_code" STRING,
  "action" STRING,
  "result" STRING,
  "authz_source" STRING,
  "keepalive" BIGINT,
  "clean_start" BOOLEAN,
  "expiry_interval" BIGINT,
  "connected_at" BIGINT,
  "disconnected_at" BIGINT,
  "publish_received_at" BIGINT,
  "client_attrs" STRING
)
WITH (
  'append_mode'='true',
  'ttl'='7d'
);
```
Table parameter description
This solution uses the following table parameters:
```sql
WITH (
  'append_mode'='true',
  'ttl'='7d'
);
```
append_mode='true'
MQTT events are typical time-series audit data. Client connections, authentication, authorization, subscriptions, and message delivery continuously generate new event records.
After Append Mode is enabled:
- All events are written in append mode.
- Historical records are not updated.
- The complete event flow is retained.
- Event auditing and issue traceability are supported.
Therefore, it is suitable for MQTT full-link event storage scenarios.
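To see why append mode matters, consider two events that share the same timestamp. The table declares no PRIMARY KEY, so its only key is the `timestamp` time index. The toy model below compares an append-only store with a store that deduplicates on that key. It assumes that without append mode the table keeps only the last row written for each key. This is a simplified illustration, not EMQX Tables internals, and the class and function names are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Event:
    timestamp: int  # epoch milliseconds, the table's TIME INDEX
    event: str
    clientid: str


def append_only(events: List[Event]) -> List[Event]:
    """Append mode: every incoming event is kept as its own row."""
    return list(events)


def dedup_last_write_wins(events: List[Event]) -> List[Event]:
    """Toy model of a deduplicating table whose only key is the time index.

    Assumption: rows sharing a timestamp collapse into one and the last write wins.
    """
    rows = {}
    for e in events:
        rows[e.timestamp] = e  # overwrite any earlier row with the same key
    return list(rows.values())


events = [  # sample data
    Event(1700000000000, "client.connected", "device001"),
    Event(1700000000000, "session.subscribed", "device001"),  # same millisecond
    Event(1700000000005, "client.disconnected", "device001"),
]
print(len(append_only(events)))            # 3
print(len(dedup_last_write_wins(events)))  # 2: client.connected was overwritten
```

Events for one client, such as a connection followed immediately by a subscription, can fall within the same millisecond. Appending keeps both, which is what an audit trail needs.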
ttl='7d'
Event Topics continuously generate large amounts of event data.
To prevent unbounded data growth, this example sets:
```
ttl='7d'
```
This means that only event data from the most recent 7 days is retained. Data older than the retention period is cleaned up automatically.
In production environments, you can adjust the retention period based on business requirements.
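As a rough illustration of the retention arithmetic, with ttl='7d' a row becomes eligible for cleanup once its time index is more than 7 days older than the current time. The sketch below models only that cutoff. It assumes a simplified single-unit TTL format (`7d`, `12h`, and so on) and says nothing about when the database physically removes expired data. `parse_ttl` and `is_expired` are hypothetical helpers.

```python
import re
from datetime import datetime, timedelta, timezone

# Hypothetical, simplified TTL grammar: a number followed by one unit.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_ttl(ttl: str) -> timedelta:
    """Parse a single-unit TTL string such as '7d' or '12h'."""
    match = re.fullmatch(r"(\d+)([smhd])", ttl.strip())
    if match is None:
        raise ValueError(f"unsupported TTL format: {ttl!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})


def is_expired(event_ts_ms: int, ttl: str, now: datetime) -> bool:
    """True if an event with this epoch-millisecond timestamp is older than the TTL."""
    event_time = datetime.fromtimestamp(event_ts_ms / 1000, tz=timezone.utc)
    return event_time < now - parse_ttl(ttl)


now = datetime(2024, 1, 15, tzinfo=timezone.utc)
old_event = int(datetime(2024, 1, 7, tzinfo=timezone.utc).timestamp() * 1000)
recent_event = int(datetime(2024, 1, 10, tzinfo=timezone.utc).timestamp() * 1000)
print(is_expired(old_event, "7d", now))     # True: older than the 2024-01-08 cutoff
print(is_expired(recent_event, "7d", now))  # False
```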
Tables connector configuration
First, create a Connector for writing data to EMQX Tables.
Create a Tables connector
Go to Data Integration -> Connectors -> New Connector -> EMQX Tables.
Enter the connection information for the corresponding Tables Deployment and complete the creation.
Rule configuration
Collect all events in a unified way:
```sql
SELECT *
FROM "$events/#"
```
Advantages:
- A single Rule covers all Event Topics.
- No need to maintain multiple Rules.
- Unified collection entry point.
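A single Rule is enough because of MQTT wildcard matching: the multi-level wildcard `#` in `$events/#` matches every topic below `$events/`. The sketch below implements standard MQTT topic-filter matching for `+` and `#` to show this. It also applies the MQTT rule that a filter starting with a wildcard does not match `$`-prefixed topics. The event topic names in the usage lines are illustrative only, so check the Event Topics list for your EMQX version.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Standard MQTT topic-filter matching for the '+' and '#' wildcards."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    # A filter whose first level is a wildcard never matches '$'-prefixed topics.
    if topic.startswith("$") and f_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(f_levels):
        if level == "#":
            return True  # matches this level, everything below, and the parent
        if i >= len(t_levels):
            return False
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


# Illustrative event topic names; exact names depend on the EMQX version.
print(topic_matches("$events/#", "$events/client_connected"))  # True
print(topic_matches("$events/#", "factory/device/status"))     # False
print(topic_matches("#", "$events/client_connected"))          # False
```

Because `$events/#` starts with the literal level `$events`, the `$` restriction does not apply to it. It therefore collects every event topic and none of the ordinary business topics.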
Action configuration
```
emqx_full_link_events clientid=${clientid},event=${event},topic=${topic},username=${username},node=${node},peername=${peername},sockname=${sockname},qos=${qos}i,id=${id},payload=${payload},reason=${reason},reason_code=${reason_code},result=${result},from_clientid=${from_clientid},from_username=${from_username},action=${action},authz_source=${authz_source},keepalive=${keepalive}i,clean_start=${clean_start},expiry_interval=${expiry_interval}i,connected_at=${connected_at}i,disconnected_at=${disconnected_at}i,publish_received_at=${publish_received_at}i,client_attrs=${client_attrs} ${timestamp}
```
Index design
```sql
clientid STRING SKIPPING INDEX
username STRING SKIPPING INDEX
peername STRING SKIPPING INDEX
```
| Field | Purpose |
|---|---|
| clientid | Query the complete flow of a device |
| username | Query user behavior |
| peername | Query the source IP address |
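A skipping index does not point to individual rows. Instead, it keeps a small summary for each block of rows so that a query can skip blocks that cannot contain the requested value. The sketch below is a conceptual model of that idea, not the EMQX Tables implementation. The class name and block size are hypothetical, and it uses exact sets where real engines typically use compact probabilistic summaries such as Bloom filters.

```python
from typing import Dict, List, Set, Tuple


class BlockSkippingIndex:
    """Conceptual skipping index: one value summary per block of rows.

    Exact sets are used for clarity; real engines typically store compact,
    probabilistic summaries (for example Bloom filters) instead.
    """

    def __init__(self, rows: List[Dict[str, str]], column: str, block_size: int):
        self.column = column
        self.blocks = [rows[i:i + block_size] for i in range(0, len(rows), block_size)]
        self.summaries: List[Set[str]] = [
            {row[column] for row in block} for block in self.blocks
        ]

    def lookup(self, value: str) -> Tuple[List[Dict[str, str]], int]:
        """Return (matching rows, number of blocks actually scanned)."""
        hits: List[Dict[str, str]] = []
        scanned = 0
        for block, summary in zip(self.blocks, self.summaries):
            if value not in summary:
                continue  # skip the whole block without reading its rows
            scanned += 1
            hits.extend(row for row in block if row[self.column] == value)
        return hits, scanned


rows = [{"clientid": c} for c in
        ["device001", "device001", "device002", "device003", "device009", "device009"]]
index = BlockSkippingIndex(rows, column="clientid", block_size=2)
hits, scanned = index.lookup("device009")
print(len(hits), scanned)  # 2 1 -> only one of the three blocks was read
```

This kind of index answers the question "could this block contain value X?", so it helps equality filters such as `WHERE clientid='device001'`. Whether a prefix pattern such as `peername LIKE '78.234.%'` can use the index depends on the engine. With Bloom-filter summaries, it generally cannot.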
Typical troubleshooting scenarios
Different Event Topics expose different fields. Therefore, it is normal for some fields to be empty. This does not affect issue traceability and troubleshooting.
This solution uses a single wide table to store client connection, authentication, authorization, subscription, and message flow events in a unified way. This makes it easy to complete full-link troubleshooting analysis with a single SQL query.
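For example, a client.connected event carries keepalive and connected_at but no payload or qos, while a message.dropped event carries payload and qos but no keepalive. The sketch below shows how one event maps onto a line-protocol record like the one in the Action configuration when only some fields are present. It follows standard InfluxDB line-protocol conventions: string field values are double-quoted with `"` and `\` escaped, and integer values carry an `i` suffix. It also assumes that absent fields are simply omitted. How EMQX itself renders an undefined `${...}` placeholder is not modeled here, and `to_line_protocol` is a hypothetical helper.

```python
# Columns written with an "i" (integer) suffix in the Action template.
INT_FIELDS = {"qos", "keepalive", "expiry_interval", "connected_at",
              "disconnected_at", "publish_received_at"}
BOOL_FIELDS = {"clean_start"}


def _escape_str(value: str) -> str:
    # Inside a double-quoted string field, escape backslashes and double quotes.
    return value.replace("\\", "\\\\").replace('"', '\\"')


def to_line_protocol(measurement: str, event: dict) -> str:
    """Render one event as a line-protocol record, skipping absent fields."""
    fields = []
    for key in sorted(event):
        if key == "timestamp" or event[key] is None:
            continue  # timestamp goes last; absent fields are left out
        value = event[key]
        if key in INT_FIELDS:
            fields.append(f"{key}={int(value)}i")
        elif key in BOOL_FIELDS:
            fields.append(f"{key}={'true' if value else 'false'}")
        else:
            fields.append(f'{key}="{_escape_str(str(value))}"')
    return f"{measurement} {','.join(fields)} {event['timestamp']}"


event = {  # sample client.connected event
    "timestamp": 1700000000000,
    "event": "client.connected",
    "clientid": "device001",
    "keepalive": 60,
    "clean_start": True,
    "connected_at": 1700000000000,
    "payload": None,  # not present for connection events
    "qos": None,      # not present for connection events
}
print(to_line_protocol("emqx_full_link_events", event))
```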
Scenario 1: Troubleshoot the client lifecycle
Applicable issues:
- Did the device connect to the broker successfully?
- Why did the device go offline?
- Did the device complete authentication?
- Did the device subscribe to the topic successfully?
Related events:
| Event name | Event | Description |
|---|---|---|
| Authentication event | client.check_authn_complete | Client authentication completed |
| Connection event | client.connected | Client connected successfully |
| Connection acknowledgment event | client.connack | Broker returned the connection acknowledgment |
| Subscription event | session.subscribed | Client subscribed successfully |
| Offline event | client.disconnected | Client disconnected |
Key fields:
| Field | Description |
|---|---|
| clientid | Client ID |
| username | Username |
| peername | Client source address |
| connected_at | Time the client connected |
| disconnected_at | Time the client disconnected |
| reason | Disconnect reason |
| reason_code | Authentication result |
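The connected_at and disconnected_at fields make it possible to reconstruct a device's sessions from the query results. The sketch below pairs each client.connected row with the next client.disconnected row of the same client. It assumes the rows for one client have already been fetched (for example, with the query in this scenario) as Python dicts keyed by column name. It also assumes both fields are Unix epoch milliseconds. `build_sessions` is a hypothetical helper.

```python
from typing import Dict, List, Optional


def build_sessions(rows: List[Dict]) -> List[Dict]:
    """Pair each client.connected row with the next client.disconnected row.

    rows: lifecycle rows of a single client. connected_at and disconnected_at
    are assumed to be epoch milliseconds. A session with no recorded
    disconnect is returned without disconnected_at.
    """
    sessions: List[Dict] = []
    open_session: Optional[Dict] = None
    for row in sorted(rows, key=lambda r: r["timestamp"]):
        if row["event"] == "client.connected":
            if open_session is not None:
                sessions.append(open_session)  # no disconnect was recorded for it
            open_session = {"connected_at": row["connected_at"]}
        elif row["event"] == "client.disconnected" and open_session is not None:
            open_session["disconnected_at"] = row["disconnected_at"]
            open_session["duration_s"] = (
                row["disconnected_at"] - open_session["connected_at"]
            ) / 1000
            open_session["reason"] = row["reason"]
            sessions.append(open_session)
            open_session = None
        # Other lifecycle events (authn, connack, subscribe) are ignored here.
    if open_session is not None:
        sessions.append(open_session)  # still online at the end of the result set
    return sessions


rows = [  # sample data
    {"timestamp": 1000, "event": "client.connected", "connected_at": 1000},
    {"timestamp": 61000, "event": "client.disconnected",
     "disconnected_at": 61000, "reason": "keepalive_timeout"},
    {"timestamp": 70000, "event": "client.connected", "connected_at": 70000},
]
for session in build_sessions(rows):
    print(session)
```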
Query example:
```sql
SELECT *
FROM emqx_full_link_events
WHERE clientid='device001'
  AND event IN (
    'client.check_authn_complete',
    'client.connected',
    'client.connack',
    'session.subscribed',
    'client.disconnected'
  )
ORDER BY timestamp ASC;
```
Scenario 2: Troubleshoot authentication and authorization
Applicable issues:
- Why did the device fail to connect?
- Are the username and password correct?
- Why did the topic subscription fail?
- Why did ACL reject the request?
Related events:
| Event name | Event | Description |
|---|---|---|
| Authentication event | client.check_authn_complete | User authentication result |
| Authorization event | client.check_authz_complete | ACL check result |
Key fields:
| Field | Description |
|---|---|
| reason_code | Authentication result |
| action | publish / subscribe |
| result | allow / deny |
| authz_source | ACL source |
Common result description:
| Field | Example value | Description |
|---|---|---|
| reason_code | success | Authentication succeeded |
| reason_code | bad_username_or_password | Username or password is incorrect |
| result | allow | Access is allowed |
| result | deny | Access is denied |
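When the failure queries return many rows, a quick aggregation helps separate a single misconfigured device from a systemic problem. The sketch below counts authentication failures by reason_code and ACL denials by (clientid, action, topic). It assumes the rows have been fetched from the table as dicts keyed by column name. `summarize_access_failures` is a hypothetical helper.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple


def summarize_access_failures(rows: Iterable[Dict]) -> Tuple[Counter, Counter]:
    """Count authentication failures and ACL denials in fetched rows."""
    authn_failures: Counter = Counter()
    authz_denials: Counter = Counter()
    for row in rows:
        if row["event"] == "client.check_authn_complete":
            if row.get("reason_code") != "success":
                authn_failures[row.get("reason_code")] += 1
        elif row["event"] == "client.check_authz_complete":
            if row.get("result") == "deny":
                key = (row["clientid"], row.get("action"), row.get("topic"))
                authz_denials[key] += 1
    return authn_failures, authz_denials


rows = [  # sample data
    {"event": "client.check_authn_complete", "clientid": "device001",
     "reason_code": "bad_username_or_password"},
    {"event": "client.check_authn_complete", "clientid": "device002",
     "reason_code": "success"},
    {"event": "client.check_authz_complete", "clientid": "device003",
     "action": "subscribe", "topic": "factory/device/status", "result": "deny"},
    {"event": "client.check_authz_complete", "clientid": "device003",
     "action": "subscribe", "topic": "factory/device/status", "result": "deny"},
    {"event": "client.check_authz_complete", "clientid": "device004",
     "action": "publish", "topic": "factory/device/status", "result": "allow"},
]
authn, authz = summarize_access_failures(rows)
print(authn.most_common())
print(authz.most_common())
```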
Query authentication failures:
```sql
SELECT *
FROM emqx_full_link_events
WHERE event='client.check_authn_complete'
  AND reason_code<>'success'
ORDER BY timestamp DESC;
```
Query ACL denials:
```sql
SELECT *
FROM emqx_full_link_events
WHERE event='client.check_authz_complete'
  AND result='deny'
ORDER BY timestamp DESC;
```
Scenario 3: Analyze message delivery
Applicable issues:
- Was the message delivered successfully?
- Which client received the message?
- Was the ACK received?
Related events:
| Event name | Event | Description |
|---|---|---|
| Message delivery event | message.delivered | Message delivered successfully |
| Message acknowledgment event | message.acked | ACK received |
Key fields:
| Field | Description |
|---|---|
| topic | MQTT topic |
| payload | Message content |
| qos | QoS level |
| from_clientid | Publishing client |
| clientid | Receiving client |
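QoS 0 deliveries are never acknowledged, so message.acked appears only for QoS 1 and QoS 2. A QoS 1/2 message with a message.delivered row but no matching message.acked row for the same receiving client is therefore a candidate for a lost or pending ACK. The sketch below performs that correlation on fetched rows, matching on the message `id` and `clientid`. It assumes rows are dicts keyed by column name and that `qos` is stored as an integer. `find_unacked` is a hypothetical helper.

```python
from typing import Dict, Iterable, List, Set, Tuple


def find_unacked(rows: Iterable[Dict]) -> List[Dict]:
    """Return QoS 1/2 message.delivered rows without a matching message.acked row."""
    rows = list(rows)
    acked: Set[Tuple[str, str]] = {
        (r["id"], r["clientid"]) for r in rows if r["event"] == "message.acked"
    }
    return [
        r for r in rows
        if r["event"] == "message.delivered"
        and r["qos"] > 0  # QoS 0 is never acknowledged
        and (r["id"], r["clientid"]) not in acked
    ]


rows = [  # sample data; ids are illustrative
    {"event": "message.delivered", "id": "m1", "clientid": "sub1", "qos": 1},
    {"event": "message.acked", "id": "m1", "clientid": "sub1", "qos": 1},
    {"event": "message.delivered", "id": "m2", "clientid": "sub1", "qos": 1},
    {"event": "message.delivered", "id": "m3", "clientid": "sub1", "qos": 0},
]
print([r["id"] for r in find_unacked(rows)])  # ['m2']
```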
Query example:
```sql
SELECT *
FROM emqx_full_link_events
WHERE event IN (
  'message.delivered',
  'message.acked'
)
ORDER BY timestamp DESC;
```
Scenario 4: Analyze message drops
Applicable issues:
- Why did the message not arrive?
- Why was the message dropped by the broker?
Related events:
| Event name | Event | Description |
|---|---|---|
| Message dropped event | message.dropped | Message dropped during forwarding |
| Message delivery dropped event | message.delivery_dropped | Message dropped during delivery |
Key fields:
| Field | Description |
|---|---|
| topic | Topic |
| payload | Message content |
| clientid | Client |
| reason | Drop reason |
Common reason description:
| Reason | Description |
|---|---|
| no_subscribers | No subscribers |
| queue_full | Queue is full |
| expired | Message expired |
| acl_denied | ACL denied |
| disconnected | Client is offline |
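To find which topics lose the most messages and why, group the drop events by event type, topic, and reason. The sketch below does this over fetched rows, assuming they are dicts keyed by column name. It counts both message.dropped and message.delivery_dropped. `drops_by_topic_and_reason` is a hypothetical helper.

```python
from collections import Counter
from typing import Dict, Iterable

DROP_EVENTS = {"message.dropped", "message.delivery_dropped"}


def drops_by_topic_and_reason(rows: Iterable[Dict]) -> Counter:
    """Count drop events per (event, topic, reason); use most_common() to rank them."""
    counts: Counter = Counter()
    for row in rows:
        if row["event"] in DROP_EVENTS:
            counts[(row["event"], row["topic"], row["reason"])] += 1
    return counts


rows = (  # sample data
    [{"event": "message.dropped", "topic": "factory/device/status",
      "reason": "no_subscribers"}] * 3
    + [{"event": "message.delivery_dropped", "topic": "factory/device/status",
        "reason": "queue_full"}]
    + [{"event": "client.connected", "topic": None, "reason": None}]
)
for key, count in drops_by_topic_and_reason(rows).most_common():
    print(count, key)
```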
Query publishing drops:
```sql
SELECT *
FROM emqx_full_link_events
WHERE event='message.dropped'
ORDER BY timestamp DESC;
```
Query delivery drops:
SELECT *
FROM emqx_full_link_events
WHERE event='message.delivery_dropped'
ORDER BY timestamp DESC;Common query templates
Query by client ID
Query all events of a specified client:
```sql
SELECT *
FROM emqx_full_link_events
WHERE clientid='device009'
ORDER BY timestamp DESC;
```
Query by username
Query all events of a specified user:
```sql
SELECT *
FROM emqx_full_link_events
WHERE username='test'
ORDER BY timestamp DESC;
```
Query by source IP address
Analyze client behavior from a specified source address:
```sql
SELECT *
FROM emqx_full_link_events
WHERE peername LIKE '78.234.%'
ORDER BY timestamp DESC;
```
Query by topic
Analyze message and permission behavior for a specified topic:
```sql
SELECT *
FROM emqx_full_link_events
WHERE topic='factory/device/status'
ORDER BY timestamp DESC;
```
Query by event type
Analyze a specified event:
```sql
SELECT *
FROM emqx_full_link_events
WHERE event='message.dropped'
ORDER BY timestamp DESC;
```
Summary
By combining Event Topics, Rule Engine, and Tables, you can build a lightweight MQTT full-link event observability platform.
This solution uses a single Rule, a single Action, and a single wide table to store client connection, authentication, authorization, subscription, and message flow events in a unified way. It implements:
- Client lifecycle tracking
- MQTT authentication auditing
- MQTT authorization auditing
- MQTT message flow analysis
- Message drop root cause analysis
- Source IP behavior analysis
- Long-term event retention
- SQL-based query and analysis
- Grafana visualization
With the single wide table design, client connection, authentication, authorization, subscription, and message events can be stored and queried through a unified table structure. This makes it easy to complete full-link troubleshooting analysis with a single SQL query.
This solution is suitable for:
- IoT device operations and maintenance
- MQTT platform operations
- Client behavior analysis
- Topic permission auditing
- Message delivery troubleshooting
- Long-term event retention and traceability
It helps operations, development, and business teams quickly locate issues, trace historical events, and analyze behavior, reducing troubleshooting costs and improving MQTT platform operations efficiency and observability.