# Azure Blob Storage に MQTT データを取り込む
[Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs/) は、マイクロソフトのクラウドベースのオブジェクトストレージソリューションで、大量の非構造化データの取り扱いに特化しています。非構造化データとは、特定のデータモデルやフォーマットに従わないデータタイプ（テキストファイルやバイナリデータなど）を指します。EMQX は MQTT メッセージを Blob Storage コンテナに効率的に保存でき、IoT データの保存に柔軟なソリューションを提供します。

本ページでは、EMQX と Azure Blob Storage 間のデータ連携について詳しく解説し、ルールおよび Sink の作成方法について実践的なガイドを提供します。

## 動作の仕組み

EMQX における Azure Blob Storage データ連携は、すぐに使える機能として提供されており、複雑な業務開発も簡単に設定可能です。典型的な IoT アプリケーションでは、EMQX がデバイス接続およびメッセージ送受信を担う IoT プラットフォームとして機能し、Azure Blob Storage はメッセージデータの保存を担当するデータストレージプラットフォームとなります。

![azure-blob-storage-architecture](./assets/azure-blob-storage-architecture.png)

EMQX はルールエンジンと Sink を利用して、デバイスのイベントやデータを Azure Blob Storage に転送します。アプリケーションは Azure Blob Storage からデータを読み取り、さらなるデータ活用を行えます。具体的なワークフローは以下の通りです。

1. **デバイスの EMQX への接続**：IoT デバイスは MQTT プロトコルで正常に接続されるとオンラインイベントをトリガーします。このイベントにはデバイスID、送信元IPアドレスなどのプロパティ情報が含まれます。
2. **デバイスメッセージのパブリッシュと受信**：デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュします。EMQX はメッセージを受信し、ルールエンジン内で比較処理を行います。
3. **ルールエンジンによるメッセージ処理**：組み込みのルールエンジンは、トピックマッチングに基づき特定のソースからのメッセージやイベントを処理します。対応するルールをマッチさせ、データフォーマット変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を実行します。
4. **Azure Blob Storage への書き込み**：ルールはメッセージをストレージコンテナに書き込むアクションをトリガーします。Azure Blob Storage Sink を使うことで、処理結果からデータを抽出し Blob Storage に送信できます。メッセージはテキストまたはバイナリ形式で保存可能で、複数行の構造化データはメッセージ内容や Sink の設定に応じて単一の CSV または JSON Lines ファイルにまとめることもできます。

イベントやメッセージデータがストレージコンテナに書き込まれた後は、Azure Blob Storage に接続してデータを読み取り、柔軟なアプリケーション開発に活用可能です。例えば：

- データアーカイブ：デバイスメッセージを Azure Blob Storage のオブジェクトとして長期保存し、コンプライアンス要件や業務ニーズに対応。
- データ分析：ストレージコンテナからデータを分析サービス（例：Snowflake）に取り込み、予知保全やデバイス効率評価などの分析を実施。

## 特長と利点

EMQX で Azure Blob Storage データ連携を利用することで、以下の特長と利点が得られます。

- **メッセージ変換**：メッセージは EMQX のルール内で高度な処理・変換が可能で、Azure Blob Storage への書き込み前に最適化できます。
- **柔軟なデータ操作**：Azure Blob Storage Sink により、特定フィールドのデータをコンテナに書き込め、コンテナ名やオブジェクトキーを動的に設定して柔軟に保存できます。
- **統合された業務プロセス**：Azure Blob Storage Sink を使うことで、デバイスデータを Azure Blob Storage の豊富なエコシステムと連携させ、データ分析やアーカイブなど多様な業務シナリオを実現可能です。
- **低コストの長期保存**：データベースと比べて、Azure Blob Storage は高可用性かつ信頼性の高いコスト効率の良いオブジェクトストレージサービスであり、長期保存に適しています。

これらの特長により、効率的で信頼性が高くスケーラブルな IoT アプリケーションを構築し、ビジネスの意思決定や最適化に役立てられます。

## はじめる前に

このセクションでは、EMQX で Azure Blob Storage Sink を作成する前の準備について説明します。

### 前提条件

- [ルール](./rules.md)の理解
- [データ連携](./data-bridges.md)の理解

### Azure Storage にコンテナを作成する

1. Azure Storage にアクセスするには Azure サブスクリプションが必要です。まだお持ちでない場合は、[無料アカウント](https://azure.microsoft.com/free/)を作成してください。

2. Azure Storage へのすべてのアクセスはストレージアカウントを通じて行われます。このクイックスタートでは、[Azure ポータル](https://portal.azure.com/)、Azure PowerShell、または Azure CLI を使ってストレージアカウントを作成します。ストレージアカウントの作成方法は [Create a storage account](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-create) を参照してください。

3. Azure ポータルでコンテナを作成するには、新しく作成したストレージアカウントに移動し、左メニューの「Data storage」セクションまでスクロールして「Containers」を選択します。+ **Container** ボタンをクリックし、新しいコンテナ名に `iot-data` を入力して **Create** をクリックし、コンテナを作成します。

   ![azure-storage-container-create](./assets/azure-storage-container-create.png)

4. ストレージアカウントの **Security+Networking** -> **Access keys** に移動し、**Key** をコピーします。このキーは EMQX で Sink を設定する際に必要です。

   ![azure-storage-access-keys](./assets/azure-storage-access-keys.png)

## コネクターを作成する

Azure Blob Storage Sink を追加する前に、対応するコネクターを作成する必要があります。

1. ダッシュボードの **Integration** -> **Connector** ページに移動します。
2. 右上の **Create** ボタンをクリックします。
3. コネクタータイプとして **Azure Blob Storage** を選択し、次へ進みます。
4. コネクター名を入力します。英数字の大文字・小文字を組み合わせてください。ここでは `my-azure` と入力します。
5. 接続情報を入力します。
   - **Account Name**：ストレージアカウント名
   - **Account Key**：前のステップで取得したストレージアカウントキー
6. **Create** をクリックする前に、**Test Connectivity** をクリックしてコネクターが Azure Storage に接続できるかテストできます。
7. 最後に **Create** ボタンをクリックしてコネクターの作成を完了します。

これでコネクター作成が完了しました。次に、Azure Storage に書き込むデータを指定するルールと Sink を作成します。

## Azure Blob Storage Sink を使ったルールの作成

このセクションでは、EMQX で MQTT トピック `t/#` からのメッセージを処理し、処理結果を設定済みの Sink を通じて Azure Storage の `iot-data` コンテナに書き込むルールの作成方法を示します。

1. ダッシュボードの **Integration** -> **Rules** ページに移動します。

2. 右上の **Create** ボタンをクリックします。

3. ルールID に `my_rule` を入力し、SQLエディターに以下のルールSQLを入力します。

   ```sql
   SELECT
     *
   FROM
       "t/#"
   ```

   ::: tip

   SQL に不慣れな場合は、**SQL Examples** と **Enable Debug** をクリックしてルールSQLの学習やテストが可能です。

   :::

4. アクションを追加し、**Action Type** ドロップダウンから `Azure Blob Storage` を選択します。アクションドロップダウンはデフォルトの `create action` のままにするか、既存の Azure Blob Storage アクションを選択します。ここでは新しい Sink を作成し、ルールに追加します。

5. Sink の名前と説明を入力します。

6. コネクタードロップダウンから先ほど作成した `my-azure` を選択します。ドロップダウン横の作成ボタンをクリックするとポップアップで新規コネクターを素早く作成可能です。必要な設定パラメータは [Create a Connector](#create-a-connector) を参照してください。

7. **Container** に `iot-data` を入力します。

9. **Upload Method** を選択します。2つの方法の違いは以下の通りです。

   - **Direct Upload**：ルールがトリガーされるたびに、事前設定したオブジェクトキーと内容に従ってデータを直接 Azure Storage にアップロードします。バイナリや大きなテキストデータの保存に適していますが、多数のファイルが生成される可能性があります。
   - **Aggregated Upload**：複数のルールトリガー結果を単一ファイル（CSVなど）にまとめて Azure Storage にアップロードします。構造化データの保存に適し、ファイル数を減らして書き込み効率を向上させます。

   各方法で設定パラメータが異なります。選択した方法に応じて設定してください。

   :::: tabs type

   ::: tab Direct Upload

   Direct Upload では以下の項目を設定します。

   - **Blob Name**：コンテナ内のアップロード先オブジェクトの場所を定義します。`${var}` 形式のプレースホルダーをサポートし、`/` でディレクトリ指定も可能です。管理や識別のためにオブジェクトのサフィックスも設定します。ここでは `msgs/${clientid}_${timestamp}.json` と入力します。`${clientid}` はクライアントID、`${timestamp}` はメッセージのタイムスタンプで、各デバイスのメッセージが異なるオブジェクトに書き込まれます。
   - **Object Content**：デフォルトはすべてのフィールドを含む JSON テキスト形式です。`${var}` 形式のプレースホルダーをサポートします。ここでは `${payload}` と入力し、メッセージ本文をオブジェクト内容として使用します。オブジェクトの保存形式はメッセージ本文の形式に依存し、圧縮ファイルや画像、その他バイナリ形式もサポートします。

   :::

   ::: tab Aggregate Upload

   Aggregate Upload では以下のパラメータを設定します。

   - **Blob Name**：オブジェクトの保存パスを指定します。以下の変数が使用可能です。

     - **`${action}`**：アクション名（必須）
     - **`${node}`**：アップロードを行う EMQX ノード名（必須）
     - **`${datetime.{format}}`**：集約開始日時。`{format}` でフォーマット指定（必須）
       - **`${datetime.rfc3339utc}`**：UTC 形式の RFC3339 日時
       - **`${datetime.rfc3339}`**：ローカルタイムゾーン形式の RFC3339 日時
       - **`${datetime.unix}`**：Unix タイムスタンプ
     - **`${datetime_until.{format}}`**：集約終了日時。フォーマットは上記と同様
     - **`${sequence}`**：同一時間間隔内の集約アップロードの連番（必須）

     必須のプレースホルダーがテンプレートに含まれない場合、自動的にパスのサフィックスとして追加され重複を防ぎます。その他のプレースホルダーは無効とみなされます。

   - **Aggregation Type**：現在は CSV と JSON Lines をサポート。
      - `CSV`：カンマ区切りの CSV 形式で Azure Storage に書き込みます。
      - `JSON Lines`：[JSON Lines](https://jsonlines.org/) 形式で Azure Storage に書き込みます。

   - **Column Order**（`CSV` の場合のみ適用）：ドロップダウンでルール結果の列順を調整可能。生成される CSV ファイルは選択列が先に並び、未選択列はアルファベット順で続きます。

   - **Max Records**：最大レコード数に達すると単一ファイルの集約が完了しアップロードされ、時間間隔がリセットされます。

   - **Time Interval**：時間間隔に達すると最大レコード数に満たなくても単一ファイルの集約が完了しアップロードされ、最大レコード数がリセットされます。

   :::

   ::::

10. **フォールバックアクション（任意）**：メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。プライマリ Sink がメッセージ処理に失敗した場合にトリガーされます。詳細は [Fallback Actions](./data-bridges.md#fallback-actions) を参照してください。

11. **Advanced Settings** を展開し、必要に応じて詳細設定を行います（任意）。詳細は [Advanced Settings](#advanced-settings) を参照してください。

12. 残りの設定はデフォルト値のままにし、**Create** ボタンをクリックして Sink 作成を完了します。作成成功後はルール作成画面に戻り、新しい Sink がルールアクションに追加されます。

13. ルール作成画面に戻り、**Create** ボタンをクリックしてルール作成全体を完了します。

これでルールの作成が完了しました。**Rules** ページで新規作成したルールを確認でき、**Actions (Sink)** タブで新しい Azure Blob Storage Sink を確認できます。

また、**Integration** -> **Flow Designer** をクリックするとトポロジーを視覚的に確認できます。トポロジーは、トピック `t/#` のメッセージがルール `my_rule` によって解析され、Azure Storage コンテナに書き込まれる流れを示します。

## ルールのテスト

このセクションでは、Direct Upload メソッドで設定したルールのテスト方法を示します。

MQTTX を使い、トピック `t/1` にメッセージをパブリッシュします。

```bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Azure" }'
```

数件メッセージを送信した後、Azure ポータルにアクセスして `iot-data` コンテナ内のアップロード済みオブジェクトを確認します。

[Azure portal](https://portal.azure.com/) にログインし、ストレージアカウントに移動して `iot-data` コンテナを開くと、アップロードされたオブジェクトが表示されます。

## 詳細設定

このセクションでは、Azure Blob Storage Sink の詳細設定オプションについて説明します。ダッシュボードの Sink 設定画面で **Advanced Settings** を展開し、用途に応じて以下のパラメータを調整可能です。

| フィールド名               | 説明                                                                                                         | デフォルト値  |
| ------------------------- | ------------------------------------------------------------------------------------------------------------ | ------------ |
| **Buffer Pool Size**      | EMQX と Azure Storage 間のデータフローを管理するバッファワーカープロセスの数を指定します。これらのワーカーはデータを一時的に保存・処理し、ターゲットサービスへの送信を最適化し、スムーズなデータ伝送を保証します。 | `16`         |
| **Request TTL**           | バッファに入ってからリクエストが有効とみなされる最大時間（秒）を指定します。このタイマーはリクエストがバッファに入った瞬間から開始されます。TTL を超えてバッファに滞留するか、送信後に Azure Storage からの応答やアックがタイムリーに得られない場合、リクエストは期限切れと判断されます。 |              |
| **Health Check Interval** | Sink が Azure Storage との接続状態を自動的にヘルスチェックする間隔（秒）を指定します。                                               | `15`         |
| **Max Buffer Queue Size** | Azure Blob Storage Sink の各バッファワーカーがバッファリング可能な最大バイト数を指定します。バッファワーカーはデータを一時保存し、効率的にデータストリームを処理します。システム性能やデータ伝送要件に応じて調整してください。 | `256`        |
| **Query Mode**            | メッセージ送信を最適化するため、`synchronous`（同期）または `asynchronous`（非同期）リクエストモードを選択可能です。非同期モードでは Azure Storage への書き込みが MQTT メッセージのパブリッシュ処理をブロックしませんが、クライアントがメッセージを受信してから Azure Storage に到達するまで時間差が生じる可能性があります。 | `Asynchronous` |
| **Batch Size**            | EMQX から Azure Storage へ単一転送操作で送信するデータバッチの最大サイズを指定します。サイズを調整することでデータ転送の効率と性能を微調整できます。<br />「Batch Size」が `1` の場合、データレコードはバッチ化されず個別に送信されます。 | `1`           |
| **Inflight Window**       | 「インフライトキューリクエスト」とは、開始されたがまだ応答やアックを受け取っていないリクエストを指します。この設定は Sink と Azure Storage 間の通信で同時に存在可能なインフライトキューリクエストの最大数を制御します。<br/>**Request Mode** が `asynchronous` の場合、このパラメータは特に重要です。同一 MQTT クライアントからのメッセージを厳密に順序処理する必要がある場合、この値は `1` に設定してください。 | `100`        |
