Skip to content

Azure Event Grid MQTTとのブリッジ

Azure Event Grid は、Azure上のフルマネージドイベントルーティングサービスです。そのMQTTブローカー機能により、IoTデバイスとクラウドアプリケーション間で標準ベースの双方向MQTT通信を大規模に実現できます。EMQXはAzure Event Grid用の組み込みコネクターを提供しており、EMQXとAzure Event Grid間でMQTTデータをブリッジし、Azureのクラウドサービスエコシステムとシームレスに統合できます。

本ページでは、EMQXとAzure Event Grid MQTTの連携について、SinkおよびSourceの作成と検証に関する実践的な手順を含めて詳細に解説します。

動作概要

Azure Event Gridのデータ統合は、EMQXのデバイス接続およびメッセージ送信機能と、Azure Event GridのクラウドネイティブMQTTブローカーを組み合わせた、EMQXの標準機能です。EMQXはMQTTクライアントとしてAzure Event Grid MQTTブローカーに接続し、双方向のメッセージ送受信を可能にします。

  • 送信メッセージ(Sink):EMQXはローカルのMQTTトピックからメッセージを取得し、Azure Event Gridの指定トピックにパブリッシュします。
  • 受信メッセージ(Source):EMQXはAzure Event Gridのトピックをサブスクライブし、受信したメッセージをローカルのEMQXトピックに転送します。

以下の図は連携の典型的なアーキテクチャを示しています。

EMQX Integration Azure Event Grid

特長とメリット

Azure Event Gridとのデータ統合は以下の特長とメリットを提供します。

  • 標準ベースのMQTTブリッジ:Azure Event GridはMQTT 3.1.1およびMQTT 5.0をサポートし、EMQXは標準MQTTプロトコルでブリッジ可能なため、MQTT対応の任意のクライアントやサービスと相互運用できます。
  • 双方向データフロー:EMQXからAzure Event Gridへのパブリッシュ(Sink)と、Azure Event GridのトピックのサブスクライブおよびEMQXへの転送(Source)の両方をサポートし、柔軟なIoTデータルーティングを実現します。
  • 安全な接続:Azure Event GridはTLSを必須としています。コネクターはデフォルトでTLSを有効にし、クライアント証明書認証もサポートしており、本番環境での推奨認証方式です。
  • 柔軟なトピックマッピング:EMQXのルールエンジンを通じて、メッセージのフィルタリング、変換、動的トピックマッピングによる特定のAzure Event Gridトピックスペースへのルーティングが可能です。
  • 豊富なAzureエコシステム連携:データがAzure Event Gridに到達すると、Azure Functions、Azure Event Hubs、Azure Storageなど他のAzureサービスへルーティングされ、さらなる処理や分析が行えます。

はじめる前に

前提条件

Azure Event Gridのセットアップ

EMQXでデータ統合を作成する前に、MQTTブローカーサポートが有効なAzure Event Gridネームスペースをセットアップしてください。以下のMicrosoft公式ドキュメントに手順が記載されています。

セットアップ完了後、EMQXでコネクター作成時に必要な以下の接続情報を控えてください。

  • ホスト名:Event GridネームスペースのMQTTブローカーホスト名。形式は <namespace>.ts.<region>.eventgrid.azure.net。ポートは 8883 です。
  • クライアント証明書と秘密鍵:Azure Event Gridはクライアント証明書認証を要求します。ネームスペースから証明書と秘密鍵をエクスポートし、コネクターのTLS設定時に使用します。
  • トピックスペース:Azure Event Gridで設定したトピックスペースと権限バインディング。

TIP

サポートされている認証方式やTLS要件については、Azure Event Gridドキュメントを参照してください。

コネクターの作成

EMQXとAzure Event Gridを接続するコネクターの作成手順を示します。

  1. EMQXダッシュボードで Integration -> Connectors をクリックします。

  2. 画面右上の Create をクリックします。

  3. Create Connector ページで Azure Event Grid を選択し、Next をクリックします。

  4. コネクター名を入力します。英数字の組み合わせで、例として my_azure_event_grid などを指定します。

  5. 接続情報を設定します:

    • Server Host:Event GridネームスペースのMQTTブローカーエンドポイントを入力します。例:myns.northeurope-1.ts.eventgrid.azure.net:8883。デフォルトポートは 8883 です。

    • ClientID Prefix:(任意)EMQXが生成するクライアントIDのプレフィックスを指定します。EMQXは [prefix]:{connector name}{random string}:{pool index} の形式で一意のクライアントIDを自動生成します。詳細は接続プールとクライアントID生成ルールを参照してください。

    • UsernamePassword:空欄のままにします。Azure Event Grid MQTTはユーザー名/パスワード認証を使用しません。

    • Keepalive:キープアライブ間隔(秒)を指定します。デフォルトは 160 秒です。

    • MQTT Version:MQTTプロトコルバージョンを選択します。Azure Event GridはMQTT 3.1.1(v4)およびMQTT 5.0(v5)の両方をサポートします。

    • Static ClientId Entries:(任意)特定のEMQXノード用に静的クライアントIDを設定します。Azure Event Gridで事前登録されたクライアントIDが必要な場合に有効です。詳細は静的クライアントIDの設定を参照してください。

      TIP

      静的クライアントIDが定義されている場合、明示的に割り当てられたEMQXノードのみがMQTT接続を開始します。

    • Clean Start:デフォルトで有効です。有効時は、EMQXがAzure Event Gridに接続するたびに新しいセッションを開始します。

    • Enable TLS:必ず有効にしてください。Azure Event GridはTLSを要求します。クライアント証明書認証を使用する場合は、ここで証明書と秘密鍵を設定します。TLS設定の詳細は外部リソースアクセスのTLS設定を参照してください。

  6. 詳細設定(任意):詳細はコネクターの詳細設定を参照してください。

  7. Createをクリックする前に、Test ConnectivityをクリックしてEMQXからAzure Event Gridへの接続を検証できます。

  8. Createをクリックしてコネクターの作成を完了します。作成成功ダイアログが表示され、ルールを今すぐ作成するか尋ねられます。Create Ruleをクリックすると、コネクターが事前選択された状態でルール作成画面に進みます。Back To Connector Listをクリックすると後でルールを作成できます。

Azure Event Grid Sinkを使ったルールの作成

ローカルEMQXトピック t/# からAzure Event GridへMQTTメッセージを転送するルールの作成手順を示します。

  1. 前のステップで Create Rule をクリックした場合、Add Action パネルが自動的に開き、Type of ActionAzure Event Grid に設定され、コネクターが事前選択されています。ステップ5へ進んでください。

    そうでない場合は、EMQXダッシュボードで Integration -> Rules をクリックし、右上の Create をクリックしてから + Add Action をクリックします。

  2. 左側の SQL Editor にルールIDと以下のSQLを入力し、トピック t/# のメッセージにマッチさせます。

    注意:独自のSQL構文を指定する場合は、Sinkで必要なすべてのフィールドが SELECT 部分に含まれていることを確認してください。

    sql
    SELECT
      *
    FROM
      "t/#"

    TIP

    初心者の方は SQL Examples をクリックし、Enable Test を有効にしてSQLルールを学習・テストしてください。

  3. 右側の Add Action パネルで、Type of Action ドロップダウンから Azure Event Grid を選択します。Action はデフォルトの Create Action のままにします。

  4. Connectors ドロップダウンから先ほど作成した my_azure_event_grid コネクターを選択します。ドロップダウン横のボタンから新規コネクターを作成することも可能です。設定パラメータはコネクターの作成を参照してください。

  5. Sinkの名前と任意の説明を入力します。

  6. Azure Event GridにメッセージをパブリッシュするためのSinkパラメータを設定します:

    • Topic:Azure Event Gridでパブリッシュするトピック。${var} プレースホルダーをサポートします。例:devices/${clientid}/messages と入力すると、クライアントIDに基づいて動的にトピックが設定されます。
    • QoS:パブリッシュメッセージのQoSレベル。012のいずれか、または ${qos} のようなプレースホルダーを選択し、元のメッセージのQoSに従うことも可能です。
    • Retaintruefalse、または ${flags.retain} のようなプレースホルダーを選択してリテインフラグを設定します。
    • Payload:メッセージペイロードのテンプレート。空欄の場合はルール出力全体を転送し、${payload} のように指定するとペイロードのみを転送します。
  7. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のために1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。

  8. 詳細設定(任意):詳細はSinkの詳細設定を参照してください。

  9. Createをクリックする前に、Test ConnectivityをクリックしてSinkがAzure Event Gridに接続できるかをテストできます。

  10. CreateをクリックしてSinkの設定を完了します。新しいSinkがAction Outputsに追加されます。

  11. Create Ruleページに戻り、設定内容を確認してSaveをクリックしルールを生成します。

これでルールの作成が完了しました。Integration -> Rules ページで新規作成したルールを確認できます。Actions(Sink) タブをクリックすると新しいAzure Event Grid Sinkが表示されます。

また、Integration -> Flow Designer をクリックするとトポロジーを確認でき、トピック t/# のメッセージがルール my_rule によって処理されAzure Event Gridに転送されていることを検証できます。

Azure Event Grid Sourceを使ったルールの作成

Azure Event Gridからのメッセージをサブスクライブし、ローカルEMQXトピックに転送するルールの作成手順を示します。

Azure Event Grid Sourceの作成とルールへの追加

  1. EMQXダッシュボードで Integration -> Rules をクリックし、右上の Create をクリックします。

  2. ルールIDに my_rule_source を入力します。

  3. ルールのトリガーソースを設定します。画面右側の Data Inputs タブでデフォルトの Message 入力を削除し、Add Input をクリックしてAzure Event Grid Sourceを作成します。

  4. Add Input ダイアログで、Input Type ドロップダウンから Azure Event Grid を選択します。Source ドロップダウンはデフォルトの Create Source のままにします。

  5. Sourceの名前と説明を入力します。

  6. ドロップダウンから my_azure_event_grid コネクターを選択します。

  7. Azure Event Gridのサブスクライブ設定を行います:

    • Topic:Azure Event Gridでサブスクライブするトピック。+ および # ワイルドカードをサポートします。

      TIP

      EMQXがクラスター運用中、またはコネクターが接続プール設定の場合は、重複メッセージを避けるため共有サブスクリプションを使用してください。例:$share/group/devices/#

    • QoS:サブスクライブのQoS。0 または 1 を選択します。

  8. Create をクリックしてSourceの作成を完了します。ルールのSQLは自動的に以下のように更新されます。

    sql
    SELECT
      *
    FROM
      "$bridges/azure_event_grid:<source_name>"

Republishアクションの作成

Azure Event Gridからサブスクライブしたメッセージは自動的にローカルEMQXトピックに転送されません。Republishアクションを作成してメッセージをルーティングします。

  1. Create Rule ページ右側の Action Outputs タブに切り替え、Add Action をクリックします。

  2. Type of Action ドロップダウンから Republish を選択します。

  3. Republishパラメータを設定します:

    • Topic:転送先のローカルトピック。例:azure/${topic} と入力すると、元のトピックに azure/ プレフィックスが付加されます。
    • QoS${qos} を選択して元のメッセージQoSに従うか、固定値を設定します。
    • Retainfalse を選択するかプレースホルダーを使用します。
    • Payload${payload} と入力してペイロードのみを転送するか、空欄のままにしてルール出力全体を転送します。
  4. Add をクリックしてアクションを追加し、Save をクリックしてルールを生成します。

ルールのテスト

Sinkのテスト

MQTTX を使ってEMQXのトピック t/1 にメッセージをパブリッシュします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Azure Event Grid" }'

Azure Event Grid Sinkの稼働統計を確認し、新しいマッチ数と送信数がそれぞれ1件増えていることを確認してください。AzureポータルやAzure Event Grid MQTTクライアントでメッセージ受信を検証できます。

Sourceのテスト

  1. ローカルEMQXトピック azure/# をサブスクライブします。

    bash
    mqttx sub -t azure/# -q 1 -v
  2. Azure Event Gridの認証情報で設定したMQTTクライアントを使い、Azure Event Gridにメッセージをパブリッシュします。

    bash
    mqttx pub -t devices/device1/messages -m "hello from azure" \
      -h myns.northeurope-1.ts.eventgrid.azure.net -p 8883 \
      --tls --cert /path/to/client.crt --key /path/to/client.key
  3. EMQXのトピック azure/devices/device1/messages にメッセージが転送されていることを確認します。

    bash
    topic: azure/devices/device1/messages
    payload: hello from azure

詳細設定

Azure Event GridコネクターおよびSinkの詳細設定オプションについて説明します。ダッシュボードで設定する際は、Advanced Settings を展開して用途に応じて以下のパラメータを調整できます。

コネクターの詳細設定

項目名説明デフォルト値
Message Retry Intervalメッセージ配信失敗時の再試行間隔15
Bridge Mode有効にすると、接続先ブローカーにブリッジ接続であることを通知するMQTTブリッジモードを使用無効
Max Inflight1接続あたり未アックメッセージの最大数32
Connection Pool SizeAzure Event Gridへの同時MQTT接続数。増加させるとスループット向上8
Connect TimeoutAzure Event GridへのTCP接続確立の最大待機時間10
Start Timeout自動起動リソースが正常になるまでの最大待機時間5
Health Check Interval接続の自動ヘルスチェック実行間隔15
Health Check Timeout各ヘルスチェックの最大実行時間60

Sinkの詳細設定

項目名説明デフォルト値
Buffer Pool SizeEMQXとAzure Event Grid間のデータフローを処理するバッファワーカープロセス数。負荷が高い場合は増加推奨16
Request TTLバッファ内でリクエストが有効な最大時間。超過したリクエストは破棄される45
Health Check IntervalSinkの自動ヘルスチェック実行間隔15
Health Check Interval Jitter複数ノードが同時にヘルスチェックを行わないようにランダム遅延を追加。複数のActionやSourceが同じコネクターを共有する場合に有効0 ミリ秒
Health Check TimeoutSinkヘルスチェックの最大実行時間60
Max Buffer Queue Size各バッファワーカーが保持可能な最大バイト数。バーストが多い場合は増加推奨256 MB
Query Modeasync はAzure Event Gridの書き込み確認を待たずにパブリッシュを継続。スループットは高いが順序が乱れる可能性あり。sync は確認後に処理継続Async
Inflight Window同時に許容される未アックリクエスト数。Query Modeasync の場合はクライアント単位のメッセージ順序保証のため 1 に設定推奨100

Sourceの詳細設定

項目名説明デフォルト値
Health Check IntervalSourceの自動ヘルスチェック実行間隔15
Health Check Interval Jitter複数ノードが同時にヘルスチェックを行わないようにランダム遅延を追加。複数のActionやSourceが同じコネクターを共有する場合に有効0 ミリ秒
Health Check TimeoutSourceヘルスチェックの最大実行時間60