Skip to content

Apache IoTDBへのMQTTデータ取り込み

Apache IoTDB は、多種多様なIoTデバイスやシステムから生成される膨大な時系列データを処理するために設計された高性能かつスケーラブルな時系列データベースです。
EMQXはApache IoTDBとのデータ連携をサポートしており、軽量なMQTTプロトコルを介してデータをApache IoTDBのREST API V2にシームレスに転送できます。
このデータ連携は一方向のデータフローを保証し、EMQXの卓越したリアルタイムデータ取り込み能力とIoTDBの時系列データ保存およびクエリ性能を活用して、MQTTメッセージをIoTDBデータベースに書き込みます。
この強力な組み合わせは、IoTデータを効果的に管理したい企業にとって堅実な基盤となります。

本ページでは、EMQXとApache IoTDB間のデータ連携について、作成方法や検証方法を含めて包括的に紹介します。

動作概要

Apache IoTDBデータ連携は、EMQXに標準搭載された機能であり、生のMQTTベースの時系列データとIoTDBの強力なデータ保存機能を橋渡しします。
組み込みのルールエンジンコンポーネントにより、EMQXからIoTDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。

以下の図は、EMQXとIoTDB間のデータ連携の典型的なアーキテクチャを示しています。

IoTDB_bridge_architecture

データ連携のワークフローは以下の通りです:

  1. メッセージのパブリッシュと受信:接続された車両、IIoTシステム、エネルギー管理プラットフォームなどのデバイスは、MQTTプロトコルを通じてEMQXに正常に接続し、運用状態や計測値、トリガーされたイベントに基づいてMQTTメッセージを送信します。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理が開始されます。
  2. メッセージデータ処理:メッセージが到着するとルールエンジンを通過し、EMQXで定義されたルールにより処理されます。ルールは事前定義された条件に基づき、どのメッセージをIoTDBにルーティングするかを決定します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
  3. データバッファリング:IoTDBが利用できない場合のデータ損失を防ぐため、EMQXはインメモリのメッセージバッファを提供します。データは一時的にバッファに保持され、メモリ過負荷を防ぐためにディスクにオフロードされることもあります。ただし、データ連携やEMQXノードの再起動時にはデータは保持されません。
  4. IoTDBへのデータ取り込み:ルールエンジンがIoTDB保存対象のメッセージを特定すると、メッセージをIoTDBへ転送するアクションがトリガーされます。処理済みデータは時系列データとしてIoTDBにシームレスに書き込まれます。
  5. データ保存と活用:データがIoTDBに保存された後、企業はそのクエリ機能を活用して様々なユースケースに対応できます。例えば、コネクテッドビークル分野では、車両の健康状態の把握、リアルタイムメトリクスに基づくルート最適化、資産追跡などに利用されます。IIoT環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化などに活用可能です。

特長とメリット

IoTDBとのデータ連携は、効果的なデータ処理と保存を実現するための多彩な特長とメリットを提供します:

  • 効率的なデータ収集

    EMQXとIoTDBを連携させることで、リソースが限られたIoTデバイスから軽量なMQTTメッセージングプロトコルを通じてIoT時系列データを効率的に収集し、データベースに取り込むことができ、信頼性と効率性の高いデータ収集を実現します。

  • 柔軟なデータ変換

    EMQXは強力なSQLベースのルールエンジンを提供し、IoTDBに保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、エンリッチメントなど多様なデータ変換機能をサポートし、組織のニーズに応じてデータを整形可能です。

  • スケーラビリティと高スループット

    EMQXは水平スケーラビリティを考慮して設計されており、増加するIoTデバイス群からのメッセージトラフィックを容易に処理します。拡大するデータ量に柔軟に対応し、高い同時アクセスをサポートします。その結果、IoT時系列ワークロードは、IoT展開が未曾有の規模に拡大しても、データ取り込み、保存、処理の増大する要求を難なく管理できます。

  • 最適化された時系列ストレージ

    IoTDBはタイムスタンプ付きデータの最適化された保存を提供します。時間パーティショニング、圧縮、データ保持ポリシーを活用し、大量の時系列データを効率的に保存・管理します。これにより、ストレージのフットプリントを最小限に抑えつつ高性能を維持し、大量の時系列データを生成するIoTワークロードに不可欠な性能を実現します。

  • 高速かつ複雑なクエリ処理

    IoTDBは豊富なクエリセマンティクスを持ち、デバイスやセンサー間の時系列データの時間整合、時系列フィールドの計算(周波数領域変換)、時間軸における豊富な集約関数をサポートします。さらにApache Hadoop、Spark、Flinkとの深い統合により、より強力な分析機能を提供します。EMQXはIoTDBとシームレスに統合し、MQTTデータの保存と分析の統一ソリューションを実現します。

はじめる前に

このセクションでは、EMQXダッシュボードでApache IoTDBデータ連携を作成する前に必要な準備について説明します。

前提条件

Apache IoTDBサーバーの起動

ここでは、Dockerを使ってApache IoTDBサーバーを起動する方法を紹介します。IoTDBの設定でenable_rest_service=trueが有効になっていることを確認してください。

以下のコマンドを実行して、RESTインターフェースを有効にしたApache IoTDBサーバーを起動します:

bash
docker run -d --name iotdb-service \
              --hostname iotdb-service \
              -p 6667:6667 \
              -p 18080:18080 \
              -e enable_rest_service=true \
              -e cn_internal_address=iotdb-service \
              -e cn_target_config_node_list=iotdb-service:10710 \
              -e cn_internal_port=10710 \
              -e cn_consensus_port=10720 \
              -e dn_rpc_address=iotdb-service \
              -e dn_internal_address=iotdb-service \
              -e dn_target_config_node_list=iotdb-service:10710 \
              -e dn_mpp_data_exchange_port=10740 \
              -e dn_schema_region_consensus_port=10750 \
              -e dn_data_region_consensus_port=10760 \
              -e dn_rpc_port=6667 \
              apache/iotdb:1.1.0-standalone

Docker HubのIoTDBのDocker実行に関する情報も参照してください。

コネクターの作成

Apache IoTDBデータ連携を作成するには、Apache IoTDB SinkとApache IoTDBサーバーを接続するコネクターを作成する必要があります。

EMQXはREST APIまたはThriftプロトコルを介したIoTDBとの通信をサポートしています。

  1. EMQXダッシュボードにアクセスし、Integrations -> Connectorsをクリックします。
  2. ページ右上のCreateをクリックします。
  3. Create Connectorページで、コネクタータイプとしてApache IoTDBを選択し、Nextをクリックします。
  4. コネクターの名前と説明を入力します。名前は大文字・小文字の英数字の組み合わせで構いません。例:my_iotdb
  5. Driverドロップダウンからドライバーを選択します。
    • REST APIを選択した場合、IoTDB REST Service Base URLhttp://localhost:18080を入力します。
    • Thriftプロトコルを使用する場合はThrift Protocolを選択し、Server HostにIoTDB Thriftサーバーのアドレスを入力します。
  6. コネクターがApache IoTDBサーバーにアクセスするためのユーザー名とパスワードを入力します。
  7. IoTDB Versionドロップダウンから接続したいIoTDBのバージョンを選択します。
  8. 他のオプションはデフォルトのままで構いません。Advanced Settings(任意)の設定は高度な設定を参照してください。
  9. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがApache IoTDBに接続できるかテストできます。
  10. Createをクリックしてコネクター作成を完了します。ポップアップでBack to Connector Listをクリックするか、Create RuleをクリックしてルールとSinkの作成に進むことができます。詳細はルールとApache IoTDB Sinkの作成を参照してください。

Apache IoTDB Sinkを用いたルールの作成

このセクションでは、EMQXでルールを作成し、ソースMQTTトピックroot/#からのメッセージを処理し、処理結果を設定済みのApache IoTDB Sinkを通じてApache IoTDBに時系列データとして保存する方法を示します。

  1. EMQXダッシュボードで、Integration -> Rulesをクリックします。

  2. ページ右上のCreateをクリックします。

  3. ルールIDを入力します。例:my_rule

  4. SQLエディタに以下のステートメントを入力します。これはトピックパターンroot/#にマッチするMQTTメッセージを転送します:

    sql
    SELECT
      *
    FROM
      "root/#"

    TIP

    初心者の方は、SQL ExamplesEnable TestをクリックしてSQLルールの学習やテストが可能です。

  5. Add Actionボタンをクリックし、ルールでトリガーされるアクションを定義します。Type of ActionドロップダウンからApache IoTDBを選択します。これにより、EMQXはルールで処理したデータをApache IoTDBに送信します。

  6. ActionドロップダウンはCreate Actionのままにするか、既存のApache IoTDB Sinkを選択できます。ここでは新しいSinkを作成し、ルールに追加します。

  7. Sinkの名前と説明を入力します。

  8. Connectorドロップダウンから先ほど作成したmy_iotdbコネクターを選択します。隣のボタンから新規コネクター作成も可能です。設定パラメータはコネクターの作成を参照してください。

  9. Sinkの設定情報を以下のように構成します:

    • Device ID(任意):IoTDBインスタンスに時系列データを転送・挿入する際のデバイス名として使用する特定のデバイスIDを入力します。

      TIP

      空欄の場合、デバイスIDはパブリッシュされたメッセージ内やルール内で指定可能です。例えば、JSONエンコードされたメッセージにdevice_idフィールドがあれば、その値が出力デバイスIDとなります。ルールエンジンでこの情報を抽出するには、以下のようなSQLを使えます:

      sql
      SELECT
       payload,
       `my_device` as payload.device_id

      ただし、このフィールドで固定したデバイスIDが優先されます。

    • Align Timeseries:デフォルトは無効です。有効にすると、グループ化されたアラインド時系列のタイムスタンプ列がIoTDB内で一度だけ保存され、グループ内の各時系列で重複保存されません。詳細はAligned timeseriesを参照してください。
  10. Write Dataを設定し、MQTTメッセージからIoTDBデータを生成する方法を指定します。

    Write Dataセクションでテンプレートを定義でき、必要な項目を複数含めて各行に必要なコンテキスト情報を記述します。このテンプレートを用いてMQTTメッセージに適用し、IoTDBデータを生成します。テンプレートはCSVファイルによる一括設定もサポートします。詳細は一括設定を参照してください。

    例として以下のテンプレートを考えます:

    TimestampMeasurementData TypeValue
    indexINT32${index}
    temperatureFLOAT${temp}

    TIP

    TimestampValueはプレースホルダー構文をサポートし、変数で埋められます。

    Timestampが省略された場合は、現在のシステム時刻(ミリ秒単位)で自動的に埋められます。

    例えば、MQTTメッセージは以下のように構成できます:

    json
    {
      "index": "42",
      "temp": "32.67"
    }
  11. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。これらはプライマリSinkがメッセージ処理に失敗した際にトリガーされます。詳細はフォールバックアクションを参照してください。

  12. 高度な設定(任意)高度な設定を参照してください。

  13. Createをクリックする前に、Test ConnectivityをクリックしてSinkがApache IoTDBサーバーに接続できるかテストできます。

  14. CreateをクリックしてSink作成を完了します。Create Ruleページに戻ると、Action Outputsタブに新しいSinkが表示されます。

  15. Create Ruleページで設定内容を確認し、Createボタンをクリックしてルールを生成します。

これでルールが正常に作成され、Ruleページに新しいルールが表示されます。**Actions(Sink)**タブをクリックすると、新しいApache IoTDB Sinkが確認できます。

Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックroot/#のメッセージがルールmy_ruleで解析されてApache IoTDBに転送されていることがわかります。

一括設定

Apache IoTDBでは、ダッシュボード上で数百件のデータを同時に書き込む設定は困難です。これに対応するため、EMQXはデータ書き込みの一括設定機能を提供しています。

Write Dataの設定時に、一括設定機能を使ってCSVファイルから挿入操作のフィールドをインポートできます。

  1. Write DataテーブルのBatch Settingボタンをクリックし、Import Batch Settingポップアップを開きます。

  2. 指示に従い、一括設定テンプレートファイルをダウンロードし、テンプレートにデータ書き込み設定を記入します。デフォルトのテンプレート内容は以下の通りです:

    TimestampMeasurementData TypeValue備考(任意)
    nowtempfloat${payload.temp}フィールド、値、データ型は必須。データ型はboolean, int32, int64, float, double, textが利用可能
    nowhumfloat${payload.hum}
    nowstatusboolean${payload.status}
    nowclientidtest${clientid}
    • Timestamp${var}形式のプレースホルダーをサポートし、タイムスタンプ形式が必要です。以下の特殊文字も使用可能です:
      • now:現在のミリ秒タイムスタンプ
      • now_ms:現在のミリ秒タイムスタンプ
      • now_us:現在のマイクロ秒タイムスタンプ
      • now_ns:現在のナノ秒タイムスタンプ
    • Measurement:フィールド名
    • Data Type:データ型(boolean, int32, int64, float, double, text)
    • Value:書き込むデータ値。定数または${var}形式のプレースホルダーをサポートし、データ型と一致する必要があります。
    • 備考:CSVファイル内のメモ用で、EMQXにはインポートされません。

    1MB以下、かつ2000行以内のCSVファイルのみサポートされます。

  3. 記入済みテンプレートファイルを保存し、Import Batch SettingポップアップにアップロードしてImportをクリックし、一括設定を完了します。

  4. インポート後、Write Dataテーブル内のデータをさらに調整可能です。

ルールのテスト

EMQXダッシュボード内蔵のWebSocketクライアントを使って、Apache IoTDB Sinkとルールの動作をテストできます。

  1. ダッシュボード左メニューのDiagnose -> WebSocket Clientをクリックします。

  2. 現在のEMQXインスタンスの接続情報を入力します。

    • ローカルでEMQXを実行している場合はデフォルト値を使用可能です。
    • 認証設定などを変更している場合は、ユーザー名とパスワードの入力が必要です。
  3. ConnectをクリックしてクライアントをEMQXインスタンスに接続します。

  4. ページ下部のパブリッシュエリアにスクロールし、メッセージ内にデバイスIDを指定して以下を入力します:

    • Topicroot/sg27

      TIP

      トピックがrootで始まらない場合、自動的にrootがプレフィックスされます。例えば、test/sg27にメッセージをパブリッシュすると、デバイス名はroot.test.sg27になります。ルールとトピックの設定が正しく行われ、該当トピックのメッセージがSinkに転送されるようにしてください。

    • Payload

      json
       {
        "value": "37.6",
        "device_id": "root.sg27"
       }

      TIP

      Write Dataテンプレートは以下の通りです:
      now, "temp", float, "${payload.value}"

    • QoS2

  5. Publishをクリックしてメッセージを送信します。

    Sinkとルールが正常に作成されていれば、メッセージは指定されたApache IoTDBサーバーの時系列テーブルにパブリッシュされているはずです。

  6. IoTDBのコマンドラインインターフェースでメッセージを確認します。上記のDocker環境を利用している場合、以下のコマンドでサーバーに接続可能です:

    shell
        $ docker exec -ti iotdb-service /iotdb/sbin/start-cli.sh -h iotdb-service
  7. コンソールで以下を入力します:

    sql
    IoTDB> select * from root.sg27

    以下のようにデータが表示されるはずです:

    +------------------------+--------------+
    |                    Time|root.sg27.temp|
    +------------------------+--------------+
    |2023-05-05T14:26:44.743Z|          37.6|
    +------------------------+--------------+

高度な設定

このセクションでは、コネクターのパフォーマンスを最適化し、特定のシナリオに応じて動作をカスタマイズするための高度な設定オプションを説明します。コネクター作成時にAdvanced Settingsを展開し、ビジネスニーズに応じて以下の設定を行えます。

項目説明推奨値
HTTP Pipeliningサーバーに対して応答を待たずに連続して送信可能なHTTPリクエスト数を指定します。正の整数値で最大リクエスト数を表します。
1の場合は従来のリクエスト-レスポンスモデルで、各リクエスト送信後に応答を待ちます。値を大きくすると複数リクエストをバッチ送信し、往復時間を削減しネットワークリソースを効率的に利用します。
100
Pool TypeEMQXとApache IoTDB間のコネクション管理・分配戦略を指定します。
randomは利用可能なコネクションプールからランダムに接続を選択し、シンプルで均等な分配を実現します。
hashはハッシュアルゴリズムを用いてリクエストを一貫して特定のコネクションにマッピングし、クライアントIDやトピック名に基づくロードバランシングなど決定的な分配が必要な場合に適します。
注意:適切なプールタイプはユースケースと分配特性に依存します。
random
Connection Pool SizeApache IoTDBサービスとの接続プールで維持可能な同時接続数を指定します。システムのスケーラビリティとパフォーマンス管理に役立ちます。
注意:適切な接続プールサイズはシステムリソース、ネットワークレイテンシ、アプリケーションのワークロードに依存します。大きすぎるとリソース枯渇、小さすぎるとスループット制限の可能性があります。
8
Connect TimeoutEMQXがApache IoTDB HTTPサーバーへの接続確立を試みる際の最大待機時間(秒)を指定します。
注意:適切なタイムアウト設定はシステム性能とリソース利用のバランスに重要です。様々なネットワーク条件でテストし最適値を見つけることを推奨します。
15
HTTP Request Max RetriesEMQXとApache IoTDB間の通信でHTTPリクエストが失敗した場合に再試行する最大回数を指定します。2
Start Timeoutコネクターが自動起動したリソースの正常状態到達を待つ最大時間(秒)を指定します。これにより、Apache IoTDBのデータベースインスタンスなど接続リソースが完全に稼働しデータ処理準備が整うまで操作を進めないようにします。5
Buffer Pool SizeEMQXとApache IoTDB間のイグレス型ブリッジでデータフロー管理に割り当てるバッファワーカープロセス数を指定します。これらのワーカーはデータ送信前の一時保存・処理を担当し、パフォーマンス最適化とスムーズなデータ伝送に寄与します。イングレス(インバウンド)専用のブリッジでは無効(0)に設定可能です。18
Request TTLバッファに入ったリクエストが有効とみなされる最大期間(秒)を指定します。バッファ投入時からカウントを開始し、この期間を超えるか、送信後にApache IoTDBからの応答やアックが得られない場合、リクエストは期限切れと判断されます。45
Health Check IntervalコネクターがApache IoTDB接続の自動ヘルスチェックを行う間隔(秒)を指定します。15
Max Buffer Queue SizeApache IoTDBデータ連携における各バッファワーカーがバッファリング可能な最大バイト数を指定します。バッファワーカーはデータ送信前の一時保存を担い、システム性能やデータ転送要件に応じて調整してください。265
Query Modeメッセージ送信要件に応じてasynchronousまたはsynchronousのクエリモードを選択可能です。非同期モードではIoTDBへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしません。ただし、クライアントがIoTDB到着前にメッセージを受信する可能性があります。Async
Inflight Window「インフライトクエリ」とは、開始されたが応答やアックをまだ受け取っていないクエリを指します。コネクターがApache IoTDBと通信する際に同時に存在可能なインフライトクエリの最大数を制御します。
query_modeasyncの場合、このパラメータは特に重要です。同一MQTTクライアントからのメッセージを厳密に順序処理したい場合は、値を1に設定してください。
100

さらに詳しく

EMQXはApache IoTDBとのデータ連携に関する豊富な学習リソースを提供しています。以下のリンクから詳細を学べます:

ブログ:

Time-Series Database (TSDB) for IoT: The Missing Piece