Enhance Message Traceability with Event Topics and TimescaleDB Integration
EMQX Cloud can leverage event topics and rule-based data integration to build end-to-end message traceability across the full MQTT transmission path. This approach captures latency data from the moment a publisher sends a message, through broker-side processing, to the point where a subscriber acknowledges receipt.
This guide explains how to use EMQX Cloud with TimescaleDB to build this traceability workflow. The end result is a practical latency observation system that measures delays at the following stages:
- Publisher to broker
- Broker-side processing
- Broker to subscriber
- End-to-end total transmission
The implementation stores trace events in TimescaleDB and uses SQL queries to inspect raw records, validate data integrity, and calculate per-stage average latency.
Prerequisites
Before starting, ensure the following conditions are met:
- An EMQX v5 deployment is created and accessible.
- TimescaleDB is deployed and accessible.
- System clocks on the publisher and subscriber are synchronized.
- The publisher includes a publish_at timestamp in the payload of each MQTT message.
- QoS 1 or QoS 2 is used to enable capture of acknowledgment events.
- Timestamps are handled in a consistent timezone across clients and the database. UTC is recommended.
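One way to satisfy the UTC recommendation above is to derive publish_at directly from the Unix epoch, which is timezone-independent by definition. A minimal Python sketch (the helper name utc_epoch_ms is illustrative, not part of any SDK):

```python
import time

def utc_epoch_ms() -> int:
    # time.time() is measured against the UTC epoch, so the value is
    # independent of the host's local timezone setting.
    return int(time.time() * 1000)

# Example: the value written into the publish_at payload field
payload_ts = utc_epoch_ms()
```

Because every client and the database compare epoch values rather than wall-clock strings, no timezone conversion is needed anywhere in the pipeline.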
Timestamp Definitions
The tracing model uses the following timestamps:
- publish_at: The client-side timestamp written into the MQTT payload by the publisher.
- publish_received_at: The time when EMQX receives the PUBLISH packet.
- message_delivered: The time when EMQX delivers the message to the subscriber side.
- message_acked: The time when EMQX receives the acknowledgment (ACK) from the subscriber.
These timestamps are used to calculate the latency at each stage of the transmission path.
Set Up the TimescaleDB Table
Create a table to store message trace records, then convert it into a TimescaleDB hypertable. Each message stage is stored as a separate row, enabling later correlation queries using msg_id.
CREATE EXTENSION IF NOT EXISTS timescaledb;
DROP TABLE IF EXISTS mqtt_message_traces;
CREATE TABLE mqtt_message_traces
(
event_at TIMESTAMPTZ NOT NULL,
event character varying NOT NULL,
msg_id character varying NOT NULL,
publish_at bigint,
emqx_received_at bigint,
emqx_delivered_at bigint,
sub_ack_at bigint,
pub_clientid character varying,
sub_clientid character varying,
topic character varying NOT NULL,
qos integer NOT NULL,
payload text
);
Convert the table into a hypertable using event_at as the time column:
SELECT create_hypertable(
'mqtt_message_traces',
'event_at',
chunk_time_interval => interval '12 hour',
if_not_exists => TRUE
);
Create indexes to improve query performance:
- An index on the event column for fast event type lookups.
- A composite index on msg_id and descending event_at for tracking specific messages over time.
- A composite index on pub_clientid and descending event_at for analyzing specific publisher behavior.
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_event_idx
ON mqtt_message_traces (event);
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_msg_id_event_at_idx
ON mqtt_message_traces (msg_id, event_at DESC);
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_pub_clientid_event_at_idx
ON mqtt_message_traces (pub_clientid, event_at DESC);
Configure TimescaleDB Data Integration in EMQX Cloud
The traceability workflow uses three rules and three write actions. All three actions write to the same table, but each captures a different stage of the message lifecycle.
All three rules use the same correlation key:
id as msg_id
The same msg_id is used to correlate the following three events:
- message.publish
- message.delivered
- message.acked
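Conceptually, the correlation is a merge of three per-stage rows into one logical record keyed by msg_id. The following Python sketch illustrates the idea, using hypothetical in-memory rows standing in for database results:

```python
from collections import defaultdict

# Hypothetical trace rows, one per lifecycle event, as they would be
# stored in mqtt_message_traces and correlated by msg_id.
rows = [
    {"msg_id": "m1", "event": "message.publish",   "publish_at": 100, "emqx_received_at": 105},
    {"msg_id": "m1", "event": "message.delivered", "emqx_delivered_at": 106},
    {"msg_id": "m1", "event": "message.acked",     "sub_ack_at": 108},
]

def correlate(rows):
    """Merge the three per-stage rows of each message into one record."""
    merged = defaultdict(dict)
    for row in rows:
        # Keep only the timestamp fields; msg_id becomes the key.
        merged[row["msg_id"]].update(
            {k: v for k, v in row.items() if k not in ("msg_id", "event")}
        )
    return dict(merged)

traces = correlate(rows)
# traces["m1"] now holds all four timestamps for message m1
```

The SQL latency queries later in this guide perform the same merge server-side via JOINs on msg_id.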
Configure the Publish Trace Rule
The first rule records publish-stage data. It listens on a wildcard topic and captures regular messages on all business topics.
Rule SQL:
SELECT
timestamp div 1000 as event_at,
event,
id as msg_id,
int(payload.publish_at) as publish_at,
int(publish_received_at) as emqx_received_at,
clientid as pub_clientid,
topic,
qos,
payload
FROM "#"
Action SQL:
INSERT INTO mqtt_message_traces(
event_at,
event,
msg_id,
publish_at,
emqx_received_at,
emqx_delivered_at,
sub_ack_at,
pub_clientid,
sub_clientid,
topic,
qos,
payload
) VALUES (
to_timestamp(${event_at}),
${event},
${msg_id},
${publish_at}::bigint,
${emqx_received_at}::bigint,
NULL,
NULL,
${pub_clientid},
NULL,
${topic},
${qos},
${payload}
)
When the publisher sends a message to a business topic (for example, emqx/test), this rule writes a publish trace record to TimescaleDB.

Configure the Delivered Trace Rule
The second rule records the timestamp when EMQX delivers the message to the subscriber path.
Rule SQL:
SELECT
timestamp div 1000 as event_at,
event,
id as msg_id,
int(payload.publish_at) as publish_at,
int(publish_received_at) as emqx_received_at,
int(timestamp) as emqx_delivered_at,
from_clientid as pub_clientid,
clientid as sub_clientid,
topic,
qos,
payload
FROM "$events/message_delivered"
Action SQL:
INSERT INTO mqtt_message_traces(
event_at,
event,
msg_id,
publish_at,
emqx_received_at,
emqx_delivered_at,
sub_ack_at,
pub_clientid,
sub_clientid,
topic,
qos,
payload
) VALUES (
to_timestamp(${event_at}),
${event},
${msg_id},
${publish_at}::bigint,
${emqx_received_at}::bigint,
${emqx_delivered_at}::bigint,
NULL,
${pub_clientid},
${sub_clientid},
${topic},
${qos},
${payload}
)
This record is used to calculate broker-side processing latency and broker-to-subscriber latency.

Configure the Acked Trace Rule
The third rule records acknowledgment-stage data. This rule only fires when an acknowledgment event exists, so QoS 1 or QoS 2 is required.
Rule SQL:
SELECT
timestamp div 1000 as event_at,
event,
id as msg_id,
int(payload.publish_at) as publish_at,
int(publish_received_at) as emqx_received_at,
int(timestamp) as sub_ack_at,
from_clientid as pub_clientid,
clientid as sub_clientid,
topic,
qos,
payload
FROM "$events/message_acked"
Action SQL:
INSERT INTO mqtt_message_traces(
event_at,
event,
msg_id,
publish_at,
emqx_received_at,
emqx_delivered_at,
sub_ack_at,
pub_clientid,
sub_clientid,
topic,
qos,
payload
) VALUES (
to_timestamp(${event_at}),
${event},
${msg_id},
${publish_at}::bigint,
${emqx_received_at}::bigint,
NULL,
${sub_ack_at}::bigint,
${pub_clientid},
${sub_clientid},
${topic},
${qos},
${payload}
)
This record is used to calculate subscriber-side acknowledgment latency and end-to-end total latency.

Connector Configuration Notes
From a logical design perspective, all three actions can share a single TimescaleDB connector.
However, during sustained load testing on EMQX 5, sharing a single PostgreSQL/TimescaleDB connector was found to be unstable. The observable symptom is that one action accumulates write failures, reducing the number of complete samples available for correlation queries.
For stable operation, the recommended configuration is:
- A dedicated connector for the publish trace action
- A dedicated connector for the delivered trace action
- A dedicated connector for the acked trace action
This is an implementation-level stability choice and does not affect the table schema, rule SQL, action SQL, correlation logic, or latency calculation formulas.
Use Python SDK to Simulate Message Delivery
This section describes the publisher and subscriber behavior used for validation.
The publisher sends MQTT messages to a business topic (for example, emqx/test), with each payload containing a publish_at timestamp. The subscriber listens on the same topic and remains online throughout the message flow.
Install the Python dependency:
python3 -m pip install paho-mqtt
Publisher
The publisher needs to:
- Connect to EMQX
- Publish messages to the target topic
- Use QoS 1
- Include a publish_at field in each message payload
Example payload:
{"publish_at":1773579492999,"msg":10000}
The following publisher script supports both fixed-interval sending and duration-based load testing at a specified TPS, suitable for both functional verification and sustained load testing.
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--count", type=int, default=20)
parser.add_argument("--interval-sec", type=float, default=1.0)
parser.add_argument("--tps", type=float)
parser.add_argument("--duration-sec", type=float)
parser.add_argument("--hold-sec", type=float, default=0.0)
args = parser.parse_args()
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code != 0:
raise RuntimeError(f"connect failed: {reason_code}")
client = mqtt_client.Client(
mqtt_client.CallbackAPIVersion.VERSION2,
client_id=f"python-mqtt-pub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
client.connect(args.host, args.port)
client.loop_start()
count = 0
start = time.perf_counter()
if args.tps and args.duration_sec:
interval = 1.0 / args.tps
deadline = start + args.duration_sec
next_send = start
keep_sending = lambda now, sent: now < deadline
else:
interval = args.interval_sec
next_send = start + interval
keep_sending = lambda now, sent: sent < args.count
while keep_sending(time.perf_counter(), count):
now = time.perf_counter()
if now < next_send:
time.sleep(next_send - now)
payload = json.dumps({"publish_at": int(time.time() * 1000), "msg": count}, separators=(",", ":"))
result = client.publish(args.topic, payload, qos=args.qos)
if result.rc != 0:
raise RuntimeError(f"publish failed: {result.rc}")
count += 1
next_send += interval
publish_elapsed = time.perf_counter() - start
if args.hold_sec > 0:
time.sleep(args.hold_sec)
total_elapsed = time.perf_counter() - start
print(json.dumps({
"topic": args.topic,
"sent": count,
"publish_duration_sec": round(publish_elapsed, 3),
"total_duration_sec": round(total_elapsed, 3),
"hold_sec": args.hold_sec,
"target_tps": args.tps,
"actual_tps": round(count / publish_elapsed, 3) if publish_elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()
Example run:
python3 publisher.py \
--host 127.0.0.1 \
--port 1883 \
--topic emqx/test \
--qos 1 \
--tps 500 \
--duration-sec 20 \
--hold-sec 25
Example output:
{"topic": "emqx/test", "sent": 10001, "publish_duration_sec": 20.0, "total_duration_sec": 45.004, "hold_sec": 25.0, "target_tps": 500.0, "actual_tps": 500.038}
Subscriber
The subscriber needs to:
- Connect to EMQX
- Subscribe to the same business topic
- Use QoS 1
- Remain connected longer than the publish phase
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--duration-sec", type=float)
args = parser.parse_args()
counter = {"received": 0}
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code != 0:
raise RuntimeError(f"connect failed: {reason_code}")
client.subscribe(args.topic, qos=args.qos)
def on_message(client, userdata, msg):
counter["received"] += 1
client = mqtt_client.Client(
mqtt_client.CallbackAPIVersion.VERSION2,
client_id=f"python-mqtt-sub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
client.on_message = on_message
client.connect(args.host, args.port)
client.loop_start()
start = time.perf_counter()
while args.duration_sec is None or time.perf_counter() - start < args.duration_sec:
time.sleep(1)
elapsed = time.perf_counter() - start
print(json.dumps({
"topic": args.topic,
"received": counter["received"],
"duration_sec": round(elapsed, 3),
"recv_tps": round(counter["received"] / elapsed, 3) if elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()
Start the subscriber first:
python3 subscriber.py \
--host 127.0.0.1 \
--port 1883 \
--topic emqx/test \
--qos 1 \
--duration-sec 80
Example output:
{"topic": "emqx/test", "received": 10001, "duration_sec": 80.268, "recv_tps": 124.596}
Recommended Run Order
Use two terminals:
- Start the subscriber first.
- Then start the publisher.
- Wait for both commands to complete.
- Query TimescaleDB after message delivery is complete.
View Message Latency
Once the publisher, subscriber, rules, and TimescaleDB actions are all running, use SQL to validate the trace records and calculate latency.
How to Calculate Delay
Latency formulas for each stage:
- Publisher to EMQX: publish_received_at - publish_at
- EMQX processing: message_delivered - publish_received_at
- EMQX to subscriber: message_acked - message_delivered
- End-to-end total: message_acked - publish_at
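As a sanity check on the formulas, note that the three stage latencies always sum to the end-to-end total. A minimal Python sketch (the function name stage_latencies is illustrative):

```python
def stage_latencies(publish_at, publish_received_at, message_delivered, message_acked):
    """Compute per-stage latencies in milliseconds from the four trace timestamps."""
    return {
        "publisher_to_emqx": publish_received_at - publish_at,
        "emqx_processing": message_delivered - publish_received_at,
        "emqx_to_subscriber": message_acked - message_delivered,
        "end_to_end": message_acked - publish_at,
    }

# Example with epoch-millisecond timestamps
lat = stage_latencies(1000, 1006, 1006, 1007)
# The intermediate terms telescope, so the three stages sum to end_to_end.
```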
TIP
- If a message lacks a publish_at timestamp, publisher-to-broker latency cannot be calculated.
- With QoS 0, message.acked events are not generated, so subscriber-side latency cannot be calculated.
Query Examples
1. Count All Trace Records
SELECT COUNT(*) AS total_rows
FROM mqtt_message_traces;
Example output:
total_rows
------------
30002
2. View Recent Publish Records
SELECT event_at, event, msg_id, publish_at, emqx_received_at, pub_clientid, topic, qos, payload
FROM mqtt_message_traces
WHERE event = 'message.publish'
ORDER BY event_at DESC
LIMIT 20;
Use this query to confirm that publisher-side timestamps and broker receive timestamps are stored correctly.
3. View Recent Delivered Records
SELECT event_at, event, msg_id, publish_at, emqx_received_at, emqx_delivered_at, pub_clientid, sub_clientid, topic, qos
FROM mqtt_message_traces
WHERE event = 'message.delivered'
ORDER BY event_at DESC
LIMIT 20;
Use this query to confirm that delivery-stage timestamps are captured and that both publisher and subscriber client IDs are visible.
4. View Recent Acked Records
SELECT event_at, event, msg_id, publish_at, emqx_received_at, sub_ack_at, pub_clientid, sub_clientid, topic, qos
FROM mqtt_message_traces
WHERE event = 'message.acked'
ORDER BY event_at DESC
LIMIT 20;
Use this query to confirm that acknowledgment-stage data is present and can be used to calculate subscriber-side latency.
5. Count Records by Event Type
SELECT event, COUNT(*) AS row_count
FROM mqtt_message_traces
GROUP BY event
ORDER BY event;
Example output:
event | row_count
-------------------+-----------
message.acked | 10001
message.delivered | 10000
message.publish | 10001
Use this query for a quick integrity check on the trace dataset.
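The same integrity check can be automated on the application side. This Python sketch (function name and input shape are illustrative) compares per-event row counts against the expected sample size and reports any deficit:

```python
def integrity_check(counts, expected):
    """Report event types whose row count differs from the expected sample size.

    Returns a dict mapping event name to the signed difference (count - expected),
    containing only the event types that deviate.
    """
    return {
        event: counts.get(event, 0) - expected
        for event in ("message.publish", "message.delivered", "message.acked")
        if counts.get(event, 0) != expected
    }

# Example mirroring the counts above: one delivered row is missing,
# so the check reports a deficit of 1 for message.delivered.
gaps = integrity_check(
    {"message.publish": 10001, "message.delivered": 10000, "message.acked": 10001},
    expected=10001,
)
```

An empty result means every message has a complete publish/delivered/acked chain.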
6. Calculate Average Latency per Stage for the Past Hour
WITH relevant_messages AS (
SELECT *
FROM mqtt_message_traces
WHERE event_at >= NOW() - INTERVAL '1 hour'
),
message_latency AS (
SELECT
t1.msg_id,
COALESCE(t2.sub_clientid, t3.sub_clientid) AS sub_clientid,
MAX(t1.publish_at) AS publish_at,
MAX(t1.emqx_received_at) AS publish_received_at,
MAX(t2.emqx_delivered_at) AS message_delivered,
MAX(t3.sub_ack_at) AS message_acked
FROM relevant_messages t1
JOIN relevant_messages t2
ON t1.msg_id = t2.msg_id
AND t2.event = 'message.delivered'
JOIN relevant_messages t3
ON t1.msg_id = t3.msg_id
AND t3.event = 'message.acked'
WHERE t1.event = 'message.publish'
GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid)
)
SELECT
COUNT(*) AS sample_count,
ROUND(AVG(publish_received_at - publish_at), 3) AS publisher_to_emqx_ms,
ROUND(AVG(message_delivered - publish_received_at), 3) AS emqx_processing_ms,
ROUND(AVG(message_acked - message_delivered), 3) AS emqx_to_subscriber_ms,
ROUND(AVG(message_acked - publish_at), 3) AS end_to_end_ms
FROM message_latency
WHERE publish_at IS NOT NULL
AND publish_received_at IS NOT NULL
AND message_delivered IS NOT NULL
AND message_acked IS NOT NULL;
Example output:
sample_count | publisher_to_emqx_ms | emqx_processing_ms | emqx_to_subscriber_ms | end_to_end_ms
--------------+----------------------+--------------------+-----------------------+---------------
10000 | 5.891 | 0.073 | 0.273 | 6.236
In a healthy EMQX system, emqx_processing_ms (broker processing latency) is typically very low. The more significant contributors are the publish and consume phases — publisher_to_emqx_ms and emqx_to_subscriber_ms.
Query notes:
- WHERE event_at >= NOW() - INTERVAL '1 hour' limits the query to a one-hour observation window, avoiding interference from historical data.
- relevant_messages narrows the dataset before the JOIN.
- COALESCE(t2.sub_clientid, t3.sub_clientid) ensures that subscriber identity is preserved in the grouped results.
- GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid) prevents records from different subscribers from being merged into a single aggregate row.
- MAX(...) collapses the timestamps from three event rows into a single aggregate row.
- The final WHERE ... IS NOT NULL ensures that only records with a complete publish/delivered/acked chain are used to calculate latency.
7. Validate for Negative Latency Values
Run the following query before accepting latency results. Any non-zero result indicates a timestamp issue, clock synchronization problem, or field mapping error.
WITH relevant_messages AS (
SELECT *
FROM mqtt_message_traces
WHERE event_at >= NOW() - INTERVAL '1 hour'
),
message_latency AS (
SELECT
t1.msg_id,
COALESCE(t2.sub_clientid, t3.sub_clientid) AS sub_clientid,
MAX(t1.publish_at) AS publish_at,
MAX(t1.emqx_received_at) AS publish_received_at,
MAX(t2.emqx_delivered_at) AS message_delivered,
MAX(t3.sub_ack_at) AS message_acked
FROM relevant_messages t1
JOIN relevant_messages t2
ON t1.msg_id = t2.msg_id
AND t2.event = 'message.delivered'
JOIN relevant_messages t3
ON t1.msg_id = t3.msg_id
AND t3.event = 'message.acked'
WHERE t1.event = 'message.publish'
GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid)
)
SELECT
COUNT(*) FILTER (WHERE publish_received_at - publish_at < 0) AS negative_client_to_emqx,
COUNT(*) FILTER (WHERE message_delivered - publish_received_at < 0) AS negative_emqx_processing,
COUNT(*) FILTER (WHERE message_acked - message_delivered < 0) AS negative_emqx_to_subscriber,
COUNT(*) FILTER (WHERE message_acked - publish_at < 0) AS negative_end_to_end
FROM message_latency
WHERE publish_at IS NOT NULL
AND publish_received_at IS NOT NULL
AND message_delivered IS NOT NULL
AND message_acked IS NOT NULL;
Example output:
negative_client_to_emqx | negative_emqx_processing | negative_emqx_to_subscriber | negative_end_to_end
-------------------------+--------------------------+-----------------------------+---------------------
0 | 0 | 0 | 0
All four values must be 0 before the results can be considered valid. If any count is non-zero, check the timestamp mappings and clock synchronization configuration before using the results as final conclusions.
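This acceptance gate can also be expressed in code. A small Python sketch (function name is illustrative) that accepts the four counters from the validation query:

```python
def latency_results_valid(negative_counts: dict) -> bool:
    """Latency results are trustworthy only if every negative-latency counter is zero."""
    return all(count == 0 for count in negative_counts.values())

# Example using the all-zero output shown above
ok = latency_results_valid({
    "negative_client_to_emqx": 0,
    "negative_emqx_processing": 0,
    "negative_emqx_to_subscriber": 0,
    "negative_end_to_end": 0,
})
```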
Result Validation
Before publishing final latency results, verify all of the following:
- Trace records exist for each event type.
- Key timestamp fields are populated.
- The average latency query returns a meaningful sample_count.
- The negative-value validation query returns 0 for all four checks.
- Action metrics show no abnormal failure counts.
Example action metrics from a successful validation run:
publish trace action:
matched = 10001
success = 10001
failed = 0
delivered trace action:
matched = 10001
success = 10000
failed = 1
acked trace action:
matched = 10001
success = 10001
failed = 0
If any action has a non-zero failure count, or if any negative latency count is non-zero, fix the data path before using the results as final conclusions.