Skip to content

Azure Blob Storage に MQTT データを取り込む

Azure Blob Storage は、マイクロソフトのクラウドベースのオブジェクトストレージソリューションで、大量の非構造化データを扱うために設計されています。非構造化データとは、特定のデータモデルやフォーマットに従わないデータタイプ(テキストファイルやバイナリデータなど)を指します。EMQX は MQTT メッセージを効率的に Blob Storage コンテナに保存でき、IoT データの保存に柔軟なソリューションを提供します。

本ページでは、EMQX と Azure Blob Storage 間のデータ統合について詳しく解説し、ルールおよび Sink の作成方法について実践的なガイドを提供します。

動作概要

EMQX における Azure Blob Storage データ統合はすぐに使える機能で、複雑なビジネス開発にも簡単に設定できます。典型的な IoT アプリケーションでは、EMQX がデバイス接続とメッセージ伝送を担う IoT プラットフォームとして機能し、Azure Blob Storage はメッセージデータの保存を担当するデータストレージプラットフォームとして利用されます。

azure-blob-storage-architecture

EMQX はルールエンジンと Sink を活用してデバイスのイベントやデータを Azure Blob Storage に転送します。アプリケーションは Azure Blob Storage からデータを読み取り、さらなるデータ活用を行えます。具体的なワークフローは以下の通りです:

  1. デバイスの EMQX への接続:IoT デバイスは MQTT プロトコルで正常に接続するとオンラインイベントをトリガーします。このイベントにはデバイスID、送信元IPアドレスなどのプロパティ情報が含まれます。
  2. デバイスのメッセージパブリッシュと受信:デバイスは特定のトピックを通じてテレメトリやステータスデータをパブリッシュします。EMQX はメッセージを受信し、ルールエンジン内で照合します。
  3. ルールエンジンによるメッセージ処理:組み込みのルールエンジンはトピックマッチングに基づき特定のソースからのメッセージやイベントを処理します。対応するルールにマッチしたメッセージやイベントは、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理が行われます。
  4. Azure Blob Storage への書き込み:ルールはメッセージをストレージコンテナに書き込むアクションをトリガーします。Azure Blob Storage Sink を利用して、処理結果からデータを抽出し Blob Storage に送信します。メッセージはテキストまたはバイナリ形式で保存でき、複数行の構造化データは CSV や JSON Lines ファイルにまとめて保存可能です。これはメッセージ内容や Sink の設定によります。

イベントやメッセージデータがストレージコンテナに書き込まれた後は、Azure Blob Storage に接続してデータを読み取り、以下のような柔軟なアプリケーション開発が可能です:

  • データアーカイブ:デバイスメッセージを Azure Blob Storage のオブジェクトとして長期保存し、コンプライアンス要件やビジネスニーズに対応。
  • データ分析:ストレージコンテナのデータを Snowflake などの分析サービスに取り込み、予知保全やデバイス効率評価などのデータ分析に活用。

特長とメリット

EMQX における Azure Blob Storage データ統合を利用することで、以下の特長と利点が得られます:

  • メッセージ変換:メッセージは Azure Blob Storage への書き込み前に EMQX ルール内で高度な処理や変換が可能で、後続の保存や利用を容易にします。
  • 柔軟なデータ操作:Azure Blob Storage Sink により、特定のデータフィールドを Azure Blob Storage コンテナに簡単に書き込み可能で、コンテナやオブジェクトキーの動的設定にも対応し柔軟なデータ保存を実現します。
  • 統合されたビジネスプロセス:Azure Blob Storage Sink はデバイスデータを Azure Blob Storage の豊富なエコシステムアプリケーションと連携させ、データ分析やアーカイブなど多様なビジネスシナリオを実現します。
  • 低コストの長期保存:データベースと比較して、Azure Blob Storage は高可用性かつ信頼性の高いコスト効率の良いオブジェクトストレージサービスを提供し、長期保存に適しています。

これらの特長により、効率的で信頼性が高くスケーラブルな IoT アプリケーションを構築し、ビジネスの意思決定や最適化に役立てることができます。

はじめる前に

このセクションでは、EMQX で Azure Blob Storage Sink を作成する前に必要な準備について説明します。

前提条件

Azure Storage でコンテナを作成する

  1. Azure Storage にアクセスするには Azure サブスクリプションが必要です。まだお持ちでない場合は、無料アカウントを作成してください。

  2. Azure Storage へのすべてのアクセスはストレージアカウントを通じて行われます。このクイックスタートでは、Azure ポータル、Azure PowerShell、または Azure CLI を使ってストレージアカウントを作成します。ストレージアカウントの作成方法は ストレージアカウントの作成 を参照してください。

  3. Azure ポータルでコンテナを作成するには、新しく作成したストレージアカウントに移動します。ストレージアカウントの左メニューで「データストレージ」セクションまでスクロールし、「コンテナ」を選択します。+ コンテナ ボタンを選択し、新しいコンテナ名に iot-data を入力して 作成 をクリックします。

    azure-storage-container-create

  4. ストレージアカウントの セキュリティ+ネットワーク -> アクセスキー に移動し、キー をコピーします。このキーは EMQX で Sink を設定する際に必要です。

    azure-storage-access-keys

コネクターを作成する

Azure Blob Storage Sink を追加する前に、対応するコネクターを作成する必要があります。

  1. ダッシュボードの Integration -> Connector ページに移動します。
  2. 右上の 作成 ボタンをクリックします。
  3. コネクタータイプとして Azure Blob Storage を選択し、次へ進みます。
  4. コネクター名を入力します。英数字の組み合わせで、ここでは my-azure と入力します。
  5. 接続情報を入力します。
    • アカウント名:ストレージアカウント名
    • アカウントキー:前の手順で取得したストレージアカウントキー
  6. 作成 をクリックする前に、接続テスト をクリックしてコネクターが Azure Storage に接続できるか確認できます。
  7. 画面下部の 作成 ボタンをクリックしてコネクター作成を完了します。

これでコネクターの作成が完了し、次にルールと Sink を作成して Azure Storage サービスに書き込むデータを指定します。

Azure Blob Storage Sink を使ったルールの作成

このセクションでは、EMQX でソース MQTT トピック t/# からのメッセージを処理し、処理結果を設定済みの Sink を通じて Azure Storage の iot-data コンテナに書き込むルールの作成方法を示します。

  1. ダッシュボードの Integration -> Rules ページに移動します。

  2. 右上の 作成 ボタンをクリックします。

  3. ルールID に my_rule を入力し、SQL エディターに以下のルール SQL を入力します:

    sql
    SELECT
      *
    FROM
        "t/#"

    TIP

    SQL に不慣れな場合は、SQL ExamplesEnable Debug をクリックしてルール SQL の学習とテストが可能です。

  4. アクションを追加し、Action Type ドロップダウンリストから Azure Blob Storage を選択します。アクションのドロップダウンはデフォルトの create action のままにするか、既存の Azure Blob Storage アクションを選択します。ここでは新しい Sink を作成し、ルールに追加します。

  5. Sink の名前と説明を入力します。

  6. コネクターのドロップダウンから先ほど作成した my-azure コネクターを選択します。ドロップダウン横の作成ボタンをクリックするとポップアップで新規コネクターを素早く作成可能です。必要な設定パラメータは コネクターの作成 を参照してください。

  7. Containeriot-data と入力します。

  8. アップロード方法 を選択します。2つの方法の違いは以下の通りです:

    • 直接アップロード:ルールがトリガーされるたびに、設定済みのオブジェクトキーと内容に従ってデータを直接 Azure Storage にアップロードします。バイナリや大きなテキストデータの保存に適していますが、多数のファイルが生成される可能性があります。
    • 集約アップロード:複数のルールトリガー結果を1つのファイル(CSV など)にまとめて Azure Storage にアップロードします。構造化データの保存に適し、ファイル数を減らし書き込み効率を向上させます。

    アップロード方法ごとに設定パラメータが異なります。選択した方法に応じて設定してください:

  9. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。プライマリ Sink がメッセージ処理に失敗した場合にトリガーされます。詳細は フォールバックアクション を参照してください。

  10. 詳細設定 を展開し、必要に応じて高度な設定オプションを構成します(任意)。詳細は 詳細設定 を参照してください。

  11. 残りの設定はデフォルト値のままにし、作成 ボタンをクリックして Sink の作成を完了します。作成成功後はルール作成画面に戻り、新しい Sink がルールのアクションに追加されます。

  12. ルール作成画面に戻り、作成 ボタンをクリックしてルール作成全体を完了します。

これでルールの作成が完了しました。Rules ページで新規作成したルールを確認でき、Actions (Sink) タブで新しい Azure Blob Storage Sink を確認できます。

また、Integration -> Flow Designer をクリックするとトポロジーを確認できます。トポロジーは、トピック t/# 配下のメッセージがルール my_rule によって解析され、Azure Storage コンテナに書き込まれる流れを視覚的に示します。

ルールのテスト

このセクションでは、直接アップロード方式で設定したルールのテスト方法を示します。

MQTTX を使ってトピック t/1 にメッセージをパブリッシュします:

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Azure" }'

数件のメッセージを送信した後、Azure ポータルにアクセスして iot-data コンテナ内のアップロードされたオブジェクトを確認します。

Azure ポータルにログインし、ストレージアカウントに移動して iot-data コンテナを開くと、アップロード済みのオブジェクトが表示されます。

詳細設定

このセクションでは、Azure Blob Storage Sink の詳細設定オプションについて解説します。ダッシュボードで Sink を設定する際に 詳細設定 を展開し、用途に応じて以下のパラメータを調整できます。

フィールド名説明デフォルト値
バッファプールサイズEMQX と Azure Storage 間のデータフローを管理するバッファワーカーの数を指定します。これらのワーカーはデータを一時的に保存・処理し、ターゲットサービスへの送信を最適化しスムーズなデータ伝送を実現します。16
リクエスト TTLバッファに入ったリクエストが有効とみなされる最大時間(秒)を指定します。リクエストがこの TTL を超えてバッファ内に留まるか、送信後に Azure Storage からの応答やアックがタイムリーに得られない場合、リクエストは期限切れと判断されます。45
ヘルスチェック間隔Sink が Azure Storage との接続状態を自動的にヘルスチェックする間隔(秒)を指定します。15
最大バッファキューサイズAzure Blob Storage Sink の各バッファワーカーがバッファリング可能な最大バイト数を指定します。バッファワーカーはデータを一時保存し、効率的にデータストリームを処理します。システム性能やデータ伝送要件に応じて調整してください。256
クエリモードメッセージ伝送を最適化するために、synchronous(同期)または asynchronous(非同期)のリクエストモードを選択できます。非同期モードでは Azure Storage への書き込みが MQTT メッセージパブリッシュをブロックしませんが、クライアントがメッセージ到着前に受信する可能性があります。Asynchronous
バッチサイズEMQX から Azure Storage へ一度に転送するデータバッチの最大サイズを指定します。サイズを調整することでデータ転送の効率と性能を微調整できます。
「バッチサイズ」が「1」の場合、データレコードはバッチ化せず個別に送信されます。
1
インフライトウィンドウ「インフライトキューリクエスト」とは、開始されたがまだ応答やアックを受け取っていないリクエストを指します。この設定は Sink と Azure Storage 間の通信で同時に存在可能なインフライトキューリクエストの最大数を制御します。
リクエストモードasynchronous の場合、同一 MQTT クライアントからのメッセージを厳密に順次処理したい場合は、この値を 1 に設定してください。
100