TimescaleDBでMQTTメッセージのエンドツーエンドレイテンシを追跡する
EMQX Cloudは、イベントトピックとルールベースのデータ統合を活用して、MQTTの送信経路全体にわたるエンドツーエンドのメッセージトレーサビリティを構築できます。この手法では、パブリッシャーがメッセージを送信した瞬間から、ブローカー側の処理を経て、サブスクライバーが受信をアックした時点までのレイテンシデータを取得します。
本ガイドでは、EMQX CloudとTimescaleDBを用いてこのトレーサビリティワークフローを構築する方法を説明します。最終的に得られるのは、以下の各段階における遅延を計測する実用的なレイテンシ観測システムです。
- パブリッシャーからブローカーまで
- ブローカー側の処理
- ブローカーからサブスクライバーまで
- エンドツーエンドの総送信時間
実装では、トレースイベントをTimescaleDBに格納し、SQLクエリで生データを検査、データ整合性を検証し、各段階の平均レイテンシを算出します。
前提条件
開始前に以下の条件を満たしていることを確認してください。
- EMQX v5のデプロイメントが作成されアクセス可能であること。
- TimescaleDBがデプロイされアクセス可能であること。
- パブリッシャーとサブスクライバーのシステムクロックが同期されていること。
- パブリッシャーが各MQTTメッセージのペイロードに
publish_atタイムスタンプを含めていること。 - アックイベントの取得を可能にするためにQoS 1またはQoS 2を使用していること。
- クライアントとデータベース間でタイムスタンプのタイムゾーンが一貫していること(UTC推奨)。
タイムスタンプ定義
トレーシングモデルでは以下のタイムスタンプを使用します。
publish_at:パブリッシャーがMQTTペイロードに書き込むクライアント側のタイムスタンプ。publish_received_at:EMQXがPUBLISHパケットを受信した時刻。message_delivered:EMQXがメッセージをサブスクライバー側に配信した時刻。message_acked:EMQXがサブスクライバーからのアック(ACK)を受信した時刻。
これらのタイムスタンプを用いて送信経路の各段階のレイテンシを計算します。
TimescaleDBテーブルのセットアップ
メッセージトレースレコードを格納するテーブルを作成し、TimescaleDBのハイパーテーブルに変換します。各メッセージ段階は別々の行として保存され、後でmsg_idを使った相関クエリが可能になります。
CREATE EXTENSION IF NOT EXISTS timescaledb;
DROP TABLE IF EXISTS mqtt_message_traces;
CREATE TABLE mqtt_message_traces
(
event_at TIMESTAMPZ NOT NULL,
event character varying NOT NULL,
msg_id character varying NOT NULL,
publish_at bigint,
emqx_received_at bigint,
emqx_delivered_at bigint,
sub_ack_at bigint,
pub_clientid character varying,
sub_clientid character varying,
topic character varying NOT NULL,
qos integer NOT NULL,
payload text
);event_atを時間カラムとしてハイパーテーブルに変換します。
SELECT create_hypertable(
'mqtt_message_traces',
'event_at',
chunk_time_interval => interval '12 hour',
if_not_exists => TRUE
);クエリ性能向上のためにインデックスを作成します。
event列に対するインデックス(イベントタイプの高速検索用)msg_idと降順のevent_atの複合インデックス(特定メッセージの時間経過追跡用)pub_clientidと降順のevent_atの複合インデックス(特定パブリッシャーの解析用)
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_event_idx
ON mqtt_message_traces (event);
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_msg_id_event_at_idx
ON mqtt_message_traces (msg_id, event_at DESC);
CREATE INDEX IF NOT EXISTS emqx_mqtt_message_traces_pub_clientid_event_at_idx
ON mqtt_message_traces (pub_clientid, event_at DESC);EMQX CloudでのTimescaleDBデータ統合設定
トレーサビリティワークフローは3つのルールと3つの書き込みアクションを使用します。3つのアクションは同じテーブルに書き込みますが、それぞれメッセージライフサイクルの異なる段階をキャプチャします。
3つのルールは共通の相関キーを使用します。
id as msg_id同じmsg_idを使って以下3つのイベントを相関させます。
message.publishmessage.deliveredmessage.acked
Publishトレースルールの設定
最初のルールはパブリッシュ段階のデータを記録します。ワイルドカードトピックでリッスンし、全ビジネストピックの通常メッセージをキャプチャします。
ルールSQL:
SELECT
timestamp div 1000 as event_at,
event,
id as msg_id,
int(payload.publish_at) as publish_at,
int(publish_received_at) as emqx_received_at,
clientid as pub_clientid,
topic,
qos,
payload
FROM "#"アクションSQL:
INSERT INTO mqtt_message_traces(
event_at,
event,
msg_id,
publish_at,
emqx_received_at,
emqx_delivered_at,
sub_ack_at,
pub_clientid,
sub_clientid,
topic,
qos,
payload
) VALUES (
to_timestamp(${event_at}),
${event},
${msg_id},
${publish_at}::bigint,
${emqx_received_at}::bigint,
NULL,
NULL,
${pub_clientid},
NULL,
${topic},
${qos},
${payload}
)パブリッシャーがビジネストピック(例:emqx/test)にメッセージを送信すると、このルールがTimescaleDBにパブリッシュトレースレコードを書き込みます。

Deliveredトレースルールの設定
2番目のルールは、EMQXがメッセージをサブスクライバー側に配信したタイムスタンプを記録します。
ルールSQL:
SELECT
timestamp div 1000 as event_at,
event,
id as msg_id,
int(payload.publish_at) as publish_at,
int(publish_received_at) as emqx_received_at,
int(timestamp) as emqx_delivered_at,
from_clientid as pub_clientid,
clientid as sub_clientid,
topic,
qos,
payload
FROM "$events/message_delivered"アクションSQL:
INSERT INTO mqtt_message_traces(
event_at,
event,
msg_id,
publish_at,
emqx_received_at,
emqx_delivered_at,
sub_ack_at,
pub_clientid,
sub_clientid,
topic,
qos,
payload
) VALUES (
to_timestamp(${event_at}),
${event},
${msg_id},
${publish_at}::bigint,
${emqx_received_at}::bigint,
${emqx_delivered_at}::bigint,
NULL,
${pub_clientid},
${sub_clientid},
${topic},
${qos},
${payload}
)このレコードはブローカー側の処理レイテンシとブローカーからサブスクライバーまでのレイテンシ計算に使用されます。

Ackedトレースルールの設定
3番目のルールはアック段階のデータを記録します。アックイベントが存在する場合のみ発火するため、QoS 1またはQoS 2が必要です。
ルールSQL:
SELECT
timestamp div 1000 as event_at,
event,
id as msg_id,
int(payload.publish_at) as publish_at,
int(publish_received_at) as emqx_received_at,
int(timestamp) as sub_ack_at,
from_clientid as pub_clientid,
clientid as sub_clientid,
topic,
qos,
payload
FROM "$events/message_acked"アクションSQL:
INSERT INTO mqtt_message_traces(
event_at,
event,
msg_id,
publish_at,
emqx_received_at,
emqx_delivered_at,
sub_ack_at,
pub_clientid,
sub_clientid,
topic,
qos,
payload
) VALUES (
to_timestamp(${event_at}),
${event},
${msg_id},
${publish_at}::bigint,
${emqx_received_at}::bigint,
NULL,
${sub_ack_at}::bigint,
${pub_clientid},
${sub_clientid},
${topic},
${qos},
${payload}
)このレコードはサブスクライバー側のアックレイテンシとエンドツーエンドの総レイテンシ計算に使用されます。

コネクター設定に関する注意点
論理設計上は3つのアクションで単一のTimescaleDBコネクターを共有可能です。
しかし、EMQX 5での持続的な負荷テスト中に、単一のPostgreSQL/TimescaleDBコネクター共有は不安定であることが判明しました。観察される症状は、あるアクションで書き込み失敗が蓄積し、相関クエリに利用可能な完全なサンプル数が減少することです。
安定運用のための推奨設定は以下の通りです。
- パブリッシュトレースアクション用に専用コネクター
- デリバリートレースアクション用に専用コネクター
- アックトレースアクション用に専用コネクター
これは実装レベルの安定性選択であり、テーブルスキーマ、ルールSQL、アクションSQL、相関ロジック、レイテンシ計算式には影響しません。
Python SDKを使ったメッセージ配信のシミュレーション
このセクションでは検証に使用するパブリッシャーとサブスクライバーの動作を説明します。
パブリッシャーはビジネストピック(例:emqx/test)にMQTTメッセージを送信し、各ペイロードにpublish_atタイムスタンプを含めます。サブスクライバーは同じトピックをサブスクライブし、メッセージフロー全体でオンライン状態を維持します。
Pythonの依存パッケージをインストールします。
python3 -m pip install paho-mqttパブリッシャー
パブリッシャーは以下を行います。
- EMQXに接続する
- 対象トピックにメッセージをパブリッシュする
- QoS 1を使用する
- 各メッセージペイロードに
publish_atフィールドを含める
ペイロード例:
{"publish_at":1773579492999,"msg":10000}以下のパブリッシャースクリプトは、固定間隔送信と指定TPSでの期間ベース負荷テストの両方に対応しており、機能検証と持続負荷テストの両方に適しています。
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--count", type=int, default=20)
parser.add_argument("--interval-sec", type=float, default=1.0)
parser.add_argument("--tps", type=float)
parser.add_argument("--duration-sec", type=float)
parser.add_argument("--hold-sec", type=float, default=0.0)
args = parser.parse_args()
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code != 0:
raise RuntimeError(f"connect failed: {reason_code}")
client = mqtt_client.Client(
mqtt_client.CallbackAPIVersion.VERSION2,
client_id=f"python-mqtt-pub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
client.connect(args.host, args.port)
client.loop_start()
count = 0
start = time.perf_counter()
if args.tps and args.duration_sec:
interval = 1.0 / args.tps
deadline = start + args.duration_sec
next_send = start
keep_sending = lambda now, sent: now < deadline
else:
interval = args.interval_sec
next_send = start + interval
keep_sending = lambda now, sent: sent < args.count
while keep_sending(time.perf_counter(), count):
now = time.perf_counter()
if now < next_send:
time.sleep(next_send - now)
payload = json.dumps({"publish_at": int(time.time() * 1000), "msg": count}, separators=(",", ":"))
result = client.publish(args.topic, payload, qos=args.qos)
if result.rc != 0:
raise RuntimeError(f"publish failed: {result.rc}")
count += 1
next_send += interval
publish_elapsed = time.perf_counter() - start
if args.hold_sec > 0:
time.sleep(args.hold_sec)
total_elapsed = time.perf_counter() - start
print(json.dumps({
"topic": args.topic,
"sent": count,
"publish_duration_sec": round(publish_elapsed, 3),
"total_duration_sec": round(total_elapsed, 3),
"hold_sec": args.hold_sec,
"target_tps": args.tps,
"actual_tps": round(count / publish_elapsed, 3) if publish_elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()実行例:
python3 publisher.py \
--host 127.0.0.1 \
--port 1883 \
--topic emqx/test \
--qos 1 \
--tps 500 \
--duration-sec 20 \
--hold-sec 25出力例:
{"topic": "emqx/test", "sent": 10001, "publish_duration_sec": 20.0, "total_duration_sec": 45.004, "hold_sec": 25.0, "target_tps": 500.0, "actual_tps": 500.038}
サブスクライバー
サブスクライバーは以下を行います。
- EMQXに接続する
- 同じビジネストピックをサブスクライブする
- QoS 1を使用する
- パブリッシュフェーズより長く接続を維持する
#!/usr/bin/env python3
import argparse, json, random, time
from paho.mqtt import client as mqtt_client
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=1883)
parser.add_argument("--topic", default="emqx/test")
parser.add_argument("--qos", type=int, default=1)
parser.add_argument("--duration-sec", type=float)
args = parser.parse_args()
counter = {"received": 0}
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code != 0:
raise RuntimeError(f"connect failed: {reason_code}")
client.subscribe(args.topic, qos=args.qos)
def on_message(client, userdata, msg):
counter["received"] += 1
client = mqtt_client.Client(
mqtt_client.CallbackAPIVersion.VERSION2,
client_id=f"python-mqtt-sub-{random.randint(0, 100000)}",
)
client.on_connect = on_connect
client.on_message = on_message
client.connect(args.host, args.port)
client.loop_start()
start = time.perf_counter()
while args.duration_sec is None or time.perf_counter() - start < args.duration_sec:
time.sleep(1)
elapsed = time.perf_counter() - start
print(json.dumps({
"topic": args.topic,
"received": counter["received"],
"duration_sec": round(elapsed, 3),
"recv_tps": round(counter["received"] / elapsed, 3) if elapsed else 0.0,
}))
client.disconnect()
client.loop_stop()サブスクライバーを先に起動します。
python3 subscriber.py \
--host 127.0.0.1 \
--port 1883 \
--topic emqx/test \
--qos 1 \
--duration-sec 80出力例:
{"topic": "emqx/test", "received": 10001, "duration_sec": 80.268, "recv_tps": 124.596}
推奨実行順序
2つのターミナルを使用してください。
- まずサブスクライバーを起動します。
- 次にパブリッシャーを起動します。
- 両方のコマンドが完了するまで待ちます。
- メッセージ配信完了後にTimescaleDBをクエリします。
メッセージレイテンシの確認
パブリッシャー、サブスクライバー、ルール、TimescaleDBアクションがすべて稼働したら、SQLでトレースレコードを検証しレイテンシを計算します。
遅延の計算方法
各段階のレイテンシ計算式:
- パブリッシャーからEMQXまで:
publish_received_at - publish_at - EMQX処理時間:
message_delivered - publish_received_at - EMQXからサブスクライバーまで:
message_acked - message_delivered - エンドツーエンド合計:
message_acked - publish_at
TIP
- メッセージに
publish_atタイムスタンプがない場合、パブリッシャーからブローカーまでのレイテンシは計算できません。 - QoS 0では
message_ackedイベントが生成されないため、サブスクライバー側のレイテンシは計算できません。
クエリ例
1. 全トレースレコード数のカウント
SELECT COUNT(*) AS total_rows
FROM mqtt_message_traces;出力例:
total_rows
------------
300022. 最近のパブリッシュレコードを表示
SELECT event_at, event, msg_id, publish_at, emqx_received_at, pub_clientid, topic, qos, payload
FROM mqtt_message_traces
WHERE event = 'message.publish'
ORDER BY event_at DESC
LIMIT 20;パブリッシャー側のタイムスタンプとブローカー受信タイムスタンプが正しく保存されていることを確認します。
3. 最近のデリバリーレコードを表示
SELECT event_at, event, msg_id, publish_at, emqx_received_at, emqx_delivered_at, pub_clientid, sub_clientid, topic, qos
FROM mqtt_message_traces
WHERE event = 'message.delivered'
ORDER BY event_at DESC
LIMIT 20;配信段階のタイムスタンプが取得されていること、およびパブリッシャーとサブスクライバーのクライアントIDが確認できます。
4. 最近のアックレコードを表示
SELECT event_at, event, msg_id, publish_at, emqx_received_at, sub_ack_at, pub_clientid, sub_clientid, topic, qos
FROM mqtt_message_traces
WHERE event = 'message.acked'
ORDER BY event_at DESC
LIMIT 20;アック段階のデータが存在し、サブスクライバー側のレイテンシ計算に利用可能であることを確認します。
5. イベントタイプごとのレコード数をカウント
SELECT event, COUNT(*) AS row_count
FROM mqtt_message_traces
GROUP BY event
ORDER BY event;出力例:
event | row_count
-------------------+-----------
message.acked | 10001
message.delivered | 10000
message.publish | 10001トレースデータセットの整合性を簡易チェックできます。
6. 過去1時間の各段階平均レイテンシを計算
WITH relevant_messages AS (
SELECT *
FROM mqtt_message_traces
WHERE event_at >= NOW() - INTERVAL '1 hour'
),
message_latency AS (
SELECT
t1.msg_id,
COALESCE(t2.sub_clientid, t3.sub_clientid) AS sub_clientid,
MAX(t1.publish_at) AS publish_at,
MAX(t1.emqx_received_at) AS publish_received_at,
MAX(t2.emqx_delivered_at) AS message_delivered,
MAX(t3.sub_ack_at) AS message_acked
FROM relevant_messages t1
JOIN relevant_messages t2
ON t1.msg_id = t2.msg_id
AND t2.event = 'message.delivered'
JOIN relevant_messages t3
ON t1.msg_id = t3.msg_id
AND t3.event = 'message.acked'
WHERE t1.event = 'message.publish'
GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid)
)
SELECT
COUNT(*) AS sample_count,
ROUND(AVG(publish_received_at - publish_at), 3) AS publisher_to_emqx_ms,
ROUND(AVG(message_delivered - publish_received_at), 3) AS emqx_processing_ms,
ROUND(AVG(message_acked - message_delivered), 3) AS emqx_to_subscriber_ms,
ROUND(AVG(message_acked - publish_at), 3) AS end_to_end_ms
FROM message_latency
WHERE publish_at IS NOT NULL
AND publish_received_at IS NOT NULL
AND message_delivered IS NOT NULL
AND message_acked IS NOT NULL;出力例:
sample_count | publisher_to_emqx_ms | emqx_processing_ms | emqx_to_subscriber_ms | end_to_end_ms
--------------+----------------------+--------------------+-----------------------+---------------
10000 | 5.891 | 0.073 | 0.273 | 6.236
正常なEMQXシステムでは、emqx_processing_ms(ブローカー処理レイテンシ)は非常に低い値になることが多いです。より大きな寄与要因はパブリッシュおよびコンシュームフェーズ、すなわちpublisher_to_emqx_msとemqx_to_subscriber_msです。
クエリのポイント:
WHERE event_at >= NOW() - INTERVAL '1 hour'で1時間の観測ウィンドウに限定し、過去データの影響を排除。relevant_messagesでJOIN前にデータセットを絞り込み。COALESCE(t2.sub_clientid, t3.sub_clientid)でサブスクライバーIDを保持。GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid)で異なるサブスクライバーのレコードが混ざらないようにする。MAX(...)で3つのイベント行のタイムスタンプを1行に集約。- 最終的に
WHERE ... IS NOT NULLで完全なpublish/delivered/ackedチェーンを持つレコードのみを対象にする。
7. 負のレイテンシ値の検証
レイテンシ結果を受け入れる前に以下のクエリを実行してください。非ゼロの結果があれば、タイムスタンプの問題、クロック同期不良、フィールドマッピングエラーの可能性があります。
WITH relevant_messages AS (
SELECT *
FROM mqtt_message_traces
WHERE event_at >= NOW() - INTERVAL '1 hour'
),
message_latency AS (
SELECT
t1.msg_id,
COALESCE(t2.sub_clientid, t3.sub_clientid) AS sub_clientid,
MAX(t1.publish_at) AS publish_at,
MAX(t1.emqx_received_at) AS publish_received_at,
MAX(t2.emqx_delivered_at) AS message_delivered,
MAX(t3.sub_ack_at) AS message_acked
FROM relevant_messages t1
JOIN relevant_messages t2
ON t1.msg_id = t2.msg_id
AND t2.event = 'message.delivered'
JOIN relevant_messages t3
ON t1.msg_id = t3.msg_id
AND t3.event = 'message.acked'
WHERE t1.event = 'message.publish'
GROUP BY t1.msg_id, COALESCE(t2.sub_clientid, t3.sub_clientid)
)
SELECT
COUNT(*) FILTER (WHERE publish_received_at - publish_at < 0) AS negative_client_to_emqx,
COUNT(*) FILTER (WHERE message_delivered - publish_received_at < 0) AS negative_emqx_processing,
COUNT(*) FILTER (WHERE message_acked - message_delivered < 0) AS negative_emqx_to_subscriber,
COUNT(*) FILTER (WHERE message_acked - publish_at < 0) AS negative_end_to_end
FROM message_latency
WHERE publish_at IS NOT NULL
AND publish_received_at IS NOT NULL
AND message_delivered IS NOT NULL
AND message_acked IS NOT NULL;出力例:
negative_client_to_emqx | negative_emqx_processing | negative_emqx_to_subscriber | negative_end_to_end
-------------------------+--------------------------+-----------------------------+---------------------
0 | 0 | 0 | 04つの値すべてが0でなければ結果は有効とみなせません。非ゼロの場合はタイムスタンプマッピングやクロック同期設定を確認し、問題を解消してから結果を最終判断に使用してください。
結果の検証
最終的なレイテンシ結果を公開する前に、以下をすべて検証してください。
- 各イベントタイプのトレースレコードが存在すること。
- 主要なタイムスタンプフィールドが埋まっていること。
- 平均レイテンシクエリが有意な
sample_countを返すこと。 - 負の値検証クエリが4つすべて0を返すこと。
- アクションメトリクスに異常な失敗数がないこと。
成功検証時のアクションメトリクス例:
publish trace action:
matched = 10001
success = 10001
failed = 0
delivered trace action:
matched = 10001
success = 10000
failed = 1
acked trace action:
matched = 10001
success = 10001
failed = 0いずれかのアクションで失敗数が非ゼロ、または負のレイテンシカウントが非ゼロの場合は、データパスの問題を修正してから結果を最終判断に使用してください。