Skip to content

EMQX TablesによるフルチェーンMQTTメッセージトレーシング:レイテンシ、配信、アック分析

MQTTシステムでは、メッセージはパブリッシャーが送信し、ブローカーが受信・ルーティングし、サブスクライバーに配信し、サブスクライバーが受領確認(ACK)を返すという複数の重要な段階を経ます。メッセージ遅延、コンシューマのバックログ、ACKタイムアウト、重複消費、メッセージロスなどの問題が発生した場合、単純なブローカーの監視指標だけでは、どのチェーン区間に問題があるか特定することは困難です。

本番環境では、MQTTメッセージは以下のような経路を通過します。

  • パブリッシャーのネットワーク伝送レイテンシ
  • ブローカーの受信および内部処理レイテンシ
  • ブローカーからサブスクライバーへの配信レイテンシ
  • サブスクライバーのACK返却レイテンシ
  • 配信失敗やACK欠落などの異常事象

従来のブローカー指標監視に加え、MQTTメッセージのライフサイクルを相関・分析するトレーシング機構が必要です。

EMQX Cloudは、イベントトピック、ルールエンジン、Tablesのデータ統合を組み合わせて、MQTTメッセージライフサイクルの重要イベントをキャプチャ・相関します。この手法により、パブリッシュ時点からブローカー配信、サブスクライバーのACK確認までのメッセージチェーンの状態とタイムスタンプを記録できます。

本ガイドでは、EMQX CloudとTablesを使って以下の機能を持つフルチェーンMQTTメッセージ可観測性システムを構築する方法を説明します。

  • パブリッシュ、配信、ACK確認のチェーン分析
  • message.publishmessage.deliveredmessage.ackedイベントの相関
  • パブリッシャー、ブローカー、サブスクライバー各段階のレイテンシ計測
  • ACK欠落、不完全チェーン、メッセージドロップの検出
  • メッセージがサブスクライバーに正常配信されたことの検証
  • SQLによるメッセージライフサイクル集計・分析による障害解析

前提条件

開始前に以下を満たしていることを確認してください。

  • EMQX v5のデプロイメントが作成されアクセス可能であること
  • Tablesがデプロイされアクセス可能であること
  • パブリッシャーとサブスクライバーのシステムクロックが同期されていること
  • パブリッシャーが各MQTTメッセージのペイロードにpublish_atタイムスタンプを含めていること
  • QoS 1またはQoS 2を使用し、アックイベントのキャプチャを可能にしていること
  • クライアントとデータベース間で一貫したタイムゾーンでタイムスタンプを扱っていること(UTC推奨)

タイムスタンプ定義

トレーシングモデルで使用するタイムスタンプは以下の通りです。

  • publish_at:パブリッシャーがMQTTペイロードに書き込むクライアント側のタイムスタンプ
  • publish_received_at:EMQXがPUBLISHパケットを受信した時刻
  • message_delivered:EMQXがメッセージをサブスクライバー側に配信した時刻
  • message_acked:EMQXがサブスクライバーからACKを受信した時刻

これらのタイムスタンプを用いて、伝送経路の各段階のレイテンシを計算します。

Tablesテーブルのセットアップ

メッセージトレースレコードを格納するテーブルを作成します。各メッセージ段階は別々の行として保存され、後でmsg_idを使った相関クエリが可能となります。

text
CREATE TABLE mqtt_message_traces (
    "timestamp" TIMESTAMP NOT NULL,
    event_name STRING NOT NULL,
    msg_id STRING NOT NULL,
    trace_key STRING NOT NULL,
    publish_at BIGINT,
    emqx_received_at BIGINT,
    emqx_delivered_at BIGINT,
    sub_ack_at BIGINT,
    pub_clientid STRING,
    sub_clientid STRING,
    topic STRING NOT NULL,
    qos INT NOT NULL,
    msg BIGINT,
    TIME INDEX ("timestamp"),
    PRIMARY KEY (trace_key)
)
WITH (
    ttl = '30d'
);

EMQX CloudでのTablesデータ統合設定

トレーサビリティワークフローは3つのルールと3つの書き込みアクションを使用します。3つのアクションは同じテーブルに書き込みますが、それぞれメッセージライフサイクルの異なる段階をキャプチャします。

3つのルールは同じ相関キーを使用します。

sql
id as msg_id

同じmsg_idを使って以下3つのイベントを相関します。

  • message.publish
  • message.delivered
  • message.acked

パブリッシュトレースルールの設定

最初のルールはパブリッシュ段階のデータを記録します。ワイルドカードトピックでリッスンし、すべての業務トピックの通常メッセージをキャプチャします。

ルールSQL:

sql
SELECT
  timestamp,
  event,
  id as msg_id,
  concat(id, '_publish') as trace_key,
  int(payload.publish_at) as publish_at,
  int(publish_received_at) as emqx_received_at,
  clientid as pub_clientid,
  topic,
  qos,
  int(payload.msg) as msg
FROM "#"

アクションSQL:

sql
mqtt_message_traces,event_name=${event},msg_id=${msg_id},trace_key=${trace_key},topic=${topic},pub_clientid=${pub_clientid} publish_at=${publish_at}i,emqx_received_at=${emqx_received_at}i,qos=${qos}i,msg=${msg}i ${timestamp}

パブリッシャーが業務トピック(例:emqx/test)にメッセージを送信すると、このルールがTablesにパブリッシュトレースレコードを書き込みます。

data_explorer_1

デリバリートレースルールの設定

2番目のルールはEMQXがメッセージをサブスクライバー側に配信した時刻を記録します。

ルールSQL:

sql
SELECT
  timestamp,
  event,
  id as msg_id,
  concat(id, '_delivered') as trace_key,
  int(timestamp) as emqx_delivered_at,
  from_clientid as pub_clientid,
  clientid as sub_clientid,
  topic,
  qos,
  int(payload.msg) as msg
FROM "$events/message_delivered"

アクションSQL:

sql
mqtt_message_traces,event_name=${event},msg_id=${msg_id},trace_key=${trace_key},topic=${topic},pub_clientid=${pub_clientid},sub_clientid=${sub_clientid} emqx_delivered_at=${emqx_delivered_at}i,qos=${qos}i,msg=${msg}i ${timestamp}

このレコードはブローカー側処理レイテンシとブローカーからサブスクライバーへのレイテンシ計算に使用されます。

data_explorer_2

アックトレースルールの設定

3番目のルールはアック段階のデータを記録します。アックイベントが存在する場合のみ発火するため、QoS 1またはQoS 2が必要です。

ルールSQL:

sql
SELECT
  timestamp,
  event,
  id as msg_id,
  concat(id, '_acked') as trace_key,
  int(timestamp) as sub_ack_at,
  from_clientid as pub_clientid,
  clientid as sub_clientid,
  topic,
  qos,
  int(payload.msg) as msg
FROM "$events/message_acked"

アクションSQL:

sql
mqtt_message_traces,event_name=${event},msg_id=${msg_id},trace_key=${trace_key},topic=${topic},pub_clientid=${pub_clientid},sub_clientid=${sub_clientid} sub_ack_at=${sub_ack_at}i,qos=${qos}i,msg=${msg}i ${timestamp}

このレコードはサブスクライバー側のアックレイテンシおよびエンドツーエンドの総レイテンシ計算に使用されます。

コネクター設定の注意点

論理設計上は3つのアクションは単一のTablesコネクターを共有可能です。

しかし、EMQX 5での持続的な負荷テストでは、単一コネクターの共有は不安定であることが判明しています。観測される症状は、あるアクションが書き込み失敗を蓄積し、相関クエリに利用できる完全なサンプル数が減少することです。

安定運用のためには以下の構成を推奨します。

  • パブリッシュトレースアクション用の専用コネクター
  • デリバリートレースアクション用の専用コネクター
  • アックトレースアクション用の専用コネクター

これは実装レベルの安定性選択であり、テーブルスキーマ、ルールSQL、アクションSQL、相関ロジック、レイテンシ計算式には影響しません。

Python SDKでメッセージ配信をシミュレートする

このセクションでは検証に用いるパブリッシャーとサブスクライバーの動作を説明します。

パブリッシャーは業務トピック(例:emqx/test)にMQTTメッセージを送信し、各ペイロードにpublish_atタイムスタンプを含めます。サブスクライバーは同じトピックをサブスクライブし、メッセージフロー中はオンラインを維持します。

Python依存関係をインストールします。

bash
python3 -m venv .venv && source .venv/bin/activate
pip install paho-mqtt

パブリッシャー

パブリッシャーは以下を実施します。

  • EMQXに接続
  • 対象トピックにメッセージをパブリッシュ
  • QoS 1を使用
  • 各メッセージペイロードにpublish_atフィールドを含める

ペイロード例:

json
{"publish_at":1773579492999,"msg":10000}

以下のパブリッシャースクリプトは固定間隔送信と指定TPSでの期間負荷試験の両方に対応し、機能検証および持続負荷試験に適しています。

python
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--count", type=int, default=20)
parser.add_argument("--interval-sec", type=float, default=1.0)
parser.add_argument("--tps", type=float)
parser.add_argument("--duration-sec", type=float)
parser.add_argument("--hold-sec", type=float, default=0.0)
parser.add_argument("--username", default=None)
parser.add_argument("--password", default=None)
args = parser.parse_args()
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code != 0:
        raise RuntimeError(f"connect failed: {reason_code}")
client = mqtt_client.Client(
    mqtt_client.CallbackAPIVersion.VERSION2,
    client_id=f"python-mqtt-pub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
if args.username:
    client.username_pw_set(args.username, args.password)
client.connect(args.host, args.port)
client.loop_start()
count = 0
start = time.perf_counter()
if args.tps and args.duration_sec:
    interval = 1.0 / args.tps
    deadline = start + args.duration_sec
    next_send = start
    keep_sending = lambda now, sent: now < deadline
else:
    interval = args.interval_sec
    next_send = start + interval
    keep_sending = lambda now, sent: sent < args.count
while keep_sending(time.perf_counter(), count):
    now = time.perf_counter()
    if now < next_send:
        time.sleep(next_send - now)
    payload = json.dumps({"publish_at": int(time.time() * 1000), "msg": count}, separators=(",", ":"))
    result = client.publish(args.topic, payload, qos=args.qos)
    if result.rc != 0:
        raise RuntimeError(f"publish failed: {result.rc}")
    count += 1
    next_send += interval
publish_elapsed = time.perf_counter() - start
if args.hold_sec > 0:
    time.sleep(args.hold_sec)
total_elapsed = time.perf_counter() - start
print(json.dumps({
    "topic": args.topic,
    "sent": count,
    "publish_duration_sec": round(publish_elapsed, 3),
    "total_duration_sec": round(total_elapsed, 3),
    "hold_sec": args.hold_sec,
    "target_tps": args.tps,
    "actual_tps": round(count / publish_elapsed, 3) if publish_elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()

実行例:

bash
python3 publisher.py \
  --host XXXX.ala.dedicated.aws.mqttce.net \
  --port 1883 \
  --topic emqx/test \
  --qos 1 \
  --tps 1 \
  --duration-sec 5 \
  --hold-sec 5 \
  --username xxx \
  --password xxx

出力例:

text
{"topic": "emqx/test", "sent": 6, "publish_duration_sec": 5.002, "total_duration_sec": 10.007, "hold_sec": 5.0, "target_tps": 1.0, "actual_tps": 1.2}

サブスクライバー

サブスクライバーは以下を実施します。

  • EMQXに接続
  • 同じ業務トピックをサブスクライブ
  • QoS 1を使用
  • パブリッシュフェーズより長く接続を維持
python
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--duration-sec", type=float)
parser.add_argument("--username", default=None)
parser.add_argument("--password", default=None)
args = parser.parse_args()
counter = {"received": 0}
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code != 0:
        raise RuntimeError(f"connect failed: {reason_code}")
    client.subscribe(args.topic, qos=args.qos)
def on_message(client, userdata, msg):
    counter["received"] += 1
client = mqtt_client.Client(
    mqtt_client.CallbackAPIVersion.VERSION2,
    client_id=f"python-mqtt-sub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
client.on_message = on_message
if args.username:
    client.username_pw_set(args.username, args.password)
client.connect(args.host, args.port)
client.loop_start()
start = time.perf_counter()
while args.duration_sec is None or time.perf_counter() - start < args.duration_sec:
    time.sleep(1)
elapsed = time.perf_counter() - start
print(json.dumps({
    "topic": args.topic,
    "received": counter["received"],
    "duration_sec": round(elapsed, 3),
    "recv_tps": round(counter["received"] / elapsed, 3) if elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()

サブスクライバーを先に起動します。

bash
python3 subscriber.py \
  --host xxxx.dedicated.aws.mqttce.net \
  --port 1883 \
  --topic emqx/test \
  --qos 1 \
  --username xxx \
  --password xxx

推奨実行手順

2つのターミナルを使い、

  1. 先にサブスクライバーを起動
  2. 次にパブリッシャーを起動
  3. 両コマンドの完了を待つ
  4. メッセージ配信完了後にEMQX Tablesをクエリ

メッセージレイテンシの確認

パブリッシャー、サブスクライバー、ルール、Tablesアクションがすべて稼働したら、SQLでトレースレコードを検証しレイテンシを計算します。

レイテンシ計算方法

各段階のレイテンシ計算式:

  • パブリッシャー→EMQX:publish_received_at - publish_at
  • EMQX処理:message_delivered - publish_received_at
  • EMQX→サブスクライバー:message_acked - message_delivered
  • エンドツーエンド合計:message_acked - publish_at

TIP

  • メッセージにpublish_atタイムスタンプがない場合、パブリッシャーからブローカーへのレイテンシは計算できません。
  • QoS 0ではmessage_ackedイベントが発生しないため、サブスクライバー側レイテンシは計測できません。

クエリ例

1. 全トレースレコード数のカウント

sql
SELECT COUNT(*) AS total_rows
FROM mqtt_message_traces;

2. 最近のパブリッシュレコードの表示

sql
SELECT
    timestamp,
    event_name,
    msg_id,
    publish_at,
    emqx_received_at,
    pub_clientid,
    topic,
    qos,
    msg
FROM mqtt_message_traces
WHERE event_name = 'message.publish'
ORDER BY timestamp DESC
LIMIT 20;

パブリッシャー側タイムスタンプとブローカー受信タイムスタンプが正しく格納されていることを確認します。

3. 最近のデリバリーレコードの表示

sql
SELECT
    "timestamp",
    event_name,
    msg_id,
    publish_at,
    emqx_received_at,
    emqx_delivered_at,
    pub_clientid,
    sub_clientid,
    topic,
    qos
FROM mqtt_message_traces
WHERE event_name = 'message.delivered'
ORDER BY "timestamp" DESC
LIMIT 20;

配信段階のデータが存在し、ブローカーからサブスクライバーへのレイテンシ計算に利用可能であることを確認します。

4. 最近のアックレコードの表示

sql
SELECT
    timestamp,
    event_name,
    msg_id,
    publish_at,
    emqx_received_at,
    sub_ack_at,
    pub_clientid,
    sub_clientid,
    topic,
    qos,
    msg
FROM mqtt_message_traces
WHERE event_name = 'message.acked'
ORDER BY timestamp DESC
LIMIT 20;

アック段階のデータが存在し、サブスクライバー側レイテンシ計算に利用可能であることを確認します。

5. イベントタイプ別レコード数のカウント

sql
SELECT
    event_name,
    COUNT(*) AS row_count
FROM mqtt_message_traces
GROUP BY event_name
ORDER BY event_name;

トレースデータセットの整合性を簡易にチェックします。

6. 過去1時間の各段階平均レイテンシ計算

sql
WITH publish_events AS (
  SELECT
    msg_id,
    MAX(publish_at) AS publish_at,
    MAX(emqx_received_at) AS publish_received_at
  FROM mqtt_message_traces
  WHERE event_name = 'message.publish'
    AND timestamp >= now() - INTERVAL '1 hour'
  GROUP BY msg_id
),
ack_events AS (
  SELECT
    msg_id,
    MAX(sub_ack_at) AS message_acked
  FROM mqtt_message_traces
  WHERE event_name = 'message.acked'
    AND timestamp >= now() - INTERVAL '1 hour'
  GROUP BY msg_id
),
message_latency AS (
  SELECT
    p.msg_id,
    p.publish_at,
    p.publish_received_at,
    a.message_acked
  FROM publish_events p
  JOIN ack_events a
    ON p.msg_id = a.msg_id
)
SELECT
  COUNT(*) AS sample_count,
  ROUND(AVG(publish_received_at - publish_at), 3) AS publisher_to_emqx_ms,
  ROUND(AVG(message_acked - publish_at), 3) AS end_to_end_ms
FROM message_latency
WHERE publish_at IS NOT NULL
  AND publish_received_at IS NOT NULL
  AND message_acked IS NOT NULL;

正常なEMQXシステムでは、ブローカー処理レイテンシは非常に低く、主なレイテンシはパブリッシュおよびコンシュームフェーズ(publisher_to_emqx_msemqx_to_subscriber_ms)に起因します。

クエリのポイント:

  • WHERE timestamp >= now() - INTERVAL '1 hour'で1時間の観測ウィンドウに限定し、過去データの影響を排除
  • publish_eventsack_eventsで対象データを絞り込み
  • MAX(...)で複数行のタイムスタンプを単一集約行にまとめる
  • 最終的にWHERE ... IS NOT NULLで完全なパブリッシュ/アックチェーンのみを対象にレイテンシ計算

7. 負のレイテンシ値の検証

レイテンシ結果を受け入れる前に以下クエリを実行してください。非ゼロ結果はタイムスタンプ問題、クロック同期不良、フィールドマッピング誤りを示します。

sql
WITH publish_events AS (
  SELECT
    msg_id,
    MAX(publish_at) AS publish_at,
    MAX(emqx_received_at) AS publish_received_at
  FROM mqtt_message_traces
  WHERE event_name = 'message.publish'
    AND timestamp >= now() - INTERVAL '1 hour'
  GROUP BY msg_id
),
ack_events AS (
  SELECT
    msg_id,
    MAX(sub_ack_at) AS message_acked
  FROM mqtt_message_traces
  WHERE event_name = 'message.acked'
    AND timestamp >= now() - INTERVAL '1 hour'
  GROUP BY msg_id
),
message_latency AS (
  SELECT
    p.msg_id,
    p.publish_at,
    p.publish_received_at,
    a.message_acked
  FROM publish_events p
  JOIN ack_events a
    ON p.msg_id = a.msg_id
)
SELECT
  SUM(CASE WHEN publish_received_at - publish_at < 0 THEN 1 ELSE 0 END) AS negative_publisher_to_emqx,
  SUM(CASE WHEN message_acked - publish_at < 0 THEN 1 ELSE 0 END) AS negative_end_to_end
FROM message_latency
WHERE publish_at IS NOT NULL
  AND publish_received_at IS NOT NULL
  AND message_acked IS NOT NULL;

すべての値が0でなければなりません。非ゼロの場合は、タイムスタンプマッピングやクロック同期設定を確認し、結果を最終結論として使用しないでください。

結果検証

最終的なレイテンシ結果を公開する前に、以下をすべて検証してください。

  1. message.publishおよびmessage.ackedイベントタイプのトレースレコードが存在すること
  2. 主要なタイムスタンプフィールドpublish_atemqx_received_atsub_ack_atが正しく格納されていること
  3. 平均レイテンシクエリが有効なsample_countを返すこと
  4. 負の値検証クエリがすべて0を返すこと
  5. EMQXルールアクションのメトリクスに異常な失敗数がないこと