Data Integrationによるメッセージパブリッシュおよびドロップイベントトピックメッセージのキャプチャ
メッセージパブリッシュおよびドロップイベントトピックは、EMQXが提供するシステムイベントトピックであり、ブローカー内のメッセージのライフサイクル(パブリッシュ、配信、アック、ドロップイベントなど)をリアルタイムで監視するためのものです。これらのイベントは、メッセージフローの解析、メッセージロスの診断、メッセージスループットの測定に不可欠です。通常のクライアントはこれらのシステムトピックを直接サブスクライブできませんが、ルールエンジンを利用してキャプチャし、データベースへの保存や他トピックへの転送、リアルタイム監視や分析に活用できます。
EMQXブローカーはMQTTブローカーとして機能し、ストレージサービスではないため、またセキュリティやプライバシーの制約から、メッセージ送受信やドロップの履歴をデフォルトで保存しません。したがって、メッセージパブリッシュおよびドロップイベントトピックに基づくデータ統合ルールの設定を推奨します。これにより、メッセージのライフサイクル情報、信頼性指標、システムの配信品質をリアルタイムで収集・分析・永続化でき、パフォーマンス最適化やトラブルシューティング、監査分析に役立ちます。
本ページでは、メッセージパブリッシュおよびドロップイベントトピックのユースケースを解説し、ルールエンジンによるイベントメッセージのキャプチャと処理方法を示し、MQTTX Desktopを使ったイベントメッセージ発生のシミュレーション方法を紹介します。
ユースケース
メッセージパブリッシュおよびドロップイベントトピックは、以下のような幅広い運用およびビジネスシナリオに適用可能です。
- メッセージトレーシングおよび配信経路監視:クライアントからのパブリッシュから正常配信までのメッセージライフサイクル全体を追跡し、メッセージの滞留やレイテンシ、配信失敗を特定します。
- ビジネスメトリクスおよびスループット分析:クライアントやトピックごとのパブリッシュ数・配信数をカウントし、アプリケーション負荷やユーザー行動パターンを把握します。
- 異常検知およびアラート:サブスクライバー不在や配信失敗、キューオーバーフローによるメッセージドロップを検知し、イベントトピックで詳細なドロップ情報を取得します。
- セキュリティ監査およびアクセスログ:どのクライアントがどのメッセージをパブリッシュし、正常に配信・消費されたかを記録し、監査やコンプライアンス要件に対応します。
トリガー条件と主要フィールド
ルーティング中のメッセージドロップ ($events/message_dropped)
- トリガー条件:メッセージにマッチするサブスクライバーが存在しない場合に発生します。
- 主要フィールド:
| フィールド名 | 説明 |
|---|---|
| clientid | メッセージパブリッシャーのクライアントID |
| username | メッセージパブリッシャーのユーザー名 |
| topic | メッセージのトピック |
| reason | メッセージがドロップされた理由 |
| qos | メッセージのQoSレベル |
| timestamp | ドロップイベントのタイムスタンプ(ミリ秒) |
| publish_received_at | PUBLISHパケットがブローカーに到達したタイムスタンプ(ミリ秒) |
- 主なドロップ理由:
| 理由コード | 説明 |
|---|---|
| no_subscribers | トピックにサブスクライバーが存在しない |
| receive_maximum_exceeded | QoS 2のawaiting_relキューが満杯 |
| packet_identifier_inuse | QoS 2で使用されるパケットIDがまだ解放されていない |
- 典型的なユースケース:サブスクライバー不在によるメッセージドロップの監視。
配信中のメッセージドロップ ($events/delivery_dropped)
- トリガー条件:サブスクライバーのメッセージキューが満杯の場合に発生します。
- 主要フィールド:
| フィールド名 | 説明 |
|---|---|
| from_clientid | メッセージパブリッシャーのクライアントID |
| from_username | メッセージパブリッシャーのユーザー名 |
| topic | メッセージのトピック |
| reason | メッセージがドロップされた理由 |
| qos | メッセージのQoSレベル |
| timestamp | ドロップイベントのタイムスタンプ(ミリ秒) |
| publish_received_at | PUBLISHパケットがブローカーに到達したタイムスタンプ(ミリ秒) |
- 主なドロップ理由:
| 理由コード | 説明 |
|---|---|
| no_local | クライアントが自身のパブリッシュしたメッセージを受信できない(MQTT 5のNo Local=1) |
| expired | メッセージまたはセッションの有効期限切れ |
| queue_full | サブスクライバーのキューが満杯(QoS > 0)、通常は消費遅延や長期オフラインが原因 |
| qos0_msg | QoS 0メッセージがキュー満杯のためドロップ |
- 典型的なユースケース:
- サブスクライバーの消費能力やオフライン期間の監視
- メッセージバックログや配信失敗原因の分析
$events/message_droppedと併用し、アップリンク・ダウンリンク両面のメッセージドロップ挙動を追跡
メッセージ配信イベント ($events/message_delivered)
- トリガー条件:EMQXがメッセージをサブスクライバーに正常送信した際に発生(クライアントのアックではなく、ブローカー側の配信完了を示す)。
- 主要フィールド:
| フィールド名 | 説明 |
|---|---|
| clientid | 対象クライアントID |
| from_clientid | メッセージパブリッシャーのクライアントID |
| username | 対象クライアントのユーザー名 |
| topic | メッセージのトピック |
| qos | メッセージのQoSレベル |
| timestamp | イベントのタイムスタンプ(ミリ秒) |
| publish_received_at | PUBLISHパケットがブローカーに到達したタイムスタンプ(ミリ秒) |
- 典型的なユースケース:
- メッセージ配信成功率の測定
- メッセージ配信レイテンシの監視
$events/message_ackedと組み合わせてアックレイテンシ解析(QoS 1/2)
メッセージアックイベント ($events/message_acked)
- トリガー条件:クライアントがPUBACKまたはPUBCOMPを送信して受信をアックした際に発生(QoS 1およびQoS 2のみ)。
- 主要フィールド:
| フィールド名 | 説明 |
|---|---|
| id | MQTTメッセージID(レイテンシ追跡に有用) |
| clientid | 対象クライアントID |
| from_clientid | パブリッシャークライアントID |
| username | 対象クライアントのユーザー名 |
| topic | メッセージのトピック |
| qos | メッセージのQoSレベル |
| timestamp | イベントのタイムスタンプ(ミリ秒) |
| publish_received_at | PUBLISHパケットがブローカーに到達したタイムスタンプ(ミリ秒) |
- 典型的なユースケース:
- メッセージ配信からアックまでのレイテンシ計測
- QoS 1 / QoS 2メッセージのアック率分析
- ミッションクリティカルなワークフローの信頼性確保
TIP
イベントトピックの全フィールド一覧は、MQTT Eventsを参照してください。
ドロップイベントトピックメッセージをキャプチャするためのデータ統合設定
実際の運用では、イベントトピックは主に以下の2つの方法で処理されます。
- メッセージの再パブリッシュ:イベントメッセージを別のMQTTトピックに再パブリッシュします。軽量かつリアルタイムで、ネイティブなMQTTワークフローと親和性があります。
- 外部サービスへの転送:イベントメッセージをデータベースやメッセージキュー、HTTPサービスなどの外部システムに送信し、永続的な分析や下流システムとの連携を実現します。
本節ではメッセージの再パブリッシュ方法のみを示します。外部サービスへの転送方法は、メッセージ配信イベントメッセージをHTTPサービスに転送するを参照してください。
その他のデータベースや外部サービスへの転送方法は、公式ドキュメントのEMQX Cloud データ統合をご覧ください。
メッセージ再パブリッシュ用ルールとアクションの作成
ここでは、メッセージドロップイベントトピックメッセージを別トピックに再パブリッシュする方法を示します。
ルールとアクションの作成手順
データ統合のデータ転送でRepublishを選択します。すでにコネクターが存在する場合はCreate Connectorをクリックし、Republishを選びます。
SQLエディターでルールSQLを定義します。例えば、メッセージドロップイベントを診断するには以下のSQLを使用します。
sqlSELECT clientid, reason, topic, qos, timestamp FROM "$events/message_dropped"Nextをクリックしてアクションを追加します。
アクションを設定します。
Connector:デフォルトの
Republishを使用します。Topic:ターゲットトピックを
message_droppedに設定します。Payload、QoS、Retainはデフォルトのままにします。
Confirmをクリックして設定を完了します。
ルールとアクションのテスト
MQTTXなどのMQTTクライアントを使用します。
- ClientIDが
pubとsubの2つのMQTTX接続を作成します。 subクライアントはmessage_droppedをサブスクライブします。pubがトピックtestにメッセージをパブリッシュすると、subクライアントは以下のようなメッセージを受信します。
{
"topic": "test",
"timestamp": 1761309999897,
"reason": "no_subscribers",
"qos": 1,
"clientid": "pub"
}このペイロードは、メッセージがEMQXに到達した時点でトピックにサブスクライバーが存在しなかったためドロップされたことを示しています。
メッセージ配信イベントメッセージをHTTPサービスに転送する
ここでは、メッセージ配信イベントメッセージをHTTPサービスに転送する方法を示します。開始前に、コネクターにプライベートIPでアクセスするためのVPCピアリング接続を作成するか、パブリックIPでアクセスするためにNATゲートウェイを有効にしてください。
HTTPサーバーコネクターの作成
データ統合のWebサービスでHTTP Serverを選択します。すでにコネクターがある場合はCreate Connectorを選び、HTTP Serverを選択します。
転送先HTTPエンドポイントのURLを入力し、必要に応じてその他の設定を調整します。
URLはイベントメッセージを受信するHTTPサービスを指し、コネクターはルールで定義されたペイロードをPOSTリクエストで送信します。
Testをクリックして接続確認を行います。
Newをクリックしてコネクター作成を完了します。
ルールとアクションの作成
Create Ruleをクリックします。
SQLエディターで以下のようにSQLを定義します。
sqlSELECT from_clientid, clientid, username, payload, topic, qos FROM "$events/message_delivered"Nextをクリックしてアクションを作成します。
先ほど作成したHTTP Serverコネクターを選択し、その他の設定はデフォルトのままにします。
Confirmをクリックします。
TIP
HTTPサービスへのデータ転送の全手順と設定については、公式ドキュメントのHTTP ServerへのMQTTデータ取り込みを参照してください。
ルールとアクションのテスト
- ClientIDが
pubとsubの2つのMQTTX接続を作成します。 subはトピックtestをサブスクライブします。pubがtestにメッセージをパブリッシュします。- HTTPサービスは以下のようなペイロードのPOSTリクエストを受信します。
{
"username": "test",
"topic": "test",
"qos": 0,
"payload": "Hello Event",
"from_clientid": "pub",
"clientid": "sub"
}ベストプラクティスの推奨事項
- 高頻度イベント(例:
message_delivered)では、レート制限を有効にするか、ルールSQLのWHEN句でイベントをフィルタリングするか、Kafkaなどの外部システムに転送してバッファリングすることを推奨します。 - イベントトピックやビジネスモジュールごとにルールやコネクターを分けることで、保守性と分析の分離性を向上させます。
- クライアントのオンライン/オフラインイベントトピックと組み合わせることで、デバイス接続性とメッセージ挙動のライフサイクル全体の監視を実現します。