Azure Blob StorageへのMQTTデータ取り込み
Azure Blob Storageは、Microsoftが提供するクラウドベースのオブジェクトストレージソリューションで、大量の非構造化データの取り扱いに特化しています。非構造化データとは、特定のデータモデルやフォーマットに従わないデータタイプ(テキストファイルやバイナリデータなど)を指します。EMQX CloudはMQTTメッセージをBlob Storageコンテナに効率的に保存でき、IoTデータの保存に柔軟なソリューションを提供します。
本ページでは、EMQX CloudとAzure Blob Storage間のデータ統合について詳しく解説し、ルールおよびSinkの作成方法について実践的なガイダンスを提供します。
動作概要
EMQX CloudにおけるAzure Blob Storageのデータ統合はすぐに使える機能で、複雑なビジネス開発にも簡単に設定可能です。典型的なIoTアプリケーションでは、EMQX Cloudがデバイス接続とメッセージ送受信を担うIoTプラットフォームとして機能し、Azure Blob Storageがメッセージデータの保存プラットフォームとして利用されます。

EMQX CloudはルールエンジンとSinkを利用してデバイスイベントやデータをAzure Blob Storageに転送します。アプリケーションはAzure Blob Storageからデータを読み取り、さらなるデータ活用が可能です。具体的なワークフローは以下の通りです:
- デバイスのEMQX Cloudへの接続:IoTデバイスはMQTTプロトコルで正常に接続するとオンラインイベントをトリガーします。このイベントにはデバイスID、送信元IPアドレスなどのプロパティ情報が含まれます。
- デバイスメッセージのパブリッシュと受信:デバイスは特定のトピックを通じてテレメトリやステータスデータをパブリッシュします。EMQX Cloudはメッセージを受信し、ルールエンジン内で照合します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づいて特定のソースからのメッセージやイベントを処理します。対応するルールをマッチングし、データフォーマット変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
- Azure Blob Storageへの書き込み:ルールはメッセージをストレージコンテナに書き込むアクションをトリガーします。Azure Blob Storage Sinkを使用して、処理結果からデータを抽出しBlob Storageに送信します。メッセージはテキストまたはバイナリ形式で保存可能で、複数行の構造化データは単一のCSVファイルにまとめて保存することもできます。これはメッセージ内容やSinkの設定によります。
イベントやメッセージデータがストレージコンテナに書き込まれた後、Azure Blob Storageに接続してデータを読み出し、以下のような柔軟なアプリケーション開発が可能です:
- データアーカイブ:デバイスメッセージをAzure Blob Storageのオブジェクトとして長期保存し、コンプライアンス要件やビジネスニーズに対応。
- データ分析:ストレージコンテナのデータをSnowflakeなどの分析サービスに取り込み、予知保全、デバイス効率評価などのデータ分析サービスに活用。
特長と利点
EMQX CloudでAzure Blob Storageのデータ統合を利用することで、以下の特長と利点をビジネスにもたらします:
- メッセージ変換:メッセージはEMQX Cloudのルール内で高度な処理や変換を経てからAzure Blob Storageに書き込まれるため、その後の保存や利用が容易になります。
- 柔軟なデータ操作:Azure Blob Storage Sinkにより、特定のデータフィールドをAzure Blob Storageコンテナに簡単に書き込め、コンテナやオブジェクトキーの動的設定もサポートし柔軟なデータ保存が可能です。
- 統合されたビジネスプロセス:Azure Blob Storage Sinkを使うことで、デバイスデータをAzure Blob Storageの豊富なエコシステムアプリケーションと組み合わせ、データ分析やアーカイブなど多様なビジネスシナリオを実現できます。
- 低コストの長期保存:データベースと比較して、Azure Blob Storageは高可用性、信頼性、コスト効率に優れたオブジェクトストレージサービスを提供し、長期保存に適しています。
これらの特長により、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に役立てることが可能です。
はじめる前に
このセクションでは、EMQXでAzure Blob Storage Sinkを作成する前に必要な準備について説明します。
前提条件
Azure Storageでのコンテナ作成
Azure StorageにアクセスするにはAzureサブスクリプションが必要です。まだお持ちでない場合は、無料アカウントを作成してください。
Azure Storageへのすべてのアクセスはストレージアカウントを通じて行われます。このクイックスタートでは、Azureポータル、Azure PowerShell、またはAzure CLIを使ってストレージアカウントを作成します。ストレージアカウントの作成方法はストレージアカウントの作成を参照してください。
Azureポータルでコンテナを作成するには、新規作成したストレージアカウントに移動し、左メニューの「データストレージ」セクションから「コンテナ」を選択します。+ コンテナボタンをクリックし、新しいコンテナ名に
iot-dataを入力して作成をクリックします。
ストレージアカウントのセキュリティ+ネットワーク -> アクセスキーに移動し、キーをコピーします。このキーはEMQXでSinkを設定する際に必要です。

コネクターの作成
Azure Blob Storage Sinkを追加する前に、対応するコネクターを作成する必要があります。
デプロイメニューでデータ統合を選択し、データ永続化サービスカテゴリの中からAzure Blob Storageサービスを選びます。すでに他のコネクターを作成済みの場合は、新規コネクターをクリックし、同様にAzure Blob Storageサービスを選択します。
コネクター名はシステムが自動生成します。
接続情報を入力します:
- アカウント名:ストレージアカウント名
- アカウントキー:前述のストレージアカウントキー
- 詳細設定(任意):詳細設定を参照してください。
接続テストボタンをクリックし、Azure Blob Storageに正常にアクセスできれば成功メッセージが返されます。
作成ボタンをクリックしてコネクターの作成を完了します。
ルールの作成
次に、書き込むデータを指定するルールを作成し、処理済みデータをAzure Blob Storageに転送するアクションを追加します。
コネクター一覧のアクション列にある新規ルールアイコンをクリックするか、ルール一覧で新規ルールをクリックして新規ルール作成画面に入ります。
SQLエディターに以下のルールSQLを入力します:
sqlSELECT * FROM "t/#"TIP
SQLが初めての場合は、SQL例や試してみるをクリックしてルールSQLの学習や結果のテストが可能です。
次へをクリックしてアクション作成を開始します。
コネクターを利用のドロップダウンから先ほど作成したコネクターを選択します。
コンテナに
iot-dataを入力します。アップロード方法を選択します。2つの方法の違いは以下の通りです:
- 直接アップロード:ルールがトリガーされるたびに、設定済みのオブジェクトキーと内容に従ってデータを直接Azure Storageにアップロードします。バイナリや大きなテキストデータの保存に適していますが、多数のファイルが生成される可能性があります。
- 集約アップロード:複数のルールトリガー結果を1つのファイル(例:CSVファイル)にまとめてAzure Storageにアップロードします。構造化データの保存に適しており、ファイル数を減らし書き込み効率を向上させます。
各方法で設定項目が異なります。選択した方法に応じて設定してください:
必要に応じて詳細設定オプションを構成します(任意)。詳細は詳細設定を参照してください。
確定ボタンをクリックしてアクション設定を完了します。
成功メッセージのポップアップでルール一覧に戻るをクリックし、データ統合設定を完了します。
ルールのテスト
ここでは、直接アップロード方式で設定したルールのテスト方法を示します。
- MQTTXを使ってトピック
t/1にメッセージをパブリッシュします:
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Azure" }'数件のメッセージ送信後、Azureポータルにログインし、ストレージアカウントに移動して
iot-dataコンテナを開きます。アップロードされたオブジェクトが確認できるはずです。EMQX Cloudコンソールで実行時データを確認します。ルール一覧でルールIDをクリックすると、ルールの統計情報やこのルールに紐づくすべてのアクションの実行時統計ページが表示されます。