# Azure Blob Storage に MQTT データを取り込む

[Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs/) は、マイクロソフトのクラウドベースのオブジェクトストレージソリューションで、大量の非構造化データの取り扱いに特化しています。非構造化データとは、特定のデータモデルや形式に従わないデータタイプ（テキストファイルやバイナリデータなど）を指します。EMQX Platform は MQTT メッセージを効率的に Blob Storage コンテナに保存でき、IoT データの保存に柔軟なソリューションを提供します。

本ページでは、EMQX Platform と Azure Blob Storage 間のデータ統合について詳しく解説し、ルールおよび Sink の作成方法について実践的なガイダンスを提供します。

## 動作概要

EMQX Platform における Azure Blob Storage データ統合は、すぐに利用可能な機能であり、複雑なビジネス開発に簡単に設定できます。典型的な IoT アプリケーションでは、EMQX Platform がデバイス接続とメッセージ伝送を担う IoT プラットフォームとして機能し、Azure Blob Storage がメッセージデータの保存を担当するデータストレージプラットフォームとして機能します。

![EMQX Platform Azure Blob Storage](./_assets/data_integration_azure-blob-storage.png)

EMQX Platform はルールエンジンと Sink を利用してデバイスイベントやデータを Azure Blob Storage に転送します。アプリケーションは Azure Blob Storage からデータを読み取り、さらなるデータ活用が可能です。具体的なワークフローは以下の通りです：

1. **デバイスの EMQX Platform への接続**：IoT デバイスは MQTT プロトコルで正常に接続されるとオンラインイベントをトリガーします。このイベントにはデバイスID、送信元IPアドレス、その他のプロパティ情報が含まれます。
2. **デバイスメッセージのパブリッシュと受信**：デバイスは特定のトピックを通じてテレメトリおよびステータスデータをパブリッシュします。EMQX Platform はメッセージを受信し、ルールエンジン内で照合します。
3. **ルールエンジンによるメッセージ処理**：組み込みのルールエンジンは、トピックマッチングに基づき特定のソースからのメッセージやイベントを処理します。対応するルールにマッチしたメッセージやイベントは、データ形式変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理が行われます。
4. **Azure Blob Storage への書き込み**：ルールはメッセージをストレージコンテナに書き込むアクションをトリガーします。Azure Blob Storage Sink を使用することで、処理結果からデータを抽出し Blob Storage に送信できます。メッセージはテキストまたはバイナリ形式で保存でき、メッセージ内容や Sink の設定に応じて複数行の構造化データを単一の CSV ファイルにまとめることも可能です。

イベントやメッセージデータがストレージコンテナに書き込まれた後は、Azure Blob Storage に接続してデータを読み取り、以下のような柔軟なアプリケーション開発が可能です：

- データアーカイブ：デバイスメッセージを Azure Blob Storage のオブジェクトとして保存し、長期保存によるコンプライアンス要件やビジネスニーズに対応。
- データ分析：ストレージコンテナからデータを分析サービス（例：Snowflake）に取り込み、予知保全やデバイス効率評価などのデータ分析サービスを実施。

## 特長と利点

EMQX Platform における Azure Blob Storage データ統合を利用することで、以下の特長と利点が得られます：

- **メッセージ変換**：メッセージは Azure Blob Storage への書き込み前に EMQX Platform のルールで高度な処理や変換が可能で、後続の保存や利用を容易にします。
- **柔軟なデータ操作**：Azure Blob Storage Sink により、特定のデータフィールドを Azure Blob Storage コンテナに簡単に書き込め、コンテナやオブジェクトキーの動的設定もサポートし柔軟なデータ保存を実現します。
- **統合されたビジネスプロセス**：Azure Blob Storage Sink を利用することで、デバイスデータを Azure Blob Storage の豊富なエコシステムアプリケーションと連携でき、データ分析やアーカイブなど多様なビジネスシナリオを実現します。
- **低コストの長期保存**：データベースと比較して、Azure Blob Storage は高可用性と信頼性を持つコスト効率の高いオブジェクトストレージサービスであり、長期保存に適しています。

これらの特長により、効率的で信頼性が高くスケーラブルな IoT アプリケーションの構築と、ビジネス意思決定や最適化に役立てることが可能です。

## はじめる前に

本節では、EMQX で Azure Blob Storage Sink を作成する前の準備について説明します。

### 前提条件

- [データ統合](./introduction.md)の理解
- [ルール](./rules.md)の理解
- Azure Storage へのパブリックアクセスをサポートするために [NAT ゲートウェイ](../vas/nat-gateway.md) を有効化

### Azure Storage でコンテナを作成する

1. Azure Storage にアクセスするには Azure サブスクリプションが必要です。まだお持ちでない場合は、[無料アカウント](https://azure.microsoft.com/free/)を作成してください。

2. Azure Storage へのすべてのアクセスはストレージアカウントを通じて行われます。このクイックスタートでは、[Azure ポータル](https://portal.azure.com/)、Azure PowerShell、または Azure CLI を使ってストレージアカウントを作成します。ストレージアカウントの作成方法は [ストレージアカウントの作成](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-create) を参照してください。

3. Azure ポータルでコンテナを作成するには、新しく作成したストレージアカウントに移動します。ストレージアカウントの左メニューで「データストレージ」セクションまでスクロールし、「コンテナ」を選択します。+ **コンテナ** ボタンをクリックし、新しいコンテナ名に `iot-data` を入力して **作成** をクリックします。

   ![azure-storage-container-create](./_assets/azure-storage-container-create.png)

4. ストレージアカウントの **セキュリティ+ネットワーク** -> **アクセスキー** に移動し、**キー** をコピーします。EMQX で Sink を設定する際にこのキーが必要です。

   ![azure-storage-access-keys](./_assets/azure-storage-access-keys.png)

## コネクターを作成する

Azure Blob Storage Sink を追加する前に、対応するコネクターを作成する必要があります。

1. デプロイメントメニューで **データ統合** を選択し、データ永続化サービスカテゴリの中から Azure Blob Storage サービスを選択します。すでに他のコネクターを作成済みの場合は、**新規コネクター** をクリックし、同様に Azure Blob Storage サービスを選択します。

2. **コネクター名**：システムが自動的に名前を生成します。

3. 接続情報を入力します：

   - **アカウント名**：ストレージアカウント名
   - **アカウントキー**：前のステップで取得したストレージアカウントキー
   - **詳細設定（任意）**：[詳細設定](https://docs.emqx.com/en/emqx/latest/data-integration/azure-blob-storage.html#advanced-settings) を参照してください。

4. **接続テスト** ボタンをクリックし、Azure Blob Storage に正常にアクセスできれば成功メッセージが表示されます。

5. **作成** ボタンをクリックし、コネクターの作成を完了します。

## ルールを作成する

次に、書き込むデータを指定するルールを作成し、処理済みデータを Azure Blob Storage に転送するアクションを追加します。

1. コネクター一覧の **アクション** 列にある新規ルールアイコンをクリックするか、**ルール一覧** の **新規ルール** をクリックして、**新規ルール作成** ページに入ります。

2. SQL エディターに以下のルール SQL を入力します：

   ```sql
   SELECT
     *
   FROM
       "t/#"
   ```

   ::: tip

   SQL に不慣れな場合は、**SQL Examples** と **Enable Test** をクリックしてルール SQL の学習とテストが可能です。

   :::

3. **次へ** をクリックしてアクション作成を開始します。

4. **コネクターを使用** のドロップダウンから、先に作成したコネクターを選択します。

5. **コンテナ** に `iot-data` と入力します。

6. **アップロード方法** を選択します。2つの方法の違いは以下の通りです：

   - **直接アップロード**：ルールがトリガーされるたびに、設定済みのオブジェクトキーと内容に従ってデータを直接 Azure Storage にアップロードします。バイナリや大きなテキストデータの保存に適していますが、多数のファイルが生成される可能性があります。
   - **集約アップロード**：複数のルールトリガー結果を単一ファイル（例：CSVファイル）にまとめて Azure Storage にアップロードします。構造化データの保存に適し、ファイル数を減らし書き込み効率を向上させます。

   各方法で設定項目が異なります。選択した方法に応じて設定してください：

   :::: tabs type

   ::: tab 直接アップロード

   直接アップロードでは以下の項目を設定します：

   - **Blob 名**：コンテナ内にアップロードするオブジェクトの場所を定義します。`${var}` 形式のプレースホルダーをサポートし、`/` で保存ディレクトリを指定可能です。管理や識別のためにオブジェクトのサフィックスも設定します。ここでは `msgs/${clientid}_${timestamp}.json` と入力します。`${clientid}` はクライアントID、`${timestamp}` はメッセージのタイムスタンプで、各デバイスのメッセージを異なるオブジェクトに書き込みます。
   - **オブジェクト内容**：デフォルトはすべてのフィールドを含む JSON テキスト形式です。`${var}` 形式のプレースホルダーをサポートします。ここでは `${payload}` と入力し、メッセージ本文をオブジェクト内容として使用します。オブジェクトの保存形式はメッセージ本文の形式に依存し、圧縮ファイル、画像、その他バイナリ形式もサポートします。

   :::

   ::: tab 集約アップロード

   集約アップロードでは以下のパラメーターを設定します：

   - **Blob 名**：オブジェクトの保存パスを指定します。以下の変数が使用可能です：

     - **`${action}`**：アクション名（必須）
     - **`${node}`**：アップロードを実行する EMQX ノード名（必須）
     - **`${datetime.{format}}`**：集約開始日時。`{format}` でフォーマットを指定（必須）：
       - **`${datetime.rfc3339utc}`**：UTC フォーマットの RFC3339 日時
       - **`${datetime.rfc3339}`**：ローカルタイムゾーンの RFC3339 日時
       - **`${datetime.unix}`**：Unix タイムスタンプ
     - **`${datetime_until.{format}}`**：集約終了日時。フォーマットは上記と同様
     - **`${sequence}`**：同一時間間隔内の集約アップロードのシーケンス番号（必須）

     必須のプレースホルダーがテンプレートに含まれていない場合は、自動的に Blob 名のパスサフィックスとして追加され重複を回避します。その他のプレースホルダーは無効とみなされます。

   - **集約タイプ**：現状は CSV のみ対応。カンマ区切りの CSV 形式で Azure Storage に書き込みます。

   - **カラム順序**：ドロップダウンでルール結果のカラム順を調整可能。生成される CSV ファイルは選択されたカラムを優先的にソートし、未選択カラムはアルファベット順で続きます。

   - **最大レコード数**：最大レコード数に達すると、単一ファイルの集約が完了しアップロードされ、時間間隔がリセットされます。

   - **時間間隔**：時間間隔に達すると、最大レコード数に達していなくても単一ファイルの集約が完了しアップロードされ、最大レコード数がリセットされます。

   ::: 

   ::::

7. 必要に応じて詳細設定オプションを構成します（任意）。詳細は [詳細設定](https://docs.emqx.com/en/emqx/latest/data-integration/azure-blob-storage.html#advanced-settings) を参照してください。

8. **確認** ボタンをクリックしてアクション設定を完了します。

9. 成功メッセージのポップアップで **ルール一覧に戻る** をクリックし、データ統合設定を完了します。

## ルールのテスト

本節では、直接アップロード方式で設定したルールのテスト方法を示します。

1. MQTTX を使ってトピック `t/1` にメッセージをパブリッシュします：

```bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Azure" }'
```

2. 数件のメッセージ送信後、[Azure ポータル](https://portal.azure.com/)にログインし、ストレージアカウントに移動して `iot-data` コンテナを開きます。アップロードされたオブジェクトがコンテナ内に表示されているはずです。

3. EMQX Platform コンソールでランタイムデータを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報とこのルールに紐づくすべてのアクションの統計をランタイム統計ページで閲覧できます。
