データ統合
EMQXは、MQTTプロトコルを通じてIoTデバイスを接続し、リアルタイムでメッセージを送受信するMQTTメッセージングプラットフォームです。これを基盤として、EMQXのデータ統合は外部データシステムとの接続を導入し、デバイスと他の業務システムとのシームレスな統合を可能にします。
データ統合はSinkおよびSourceコンポーネントを使用して外部データシステムと接続します。SinkはMySQL、Kafka、HTTPサービスなどの外部データシステムへメッセージを送信するために使用され、SourceはMQTT、Kafka、GCP PubSubなどの外部データシステムからメッセージを受信するために使用されます。
この仕組みにより、EMQXは単なるIoTデバイス間のメッセージ送信を超えて、デバイスが生成するデータを業務全体のエコシステムに有機的に統合します。これにより、IoTアプリケーションの適用シナリオが拡大し、デバイスと業務システム間のやり取りが豊かかつ多様になります。
注意
EMQX v5.4.0以降、従来のデータブリッジはデータフローの方向に応じて分割され、SinkおよびSourceに名称変更されました。
現時点でEMQXがSourceとしてサポートする外部データシステムは以下の通りです:
- MQTTサービス
- Kafka
- GCP PubSub
本ページでは、SinkおよびSourceの動作原理、対応する外部データシステム、主な機能、管理方法について包括的に解説します。
動作原理
EMQXのデータ統合は標準機能として提供されています。MQTTメッセージングプラットフォームとして、EMQXはMQTTプロトコルを通じてIoTデバイスからデータを受信します。内蔵のルールエンジンの助けを借りて、受信したデータはルールエンジンに設定されたルールによって処理されます。ルールは処理済みデータを設定されたSink/Sourceを通じて外部データシステムに転送するアクションをトリガーします。Dashboard上のRulesやFlowデザイナーを使えば、コーディング不要で簡単にルール作成、アクションの紐付け、Sink/Sourceの作成が可能です。
内蔵ルールエンジン
様々なIoTデバイスやシステムからのデータソースは、多種多様なデータ型やフォーマットを持ちます。EMQXはSQLベースの強力な内蔵ルールエンジンを備えており、データ処理と配信の中核コンポーネントです。ルールエンジンは条件判定、文字列操作、データ型変換、圧縮・解凍など多彩な機能を持ち、複雑なデータを柔軟に扱えます。
クライアントが特定イベントをトリガーしたりメッセージがEMQXに到達した際、ルールエンジンは事前定義されたルールに基づきリアルタイムでデータを処理します。データ抽出、フィルタリング、付加情報の付与、フォーマット変換などを行い、処理済みデータを指定されたSinkへ転送します。
ルールエンジンの詳細な動作についてはルールエンジン章をご参照ください。
Sink
Sinkはルールのアクションに追加されるデータ出力コンポーネントです。デバイスがイベントをトリガーしたりメッセージがEMQXに到達すると、システムは該当ルールをマッチング・実行し、データをフィルタリング・処理します。ルールエンジンで処理されたデータは指定されたSinkに転送されます。Sink内では${var}
や${.var}
構文を使ってデータから変数を抽出し、動的にSQL文やデータテンプレートを生成するなどの処理を設定できます。その後、対応するコネクターを通じて外部データシステムにデータを送信し、メッセージの保存、データ更新、イベント通知などを実現します。
Sinkでサポートされる変数抽出構文は以下の通りです:
${var}
:ルールの出力結果から変数を抽出する構文です。例:${topic}
。ネストした変数を抽出したい場合はドット.
を使い、${payload.temp}
のように記述します。抽出対象の変数が出力結果に含まれない場合は文字列undefined
が返されます。${.}
、${.var}
:${.}
はルールの全出力結果を含むJSON文字列を抽出し、${.var}
は${var}
と同じ意味です。
Source
Sourceはデータ入力コンポーネントであり、ルールのデータソースとして機能し、ルールSQLで選択されます。
SourceはMQTTやKafkaなどの外部データシステムからメッセージをサブスクライブまたはコンシュームします。コネクターを通じて新しいメッセージが到着すると、ルールエンジンは該当ルールをマッチング・実行し、データをフィルタリング・処理します。処理後のデータは指定されたEMQXトピックにパブリッシュされ、クラウドコマンドの配信などに利用されます。
対応統合
EMQXは以下の種類のデータシステムとのデータ統合をサポートしています:
デフォルト
クラウド
TSDB
SQL
NoSQL
メッセージキュー
その他
Sinkの特徴
Sinkは以下の機能により利便性を高め、データ統合のパフォーマンスと信頼性をさらに向上させます。すべてのSinkがこれらの機能を完全に実装しているわけではありません。詳細な対応状況は各Sinkのドキュメントをご確認ください。
非同期リクエストモード
非同期リクエストモードは、メッセージのパブリッシュ・サブスクライブ処理がSinkの実行速度に影響されないように設計されています。ただし、非同期モードを有効にすると、サブスクライバーがメッセージを受信した時点で、まだ外部データシステムへの書き込みが完了していない場合があります。
EMQXはデータ処理効率向上のため、デフォルトで非同期リクエストモードを有効にしています。メッセージの配信タイミングに厳密な要件がある場合は、非同期リクエストモードを無効にしてください。
また、max_inflight
パラメータは非同期リクエスト時のメッセージ順序にも影響します。これを持つSinkでは、同一MQTTクライアントからのメッセージを順序通りに処理する必要がある場合、max_inflight
を1に設定する必要があります。
バッチモード
バッチモードは複数のデータをまとめて外部データ統合システムに書き込む機能です。バッチモード有効時、EMQXは各リクエストのデータ(単一エントリ)を一時的に蓄積し、一定時間経過または一定数のデータが溜まったタイミングでまとめて書き込みます(いずれも設定可能)。
利点:
- 書き込み効率の向上:単一メッセージ書き込みに比べ、データベースがメッセージをキャッシュや前処理できるため効率的です。
- ネットワークレイテンシの削減:バッチ書き込みによりネットワーク送信回数が減り、レイテンシが低減します。
課題:
書き込み遅延:設定された時間や件数に達するまで書き込みが遅延します。これらの設定はパラメータで調整可能です。
バッファキュー
バッファキューはSinkに一定のフォールトトレランスを提供し、データ安全性向上のため有効化が推奨されます。
各リソース接続(MQTT接続ではありません)にはバッファキュー長(容量サイズ)があり、これを超えたデータはFIFO原則に従い破棄されます。
バッファファイルの場所
Kafka Sinkの場合、ディスクキャッシュファイルはdata/kafka
に保存されます。他のSinkではdata/bufs
に保存されます。
実運用では、data
ディレクトリを高性能ディスクにマウントすることでスループットを向上できます。
プリペアドステートメント
MySQLやPostgreSQLなどのSQLデータベースでは、SQLテンプレートが事前処理され、フィールド変数を明示的に指定する必要がありません。
直接SQLを実行する場合、topicやpayloadは文字列型、qosは整数型としてシングルクォートで明示的に指定する必要があります:
INSERT INTO msg(topic, qos, payload) VALUES('${topic}', ${qos}, '${payload}');
しかし、プリペアドステートメント対応Sinkでは、SQLテンプレートはクォートなしで記述する必要があります:
INSERT INTO msg(topic, qos, payload) VALUES(${topic}, ${qos}, ${payload});
プリペアドステートメントはフィールド型を自動推論するほか、SQLインジェクション防止にも寄与しセキュリティを強化します。
フォールバックアクション
EMQX 5.9.0以降、任意のアクションに対してフォールバックアクションを定義可能です。これは、プライマリアクションがメッセージ処理に失敗した際にトリガーされ、データ信頼性と可観測性を向上させます。メッセージを別のSinkや再パブリッシュアクションなどのセカンダリターゲットにリダイレクトできます。
フォールバックアクションの用途例:
- 失敗メッセージをバックアップデータシステム(別のSinkなど)へ転送
- 失敗メッセージを監視用トピックに再パブリッシュしトラブルシューティングやアラートに活用
- プライマリアクションの一時的な問題によるデータ損失を最小化
主な特徴
- フォールバックアクションはプライマリアクションがメッセージ処理に失敗した場合のみトリガーされます。失敗には配信エラー、バッファオーバーフロー、リクエストTTL切れが含まれます。
- 常に非同期リクエストモードで動作し、独自の設定に関わらず非同期です。
- 定義されたすべてのフォールバックアクションは同時にトリガーされ、順次試行や最初の成功で停止はしません。
- フォールバックアクションは通常アクションと同じバッファリング機構を共有し、メッセージはリクエストTTLまたはバッファオーバーフローまで再試行されます。
- フォールバックアクションはさらにフォールバックアクションをトリガーしません。フォールバックアクション自身が失敗しても、その設定されたフォールバックはトリガーされません。
- フォールバックアクションのメッセージ処理はプライマリアクションや元のルールのメトリクスに影響を与えません。
フォールバックアクションの定義例
HTTPアクションmy_http
に対してフォールバックアクションを定義し、既存のMQTTアクションfallback
を利用する例です。
actions {
http {
my_http {
fallback_actions = [
{kind = reference, type = mqtt, name = fallback},
{
kind = republish,
args = {
topic = "fallback/republish/topic"
qos = 1
payload = "${payload}"
}
}
]
# その他の設定は省略
}
}
mqtt {
fallback {
fallback_actions = [
{kind = reference, type = mqtt, name = another_fallback}
]
# その他の設定は省略
}
}
}
この例では:
- HTTPアクション
my_http
が失敗した場合、メッセージは- MQTTアクション
fallback
に転送され - トピック
fallback/republish/topic
に再パブリッシュされます
- MQTTアクション
fallback
が失敗しても、そのフォールバックアクションanother_fallback
はトリガーされません。フォールバックアクションは再帰的なチェーンをサポートしません。- ただし、
fallback
が別ルールのプライマリアクションとしてトリガーされ失敗した場合は、そのフォールバックanother_fallback
が適用されます。
Sinkの状態と統計情報
Dashboard上でSinkの稼働状況や統計情報を確認し、正常に動作しているか把握できます。
稼働状態
Sinkは以下の状態を持ちます:
connecting
:ヘルスプローブが行われる前の初期状態で、外部データシステムへの接続を試みている状態connected
:Sinkが正常に接続され、正常稼働中。ヘルスプローブ失敗時は失敗の程度に応じてconnecting
またはdisconnected
に遷移する場合がありますdisconnected
:ヘルスプローブに失敗し非正常状態。設定により自動再接続を試みる場合がありますstopped
:手動で無効化された状態inconsistent
:クラスター内のノード間でSinkの状態に不一致がある状態
稼働統計
EMQXはデータ統合の稼働統計を以下のカテゴリで提供します:
- Matched(カウンター)
- Sent Successfully(カウンター)
- Sent Failed(カウンター)
- Dropped(カウンター)
- Late Reply(カウンター)
- Inflight(ゲージ)
- Queuing(ゲージ)

Matched
matched
はSinkにルーティングされたリクエスト/メッセージの総数をカウントします。各メッセージは他のメトリクスで最終的にカウントされるため、matched = success + failed + inflight + queuing + late_reply + dropped
となります。
Sent Successfully
success
は外部データシステムに正常に受信されたメッセージ数をカウントします。retried.success
は再送が1回以上あった成功メッセージ数のサブカウントであり、retried.success <= success
です。
Sent Failed
failed
は外部データシステムへの受信に失敗したメッセージ数をカウントします。retried.failed
は再送が1回以上あった失敗メッセージ数のサブカウントであり、retried.failed <= failed
です。
Dropped
dropped
は配信試行なしで破棄されたメッセージ数をカウントします。複数の具体的な理由別カテゴリを含み、計算式はdropped = dropped.expired + dropped.queue_full + dropped.resource_stopped + dropped.resource_not_found
です。
expired
:キューイング中にメッセージのTTLが切れたため破棄queue_full
:キューが最大容量に達し、メモリオーバーフロー防止のため破棄resource_stopped
:Sinkが停止中に配信を試みたメッセージresource_not_found
:Sinkが存在しない状態で配信を試みたメッセージ。主にSink削除時の競合状態で稀に発生
Late Reply
late_reply
はメッセージ送信を試みたが、基盤ドライバーからの応答がメッセージTTL切れ後に届いた場合にインクリメントされます。
TIP
late_reply
はメッセージの送信成功・失敗を示すものではなく、状態不明です。外部データシステムへの挿入成功、失敗、または接続タイムアウトのいずれもあり得ます。
Inflight
inflight
はバッファリング層で現在送信待ちのメッセージ数を示すゲージです。
Queuing
queuing
はバッファリング層で受信済みだがまだ外部データシステムに送信されていないメッセージ数を示すゲージです。