Azure Event Hubs への MQTT データストリーミング
Azure Event Hubs は、Microsoft が提供するリアルタイムデータ取り込みのためのマネージドイベントストリーミングプラットフォームです。EMQX Platform の Azure Event Hub との統合により、高スループットシナリオにおいて信頼性の高いデータ転送および処理機能をユーザーに提供します。この統合により、Azure Event Hubs は EMQX Platform と Azure Blob Storage、Azure Stream Analytics、Azure 仮想マシン上で展開されたアプリケーションなど、さまざまな Azure クラウドサービス間のデータの仲介役を担います。現在、EMQX Platform は SASL/PLAIN 認証および Apache Kafka プロトコル互換のエンドポイントを通じて Azure Event Hub との統合をサポートしています。
本ページでは、Azure Event Hubs データ統合機能の包括的な概要と実装に関する実践的なガイドを提供します。Azure Event Hubs コネクターの作成、ルールの設定、テスト方法を解説し、さらに MQTT プロトコルを通じてシミュレートされた温度・湿度データを EMQX Platform に送信し、設定したデータ統合を介して Azure Event Hubs に保存する方法を示します。
動作の仕組み
Azure Event Hubs データ統合は、EMQX の標準機能として提供されており、ユーザーが MQTT データストリームを Azure Event Hubs とシームレスに統合し、IoT アプリケーション開発のための豊富なサービスや機能を活用できるよう支援します。
EMQX はルールエンジンと Sink を介して MQTT データを Azure Event Hubs に転送します。全体の流れは以下の通りです。
- IoT デバイスがメッセージをパブリッシュ:デバイスは特定のトピックを通じてテレメトリやステータスデータをパブリッシュし、ルールエンジンをトリガーします。
- ルールエンジンがメッセージを処理:組み込みのルールエンジンは、特定のトピックにマッチする MQTT メッセージを処理します。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
- Azure Event Hubs へのブリッジング:ルールはメッセージを Azure Event Hubs に転送するアクションをトリガーし、データプロパティ、オーダーキー、MQTT トピックと Azure Event Hubs ヘッダーのマッピングを簡単に設定できます。これにより、より豊かなコンテキスト情報と順序保証を持つデータ統合が可能となり、柔軟な IoT データ処理を実現します。
MQTT メッセージデータが Azure Event Hubs に書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です。
- リアルタイムデータ処理・分析:Azure Event Hubs の強力なデータ処理・分析ツールやストリーミング機能を活用し、メッセージデータのリアルタイム処理と分析を行い、価値あるインサイトや意思決定支援を得られます。
- イベント駆動型機能:Azure のイベント処理をトリガーし、動的かつ柔軟な機能の起動や処理を実現します。
- データの保存・共有:メッセージデータを Azure Event Hubs のストレージサービスに送信し、大量データの安全な保存・管理を行います。これにより、他の Azure サービスと連携してデータを共有・分析し、多様なビジネスニーズに対応可能です。
特長と利点
EMQX Platform と Azure Event Hubs のデータ統合は、以下の機能と利点をビジネスにもたらします。
高性能な大量メッセージスループット:EMQX は膨大な数の MQTT クライアント接続をサポートし、毎秒数百万メッセージを継続的に Azure Event Hubs に取り込むことが可能です。これにより、極めて低いメッセージ転送および保存レイテンシを実現し、Azure Event Hubs の保持時間設定によりメッセージ量の制御も可能です。
柔軟なデータマッピング:設定した Azure Event Hubs を通じて、MQTT トピックと Azure Event Hubs イベントセンター間の柔軟なマッピングが可能です。MQTT ユーザープロパティを Azure Event Hubs ヘッダーにマッピングすることもサポートし、より豊かなコンテキスト情報と順序保証を提供します。
弾力的なスケーラビリティ対応:EMQX と Azure Event Hubs の両方が弾力的なスケーラビリティをサポートし、アプリケーション仕様に応じて数 MB から数 TB に及ぶ IoT データ規模を容易に拡張できます。
豊富なエコシステム:標準 MQTT プロトコルの採用と多様な主流 IoT 伝送プロトコルのサポートにより、EMQX はさまざまな IoT デバイスとの接続を実現します。さらに、Azure Event Hubs は Azure Functions、各種プログラミング言語 SDK、Kafka エコシステムをサポートし、デバイスからクラウドまでのシームレスな IoT データアクセスと処理を促進します。
これらの機能は統合能力と柔軟性を高め、ユーザーが大量の IoT デバイスデータを迅速に Azure と接続できるよう支援します。クラウドコンピューティングによるデータ分析・インテリジェンス機能をより便利に活用し、強力なデータ駆動型アプリケーションの構築を可能にします。
はじめる前に
本節では、EMQX Platform で Kafka データ統合を作成するための準備作業を紹介します。
前提条件
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
Azure Event Hub のセットアップ
Azure Event Hubs は Kafka エンドポイントを提供します。このエンドポイントにより、Event Hubs ネームスペースが Apache Kafka メッセージプロトコルおよび API をネイティブに理解できます。Event Hubs は Apache Kafka バージョン 1.0 以降をサポートしています。詳細は Apache Kafka アプリケーションから Azure Event Hubs を使用する をご参照ください。
Event Hubs ネームスペースの作成
Azure ポータルの左ナビゲーションメニューの FAVORITES から Event Hubs を選択し、ツールバーの「作成」をクリックします。作成ページで以下を実施します。
TIP
注意:Kafka 用の Event Hubs は Basic ティアではサポートされていません。
- ネームスペースを作成する サブスクリプション を選択
- 作成済みの リソースグループ を選択
- ネームスペース名を入力
- ネームスペースのリージョンを選択
- 価格ティアは Standard を選択
- ページ下部の「確認および作成」を選択
次に、ネットワークタブで プライベートネットワーク を選択し、プライベートエンドポイントを作成してください。
プライベートエンドポイントの作成
EMQX クラスターがプライベートネットワーク経由で Event Hubs にアクセスできるよう、以下の手順でプライベートエンドポイントを作成します。
ステップ1:基本情報ページで対応する サブスクリプション、リソースグループ、インスタンスの詳細を選択
ステップ2:リソースページはデフォルト設定のまま
ステップ3:仮想ネットワーク ページで、EMQX Platform VPC とピアリングされたネットワークとサブネットを選択し、プライベート IP 設定で静的 IP を選択
ステップ4:DNS ページで、プライベート DNS ゾーン の統合を「はい」に選択。作成完了後、ネームスペースのネットワークタブでプライベートエンドポイントのステータスが「成功」と表示されます。
ステップ6:プライベートエンドポイントをクリックし、FQDN と IP アドレス を記録してサポートチケットで送信してください。DevOps チームが解決策を追加します。Event Hub の作成
ネームスペース ページで、コマンドバーの + Event hub を選択。Event hub 名を入力し、「確認および作成」を選択。
SAS(共有アクセス署名)を使用した Event Hubs リソースへのアクセス認可
Event hubs ページで、コマンドバーの + Shared Access Signatures を選択。ポリシー名と対応する権限を入力。これにより、Kafka パスワードとして使用する Connection string-primary key を取得できます。
コンシューマーグループ の作成(任意)
Event hubs ページで、コマンドバーの + Consumer group をクリック。
Kafka コネクターの作成
データ統合ルールを作成する前に、Azure Event Hubs コネクターを作成してサーバーにアクセスできるようにします。
- デプロイメントに移動し、左ナビゲーションメニューから Data Integration をクリック。
- 初めてコネクターを作成する場合は、Data Forward カテゴリの下にある Azure Event Hubs を選択。すでにコネクターを作成済みの場合は、New Connector を選択し、続いて Data Forward カテゴリの Azure Event Hubs を選択。
- New Connector ページで以下を設定:
- Connector Name:システムが自動的にコネクター名を生成します。
- Bootstrap Hosts:ネームスペースのホスト名を入力。デフォルトポートは 9093。その他の項目は実際の設定に合わせて入力。
- Connection String:ネームスペースの共有アクセスポリシーの「Connection string - primary key」を入力。
- Enable TLS:Azure Event Hub への接続時は TLS がデフォルトで有効。
- Test ボタンをクリック。Azure Event Hubs サービスにアクセス可能であれば成功メッセージが返されます。
- New ボタンをクリックして作成を完了。
ルールの作成
次に、書き込むデータを指定し、処理済みデータを Azure Event Hubs に転送するアクションをルールに追加します。
ルールエリアの New Rule をクリック、または作成したコネクターの Actions 列にある新規ルールアイコンをクリック。
SQL editor にルールマッチング用の SQL 文を入力。以下の例は、
temp_hum/emqx
トピックに送信されたメッセージから報告時刻up_timestamp
、クライアント ID、メッセージ本文(ペイロード)を読み取り、温度と湿度を抽出します。sqlSELECT timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"
Enable Test を使ってデータ入力をシミュレートし、結果をテストできます。
Next をクリックしてアクション追加へ進む。
Connector ドロップダウンから先ほど作成したコネクターを選択。
以下の情報を設定:
- Event Hub Name:使用する Event Hub 名を入力。変数はサポートされません。
- Azure Event Hub Headers:Azure Event Hub にパブリッシュする際にメッセージに追加されるヘッダーのプレースホルダーを入力。
- Azure Event Hub Header value encode mode:ヘッダーの値のエンコードモードを選択。none または json が選択可能。
- Extra Azure Event Hub headers:追加のキー・バリューペアを Azure Event Hubs ヘッダーに提供可能。
- Message Key:Event Hub メッセージキー。プレーン文字列またはプレースホルダー(${var})を含む文字列を入力。
- Message Value:Event Hub メッセージ値。プレーン文字列またはプレースホルダー(${var})を含む文字列を入力。必要に応じて編集可能。温度と湿度の値を転送する例:
bash# Message value {"temp": ${temp}, "hum": ${hum}}
- Message Timestamp:使用するタイムスタンプの種類を指定。
高度な設定(任意):Max Batch Bytes、Required Acks、Partition Strategy をビジネス要件に応じて設定。
Confirm ボタンをクリックしてルール作成を完了。
Successful new rule ポップアップで Back to Rules をクリックし、データ統合設定チェーンを完了。
ルールのテスト
MQTTX を使って温度・湿度データの報告をシミュレートすることを推奨しますが、他の任意のクライアントも使用可能です。
MQTTX でデプロイメントに接続し、以下のトピックにメッセージを送信。
トピック:
temp_hum/emqx
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
Kafka 互換のコンシューマーを使い、設定した Event Hub にメッセージが書き込まれているか確認します。Python で書かれた サンプル のプロデューサー・コンシューマーを使い、イベントセンターの Kafka エンドポイントに接続してメッセージの消費結果を確認することを推奨します。
bashpython consumer.py <your-consumer-group> <topic.1> <topic.2> ... <topic.n>
Kafka CLI の使用方法の詳細は Kafka CLI を使って Azure Event Hubs for Apache Kafka Ecosystem へのメッセージ送受信 を参照してください。
EMQX Platform コンソールで運用データを確認。ルール一覧のルール ID をクリックすると、ルールの統計情報およびそのルール配下のすべてのアクションの統計情報を閲覧できます。