Skip to content

使用 EMQX Tables 实现 MQTT 消息全链路追踪:延迟、投递与确认分析

在 MQTT 系统中,消息从发布者发送,到 Broker 接收、路由、投递,再到订阅者确认(ACK),会经过多个关键阶段。当系统出现消息延迟、消费堆积、ACK 超时、重复消费或消息丢失等问题时,仅依赖 Broker 的基础监控指标,通常很难准确定位问题发生在哪一段链路。

在实际生产环境中,一个 MQTT 消息可能经历:

  • Publisher 网络发送延迟
  • Broker 接收与内部处理延迟
  • Broker 到 Subscriber 的投递延迟
  • Subscriber ACK 返回延迟
  • 消息未成功送达或 ACK 缺失等异常情况

因此,除了传统的 Broker 指标监控之外,还需要一种能够对 MQTT 消息生命周期进行关联分析的链路追踪机制。

EMQX Cloud 可以结合事件主题(Event Topics)、规则引擎(Rule Engine)以及 Tables 数据集成能力,对 MQTT 消息生命周期中的关键事件进行采集与关联。通过这种方式,可以从消息发布开始,到 Broker 投递,再到订阅端 ACK 确认,全程记录消息链路状态与时间戳。

本文将介绍如何使用 EMQX Cloud 与 Tables 构建一套 MQTT 消息全链路可观测性方案,实现以下能力:

  • 分析消息发布、投递与 ACK 确认链路
  • 关联 message.publishmessage.deliveredmessage.acked 事件
  • 测量 Publisher、Broker、Subscriber 各阶段延迟
  • 检测 ACK 缺失、链路不完整与潜在消息丢弃问题
  • 验证消息是否成功送达订阅端
  • 基于 SQL 对消息生命周期进行统计与故障分析

前置准备

开始前,请确保满足以下条件:

  • EMQX v5 版本的部署已创建并可访问。
  • Tables 已部署并可访问。
  • 发布者和订阅者的系统时钟已同步。
  • 发布者在每条 MQTT 消息的 Payload 中包含 publish_at 时间戳。
  • 使用 QoS 1 或 QoS 2,以便捕获确认事件。
  • 客户端与数据库的时间戳处理使用统一时区,推荐使用 UTC。

时间戳定义

追踪模型中使用以下时间戳:

  • publish_at:发布者写入 MQTT Payload 的客户端侧时间戳。
  • publish_received_at:EMQX 接收到 PUBLISH 报文的时间。
  • message_delivered:EMQX 将消息投递至订阅者侧的时间。
  • message_acked:EMQX 收到订阅者确认(ACK)的时间。

这些时间戳用于计算传输路径各阶段的延迟。

设置 Tables 表

创建一张表用于存储消息追踪记录。每个消息阶段以独立的行存储,以便后续通过 msg_id 进行关联查询。

text
CREATE TABLE mqtt_message_traces (
    "timestamp" TIMESTAMP NOT NULL,
    event_name STRING NOT NULL,
    msg_id STRING NOT NULL,
    trace_key STRING NOT NULL,
    publish_at BIGINT,
    emqx_received_at BIGINT,
    emqx_delivered_at BIGINT,
    sub_ack_at BIGINT,
    pub_clientid STRING,
    sub_clientid STRING,
    topic STRING NOT NULL,
    qos INT NOT NULL,
    msg BIGINT,
    TIME INDEX ("timestamp"),
    PRIMARY KEY (trace_key)
)
WITH (
    ttl = '30d'
);

在 EMQX Cloud 中配置 Tables 数据集成

可追溯性工作流使用三条规则和三个写入动作。三个动作均写入同一张表,但每个动作捕获消息生命周期的不同阶段。

三条规则使用统一的关联键:

sql
id as msg_id

同一 msg_id 用于关联以下三个事件:

  • message.publish
  • message.delivered
  • message.acked

配置发布追踪规则

第一条规则记录发布阶段的数据。它监听通配符主题,可捕获所有业务主题上的普通消息。

规则 SQL:

sql
SELECT
  timestamp,
  event,
  id as msg_id,
  concat(id, '_publish') as trace_key,
  int(payload.publish_at) as publish_at,
  int(publish_received_at) as emqx_received_at,
  clientid as pub_clientid,
  topic,
  qos,
  int(payload.msg) as msg
FROM "#"

动作 SQL:

sql
mqtt_message_traces,event_name=${event},msg_id=${msg_id},trace_key=${trace_key},topic=${topic},pub_clientid=${pub_clientid} publish_at=${publish_at}i,emqx_received_at=${emqx_received_at}i,qos=${qos}i,msg=${msg}i ${timestamp}

当发布者向业务主题(如 emqx/test)发送消息时,此规则将发布追踪记录写入 Tables。

data_explorer_1

配置投递追踪规则

第二条规则记录 EMQX 将消息投递至订阅者路径的时间点。

规则 SQL:

sql
SELECT
  timestamp,
  event,
  id as msg_id,
  concat(id, '_delivered') as trace_key,
  int(timestamp) as emqx_delivered_at,
  from_clientid as pub_clientid,
  clientid as sub_clientid,
  topic,
  qos,
  int(payload.msg) as msg
FROM "$events/message_delivered"

动作 SQL:

sql
mqtt_message_traces,event_name=${event},msg_id=${msg_id},trace_key=${trace_key},topic=${topic},pub_clientid=${pub_clientid},sub_clientid=${sub_clientid} emqx_delivered_at=${emqx_delivered_at}i,qos=${qos}i,msg=${msg}i ${timestamp}

此记录用于计算 Broker 端处理延迟和 Broker 到订阅者的延迟。

data_explorer_2

配置确认追踪规则

第三条规则记录确认阶段的数据。此规则仅在存在确认事件时触发,因此必须使用 QoS 1 或 QoS 2。

规则 SQL:

sql
SELECT
  timestamp,
  event,
  id as msg_id,
  concat(id, '_acked') as trace_key,
  int(timestamp) as sub_ack_at,
  from_clientid as pub_clientid,
  clientid as sub_clientid,
  topic,
  qos,
  int(payload.msg) as msg
FROM "$events/message_acked"

动作 SQL:

sql
mqtt_message_traces,event_name=${event},msg_id=${msg_id},trace_key=${trace_key},topic=${topic},pub_clientid=${pub_clientid},sub_clientid=${sub_clientid} sub_ack_at=${sub_ack_at}i,qos=${qos}i,msg=${msg}i ${timestamp}

此记录用于计算订阅者侧确认延迟和端到端总延迟。

连接器配置说明

从逻辑设计角度,三个动作可以共用一个 Tables 连接器。

但在对 EMQX 5 进行持续负载验证时发现,共用一个连接器的稳定性较差。可观察到的现象是某个动作会积累写入失败,导致可用于关联查询的完整样本数量减少。

为保证运行稳定性,建议采用以下配置:

  • 发布追踪动作使用独立连接器
  • 投递追踪动作使用独立连接器
  • 确认追踪动作使用独立连接器

这仅是实现层面的稳定性选择,不影响表结构、规则 SQL、动作 SQL、关联逻辑或延迟计算公式。

使用 Python SDK 模拟消息传递

本节介绍用于验证的发布者和订阅者行为。

发布者向业务主题(如 emqx/test)发送 MQTT 消息,每条 Payload 包含 publish_at 时间戳。订阅者监听相同主题,并在整个消息流传输期间保持在线。

安装 Python 依赖:

bash
python3 -m venv .venv && source .venv/bin/activate
pip install paho-mqtt

发布者

发布者需要:

  • 连接到 EMQX
  • 向目标主题发布消息
  • 使用 QoS 1
  • 在每条消息 Payload 中包含 publish_at 字段

Payload 示例:

json
{"publish_at":1773579492999,"msg":10000}

以下发布者脚本同时支持固定间隔发送和指定 TPS 的持续时长测试,可用于功能验证或持续负载测试。

python
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--count", type=int, default=20)
parser.add_argument("--interval-sec", type=float, default=1.0)
parser.add_argument("--tps", type=float)
parser.add_argument("--duration-sec", type=float)
parser.add_argument("--hold-sec", type=float, default=0.0)
parser.add_argument("--username", default=None)
parser.add_argument("--password", default=None)
args = parser.parse_args()
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code != 0:
        raise RuntimeError(f"connect failed: {reason_code}")
client = mqtt_client.Client(
    mqtt_client.CallbackAPIVersion.VERSION2,
    client_id=f"python-mqtt-pub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
if args.username:
    client.username_pw_set(args.username, args.password)
client.connect(args.host, args.port)
client.loop_start()
count = 0
start = time.perf_counter()
if args.tps and args.duration_sec:
    interval = 1.0 / args.tps
    deadline = start + args.duration_sec
    next_send = start
    keep_sending = lambda now, sent: now < deadline
else:
    interval = args.interval_sec
    next_send = start + interval
    keep_sending = lambda now, sent: sent < args.count
while keep_sending(time.perf_counter(), count):
    now = time.perf_counter()
    if now < next_send:
        time.sleep(next_send - now)
    payload = json.dumps({"publish_at": int(time.time() * 1000), "msg": count}, separators=(",", ":"))
    result = client.publish(args.topic, payload, qos=args.qos)
    if result.rc != 0:
        raise RuntimeError(f"publish failed: {result.rc}")
    count += 1
    next_send += interval
publish_elapsed = time.perf_counter() - start
if args.hold_sec > 0:
    time.sleep(args.hold_sec)
total_elapsed = time.perf_counter() - start
print(json.dumps({
    "topic": args.topic,
    "sent": count,
    "publish_duration_sec": round(publish_elapsed, 3),
    "total_duration_sec": round(total_elapsed, 3),
    "hold_sec": args.hold_sec,
    "target_tps": args.tps,
    "actual_tps": round(count / publish_elapsed, 3) if publish_elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()

运行示例:

bash
python3 publisher.py \
  --host XXXX.ala.dedicated.aws.mqttce.net \
  --port 1883 \
  --topic emqx/test \
  --qos 1 \
  --tps 1 \
  --duration-sec 5 \
  --hold-sec 5 \
  --username xxx \
  --password xxx

输出示例:

text
{"topic": "emqx/test", "sent": 6, "publish_duration_sec": 5.002, "total_duration_sec": 10.007, "hold_sec": 5.0, "target_tps": 1.0, "actual_tps": 1.2}

订阅者

订阅者需要:

  • 连接到 EMQX
  • 订阅相同的业务主题
  • 使用 QoS 1
  • 保持连接时长超过发布阶段
python
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--duration-sec", type=float)
parser.add_argument("--username", default=None)
parser.add_argument("--password", default=None)
args = parser.parse_args()
counter = {"received": 0}
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code != 0:
        raise RuntimeError(f"connect failed: {reason_code}")
    client.subscribe(args.topic, qos=args.qos)
def on_message(client, userdata, msg):
    counter["received"] += 1
client = mqtt_client.Client(
    mqtt_client.CallbackAPIVersion.VERSION2,
    client_id=f"python-mqtt-sub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
client.on_message = on_message
if args.username:
    client.username_pw_set(args.username, args.password)
client.connect(args.host, args.port)
client.loop_start()
start = time.perf_counter()
while args.duration_sec is None or time.perf_counter() - start < args.duration_sec:
    time.sleep(1)
elapsed = time.perf_counter() - start
print(json.dumps({
    "topic": args.topic,
    "received": counter["received"],
    "duration_sec": round(elapsed, 3),
    "recv_tps": round(counter["received"] / elapsed, 3) if elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()

先启动订阅者:

bash
python3 subscriber.py \
  --host xxxx.dedicated.aws.mqttce.net \
  --port 1883 \
  --topic emqx/test \
  --qos 1 \
  --username xxx \
  --password xxx

建议运行顺序

使用两个终端:

  1. 先启动订阅者。
  2. 再启动发布者。
  3. 等待两个命令均执行完毕。
  4. 消息投递完成后,再查询 EMQX Tables。

查看消息延迟

发布者、订阅者、规则和 Tables 动作均运行后,可通过 SQL 验证追踪记录并计算延迟。

如何计算延迟

各阶段延迟的计算公式如下:

  • 发布客户端到 EMQX 延迟:publish_received_at - publish_at
  • EMQX 处理延迟:message_delivered - publish_received_at
  • EMQX 到订阅客户端延迟:message_acked - message_delivered
  • 端到端总传输延迟:message_acked - publish_at

提示

  • 如果消息缺少 publish_at 时间戳,则无法计算发布者到 Broker 的延迟。
  • 使用 QoS 0 时不会产生 message_acked 事件,因此无法计算订阅者侧延迟。

查询示例

1. 统计所有追踪记录总数

sql
SELECT COUNT(*) AS total_rows
FROM mqtt_message_traces;

2. 查看最近的发布记录

sql
SELECT
    timestamp,
    event_name,
    msg_id,
    publish_at,
    emqx_received_at,
    pub_clientid,
    topic,
    qos,
    msg
FROM mqtt_message_traces
WHERE event_name = 'message.publish'
ORDER BY timestamp DESC
LIMIT 20;

此查询用于确认发布者侧时间戳和 Broker 接收时间戳均已正确存储。

3. 查看最近的投递记录

sql
SELECT
    "timestamp",
    event_name,
    msg_id,
    publish_at,
    emqx_received_at,
    emqx_delivered_at,
    pub_clientid,
    sub_clientid,
    topic,
    qos
FROM mqtt_message_traces
WHERE event_name = 'message.delivered'
ORDER BY "timestamp" DESC
LIMIT 20;

此查询用于确认投递阶段数据存在,可用于计算 Broker 到订阅者的延迟。

4. 查看最近的确认记录

sql
SELECT
    timestamp,
    event_name,
    msg_id,
    publish_at,
    emqx_received_at,
    sub_ack_at,
    pub_clientid,
    sub_clientid,
    topic,
    qos,
    msg
FROM mqtt_message_traces
WHERE event_name = 'message.acked'
ORDER BY timestamp DESC
LIMIT 20;

此查询用于确认确认阶段数据存在,可用于计算订阅者侧延迟。

5. 按事件类型统计记录数

sql
SELECT
    event_name,
    COUNT(*) AS row_count
FROM mqtt_message_traces
GROUP BY event_name
ORDER BY event_name;

此查询用于快速检查追踪数据集的完整性。

6. 查询过去一小时内各阶段平均延迟

sql
WITH publish_events AS (
  SELECT
    msg_id,
    MAX(publish_at) AS publish_at,
    MAX(emqx_received_at) AS publish_received_at
  FROM mqtt_message_traces
  WHERE event_name = 'message.publish'
    AND timestamp >= now() - INTERVAL '1 hour'
  GROUP BY msg_id
),
ack_events AS (
  SELECT
    msg_id,
    MAX(sub_ack_at) AS message_acked
  FROM mqtt_message_traces
  WHERE event_name = 'message.acked'
    AND timestamp >= now() - INTERVAL '1 hour'
  GROUP BY msg_id
),
message_latency AS (
  SELECT
    p.msg_id,
    p.publish_at,
    p.publish_received_at,
    a.message_acked
  FROM publish_events p
  JOIN ack_events a
    ON p.msg_id = a.msg_id
)
SELECT
  COUNT(*) AS sample_count,
  ROUND(AVG(publish_received_at - publish_at), 3) AS publisher_to_emqx_ms,
  ROUND(AVG(message_acked - publish_at), 3) AS end_to_end_ms
FROM message_latency
WHERE publish_at IS NOT NULL
  AND publish_received_at IS NOT NULL
  AND message_acked IS NOT NULL;

在正常运行的 EMQX 系统中,Broker 处理延迟通常极低。真正耗时的部分在于发布和消费过程,即 publisher_to_emqx_msemqx_to_subscriber_ms

查询说明:

  • WHERE timestamp >= now() - INTERVAL '1 hour' 将查询范围限定在近一小时的观测窗口,避免历史数据干扰。
  • publish_eventsack_events 在执行 JOIN 前缩小数据集范围。
  • MAX(...) 将多个事件行的时间戳合并到一条聚合行中。
  • 末尾的 WHERE ... IS NOT NULL 确保只使用发布/确认链路完整的记录计算延迟。

7. 验证负值

在接受延迟结果前,先运行以下查询。若有非零结果,说明存在时间戳问题、时钟同步问题或字段映射错误。

sql
WITH publish_events AS (
  SELECT
    msg_id,
    MAX(publish_at) AS publish_at,
    MAX(emqx_received_at) AS publish_received_at
  FROM mqtt_message_traces
  WHERE event_name = 'message.publish'
    AND timestamp >= now() - INTERVAL '1 hour'
  GROUP BY msg_id
),
ack_events AS (
  SELECT
    msg_id,
    MAX(sub_ack_at) AS message_acked
  FROM mqtt_message_traces
  WHERE event_name = 'message.acked'
    AND timestamp >= now() - INTERVAL '1 hour'
  GROUP BY msg_id
),
message_latency AS (
  SELECT
    p.msg_id,
    p.publish_at,
    p.publish_received_at,
    a.message_acked
  FROM publish_events p
  JOIN ack_events a
    ON p.msg_id = a.msg_id
)
SELECT
  SUM(CASE WHEN publish_received_at - publish_at < 0 THEN 1 ELSE 0 END) AS negative_publisher_to_emqx,
  SUM(CASE WHEN message_acked - publish_at < 0 THEN 1 ELSE 0 END) AS negative_end_to_end
FROM message_latency
WHERE publish_at IS NOT NULL
  AND publish_received_at IS NOT NULL
  AND message_acked IS NOT NULL;

所有值均为 0 时,结果方可认为有效。若任一计数非零,应在将结果用作最终结论前检查时间戳映射和时钟同步配置。

结果验证

在发布最终延迟结果前,请验证以下所有条件:

  1. message.publishmessage.acked 两类追踪记录均已存在;
  2. 关键时间戳字段 publish_atemqx_received_atsub_ack_at 均已正确填充;
  3. 平均延迟查询返回了有意义的 sample_count
  4. 负值验证查询所有项均返回 0;
  5. EMQX Rule Action 指标未显示异常失败数。