Skip to content

Azure Event Hubs に MQTT データをストリームする

Azure Event Hubs は、マイクロソフトが提供するスケール対応のリアルタイムデータ取り込み向けのフルマネージドイベントストリーミングサービスです。EMQX Cloud は Azure Event Hubs と連携し、MQTT データを Azure エコシステムに対して信頼性高く高スループットでストリームします。

本ドキュメントでは、EMQX Cloud と Azure Event Hubs の連携方法を紹介します。コネクターの作成、ルールの定義、連携のテストを順を追って説明し、MQTT クライアントからシミュレートした温度・湿度データを Azure Event Hubs にストリームする簡単な例を用います。

動作概要

EMQX Cloud は Kafka 互換のエンドポイントを介して Azure Event Hubs と連携し、TLS 上の SASL/PLAIN 認証を利用します。この連携は EMQX のルールエンジンとシンク機構を活用し、MQTT データを安全かつ効率的に Azure エコシステムにストリームします。

エンドツーエンドのワークフローは以下の通りです。

  1. デバイスが MQTT メッセージをパブリッシュ
    IoT デバイスが EMQX Cloud 上の特定の MQTT トピックにテレメトリデータをパブリッシュします。

  2. ルールエンジンがデータを処理
    ルールエンジンはトピックマッチングと SQL ルールに基づき受信した MQTT メッセージをフィルタリング・変換します。データの抽出、再フォーマット、追加コンテキストの付与が可能です。

  3. データが Azure Event Hubs に転送される
    マッチしたルールがトリガーとなり、処理済みメッセージを Kafka プロトコル経由で Azure Event Hubs に送信します。MQTT トピックやユーザープロパティは Event Hubs のメッセージヘッダーにマッピングでき、メッセージキーの設定により順序制御も可能です。

Azure Event Hubs に取り込まれたデータは、Stream Analytics、Blob Storage、Azure 仮想マシン上のカスタムアプリケーションなどの下流 Azure サービスでシームレスに消費でき、リアルタイム処理、ストレージ、イベント駆動型アプリケーション開発を実現します。

特長と利点

  • 高性能なメッセージスループット

    EMQX Cloud は低レイテンシで毎秒数百万件の MQTT メッセージを Azure Event Hubs に取り込み、ハイパフォーマンスな IoT ワークロードをサポートします。

  • 柔軟なデータマッピング

    MQTT トピックやユーザープロパティを Azure Event Hubs のイベントやヘッダーに柔軟にマッピングでき、より豊かなコンテキスト付与やメッセージ順序の制御が可能です。

  • 弾力的なスケーラビリティ

    EMQX Cloud と Azure Event Hubs はいずれもメガバイトからテラバイト規模のデータ量に対応可能で、アーキテクチャ変更なしにスケールアウトできます。

  • 豊富な Azure と Kafka エコシステムとの連携

    MQTT デバイス接続と Azure Event Hubs の Azure サービス、SDK、Kafka エコシステムのサポートを組み合わせることで、シームレスなエンドツーエンドの IoT データ取り込みと処理を実現します。

はじめる前に

このセクションでは、データ統合の設定前に必要な前提条件と準備について説明します。EMQX Cloud でコネクターやルールを作成する前に、ネットワーク設定、Azure リソース、アクセス認証情報が適切に準備されていることを確認してください。

前提条件

ネットワーク設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

Azure Event Hubs ネームスペースの設定

EMQX Cloud からデータを受信するために Azure Event Hubs ネームスペースを作成する必要があります。既にネームスペースをお持ちの場合は、この手順をスキップし、プライベートエンドポイントの作成に進んでください。

  1. Azure ポータルで Event Hubs に移動し、作成 を選択します。

  2. 基本 タブで以下を設定します:

    • サブスクリプション:Azure サブスクリプションを選択

    • リソースグループ:既存のリソースグループを選択するか新規作成

    • ネームスペース名:一意の名前を入力

    • リージョン:EMQX Cloud のデプロイと同じリージョンを選択

    • 価格レベルStandard, Premium, または Dedicated を選択

      Basic レベルは Apache Kafka をサポートしておらず、使用できません。

    event_hubs_namespace

  3. 次へ: 詳細設定 をクリックし、デフォルト設定のまま進みます。

  4. 次へ: ネットワーク をクリックして続行します。

プライベートエンドポイントの作成

EMQX Cloud が Azure Event Hubs に安全にアクセスできるよう、ネームスペースにプライベートエンドポイントを設定する必要があります。

  • 新規ネームスペース作成時は、ネットワーク タブで接続方法として プライベートアクセス を選択し、+ プライベートエンドポイント をクリックします。
  • 既存ネームスペースの場合は、概要 -> ネットワーク -> プライベートアクセス -> + プライベートエンドポイント に移動します。
  1. プライベートエンドポイントの作成 パネルで以下を設定します:

    • サブスクリプションリソースグループ を選択
    • ターゲット サブリソースnamespace に設定
    • プライベート接続に使用する 仮想ネットワークサブネット を選択
  2. プライベート DNS 統合 では、プライベート DNS ゾーンと統合するはい のままにし、デフォルトのプライベート DNS ゾーンを使用します。

  3. OK をクリックしてプライベートエンドポイントを作成します。

    event_hubs_private_endpoint

  4. プライベートエンドポイントが作成され、ステータスが Succeeded になったら、プライベートエンドポイント -> DNS 構成 に移動します。

  5. 以下の情報を控えてください:

    • FQDN(例:xxx.servicebus.windows.net
    • プライベート IP アドレス

    event_hubs_dns_config

  6. FQDN と IP アドレスを EMQX Cloud にサポートの チケット で提出してください。EMQX Cloud の SRE チームがデプロイメントの DNS 解決設定を完了します。

Azure Event Hubs とアクセス ポリシーの設定

ネームスペースが準備できたら、Event Hub を作成し、EMQX Cloud で使用する接続文字列を取得します。

  1. Event Hubs ネームスペース ページで + Event Hub をクリックします。

  2. Event Hub 名を入力し、その他の設定はデフォルトのままにして 確認および作成 をクリックします。

  3. Event Hub 作成後、開いて 設定 -> 共有アクセス ポリシー に移動します。

  4. + 追加 をクリックし、ポリシー名を入力して必要な権限(例:送信 または 管理)を割り当てます。

  5. ポリシーを保存し、プライマリ接続文字列 をコピーします。

    event_hunbs_sas_policies

後ほど EMQX Cloud で Azure Event Hubs コネクターを作成する際に、この接続文字列を使用します。

コネクターの作成

Azure Event Hubs は Kafka 互換のエンドポイントを提供しており、EMQX Cloud は Apache Kafka プロトコルを使ってデータをパブリッシュできます。MQTT データを Azure Event Hubs に転送するには、まず EMQX Cloud で Azure Event Hubs コネクターを作成する必要があります。

事前に Event Hubs ネームスペースの共有アクセス ポリシーから プライマリ接続文字列 を取得しておいてください。

  1. デプロイメントにアクセスし、左側メニューから データ統合 をクリックします。
  2. コネクターを作成します:
    • コネクターがまだない場合は、データ転送 カテゴリの中から Azure Event Hubs を選択します。
    • 既にコネクターがある場合は、新規コネクター をクリックし、データ転送 の中から Azure Event Hubs を選択します。
  3. 新規コネクター ページで以下を設定します:
    • コネクター名:自動生成されますが、必要に応じて変更可能
    • Bootstrap Hosts:ネームスペースのホスト名を入力(例:xxx.servicebus.windows.net:9093
    • 接続文字列:ネームスペースの共有アクセス ポリシーからコピーしたプライマリ接続文字列を貼り付け
    • TLS を有効にする:デフォルトで有効。Azure Event Hubs への接続に必須
  4. テスト をクリックして接続を検証します。成功メッセージが表示されれば EMQX Cloud が Azure Event Hubs にアクセス可能です。
  5. 新規作成 をクリックしてコネクターを作成します。

作成したコネクターは複数のルールで再利用でき、MQTT データを Azure Event Hubs に転送できます。

ルールの作成

Azure Event Hubs コネクターを作成したら、どの MQTT データを転送し、どのように Azure Event Hubs に書き込むかを定義するルールを作成します。

  1. 新しいルールを作成します:

    • データ統合 -> ルール新規ルール をクリック、
    • または Azure Event Hubs コネクターの アクション 列にある 新規ルール アイコンをクリックします。
  2. SQL エディター にて、受信 MQTT メッセージにマッチし処理するルールを記述します。

    以下の例は temp_hum/emqx トピックからメッセージを読み取り、タイムスタンプ、クライアント ID、温度、湿度の値を抽出します。

    sql
     SELECT 
     timestamp as up_timestamp, 
     clientid as client_id, 
     payload.temp as temp,
     payload.hum as hum
     FROM
     "temp_hum/emqx"

    Try It Out 機能を使ってデータ入力をシミュレートし、クエリ結果を検証できます。

  3. 次へ をクリックしてアクションを追加します。

  4. アクション 設定で以下を指定します:

    • コネクター:作成済みの Azure Event Hubs コネクターを選択

    • Event Hub 名:転送先の Event Hub 名を入力(変数は非対応)

    • Azure Event Hub ヘッダー:MQTT データを Event Hubs メッセージヘッダーにマッピングするプレースホルダーを指定

    • Azure Event Hub ヘッダー値のエンコードモード:ヘッダー値のエンコード方法を選択(例:none または json

    • 追加の Azure Event Hub ヘッダー:(任意)追加のキー・バリュー形式ヘッダーを設定

    • メッセージキー:固定文字列またはプレースホルダー(例:${client_id})でメッセージキーを定義し、メッセージの順序制御を行う

    • メッセージ値:メッセージペイロードを定義。抽出フィールドをプレースホルダーで挿入可能:

      json
      {"temp": ${temp}, "hum": ${hum}}
    • メッセージタイムスタンプ:Event Hubs メッセージに使用するタイムスタンプの種類を選択

  5. (任意)詳細設定 を構成します。例:

    • 最大バッチバイト数
    • 必要なアック数
    • パーティション戦略
  6. 確定 をクリックしてルールを作成します。

  7. 成功ダイアログで ルールに戻る をクリックし、設定を完了します。

ルールが有効になると、マッチした MQTT メッセージは自動的に処理され、Azure Event Hubs に転送されます。

ルールのテスト

ルール作成後、MQTT メッセージをパブリッシュして Azure Event Hubs に届いていることを確認し、データ統合を検証できます。

テストデータのパブリッシュ

任意の MQTT クライアントを使ってテストメッセージをパブリッシュできます。以下は MQTTX を使用した例です。

  1. MQTTX を EMQX Cloud のデプロイメントに接続します。

  2. 以下のトピックにメッセージをパブリッシュします:

    • トピックtemp_hum/emqx

    • ペイロード

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }

Azure Event Hubs でのデータ確認

  1. Kafka 互換のコンシューマーを使って Event Hub からメッセージを読み取ります。

  2. Azure GitHub リポジトリで提供されている Python コンシューマーなど、公式の Azure Event Hubs Kafka サンプルの使用を推奨します:

    bash
    python consumer.py <consumer-group> <topic.1> <topic.2> ... <topic.n>

    これにより、EMQX Cloud からパブリッシュされたメッセージが Kafka エンドポイント経由で正常に受信されていることを検証できます。

Kafka ツールを使ったメッセージ消費の詳細は、Azure ドキュメントの Kafka CLI 利用方法を参照してください。

ルール実行の監視

EMQX Cloud コンソールでルールの実行状況も確認できます:

  1. データ統合 -> ルール に移動します。

  2. ルール ID をクリックすると、メッセージスループットやアクション実行結果などのランタイム統計が表示されます。

メッセージの正常配送やルール指標の増加が確認できれば、データ統合が期待通りに機能していることを示します。