Skip to content

他の MQTT サービスとのブリッジ

MQTT ブローカーのデータ統合により、EMQX は別の EMQX クラスターや他の MQTT サービスとメッセージブリッジで接続でき、ネットワークやサービスを跨いだデータの相互作用と通信を実現します。本ページでは、EMQX プラットフォームにおける MQTT メッセージブリッジの動作原理を紹介し、メッセージブリッジの作成および検証方法について実践的なガイダンスを提供します。

動作原理

ブリッジング中、EMQX プラットフォームはターゲットサービスとクライアントとして MQTT 接続を確立し、パブリッシュ・サブスクライブモデルを通じて双方向のメッセージ伝送を実現します。

  • 送信メッセージ:ローカルトピックからリモート MQTT サービスの指定トピックへメッセージをパブリッシュする。
  • 受信メッセージ:リモート MQTT サービスのトピックをサブスクライブし、そのメッセージを現在のデプロイメントに転送する。

EMQX プラットフォームは同一接続上で複数のブリッジングルールを設定可能で、それぞれ異なるトピックマッピングやメッセージ変換ルールを持ち、メッセージルーティングに類似した機能を実現します。ブリッジング中はルールエンジンを介してメッセージのフィルタリング、拡充、変換処理も行えます。

以下の図は、EMQX プラットフォームと他の MQTT サービス間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Platform-MQTT data integration

特長と利点

MQTT ブローカーのデータ統合は以下の特長と利点を備えています。

  • 幅広い互換性:標準 MQTT プロトコルを使用しており、AWS IoT Core、Azure IoT Hub を含む多様な IoT プラットフォームとのブリッジを可能にします。また、オープンソースや他の業界標準 MQTT ブローカーや IoT プラットフォームもサポートし、多様なデバイスやプラットフォームとのシームレスな統合と通信を実現します。
  • 柔軟なトピックマッピング:MQTT のパブリッシュ・サブスクライブモデルに基づき、トピックにプレフィックスを追加したり、クライアントのコンテキスト情報(クライアントID、ユーザー名など)を用いて動的にトピックを構築したりする柔軟なトピックマッピングを実装しています。これにより、特定のニーズに応じたメッセージのカスタマイズ処理やルーティングが可能です。
  • 高性能:接続プールや共有サブスクリプションなどのパフォーマンス最適化オプションを提供し、個々のブリッジクライアントの負荷を軽減し、ブリッジングのレイテンシ低減とメッセージスループット向上を実現します。これらの最適化により、システム全体のパフォーマンスとスケーラビリティが向上します。
  • ペイロード変換:SQL ルールを定義してメッセージペイロードの処理を可能にします。これにより、メッセージ伝送中にデータ抽出、フィルタリング、拡充、変換などの操作をペイロードに対して実行できます。例えば、ペイロードからリアルタイムのメトリクスを抽出し、変換・加工してからリモート MQTT ブローカーに届けることが可能です。
  • メトリクス監視:各 MQTT ブローカーのデータ統合に対してランタイムメトリクス監視を提供し、総メッセージ数、成功/失敗数、現在のレートなどを閲覧可能です。これにより、MQTT ブローカーのデータ統合のパフォーマンスと健全性をリアルタイムで監視・評価できます。

はじめる前に

前提条件

MQTT 接続情報の準備

MQTT ブローカーのデータ統合を作成する前に、リモート MQTT サービスの接続情報を取得する必要があります。主な項目は以下の通りです。

  • MQTT サービスアドレス:ターゲット MQTT サービスのアドレスとポート。例:broker.emqx.io:1883
  • ユーザー名:接続に必要なユーザー名。認証不要の場合は空欄可。
  • パスワード:接続に必要なパスワード。認証不要の場合は空欄可。
  • プロトコルタイプ:ターゲットサービスが TLS を有効にしているか、TCP/TLS 上の MQTT を使用しているかを確認してください。EMQX プラットフォームの MQTT ブリッジは現時点で MQTT over WebSocket や MQTT over QUIC などのプロトコルはサポートしていません。
  • プロトコルバージョン:ターゲット MQTT サービスが使用するプロトコルバージョン。EMQX プラットフォームは MQTT 3.1、3.1.1、MQTT 5.0 をサポートしています。

データ統合は EMQX プラットフォームや他の標準 MQTT サーバーとの高い互換性とサポートを提供します。その他のタイプの MQTT サービスに接続する場合は、該当サービスのドキュメントを参照し接続情報を取得してください。一般的に多くの IoT プラットフォームは標準的な MQTT アクセス方法を提供しており、それに基づいてデバイス情報を上記の MQTT 接続情報に変換できます。

クラスター モードに関する注意

EMQX プラットフォームがクラスター モードで稼働している場合や接続プールが有効な場合、同一のクライアントIDで複数ノードが同じ MQTT サービスに接続するとデバイス競合が発生しやすくなります。そのため、MQTT メッセージブリッジは現時点で固定クライアントIDの設定をサポートしていません。

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

コネクターの作成

ここでは、EMQX のオンライン MQTT サーバーを例に、リモート MQTT サーバーとの接続設定方法を案内します。

データ統合のルールを作成する前に、MQTT サービスにアクセスするための MQTT ブローカーコネクターを作成する必要があります。このコネクターは MQTT (Sink) と MQTT (Source) の両方で使用可能です。

  1. デプロイメントメニューから データ統合 を選択し、データ転送 カテゴリの中の MQTT (Sink) を選択します。すでにコネクターを作成済みの場合は、新規コネクター を選択し、同様に MQTT (Sink) を選択します。

  2. コネクター名:システムが自動的にコネクター名を生成します。

  3. 接続情報を設定します:

    • MQTT ブローカー:TCP/TLS 上の MQTT のみサポート。broker.emqx.io:1883 と設定します。
    • ClientID プレフィックス:空欄のままで構いません。実際の運用ではクライアントIDプレフィックスを指定するとクライアント管理が容易になります。EMQX プラットフォームはクライアントIDプレフィックスと接続プールのサイズに基づきクライアントIDを自動生成します。
    • ユーザー名パスワード:このサーバーは認証不要のため空欄のままで構いません。
  4. その他の設定はデフォルトのままにします。

  5. テスト ボタンをクリックし、MQTT ブローカーにアクセス可能であれば connector available のメッセージが返されます。

  6. 新規 ボタンをクリックして作成を完了します。

これで、このコネクターを基にデータブリッジルールを作成できます。

接続プールとクライアントID生成規則

EMQX プラットフォームは複数のクライアントが同時にブリッジ先 MQTT サービスに接続することを可能にします。コネクター作成時に MQTT クライアント接続プールを設定し、そのサイズを指定することでプール内のクライアント接続数を示します。接続プールはサーバーリソースを最大限活用し、メッセージスループットや同時処理性能を向上させ、高負荷・高同時接続のシナリオに対応します。

MQTT プロトコルでは MQTT サーバーに接続するクライアントはユニークなクライアントIDを持つ必要があり、EMQX プラットフォームはクラスター展開可能なため、MQTT ブリッジ内の各クライアントにユニークなクライアントIDを割り当てます。EMQX プラットフォームは以下のパターンでクライアントIDを自動生成します。

bash
[Client ID Prefix]:{Connector Name}{8桁のランダム文字列}:{プール内の接続シーケンス番号}

例として、クライアントIDプレフィックスが myprefix、コネクター名が foo の場合、実際のクライアントIDは以下のようになります。

bash
myprefix:foo2bd61c44:1

MQTT (Sink) でルールを作成する

ここでは、リモート MQTT サービスへ転送するデータを指定するルールの作成方法を示します。

  1. ルールエリアの 新規ルール をクリックするか、作成したコネクターの 操作 列にある新規ルールアイコンをクリックします。

  2. 使用する機能に基づき、SQL エディターでルールを設定します。ここではクライアントが temp_hum/emqx トピックに温湿度メッセージを送信した際にエンジンをトリガーする例を示します。SQL は以下の通りです。

    sql
     SELECT
       topic,
       payload
     FROM
       "temp_hum/emqx"

    TIP

    初心者の方は SQL Examples をクリックし、Enable Test を有効にして SQL ルールの学習とテストを行うことを推奨します。

  3. 次へ をクリックしてアクションを追加します。

  4. コネクター ドロップダウンから先ほど作成したコネクターを選択します。

  5. EMQX プラットフォームから外部 MQTT サービスへメッセージをパブリッシュするための情報を設定します。

    • トピック:外部 MQTT サービスにパブリッシュするトピック。${var} プレースホルダーをサポートします。ここでは pub/${topic} と入力し、元のトピックに pub/ プレフィックスを付けて転送します。例えば元のメッセージトピックが t/1 の場合、外部 MQTT サービスには pub/t/1 として転送されます。
    • QoS:メッセージパブリッシュの QoS。ドロップダウンから 012${qos} を選択可能で、他のフィールドから QoS を設定するプレースホルダーも入力できます。ここでは元のメッセージの QoS に従うため ${qos} を選択します。
    • Retain:メッセージをリテインとしてパブリッシュするかどうかを truefalse${flags.retain} から選択可能で、他のフィールドからリテインフラグを設定するプレースホルダーも入力できます。ここでは元のメッセージのリテインフラグに従うため ${flags.retain} を選択します。
    • ペイロード:転送メッセージのペイロード生成に使うテンプレート。デフォルトは空欄でルール出力結果を転送します。ここでは ${payload} と入力し、ペイロードのみを転送します。
  6. その他の設定はデフォルトのままにし、確認 ボタンをクリックしてルール作成を完了します。

  7. 新規ルール成功 のポップアップで ルールに戻る をクリックし、データ統合の設定チェーンを完了します。

作成成功後はルール作成ページに戻り、ルール リストに新規ルールが表示されます。操作 リストにはデータ転送アクション(Sink)が表示されます。

ルールのテスト

温湿度データの報告をシミュレートするために MQTTX の使用を推奨しますが、他のクライアントでも構いません。

  1. 外部 MQTT サービスで pub/# トピックをサブスクライブします。

    bash
    mqttx sub -t pub/# -q 1 -h broker.emqx.io -v
  2. MQTTX でデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • クライアントID:test_client

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  3. MQTTX で pub/temp_hum/emqx トピックをサブスクライブし、メッセージを受信できれば、EMQX プラットフォームから外部 MQTT サービスへのメッセージ転送が成功したことを示します。

    bash
    [2024-3-21] [10:43:13] › topic: pub/temp_hum/emqx
    payload:
    { "temp": "27.5", "hum": "41.8"}

MQTT (Source) でルールを作成する

ここでは、リモート MQTT サービスから現在のデプロイメントへデータを転送するルールの作成方法を示します。リモート MQTT サービスからのサブスクライブとサブスクライブしたデータの転送を実現するために、MQTT (Source) とメッセージ再パブリッシュアクションの両方を作成する必要があります。

MQTT (Source) コネクターの作成方法は MQTT (Sink) コネクターと同様です。詳細はコネクターの作成を参照してください。

  1. コネクター一覧の 操作 列にある新規ルールアイコンをクリックするか、ルール一覧の 新規ルール をクリックして 新規ルール ページに入ります。

  2. ソースデータルールでは、まず入力アクションを設定します。ルール編集ページで自動的に入力アクション設定がポップアップするか、パネル右側の アクション(Source) -> 新規アクション を選択し、MQTT (Source) を選んで 次へ をクリックします。

    • 先ほど作成したコネクターを選択します。
    • トピック:リモート MQTT からサブスクライブするトピックを入力します。例として temp_hum/emqx を入力します。
    • QoS:メッセージの QoS を選択します。

    確認 ボタンをクリックして設定を完了します。

  3. SQL エディター は以下のようにデータソースフィールドを更新します。

    sql
    SELECT
      *
    FROM
      "$bridges/mqtt:source-d1f51e81"
  4. 次へ をクリックして出力アクションの作成を開始します。

  5. 新規出力アクションで Republish を選択します。

  6. 出力アクション情報を設定します。

    • トピック:転送先 MQTT トピック。${var} 形式のプレースホルダーをサポートします。ここでは sub/${topic} と入力し、元のトピックに sub/ プレフィックスを付けて転送します。例えば元のメッセージトピックが t/1 の場合、転送先トピックは sub/t/1 となります。
    • QoS:メッセージパブリッシュの QoS。012${qos} から選択可能で、他のフィールドから QoS を設定するプレースホルダーも入力できます。ここでは元のメッセージの QoS に従うため ${qos} を選択します。
    • Retain:メッセージをリテインとしてパブリッシュするかどうかを truefalse${flags.retain} から選択可能で、他のフィールドからリテインフラグを設定するプレースホルダーも入力できます。ここでは元のメッセージのリテインフラグに従うため ${flags.retain} を選択します。
    • メッセージテンプレート:転送メッセージのペイロード生成テンプレート。デフォルトは空欄でルール出力結果を転送します。ここでは ${payload} と入力し、ペイロードのみを転送します。
  7. その他の設定はデフォルトのままにし、確認 ボタンをクリックして出力アクションの作成を完了します。

作成成功後はルール作成ページに戻ります。Republish アクションは現時点で アクション(Source) に表示されません。必要に応じてルール編集ボタンをクリックすると、ルール設定の下部に Republish 出力アクションが表示されます。

ルールのテスト

作成したルールは、外部 MQTT サービスの temp_hum/emqx トピックから現在のデプロイメントの sub/${topic} トピックへメッセージをブリッジする設定です。したがって、外部 MQTT サービスの temp_hum/emqx トピックにメッセージをパブリッシュすると、デプロイメントの sub/temp_hum/emqx トピックに転送されます。

以下の手順は MQTTX を使い、外部 MQTT サービスにメッセージを送信し、転送先トピックをサブスクライブしてメッセージを受信する方法を示します。

  1. MQTTX で現在のデプロイメントのトピック sub/# をサブスクライブします。

  2. MQTTX で外部 MQTT サービスの temp_hum/emqx トピックにメッセージをパブリッシュします。

    json
    {
       "temp": 55,
       "hum": 32
    }
  3. MQTTX は現在のデプロイメントの sub/temp_hum/emqx トピックでメッセージを受信します。

    json
    {
       "temp": 55,
       "hum": 32
    }