# Trace Message Latency with Event Topics and TimescaleDB Integration

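EMQX Cloud can combine event topics with rule-based data integration to build end-to-end message traceability across the full MQTT delivery path. With this approach, latency data is collected from the moment a publisher sends a message, through broker-side processing, until the subscriber acknowledges receipt.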

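This guide describes how to build this traceability workflow with EMQX Cloud and TimescaleDB. The result is a practical latency observation mechanism that measures the following stages:

- Publisher-to-broker latency
- Broker-side processing latency
- Broker-to-subscriber latency
- Total end-to-end delivery latency

The implementation stores trace events in TimescaleDB and uses SQL queries to inspect raw records, verify data completeness, and calculate the average latency of each stage.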

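## Prerequisites

Before you start, make sure that:

- An EMQX Cloud deployment running EMQX v5 has been created and is accessible.
- TimescaleDB is deployed and accessible.
- The system clocks of the publisher and subscriber hosts are synchronized (for example, with NTP). The publisher clock matters most, because `publish_at` is compared directly with timestamps recorded by EMQX.
- The publisher includes a `publish_at` timestamp in the payload of every MQTT message.
- QoS 1 or QoS 2 is used, so that acknowledgment events can be captured.
- Clients and the database use the same time zone for timestamp handling; UTC is recommended.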

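### Timestamp Definitions

The tracing model uses the following timestamps:

- `publish_at`: client-side timestamp that the publisher writes into the MQTT payload.
- `publish_received_at`: time at which EMQX received the PUBLISH packet.
- `message_delivered`: time at which EMQX delivered the message to the subscriber side.
- `message_acked`: time at which EMQX received the subscriber's acknowledgment (ACK).

These timestamps are used to calculate the latency of each stage along the delivery path.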

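## Set Up the TimescaleDB Table

Create a table to store message trace records, then convert it into a TimescaleDB hypertable. Each message stage is stored as a separate row, so that the stages can later be correlated by `msg_id`.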

```sql
CREATE EXTENSION IF NOT EXISTS timescaledb;
DROP TABLE IF EXISTS mqtt_message_traces;
CREATE TABLE mqtt_message_traces
(
    event_at          TIMESTAMPTZ       NOT NULL,
    event             character varying NOT NULL,
    msg_id            character varying NOT NULL,
    publish_at        bigint,
    emqx_received_at  bigint,
    emqx_delivered_at bigint,
    sub_ack_at        bigint,
    pub_clientid      character varying,
    sub_clientid      character varying,
    topic             character varying NOT NULL,
    qos               integer           NOT NULL,
    payload           text
);
```

Convert the table into a hypertable, using `event_at` as the time column:

```sql
SELECT create_hypertable(
           'mqtt_message_traces',
           'event_at',
           chunk_time_interval => interval '12 hour',
           if_not_exists => TRUE
       );
```

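Create indexes to improve query performance:

- An index on the `event` column, to quickly filter by event type.
- A composite index on `msg_id` and descending `event_at`, to trace a specific message over time.
- A composite index on `pub_clientid` and descending `event_at`, to analyze the behavior of a specific publisher.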

```sql
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_event_idx
    ON mqtt_message_traces (event);
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_msg_id_event_at_idx
    ON mqtt_message_traces (msg_id, event_at DESC);
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_pub_clientid_event_at_idx
    ON mqtt_message_traces (pub_clientid, event_at DESC);
```

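## Configure TimescaleDB Data Integration in EMQX Cloud

The traceability workflow uses three rules and three write actions. All three actions write to the same table, but each one captures a different stage of the message lifecycle.

All three rules use the same correlation key: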

```
id as msg_id
```

The same `msg_id` correlates the following three events:

- `message.publish`
- `message.delivered`
- `message.acked`
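The correlation can also be pictured outside the database. The following Python sketch is a hypothetical illustration, not part of the EMQX configuration: it takes trace rows shaped like `mqtt_message_traces` (as plain dicts) and merges them into one record per message and subscriber, keyed on `msg_id` plus, for delivery and acknowledgment rows, `sub_clientid`. The `merge_traces` helper, the message IDs, and the client IDs are made up for the example.

```python
from collections import defaultdict


def merge_traces(rows):
    """Merge trace rows into one record per (msg_id, sub_clientid).

    Each row is a dict using the mqtt_message_traces column names.
    """
    publish = {}                 # msg_id -> (publish_at, emqx_received_at)
    per_sub = defaultdict(dict)  # (msg_id, sub_clientid) -> broker-side timestamps
    for row in rows:
        if row["event"] == "message.publish":
            publish[row["msg_id"]] = (row["publish_at"], row["emqx_received_at"])
        elif row["event"] == "message.delivered":
            key = (row["msg_id"], row["sub_clientid"])
            per_sub[key]["message_delivered"] = row["emqx_delivered_at"]
        elif row["event"] == "message.acked":
            key = (row["msg_id"], row["sub_clientid"])
            per_sub[key]["message_acked"] = row["sub_ack_at"]

    merged = []
    for (msg_id, sub_clientid), stages in per_sub.items():
        if msg_id not in publish:
            continue  # no publish row for this msg_id, so it cannot be correlated
        publish_at, received_at = publish[msg_id]
        merged.append({
            "msg_id": msg_id,
            "sub_clientid": sub_clientid,
            "publish_at": publish_at,
            "publish_received_at": received_at,
            "message_delivered": stages.get("message_delivered"),
            "message_acked": stages.get("message_acked"),
        })
    return merged


# Hypothetical rows for one message delivered to two subscribers
rows = [
    {"event": "message.publish", "msg_id": "m1", "publish_at": 1000, "emqx_received_at": 1006},
    {"event": "message.delivered", "msg_id": "m1", "sub_clientid": "sub-a", "emqx_delivered_at": 1006},
    {"event": "message.delivered", "msg_id": "m1", "sub_clientid": "sub-b", "emqx_delivered_at": 1007},
    {"event": "message.acked", "msg_id": "m1", "sub_clientid": "sub-a", "sub_ack_at": 1008},
    {"event": "message.acked", "msg_id": "m1", "sub_clientid": "sub-b", "sub_ack_at": 1010},
]
for record in merge_traces(rows):
    print(record["sub_clientid"], record["message_delivered"], record["message_acked"])
# sub-a 1006 1008
# sub-b 1007 1010
```

Keying the subscriber-side timestamps on the subscriber as well as the message keeps two subscribers of the same message from overwriting each other.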

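### Configure the Publish Trace Rule

The first rule records the publish stage. It listens on the wildcard topic `#`, so it captures regular messages on all business topics.

**Rule SQL:**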

```sql
SELECT
  timestamp div 1000 as event_at,
  event,
  id as msg_id,
  int(payload.publish_at) as publish_at,
  int(publish_received_at) as emqx_received_at,
  clientid as pub_clientid,
  topic,
  qos,
  payload
FROM "#"
```

**Action SQL:**

```sql
INSERT INTO mqtt_message_traces(
  event_at,
  event,
  msg_id,
  publish_at,
  emqx_received_at,
  emqx_delivered_at,
  sub_ack_at,
  pub_clientid,
  sub_clientid,
  topic,
  qos,
  payload
) VALUES (
  to_timestamp(${event_at}),
  ${event},
  ${msg_id},
  ${publish_at}::bigint,
  ${emqx_received_at}::bigint,
  NULL,
  NULL,
  ${pub_clientid},
  NULL,
  ${topic},
  ${qos},
  ${payload}
)
```

When a publisher sends a message to a business topic (for example, `emqx/test`), this rule writes a publish trace record to TimescaleDB.

![action-view](./_assets/action-view.png)
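To make the field mapping concrete, the following Python sketch rebuilds the column values that the publish rule and action above would produce for a single event. It is an illustration under assumptions: the `event` dict, its sample values, and the `publish_trace_row` helper are hypothetical, and only the field names come from the rule SQL. Note that `timestamp div 1000` keeps whole seconds for `event_at`, while the latency calculations rely on the millisecond `bigint` columns.

```python
import json
from datetime import datetime, timezone


def publish_trace_row(event):
    """Return the 12 column values, in action-SQL order, for one publish event.

    `event` is a hypothetical dict holding the rule-engine fields used above.
    """
    payload = json.loads(event["payload"])
    # Rule: timestamp div 1000 (whole seconds); action: to_timestamp(...) -> TIMESTAMPTZ
    event_at = datetime.fromtimestamp(event["timestamp"] // 1000, tz=timezone.utc)
    return (
        event_at,                           # event_at
        event["event"],                     # event
        event["id"],                        # msg_id
        int(payload["publish_at"]),         # publish_at (publisher clock, ms)
        int(event["publish_received_at"]),  # emqx_received_at (broker clock, ms)
        None,                               # emqx_delivered_at: not known at this stage
        None,                               # sub_ack_at: not known at this stage
        event["clientid"],                  # pub_clientid
        None,                               # sub_clientid: no subscriber yet
        event["topic"],                     # topic
        event["qos"],                       # qos
        event["payload"],                   # payload (raw text)
    )


# Hypothetical publish event
sample = {
    "timestamp": 1773579493005,
    "event": "message.publish",
    "id": "hypothetical-msg-id",
    "publish_received_at": 1773579493005,
    "clientid": "python-mqtt-pub-1",
    "topic": "emqx/test",
    "qos": 1,
    "payload": '{"publish_at":1773579492999,"msg":10000}',
}
row = publish_trace_row(sample)
print(row[4] - row[3])  # 6 (ms between publish_at and emqx_received_at)
```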

### Configure the Delivery Trace Rule

The second rule records the point at which EMQX delivers the message to the subscriber.

**Rule SQL:**

```sql
SELECT
  timestamp div 1000 as event_at,
  event,
  id as msg_id,
  int(payload.publish_at) as publish_at,
  int(publish_received_at) as emqx_received_at,
  int(timestamp) as emqx_delivered_at,
  from_clientid as pub_clientid,
  clientid as sub_clientid,
  topic,
  qos,
  payload
FROM "$events/message_delivered"
```

**Action SQL:**

```sql
INSERT INTO mqtt_message_traces(
  event_at,
  event,
  msg_id,
  publish_at,
  emqx_received_at,
  emqx_delivered_at,
  sub_ack_at,
  pub_clientid,
  sub_clientid,
  topic,
  qos,
  payload
) VALUES (
  to_timestamp(${event_at}),
  ${event},
  ${msg_id},
  ${publish_at}::bigint,
  ${emqx_received_at}::bigint,
  ${emqx_delivered_at}::bigint,
  NULL,
  ${pub_clientid},
  ${sub_clientid},
  ${topic},
  ${qos},
  ${payload}
)
```

This record is used to calculate the broker-side processing latency and the broker-to-subscriber latency.

![rule_2nd](./_assets/rule_2nd.png)

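### Configure the Acknowledgment Trace Rule

The third rule records the acknowledgment stage. It fires only when an acknowledgment event occurs, so QoS 1 or QoS 2 is required.

**Rule SQL:**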

```sql
SELECT
  timestamp div 1000 as event_at,
  event,
  id as msg_id,
  int(payload.publish_at) as publish_at,
  int(publish_received_at) as emqx_received_at,
  int(timestamp) as sub_ack_at,
  from_clientid as pub_clientid,
  clientid as sub_clientid,
  topic,
  qos,
  payload
FROM "$events/message_acked"
```

**Action SQL:**

```sql
INSERT INTO mqtt_message_traces(
  event_at,
  event,
  msg_id,
  publish_at,
  emqx_received_at,
  emqx_delivered_at,
  sub_ack_at,
  pub_clientid,
  sub_clientid,
  topic,
  qos,
  payload
) VALUES (
  to_timestamp(${event_at}),
  ${event},
  ${msg_id},
  ${publish_at}::bigint,
  ${emqx_received_at}::bigint,
  NULL,
  ${sub_ack_at}::bigint,
  ${pub_clientid},
  ${sub_clientid},
  ${topic},
  ${qos},
  ${payload}
)
```

This record is used to calculate the subscriber-side acknowledgment latency and the total end-to-end latency.

![rule_3rd](./_assets/rule_3rd.png)

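### Connector Configuration Notes

In terms of logical design, the three actions can share a single TimescaleDB connector.

However, sustained-load testing on EMQX 5 showed that sharing one PostgreSQL/TimescaleDB connector across all three actions was less stable: one of the actions accumulated write failures, which reduced the number of complete samples available for correlation.

For stable operation, the recommended configuration is:

- A dedicated connector for the publish trace action
- A dedicated connector for the delivery trace action
- A dedicated connector for the acknowledgment trace action

This is an implementation-level choice for stability only. It does not affect the table schema, the rule SQL, the action SQL, the correlation logic, or the latency formulas.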

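## Simulate Message Delivery with the Python SDK

This section describes the publisher and subscriber behavior used for validation.

The publisher sends MQTT messages to a business topic (for example, `emqx/test`), with a `publish_at` timestamp in every payload. The subscriber listens on the same topic and stays online for the entire message flow.

Install the Python dependency: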

```bash
python3 -m pip install paho-mqtt
```

### Publisher

The publisher needs to:

- Connect to EMQX
- Publish messages to the target topic
- Use QoS 1
- Include a `publish_at` field in every message payload

Example payload:

```json
{"publish_at":1773579492999,"msg":10000}
```

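The following publisher script supports both fixed-interval sending and fixed-duration runs at a target TPS, so it can be used for functional validation or for sustained-load testing.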

```python
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client

parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
# Fixed-interval mode: send --count messages, one every --interval-sec seconds
parser.add_argument("--count", type=int, default=20)
parser.add_argument("--interval-sec", type=float, default=1.0)
# Rate mode (used when both are set): send --tps messages/s for --duration-sec seconds
parser.add_argument("--tps", type=float)
parser.add_argument("--duration-sec", type=float)
# Stay connected after the last publish so queued QoS 1/2 messages can be flushed
parser.add_argument("--hold-sec", type=float, default=0.0)
args = parser.parse_args()


def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code != 0:
        raise RuntimeError(f"connect failed: {reason_code}")


# paho-mqtt 2.x client using the VERSION2 callback signatures
client = mqtt_client.Client(
    mqtt_client.CallbackAPIVersion.VERSION2,
    client_id=f"python-mqtt-pub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
client.connect(args.host, args.port)
client.loop_start()  # network I/O runs in a background thread

count = 0
start = time.perf_counter()
if args.tps and args.duration_sec:
    # Rate mode: first message immediately, then one every 1/tps seconds until the deadline
    interval = 1.0 / args.tps
    deadline = start + args.duration_sec
    next_send = start
    keep_sending = lambda now, sent: now < deadline
else:
    # Fixed-interval mode: stop after --count messages
    interval = args.interval_sec
    next_send = start + interval
    keep_sending = lambda now, sent: sent < args.count

while keep_sending(time.perf_counter(), count):
    now = time.perf_counter()
    if now < next_send:
        time.sleep(next_send - now)
    # publish_at: publisher wall-clock time in Unix epoch milliseconds
    payload = json.dumps({"publish_at": int(time.time() * 1000), "msg": count}, separators=(",", ":"))
    result = client.publish(args.topic, payload, qos=args.qos)
    if result.rc != 0:
        raise RuntimeError(f"publish failed: {result.rc}")
    count += 1
    # Advance from the schedule, not from "now", so one late send does not delay the rest
    next_send += interval

publish_elapsed = time.perf_counter() - start
if args.hold_sec > 0:
    time.sleep(args.hold_sec)
total_elapsed = time.perf_counter() - start
print(json.dumps({
    "topic": args.topic,
    "sent": count,
    "publish_duration_sec": round(publish_elapsed, 3),
    "total_duration_sec": round(total_elapsed, 3),
    "hold_sec": args.hold_sec,
    "target_tps": args.tps,
    "actual_tps": round(count / publish_elapsed, 3) if publish_elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()
```

Example run:

```bash
python3 publisher.py \
  --host 127.0.0.1 \
  --port 1883 \
  --topic emqx/test \
  --qos 1 \
  --tps 500 \
  --duration-sec 20 \
  --hold-sec 25
```

Example output:

```text
{"topic": "emqx/test", "sent": 10001, "publish_duration_sec": 20.0, "total_duration_sec": 45.004, "hold_sec": 25.0, "target_tps": 500.0, "actual_tps": 500.038}
```

<img src="./_assets/python_1.png" alt="python_1" style="zoom:67%;" />
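The example output reports `"sent": 10001` rather than 500 × 20 = 10000. In rate mode, the loop checks the deadline before it sleeps until the next slot, so the slot that falls exactly at `--duration-sec` is still published. The Python sketch below replays that schedule with exact fractions to show the effect; `ideal_send_count` is a hypothetical helper written for this explanation, and it assumes each `publish()` call takes no time.

```python
from fractions import Fraction


def ideal_send_count(tps, duration_sec):
    """Replay the publisher's rate-mode loop with exact arithmetic.

    Assumes publish() takes no time, so the loop condition is always
    evaluated at the moment the previous message was sent.
    """
    interval = Fraction(1) / Fraction(tps)
    deadline = Fraction(duration_sec)
    now = Fraction(0)        # time at which keep_sending() is evaluated
    next_send = Fraction(0)  # the first message is due immediately
    count = 0
    while now < deadline:          # same check as keep_sending(now, sent)
        now = max(now, next_send)  # sleep until the scheduled slot
        count += 1                 # publish
        next_send += interval
    return count


print(ideal_send_count(500, 20))  # 10001
```

If exactly `tps × duration` messages are wanted, the deadline would have to be re-checked after the sleep; the guide's results do not depend on this extra message.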

### Subscriber

The subscriber needs to:

- Connect to EMQX
- Subscribe to the same business topic
- Use QoS 1
- Stay connected longer than the publishing phase

```python
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client

parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
# How long to stay connected; if omitted, run until interrupted with Ctrl+C
parser.add_argument("--duration-sec", type=float)
args = parser.parse_args()
counter = {"received": 0}


def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code != 0:
        raise RuntimeError(f"connect failed: {reason_code}")
    # Subscribing here also restores the subscription after an automatic reconnect
    client.subscribe(args.topic, qos=args.qos)


def on_message(client, userdata, msg):
    # For QoS 1, paho-mqtt sends the PUBACK automatically; EMQX records it as message.acked
    counter["received"] += 1


client = mqtt_client.Client(
    mqtt_client.CallbackAPIVersion.VERSION2,
    client_id=f"python-mqtt-sub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
client.on_message = on_message
client.connect(args.host, args.port)
client.loop_start()  # network I/O and callbacks run in a background thread
start = time.perf_counter()
try:
    while args.duration_sec is None or time.perf_counter() - start < args.duration_sec:
        time.sleep(1)
except KeyboardInterrupt:
    pass  # still print the summary when stopped with Ctrl+C
elapsed = time.perf_counter() - start
print(json.dumps({
    "topic": args.topic,
    "received": counter["received"],
    "duration_sec": round(elapsed, 3),
    "recv_tps": round(counter["received"] / elapsed, 3) if elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()
```

Start the subscriber first:

```bash
python3 subscriber.py \
  --host 127.0.0.1 \
  --port 1883 \
  --topic emqx/test \
  --qos 1 \
  --duration-sec 80
```

Example output:

```text
{"topic": "emqx/test", "received": 10001, "duration_sec": 80.268, "recv_tps": 124.596}
```

<img src="./_assets/python_2.png" alt="python_2" style="zoom:67%;" />

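### Recommended Run Order

Use two terminals:

1. Start the subscriber first.
2. Then start the publisher.
3. Wait for both commands to finish.
4. After message delivery has completed, query TimescaleDB.

## View Message Latency

Once the publisher, subscriber, rules, and TimescaleDB actions are all running, you can verify the trace records and calculate latency with SQL.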

### How Latency Is Calculated

The latency of each stage is calculated as follows:

- Publishing client to EMQX: `publish_received_at - publish_at`
- EMQX processing: `message_delivered - publish_received_at`
- EMQX to subscribing client: `message_acked - message_delivered`
- Total end-to-end delivery: `message_acked - publish_at`

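::: tip

- If a message lacks the `publish_at` timestamp, the publisher-to-broker and end-to-end latencies cannot be calculated.
- With QoS 0, no `message_acked` event is generated, so the subscriber-side and end-to-end latencies cannot be calculated.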

:::
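The formulas translate directly into code. The following Python sketch applies them to the epoch-millisecond timestamps of one message and subscriber; the `stage_latencies` function is hypothetical and not part of EMQX or the scripts in this guide. It returns `None` for any stage whose inputs are missing, mirroring the two cases in the tip above. Because the intermediate timestamps cancel out, the three stage values always add up to the end-to-end value.

```python
def stage_latencies(publish_at=None, publish_received_at=None,
                    message_delivered=None, message_acked=None):
    """Apply the four latency formulas to epoch-millisecond timestamps.

    A stage whose inputs are missing is returned as None instead of a number.
    """
    def diff(later, earlier):
        if later is None or earlier is None:
            return None
        return later - earlier

    return {
        "publisher_to_emqx_ms": diff(publish_received_at, publish_at),
        "emqx_processing_ms": diff(message_delivered, publish_received_at),
        "emqx_to_subscriber_ms": diff(message_acked, message_delivered),
        "end_to_end_ms": diff(message_acked, publish_at),
    }


# Complete chain: 6 + 0 + 1 = 7
print(stage_latencies(1000, 1006, 1006, 1007))
# {'publisher_to_emqx_ms': 6, 'emqx_processing_ms': 0, 'emqx_to_subscriber_ms': 1, 'end_to_end_ms': 7}

# QoS 0: no message_acked timestamp, so the last two stages are None
print(stage_latencies(1000, 1006, 1006))
# {'publisher_to_emqx_ms': 6, 'emqx_processing_ms': 0, 'emqx_to_subscriber_ms': None, 'end_to_end_ms': None}
```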

### Query Examples

#### 1. Count All Trace Records

```sql
SELECT COUNT(*) AS total_rows
FROM mqtt_message_traces;
```

Example output:

```text
 total_rows
------------
      30002
```

#### 2. View Recent Publish Records

```sql
SELECT event_at, event, msg_id, publish_at, emqx_received_at, pub_clientid, topic, qos, payload
FROM mqtt_message_traces
WHERE event = 'message.publish'
ORDER BY event_at DESC
LIMIT 20;
```

This query confirms that both the publisher-side timestamp and the broker receive timestamp are stored correctly.

#### 3. View Recent Delivery Records

```sql
SELECT event_at, event, msg_id, publish_at, emqx_received_at, emqx_delivered_at, pub_clientid, sub_clientid, topic, qos
FROM mqtt_message_traces
WHERE event = 'message.delivered'
ORDER BY event_at DESC
LIMIT 20;
```

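This query confirms that delivery-stage timestamps are captured and that both the publisher and subscriber client IDs are visible.

#### 4. View Recent Acknowledgment Records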

```sql
SELECT event_at, event, msg_id, publish_at, emqx_received_at, sub_ack_at, pub_clientid, sub_clientid, topic, qos
FROM mqtt_message_traces
WHERE event = 'message.acked'
ORDER BY event_at DESC
LIMIT 20;
```

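This query confirms that acknowledgment-stage data exists and can be used to calculate subscriber-side latency.

#### 5. Count Records by Event Type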

```sql
SELECT event, COUNT(*) AS row_count
FROM mqtt_message_traces
GROUP BY event
ORDER BY event;
```

Example output:

```text
       event       | row_count
-------------------+-----------
 message.acked     |     10001
 message.delivered |     10000
 message.publish   |     10001
```

This query provides a quick check on the completeness of the trace dataset.
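The same completeness check can be expressed as a small Python sketch. It assumes that every published message is delivered to a known number of QoS 1/2 subscribers (`subscribers`, a hypothetical parameter), so each `message.publish` row should be matched by that many `message.delivered` and `message.acked` rows. With shared subscriptions, or subscribers that connect late, this expectation does not hold. The `missing_rows` helper is likewise hypothetical.

```python
def missing_rows(counts, subscribers=1):
    """Return how many delivered/acked rows are missing relative to the publish count."""
    expected = counts.get("message.publish", 0) * subscribers
    return {
        event: expected - counts.get(event, 0)
        for event in ("message.delivered", "message.acked")
    }


# Row counts from the example output above
counts = {"message.acked": 10001, "message.delivered": 10000, "message.publish": 10001}
print(missing_rows(counts))  # {'message.delivered': 1, 'message.acked': 0}
```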

#### 6. Average Latency per Stage over the Past Hour

```sql
WITH relevant_messages AS (
  SELECT *
  FROM mqtt_message_traces
  WHERE event_at >= NOW() - INTERVAL '1 hour'
),
message_latency AS (
  SELECT
    t1.msg_id,
    COALESCE(t2.sub_clientid, t3.sub_clientid) AS sub_clientid,
    MAX(t1.publish_at) AS publish_at,
    MAX(t1.emqx_received_at) AS publish_received_at,
    MAX(t2.emqx_delivered_at) AS message_delivered,
    MAX(t3.sub_ack_at) AS message_acked
  FROM relevant_messages t1
  JOIN relevant_messages t2
    ON t1.msg_id = t2.msg_id
   AND t2.event = 'message.delivered'
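  JOIN relevant_messages t3
    ON t1.msg_id = t3.msg_id
   AND t3.event = 'message.acked'
   -- pair each delivery with the acknowledgment from the same subscriber
   AND t3.sub_clientid = t2.sub_clientid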
  WHERE t1.event = 'message.publish'
  GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid)
)
SELECT
  COUNT(*) AS sample_count,
  ROUND(AVG(publish_received_at - publish_at), 3) AS publisher_to_emqx_ms,
  ROUND(AVG(message_delivered - publish_received_at), 3) AS emqx_processing_ms,
  ROUND(AVG(message_acked - message_delivered), 3) AS emqx_to_subscriber_ms,
  ROUND(AVG(message_acked - publish_at), 3) AS end_to_end_ms
FROM message_latency
WHERE publish_at IS NOT NULL
  AND publish_received_at IS NOT NULL
  AND message_delivered IS NOT NULL
  AND message_acked IS NOT NULL;
```

Example output:

```text
 sample_count | publisher_to_emqx_ms | emqx_processing_ms | emqx_to_subscriber_ms | end_to_end_ms
--------------+----------------------+--------------------+-----------------------+---------------
        10000 |                5.891 |              0.073 |                 0.273 |         6.236
```

![all_stage_delay](./_assets/all_stage_delay.png)

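In a healthy EMQX system, `emqx_processing_ms` (broker processing latency) is usually very low. Most of the time is spent on the publish and consume legs, that is, `publisher_to_emqx_ms` and `emqx_to_subscriber_ms`. Because `publisher_to_emqx_ms` compares the publisher's clock with the broker's clock, it also absorbs any residual clock offset between the two.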

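**Query notes:**

- `WHERE event_at >= NOW() - INTERVAL '1 hour'` limits the query to a one-hour observation window, so that historical data does not skew the results.
- `relevant_messages` narrows the dataset before the JOINs are executed.
- `COALESCE(t2.sub_clientid, t3.sub_clientid)` ensures that the grouped results carry the subscriber identity.
- `GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid)` keeps records for different subscribers from being merged into a single aggregate row.
- `MAX(...)` collapses the timestamps from the three event rows into one aggregate row.
- The final `WHERE ... IS NOT NULL` ensures that only records with a complete publish/delivered/acked chain are used to calculate latency.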
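For a small export of the `message_latency` rows (for example, a CSV pulled from TimescaleDB), the final aggregation step can be reproduced in Python to cross-check the SQL. In this sketch, `average_latencies` and the input records are hypothetical; each record is a dict with the four timestamp columns produced by the `message_latency` CTE. Python's `round` operates on floats, so the last decimal place can occasionally differ from PostgreSQL's `ROUND` on `numeric` values.

```python
def average_latencies(records):
    """Average each stage over records whose publish/delivered/acked chain is complete."""
    keys = ("publish_at", "publish_received_at", "message_delivered", "message_acked")
    # Same effect as the final WHERE ... IS NOT NULL filter
    complete = [r for r in records if all(r.get(k) is not None for k in keys)]
    n = len(complete)
    if n == 0:
        return {"sample_count": 0}

    def avg(later, earlier):
        return round(sum(r[later] - r[earlier] for r in complete) / n, 3)

    return {
        "sample_count": n,
        "publisher_to_emqx_ms": avg("publish_received_at", "publish_at"),
        "emqx_processing_ms": avg("message_delivered", "publish_received_at"),
        "emqx_to_subscriber_ms": avg("message_acked", "message_delivered"),
        "end_to_end_ms": avg("message_acked", "publish_at"),
    }


records = [
    {"publish_at": 1000, "publish_received_at": 1006, "message_delivered": 1006, "message_acked": 1007},
    {"publish_at": 2000, "publish_received_at": 2005, "message_delivered": 2005, "message_acked": 2006},
    # Incomplete chain (no acknowledgment yet): excluded from the averages
    {"publish_at": 3000, "publish_received_at": 3004, "message_delivered": 3004, "message_acked": None},
]
print(average_latencies(records))
# {'sample_count': 2, 'publisher_to_emqx_ms': 5.5, 'emqx_processing_ms': 0.0, 'emqx_to_subscriber_ms': 1.0, 'end_to_end_ms': 6.5}
```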

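#### 7. Check for Negative Values

Run the following query before accepting the latency results. A non-zero count indicates a timestamp problem, a clock synchronization problem, or a field-mapping error.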

```sql
WITH relevant_messages AS (
  SELECT *
  FROM mqtt_message_traces
  WHERE event_at >= NOW() - INTERVAL '1 hour'
),
message_latency AS (
  SELECT
    t1.msg_id,
    COALESCE(t2.sub_clientid, t3.sub_clientid) AS sub_clientid,
    MAX(t1.publish_at) AS publish_at,
    MAX(t1.emqx_received_at) AS publish_received_at,
    MAX(t2.emqx_delivered_at) AS message_delivered,
    MAX(t3.sub_ack_at) AS message_acked
  FROM relevant_messages t1
  JOIN relevant_messages t2
    ON t1.msg_id = t2.msg_id
   AND t2.event = 'message.delivered'
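  JOIN relevant_messages t3
    ON t1.msg_id = t3.msg_id
   AND t3.event = 'message.acked'
   -- pair each delivery with the acknowledgment from the same subscriber
   AND t3.sub_clientid = t2.sub_clientid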
  WHERE t1.event = 'message.publish'
  GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid)
)
SELECT
  COUNT(*) FILTER (WHERE publish_received_at - publish_at < 0) AS negative_client_to_emqx,
  COUNT(*) FILTER (WHERE message_delivered - publish_received_at < 0) AS negative_emqx_processing,
  COUNT(*) FILTER (WHERE message_acked - message_delivered < 0) AS negative_emqx_to_subscriber,
  COUNT(*) FILTER (WHERE message_acked - publish_at < 0) AS negative_end_to_end
FROM message_latency
WHERE publish_at IS NOT NULL
  AND publish_received_at IS NOT NULL
  AND message_delivered IS NOT NULL
  AND message_acked IS NOT NULL;
```

Example output:

```text
 negative_client_to_emqx | negative_emqx_processing | negative_emqx_to_subscriber | negative_end_to_end
-------------------------+--------------------------+-----------------------------+---------------------
                       0 |                        0 |                           0 |                   0
```

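The results can be considered valid only when all four counts are 0. If any count is non-zero, check the timestamp mapping and clock synchronization before using the results as a final conclusion.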
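To see how clock skew shows up in this check, the Python sketch below applies the same four conditions to in-memory records; the `count_negative` helper and the sample values are hypothetical. A publisher clock that runs ahead of the broker pushes `publish_at` past `publish_received_at`, which makes both the client-to-EMQX and the end-to-end differences negative while the broker-only stages stay unaffected.

```python
STAGES = {
    "negative_client_to_emqx": ("publish_received_at", "publish_at"),
    "negative_emqx_processing": ("message_delivered", "publish_received_at"),
    "negative_emqx_to_subscriber": ("message_acked", "message_delivered"),
    "negative_end_to_end": ("message_acked", "publish_at"),
}


def count_negative(records):
    """Count complete records whose latency is negative, per stage."""
    keys = {k for pair in STAGES.values() for k in pair}
    complete = [r for r in records if all(r.get(k) is not None for k in keys)]
    return {
        name: sum(1 for r in complete if r[later] - r[earlier] < 0)
        for name, (later, earlier) in STAGES.items()
    }


records = [
    # Healthy record
    {"publish_at": 1000, "publish_received_at": 1006, "message_delivered": 1006, "message_acked": 1007},
    # Publisher clock running ahead of the broker
    {"publish_at": 2015, "publish_received_at": 2005, "message_delivered": 2005, "message_acked": 2006},
]
print(count_negative(records))
# {'negative_client_to_emqx': 1, 'negative_emqx_processing': 0, 'negative_emqx_to_subscriber': 0, 'negative_end_to_end': 1}
```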

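## Validate the Results

Before publishing final latency results, verify all of the following:

- Trace records exist for every event type.
- Key timestamp fields are populated.
- The average latency query returns a meaningful `sample_count`.
- All four counts in the negative-value check are 0.
- The action metrics show no unexpected failures.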
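The second item can be spot-checked on exported rows. In this Python sketch, the set of key columns per event type is an assumption, chosen as the columns that the latency queries read from each kind of row, and `missing_fields` is a hypothetical helper.

```python
# Assumed key columns per event type (the ones the latency queries read)
KEY_COLUMNS = {
    "message.publish": ("publish_at", "emqx_received_at"),
    "message.delivered": ("emqx_delivered_at", "sub_clientid"),
    "message.acked": ("sub_ack_at", "sub_clientid"),
}


def missing_fields(row):
    """Return the key columns that are NULL (None) for this row's event type."""
    return [col for col in KEY_COLUMNS.get(row["event"], ()) if row.get(col) is None]


# A publish row stored without a publish_at value
row = {"event": "message.publish", "msg_id": "m1", "publish_at": None, "emqx_received_at": 1006}
print(missing_fields(row))  # ['publish_at']
```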

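Example action metrics from one validation run (the single failed write in the delivered trace action accounts for the 10000 `message.delivered` rows counted in query 5):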

```text
publish trace action:
  matched = 10001
  success = 10001
  failed  = 0
delivered trace action:
  matched = 10001
  success = 10000
  failed  = 1
acked trace action:
  matched = 10001
  success = 10001
  failed  = 0
```

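If any action reports a non-zero failed count, or any negative-latency count is non-zero, fix the data path before using the results as a final conclusion.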
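If the action counters are copied out of the console, for example into a script that gates a test run, this last check can be automated. In the Python sketch below, the dict layout and `action_problems` are hypothetical and use only the three counters shown in the example above. The sketch flags any action that has failed writes or matched events that did not end in a successful write.

```python
def action_problems(metrics):
    """Return the actions whose counters show failed or unwritten trace rows."""
    problems = {}
    for action, m in metrics.items():
        failed = m["failed"]
        unwritten = m["matched"] - m["success"]
        if failed or unwritten:
            problems[action] = {"failed": failed, "unwritten": unwritten}
    return problems


# Counters from the example action metrics above
metrics = {
    "publish trace action": {"matched": 10001, "success": 10001, "failed": 0},
    "delivered trace action": {"matched": 10001, "success": 10000, "failed": 1},
    "acked trace action": {"matched": 10001, "success": 10001, "failed": 0},
}
print(action_problems(metrics))  # {'delivered trace action': {'failed': 1, 'unwritten': 1}}
```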
