Skip to content

他の MQTT サービスとのブリッジ

MQTT ブローカーのデータ統合により、EMQX は別の EMQX クラスターや他の MQTT サービスに接続してメッセージブリッジを実現し、ネットワークやサービスを跨いだデータの相互連携と通信を可能にします。本ページでは、EMQX における MQTT メッセージブリッジの動作原理を紹介し、メッセージブリッジの作成および検証の実践的な手順を説明します。

動作原理

ブリッジング中、EMQX はターゲットサービスに対してクライアントとして MQTT 接続を確立し、パブリッシュ・サブスクライブモデルを通じて双方向のメッセージ送受信を実現します。

  • 送信メッセージ(Sink):ローカルのトピックからメッセージをパブリッシュし、リモート MQTT サービスの指定トピックに転送します。
  • 受信メッセージ(Source):リモート MQTT サービスのトピックをサブスクライブし、そのメッセージを EMQX ローカルに転送します。

EMQX は同一接続上で複数のブリッジルールを設定可能で、それぞれ異なるトピックマッピングやメッセージ変換ルールを持たせることで、メッセージルーティングに類似した機能を実現します。ブリッジング時にはルールエンジンを介してメッセージのフィルタリング、付加、変換処理も行えます。

以下の図は、EMQX と他の MQTT サービス間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Integration MQTT

特長と利点

MQTT ブローカーのデータ統合は以下の特長と利点を持ちます。

  • 幅広い互換性:標準 MQTT プロトコルを使用しているため、AWS IoT Core、Azure IoT Hubs をはじめとする各種 IoT プラットフォームや、オープンソースやその他業界標準の MQTT ブローカーや IoT プラットフォームとブリッジ可能です。これにより、多様なデバイスやプラットフォームとのシームレスな統合と通信が可能となります。
  • 双方向データフロー:EMQX ローカルからリモート MQTT サービスへのメッセージパブリッシュと、リモート MQTT サービスからのメッセージサブスクライブおよびローカルパブリッシュの双方向データフローをサポートします。この双方向通信により、異なるシステム間のデータ転送が柔軟かつ制御可能になります。
  • 柔軟なトピックマッピング:MQTT のパブリッシュ・サブスクライブモデルに基づき、トピックにプレフィックスを付加したり、クライアントのコンテキスト情報(クライアントID、ユーザー名など)を用いて動的にトピックを構築することが可能です。この柔軟性により、用途に応じたカスタマイズ処理やメッセージルーティングが実現できます。
  • 高性能:接続プーリングや共有サブスクリプションなどのパフォーマンス最適化オプションを提供し、個々のブリッジクライアントの負荷を軽減、ブリッジのレイテンシ低減とメッセージのスループット向上を実現します。これらの最適化により、システム全体のパフォーマンスとスケーラビリティが向上します。
  • ペイロード変換:SQL ルールを定義してメッセージのペイロードを処理可能です。これにより、メッセージ送信時にデータ抽出、フィルタリング、付加、変換などの操作を行えます。例えば、ペイロードからリアルタイムメトリクスを抽出し、変換・加工してからリモート MQTT ブローカーに送信できます。
  • メトリクス監視:各 Sink/Source ごとにランタイムのメトリクス監視が提供され、総メッセージ数、成功/失敗数、現在のレートなどを確認可能です。これにより、Sink/Source のパフォーマンスや状態をリアルタイムで把握できます。

MQTT 接続情報の準備

前提条件

以下の内容を把握していることを推奨します。

MQTT ブローカーのデータ統合を作成する前に、リモート MQTT サービスの接続情報を取得する必要があります。ここでは EMQX の オンライン MQTT サーバー を例に説明します。

  • MQTT サービスアドレス:ターゲット MQTT サービスのアドレスとポート。例:broker.emqx.io:1883
  • ユーザー名:接続に必要なユーザー名。認証不要の場合は空欄可。
  • パスワード:接続に必要なパスワード。認証不要の場合は空欄可。
  • プロトコルタイプ:ターゲットサービスが TLS を有効化しているか、TCP/TLS 上の MQTT を使用しているかを確認します。EMQX MQTT ブリッジは現時点で MQTT over WebSocket や MQTT over QUIC などのプロトコルはサポートしていません。
  • プロトコルバージョン:ターゲット MQTT サービスが使用するプロトコルバージョン。EMQX は MQTT 3.1、3.1.1、MQTT 5.0 をサポートしています。

データ統合は EMQX やその他標準 MQTT サーバーとの高い互換性を持ちます。その他の種類の MQTT サービスに接続する場合は、それぞれのドキュメントを参照し接続情報を取得してください。一般的に多くの IoT プラットフォームは標準的な MQTT アクセス方法を提供しており、それらの案内に従いデバイス情報を上記の MQTT 接続情報に変換できます。

注意

EMQX がクラスター運用中、または接続プールを有効にしている場合、同一のクライアントIDで複数ノードが同一 MQTT サービスに接続するとデバイス競合が発生しやすいため、MQTT メッセージブリッジでは固定クライアントIDの設定は現在サポートしていません。

コネクターの作成

ここではリモート MQTT サーバーとの接続設定方法を説明します。

  1. ダッシュボードの Integration -> Connector ページに移動します。

  2. ページ右上の Create をクリックします。

  3. コネクタータイプ一覧から MQTT Broker を選択し、Next をクリックします。

  4. コネクターの 名前 を入力します。英数字の組み合わせで、例:my_mqtt_bridge

  5. 接続情報を設定します:

    • MQTT Broker:TCP/TLS 上の MQTT のみサポート。例:broker.emqx.io:1883

    • ClientID Prefix:空欄可。実際の運用ではクライアントIDプレフィックスを指定するとクライアント管理が容易になります。EMQX は接続プールのサイズに応じてプレフィックスを元にクライアントIDを自動生成します。詳細は 接続プールとクライアントID生成ルール を参照してください。

    • UsernamePassword:MQTT ブローカーが認証を要求する場合はクライアントIDに紐づくユーザー名とパスワードを入力します。認証不要(パブリックブローカー等)の場合は空欄可。

    • Keepalive:希望するキープアライブ間隔を指定。

    • MQTT Version:ブローカー接続に適したバージョンを選択。

    • Static ClientId Entries:Azure IoT Hubs など特定サービス接続時に安定した接続を確保するため、静的クライアントIDを設定可能です。詳細は 静的クライアントIDの設定 を参照してください。

      TIP

      静的クライアントIDエントリを定義した場合、静的クライアントIDが割り当てられた EMQX ノードのみが MQTT 接続を開始します。

その他の設定はデフォルトのままにして、Create ボタンをクリックしコネクター作成を完了します。作成したコネクターは Sink と Source の両方で利用可能です。続いて、このコネクターを用いたデータブリッジルールを作成できます。

接続プールとクライアントID生成ルール

EMQX は複数のクライアントが同時にブリッジ先 MQTT サービスに接続可能です。コネクター作成時に MQTT クライアント接続プールを設定でき、そのサイズでプール内のクライアント接続数を指定します。接続プールはサーバーリソースを最大限活用し、メッセージのスループットと同時接続性能を向上させ、高負荷・高同時接続環境でのパフォーマンスを確保します。

MQTT プロトコルでは MQTT サーバーに接続するクライアントは固有のクライアントIDを持つ必要があり、EMQX はクラスター展開可能なため、MQTT ブリッジの各クライアントに一意のクライアントIDを割り当てます。EMQX は以下のパターンでクライアントIDを自動生成します。

bash
[Client ID Prefix]:{Connector Name}{8桁のランダム文字列}:{プール内接続のシーケンス番号}

例として、クライアントIDプレフィックスが myprefix、コネクター名が foo の場合、実際のクライアントIDは以下のようになります。

bash
myprefix:foo2bd61c44:1

バージョン 5.4.1 以降、EMQX は MQTT クライアントIDの長さを 23 バイトに制限しています。これを超える場合はハッシュ値に置き換えられます。プレフィックスやコネクター名が長すぎるとユーザー体験が悪化する可能性があります。

この問題に対処するため、バージョン 5.7.1 以降は以下のルールを実装しています。

  • プレフィックスなし:従来通り、長さ 23 バイト超のクライアントIDはハッシュ化されて 23 バイトに収まる。
  • プレフィックスあり
    • 19バイト以下のプレフィックス:プレフィックスは保持され、残りのクライアントID部分は 4 バイトのハッシュに置き換えられ、全体が 23 バイト以内に収まる。
    • 20バイト以上のプレフィックス:設定されたプレフィックスをそのまま使用し、クライアントIDの短縮は行わない。

静的クライアントIDの設定

統合で使用できるクライアントIDが限定されている場合、コネクター設定時にノードごとに静的クライアントIDセットを割り当てることが可能です。静的クライアントIDを設定するには、EMQX クラスター内の各ノードに対してクライアントIDのリストを提供し、それぞれに対応するユーザー名とパスワードを指定できます。これは Azure IoT Hubs のように、各デバイス(クライアントID)に固有の認証情報が必要なシナリオで特に有用です。

静的クライアントIDを設定する手順は以下の通りです。

  1. Static ClientId Entries セクションで Add ボタンを押し、新しい静的クライアントIDエントリを追加します。必要に応じて複数ノードのエントリを追加可能です。

  2. 各エントリに以下の項目を入力します。

    • Node Name:クライアントIDを割り当てるノード名。例:emqx@10.0.0.1
    • Client ID:静的クライアントID。例:device1。ノードに複数のクライアントIDを追加する場合は Add をクリック。
      • Username:(任意)認証用のユーザー名。
      • Password:(任意)認証用パスワード。プラットフォームによりデバイス固有のキー、シークレット、証明書などが用いられます(例:Azure IoT Hubs の認証キー)。

    設定例

    ノード名クライアントIDユーザー名(任意)パスワード(任意)
    emqx@10.0.0.1clientid1username1secret1
    clientid3
    emqx@10.0.0.2clientid2username2
    emqx@10.0.0.3clientid4
    clientid5

設定ファイルでノードごとに static_clientids パラメータを個別に定義することも可能です。

静的クライアントIDを設定した場合、これらのクライアントIDを用いた MQTT 接続のみが開始され、pool_sizeclientid_prefix など動的クライアントID関連の設定は無効になります。

MQTT ブローカー Sink を使ったルールの作成

ここでは、リモート MQTT サービスに転送するデータを指定するルールの作成方法を示します。

  1. ダッシュボードの Integration -> Rules ページに移動します。

  2. ページ右上の Create をクリックします。

  3. ルールIDに my_rule を入力します。

  4. SQL エディターに、t/# トピックの MQTT メッセージをリモート MQTT サーバーに保存するルールを入力します。SQL は以下の通りです。

    sql
    SELECT
      *
    FROM
      "t/#"

    MQTTv5 プロトコルを使用している場合、pub_props フィールドをルール SQL に含めると、パブリッシュプロパティがそのままリモートブローカーに転送されます。上記の SELECT * FROM t/# の例はこれに該当します。

    ユーザープロパティを動的に追加したい場合は、ルールの出力に pub_props フィールドを含めてください。例えば、以下のルールはペイロードからキーと値を取得してユーザープロパティを追加します。

    sql
    SELECT
      *,
      map_put(concat('User-Property.', payload.extra_key), payload.extra_value, pub_props) as pub_props
    FROM
      't/#'
  5. アクションを追加し、Action Type ドロップダウンから MQTT Broker を選択します。Action はデフォルトの Create Action のままにします。この操作で新しい Sink が作成され、ルールに追加されます。

  6. Sink の名前と説明をフォームに入力します。

  7. Connector ドロップダウンから先ほど作成した my_mqtt_bridge コネクターを選択します。新規作成する場合は、ドロップダウン横の作成ボタンをクリックし、コネクターの作成 の設定パラメータを利用してください。

  8. EMQX から外部 MQTT サービスへメッセージをパブリッシュする Sink 情報を設定します。

    • Topic:外部 MQTT サービスにパブリッシュするトピック。${var} プレースホルダー対応。ここでは pub/${topic} を入力し、元のトピックに pub/ プレフィックスを付加して転送します。例えば元トピックが t/1 の場合、外部 MQTT サービスには pub/t/1 として転送されます。
    • QoS:メッセージパブリッシュの QoS。ドロップダウンから 012${qos} を選択可能。ここでは元メッセージの QoS を引き継ぐため ${qos} を選択します。
    • Retain:メッセージをリテインとしてパブリッシュするか。truefalse${flags.retain} から選択可能。ここでは元メッセージのリテインフラグを引き継ぐため ${flags.retain} を選択します。
    • Payload:転送メッセージのペイロード生成テンプレート。デフォルトは空欄でルール出力結果を転送。ここでは ${payload} を入力し、ペイロードのみを転送します。
  9. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細は フォールバックアクション を参照してください。

  10. その他の設定はデフォルトのままにして、Create ボタンをクリックし Sink 作成を完了します。作成後、ルール作成ページに戻り、新しい Sink がルールのアクション出力に追加されます。

  11. ルール作成ページ下部の Create ボタンをクリックし、ルール作成を完了します。

これでルールが正常に作成されました。Integration -> Rules ページで新規ルールを確認でき、Actions(Sink) タブで新しい MQTT ブローカー Sink を確認できます。

また、Integration -> Flow Designer をクリックするとトポロジーが表示され、t/# トピックのメッセージがルール my_rule によって処理され、リモート MQTT ブローカーに送信される様子が視覚的に確認できます。

MQTT ブローカー Sink を使ったルールのテスト

  1. ルール作成ページ下部の Create ボタンをクリックし、ルール作成を完了します。

これでルールの作成が完了しました。Integration -> Rules ページで新規ルールを確認できます。Actions(Sink) タブをクリックすると、新しい MQTT ブローカー Sink が表示されます。

また、Integration -> Flow Designer をクリックするとトポロジーが表示され、t/# トピックのメッセージがルール my_rule を経由してリモート MQTT ブローカーに送信される様子を視覚的に確認できます。

MQTT ブローカー Sink を使ったルールのテスト

MQTTX CLI を使い、EMQX の t/# トピックから外部 MQTT サービスの pub/${topic} トピックへメッセージがブリッジされるルールをテストできます。EMQX の t/1 トピックにメッセージをパブリッシュすると、外部 MQTT サービスの pub/t/1 トピックに転送されるはずです。

  1. 外部 MQTT サービスで pub/# トピックをサブスクライブします。

    bash
    mqttx sub -t pub/# -q 1 -h broker.emqx.io -v
  2. MQTTX で t/1 トピックにメッセージをパブリッシュします。

    bash
    mqttx pub -t t/1 -m "hello world" -r
  3. MQTTX で pub/t/1 トピックをサブスクライブすると、メッセージが受信でき、EMQX から外部 MQTT サービスへ正常に転送されたことが確認できます。

    bash
    [2024-1-31] [16:43:13] › topic: pub/t/1
    payload: hello world
  4. ステップ1を繰り返すと、pub/t/1 トピックのリテインメッセージも MQTTX で受信できます。

    bash
    [2024-1-31] [16:44:29] › topic: pub/t/1
    payload: hello world
    retain: true

MQTT ブローカー Source を使ったルールの作成

ここでは、リモート MQTT サービスからローカル EMQX へデータを転送するルールの作成方法を示します。MQTT Source とメッセージ再パブリッシュアクションを作成し、リモート MQTT サービスのサブスクライブと受信データのローカル転送を実現します。

MQTT ブローカー Source の作成とルールへの追加

  1. ダッシュボードの Integration -> Rules ページに移動します。

  2. ページ右上の Create をクリックします。

  3. ルールIDに my_rule_source を入力します。

  4. ルールのトリガーソース(データ入力)を設定します。ページ右側の Data Inputs タブで、デフォルトの Message タイプ入力を削除し、Add Input をクリックして MQTT Source を新規作成します。

  5. Add Input ポップアップで、Input Type ドロップダウンから MQTT Broker を選択し、Source ドロップダウンはデフォルトの Create Source のままにします。これにより新しい Source が作成され、ルールに追加されます。

  6. Source の名前と説明を入力します。

  7. 先ほど作成した my_mqtt_bridge コネクターをドロップダウンから選択します。新規作成する場合は、ドロップダウン横の作成ボタンをクリックし、コネクターの作成 の設定パラメータを利用してください。

  8. 外部 MQTT サービスから EMQX へのサブスクライブを完了するために、Source 情報を設定します。

    • Topic:サブスクライブするトピック。+# のワイルドカード使用可。

      TIP

      EMQX がクラスター運用中、またはコネクターが接続プール設定されている場合、重複メッセージを避けるため共有サブスクリプションを使用する必要があります。

      ここでは $share/1/f/# を入力し、f/# にマッチする全メッセージをサブスクライブします。

    • QoS:サブスクライブの QoS。ドロップダウンから 0 または 1 を選択。

  9. その他の設定はデフォルトのままにして、Create ボタンをクリックし Source 作成を完了します。ルールのデータ入力に Source が追加され、ルール SQL は以下のように変更されます。

    sql
    SELECT
      *
    FROM
      "$bridges/mqtt:my_source"

    ルール SQL は MQTT Source から以下のフィールドを抽出可能で、SQL を調整してデータ処理を行えます。今回の例ではデフォルトの SQL で十分です。

    フィールド名説明
    topic元メッセージのトピック
    server接続された Source のサーバーアドレス
    retainメッセージがリテインか否か。値は false
    qosメッセージの QoS(サービス品質)
    pub_propsMQTT 5.0 メッセージプロパティオブジェクト(ユーザープロパティペア等含む)
    pub_props.User-Property-Pairsキーと値のペアを含むユーザープロパティ配列。例:{"key":"foo", "value":"bar"}
    pub_props.User-Propertyキーと値のペアを含むユーザープロパティオブジェクト。例:{"foo":"bar"}
    pub_props.*その他のメッセージプロパティのキーと値。例:Content-Type: JSON
    payloadメッセージ内容
    message_received_atメッセージ受信タイムスタンプ(ミリ秒)
    idメッセージID
    dupメッセージが重複か否か

これで MQTT Source の作成は完了しましたが、サブスクライブしたデータは直接 EMQX ローカルにパブリッシュされません。次に、Source がサブスクライブしたメッセージを EMQX ローカルに転送するためのメッセージ再パブリッシュアクションを作成します。

再パブリッシュアクションの作成

  1. ルール作成ページの右側 Action Outputs タブに切り替え、Add Action ボタンをクリックします。Type of Action ドロップダウンから Republish アクションを選択します。

  2. メッセージ再パブリッシュの設定を行います。

    • Topicsub/${topic} を入力し、元トピックに sub/ プレフィックスを付加して転送します。例えば元トピックが f/1 の場合、EMQX に転送されるトピックは sub/f/1 となります。
    • QoS012${qos} から選択可能。プレースホルダーを使って他フィールドから QoS を設定可能です。ここでは元メッセージの QoS を引き継ぐため ${qos} を選択します。
    • Retaintrue または false を選択し、メッセージをリテインとしてパブリッシュするか指定します。プレースホルダーで他フィールドからリテインフラグを設定可能です。ここでは false を選択します。
      • データソースが MQTT Source のため、${flags.retain} は使用できません。
      • ${retain} を指定すると、外部 MQTT サービスのリテンション機構でリテインされたメッセージのみリテインフラグを引き継ぎます。元メッセージが EMQX にパブリッシュされた際のリテインフラグは反映されません。
    • Payload:転送メッセージのペイロード生成テンプレート。デフォルトは空欄でルール出力結果を転送。ここでは ${payload} を入力し、ペイロードのみを転送します。
  3. Add ボタンをクリックしアクション作成を完了します。ルール作成ページに戻り、新しいアクションが Action Outputs タブに追加されます。

  4. ルール作成ページ下部の Create ボタンをクリックし、ルール作成を完了します。

これでルールが正常に作成されました。Integration -> Rules ページで新規ルールを確認でき、Source タブで新しい MQTT Source を確認できます。

また、Integration -> Flow Designer をクリックするとトポロジーが表示され、MQTT Source からのメッセージが再パブリッシュアクションを通じて sub/${topic} に転送される様子が視覚的に確認できます。

MQTT ブローカー Source を使ったルールのテスト

MQTT ブローカー Source を使ったルールのテスト

MQTTX CLI を使い、外部 MQTT サービスの f/# トピックから EMQX の sub/${topic} トピックへメッセージがブリッジされるルールをテストできます。外部 MQTT サービスの f/1 トピックにメッセージをパブリッシュすると、EMQX の sub/f/1 トピックに転送されるはずです。

  1. EMQX の sub/# トピックをサブスクライブします。

    bash
    mqttx sub -t sub/# -q 1 -v
  2. MQTTX を使い、外部 MQTT サービスの f/1 トピックにメッセージをパブリッシュします。

    bash
    mqttx pub -t f/1 -m "I'm from broker.emqx.io" -r -h broker.emqx.io
  3. MQTTX で sub/f/1 トピックにメッセージが届くのを確認できれば、外部 MQTT サービスから EMQX への正常な転送が確認できます。

    bash
    [2024-1-31] [16:49:22] › topic: sub/f/1
    payload: I'm from broker.emqx.io