Skip to content

他の MQTT サービスとのブリッジ

MQTT ブローカーのデータ統合は、EMQX に別の EMQX クラスターや他の MQTT サービスへの接続機能を提供し、メッセージブリッジを実現します。これにより、ネットワークやサービスを跨いだデータの相互作用と通信が可能になります。本ページでは、EMQX Cloud における MQTT メッセージブリッジの動作原理を紹介し、メッセージブリッジの作成および検証の実践的な手順を説明します。

動作原理

ブリッジング中、EMQX Cloud はターゲットサービスとクライアントとして MQTT 接続を確立し、パブリッシュ・サブスクライブモデルを通じて双方向のメッセージ送受信を実現します。

  • 送信メッセージ:ローカルトピックからリモート MQTT サービスの指定トピックへメッセージをパブリッシュする。
  • 受信メッセージ:リモート MQTT サービスのトピックをサブスクライブし、そのメッセージを現在のデプロイメントに転送する。

EMQX Cloud は同一接続上で複数のブリッジングルールを設定可能で、それぞれ異なるトピックマッピングやメッセージ変換ルールを持つことができ、メッセージルーティングに類似した機能を実現します。ブリッジング中には、ルールエンジンを介してメッセージのフィルタリング、拡充、変換処理を行い、転送前にメッセージを加工することも可能です。

以下の図は、EMQX Cloud と他の MQTT サービス間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Cloud-MQTT data integration

特長とメリット

MQTT ブローカーのデータ統合には以下の特長とメリットがあります。

  • 広範な互換性:標準 MQTT プロトコルを利用し、AWS IoT Core、Azure IoT Hubs などの主要な IoT プラットフォームやオープンソース、業界標準の MQTT ブローカーと統合可能です。
  • 柔軟なトピックマッピング:トピックにプレフィックスを追加したり、クライアントのコンテキスト情報(クライアントID、ユーザー名など)を用いて動的にトピックを構築し、カスタマイズされたメッセージルーティングを実現します。
  • 高性能:コネクションプーリングや共有サブスクリプションなどの最適化により、スループットの向上とレイテンシの低減を図っています。
  • ペイロード変換:SQL ベースでメッセージペイロードの抽出、フィルタリング、拡充、変換処理を行い、転送前にメッセージを加工できます。
  • メトリクス監視:メッセージ数、成功率・失敗率、スループットなどのリアルタイムメトリクスを提供し、統合の健全性とパフォーマンスを監視可能です。

はじめる前に

前提条件

MQTT 接続情報の準備

MQTT ブローカーのデータ統合を作成する前に、リモート MQTT サービスの接続情報を取得しておく必要があります。主な項目は以下の通りです。

  • MQTT サービスアドレス:対象 MQTT サービスのアドレスとポート(例:broker.emqx.io:1883)。
  • ユーザー名:接続に必要なユーザー名。認証不要の場合は空欄で構いません。
  • パスワード:接続に必要なパスワード。認証不要の場合は空欄で構いません。
  • プロトコルタイプ:対象サービスが TLS を有効にしているか、TCP/TLS 上の MQTT を使用しているかを確認してください。EMQX Cloud の MQTT ブリッジは現時点で MQTT over WebSocket や MQTT over QUIC には対応していません。
  • プロトコルバージョン:対象 MQTT サービスが使用するプロトコルバージョン。EMQX Cloud は MQTT 3.1、3.1.1、MQTT 5.0 をサポートしています。

データ統合は EMQX Cloud や他の標準 MQTT サーバーとの高い互換性とサポートを提供します。その他の種類の MQTT サービスに接続する場合は、該当サービスのドキュメントを参照し接続情報を取得してください。一般的に多くの IoT プラットフォームは標準 MQTT アクセス方法を提供しており、それに基づきデバイス情報を上記の MQTT 接続情報に変換できます。

クラスター モードの注意点

EMQX Cloud がクラスター モードで稼働している場合や接続プールを有効にしている場合、同一クライアントIDを用いて複数ノードが同じ MQTT サービスに接続するとデバイス競合が発生しやすくなります。そのため、MQTT メッセージブリッジでは固定クライアントIDの設定は現在サポートしていません。

ネットワーク設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

コネクターの作成

ここでは、EMQX のオンライン MQTT サーバーを例に、リモート MQTT サーバーとの接続設定方法を案内します。

データ統合のルールを作成する前に、MQTT サービスにアクセスするための MQTT ブローカーコネクターを作成する必要があります。コネクターは MQTT(Sink)および MQTT(Source)の両方で利用可能です。

  1. デプロイメントメニューから データ統合 を選択し、データ転送 カテゴリの中の MQTT (Sink) を選択します。既にコネクターを作成済みの場合は、新規コネクター を選択し、同様に MQTT (Sink) を選択します。

  2. コネクター名:システムが自動でコネクター名を生成します。

  3. 接続情報を設定します:

    • MQTT ブローカー:TCP/TLS 上の MQTT のみサポート。ここでは broker.emqx.io:1883 を設定します。

    • ClientID プレフィックス:空欄でも構いません。実際の運用ではクライアントIDプレフィックスを指定するとクライアント管理が容易になります。EMQX Cloud はクライアントIDプレフィックスと接続プールサイズに基づきクライアントIDを自動生成します。詳細は 接続プールとクライアントID生成ルール を参照してください。

    • ユーザー名パスワード:このサーバーでは認証不要のため空欄で構いません。

    • キープアライブ:希望するキープアライブ間隔を指定します。

    • MQTT バージョン:ブローカー接続に適したバージョンを選択します。

    • 静的クライアントIDエントリー:(高度な設定)Azure IoT Hubs などのサービスに接続する際に安定した接続を確保するため、静的クライアントIDを設定できます。詳細は 静的クライアントIDの設定 を参照してください。

      TIP

      静的クライアントIDエントリーが定義されている場合、静的クライアントIDが割り当てられた EMQX ノードのみが MQTT 接続を開始します。

  4. その他の設定はデフォルトのままにします。

  5. テスト ボタンをクリックし、MQTT ブローカーにアクセス可能であれば connector available のメッセージが返されます。

  6. 新規作成 ボタンをクリックして作成を完了します。

これで、このコネクターを基にデータブリッジルールを作成できます。

接続プールとクライアントID生成ルール

EMQX Cloud は複数のクライアントが同時にブリッジ先 MQTT サービスに接続可能です。コネクター作成時に MQTT クライアント接続プールを設定し、そのサイズを指定できます。接続プールはサーバーリソースを最大限に活用し、より高いメッセージスループットと優れた同時処理性能を実現します。これは高負荷・高同時接続のシナリオで重要です。

MQTT プロトコルでは、MQTT サーバーに接続するクライアントはユニークなクライアントIDを持つ必要があります。EMQX Cloud はクラスターとしてデプロイ可能なため、MQTT ブリッジの各クライアントには固有のクライアントIDが割り当てられます。EMQX Cloud は以下のパターンでクライアントIDを自動生成します。

bash
[Client ID Prefix]:{Connector Name}{8桁のランダム文字列}:{接続プール内の接続番号}

例えば、クライアントIDプレフィックスが myprefix、コネクター名が foo の場合、実際のクライアントIDは以下のようになります。

bash
myprefix:foo2bd61c44:1

静的クライアントIDの設定

統合で使用できるクライアントIDが限られている場合、コネクター設定時に個別のノードに静的クライアントIDセットを割り当てることが可能です。静的クライアントIDを設定するには、EMQX クラスター内の各ノードに対してクライアントIDのリストを提供します。各クライアントIDに対応するユーザー名とパスワードも指定できます。これは Azure IoT Hubs のように、各デバイス(クライアントID)に固有の認証情報が必要なシナリオで特に有用です。

静的クライアントIDを設定する手順は以下の通りです。

  1. コネクター作成時に 詳細設定 をクリックします。

  2. 静的 ClientId エントリー セクションで 追加 ボタンを押し、新しい静的クライアントIDエントリーを追加します。必要に応じて複数のノードに対して複数のエントリーを追加可能です。

  3. 各エントリーに以下の項目を入力します。

    • ノード名:クライアントIDを割り当てるノード名を指定します。例:emqx@10.0.0.1。現在のデプロイメントの EMQX ノード名は チケット提出 により取得してください。
    • クライアントID:静的クライアントIDを入力します。例:device1。ノードに複数のクライアントIDを追加する場合は 追加 をクリックしてください。
      • ユーザー名:(任意)このクライアントIDに対応する認証用ユーザー名を入力します。
      • パスワード:(任意)このクライアントIDに対応するパスワードを入力します。プラットフォームによりデバイス固有のキー、シークレット、証明書などが用いられます(例:Azure IoT Hubsの認証キー)。

    設定例

    ノードクライアントIDユーザー名(任意)パスワード(任意)
    emqx@10.0.0.1clientid1username1secret1
    clientid3
    emqx@10.0.0.2clientid2username2
    emqx@10.0.0.3clientid4
    clientid5

静的クライアントIDを設定した場合、これらのクライアントIDを用いた MQTT 接続のみが開始されます。pool_sizeclientid_prefix など動的クライアントIDに関する設定は無効になります。

MQTT (Sink) でルールを作成する

このセクションでは、リモート MQTT サービスへ転送するデータを指定するルールの作成方法を示します。

  1. ルールエリアの 新規ルール をクリックするか、作成したコネクターの 操作 列にある新規ルールアイコンをクリックします。

  2. 使用する機能に基づき、SQL エディターでルールを設定します。ここではクライアントが temp_hum/emqx トピックに温湿度メッセージを送信した際にエンジンをトリガーする例を示します。SQL は以下のようになります。

    sql
     SELECT
       topic,
       payload
     FROM
       "temp_hum/emqx"

    TIP

    初心者の方は SQL ExamplesTry It Out をクリックして、SQL ルールの学習とテストを行うことを推奨します。

  3. 次へ をクリックしてアクションを追加します。

  4. コネクター ドロップダウンから先ほど作成したコネクターを選択します。

  5. EMQX Cloud から外部 MQTT サービスへメッセージをパブリッシュするための情報を設定します。

    • トピック:外部 MQTT サービスにパブリッシュするトピック。${var} プレースホルダーをサポートします。ここでは pub/${topic} と入力し、元のトピックに pub/ プレフィックスを付けて転送します。例えば元のメッセージトピックが t/1 の場合、外部 MQTT サービスに転送されるトピックは pub/t/1 となります。
    • QoS:メッセージパブリッシュの QoS。ドロップダウンから 012${qos} を選択可能です。ここでは ${qos} を選び、元メッセージの QoS に従います。
    • Retain:メッセージをリテインとしてパブリッシュするかどうか。truefalse${flags.retain} を選択可能で、他のフィールドからリテインフラグを設定するプレースホルダーも使用可能です。ここでは ${flags.retain} を選択し、元メッセージのリテインフラグに従います。
    • ペイロード:転送メッセージのペイロード生成用テンプレート。デフォルトは空欄でルールの出力結果を転送します。ここでは ${payload} と入力し、ペイロードのみを転送します。
  6. 他の設定はデフォルトのままにして、確定 ボタンをクリックしルール作成を完了します。

  7. 新規ルール作成成功 のポップアップで ルールに戻る をクリックし、データ統合の一連の設定が完了します。

作成成功後、ルール作成ページに戻り、ルール リストに新規ルールが表示されます。操作 リストにはデータ転送アクション(Sink)が表示されます。

ルールのテスト

MQTTX を使って温湿度データの報告をシミュレートすることを推奨しますが、他のクライアントでも構いません。

  1. 外部 MQTT サービスで pub/# トピックをサブスクライブします。

    bash
    mqttx sub -t pub/# -q 1 -h broker.emqx.io -v
  2. MQTTX でデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • クライアントID:test_client

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  3. MQTTX で pub/temp_hum/emqx トピックをサブスクライブし、メッセージを受信できれば、EMQX Cloud から外部 MQTT サービスへのメッセージ転送が成功したことを示します。

    bash
    [2024-3-21] [10:43:13] › topic: pub/temp_hum/emqx
    payload:
    { "temp": "27.5", "hum": "41.8"}

MQTT (Source) でルールを作成する

このセクションでは、リモート MQTT サービスから現在のデプロイメントへデータを転送するルールの作成方法を示します。リモート MQTT サービスから EMQX へのサブスクライブと、サブスクライブしたデータの転送を実現するために、MQTT (Source) とメッセージ再パブリッシュアクションの両方を作成する必要があります。

MQTT (Source) コネクターの作成方法は MQTT (Sink) コネクターと同様です。詳細は コネクターの作成 を参照してください。

  1. コネクターリストの 操作 列にある新規ルールアイコンをクリックするか、ルールリストの 新規ルール をクリックして 新規ルール 作成ページに入ります。

  2. ソースデータルールではまず入力アクションを設定します。ルール編集ページで自動的に入力アクション設定がポップアップするか、パネル右側の アクション(Source) -> 新規アクション を選択し、MQTT (Source) を選択して 次へ をクリックします。

    • 先ほど作成したコネクターを選択します。
    • トピック:リモート MQTT からサブスクライブするトピックを入力します。例:リモート側でサブスクライブする temp_hum/emqx
    • QoS:メッセージの QoS を選択します。

    確定 ボタンをクリックして設定を完了します。

  3. SQL エディター のデータソースフィールドが以下のように更新されます。

    sql
    SELECT
      *
    FROM
      "$bridges/mqtt:source-d1f51e81"
  4. 次へ をクリックして出力アクションの作成を開始します。

  5. 新規出力アクションで 再パブリッシュ を選択します。

  6. 出力アクション情報を設定します。

    • トピック:転送先 MQTT トピック。${var} プレースホルダーをサポートします。ここでは sub/${topic} と入力し、元トピックに sub/ プレフィックスを付けて転送します。例えば元のメッセージトピックが t/1 の場合、転送先トピックは sub/t/1 となります。
    • QoS:メッセージパブリッシュの QoS。012${qos} から選択可能。プレースホルダーを使い他のフィールドから設定することもできます。ここでは ${qos} を選択し、元メッセージの QoS に従います。
    • Retain:リテインメッセージとしてパブリッシュするかどうか。truefalse${flags.retain} から選択可能。プレースホルダーで他のフィールドから設定も可能。ここでは ${flags.retain} を選択し、元メッセージのリテインフラグに従います。
    • メッセージテンプレート:転送メッセージのペイロード生成テンプレート。デフォルトは空欄でルール出力結果を転送。ここでは ${payload} と入力し、ペイロードのみ転送します。
  7. 他の設定はデフォルトのままにして、確定 ボタンをクリックし出力アクションの作成を完了します。

作成成功後、ルール作成ページに戻ります。再パブリッシュアクションは現在 アクション(Source) に表示されません。必要に応じてルール編集ボタンをクリックすると、ルール設定の下部に再パブリッシュ出力アクションが表示されます。

ルールのテスト

作成したルールは、外部 MQTT サービスの temp_hum/emqx トピックからメッセージを受け取り、現在のデプロイメントの sub/${topic} トピックに転送する設定です。したがって、外部 MQTT サービスの temp_hum/emqx トピックにメッセージをパブリッシュすると、デプロイメントの sub/temp_hum/emqx トピックに転送されます。

以下の手順は、MQTTX を使って外部 MQTT サービスにメッセージを送信し、転送先トピックをサブスクライブしてメッセージを受信する方法を示します。

  1. MQTTX で現在のデプロイメントのトピック sub/# をサブスクライブします。

  2. MQTTX で外部 MQTT サービスの temp_hum/emqx トピックにメッセージをパブリッシュします。

    json
    {
       "temp": 55,
       "hum": 32
    }
  3. MQTTX は現在のデプロイメントの sub/temp_hum/emqx トピックでメッセージを受信します。

    json
    {
       "temp": 55,
       "hum": 32
    }

トラブルシューティング

このセクションでは、MQTT ブローカーのデータ統合で MQTT コネクターを使用する際に発生しやすい問題の解決策を紹介します。

MQTT コネクター画面に「クラスター内ノードの不整合」が表示される

説明

クラスター内のノードで MQTT ブローカーコネクターの状態が不整合に見える問題です。

考えられる原因

  1. リモート MQTT ブローカーの接続制限

    リモート MQTT ブローカーが接続数制限に達しているか、制限を設けており、EMQX クラスターのノードが正常に接続できていない可能性があります。

  2. VPC ピアリング接続の制限

    VPC ピアリング接続を利用している場合、セキュリティグループのルールにより EMQX ノードからリモートブローカーへのアクセスが制限されている可能性があります。

対処方法

  • リモートブローカーの制限の場合

    MQTT コネクター設定で接続プールサイズを現在の半分に減らし、再試行してください。

  • VPC 制限の場合

    EMQX がデプロイされている VPC サブネット(通常は 10.x.x.0/24)からリモートブローカーへのアクセスがセキュリティグループルールで許可されているか確認してください。

  • 問題が解決しない場合は、EMQ サポートへサポートチケットを提出してください。

MQTT コネクター設定の更新・保存時に「Axios Error: timeout」が発生する

説明

共有サブスクリプションを使用している MQTT コネクターの設定更新や保存時にタイムアウトエラーが発生します。

考えられる原因

ソースコネクターに複数の接続プールとサブスクライブトピックが含まれている場合、EMQX は更新時にすべての共有サブスクリプション接続を再確立します。

設定が大規模(例:2ノードで16プール、各クライアントに複数トピック)だと、リモートブローカーは再確立時にサブスクライブ承認(SUBACK)を逐次処理します。

共有サブスクリプションの性能レイテンシにより初期化処理が長時間かかり、コンソールや API リクエストが 30 秒のタイムアウトを超え、timeout エラーとなります。

対処方法

  1. データソースごとにコネクターを分割作成し、例として1コネクターあたり2~4ソースを扱うようにします。
  2. 各コネクターの接続プールサイズを 2 に設定し、再初期化性能を改善します。

MQTT アクションのデータが転送されず、メッセージが期限切れで破棄される

説明

一部の MQTT アクションデータが正常に転送されず、監視でメッセージが期限切れで破棄されていることが確認されます。

考えられる原因

リモートブローカーが現在のスループットに対応できていないか、接続プールサイズが不足している場合、メッセージがバッファキューに蓄積されます。デフォルトでこのキュー内のメッセージは TTL(有効期限)45秒が設定されており、期限切れになると破棄されます。

対処方法

  1. リモートブローカーが現在の TPS(トランザクション毎秒)を処理可能か確認してください。
  2. スループット要件に応じて接続プールサイズを増加させてください。
  3. アクション設定の リクエストタイムアウト 値を調整し、メッセージの早期期限切れを防止してください。