Skip to content

Apache IoTDB に MQTT データを取り込む

Apache IoTDB は、多種多様な IoT デバイスやシステムから生成される膨大な時系列データを処理するために設計された高性能かつスケーラブルな時系列データベースです。EMQX プラットフォームは Apache IoTDB とのデータ統合をサポートしており、軽量な MQTT プロトコルを介してデータを Apache IoTDB の REST API V2 にシームレスに転送できます。このデータ統合は一方向のデータフローを実現します。EMQX プラットフォームからの MQTT メッセージは IoTDB データベースに書き込まれ、EMQX プラットフォームの優れたリアルタイムデータ取り込み能力と IoTDB の時系列データ保存およびクエリ性能を活用します。この強力な組み合わせは、IoT データを効果的に管理したい企業にとって堅実な基盤となります。

本ページでは、EMQX プラットフォームと Apache IoTDB 間のデータ統合について包括的に紹介し、データ統合の作成および検証に関する実践的な手順を提供します。

動作概要

Apache IoTDB データ統合は、EMQX プラットフォームに標準搭載された機能であり、生の MQTT ベースの時系列データと IoTDB の強力なデータ保存機能との橋渡しを目的としています。組み込みの ルールエンジン コンポーネントにより、EMQX プラットフォームから IoTDB へのデータ取り込みを簡素化し、複雑なコーディングを不要にします。

以下の図は、EMQX と IoTDB 間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Integration Apache IoTDB

データ統合のワークフローは以下の通りです:

  1. メッセージのパブリッシュと受信:接続された車両、IIoT システム、エネルギー管理プラットフォームなどのデバイスは、MQTT プロトコルを通じて EMQX プラットフォームに正常に接続し、稼働状態、測定値、またはトリガーされたイベントに基づいて MQTT メッセージを送信します。EMQX プラットフォームがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理が開始されます。
  2. メッセージデータの処理:メッセージが到着すると、ルールエンジンを通過し、EMQX プラットフォームで定義されたルールにより処理されます。ルールは事前定義された条件に基づき、どのメッセージを IoTDB にルーティングするかを決定します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などの変換が適用されます。
  3. データのバッファリング:IoTDB が利用できない場合にデータ損失を防ぐため、EMQX はインメモリのメッセージバッファを提供します。データは一時的にバッファに保持され、メモリ過負荷を防ぐためにディスクにオフロードされることもあります。ただし、データ統合や EMQX ノードが再起動されるとデータは保持されません。
  4. IoTDB へのデータ取り込み:ルールエンジンが IoTDB への保存対象メッセージを特定すると、メッセージ転送アクションがトリガーされます。処理済みデータは時系列データとして IoTDB にシームレスに書き込まれます。
  5. データの保存と活用:データが IoTDB に保存された後、企業はそのクエリ機能を活用して様々なユースケースに対応できます。例えば、コネクテッドビークル分野では、蓄積されたデータを用いて車両の状態管理、リアルタイム指標に基づくルート最適化、資産追跡などに活用できます。同様に IIoT 環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化などに利用されます。

特長とメリット

IoTDB とのデータ統合は、効果的なデータ処理と保存を実現するために特化した多くの特長とメリットを提供します:

  • 効率的なデータ収集

    EMQX プラットフォームと IoTDB を統合することで、リソースが限られた IoT デバイスから軽量な MQTT メッセージングプロトコルを通じて時系列データを効率的に収集し、データベースに取り込むことができ、信頼性の高い効率的なデータ収集を実現します。

  • 柔軟なデータ変換

    EMQX プラットフォームは強力な SQL ベースのルールエンジンを提供し、IoTDB に保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、強化など多様なデータ変換機能をサポートし、組織のニーズに応じてデータを整形可能です。

  • スケーラビリティと高スループット

    EMQX プラットフォームは水平スケーラビリティを考慮して設計されており、増加する IoT デバイス群からのメッセージトラフィックを容易に処理します。このソリューションはデータ量の増加に柔軟に対応し、高い同時アクセスをサポートします。結果として、IoT の展開規模が拡大しても、時系列データの取り込み、保存、処理の要求に対応できます。

  • 最適化された時系列ストレージ

    IoTDB はタイムスタンプ付きデータの保存に最適化されており、時間パーティショニング、圧縮、データ保持ポリシーを活用して大量の時系列データを効率的に管理します。これにより、ストレージの占有を最小限に抑えつつ高性能を維持し、膨大な時系列データを生成する IoT ワークロードに不可欠な機能を提供します。

  • 高速かつ複雑なクエリ

    IoTDB は豊富なクエリセマンティクスを備え、デバイスやセンサー間の時系列データの時間整合、時系列フィールドでの計算(周波数領域変換)、時間軸における多彩な集約関数をサポートします。また、Apache Hadoop、Spark、Flink と深く統合されており、より強力な分析機能を提供します。EMQX は IoTDB とシームレスに統合し、MQTT データの保存と分析の統一ソリューションを実現します。

はじめる前に

本セクションでは、EMQX プラットフォームで Apache IoTDB データ統合を作成するための準備作業を紹介します。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

Apache IoTDB サーバーの起動

本セクションでは、Docker を使用して Apache IoTDB サーバーを起動する方法を紹介します。IoTDB の設定で enable_rest_service=true が有効になっていることを確認してください。

以下のコマンドを実行して、REST インターフェースが有効な Apache IoTDB サーバーを起動します:

bash
docker run -d --name iotdb-service \
              --hostname iotdb-service \
              -p 6667:6667 \
              -p 18080:18080 \
              -e enable_rest_service=true \
              -e cn_internal_address=iotdb-service \
              -e cn_target_config_node_list=iotdb-service:10710 \
              -e cn_internal_port=10710 \
              -e cn_consensus_port=10720 \
              -e dn_rpc_address=iotdb-service \
              -e dn_internal_address=iotdb-service \
              -e dn_target_config_node_list=iotdb-service:10710 \
              -e dn_mpp_data_exchange_port=10740 \
              -e dn_schema_region_consensus_port=10750 \
              -e dn_data_region_consensus_port=10760 \
              -e dn_rpc_port=6667 \
              apache/iotdb:1.1.0-standalone

詳細は Docker Hub の IoTDB 実行情報 をご参照ください。

コネクターの作成

データ統合ルールを作成する前に、Apache IoTDB サーバーにアクセスするためのコネクターを作成する必要があります。

  1. デプロイメントに移動し、左側のナビゲーションメニューから データ統合 をクリックします。
  2. 初めてコネクターを作成する場合は、データ永続化 カテゴリの中から Apache IoTDB を選択します。既にコネクターを作成済みの場合は、新しいコネクター を選択し、同じく Apache IoTDB を選択します。
  3. ドライバー ドロップダウンからドライバーを選択します。
    • REST API を選択した場合は、外部 IoTDB サービスの REST インターフェースのベース URL を IoTDB REST Service Base URL に入力してください。URL は Host:Port 形式である必要があります。
    • Thrift プロトコルを使用したい場合は、Thrift Protocol を選択し、Server Host フィールドに IoTDB Thrift サーバーのアドレスを入力します。
  4. Apache IoTDB サーバーにアクセスするためのユーザー名とパスワードを入力します。
  5. IoTDB バージョン ドロップダウンから接続したい IoTDB システムのバージョンを選択します。
  6. その他のオプションはデフォルトのままにします。詳細設定(任意)の構成については 詳細設定 を参照してください。
  7. テスト ボタンをクリックし、Apache IoTDB サービスにアクセス可能であれば成功メッセージが表示されます。
  8. 新規作成 ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定し、処理済みデータを Apache IoTDB に転送するアクションをルールに追加します。

  1. ルールエリアで 新規ルール をクリックするか、作成したコネクターの アクション 列にある新規ルールアイコンをクリックします。

  2. SQL エディター にルールのマッチング SQL 文を入力します。以下のルールでは、メッセージが報告された時刻 up_timestamp、クライアント ID、temp_hum/emqx トピックのペイロードを読み取ります。また、このトピックから温度と湿度を取得します。

    sql
    SELECT
     payload.temp as temp,
     payload.hum as hum,
     payload.device_id as device_id
    FROM
     "temp_hum/emqx"

    TIP

    初心者の方は、SQL Examples をクリックし、Enable Test を有効にして SQL ルールの学習とテストを行うことをおすすめします。

  3. 次へ をクリックしてアクションを追加します。

  4. コネクター ドロップダウンから先ほど作成したコネクターを選択します。

  5. Device ID(任意):IoTDB インスタンスに時系列データを転送・挿入する際のデバイス名として使用する特定のデバイス ID を入力します。

    TIP

    空欄の場合でも、パブリッシュされたメッセージ内やルール内でデバイス ID を指定できます。例えば、JSON エンコードされたメッセージに device_id フィールドが含まれていれば、その値が出力デバイス ID となります。ルールエンジンでこの情報を抽出するには、以下のような SQL を使用できます:

    sql
    SELECT
      payload,
      `my_device` as payload.device_id

    ただし、このフィールドに固定で設定されたデバイス ID が優先されます。

  6. Align Timeseries:デフォルトで無効です。有効にすると、グループ化されたアラインド時系列のタイムスタンプ列が IoTDB に一度だけ保存され、グループ内の各時系列で重複して保存されることを防ぎます。詳細は Aligned timeseries を参照してください。

  7. Write Data の設定

この方法では、Write Data セクションでテンプレートを定義でき、必要なだけ多くの項目を含め、それぞれの行に必要なコンテキスト情報を指定します。このテンプレートを提供すると、システムは MQTT メッセージに適用して IoTDB データを生成します。書き込み用テンプレートは CSV ファイルによるバッチ設定もサポートしています。詳細は バッチ設定 を参照してください。

例えば、以下のテンプレートを考えます:

TimestampMeasurementData TypeValue
temptext${temp}
humtext${hum}

TIP

各列はプレースホルダー構文をサポートし、変数で埋めることができます。

Timestamp を省略すると、現在のシステム時刻(ミリ秒単位)で自動的に埋められます。

この場合、MQTT メッセージは以下のような構造になります:

json
{
  "temp": "27.5",
  "hum": "41.8"
}
  1. 詳細設定(任意):詳細設定 を参照してください。

  2. 確認 ボタンをクリックしてルール作成を完了します。

  3. 新規ルール成功 ポップアップで ルールに戻る をクリックし、データ統合設定の一連の流れを完了します。

バッチ設定

Apache IoTDB では、ダッシュボード上で数百件のデータを一括で書き込む設定は困難です。この問題を解決するため、EMQX はバッチ設定機能を提供しています。

Write Data 設定時に、CSV ファイルから挿入操作のフィールドをインポートするバッチ設定機能を利用できます。

  1. Write Data テーブルの バッチ設定 ボタンをクリックし、バッチ設定インポート ポップアップを開きます。

  2. 指示に従いバッチ設定テンプレートファイルをダウンロードし、テンプレートファイル内にデータ書き込み設定を記入します。デフォルトのテンプレート内容は以下の通りです:

    TimestampMeasurementData TypeValue備考(任意)
    nowtempFLOAT${payload.temp}フィールド、値、データ型は必須。データ型は BOOLEAN、INT32、INT64、FLOAT、DOUBLE、TEXT が利用可能
    nowhumFLOAT${payload.hum}
    nowstatusBOOLEAN${payload.status}
    nowclientidTEXT${clientid}
    • Timestamp${var} 形式のプレースホルダーをサポートし、タイムスタンプ形式である必要があります。以下の特殊文字も利用可能です:
      • now:現在のミリ秒タイムスタンプ
      • now_ms:現在のミリ秒タイムスタンプ
      • now_us:現在のマイクロ秒タイムスタンプ
      • now_ns:現在のナノ秒タイムスタンプ
    • Measurement:フィールド名。定数または ${var} 形式のプレースホルダーをサポート。
    • Data Type:データ型。BOOLEAN、INT32、INT64、FLOAT、DOUBLE、TEXT のいずれか。
    • Value:書き込むデータ値。定数または ${var} 形式のプレースホルダーをサポートし、データ型と一致する必要があります。
    • 備考:CSV ファイル内の注釈用で、EMQX プラットフォームへのインポートには使用されません。

    なお、1MB 以下の CSV ファイルかつ 2000 行以内のデータのみサポートされます。

  3. 記入したテンプレートファイルを保存し、バッチ設定インポート ポップアップにアップロードして インポート をクリックし、バッチ設定を完了します。

  4. インポート後、Write Data テーブル内でさらにデータを調整できます。

ルールのテスト

温湿度データの報告をシミュレートするために MQTTX の使用を推奨しますが、他のクライアントでも問題ありません。

  1. MQTTX を使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック: temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8",
        "device_id": "root.sg27" // デバイス ID
      }
  2. メッセージが Apache IoTDB に転送されているか確認します。

    • IoTDB のコマンドラインインターフェースを使用してメッセージを確認します。上記のように Docker で起動している場合は、以下のコマンドでサーバーに接続可能です:

      bash
      $ docker exec -ti iotdb-service /iotdb/sbin/start-cli.sh -h iotdb-service
    • データをクエリします:

      bash
        IoTDB> select * from root.sg27
        +------------------------+-------------+--------------+
        |                    Time|root.sg27.hum|root.sg27.temp|
        +------------------------+-------------+--------------+
        |2024-03-25T08:45:19.541Z|         41.8|          27.5|
        +------------------------+-------------+--------------+
        Total line number = 1
        It costs 0.166s
  3. コンソールで運用データを確認します。ルール一覧でルール ID をクリックすると、ルールの統計情報およびそのルール配下のすべてのアクションの統計情報を確認できます。