# TimescaleDBでMQTTメッセージのエンドツーエンドレイテンシを追跡する

EMQX Cloudは、イベントトピックとルールベースのデータ統合を活用して、MQTTの送信経路全体にわたるエンドツーエンドのメッセージトレーサビリティを構築できます。この手法では、パブリッシャーがメッセージを送信した瞬間から、ブローカー側の処理を経て、サブスクライバーが受信をアックした時点までのレイテンシデータを取得します。

本ガイドでは、EMQX CloudとTimescaleDBを用いてこのトレーサビリティワークフローを構築する方法を説明します。最終的に得られるのは、以下の各段階における遅延を計測する実用的なレイテンシ観測システムです。

- パブリッシャーからブローカーまで
- ブローカー側の処理
- ブローカーからサブスクライバーまで
- エンドツーエンドの総送信時間

実装では、トレースイベントをTimescaleDBに格納し、SQLクエリで生データを検査、データ整合性を検証し、各段階の平均レイテンシを算出します。

## 前提条件

開始前に以下の条件を満たしていることを確認してください。

- EMQX v5のデプロイメントが作成されアクセス可能であること。
- TimescaleDBがデプロイされアクセス可能であること。
- パブリッシャーとサブスクライバーのシステムクロックが同期されていること。
- パブリッシャーが各MQTTメッセージのペイロードに`publish_at`タイムスタンプを含めていること。
- アックイベントの取得を可能にするためにQoS 1またはQoS 2を使用していること。
- クライアントとデータベース間でタイムスタンプのタイムゾーンが一貫していること（UTC推奨）。

### タイムスタンプ定義

トレーシングモデルでは以下のタイムスタンプを使用します。

- `publish_at`：パブリッシャーがMQTTペイロードに書き込むクライアント側のタイムスタンプ。
- `publish_received_at`：EMQXがPUBLISHパケットを受信した時刻。
- `message_delivered`：EMQXがメッセージをサブスクライバー側に配信した時刻。
- `message_acked`：EMQXがサブスクライバーからのアック（ACK）を受信した時刻。

これらのタイムスタンプを用いて送信経路の各段階のレイテンシを計算します。

## TimescaleDBテーブルのセットアップ

メッセージトレースレコードを格納するテーブルを作成し、TimescaleDBのハイパーテーブルに変換します。各メッセージ段階は別々の行として保存され、後で`msg_id`を使った相関クエリが可能になります。

```sql
CREATE EXTENSION IF NOT EXISTS timescaledb;
DROP TABLE IF EXISTS mqtt_message_traces;
CREATE TABLE mqtt_message_traces
(
    event_at          TIMESTAMPZ         NOT NULL,
    event             character varying NOT NULL,
    msg_id            character varying NOT NULL,
    publish_at        bigint,
    emqx_received_at  bigint,
    emqx_delivered_at bigint,
    sub_ack_at        bigint,
    pub_clientid      character varying,
    sub_clientid      character varying,
    topic             character varying NOT NULL,
    qos               integer           NOT NULL,
    payload           text
);
```

`event_at`を時間カラムとしてハイパーテーブルに変換します。

```sql
SELECT create_hypertable(
           'mqtt_message_traces',
           'event_at',
           chunk_time_interval => interval '12 hour',
           if_not_exists => TRUE
       );
```

クエリ性能向上のためにインデックスを作成します。

- `event`列に対するインデックス（イベントタイプの高速検索用）
- `msg_id`と降順の`event_at`の複合インデックス（特定メッセージの時間経過追跡用）
- `pub_clientid`と降順の`event_at`の複合インデックス（特定パブリッシャーの解析用）

```sql
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_event_idx
    ON mqtt_message_traces (event);
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_msg_id_event_at_idx
    ON mqtt_message_traces (msg_id, event_at DESC);
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_pub_clientid_event_at_idx
    ON mqtt_message_traces (pub_clientid, event_at DESC);
```

## EMQX CloudでのTimescaleDBデータ統合設定

トレーサビリティワークフローは3つのルールと3つの書き込みアクションを使用します。3つのアクションは同じテーブルに書き込みますが、それぞれメッセージライフサイクルの異なる段階をキャプチャします。

3つのルールは共通の相関キーを使用します。

```
id as msg_id
```

同じ`msg_id`を使って以下3つのイベントを相関させます。

- `message.publish`
- `message.delivered`
- `message.acked`

### Publishトレースルールの設定

最初のルールはパブリッシュ段階のデータを記録します。ワイルドカードトピックでリッスンし、全ビジネストピックの通常メッセージをキャプチャします。

**ルールSQL:**

```sql
SELECT
  timestamp div 1000 as event_at,
  event,
  id as msg_id,
  int(payload.publish_at) as publish_at,
  int(publish_received_at) as emqx_received_at,
  clientid as pub_clientid,
  topic,
  qos,
  payload
FROM "#"
```

**アクションSQL:**

```sql
INSERT INTO mqtt_message_traces(
  event_at,
  event,
  msg_id,
  publish_at,
  emqx_received_at,
  emqx_delivered_at,
  sub_ack_at,
  pub_clientid,
  sub_clientid,
  topic,
  qos,
  payload
) VALUES (
  to_timestamp(${event_at}),
  ${event},
  ${msg_id},
  ${publish_at}::bigint,
  ${emqx_received_at}::bigint,
  NULL,
  NULL,
  ${pub_clientid},
  NULL,
  ${topic},
  ${qos},
  ${payload}
)
```

パブリッシャーがビジネストピック（例：`emqx/test`）にメッセージを送信すると、このルールがTimescaleDBにパブリッシュトレースレコードを書き込みます。

![action-view](./_assets/action-view.png)

### Deliveredトレースルールの設定

2番目のルールは、EMQXがメッセージをサブスクライバー側に配信したタイムスタンプを記録します。

**ルールSQL:**

```sql
SELECT
  timestamp div 1000 as event_at,
  event,
  id as msg_id,
  int(payload.publish_at) as publish_at,
  int(publish_received_at) as emqx_received_at,
  int(timestamp) as emqx_delivered_at,
  from_clientid as pub_clientid,
  clientid as sub_clientid,
  topic,
  qos,
  payload
FROM "$events/message_delivered"
```

**アクションSQL:**

```sql
INSERT INTO mqtt_message_traces(
  event_at,
  event,
  msg_id,
  publish_at,
  emqx_received_at,
  emqx_delivered_at,
  sub_ack_at,
  pub_clientid,
  sub_clientid,
  topic,
  qos,
  payload
) VALUES (
  to_timestamp(${event_at}),
  ${event},
  ${msg_id},
  ${publish_at}::bigint,
  ${emqx_received_at}::bigint,
  ${emqx_delivered_at}::bigint,
  NULL,
  ${pub_clientid},
  ${sub_clientid},
  ${topic},
  ${qos},
  ${payload}
)
```

このレコードはブローカー側の処理レイテンシとブローカーからサブスクライバーまでのレイテンシ計算に使用されます。

![rule_2nd](./_assets/rule_2nd.png)

### Ackedトレースルールの設定

3番目のルールはアック段階のデータを記録します。アックイベントが存在する場合のみ発火するため、QoS 1またはQoS 2が必要です。

**ルールSQL:**

```sql
SELECT
  timestamp div 1000 as event_at,
  event,
  id as msg_id,
  int(payload.publish_at) as publish_at,
  int(publish_received_at) as emqx_received_at,
  int(timestamp) as sub_ack_at,
  from_clientid as pub_clientid,
  clientid as sub_clientid,
  topic,
  qos,
  payload
FROM "$events/message_acked"
```

**アクションSQL:**

```sql
INSERT INTO mqtt_message_traces(
  event_at,
  event,
  msg_id,
  publish_at,
  emqx_received_at,
  emqx_delivered_at,
  sub_ack_at,
  pub_clientid,
  sub_clientid,
  topic,
  qos,
  payload
) VALUES (
  to_timestamp(${event_at}),
  ${event},
  ${msg_id},
  ${publish_at}::bigint,
  ${emqx_received_at}::bigint,
  NULL,
  ${sub_ack_at}::bigint,
  ${pub_clientid},
  ${sub_clientid},
  ${topic},
  ${qos},
  ${payload}
)
```

このレコードはサブスクライバー側のアックレイテンシとエンドツーエンドの総レイテンシ計算に使用されます。

![rule_3rd](./_assets/rule_3rd.png)

### コネクター設定に関する注意点

論理設計上は3つのアクションで単一のTimescaleDBコネクターを共有可能です。

しかし、EMQX 5での持続的な負荷テスト中に、単一のPostgreSQL/TimescaleDBコネクター共有は不安定であることが判明しました。観察される症状は、あるアクションで書き込み失敗が蓄積し、相関クエリに利用可能な完全なサンプル数が減少することです。

安定運用のための推奨設定は以下の通りです。

- パブリッシュトレースアクション用に専用コネクター
- デリバリートレースアクション用に専用コネクター
- アックトレースアクション用に専用コネクター

これは実装レベルの安定性選択であり、テーブルスキーマ、ルールSQL、アクションSQL、相関ロジック、レイテンシ計算式には影響しません。

## Python SDKを使ったメッセージ配信のシミュレーション

このセクションでは検証に使用するパブリッシャーとサブスクライバーの動作を説明します。

パブリッシャーはビジネストピック（例：`emqx/test`）にMQTTメッセージを送信し、各ペイロードに`publish_at`タイムスタンプを含めます。サブスクライバーは同じトピックをサブスクライブし、メッセージフロー全体でオンライン状態を維持します。

Pythonの依存パッケージをインストールします。

```bash
python3 -m pip install paho-mqtt
```

### パブリッシャー

パブリッシャーは以下を行います。

- EMQXに接続する
- 対象トピックにメッセージをパブリッシュする
- QoS 1を使用する
- 各メッセージペイロードに`publish_at`フィールドを含める

ペイロード例：

```json
{"publish_at":1773579492999,"msg":10000}
```

以下のパブリッシャースクリプトは、固定間隔送信と指定TPSでの期間ベース負荷テストの両方に対応しており、機能検証と持続負荷テストの両方に適しています。

```python
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--count", type=int, default=20)
parser.add_argument("--interval-sec", type=float, default=1.0)
parser.add_argument("--tps", type=float)
parser.add_argument("--duration-sec", type=float)
parser.add_argument("--hold-sec", type=float, default=0.0)
args = parser.parse_args()
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code != 0:
        raise RuntimeError(f"connect failed: {reason_code}")
client = mqtt_client.Client(
    mqtt_client.CallbackAPIVersion.VERSION2,
    client_id=f"python-mqtt-pub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
client.connect(args.host, args.port)
client.loop_start()
count = 0
start = time.perf_counter()
if args.tps and args.duration_sec:
    interval = 1.0 / args.tps
    deadline = start + args.duration_sec
    next_send = start
    keep_sending = lambda now, sent: now < deadline
else:
    interval = args.interval_sec
    next_send = start + interval
    keep_sending = lambda now, sent: sent < args.count
while keep_sending(time.perf_counter(), count):
    now = time.perf_counter()
    if now < next_send:
        time.sleep(next_send - now)
    payload = json.dumps({"publish_at": int(time.time() * 1000), "msg": count}, separators=(",", ":"))
    result = client.publish(args.topic, payload, qos=args.qos)
    if result.rc != 0:
        raise RuntimeError(f"publish failed: {result.rc}")
    count += 1
    next_send += interval
publish_elapsed = time.perf_counter() - start
if args.hold_sec > 0:
    time.sleep(args.hold_sec)
total_elapsed = time.perf_counter() - start
print(json.dumps({
    "topic": args.topic,
    "sent": count,
    "publish_duration_sec": round(publish_elapsed, 3),
    "total_duration_sec": round(total_elapsed, 3),
    "hold_sec": args.hold_sec,
    "target_tps": args.tps,
    "actual_tps": round(count / publish_elapsed, 3) if publish_elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()
```

実行例：

```bash
python3 publisher.py \
  --host 127.0.0.1 \
  --port 1883 \
  --topic emqx/test \
  --qos 1 \
  --tps 500 \
  --duration-sec 20 \
  --hold-sec 25
```

出力例：

```text
{"topic": "emqx/test", "sent": 10001, "publish_duration_sec": 20.0, "total_duration_sec": 45.004, "hold_sec": 25.0, "target_tps": 500.0, "actual_tps": 500.038}
```

<img src="./_assets/python_1.png" alt="python_1" style="zoom:67%;" />

### サブスクライバー

サブスクライバーは以下を行います。

- EMQXに接続する
- 同じビジネストピックをサブスクライブする
- QoS 1を使用する
- パブリッシュフェーズより長く接続を維持する

```python
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--duration-sec", type=float)
args = parser.parse_args()
counter = {"received": 0}
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code != 0:
        raise RuntimeError(f"connect failed: {reason_code}")
    client.subscribe(args.topic, qos=args.qos)
def on_message(client, userdata, msg):
    counter["received"] += 1
client = mqtt_client.Client(
    mqtt_client.CallbackAPIVersion.VERSION2,
    client_id=f"python-mqtt-sub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
client.on_message = on_message
client.connect(args.host, args.port)
client.loop_start()
start = time.perf_counter()
while args.duration_sec is None or time.perf_counter() - start < args.duration_sec:
    time.sleep(1)
elapsed = time.perf_counter() - start
print(json.dumps({
    "topic": args.topic,
    "received": counter["received"],
    "duration_sec": round(elapsed, 3),
    "recv_tps": round(counter["received"] / elapsed, 3) if elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()
```

サブスクライバーを先に起動します。

```bash
python3 subscriber.py \
  --host 127.0.0.1 \
  --port 1883 \
  --topic emqx/test \
  --qos 1 \
  --duration-sec 80
```

出力例：

```text
{"topic": "emqx/test", "received": 10001, "duration_sec": 80.268, "recv_tps": 124.596}
```

<img src="./_assets/python_2.png" alt="python_2" style="zoom:67%;" />

### 推奨実行順序

2つのターミナルを使用してください。

1. まずサブスクライバーを起動します。
2. 次にパブリッシャーを起動します。
3. 両方のコマンドが完了するまで待ちます。
4. メッセージ配信完了後にTimescaleDBをクエリします。

## メッセージレイテンシの確認

パブリッシャー、サブスクライバー、ルール、TimescaleDBアクションがすべて稼働したら、SQLでトレースレコードを検証しレイテンシを計算します。

### 遅延の計算方法

各段階のレイテンシ計算式：

- パブリッシャーからEMQXまで：`publish_received_at - publish_at`
- EMQX処理時間：`message_delivered - publish_received_at`
- EMQXからサブスクライバーまで：`message_acked - message_delivered`
- エンドツーエンド合計：`message_acked - publish_at`

::: tip

- メッセージに`publish_at`タイムスタンプがない場合、パブリッシャーからブローカーまでのレイテンシは計算できません。
- QoS 0では`message_acked`イベントが生成されないため、サブスクライバー側のレイテンシは計算できません。

:::

### クエリ例

#### 1. 全トレースレコード数のカウント

```sql
SELECT COUNT(*) AS total_rows
FROM mqtt_message_traces;
```

出力例：

```text
 total_rows
------------
      30002
```

#### 2. 最近のパブリッシュレコードを表示

```sql
SELECT event_at, event, msg_id, publish_at, emqx_received_at, pub_clientid, topic, qos, payload
FROM mqtt_message_traces
WHERE event = 'message.publish'
ORDER BY event_at DESC
LIMIT 20;
```

パブリッシャー側のタイムスタンプとブローカー受信タイムスタンプが正しく保存されていることを確認します。

#### 3. 最近のデリバリーレコードを表示

```sql
SELECT event_at, event, msg_id, publish_at, emqx_received_at, emqx_delivered_at, pub_clientid, sub_clientid, topic, qos
FROM mqtt_message_traces
WHERE event = 'message.delivered'
ORDER BY event_at DESC
LIMIT 20;
```

配信段階のタイムスタンプが取得されていること、およびパブリッシャーとサブスクライバーのクライアントIDが確認できます。

#### 4. 最近のアックレコードを表示

```sql
SELECT event_at, event, msg_id, publish_at, emqx_received_at, sub_ack_at, pub_clientid, sub_clientid, topic, qos
FROM mqtt_message_traces
WHERE event = 'message.acked'
ORDER BY event_at DESC
LIMIT 20;
```

アック段階のデータが存在し、サブスクライバー側のレイテンシ計算に利用可能であることを確認します。

#### 5. イベントタイプごとのレコード数をカウント

```sql
SELECT event, COUNT(*) AS row_count
FROM mqtt_message_traces
GROUP BY event
ORDER BY event;
```

出力例：

```text
       event       | row_count
-------------------+-----------
 message.acked     |     10001
 message.delivered |     10000
 message.publish   |     10001
```

トレースデータセットの整合性を簡易チェックできます。

#### 6. 過去1時間の各段階平均レイテンシを計算

```sql
WITH relevant_messages AS (
  SELECT *
  FROM mqtt_message_traces
  WHERE event_at >= NOW() - INTERVAL '1 hour'
),
message_latency AS (
  SELECT
    t1.msg_id,
    COALESCE(t2.sub_clientid, t3.sub_clientid) AS sub_clientid,
    MAX(t1.publish_at) AS publish_at,
    MAX(t1.emqx_received_at) AS publish_received_at,
    MAX(t2.emqx_delivered_at) AS message_delivered,
    MAX(t3.sub_ack_at) AS message_acked
  FROM relevant_messages t1
  JOIN relevant_messages t2
    ON t1.msg_id = t2.msg_id
   AND t2.event = 'message.delivered'
  JOIN relevant_messages t3
    ON t1.msg_id = t3.msg_id
   AND t3.event = 'message.acked'
  WHERE t1.event = 'message.publish'
  GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid)
)
SELECT
  COUNT(*) AS sample_count,
  ROUND(AVG(publish_received_at - publish_at), 3) AS publisher_to_emqx_ms,
  ROUND(AVG(message_delivered - publish_received_at), 3) AS emqx_processing_ms,
  ROUND(AVG(message_acked - message_delivered), 3) AS emqx_to_subscriber_ms,
  ROUND(AVG(message_acked - publish_at), 3) AS end_to_end_ms
FROM message_latency
WHERE publish_at IS NOT NULL
  AND publish_received_at IS NOT NULL
  AND message_delivered IS NOT NULL
  AND message_acked IS NOT NULL;
```

出力例：

```text
 sample_count | publisher_to_emqx_ms | emqx_processing_ms | emqx_to_subscriber_ms | end_to_end_ms
--------------+----------------------+--------------------+-----------------------+---------------
        10000 |                5.891 |              0.073 |                 0.273 |         6.236
```

![all_stage_delay](./_assets/all_stage_delay.png)

正常なEMQXシステムでは、`emqx_processing_ms`（ブローカー処理レイテンシ）は非常に低い値になることが多いです。より大きな寄与要因はパブリッシュおよびコンシュームフェーズ、すなわち`publisher_to_emqx_ms`と`emqx_to_subscriber_ms`です。

**クエリのポイント：**

- `WHERE event_at >= NOW() - INTERVAL '1 hour'`で1時間の観測ウィンドウに限定し、過去データの影響を排除。
- `relevant_messages`でJOIN前にデータセットを絞り込み。
- `COALESCE(t2.sub_clientid, t3.sub_clientid)`でサブスクライバーIDを保持。
- `GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid)`で異なるサブスクライバーのレコードが混ざらないようにする。
- `MAX(...)`で3つのイベント行のタイムスタンプを1行に集約。
- 最終的に`WHERE ... IS NOT NULL`で完全なpublish/delivered/ackedチェーンを持つレコードのみを対象にする。

#### 7. 負のレイテンシ値の検証

レイテンシ結果を受け入れる前に以下のクエリを実行してください。非ゼロの結果があれば、タイムスタンプの問題、クロック同期不良、フィールドマッピングエラーの可能性があります。

```sql
WITH relevant_messages AS (
  SELECT *
  FROM mqtt_message_traces
  WHERE event_at >= NOW() - INTERVAL '1 hour'
),
message_latency AS (
  SELECT
    t1.msg_id,
    COALESCE(t2.sub_clientid, t3.sub_clientid) AS sub_clientid,
    MAX(t1.publish_at) AS publish_at,
    MAX(t1.emqx_received_at) AS publish_received_at,
    MAX(t2.emqx_delivered_at) AS message_delivered,
    MAX(t3.sub_ack_at) AS message_acked
  FROM relevant_messages t1
  JOIN relevant_messages t2
    ON t1.msg_id = t2.msg_id
   AND t2.event = 'message.delivered'
  JOIN relevant_messages t3
    ON t1.msg_id = t3.msg_id
   AND t3.event = 'message.acked'
  WHERE t1.event = 'message.publish'
  GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid)
)
SELECT
  COUNT(*) FILTER (WHERE publish_received_at - publish_at < 0) AS negative_client_to_emqx,
  COUNT(*) FILTER (WHERE message_delivered - publish_received_at < 0) AS negative_emqx_processing,
  COUNT(*) FILTER (WHERE message_acked - message_delivered < 0) AS negative_emqx_to_subscriber,
  COUNT(*) FILTER (WHERE message_acked - publish_at < 0) AS negative_end_to_end
FROM message_latency
WHERE publish_at IS NOT NULL
  AND publish_received_at IS NOT NULL
  AND message_delivered IS NOT NULL
  AND message_acked IS NOT NULL;
```

出力例：

```text
 negative_client_to_emqx | negative_emqx_processing | negative_emqx_to_subscriber | negative_end_to_end
-------------------------+--------------------------+-----------------------------+---------------------
                       0 |                        0 |                           0 |                   0
```

4つの値すべてが0でなければ結果は有効とみなせません。非ゼロの場合はタイムスタンプマッピングやクロック同期設定を確認し、問題を解消してから結果を最終判断に使用してください。

## 結果の検証

最終的なレイテンシ結果を公開する前に、以下をすべて検証してください。

- 各イベントタイプのトレースレコードが存在すること。
- 主要なタイムスタンプフィールドが埋まっていること。
- 平均レイテンシクエリが有意な`sample_count`を返すこと。
- 負の値検証クエリが4つすべて0を返すこと。
- アクションメトリクスに異常な失敗数がないこと。

成功検証時のアクションメトリクス例：

```text
publish trace action:
  matched = 10001
  success = 10001
  failed  = 0
delivered trace action:
  matched = 10001
  success = 10000
  failed  = 1
acked trace action:
  matched = 10001
  success = 10001
  failed  = 0
```

いずれかのアクションで失敗数が非ゼロ、または負のレイテンシカウントが非ゼロの場合は、データパスの問題を修正してから結果を最終判断に使用してください。
