# Azure Event Hubs への MQTT データストリーミング

[Azure Event Hubs](https://azure.microsoft.com/en-us/products/event-hubs) は、Microsoft が提供するリアルタイムデータ取り込みのためのマネージドイベントストリーミングプラットフォームです。EMQX Platform の Azure Event Hub との統合により、高スループットシナリオにおいて信頼性の高いデータ転送および処理機能をユーザーに提供します。この統合により、Azure Event Hubs は EMQX Platform と Azure Blob Storage、Azure Stream Analytics、Azure 仮想マシン上で展開されたアプリケーションなど、さまざまな Azure クラウドサービス間のデータの仲介役を担います。現在、EMQX Platform は SASL/PLAIN 認証および Apache Kafka プロトコル互換のエンドポイントを通じて Azure Event Hub との統合をサポートしています。

本ページでは、Azure Event Hubs データ統合機能の包括的な概要と実装に関する実践的なガイドを提供します。Azure Event Hubs コネクターの作成、ルールの設定、テスト方法を解説し、さらに MQTT プロトコルを通じてシミュレートされた温度・湿度データを EMQX Platform に送信し、設定したデータ統合を介して Azure Event Hubs に保存する方法を示します。

## 動作の仕組み

Azure Event Hubs データ統合は、EMQX の標準機能として提供されており、ユーザーが MQTT データストリームを Azure Event Hubs とシームレスに統合し、IoT アプリケーション開発のための豊富なサービスや機能を活用できるよう支援します。

EMQX はルールエンジンと Sink を介して MQTT データを Azure Event Hubs に転送します。全体の流れは以下の通りです。

1. **IoT デバイスがメッセージをパブリッシュ**：デバイスは特定のトピックを通じてテレメトリやステータスデータをパブリッシュし、ルールエンジンをトリガーします。
2. **ルールエンジンがメッセージを処理**：組み込みのルールエンジンは、特定のトピックにマッチする MQTT メッセージを処理します。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
3. **Azure Event Hubs へのブリッジング**：ルールはメッセージを Azure Event Hubs に転送するアクションをトリガーし、データプロパティ、オーダーキー、MQTT トピックと Azure Event Hubs ヘッダーのマッピングを簡単に設定できます。これにより、より豊かなコンテキスト情報と順序保証を持つデータ統合が可能となり、柔軟な IoT データ処理を実現します。

MQTT メッセージデータが Azure Event Hubs に書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です。

- リアルタイムデータ処理・分析：Azure Event Hubs の強力なデータ処理・分析ツールやストリーミング機能を活用し、メッセージデータのリアルタイム処理と分析を行い、価値あるインサイトや意思決定支援を得られます。
- イベント駆動型機能：Azure のイベント処理をトリガーし、動的かつ柔軟な機能の起動や処理を実現します。
- データの保存・共有：メッセージデータを Azure Event Hubs のストレージサービスに送信し、大量データの安全な保存・管理を行います。これにより、他の Azure サービスと連携してデータを共有・分析し、多様なビジネスニーズに対応可能です。

## 特長と利点

EMQX Platform と Azure Event Hubs のデータ統合は、以下の機能と利点をビジネスにもたらします。

**高性能な大量メッセージスループット**：EMQX は膨大な数の MQTT クライアント接続をサポートし、毎秒数百万メッセージを継続的に Azure Event Hubs に取り込むことが可能です。これにより、極めて低いメッセージ転送および保存レイテンシを実現し、Azure Event Hubs の保持時間設定によりメッセージ量の制御も可能です。

**柔軟なデータマッピング**：設定した Azure Event Hubs を通じて、MQTT トピックと Azure Event Hubs イベントセンター間の柔軟なマッピングが可能です。MQTT ユーザープロパティを Azure Event Hubs ヘッダーにマッピングすることもサポートし、より豊かなコンテキスト情報と順序保証を提供します。

**弾力的なスケーラビリティ対応**：EMQX と Azure Event Hubs の両方が弾力的なスケーラビリティをサポートし、アプリケーション仕様に応じて数 MB から数 TB に及ぶ IoT データ規模を容易に拡張できます。

**豊富なエコシステム**：標準 MQTT プロトコルの採用と多様な主流 IoT 伝送プロトコルのサポートにより、EMQX はさまざまな IoT デバイスとの接続を実現します。さらに、Azure Event Hubs は Azure Functions、各種プログラミング言語 SDK、Kafka エコシステムをサポートし、デバイスからクラウドまでのシームレスな IoT データアクセスと処理を促進します。

これらの機能は統合能力と柔軟性を高め、ユーザーが大量の IoT デバイスデータを迅速に Azure と接続できるよう支援します。クラウドコンピューティングによるデータ分析・インテリジェンス機能をより便利に活用し、強力なデータ駆動型アプリケーションの構築を可能にします。

## はじめる前に

本節では、EMQX Platform で Kafka データ統合を作成するための準備作業を紹介します。

### 前提条件

- [ルール](./rules.md)の理解
- [データ統合](./introduction.md)の理解

### ネットワーク設定

<!--@include: ./network-setting.md-->

### Azure Event Hub のセットアップ

Azure Event Hubs は Kafka エンドポイントを提供します。このエンドポイントにより、Event Hubs ネームスペースが Apache Kafka メッセージプロトコルおよび API をネイティブに理解できます。Event Hubs は Apache Kafka バージョン 1.0 以降をサポートしています。詳細は [Apache Kafka アプリケーションから Azure Event Hubs を使用する](https://learn.microsoft.com/en-us/azure/event-hubs/azure-event-hubs-kafka-overview) をご参照ください。

1. Event Hubs ネームスペースの作成

    Azure ポータルの左ナビゲーションメニューの FAVORITES から Event Hubs を選択し、ツールバーの「作成」をクリックします。作成ページで以下を実施します。

    ::: tip
    注意：Kafka 用の Event Hubs は Basic ティアではサポートされていません。
    :::

    - ネームスペースを作成する **サブスクリプション** を選択
    - 作成済みの **リソースグループ** を選択
    - ネームスペース名を入力
    - ネームスペースのリージョンを選択
    - 価格ティアは **Standard** を選択
    - ページ下部の「確認および作成」を選択

    次に、ネットワークタブで **プライベートネットワーク** を選択し、プライベートエンドポイントを作成してください。

2. プライベートエンドポイントの作成

    EMQX クラスターがプライベートネットワーク経由で Event Hubs にアクセスできるよう、以下の手順でプライベートエンドポイントを作成します。

    **ステップ1**：基本情報ページで対応する **サブスクリプション**、**リソースグループ**、インスタンスの詳細を選択  
    **ステップ2**：リソースページはデフォルト設定のまま  
    **ステップ3**：**仮想ネットワーク** ページで、EMQX Platform VPC とピアリングされたネットワークとサブネットを選択し、プライベート IP 設定で静的 IP を選択  
    **ステップ4**：DNS ページで、**プライベート DNS ゾーン** の統合を「はい」に選択。作成完了後、ネームスペースのネットワークタブでプライベートエンドポイントのステータスが「成功」と表示されます。  
    **ステップ6**：プライベートエンドポイントをクリックし、**FQDN** と **IP アドレス** を記録してサポートチケットで送信してください。DevOps チームが解決策を追加します。

3. Event Hub の作成

    **ネームスペース** ページで、コマンドバーの + Event hub を選択。Event hub 名を入力し、「確認および作成」を選択。

4. SAS（共有アクセス署名）を使用した Event Hubs リソースへのアクセス認可

    Event hubs ページで、コマンドバーの + **Shared Access Signatures** を選択。ポリシー名と対応する権限を入力。これにより、Kafka パスワードとして使用する **Connection string-primary key** を取得できます。

5. **コンシューマーグループ** の作成（任意）

    Event hubs ページで、コマンドバーの + Consumer group をクリック。

## Kafka コネクターの作成

データ統合ルールを作成する前に、Azure Event Hubs コネクターを作成してサーバーにアクセスできるようにします。

1. デプロイメントに移動し、左ナビゲーションメニューから **Data Integration** をクリック。  
2. 初めてコネクターを作成する場合は、**Data Forward** カテゴリの下にある **Azure Event Hubs** を選択。すでにコネクターを作成済みの場合は、**New Connector** を選択し、続いて **Data Forward** カテゴリの **Azure Event Hubs** を選択。  
3. **New Connector** ページで以下を設定：  
   - **Connector Name**：システムが自動的にコネクター名を生成します。  
   - **Bootstrap Hosts**：ネームスペースのホスト名を入力。デフォルトポートは 9093。その他の項目は実際の設定に合わせて入力。  
   - **Connection String**：ネームスペースの共有アクセスポリシーの「Connection string - primary key」を入力。  
   - **Enable TLS**：Azure Event Hub への接続時は TLS がデフォルトで有効。  
4. **Test** ボタンをクリック。Azure Event Hubs サービスにアクセス可能であれば成功メッセージが返されます。  
5. **New** ボタンをクリックして作成を完了。

## ルールの作成

次に、書き込むデータを指定し、処理済みデータを Azure Event Hubs に転送するアクションをルールに追加します。

1. ルールエリアの **New Rule** をクリック、または作成したコネクターの **Actions** 列にある新規ルールアイコンをクリック。  
2. **SQL editor** にルールマッチング用の SQL 文を入力。以下の例は、`temp_hum/emqx` トピックに送信されたメッセージから報告時刻 `up_timestamp`、クライアント ID、メッセージ本文（ペイロード）を読み取り、温度と湿度を抽出します。

   ```sql
    SELECT 
    timestamp as up_timestamp, 
    clientid as client_id, 
    payload.temp as temp,
    payload.hum as hum
    FROM
    "temp_hum/emqx"
   ```

   **Enable Test** を使ってデータ入力をシミュレートし、結果をテストできます。

3. **Next** をクリックしてアクション追加へ進む。  
4. **Connector** ドロップダウンから先ほど作成したコネクターを選択。  
5. 以下の情報を設定：

    - **Event Hub Name**：使用する Event Hub 名を入力。変数はサポートされません。  
    - **Azure Event Hub Headers**：Azure Event Hub にパブリッシュする際にメッセージに追加されるヘッダーのプレースホルダーを入力。  
    - **Azure Event Hub Header value encode mode**：ヘッダーの値のエンコードモードを選択。none または json が選択可能。  
    - **Extra Azure Event Hub headers**：追加のキー・バリューペアを Azure Event Hubs ヘッダーに提供可能。  
    - **Message Key**：Event Hub メッセージキー。プレーン文字列またはプレースホルダー（${var}）を含む文字列を入力。  
    - **Message Value**：Event Hub メッセージ値。プレーン文字列またはプレースホルダー（${var}）を含む文字列を入力。必要に応じて編集可能。温度と湿度の値を転送する例：

     ```bash
     # Message value
     {"temp": ${temp}, "hum": ${hum}}
     ```

    - **Message Timestamp**：使用するタイムスタンプの種類を指定。

6. 高度な設定（任意）：**Max Batch Bytes**、**Required Acks**、**Partition Strategy** をビジネス要件に応じて設定。  
7. **Confirm** ボタンをクリックしてルール作成を完了。  
8. **Successful new rule** ポップアップで **Back to Rules** をクリックし、データ統合設定チェーンを完了。

## ルールのテスト

[MQTTX](https://mqttx.app/) を使って温度・湿度データの報告をシミュレートすることを推奨しますが、他の任意のクライアントも使用可能です。

1. MQTTX でデプロイメントに接続し、以下のトピックにメッセージを送信。

   - トピック: `temp_hum/emqx`

   - ペイロード:

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. Kafka 互換のコンシューマーを使い、設定した Event Hub にメッセージが書き込まれているか確認します。Python で書かれた [サンプル](https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/python) のプロデューサー・コンシューマーを使い、イベントセンターの Kafka エンドポイントに接続してメッセージの消費結果を確認することを推奨します。

    ```bash
    python consumer.py <your-consumer-group> <topic.1> <topic.2> ... <topic.n>
    ```

    Kafka CLI の使用方法の詳細は [Kafka CLI を使って Azure Event Hubs for Apache Kafka Ecosystem へのメッセージ送受信](https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/kafka-cli) を参照してください。

3. EMQX Platform コンソールで運用データを確認。ルール一覧のルール ID をクリックすると、ルールの統計情報およびそのルール配下のすべてのアクションの統計情報を閲覧できます。
