Skip to content

DatabricksへのMQTTデータ取り込み

Databricksは、Apache Sparkを基盤とした統合データ分析プラットフォームであり、大規模なデータエンジニアリング、機械学習、共同分析に対応しています。EMQXは、Databricksが管理するAmazon S3バケットにMQTTデータを書き込み、Databricksが外部ロケーション経由で直接クエリできる形でDatabricksと連携します。

本ページでは、EMQXとDatabricks間のデータ統合について詳しく解説し、コネクターおよびSinkの作成方法を実践的に案内します。

動作の仕組み

EMQXにおけるDatabricksデータ統合は、Amazon S3統合を基盤としています。EMQXはMQTTデータをDatabricksが管理するS3バケットに書き込み、Databricksは外部ロケーションを介してこのバケットにアクセスし、保存されたデータに対して直接SQLクエリを実行します。

EMQX Databricksデータ統合

具体的なワークフローは以下の通りです:

  1. デバイスのEMQXへの接続:IoTデバイスはMQTTプロトコルで正常に接続するとオンラインイベントをトリガーします。このイベントにはデバイスID、送信元IPアドレスなどのプロパティ情報が含まれます。
  2. デバイスからのメッセージパブリッシュと受信:デバイスは特定のトピックを通じてテレメトリやステータスデータをパブリッシュします。EMQXはこれらのメッセージを受信し、ルールエンジン内で比較処理を行います。
  3. ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づいて特定のソースからのメッセージやイベントを処理します。該当するルールにマッチしたメッセージやイベントに対し、データ形式変換、特定情報のフィルタリング、コンテキスト情報の付加などを実施します。
  4. Amazon S3への書き込み:ルールはAmazon S3 Sinkをトリガーし、処理済みデータをDatabricksワークスペースに紐づくS3バケットへ書き込みます。
  5. DatabricksによるS3からの読み込み:Databricksは外部ロケーション経由でS3バケット内のデータを直接クエリし、リアルタイム分析や機械学習ワークフローを実現します。

特長と利点

EMQXのDatabricksデータ統合を利用することで、以下のような特長とメリットが得られます:

  • メッセージ変換:EMQXのルール内でメッセージを高度に処理・変換してからS3に書き込むため、その後の保存や分析が容易になります。
  • 柔軟なデータ操作:Amazon S3 Sinkを使い、Databricks管理のS3バケットに特定のデータフィールドを柔軟に書き込めます。動的なオブジェクトキー設定にも対応しています。
  • 統合分析プラットフォーム:EMQXとDatabricksを連携することで、IoTデータを即座にDatabricksワークスペース内のSQL分析、機械学習、データエンジニアリングパイプラインに活用可能です。
  • 低コストな長期保存:S3を基盤ストレージに利用することで、高可用性かつ信頼性の高いコスト効率の良いデータ保存が可能であり、大規模IoTワークロードに適しています。

はじめる前に

このセクションでは、EMQXでDatabricks向けのAmazon S3コネクターおよびSinkを作成する前に必要な準備について説明します。

前提条件

以下の内容にあらかじめ習熟していることを推奨します:

EMQXの概念:

  • ルールエンジン:MQTTメッセージからデータを抽出・変換するロジックを定義する方法を理解する。
  • データ統合:EMQXにおけるコネクターとSinkの概念を理解する。

Databricksの概念:

  • ワークスペース:Databricksのすべての資産にアクセスする環境。
  • 外部ロケーション:外部のS3パスをマッピングし、そこに保存されたデータをSQLで直接クエリ可能にするDatabricksの機能。
  • ストレージ認証情報:外部ストレージの読み書き権限を付与するDatabricksのアクセス認証情報。

AWS MarketplaceでのDatabricksセットアップ

ここでは、AWS MarketplaceでDatabricksをサブスクライブする例を用いてデプロイ手順を説明します。

  1. AWS MarketplaceでDatabricksをサブスクライブします。Databricksアカウントとワークスペース作成の案内が表示されます。

  2. サブスクライブ後、ワークスペースを作成します。リージョンとストレージオプションを選択し、Createをクリックします。

    Databricksワークスペース作成

    ワークスペース作成後、Workspacesリストに表示されます。ワークスペースに自動プロビジョニングされたS3バケット名(例:databricks-workspace-stack-142ec-bucket)を控えてください。このバケットにEMQXからのMQTTデータを保存します。

    Databricksワークスペース一覧

  3. ワークスペースを開き、Catalog -> External locationsに移動し、EMQXがデータを書き込むS3パスを指す外部ロケーションを作成します。

    Databricks外部ロケーション

    Create locationをクリックし、Storage typeS3に設定、URLs3://databricks-workspace-stack-142ec-bucket/emqx-iot-data-newを入力し、Storage credentialを選択します。

    外部ロケーション作成

  4. S3バケットへの読み書き権限を持つIAMユーザーまたはロールのAWSアクセス認証情報(Access Key IDとSecret Access Key)を取得します。これらはEMQXコネクターの設定に使用します。

DatabricksワークスペースとS3バケットの設定が完了したら、EMQXでコネクターおよびSinkの作成準備が整います。

コネクターの作成

Amazon S3 Sinkを追加する前に、対応するコネクターを作成します。

  1. ダッシュボードのIntegration -> Connectorsページに移動します。
  2. 右上のCreateボタンをクリックします。
  3. コネクタータイプとしてAmazon S3を選択し、Nextをクリックします。
  4. コネクター名を入力します。名前は英数字で始まり、英数字、ハイフン、アンダースコアを含めることができます。ここでは例としてmy-databricksと入力します。
  5. 接続情報を入力します:
    • Host:DatabricksワークスペースがデプロイされているAWSリージョンのS3エンドポイント(例:s3.{region}.amazonaws.com)を入力。
    • Port443を入力。
    • Access Key IDおよびSecret Access KeyAWS MarketplaceでのDatabricksセットアップで取得した認証情報を入力。
  6. その他の設定はデフォルト値のままにします。
  7. Createをクリックする前に、Test Connectivityを押してEMQXがS3サービスに接続できるか確認可能です。
  8. Createをクリックしてコネクター作成を完了します。作成成功のダイアログが表示され、ルールを今すぐ作成するか尋ねられます。Create Ruleをクリックすると、コネクターが事前選択された状態でルール作成画面に進みます。後で作成する場合はBack To Connector Listをクリックします。

Amazon S3 Sinkを使ったルールの作成

このセクションでは、EMQXでソースMQTTトピックt/#からのメッセージを処理し、処理結果をDatabricks管理のS3バケットに書き込むルールの作成方法を示します。

  1. 前のステップでCreate Ruleをクリックした場合、Add Actionパネルが自動で開き、Type of ActionAmazon S3、コネクターが事前選択されています。ステップ5に進んでください。そうでなければ、ダッシュボードのIntegration -> Rulesページに移動し、右上のCreateをクリックします。

  2. ルールIDを入力し、SQLエディターに以下のルールSQLを入力します:

    sql
    SELECT
      *
    FROM
        "t/#"

    TIP

    初心者の方はSQL Examplesをクリックし、Enable Testを有効にしてSQLルールの学習とテストが可能です。

  3. 右側の**+ Add Actionをクリックし、Add ActionパネルでType of Action**ドロップダウンからAmazon S3を選択し、ActionはデフォルトのCreate Actionのままにします。

  4. Connectorsドロップダウンから先ほど作成したmy-databricksコネクターを選択します。ドロップダウン横の作成ボタンを押すと、ポップアップで新規コネクターを素早く作成可能です。必要な設定項目はコネクターの作成を参照してください。

  5. Sinkの名前と任意で説明を入力します。

  6. Bucketdatabricks-workspace-stack-142ec-bucketを入力します。このフィールドは${var}形式のプレースホルダーもサポートしますが、対応するバケットがS3に存在することを確認してください。

  7. 必要に応じてACLを選択し、アップロードされるオブジェクトのアクセス権限を指定します。

  8. Upload Methodを選択します:

    • Direct Upload:ルールがトリガーされるたびに、設定済みのオブジェクトキーと内容に従いデータを直接S3にアップロードします。バイナリや大きなテキストデータの保存に適しています。
    • Aggregated Upload:複数回のルールトリガー結果を1つのファイル(例:CSVファイル)にまとめてS3にアップロードします。構造化データの保存に適し、ファイル数を減らし書き込み効率を向上させます。

    選択した方法により設定項目が異なります。以下のタブから該当する方法の設定を行ってください。

  9. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。

  10. Advanced Settingsを展開し、必要に応じて詳細設定を行います(任意)。詳細は詳細設定を参照してください。

  11. その他の設定はデフォルト値のままにします。Createをクリックする前に、Test Connectivityを押してSinkがS3サービスに接続できるか確認可能です。

  12. CreateをクリックしてSink作成を完了します。作成成功後、ルール作成画面に戻り、新規Sinkがルールアクションに追加されます。

  13. ルール作成画面でSaveをクリックし、ルール作成全体を完了します。

これでルールが正常に作成されました。Rulesページで新規ルールを確認でき、**Actions (Sink)**タブで新しいAmazon S3 Sinkが表示されます。

ルールのテスト

MQTTXを使ってトピックt/1にメッセージをパブリッシュします:

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello Databricks" }'

数回メッセージを送信した後、DatabricksワークスペースでWorkspaceを右クリックし、Create -> Notebookを選択して新しいノートブックを作成します。

ノートブック作成

ノートブック内で外部ロケーションに対してSQLクエリを実行し、データが正常に取り込まれていることを確認します:

sql
SELECT * FROM json.`s3://databricks-workspace-stack-142ec-bucket/emqx-iot-data-new/`

Databricksクエリ結果

詳細設定

このセクションでは、Amazon S3 Sinkの詳細設定オプションについて説明します。ダッシュボードでSinkを設定する際にAdvanced Settingsを展開し、用途に応じて以下のパラメーターを調整できます。

項目名説明デフォルト値
Buffer Pool SizeEMQXとS3間のデータフローを管理するバッファワーカープロセスの数を指定します。16
Request TTLリクエストがバッファに入ってから有効とみなされる最大時間(秒)を指定します。45
Health Check IntervalSinkがS3との接続状態を自動的にヘルスチェックする間隔(秒)を指定します。15
Health Check Interval Jitter複数ノードが同時にヘルスチェックを開始する確率を減らすために、基本間隔に加える一様ランダム遅延です。0ミリ秒
Health Check TimeoutコネクターがS3との接続状態をヘルスチェックする際のタイムアウト時間を指定します。60
Max Buffer Queue SizeS3 Sinkの各バッファワーカープロセスがバッファリング可能な最大バイト数を指定します。256 MB
Query Modeメッセージ送信の最適化のため、synchronous(同期)またはasynchronous(非同期)のリクエストモードを選択します。Asynchronous
In-flight WindowSinkがS3と通信中に同時に存在可能なインフライトキューリクエストの最大数を制御します。100
Min Part Size集約完了後のパートアップロードにおける最小チャンクサイズを指定します。5MB
Max Part Sizeパートアップロードの最大チャンクサイズを指定します。5GB