他の MQTT サービスとのブリッジ
MQTT ブローカーのデータ統合により、EMQX は別の EMQX クラスターや他の MQTT サービスに接続してメッセージブリッジを実現し、ネットワークやサービスを跨いだデータの相互作用および通信を可能にします。本ページでは、EMQX における MQTT メッセージブリッジの動作原理を紹介し、メッセージブリッジの作成と検証に関する実践的なガイドを提供します。
動作原理
ブリッジング中、EMQX はクライアントとしてターゲットサービスと MQTT 接続を確立し、パブリッシュ・サブスクライブモデルを通じて双方向のメッセージ送受信を実現します。
- 送信メッセージ(Sink):ローカルのトピックからメッセージをパブリッシュし、リモート MQTT サービスの指定トピックへ送信します。
- 受信メッセージ(Source):リモート MQTT サービスのトピックをサブスクライブし、そのメッセージを EMQX ローカルに転送します。
EMQX は同一接続上で複数のブリッジングルールを設定可能で、それぞれ異なるトピックマッピングやメッセージ変換ルールを持つことで、メッセージルーティングに類似した機能を実装しています。ブリッジング中はルールエンジンを介してメッセージのフィルタリング、拡充、変換処理も行えます。
以下の図は、EMQX と他の MQTT サービス間のデータ統合の典型的なアーキテクチャを示しています。
特長とメリット
MQTT ブローカーのデータ統合は以下の特長とメリットを備えています。
- 広範な互換性:標準 MQTT プロトコルを使用しているため、AWS IoT Core、Azure IoT Hubs をはじめとする各種 IoT プラットフォームや、オープンソースや業界標準の MQTT ブローカー、IoT プラットフォームとのブリッジが可能です。これにより多様なデバイスやプラットフォームとのシームレスな統合と通信を実現します。
- 双方向データフロー:双方向のデータフローをサポートし、EMQX からリモート MQTT サービスへのメッセージパブリッシュと、リモート MQTT サービスからのメッセージサブスクライブおよびローカルパブリッシュを可能にします。この双方向通信により、異なるシステム間のデータ転送が柔軟かつ制御可能になります。
- 柔軟なトピックマッピング:MQTT のパブリッシュ・サブスクライブモデルに基づき、トピックにプレフィックスを付加したり、クライアントのコンテキスト情報(クライアント ID、ユーザー名など)を用いて動的にトピックを構築する柔軟なトピックマッピングを実装しています。これにより、ニーズに応じたメッセージのカスタム処理やルーティングが可能です。
- 高性能:コネクションプーリングや共有サブスクリプションなどのパフォーマンス最適化オプションを提供し、個々のブリッジクライアントの負荷を軽減してブリッジレイテンシの低減とメッセージスループットの向上を実現します。これらの最適化により、システム全体の性能とスケーラビリティが向上します。
- ペイロード変換:SQL ルールを定義してメッセージペイロードの処理が可能です。メッセージ送信時にペイロードの抽出、フィルタリング、拡充、変換などの操作を行えます。例えば、リアルタイムメトリクスをペイロードから抽出し、変換・処理してからリモート MQTT ブローカーに配信できます。
- メトリクス監視:各 Sink/Source ごとにランタイムのメトリクス監視を提供し、総メッセージ数、成功/失敗数、現在のレートなどを確認できます。これにより、Sink/Source のパフォーマンスや状態をリアルタイムに監視・評価できます。
MQTT 接続情報の準備
MQTT ブローカーのデータ統合を作成する前に、リモート MQTT サービスの接続情報を取得する必要があります。主な情報は以下の通りです。
- MQTT サービスアドレス:ターゲット MQTT サービスのアドレスとポート、例:
broker.emqx.io:1883
- ユーザー名:接続に必要なユーザー名。認証不要の場合は空欄で構いません。
- パスワード:接続に必要なパスワード。認証不要の場合は空欄で構いません。
- プロトコルタイプ:ターゲットサービスが TLS を有効にしているか、TCP/TLS 上の MQTT を使用しているかを確認してください。EMQX MQTT ブリッジは現在、MQTT over WebSocket や MQTT over QUIC などのプロトコルには対応していません。
- プロトコルバージョン:ターゲット MQTT サービスが使用するプロトコルバージョン。EMQX は MQTT 3.1、3.1.1、MQTT 5.0 をサポートしています。
データ統合は EMQX や他の標準 MQTT サーバーに対して高い互換性とサポートを提供します。その他の MQTT サービスに接続する場合は、該当サービスのドキュメントを参照して接続情報を取得してください。一般的に多くの IoT プラットフォームは標準的な MQTT アクセス方法を提供しており、それに基づきデバイス情報を上記の MQTT 接続情報に変換できます。
注意
EMQX がクラスター運用中、またはコネクションプールを有効にしている場合、同一クライアント ID を使って複数ノードが同じ MQTT サービスに接続するとデバイス競合が発生しやすくなります。そのため、MQTT メッセージブリッジでは固定クライアント ID の設定は現在サポートしていません。
コネクターの作成
ここでは、EMQX の オンライン MQTT サーバー を例に、リモート MQTT サーバーとの接続設定方法を説明します。
- ダッシュボードの Integration -> Connector ページに移動します。
- ページ右上の Create をクリックします。
- コネクタータイプ一覧から MQTT Broker を選択し、Next をクリックします。
- コネクターの name を入力します。英数字の組み合わせで、例:
my_mqtt_bridge
。 - 接続情報を設定します:
- MQTT Broker:TCP/TLS 上の MQTT のみサポート。
broker.emqx.io:1883
と設定します。 - ClientID Prefix:空欄でも構いません。実際の運用ではクライアント ID プレフィックスを指定するとクライアント管理が容易になります。EMQX はクライアント ID プレフィックスとコネクションプールのサイズに基づき自動でクライアント ID を生成します。詳細は Connection Pool and Client ID Generation Rules を参照してください。
- Username と Password:このサーバーは認証不要なので空欄で構いません。
- MQTT Broker:TCP/TLS 上の MQTT のみサポート。
その他の設定はデフォルトのままにし、Create ボタンをクリックしてコネクターを作成します。作成したコネクターは Sink と Source の両方に使用可能です。次に、このコネクターを基にデータブリッジルールを作成できます。
コネクションプールとクライアント ID 生成ルール
EMQX は複数クライアントが同時にブリッジ先 MQTT サービスに接続可能です。コネクター作成時に MQTT クライアントのコネクションプールを設定でき、そのサイズを指定します。コネクションプールはサーバーリソースを最大限活用し、メッセージスループットと同時接続性能を向上させ、高負荷・高同時接続シナリオに対応します。
MQTT プロトコルでは、MQTT サーバーに接続するクライアントは一意のクライアント ID を持つ必要があります。EMQX はクラスター展開が可能なため、MQTT ブリッジの各クライアントには固有のクライアント ID が割り当てられます。EMQX は以下のパターンでクライアント ID を自動生成します。
[Client ID Prefix]:{Connector Name}{8桁のランダム文字列}:{プール内の接続シーケンス番号}
例えば、クライアント ID プレフィックスが myprefix
、コネクター名が foo
の場合、実際のクライアント ID は以下のようになります。
myprefix:foo2bd61c44:1
バージョン 5.4.1 以降、EMQX は MQTT クライアント ID の長さを最大 23 バイトに制限しています。これを超える場合はハッシュ値に置き換えられます。プレフィックスやコネクター名が長すぎるとユーザー体験が悪化する可能性があります。
この問題に対応し、バージョン 5.7.1 以降では以下のルールを実装しています。
- プレフィックスなし:動作は変わらず、23 バイト超のクライアント ID はハッシュ化されます。
- プレフィックスあり:
- プレフィックスが最大 19 バイト:プレフィックスは保持され、残り部分は 4 バイトのハッシュに変換され、全体を 23 バイト以内に収めます。
- プレフィックスが 20 バイト以上:設定されたプレフィックスをそのまま使用し、クライアント ID の短縮は行いません。
静的クライアント ID の設定
特定のユースケースでは、統合に使用するクライアント ID が限られている場合があります。この場合、各ノードに静的クライアント ID のセットを割り当ててコネクターを設定可能です。コネクター設定時に EMQX クラスターの各ノードごとにクライアント ID のリストを指定します。例は以下の通りです。
ノード | クライアント ID |
---|---|
emqx@10.0.0.1 | clientid1 , clientid3 |
emqx@10.0.0.2 | clientid2 |
emqx@10.0.0.3 | clientid4 , clientid5 |
静的クライアント ID は設定ファイル経由でのみ設定可能で、ダッシュボード UI からは設定できません。各ノードの設定ファイルに static_clientids
パラメータを個別に定義してください。
静的クライアント ID を設定した場合は、これらのクライアント ID を使った MQTT 接続のみが起動され、pool_size
や clientid_prefix
などの動的クライアント ID 設定は無効になります。
MQTT Broker Sink を使ったルール作成
ここでは、リモート MQTT サービスに転送するデータを指定するルールの作成方法を説明します。
ダッシュボードの Integration -> Rules ページに移動します。
ページ右上の Create をクリックします。
ルール ID に
my_rule
と入力します。SQL Editor に、
t/#
トピックの MQTT メッセージをリモート MQTT サーバーに保存するルール SQL を入力します。例は以下の通りです。sqlSELECT * FROM "t/#"
MQTTv5 プロトコルを使用している場合、
pub_props
フィールドをルール SQL に含めると、パブリッシュプロパティがそのままリモートブローカーに転送されます。上記のSELECT * FROM t/#
はこの例に該当します。さらにユーザープロパティを動的に追加したい場合は、ルールの出力に含める
pub_props
フィールドに追加可能です。例えば、以下のルールはペイロードから取得したキーと値でユーザープロパティを追加します。sqlSELECT *, map_put(concat('User-Property.', payload.extra_key), payload.extra_value, pub_props) as pub_props FROM 't/#'
アクションを追加し、Action Type ドロップダウンから
MQTT Broker
を選択します。Action はデフォルトのCreate Action
のままにします。この例では新しい Sink を作成してルールに追加します。Sink の名前と説明をフォームに入力します。
Connector ドロップダウンから先ほど作成した
my_mqtt_bridge
コネクターを選択します。新規作成する場合は、ドロップダウン横の作成ボタンから Create a Connector の設定パラメータを使って作成可能です。Sink の設定を行い、EMQX から外部 MQTT サービスへのメッセージパブリッシュを設定します。
- Topic:外部 MQTT サービスにパブリッシュするトピック。
${var}
プレースホルダーをサポートします。ここではpub/${topic}
と入力し、元のトピックにpub/
プレフィックスを付けて転送します。例えば元のメッセージトピックがt/1
の場合、外部 MQTT サービスに転送されるトピックはpub/t/1
になります。 - QoS:メッセージパブリッシュの QoS。ドロップダウンから
0
、1
、2
、${qos}
を選択可能です。ここでは元メッセージの QoS に従うため${qos}
を選択します。 - Retain:メッセージをリテインとしてパブリッシュするかどうか。
true
、false
、${flags.retain}
を選択可能で、他のフィールドからリテインフラグを設定するプレースホルダーも使用可能です。ここでは元メッセージのリテインフラグに従うため${flags.retain}
を選択します。 - Payload:転送メッセージのペイロード生成テンプレート。デフォルトは空欄でルール出力結果を転送します。ここでは
${payload}
と入力し、ペイロードのみを転送します。
- Topic:外部 MQTT サービスにパブリッシュするトピック。
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。詳細は Fallback Actions を参照してください。
その他の設定はデフォルト値のままにし、Create ボタンをクリックして Sink の作成を完了します。作成後はルール作成ページに戻り、新しい Sink がルールのアクション出力に追加されます。
ルール作成ページの下部にある Create ボタンをクリックしてルール作成を完了します。
これでルールが正常に作成されました。Integration -> Rules ページで新規ルールを確認できます。Actions(Sink) タブをクリックすると、新しい MQTT Broker Sink が表示されます。
また、Integration -> Flow Designer をクリックするとトポロジーが表示されます。トポロジーは、t/#
トピックのメッセージがルール my_rule
によって処理され、リモート MQTT ブローカーに送信される様子を視覚的に表現しています。
MQTT Broker Sink を使ったルールのテスト
MQTTX CLI を使って、EMQX の t/#
トピックから外部 MQTT サービスの pub/${topic}
トピックへメッセージがブリッジされるルールをテストできます。EMQX の t/1
トピックにメッセージをパブリッシュすると、外部 MQTT サービスの pub/t/1
トピックに転送されるはずです。
外部 MQTT サービスで
pub/#
トピックをサブスクライブします。bashmqttx sub -t pub/# -q 1 -h broker.emqx.io -v
MQTTX で
t/1
トピックにメッセージをパブリッシュします。bashmqttx pub -t t/1 -m "hello world" -r
MQTTX で
pub/t/1
トピックをサブスクライブし、メッセージが受信できれば、EMQX から外部 MQTT サービスへのメッセージ転送が成功しています。bash[2024-1-31] [16:43:13] › topic: pub/t/1 payload: hello world
ステップ1を繰り返すと、MQTTX で
pub/t/1
トピックのリテインメッセージが確認できます。bash[2024-1-31] [16:44:29] › topic: pub/t/1 payload: hello world retain: true
MQTT Broker Source を使ったルール作成
ここでは、リモート MQTT サービスからローカル EMQX へデータを転送するルールの作成方法を説明します。MQTT Source とメッセージ再パブリッシュアクションを作成し、リモート MQTT サービスから EMQX へのサブスクライブと転送を実現します。
MQTT Broker Source の作成とルールへの追加
ダッシュボードの Integration -> Rules ページに移動します。
ページ右上の Create をクリックします。
ルール ID に
my_rule_source
と入力します。ルールのトリガーソース(データ入力)を設定します。ページ右側の Data Inputs タブで、デフォルトの Message タイプの入力を削除し、Add Input をクリックして MQTT Source を作成します。
Add Input ポップアップで、Input Type ドロップダウンから
MQTT Broker
を選択し、Source ドロップダウンはデフォルトのCreate Source
のままにします。この例では新しい Source を作成してルールに追加します。Source の名前と説明をフォームに入力します。
先に作成した
my_mqtt_bridge
コネクターをドロップダウンから選択します。新規作成する場合は、ドロップダウン横の作成ボタンから Create a Connector の設定パラメータを使って作成可能です。Source の設定を行い、外部 MQTT サービスから EMQX へのサブスクライブを完了します。
Topic:サブスクライブするトピック。
+
と#
のワイルドカードをサポートします。TIP
EMQX がクラスター運用中、またはコネクターがコネクションプール設定の場合は、重複メッセージを避けるため共有サブスクリプションを使用してください。
ここでは
$share/1/f/#
と入力し、f/#
トピックにマッチするすべてのメッセージをサブスクライブします。QoS:サブスクライブの QoS。ドロップダウンから
0
または1
を選択します。
その他の設定はデフォルトのままにし、Create ボタンをクリックして Source 作成を完了します。Source はルールのデータ入力に追加されます。ルール SQL は以下のように変更されます。
sqlSELECT * FROM "$bridges/mqtt:my_source"
ルール SQL は MQTT Source から以下のフィールドを抽出可能で、データ処理に応じて SQL を調整できます。デフォルトの SQL は本例に十分です。
フィールド名 説明 topic 発信元メッセージのトピック server 接続された Source のサーバーアドレス retain メッセージがリテインかどうか。値は false qos メッセージの QoS(サービス品質) pub_props MQTT 5.0 メッセージプロパティオブジェクト。ユーザープロパティペアやその他属性を含む pub_props.User-Property-Pairs ユーザープロパティペアの配列。各ペアはキーと値を持つ例: {"key":"foo", "value":"bar"}
pub_props.User-Property ユーザープロパティオブジェクト。キーと値を持つ例: {"foo":"bar"}
pub_props.* その他含まれるメッセージプロパティのキーと値。例: Content-Type: JSON
payload メッセージ内容 message_received_at メッセージ受信タイムスタンプ(ミリ秒) id メッセージ ID dup メッセージが重複かどうか
MQTT Source の作成は完了しましたが、サブスクライブしたデータは直接 EMQX にパブリッシュされません。次に、Source でサブスクライブしたメッセージを EMQX に転送するためのメッセージ再パブリッシュアクションを作成します。
再パブリッシュアクションの作成
ルール作成ページの右側 Action Outputs タブに切り替え、Add Action ボタンをクリックします。Type of Action ドロップダウンから
Republish
アクションを選択します。メッセージ再パブリッシュの設定を行います。
- Topic:
sub/${topic}
と入力し、元のトピックにsub/
プレフィックスを付けて転送します。例えば元のメッセージトピックがf/1
の場合、EMQX に転送されるトピックはsub/f/1
になります。 - QoS:
0
、1
、2
、${qos}
から選択可能。プレースホルダーで他フィールドから設定も可能です。ここでは元メッセージの QoS に従うため${qos}
を選択します。 - Retain:
true
またはfalse
を選択し、メッセージをリテインとしてパブリッシュするか決定します。プレースホルダーで他フィールドから設定も可能です。ここではfalse
を選択します。- データソースが MQTT Source のため、
${flags.retain}
は適用されません。 ${retain}
を指定すると、外部 MQTT サービスのリテイン機構でリテインされたメッセージの場合にのみ有効です。元メッセージが EMQX にパブリッシュされた場合は適用されません。
- データソースが MQTT Source のため、
- Payload:転送メッセージのペイロード生成に使用。デフォルトは空欄でルール出力結果を転送します。ここでは
${payload}
と入力し、ペイロードのみを転送します。
- Topic:
Add ボタンをクリックしてアクション作成を完了します。ルール作成ページに戻り、新しいアクションが Action Outputs タブに追加されます。
ルール作成ページ下部の Create ボタンをクリックしてルール作成を完了します。
これでルールが正常に作成されました。Integration -> Rules ページで新規ルールを確認できます。Source タブをクリックすると、新しい MQTT Source が表示されます。
また、Integration -> Flow Designer をクリックするとトポロジーが表示されます。トポロジーから、MQTT Source のメッセージが再パブリッシュアクションを通じて sub/${topic}
に転送されている様子が確認できます。
MQTT Broker Source を使ったルールのテスト
MQTTX CLI を使って、外部 MQTT サービスの f/#
トピックから EMQX の sub/${topic}
トピックへメッセージがブリッジされるルールをテストできます。外部 MQTT サービスの f/1
トピックにメッセージをパブリッシュすると、EMQX の sub/f/1
トピックに転送されるはずです。
EMQX の
sub/#
トピックをサブスクライブします。bashmqttx sub -t sub/# -q 1 -v
MQTTX で外部 MQTT サービスの
f/1
トピックにメッセージをパブリッシュします。bashmqttx pub -t f/1 -m "I'm from broker.emqx.io" -r -h broker.emqx.io
MQTTX で
sub/f/1
トピックにメッセージが届けば、外部 MQTT サービスから EMQX へのメッセージ転送が成功しています。bash[2024-1-31] [16:49:22] › topic: sub/f/1 payload: I'm from broker.emqx.io