Full-Chain MQTT Message Tracing with EMQX Tables: Latency, Delivery, and Acknowledgment Analysis

In an MQTT system, a message passes through several key stages: the publisher sends it, the broker receives and routes it, delivers it to subscribers, and the subscribers acknowledge receipt (ACK). When issues such as message delays, consumer backlogs, ACK timeouts, duplicate consumption, or message loss occur, basic broker monitoring metrics alone are rarely sufficient to pinpoint which segment of the chain is at fault.

In a production environment, the end-to-end behavior of an MQTT message is shaped by:

  • Publisher network transmission latency
  • Broker receive and internal processing latency
  • Broker-to-subscriber delivery latency
  • Subscriber ACK return latency
  • Abnormal situations such as failed delivery or missing ACK

Beyond traditional broker metric monitoring, a tracing mechanism is needed to correlate and analyze the MQTT message lifecycle.

EMQX Cloud combines event topics, the rule engine, and Tables data integration to capture and correlate key events in the MQTT message lifecycle. This approach records message chain status and timestamps from the moment of publication, through broker delivery, to subscriber ACK confirmation.

This guide explains how to use EMQX Cloud and Tables to build a full-chain MQTT message observability system with the following capabilities:

  • Analyze publish, delivery, and ACK confirmation chains
  • Correlate message.publish, message.delivered, and message.acked events
  • Measure per-stage latency across Publisher, Broker, and Subscriber
  • Detect missing ACKs, incomplete chains, and potential message drops
  • Verify that messages were successfully delivered to subscribers
  • Use SQL to aggregate and analyze the message lifecycle for fault analysis

Prerequisites

Before starting, ensure the following conditions are met:

  • An EMQX v5 deployment is created and accessible.
  • Tables is deployed and accessible.
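  • System clocks on the publisher, the subscriber, and the EMQX deployment are synchronized (for example, via NTP). The publisher's clock matters most: publish_at is the only client-side timestamp, and every other timestamp in this guide is recorded by EMQX.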
  • The publisher includes a publish_at timestamp in the payload of each MQTT message.
  • QoS 1 or QoS 2 is used to enable capture of acknowledgment events.
  • Timestamps are handled in a consistent timezone across clients and the database. UTC is recommended.
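
As a small illustration of the last prerequisite, the sketch below shows one way to produce a millisecond timestamp suitable for publish_at and to render a stored value in UTC for inspection. The helper names `now_epoch_ms` and `epoch_ms_to_utc_iso` are hypothetical and are not part of EMQX or paho-mqtt. Unix epoch time is itself timezone-independent, so UTC only comes into play when a value is displayed.

```python
import time
from datetime import datetime, timedelta, timezone


def now_epoch_ms() -> int:
    """Current Unix time in milliseconds (hypothetical helper)."""
    return time.time_ns() // 1_000_000


def epoch_ms_to_utc_iso(ms: int) -> str:
    """Render an epoch-millisecond value as an ISO 8601 string in UTC."""
    seconds, millis = divmod(ms, 1000)
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)
    return dt.isoformat(timespec="milliseconds")


# The publish_at value from this guide's example payload, shown in UTC
print(epoch_ms_to_utc_iso(1773579492999))
```

Splitting the value with divmod keeps the millisecond part exact instead of passing it through a float.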

Timestamp Definitions

The tracing model uses the following timestamps:

  • publish_at: The client-side timestamp written into the MQTT payload by the publisher. Stored in the publish_at column.
  • publish_received_at: The time when EMQX receives the PUBLISH packet. Stored in the emqx_received_at column.
  • message_delivered: The time when EMQX delivers the message to the subscriber. Stored in the emqx_delivered_at column.
  • message_acked: The time when EMQX receives the acknowledgment (ACK) from the subscriber. Stored in the sub_ack_at column.

These timestamps are used to calculate the latency at each stage of the transmission path.

Set Up the Tables Table

Create a table to store message trace records. Each message stage is stored as a separate row, enabling later correlation queries using msg_id.

sql
CREATE TABLE mqtt_message_traces (
    "timestamp" TIMESTAMP NOT NULL,
    event_name STRING NOT NULL,
    msg_id STRING NOT NULL,
    trace_key STRING NOT NULL,
    publish_at BIGINT,
    emqx_received_at BIGINT,
    emqx_delivered_at BIGINT,
    sub_ack_at BIGINT,
    pub_clientid STRING,
    sub_clientid STRING,
    topic STRING NOT NULL,
    qos INT NOT NULL,
    msg BIGINT,
    TIME INDEX ("timestamp"),
    PRIMARY KEY (trace_key)
)
WITH (
    ttl = '30d'
);

Configure Tables Data Integration in EMQX Cloud

The traceability workflow uses three rules and three write actions. All three actions write to the same table, but each captures a different stage of the message lifecycle.

All three rules use the same correlation key:

sql
id as msg_id

The same msg_id is used to correlate the following three events:

  • message.publish
  • message.delivered
  • message.acked

Configure the Publish Trace Rule

The first rule records publish-stage data. It listens on a wildcard topic and captures regular messages on all business topics.

Rule SQL:

sql
SELECT
  timestamp,
  event,
  id as msg_id,
  concat(id, '_publish') as trace_key,
  int(payload.publish_at) as publish_at,
  int(publish_received_at) as emqx_received_at,
  clientid as pub_clientid,
  topic,
  qos,
  int(payload.msg) as msg
FROM "#"

Action SQL:

sql
mqtt_message_traces,event_name=${event},msg_id=${msg_id},trace_key=${trace_key},topic=${topic},pub_clientid=${pub_clientid} publish_at=${publish_at}i,emqx_received_at=${emqx_received_at}i,qos=${qos}i,msg=${msg}i ${timestamp}

When the publisher sends a message to a business topic (for example, emqx/test), this rule writes a publish trace record to Tables.
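
To make the action template easier to read, the sketch below expands it for one hypothetical event. The `${name}` placeholders happen to match the syntax of Python's `string.Template`, so the standard library can stand in for the substitution that EMQX performs itself. Every value in `sample_event` is made up for illustration, and the layout (tags before the first space, integer fields with an `i` suffix, then the timestamp) is read off the template above rather than taken from a specification.

```python
from string import Template

# Action template copied from the publish trace rule above
ACTION_TEMPLATE = Template(
    "mqtt_message_traces,event_name=${event},msg_id=${msg_id},"
    "trace_key=${trace_key},topic=${topic},pub_clientid=${pub_clientid} "
    "publish_at=${publish_at}i,emqx_received_at=${emqx_received_at}i,"
    "qos=${qos}i,msg=${msg}i ${timestamp}"
)

# Hypothetical rule output for a single message; every value is made up
sample_event = {
    "event": "message.publish",
    "msg_id": "0006A1B2C3D4E5F6",
    "trace_key": "0006A1B2C3D4E5F6_publish",
    "topic": "emqx/test",
    "pub_clientid": "python-mqtt-pub-42",
    "publish_at": 1773579492999,
    "emqx_received_at": 1773579493004,
    "qos": 1,
    "msg": 10000,
    "timestamp": 1773579493004,
}

# substitute() raises KeyError if the rule SQL fails to select a field
line = ACTION_TEMPLATE.substitute(sample_event)
print(line)
```

Because `substitute` fails on a missing key, the same approach can serve as a quick check that every placeholder in an action template has a matching alias in the rule SQL.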

Configure the Delivered Trace Rule

The second rule records the timestamp when EMQX delivers the message to the subscriber path.

Rule SQL:

sql
SELECT
  timestamp,
  event,
  id as msg_id,
  concat(id, '_delivered') as trace_key,
  int(timestamp) as emqx_delivered_at,
  from_clientid as pub_clientid,
  clientid as sub_clientid,
  topic,
  qos,
  int(payload.msg) as msg
FROM "$events/message_delivered"

Action SQL:

sql
mqtt_message_traces,event_name=${event},msg_id=${msg_id},trace_key=${trace_key},topic=${topic},pub_clientid=${pub_clientid},sub_clientid=${sub_clientid} emqx_delivered_at=${emqx_delivered_at}i,qos=${qos}i,msg=${msg}i ${timestamp}

This record is used to calculate broker-side processing latency and broker-to-subscriber latency.

Configure the Acked Trace Rule

The third rule records acknowledgment-stage data. This rule only fires when an acknowledgment event exists, so QoS 1 or QoS 2 is required.

Rule SQL:

sql
SELECT
  timestamp,
  event,
  id as msg_id,
  concat(id, '_acked') as trace_key,
  int(timestamp) as sub_ack_at,
  from_clientid as pub_clientid,
  clientid as sub_clientid,
  topic,
  qos,
  int(payload.msg) as msg
FROM "$events/message_acked"

Action SQL:

sql
mqtt_message_traces,event_name=${event},msg_id=${msg_id},trace_key=${trace_key},topic=${topic},pub_clientid=${pub_clientid},sub_clientid=${sub_clientid} sub_ack_at=${sub_ack_at}i,qos=${qos}i,msg=${msg}i ${timestamp}

This record is used to calculate subscriber-side acknowledgment latency and end-to-end total latency.

Connector Configuration Notes

From a logical design perspective, all three actions can share a single Tables connector.

However, during sustained load testing on EMQX 5, sharing a single connector was found to be unstable. The observable symptom is that one action accumulates write failures, reducing the number of complete samples available for correlation queries.

For stable operation, the recommended configuration is:

  • A dedicated connector for the publish trace action
  • A dedicated connector for the delivered trace action
  • A dedicated connector for the acked trace action

This is an implementation-level stability choice and does not affect the table schema, rule SQL, action SQL, correlation logic, or latency calculation formulas.

Use the Python SDK to Simulate Message Delivery

This section describes the publisher and subscriber behavior used for validation.

The publisher sends MQTT messages to a business topic (for example, emqx/test), with each payload containing a publish_at timestamp. The subscriber listens on the same topic and remains online throughout the message flow.

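Install the Python dependency. Both scripts use mqtt_client.CallbackAPIVersion, which was introduced in paho-mqtt 2.0, so version 2.0 or later is required:

bash
python3 -m venv .venv && source .venv/bin/activate
pip install "paho-mqtt>=2.0"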

Publisher

The publisher needs to:

  • Connect to EMQX
  • Publish messages to the target topic
  • Use QoS 1
  • Include a publish_at field in each message payload

Example payload:

json
{"publish_at":1773579492999,"msg":10000}

The following publisher script supports both fixed-interval sending and duration-based load testing at a specified TPS, suitable for both functional verification and sustained load testing.

python
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--count", type=int, default=20)
parser.add_argument("--interval-sec", type=float, default=1.0)
parser.add_argument("--tps", type=float)
parser.add_argument("--duration-sec", type=float)
parser.add_argument("--hold-sec", type=float, default=0.0)
parser.add_argument("--username", default=None)
parser.add_argument("--password", default=None)
args = parser.parse_args()
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code != 0:
        raise RuntimeError(f"connect failed: {reason_code}")
client = mqtt_client.Client(
    mqtt_client.CallbackAPIVersion.VERSION2,
    client_id=f"python-mqtt-pub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
if args.username:
    client.username_pw_set(args.username, args.password)
client.connect(args.host, args.port)
client.loop_start()
count = 0
start = time.perf_counter()
if args.tps and args.duration_sec:
    interval = 1.0 / args.tps
    deadline = start + args.duration_sec
    next_send = start
    keep_sending = lambda now, sent: now < deadline
else:
    interval = args.interval_sec
    next_send = start + interval
    keep_sending = lambda now, sent: sent < args.count
while keep_sending(time.perf_counter(), count):
    now = time.perf_counter()
    if now < next_send:
        time.sleep(next_send - now)
    payload = json.dumps({"publish_at": int(time.time() * 1000), "msg": count}, separators=(",", ":"))
    result = client.publish(args.topic, payload, qos=args.qos)
    if result.rc != 0:
        raise RuntimeError(f"publish failed: {result.rc}")
    count += 1
    next_send += interval
publish_elapsed = time.perf_counter() - start
if args.hold_sec > 0:
    time.sleep(args.hold_sec)
total_elapsed = time.perf_counter() - start
print(json.dumps({
    "topic": args.topic,
    "sent": count,
    "publish_duration_sec": round(publish_elapsed, 3),
    "total_duration_sec": round(total_elapsed, 3),
    "hold_sec": args.hold_sec,
    "target_tps": args.tps,
    "actual_tps": round(count / publish_elapsed, 3) if publish_elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()

Example run:

bash
python3 publisher.py \
  --host XXXX.ala.dedicated.aws.mqttce.net \
  --port 1883 \
  --topic emqx/test \
  --qos 1 \
  --tps 1 \
  --duration-sec 5 \
  --hold-sec 5 \
  --username xxx \
  --password xxx

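Example output. Six messages are sent rather than five because the first goes out at t = 0 and the deadline is checked before each wait, so a final send still happens at t = 5: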

text
{"topic": "emqx/test", "sent": 6, "publish_duration_sec": 5.002, "total_duration_sec": 10.007, "hold_sec": 5.0, "target_tps": 1.0, "actual_tps": 1.2}

Subscriber

The subscriber needs to:

  • Connect to EMQX
  • Subscribe to the same business topic
  • Use QoS 1
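  • Remain connected longer than the publish phase

The following subscriber script counts received messages and prints a summary once its --duration-sec window elapses.

python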
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--duration-sec", type=float)
parser.add_argument("--username", default=None)
parser.add_argument("--password", default=None)
args = parser.parse_args()
counter = {"received": 0}
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code != 0:
        raise RuntimeError(f"connect failed: {reason_code}")
    client.subscribe(args.topic, qos=args.qos)
def on_message(client, userdata, msg):
    counter["received"] += 1
client = mqtt_client.Client(
    mqtt_client.CallbackAPIVersion.VERSION2,
    client_id=f"python-mqtt-sub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
client.on_message = on_message
if args.username:
    client.username_pw_set(args.username, args.password)
client.connect(args.host, args.port)
client.loop_start()
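start = time.perf_counter()
try:
    # Run until --duration-sec elapses, or until Ctrl+C when no duration is given
    while args.duration_sec is None or time.perf_counter() - start < args.duration_sec:
        time.sleep(1)
except KeyboardInterrupt:
    # Fall through so the summary is still printed after a manual stop
    pass
elapsed = time.perf_counter() - start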
print(json.dumps({
    "topic": args.topic,
    "received": counter["received"],
    "duration_sec": round(elapsed, 3),
    "recv_tps": round(counter["received"] / elapsed, 3) if elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()

Start the subscriber first:

bash
python3 subscriber.py \
  --host xxxx.dedicated.aws.mqttce.net \
  --port 1883 \
  --topic emqx/test \
  --qos 1 \
  --username xxx \
  --password xxx

Use two terminals:

  1. Start the subscriber first.
  2. Then start the publisher.
  3. Wait for the publisher to finish. The subscriber exits on its own only if --duration-sec was passed; otherwise stop it manually.
  4. Query EMQX Tables after message delivery is complete.

View Message Latency

Once the publisher, subscriber, rules, and Tables actions are all running, use SQL to validate the trace records and calculate latency.

How to Calculate Latency

Latency formulas for each stage:

  • Publisher to EMQX: publish_received_at - publish_at
  • EMQX processing: message_delivered - publish_received_at
  • EMQX to subscriber: message_acked - message_delivered
  • End-to-end total: message_acked - publish_at

TIP

  • If a message lacks a publish_at timestamp, publisher-to-broker latency cannot be calculated.
  • With QoS 0, message_acked events are not generated, so subscriber-side latency cannot be calculated.
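
The formulas and the two caveats above can be sketched as a small function. The function name `stage_latencies_ms` and the result keys are illustrative choices, not an EMQX API, and the timestamps in the usage lines are made up. A stage is reported as None whenever one of its inputs is missing, which covers both the missing publish_at case and the QoS 0 case.

```python
from typing import Optional


def stage_latencies_ms(
    publish_at: Optional[int],
    publish_received_at: Optional[int],
    message_delivered: Optional[int],
    message_acked: Optional[int],
) -> dict:
    """Apply the four latency formulas; a stage is None if an input is missing."""

    def diff(later: Optional[int], earlier: Optional[int]) -> Optional[int]:
        if later is None or earlier is None:
            return None
        return later - earlier

    return {
        "publisher_to_emqx_ms": diff(publish_received_at, publish_at),
        "emqx_processing_ms": diff(message_delivered, publish_received_at),
        "emqx_to_subscriber_ms": diff(message_acked, message_delivered),
        "end_to_end_ms": diff(message_acked, publish_at),
    }


# Made-up timestamps (epoch milliseconds) for one QoS 1 message
qos1 = stage_latencies_ms(1773579492999, 1773579493004, 1773579493005, 1773579493020)
print(qos1)

# QoS 0: there is no message_acked event, so ACK-dependent stages are None
qos0 = stage_latencies_ms(1773579492999, 1773579493004, 1773579493005, None)
print(qos0)
```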

Query Examples

1. Count All Trace Records

sql
SELECT COUNT(*) AS total_rows
FROM mqtt_message_traces;

2. View Recent Publish Records

sql
SELECT
    timestamp,
    event_name,
    msg_id,
    publish_at,
    emqx_received_at,
    pub_clientid,
    topic,
    qos,
    msg
FROM mqtt_message_traces
WHERE event_name = 'message.publish'
ORDER BY timestamp DESC
LIMIT 20;

Use this query to confirm that publisher-side timestamps and broker receive timestamps are stored correctly.

3. View Recent Delivered Records

sql
SELECT
    "timestamp",
    event_name,
    msg_id,
    publish_at,
    emqx_received_at,
    emqx_delivered_at,
    pub_clientid,
    sub_clientid,
    topic,
    qos
FROM mqtt_message_traces
WHERE event_name = 'message.delivered'
ORDER BY "timestamp" DESC
LIMIT 20;

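Use this query to confirm that delivery-stage data is present and can be used to calculate broker-to-subscriber latency. The publish_at and emqx_received_at columns are expected to be NULL in these rows, because the delivered trace action does not write them; they are populated only on message.publish rows.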

4. View Recent Acked Records

sql
SELECT
    timestamp,
    event_name,
    msg_id,
    publish_at,
    emqx_received_at,
    sub_ack_at,
    pub_clientid,
    sub_clientid,
    topic,
    qos,
    msg
FROM mqtt_message_traces
WHERE event_name = 'message.acked'
ORDER BY timestamp DESC
LIMIT 20;

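Use this query to confirm that acknowledgment-stage data is present and can be used to calculate subscriber-side latency. As with delivered rows, publish_at and emqx_received_at are expected to be NULL here, because the acked trace action writes only sub_ack_at, qos, and msg as fields.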

5. Count Records by Event Type

sql
SELECT
    event_name,
    COUNT(*) AS row_count
FROM mqtt_message_traces
GROUP BY event_name
ORDER BY event_name;

Use this query for a quick integrity check on the trace dataset.
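
Per-event counts show whether the totals line up, but not which messages are incomplete. As a rough sketch of a finer-grained check, the function below takes (msg_id, event_name) pairs, for example the result of selecting those two columns from mqtt_message_traces, and reports the lifecycle events each message is missing. The function name `find_incomplete_chains` and the sample rows are hypothetical.

```python
from collections import defaultdict

EXPECTED_EVENTS = ("message.publish", "message.delivered", "message.acked")


def find_incomplete_chains(rows):
    """Map each msg_id to the lifecycle events it is missing.

    `rows` is an iterable of (msg_id, event_name) pairs.
    Messages with all three events are omitted from the result.
    """
    seen = defaultdict(set)
    for msg_id, event_name in rows:
        seen[msg_id].add(event_name)
    return {
        msg_id: [e for e in EXPECTED_EVENTS if e not in events]
        for msg_id, events in seen.items()
        if not set(EXPECTED_EVENTS) <= events
    }


# Made-up rows: message "a" is complete, "b" was never acknowledged
rows = [
    ("a", "message.publish"), ("a", "message.delivered"), ("a", "message.acked"),
    ("b", "message.publish"), ("b", "message.delivered"),
]
incomplete = find_incomplete_chains(rows)
print(incomplete)
```

A missing event is not always a fault: QoS 0 messages never produce message.acked, and a message published to a topic with no subscribers has only a publish row, so results are best read alongside the qos and topic columns.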

6. Calculate Average Latency per Stage for the Past Hour

sql
WITH publish_events AS (
  SELECT
    msg_id,
    MAX(publish_at) AS publish_at,
    MAX(emqx_received_at) AS publish_received_at
  FROM mqtt_message_traces
  WHERE event_name = 'message.publish'
    AND timestamp >= now() - INTERVAL '1 hour'
  GROUP BY msg_id
),
ack_events AS (
  SELECT
    msg_id,
    MAX(sub_ack_at) AS message_acked
  FROM mqtt_message_traces
  WHERE event_name = 'message.acked'
    AND timestamp >= now() - INTERVAL '1 hour'
  GROUP BY msg_id
),
message_latency AS (
  SELECT
    p.msg_id,
    p.publish_at,
    p.publish_received_at,
    a.message_acked
  FROM publish_events p
  JOIN ack_events a
    ON p.msg_id = a.msg_id
)
SELECT
  COUNT(*) AS sample_count,
  ROUND(AVG(publish_received_at - publish_at), 3) AS publisher_to_emqx_ms,
  ROUND(AVG(message_acked - publish_at), 3) AS end_to_end_ms
FROM message_latency
WHERE publish_at IS NOT NULL
  AND publish_received_at IS NOT NULL
  AND message_acked IS NOT NULL;

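In a healthy EMQX system, broker processing latency is typically very low. The more significant contributors are the publish and consume phases. Note that this query reports only publisher_to_emqx_ms and end_to_end_ms; isolating the consume phase (message_acked - message_delivered) additionally requires the emqx_delivered_at values from the message.delivered rows.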

Query notes:

  • WHERE timestamp >= now() - INTERVAL '1 hour' limits the query to a one-hour observation window, avoiding interference from historical data.
  • publish_events and ack_events narrow the dataset before the JOIN.
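  • MAX(...) collapses the timestamps from multiple event rows into a single row per msg_id. If a message is delivered to several subscribers, MAX(sub_ack_at) keeps the latest acknowledgment.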
  • The final WHERE ... IS NOT NULL ensures that only records with a complete publish/acked chain are used to calculate latency.
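
The correlation steps described in these notes can be mirrored in plain Python, which may help when checking a small result set by hand. The sketch below is not a replacement for the SQL: it assumes rows have already been fetched as dictionaries keyed by column name, and it also folds in message.delivered rows, an extension beyond the query above, so that the broker processing and EMQX-to-subscriber stages can be averaged as well. The function name `average_latencies` and all sample values are made up.

```python
from statistics import mean

COLUMNS = ("publish_at", "emqx_received_at", "emqx_delivered_at", "sub_ack_at")


def average_latencies(rows):
    """Collapse trace rows per msg_id (like MAX ... GROUP BY), then average."""
    per_msg = {}
    for row in rows:
        merged = per_msg.setdefault(row["msg_id"], {})
        for col in COLUMNS:
            value = row.get(col)
            if value is not None:
                merged[col] = max(merged.get(col, value), value)
    # Keep only complete chains, as the IS NOT NULL filters do
    complete = [m for m in per_msg.values() if all(c in m for c in COLUMNS)]
    if not complete:
        return {"sample_count": 0}
    return {
        "sample_count": len(complete),
        "publisher_to_emqx_ms": mean(m["emqx_received_at"] - m["publish_at"] for m in complete),
        "emqx_processing_ms": mean(m["emqx_delivered_at"] - m["emqx_received_at"] for m in complete),
        "emqx_to_subscriber_ms": mean(m["sub_ack_at"] - m["emqx_delivered_at"] for m in complete),
        "end_to_end_ms": mean(m["sub_ack_at"] - m["publish_at"] for m in complete),
    }


# Made-up trace rows: "a" and "b" are complete, "c" has no ACK and is excluded
rows = [
    {"msg_id": "a", "publish_at": 1000, "emqx_received_at": 1004},
    {"msg_id": "a", "emqx_delivered_at": 1005},
    {"msg_id": "a", "sub_ack_at": 1020},
    {"msg_id": "b", "publish_at": 2000, "emqx_received_at": 2008},
    {"msg_id": "b", "emqx_delivered_at": 2010},
    {"msg_id": "b", "sub_ack_at": 2030},
    {"msg_id": "c", "publish_at": 3000, "emqx_received_at": 3002},
]
summary = average_latencies(rows)
print(summary)
```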

7. Validate for Negative Latency Values

Run the following query before accepting latency results. Any non-zero result indicates a timestamp issue, clock synchronization problem, or field mapping error.

sql
WITH publish_events AS (
  SELECT
    msg_id,
    MAX(publish_at) AS publish_at,
    MAX(emqx_received_at) AS publish_received_at
  FROM mqtt_message_traces
  WHERE event_name = 'message.publish'
    AND timestamp >= now() - INTERVAL '1 hour'
  GROUP BY msg_id
),
ack_events AS (
  SELECT
    msg_id,
    MAX(sub_ack_at) AS message_acked
  FROM mqtt_message_traces
  WHERE event_name = 'message.acked'
    AND timestamp >= now() - INTERVAL '1 hour'
  GROUP BY msg_id
),
message_latency AS (
  SELECT
    p.msg_id,
    p.publish_at,
    p.publish_received_at,
    a.message_acked
  FROM publish_events p
  JOIN ack_events a
    ON p.msg_id = a.msg_id
)
SELECT
  SUM(CASE WHEN publish_received_at - publish_at < 0 THEN 1 ELSE 0 END) AS negative_publisher_to_emqx,
  SUM(CASE WHEN message_acked - publish_at < 0 THEN 1 ELSE 0 END) AS negative_end_to_end
FROM message_latency
WHERE publish_at IS NOT NULL
  AND publish_received_at IS NOT NULL
  AND message_acked IS NOT NULL;

All values must be 0 before the results can be considered valid. If any count is non-zero, check the timestamp mappings and clock synchronization configuration before using the results as final conclusions.

Result Validation

Before publishing final latency results, verify all of the following:

  1. Trace records exist for both message.publish and message.acked event types;
  2. Key timestamp fields publish_at, emqx_received_at, and sub_ack_at are correctly populated;
  3. The average latency query returns a meaningful sample_count;
  4. The negative-value validation query returns 0 for all checks;
  5. EMQX Rule Action metrics show no abnormal failure counts.