Enhance Message Traceability with Event Topics and TimescaleDB Integration

EMQX Cloud can leverage event topics and rule-based data integration to build end-to-end message traceability across the full MQTT transmission path. This approach captures latency data from the moment a publisher sends a message, through broker-side processing, to the point where a subscriber acknowledges receipt.

This guide explains how to use EMQX Cloud with TimescaleDB to build this traceability workflow. The end result is a practical latency observation system that measures delays at the following stages:

  • Publisher to broker
  • Broker-side processing
  • Broker to subscriber
  • End-to-end total transmission

The implementation stores trace events in TimescaleDB and uses SQL queries to inspect raw records, validate data integrity, and calculate per-stage average latency.

Prerequisites

Before starting, ensure the following conditions are met:

  • An EMQX Cloud deployment (EMQX v5) is created and accessible.
  • TimescaleDB is deployed and accessible.
  • System clocks on the publisher, the subscriber, and the EMQX deployment are synchronized (for example, via NTP).
  • The publisher includes a publish_at timestamp in the payload of each MQTT message.
  • QoS 1 or QoS 2 is used to enable capture of acknowledgment events.
  • Timestamps are handled in a consistent timezone across clients and the database. UTC is recommended.
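
Recording publish_at as Unix epoch milliseconds is one way to satisfy the timezone requirement, since epoch time counts from 1970-01-01T00:00:00 UTC on every correctly synchronized host, regardless of its local timezone. A minimal sketch:

```python
import time
from datetime import datetime, timezone

# Epoch milliseconds are timezone-independent: time.time() measures
# seconds since 1970-01-01T00:00:00 UTC on any correctly synced host.
publish_at = int(time.time() * 1000)

# Converting back for display always yields the same UTC instant.
utc_view = datetime.fromtimestamp(publish_at / 1000, tz=timezone.utc)
print(publish_at, utc_view.isoformat())
```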

Timestamp Definitions

The tracing model uses the following timestamps:

  • publish_at: The client-side timestamp written into the MQTT payload by the publisher.
  • publish_received_at: The time when EMQX receives the PUBLISH packet.
  • message_delivered: The time when EMQX delivers the message to the subscriber side.
  • message_acked: The time when EMQX receives the acknowledgment (ACK) from the subscriber.

These timestamps are used to calculate the latency at each stage of the transmission path.

Set Up the TimescaleDB Table

Create a table to store message trace records, then convert it into a TimescaleDB hypertable. Each message stage is stored as a separate row, enabling later correlation queries using msg_id.

sql
CREATE EXTENSION IF NOT EXISTS timescaledb;
DROP TABLE IF EXISTS mqtt_message_traces;
CREATE TABLE mqtt_message_traces
(
    event_at          TIMESTAMPTZ       NOT NULL,
    event             character varying NOT NULL,
    msg_id            character varying NOT NULL,
    publish_at        bigint,
    emqx_received_at  bigint,
    emqx_delivered_at bigint,
    sub_ack_at        bigint,
    pub_clientid      character varying,
    sub_clientid      character varying,
    topic             character varying NOT NULL,
    qos               integer           NOT NULL,
    payload           text
);

Convert the table into a hypertable using event_at as the time column:

sql
SELECT create_hypertable(
           'mqtt_message_traces',
           'event_at',
           chunk_time_interval => interval '12 hour',
           if_not_exists => TRUE
       );

Create indexes to improve query performance:

  • An index on the event column for fast event type lookups.
  • A composite index on msg_id and descending event_at for tracking specific messages over time.
  • A composite index on pub_clientid and descending event_at for analyzing specific publisher behavior.

sql
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_event_idx
    ON mqtt_message_traces (event);
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_msg_id_event_at_idx
    ON mqtt_message_traces (msg_id, event_at DESC);
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_pub_clientid_event_at_idx
    ON mqtt_message_traces (pub_clientid, event_at DESC);

Configure TimescaleDB Data Integration in EMQX Cloud

The traceability workflow uses three rules and three write actions. All three actions write to the same table, but each captures a different stage of the message lifecycle.

All three rules use the same correlation key:

id as msg_id

The same msg_id is used to correlate the following three events:

  • message.publish
  • message.delivered
  • message.acked
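
Conceptually, the correlation is a merge keyed on msg_id: each of the three events contributes one row, and the rows are combined into a single trace record. A minimal Python sketch of this merge, using hypothetical event rows whose field names follow the table schema above:

```python
from collections import defaultdict

# Hypothetical trace rows, one per lifecycle event, sharing one msg_id.
rows = [
    {"msg_id": "A1", "event": "message.publish",   "publish_at": 1000, "emqx_received_at": 1005},
    {"msg_id": "A1", "event": "message.delivered", "emqx_delivered_at": 1006},
    {"msg_id": "A1", "event": "message.acked",     "sub_ack_at": 1009},
]

# Merge the three event rows for each msg_id into one trace record.
traces = defaultdict(dict)
for row in rows:
    traces[row["msg_id"]].update({k: v for k, v in row.items() if k != "event"})

print(traces["A1"])
```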

Configure the Publish Trace Rule

The first rule records publish-stage data. It listens on the wildcard topic # and captures messages published to any business topic.

Rule SQL:

sql
SELECT
  timestamp div 1000 as event_at,
  event,
  id as msg_id,
  int(payload.publish_at) as publish_at,
  int(publish_received_at) as emqx_received_at,
  clientid as pub_clientid,
  topic,
  qos,
  payload
FROM "#"

Action SQL:

sql
INSERT INTO mqtt_message_traces(
  event_at,
  event,
  msg_id,
  publish_at,
  emqx_received_at,
  emqx_delivered_at,
  sub_ack_at,
  pub_clientid,
  sub_clientid,
  topic,
  qos,
  payload
) VALUES (
  to_timestamp(${event_at}),
  ${event},
  ${msg_id},
  ${publish_at}::bigint,
  ${emqx_received_at}::bigint,
  NULL,
  NULL,
  ${pub_clientid},
  NULL,
  ${topic},
  ${qos},
  ${payload}
)

When the publisher sends a message to a business topic (for example, emqx/test), this rule writes a publish trace record to TimescaleDB.

Configure the Delivered Trace Rule

The second rule records the timestamp at which EMQX delivers the message to the subscriber.

Rule SQL:

sql
SELECT
  timestamp div 1000 as event_at,
  event,
  id as msg_id,
  int(payload.publish_at) as publish_at,
  int(publish_received_at) as emqx_received_at,
  int(timestamp) as emqx_delivered_at,
  from_clientid as pub_clientid,
  clientid as sub_clientid,
  topic,
  qos,
  payload
FROM "$events/message_delivered"

Action SQL:

sql
INSERT INTO mqtt_message_traces(
  event_at,
  event,
  msg_id,
  publish_at,
  emqx_received_at,
  emqx_delivered_at,
  sub_ack_at,
  pub_clientid,
  sub_clientid,
  topic,
  qos,
  payload
) VALUES (
  to_timestamp(${event_at}),
  ${event},
  ${msg_id},
  ${publish_at}::bigint,
  ${emqx_received_at}::bigint,
  ${emqx_delivered_at}::bigint,
  NULL,
  ${pub_clientid},
  ${sub_clientid},
  ${topic},
  ${qos},
  ${payload}
)

This record is used to calculate broker-side processing latency and broker-to-subscriber latency.

Configure the Acked Trace Rule

The third rule records acknowledgment-stage data. This rule only fires when an acknowledgment event exists, so QoS 1 or QoS 2 is required.

Rule SQL:

sql
SELECT
  timestamp div 1000 as event_at,
  event,
  id as msg_id,
  int(payload.publish_at) as publish_at,
  int(publish_received_at) as emqx_received_at,
  int(timestamp) as sub_ack_at,
  from_clientid as pub_clientid,
  clientid as sub_clientid,
  topic,
  qos,
  payload
FROM "$events/message_acked"

Action SQL:

sql
INSERT INTO mqtt_message_traces(
  event_at,
  event,
  msg_id,
  publish_at,
  emqx_received_at,
  emqx_delivered_at,
  sub_ack_at,
  pub_clientid,
  sub_clientid,
  topic,
  qos,
  payload
) VALUES (
  to_timestamp(${event_at}),
  ${event},
  ${msg_id},
  ${publish_at}::bigint,
  ${emqx_received_at}::bigint,
  NULL,
  ${sub_ack_at}::bigint,
  ${pub_clientid},
  ${sub_clientid},
  ${topic},
  ${qos},
  ${payload}
)

This record is used to calculate subscriber-side acknowledgment latency and end-to-end total latency.

Connector Configuration Notes

From a logical design perspective, all three actions can share a single TimescaleDB connector.

However, during sustained load testing on EMQX v5, sharing a single PostgreSQL/TimescaleDB connector proved unstable. The observable symptom is that one action accumulates write failures, reducing the number of complete samples available for correlation queries.

For stable operation, the recommended configuration is:

  • A dedicated connector for the publish trace action
  • A dedicated connector for the delivered trace action
  • A dedicated connector for the acked trace action

This is an implementation-level stability choice and does not affect the table schema, rule SQL, action SQL, correlation logic, or latency calculation formulas.

Use Python SDK to Simulate Message Delivery

This section describes the publisher and subscriber behavior used for validation.

The publisher sends MQTT messages to a business topic (for example, emqx/test), with each payload containing a publish_at timestamp. The subscriber listens on the same topic and remains online throughout the message flow.

Install the Python dependency:

bash
python3 -m pip install paho-mqtt

Publisher

The publisher needs to:

  • Connect to EMQX
  • Publish messages to the target topic
  • Use QoS 1
  • Include a publish_at field in each message payload

Example payload:

json
{"publish_at":1773579492999,"msg":10000}

The following publisher script supports both fixed-interval sending and duration-based load generation at a specified TPS, making it suitable for functional verification as well as sustained load testing.

python
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--count", type=int, default=20)
parser.add_argument("--interval-sec", type=float, default=1.0)
parser.add_argument("--tps", type=float)
parser.add_argument("--duration-sec", type=float)
parser.add_argument("--hold-sec", type=float, default=0.0)
args = parser.parse_args()
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code != 0:
        raise RuntimeError(f"connect failed: {reason_code}")
client = mqtt_client.Client(
    mqtt_client.CallbackAPIVersion.VERSION2,
    client_id=f"python-mqtt-pub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
client.connect(args.host, args.port)
client.loop_start()
count = 0
start = time.perf_counter()
if args.tps and args.duration_sec:
    interval = 1.0 / args.tps
    deadline = start + args.duration_sec
    next_send = start
    keep_sending = lambda now, sent: now < deadline
else:
    interval = args.interval_sec
    next_send = start + interval
    keep_sending = lambda now, sent: sent < args.count
while keep_sending(time.perf_counter(), count):
    now = time.perf_counter()
    if now < next_send:
        time.sleep(next_send - now)
    payload = json.dumps({"publish_at": int(time.time() * 1000), "msg": count}, separators=(",", ":"))
    result = client.publish(args.topic, payload, qos=args.qos)
    if result.rc != 0:
        raise RuntimeError(f"publish failed: {result.rc}")
    count += 1
    next_send += interval
publish_elapsed = time.perf_counter() - start
if args.hold_sec > 0:
    time.sleep(args.hold_sec)
total_elapsed = time.perf_counter() - start
print(json.dumps({
    "topic": args.topic,
    "sent": count,
    "publish_duration_sec": round(publish_elapsed, 3),
    "total_duration_sec": round(total_elapsed, 3),
    "hold_sec": args.hold_sec,
    "target_tps": args.tps,
    "actual_tps": round(count / publish_elapsed, 3) if publish_elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()

Example run:

bash
python3 publisher.py \
  --host 127.0.0.1 \
  --port 1883 \
  --topic emqx/test \
  --qos 1 \
  --tps 500 \
  --duration-sec 20 \
  --hold-sec 25

Example output:

text
{"topic": "emqx/test", "sent": 10001, "publish_duration_sec": 20.0, "total_duration_sec": 45.004, "hold_sec": 25.0, "target_tps": 500.0, "actual_tps": 500.038}

Subscriber

The subscriber needs to:

  • Connect to EMQX
  • Subscribe to the same business topic
  • Use QoS 1
  • Remain connected longer than the publish phase

python
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--duration-sec", type=float)
args = parser.parse_args()
counter = {"received": 0}
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code != 0:
        raise RuntimeError(f"connect failed: {reason_code}")
    client.subscribe(args.topic, qos=args.qos)
def on_message(client, userdata, msg):
    counter["received"] += 1
client = mqtt_client.Client(
    mqtt_client.CallbackAPIVersion.VERSION2,
    client_id=f"python-mqtt-sub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
client.on_message = on_message
client.connect(args.host, args.port)
client.loop_start()
start = time.perf_counter()
while args.duration_sec is None or time.perf_counter() - start < args.duration_sec:
    time.sleep(1)
elapsed = time.perf_counter() - start
print(json.dumps({
    "topic": args.topic,
    "received": counter["received"],
    "duration_sec": round(elapsed, 3),
    "recv_tps": round(counter["received"] / elapsed, 3) if elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()

Start the subscriber first:

bash
python3 subscriber.py \
  --host 127.0.0.1 \
  --port 1883 \
  --topic emqx/test \
  --qos 1 \
  --duration-sec 80

Example output:

text
{"topic": "emqx/test", "received": 10001, "duration_sec": 80.268, "recv_tps": 124.596}

Use two terminals:

  1. Start the subscriber first.
  2. Then start the publisher.
  3. Wait for both commands to complete.
  4. Query TimescaleDB after message delivery is complete.

View Message Latency

Once the publisher, subscriber, rules, and TimescaleDB actions are all running, use SQL to validate the trace records and calculate latency.

How to Calculate Delay

Latency formulas for each stage:

  • Publisher to EMQX: publish_received_at - publish_at
  • EMQX processing: message_delivered - publish_received_at
  • EMQX to subscriber: message_acked - message_delivered
  • End-to-end total: message_acked - publish_at
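
Given one correlated record, the four formulas reduce to simple subtractions on millisecond timestamps. A sketch using hypothetical values:

```python
# Hypothetical correlated timestamps, all in epoch milliseconds.
trace = {
    "publish_at": 1_700_000_000_000,
    "publish_received_at": 1_700_000_000_006,
    "message_delivered": 1_700_000_000_007,
    "message_acked": 1_700_000_000_010,
}

latencies_ms = {
    "publisher_to_emqx": trace["publish_received_at"] - trace["publish_at"],      # 6
    "emqx_processing": trace["message_delivered"] - trace["publish_received_at"], # 1
    "emqx_to_subscriber": trace["message_acked"] - trace["message_delivered"],    # 3
    "end_to_end": trace["message_acked"] - trace["publish_at"],                   # 10
}
print(latencies_ms)
```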

TIP

  • If a message lacks a publish_at timestamp, publisher-to-broker latency cannot be calculated.
  • If QoS 0 is used, message_acked events are not generated, so subscriber-side latency cannot be calculated.

Query Examples

1. Count All Trace Records

sql
SELECT COUNT(*) AS total_rows
FROM mqtt_message_traces;

Example output:

text
 total_rows
------------
      30002

2. View Recent Publish Records

sql
SELECT event_at, event, msg_id, publish_at, emqx_received_at, pub_clientid, topic, qos, payload
FROM mqtt_message_traces
WHERE event = 'message.publish'
ORDER BY event_at DESC
LIMIT 20;

Use this query to confirm that publisher-side timestamps and broker receive timestamps are stored correctly.

3. View Recent Delivered Records

sql
SELECT event_at, event, msg_id, publish_at, emqx_received_at, emqx_delivered_at, pub_clientid, sub_clientid, topic, qos
FROM mqtt_message_traces
WHERE event = 'message.delivered'
ORDER BY event_at DESC
LIMIT 20;

Use this query to confirm that delivery-stage timestamps are captured and that both publisher and subscriber client IDs are visible.

4. View Recent Acked Records

sql
SELECT event_at, event, msg_id, publish_at, emqx_received_at, sub_ack_at, pub_clientid, sub_clientid, topic, qos
FROM mqtt_message_traces
WHERE event = 'message.acked'
ORDER BY event_at DESC
LIMIT 20;

Use this query to confirm that acknowledgment-stage data is present and can be used to calculate subscriber-side latency.

5. Count Records by Event Type

sql
SELECT event, COUNT(*) AS row_count
FROM mqtt_message_traces
GROUP BY event
ORDER BY event;

Example output:

text
       event       | row_count
-------------------+-----------
 message.acked     |     10001
 message.delivered |     10000
 message.publish   |     10001

Use this query for a quick integrity check on the trace dataset.

6. Calculate Average Latency per Stage for the Past Hour

sql
WITH relevant_messages AS (
  SELECT *
  FROM mqtt_message_traces
  WHERE event_at >= NOW() - INTERVAL '1 hour'
),
message_latency AS (
  SELECT
    t1.msg_id,
    COALESCE(t2.sub_clientid, t3.sub_clientid) AS sub_clientid,
    MAX(t1.publish_at) AS publish_at,
    MAX(t1.emqx_received_at) AS publish_received_at,
    MAX(t2.emqx_delivered_at) AS message_delivered,
    MAX(t3.sub_ack_at) AS message_acked
  FROM relevant_messages t1
  JOIN relevant_messages t2
    ON t1.msg_id = t2.msg_id
   AND t2.event = 'message.delivered'
  JOIN relevant_messages t3
    ON t1.msg_id = t3.msg_id
   AND t3.event = 'message.acked'
  WHERE t1.event = 'message.publish'
  GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid)
)
SELECT
  COUNT(*) AS sample_count,
  ROUND(AVG(publish_received_at - publish_at), 3) AS publisher_to_emqx_ms,
  ROUND(AVG(message_delivered - publish_received_at), 3) AS emqx_processing_ms,
  ROUND(AVG(message_acked - message_delivered), 3) AS emqx_to_subscriber_ms,
  ROUND(AVG(message_acked - publish_at), 3) AS end_to_end_ms
FROM message_latency
WHERE publish_at IS NOT NULL
  AND publish_received_at IS NOT NULL
  AND message_delivered IS NOT NULL
  AND message_acked IS NOT NULL;

Example output:

text
 sample_count | publisher_to_emqx_ms | emqx_processing_ms | emqx_to_subscriber_ms | end_to_end_ms
--------------+----------------------+--------------------+-----------------------+---------------
        10000 |                5.891 |              0.073 |                 0.273 |         6.236

In a healthy EMQX system, emqx_processing_ms (broker processing latency) is typically very low. The more significant contributors are the publish and consume phases — publisher_to_emqx_ms and emqx_to_subscriber_ms.

Query notes:

  • WHERE event_at >= NOW() - INTERVAL '1 hour' limits the query to a one-hour observation window, avoiding interference from historical data.
  • relevant_messages narrows the dataset before the JOIN.
  • COALESCE(t2.sub_clientid, t3.sub_clientid) ensures that subscriber identity is preserved in the grouped results.
  • GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid) prevents records from different subscribers from being merged into a single aggregate row.
  • MAX(...) collapses the timestamps from three event rows into a single aggregate row.
  • The final WHERE ... IS NOT NULL ensures that only records with a complete publish/delivered/acked chain are used to calculate latency.
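
The grouping and merge semantics described above can be mirrored in plain Python. The sketch below uses hypothetical rows for one message fanned out to two subscribers (s1 and s2): grouping by (msg_id, sub_clientid) keeps each subscriber's sample separate, and merging the delivered/acked rows for a group plays the role of MAX(...) in the SQL.

```python
from collections import defaultdict

# Hypothetical rows: one publish, plus delivered/acked rows for two subscribers.
rows = [
    {"msg_id": "A1", "event": "message.publish",   "sub_clientid": None, "publish_at": 1000, "emqx_received_at": 1004},
    {"msg_id": "A1", "event": "message.delivered", "sub_clientid": "s1", "emqx_delivered_at": 1005},
    {"msg_id": "A1", "event": "message.acked",     "sub_clientid": "s1", "sub_ack_at": 1008},
    {"msg_id": "A1", "event": "message.delivered", "sub_clientid": "s2", "emqx_delivered_at": 1006},
    {"msg_id": "A1", "event": "message.acked",     "sub_clientid": "s2", "sub_ack_at": 1011},
]

publish = {r["msg_id"]: r for r in rows if r["event"] == "message.publish"}

# Group delivered/acked rows by (msg_id, sub_clientid), mirroring the GROUP BY:
# fan-out to multiple subscribers yields one aggregate per subscriber.
groups = defaultdict(dict)
for r in rows:
    if r["event"] != "message.publish":
        groups[(r["msg_id"], r["sub_clientid"])].update(r)

samples = []
for (msg_id, sub), g in groups.items():
    p = publish[msg_id]
    samples.append({
        "msg_id": msg_id,
        "sub_clientid": sub,
        "end_to_end_ms": g["sub_ack_at"] - p["publish_at"],
    })

print(samples)  # two samples, one per subscriber, not merged
```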

7. Validate for Negative Latency Values

Run the following query before accepting latency results. Any non-zero result indicates a timestamp issue, clock synchronization problem, or field mapping error.

sql
WITH relevant_messages AS (
  SELECT *
  FROM mqtt_message_traces
  WHERE event_at >= NOW() - INTERVAL '1 hour'
),
message_latency AS (
  SELECT
    t1.msg_id,
    COALESCE(t2.sub_clientid, t3.sub_clientid) AS sub_clientid,
    MAX(t1.publish_at) AS publish_at,
    MAX(t1.emqx_received_at) AS publish_received_at,
    MAX(t2.emqx_delivered_at) AS message_delivered,
    MAX(t3.sub_ack_at) AS message_acked
  FROM relevant_messages t1
  JOIN relevant_messages t2
    ON t1.msg_id = t2.msg_id
   AND t2.event = 'message.delivered'
  JOIN relevant_messages t3
    ON t1.msg_id = t3.msg_id
   AND t3.event = 'message.acked'
  WHERE t1.event = 'message.publish'
  GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid)
)
SELECT
  COUNT(*) FILTER (WHERE publish_received_at - publish_at < 0) AS negative_client_to_emqx,
  COUNT(*) FILTER (WHERE message_delivered - publish_received_at < 0) AS negative_emqx_processing,
  COUNT(*) FILTER (WHERE message_acked - message_delivered < 0) AS negative_emqx_to_subscriber,
  COUNT(*) FILTER (WHERE message_acked - publish_at < 0) AS negative_end_to_end
FROM message_latency
WHERE publish_at IS NOT NULL
  AND publish_received_at IS NOT NULL
  AND message_delivered IS NOT NULL
  AND message_acked IS NOT NULL;

Example output:

text
 negative_client_to_emqx | negative_emqx_processing | negative_emqx_to_subscriber | negative_end_to_end
-------------------------+--------------------------+-----------------------------+---------------------
                       0 |                        0 |                           0 |                   0

All four values must be 0 before the results can be considered valid. If any count is non-zero, check the timestamp mappings and clock synchronization configuration before using the results as final conclusions.

Result Validation

Before publishing final latency results, verify all of the following:

  • Trace records exist for each event type.
  • Key timestamp fields are populated.
  • The average latency query returns a meaningful sample_count.
  • The negative-value validation query returns 0 for all four checks.
  • Action metrics show no abnormal failure counts.

Example action metrics from a successful validation run:

text
publish trace action:
  matched = 10001
  success = 10001
  failed  = 0
delivered trace action:
  matched = 10001
  success = 10000
  failed  = 1
acked trace action:
  matched = 10001
  success = 10001
  failed  = 0

If any action has a non-zero failure count, or if any negative latency count is non-zero, fix the data path before using the results as final conclusions.