Azure Event HubsへのMQTTデータストリーム
Azure Event Hubsは、リアルタイムのマネージドイベントストリーミングプラットフォームであり、データ取り込みに利用されます。EMQXのAzure Event Hubsとの統合により、高スループット環境下で信頼性の高いデータ転送および処理機能をユーザーに提供します。Azure Event Hubsは、EMQXとAzureの豊富なクラウドサービスアプリケーション間のデータチャネルとして機能し、IoTデータをAzure Blob Storage、Azure Stream Analytics、およびAzure仮想マシン上に展開された各種アプリケーションやサービスに統合可能です。現在、EMQXはSASL/PLAIN認証およびApache Kafkaプロトコル互換のエンドポイントを通じてAzure Event Hubsとの統合をサポートしています。
本ページでは、EMQXとAzure Event Hubs間のデータ統合について包括的に解説し、ルールおよびSinkの作成と検証に関する実践的な手順を提供します。
動作概要
Azure Event Hubsデータ統合は、EMQXの標準機能として提供されており、ユーザーがMQTTデータストリームをAzure Event Hubsとシームレスに連携し、IoTアプリケーション開発における豊富なサービスと機能を活用できるよう支援します。

EMQXはルールエンジンとSinkを介してMQTTデータをAzure Event Hubsに転送します。全体の流れは以下の通りです:
- IoTデバイスがメッセージをパブリッシュ:デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュし、ルールエンジンをトリガーします。
- ルールエンジンがメッセージを処理:組み込みのルールエンジンを用いて、特定のトピックにマッチするMQTTメッセージを処理します。ルールエンジンは対応するルールに基づき、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
- Azure Event Hubsへのブリッジング:ルールはメッセージをAzure Event Hubsに転送するアクションをトリガーし、データプロパティやオーダーキーの設定、MQTTトピックとAzure Event Hubsヘッダーのマッピングを容易に行えます。これにより、より豊富なコンテキスト情報と順序保証を伴うデータ統合が実現し、柔軟なIoTデータ処理が可能となります。
MQTTメッセージデータがAzure Event Hubsに書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です:
- リアルタイムデータ処理と分析:強力なAzure Event Hubsのデータ処理・分析ツールおよびストリーミング機能を活用し、メッセージデータのリアルタイム処理・分析を行い、有益な洞察や意思決定支援を得られます。
- イベント駆動型機能:Azureのイベントハンドリングをトリガーし、動的かつ柔軟な機能トリガーと処理を実現します。
- データの保存と共有:メッセージデータをAzure Event Hubsのストレージサービスに送信し、大量データの安全な保存と管理を行います。これにより、他のAzureサービスと連携してデータの共有や分析が可能となり、多様なビジネスニーズに対応します。
特長と利点
EMQXとAzure Event Hubs間のデータ統合は、以下の機能とメリットをビジネスにもたらします:
- 高性能な大規模メッセージスループット:EMQXは膨大な数のMQTTクライアント接続をサポートし、毎秒数百万件のメッセージを継続的にAzure Event Hubsに取り込みます。これにより、極めて低いメッセージ転送および保存レイテンシを実現し、Azure Event Hubsの保持時間設定によりメッセージ量の制御も可能です。
- 柔軟なデータマッピング:Azure Event Hubsの設定を通じて、MQTTトピックとAzure Event Hubsのイベントセンター間で柔軟なマッピングを実現します。また、MQTTユーザープロパティをAzure Event Hubsヘッダーにマッピングすることも可能で、より豊富なコンテキスト情報と順序保証を伴うデータ統合を提供します。
- 弾力的なスケーラビリティ対応:EMQXとAzure Event Hubsは共に弾力的なスケーリングをサポートし、アプリケーション仕様に応じて数MBから数TBまでのIoTデータ規模を容易に拡張可能です。
- 豊富なエコシステム:標準MQTTプロトコルを採用し、各種主流IoT通信プロトコルに対応するEMQXは、多様なIoTデバイスとの接続を実現します。さらに、Azure Event HubsはAzure Functionsや各種プログラミング言語SDK、Kafkaエコシステムをサポートし、デバイスからクラウドまでのIoTデータアクセスと処理をシームレスに促進します。
これらの機能は統合能力と柔軟性を高め、ユーザーが大規模なIoTデバイスデータのAzure接続を迅速に実装できるよう支援します。クラウドコンピューティングがもたらすデータ分析およびインテリジェンス機能をより便利に活用し、強力なデータ駆動型アプリケーションの構築を可能にします。
はじめる前に
このセクションでは、EMQXダッシュボードでAzure Event Hubデータ統合を作成する前に完了すべき準備について説明します。
前提条件
Azure Event Hubのセットアップ
Azure Event Hubデータ統合を利用するには、Azureアカウント内でNamespaceとEvent Hubをセットアップする必要があります。以下の公式ドキュメントリンクにセットアップ方法の詳細があります。
- クイックスタート:Azureポータルを使ってイベントハブを作成する
- クイックスタート:Azure Event HubsとApache Kafkaでデータをストリームする
- EMQXが接続に使用するため、「接続文字列」の手順に従ってください。
- イベントハブの接続文字列を取得する
- Apache Kafka向けAzure Event Hubsとは
コネクターの作成
Azure Event Hubsデータ統合を作成するには、Azure Event Hubs SinkをAzure Event Hubsに接続するためのコネクターを作成する必要があります。
- EMQXダッシュボードにアクセスし、Integrations -> Connectorsをクリックします。
- ページ右上のCreateをクリックします。
- Create Connectorページで、コネクタータイプとしてAzure Event Hubsを選択し、Nextをクリックします。
- コネクターの名前と説明を入力します。名前は英数字の組み合わせで、例:
my-azure-event-hubs。 - 接続情報を設定します。
- Bootstrap Host:Namespaceのホスト名を入力します。デフォルトポートは
9093です。その他の項目は実際の環境に合わせて設定してください。 - Connection String:Namespaceの共有アクセスポリシーの「接続文字列 - プライマリキー」を入力します。詳細はイベントハブの接続文字列を取得するを参照してください。
- Enable TLS:Azure Event Hub接続時はTLSがデフォルトで有効です。TLS接続オプションの詳細は外部リソースアクセスのTLS有効化を参照してください。
- Bootstrap Host:Namespaceのホスト名を入力します。デフォルトポートは
- ページ下部のCreateボタンをクリックし、コネクター作成を完了します。
これでAzure Event Hubsがコネクター一覧(Integration -> Connector)に表示され、Connection StatusがConnectedとなっているはずです。次に、ルールとSinkを作成し、Azure Event Hubsにストリームするデータを指定します。
Azure Event Hubs Sink付きルールの作成
このセクションでは、Azure Event Hubs Sinkを追加したルールの作成方法を示します。
EMQXダッシュボードでIntegration -> Rulesをクリックします。
ページ右上のCreateをクリックします。
例として、ルールIDに
my_ruleを入力します。SQL Editorに、トピック
t/#のMQTTメッセージをAzure Event Hubsに保存する場合、以下のステートメントを入力します。注意:独自のSQL構文を指定する場合は、Sinkで必要なすべてのフィールドを
SELECT部分に含めていることを確認してください。sqlSELECT * FROM "t/#"+ Add Actionボタンをクリックし、ルールでトリガーされるアクションを定義します。Type of Actionのドロップダウンリストから
Azure Event Hubsを選択し、EMQXがルールで処理したデータをAzure Event Hubsに送信するようにします。Actionドロップダウンは
Create Actionのままにするか、既存のAzure Event Hubsアクションを選択できます。ここでは新しいSinkを作成し、ルールに追加します。Sinkの名前と説明をNameおよびDescriptionテキストボックスに入力します。
Connectorドロップダウンから先ほど作成した
my-azure-event-hubsを選択します。新しいコネクターを作成する場合は、ドロップダウン横のボタンをクリックしてください。設定パラメータはコネクターの作成を参照してください。Sink情報を設定します。
- Event Hub Name:使用するEvent Hubの名前を入力します。EMQX v5.7.2以降、このフィールドは動的トピック設定にも対応しています。詳細はKafka動的トピックの設定を参照してください。
- Azure Event Hub Headers:Azure Event Hubにパブリッシュされるメッセージに追加するヘッダーのプレースホルダーを入力します。
- Azure Event Hub Header value encode mode:ヘッダーの値のエンコードモードを選択します。選択肢は
noneまたはjsonです。 - Extra Azure Event Hub headers:AddをクリックしてAzure Event Hubsヘッダーの追加のキー・バリューを設定できます。
- Message Key:Event Hubのメッセージキーを入力します。プレーン文字列またはプレースホルダー(${var})を含む文字列が使用可能です。
- Message Value:Event Hubのメッセージ値を入力します。プレーン文字列またはプレースホルダー(${var})を含む文字列が使用可能です。
- Partition Strategy:プロデューサーがAzure Event Hubsのパーティションにメッセージを割り当てる方法を指定します。
random:各メッセージに対してランダムにパーティションを選択します。key_dispatch:Event Hubsメッセージキーのハッシュ値をパーティション番号として使用します。
- Partitions Limit:プロデューサーがメッセージを送信できる最大パーティション数を制限します。デフォルトは無効で、すべてのパーティションに送信可能です。
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。プライマリSinkがメッセージ処理に失敗した場合にこれらのアクションがトリガーされます。詳細はフォールバックアクションを参照してください。
詳細設定(任意):必要に応じてsyncまたはasyncクエリモードを選択します。詳細はSinkの機能を参照してください。
CreateボタンをクリックしてSink設定を完了します。Create Ruleページに戻ると、Action Outputsタブに新しいSinkが表示されます。
Create Ruleページで設定内容を確認し、Createボタンをクリックしてルールを生成します。作成したルールはルール一覧に表示されます。
これでルールが正常に作成され、Ruleページに新しいルールが表示されます。**Actions(Sink)**タブをクリックすると、新しいAzure Event Hubs Sinkを確認できます。
また、Integration -> Flow Designerをクリックするとトポロジーを表示でき、トピックt/#のメッセージがルールmy_ruleで解析され、Azure Event Hubsに送信・保存されていることが確認できます。
ルールのテスト
Azure Event Hubsデータ統合が期待通りに動作するかテストするため、MQTTXを使ってクライアントをシミュレートし、EMQXにMQTTメッセージをパブリッシュします。
- MQTTXでトピック
t/1にメッセージを送信します:
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Azure Event Hub" }'- Ruleページでルール名をクリックし、統計情報を確認します。Sinkの稼働状況をチェックし、新しい送信メッセージが1件あることを確認してください。
- Kafka互換のコンシューマーを使って、設定したEvent Hubにメッセージが書き込まれているか確認します。Kafka CLIの使用方法については、Azure Event Hubs for Apache Kafka EcosystemでのKafka CLIによるメッセージ送受信を参照してください。
高度な設定
このセクションでは、コネクターのパフォーマンス最適化や特定シナリオに応じたカスタマイズ操作のための高度な設定オプションを説明します。該当オブジェクト作成時にAdvanced Settingsを展開し、ビジネスニーズに応じて以下の設定を行えます。
| 項目 | 説明 | 推奨値 |
|---|---|---|
| Allow Auto Topic Creation | (プロデューサーのみ)有効にすると、クライアントがメタデータ取得リクエストを送信した際にKafkaトピックが存在しなければ自動作成を許可します。 | Disabled |
| Connect Timeout | TCP接続確立の最大待機時間(認証有効時は認証時間も含む) | 5 秒 |
| Start Timeout | コネクターが自動起動したリソースの正常状態到達を待つ最大時間(秒)。Confluentクラスターなどの接続リソースが完全に稼働しデータ処理可能になるまで待機し、リソース作成要求に応答します。 | 5 秒 |
| Health Check Interval | コネクターの稼働状況チェック間隔 | 15 秒 |
| Health Check Timeout | Azure Event Hubsとの接続に対する自動ヘルスチェックのタイムアウト時間 | 60 秒 |
| Min Metadata Refresh Interval | クライアントがAzure Event Hubs Kafkaブローカーおよびトピックのメタデータを更新する際の最小間隔。短すぎるとKafkaサーバーへの負荷が増加する可能性があります。 | 3 秒 |
| Metadata Request Timeout | Kafkaからメタデータを要求する際の最大待機時間 | 5 秒 |
| Socket Send / Receive Buffer Size | ネットワーク伝送性能最適化のためのソケットバッファサイズ管理 | 1 MB |
| No Delay | システムカーネルがTCPソケットを即時送信するか遅延送信するかの設定。トグルをオンにすると「No Delay」が有効となり即時送信されます。オフの場合、送信内容が少量の際に約40ミリ秒の遅延が発生します。 | Enabled |
| TCP Keepalive | Kafkaブリッジ接続のTCPキープアライブ機能を有効化し、長時間の非アクティブ状態による接続切断を防止します。値はIdle, Interval, Probesの3つの数値をカンマ区切りで指定します。Idle:接続がアイドル状態となってからキープアライブプローブを送信開始するまでの秒数(Linuxデフォルトは7200秒)。 Interval:各キープアライブプローブ間の秒数(Linuxデフォルトは75秒)。 Probes:応答なしと判断するまでの最大プローブ送信回数(Linuxデフォルトは9回)。 例: 240,30,5,は240秒のアイドル後にプローブ開始、30秒間隔で送信し、5回連続応答なしで接続切断と判断します。 | none |