Apache IoTDB への MQTT データ取り込み
Apache IoTDBは、多種多様なIoTデバイスやシステムから生成される膨大な時系列データを効率的に処理するために設計された高性能かつスケーラブルな時系列データベースです。
EMQXはApache IoTDBとのシームレスなデータ統合を提供しており、EMQXで受信したリアルタイムのMQTTメッセージをREST API V2を通じてIoTDBに転送できます。この統合は一方向のデータフローをサポートし、MQTTデータをIoTDBに書き込むことで効率的な時系列の保存と分析を実現します。
本ページでは、EMQXとApache IoTDBの統合方法を紹介し、統合の作成および検証手順をステップバイステップで説明します。
動作概要
Apache IoTDBデータ統合はEMQXの組み込み機能であり、MQTTベースの時系列データを追加のコーディングなしでApache IoTDBに取り込むことを可能にします。EMQXの組み込みルールエンジンを活用することで、データのフィルタリング、変換、転送を簡素化し、IoTDBでの効率的な保存とクエリを実現します。
以下の図は、EMQXとIoTDB間の典型的なデータ統合アーキテクチャを示しています。

データ統合のワークフローは以下の通りです:
- メッセージのパブリッシュと受信:デバイスはMQTTでEMQXに接続し、テレメトリデータ、ステータス更新、イベント情報を含むメッセージをパブリッシュします。ルールエンジンが受信メッセージを評価します。
- ルールベースの処理:定義されたルールにマッチしたメッセージが選択され、必要に応じてフィールドのフィルタリング、データ形式の変換、ペイロードの強化などの変換が適用されます。
- データバッファリング:信頼性向上のため、IoTDBが一時的に利用不可の場合はEMQXがメッセージをメモリにバッファします。必要に応じてメモリ圧迫を避けるためにディスクにオフロードできます。統合またはEMQXノードが再起動するとバッファデータは保持されません。
- IoTDBへのデータ取り込み:マッチしたルールに対して、EMQXはIoTDBシンクをトリガーし、処理済みデータをIoTDBに時系列データとして書き込みます。
- データの保存と活用:IoTDBに保存されたデータは、デバイス監視、資産追跡、予知保全、運用最適化などの下流アプリケーションでクエリや分析に利用できます。
特長とメリット
IoTDBとのデータ統合は、効果的なデータ処理と保存を実現するために以下の特長とメリットを提供します:
ノーコードのIoTデータパイプライン
組み込みのルールとシンクを使い、カスタムコードや外部サービスなしでEMQXとApache IoTDB間の完全なMQTTから時系列データへのパイプラインを構築可能です。
MQTTからIoTDBモデルへの柔軟なマッピング
ツリーモデルとテーブルモデルの両方をサポートし、デバイスモデリングやクエリ要件に合わせた構造でMQTTデータをIoTDBに書き込めます。
取り込みと保存の分離
EMQXはバースト的かつ高頻度のMQTTトラフィックを吸収し、IoTDBは耐久性のある時系列保存に専念することで、システムの安定性とレジリエンスを向上させます。
本番対応のスケーラビリティ
デバイス数やデータ量に応じて水平スケール可能で、大規模なIoT、IIoT、エネルギー分野に適しています。
分析に適した時系列データ
IoTDBに書き込まれたデータは直接クエリ、集計、分析できるほか、ビッグデータエンジンと連携して高度な分析や長期的な洞察を得られます。
はじめる前に
このセクションでは、EMQXダッシュボードでApache IoTDBデータ統合を作成する前に完了すべき準備について説明します。
前提条件
Apache IoTDB サーバーの起動
ここではDockerを使ってApache IoTDBサーバーを起動する方法を紹介します。IoTDBの設定でenable_rest_service=trueが有効になっていることを確認してください。
以下のコマンドを実行して、RESTインターフェースが有効なApache IoTDBサーバーを起動します:
docker run -d --name iotdb-service \
--hostname iotdb-service \
-p 6667:6667 \
-p 18080:18080 \
-e enable_rest_service=true \
-e cn_internal_address=iotdb-service \
-e cn_target_config_node_list=iotdb-service:10710 \
-e cn_internal_port=10710 \
-e cn_consensus_port=10720 \
-e dn_rpc_address=iotdb-service \
-e dn_internal_address=iotdb-service \
-e dn_target_config_node_list=iotdb-service:10710 \
-e dn_mpp_data_exchange_port=10740 \
-e dn_schema_region_consensus_port=10750 \
-e dn_data_region_consensus_port=10760 \
-e dn_rpc_port=6667 \
apache/iotdb:2.0.5-standalone詳細はDocker HubのIoTDB実行方法をご参照ください。
データベースの作成
IoTDBはツリーモデルとテーブルモデルの2つのデータモデルをサポートしています。データベース作成前に、コネクターとシンクで使用するSQL方言(TreeまたはTable)を確認し、それに応じたデータベースを作成してください。
- ツリーモデルの場合はデータベースのみ作成すればよいです。
- テーブルモデルの場合は、まずデータベースを作成し、その後データ取り込み用のテーブルを作成する必要があります。
詳細な手順はIoTDBユーザーガイドをご参照ください:
IoTDBコネクターの作成
Apache IoTDBデータ統合を作成するには、Apache IoTDBシンクとApache IoTDBサーバーを接続するコネクターを作成する必要があります。
EMQXはREST APIまたはThriftプロトコルを通じてIoTDBと通信をサポートしています。
EMQXダッシュボードでIntegrations -> Connectorsに移動します。
右上のCreateをクリックします。
Create ConnectorページでApache IoTDBを選択します。
コネクターを設定します:
Connector Name:コネクターの一意な名前を入力します。大文字・小文字の英数字の組み合わせを使用してください。例:
my_iotdbDescription:(任意)コネクターの簡単な説明
Driver:IoTDBへの接続に使用するプロトコルを選択します。
REST API:IoTDB RESTサービスのエンドポイント(例:http://localhost:18080)をIoTDB REST Service Base URLに入力Thrift Protocol:IoTDB ThriftサーバーのアドレスをServer Hostに入力
SQL Dialect:EMQXがデバイスデータをIoTDBに書き込む際のIoTDBデータモデルを選択します。
Tree Model:階層的な時系列パスとしてデータを書き込み、パスベースのデバイス・測定管理に適しています。Table Model:リレーショナルテーブルにデータを書き込み、デバイス種別やカテゴリごとの管理に適しています。
Database Name:SQL Dialectが
Table Modelの場合、接続するデータベース名を指定する必要があります。UsernameおよびPassword:EMQXがApache IoTDBサーバーに認証するための資格情報を入力します。
IoTDB Version:Apache IoTDBのバージョンを選択します。
Enable TLS:有効にするとApache IoTDBサーバーへの暗号化接続を確立します。詳細は外部リソースアクセスのTLSをご参照ください。
任意のチューニングは高度な設定のAdvanced Settingsを参照してください。
(任意)Test Connectivityをクリックして、コネクターがApache IoTDBサーバーに正常に接続できるか検証します。
Createをクリックしてコネクター作成を完了します。
表示されるダイアログでBack to Connector ListまたはCreate Ruleを選択し、ルールとApache IoTDBシンクの設定を続けられます。詳細はルールとApache IoTDBシンクの作成をご覧ください。
Apache IoTDBシンクを用いたルールの作成
このセクションでは、EMQXでソースMQTTトピックroot/#からメッセージを処理し、処理結果を設定済みのApache IoTDBシンク経由でApache IoTDBに時系列データとして保存するルールの作成方法を示します。
SQLを定義したルールの作成
EMQXダッシュボードでIntegration -> Rulesに移動します。
ページ右上のCreateをクリックします。
ルールIDを入力します(例:
my_rule)。SQLエディターに以下の文を入力し、トピックパターン
root/#にマッチするMQTTメッセージを転送します:sqlSELECT * FROM "root/#"TIP
初心者の方はSQL ExamplesやEnable TestをクリックしてSQLルールの学習やテストが可能です。
処理結果をIoTDBに書き込むため、ルールにApache IoTDBシンクを追加します。詳細はApache IoTDBシンクの追加を参照してください。
Create Ruleページで設定内容を確認し、Saveをクリックしてルールを作成します。
ルール作成後、Rules一覧に表示されます。**Actions (Sink)**タブをクリックすると、このルールに関連付けられたIoTDBシンクを確認できます。
また、Integrations -> Flow Designerでトポロジーグラフを表示可能です。root/#トピックのメッセージがmy_ruleルールで処理されIoTDBに書き込まれる様子が確認できます。
Apache IoTDBシンクの追加
ルールの右側にあるAdd Actionボタンをクリックし、ルールマッチ時にトリガーされるアクションを定義します。このアクションは処理済みデータをIoTDBに転送します。
Type of Actionドロップダウンで
Apache IoTDBを選択し、ActionはデフォルトのCreate Actionのままにします。既存のIoTDBシンクを選択することも可能ですが、ここでは新規作成を想定しています。シンクの名前と説明を入力します。
Connectorドロップダウンで先ほど作成したコネクター
my_iotdbを選択します。利用可能なコネクターがない場合は隣のボタンから作成可能です。IoTDBコネクターの作成を参照してください。シンクの設定を行います:
SQL Dialect:Apache IoTDBシンクがIoTDBにデータを書き込む方法を選択します。コネクターで選択したSQL方言と一致させる必要があります。
Tree Model:IoTDBの時系列パスとしてデータを書き込みます。各シンクレコードはデバイスパスに挿入され、その下に測定値が個別の時系列として書き込まれます。このモデル選択時はDevice IDフィールドを指定可能です。Table Model:IoTDBのリレーショナルテーブルにデータを書き込みます。各シンクレコードは指定テーブルの行として挿入され、フィールドはテーブルのカラムにマッピングされます。このモデル選択時はTableフィールドの指定が必須です。
Device ID(任意):IoTDBインスタンスに時系列データを転送・挿入する際のデバイス名として使用する特定のデバイスIDを入力します。
TIP
空欄の場合でも、パブリッシュされたメッセージ内やルール内でデバイスIDを指定可能です。例えば、JSONエンコードされたメッセージに
device_idフィールドが含まれていれば、その値が出力デバイスIDになります。ルールエンジンでこの情報を抽出するには、以下のようなSQLを使用できます:sqlSELECT payload, `my_device` as payload.device_idただし、このフィールドで固定したデバイスIDが優先されます。
Table:データを書き込むIoTDBテーブル名を指定します。
Align Timeseries:デフォルトで無効。これを有効にすると、グループ化されたアラインド時系列のタイムスタンプ列がIoTDBに一度だけ保存され、グループ内の各時系列で重複保存されなくなります。詳細はAligned timeseriesを参照してください。
Write Dataを設定し、MQTTメッセージからIoTDBデータを生成する方法を指定します。
Write Dataセクションでは、必要な数だけ項目を含むテンプレートを定義可能で、各行に必要なコンテキスト情報を記述します。このテンプレートに基づき、MQTTメッセージからIoTDBデータが生成されます。書き込みテンプレートはCSVファイルによる一括設定もサポートしています。詳細はバッチ設定を参照してください。
例えば、以下のテンプレートを考えます:
注意
Column CategoryはSQL方言で
Table Modelを選択した場合のみ表示されます。Column Category Timestamp Measurement Data Type Value field index INT32 ${index} temperature FLOAT ${temp} TimestampとValueはプレースホルダー構文をサポートし、変数で埋められます。Timestampを省略すると、現在のシステム時刻(ミリ秒単位)で自動補完されます。その場合、MQTTメッセージは以下のように構成できます:
json{ "index": "42", "temp": "32.67" }
フォールバックアクション:(任意)メッセージ配信失敗時の信頼性向上のため、プライマリシンクが処理に失敗した場合にトリガーされる1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。
高度な設定:(任意)高度な設定を参照してください。
(任意)Test Connectivityをクリックして、シンクがApache IoTDBサーバーに接続可能かテストします。
バッチ設定
Apache IoTDBでは、ダッシュボード上で数百件のデータを同時に書き込む設定は困難な場合があります。これを解決するため、EMQXは書き込みのバッチ設定機能を提供しています。
Write Dataの設定時に、CSVファイルから挿入操作用のフィールドを一括インポートできます。
Write Data テーブルの Batch Setting ボタンをクリックし、Import Batch Setting ポップアップを開きます。
指示に従いバッチ設定テンプレートファイルをダウンロードし、テンプレートにデータ書き込み設定を記入します。デフォルトテンプレートの内容は以下の通りです:
注意
以下は
Table Model用のデフォルトテンプレートです。Tree ModelではColumn Category列はありません。Column Category Timestamp Measurement Data Type Value 備考(任意) tag now clientid text ${clientid} field now temp float ${payload.temp} フィールド、値、データ型は必須。利用可能なデータ型はboolean, int32, int64, float, double, text attribute now hum text ${payload.hum} attribute now status text ${payload.status} - Column Category:カラムのデータモデル。
tag、field、attributeがサポートされます。tagは文字列でなければならず、fieldまたはattributeが推奨されます。 - Timestamp:
${var}形式のプレースホルダーをサポートし、タイムスタンプ形式が必要です。以下の特殊文字でシステム時刻を挿入可能です:- now:現在のミリ秒タイムスタンプ
- now_ms:現在のミリ秒タイムスタンプ
- now_us:現在のマイクロ秒タイムスタンプ
- now_ns:現在のナノ秒タイムスタンプ
- Measurement:フィールド名
- Data Type:データ型。boolean, int32, int64, float, double, textが選択可能
- Value:書き込むデータ値。定数または
${var}形式のプレースホルダーをサポートし、データ型と一致する必要があります。 - 備考:CSVファイル内の注釈用で、EMQXにはインポートされません。
ファイルサイズは1MB以下、行数は2000行以下のCSVファイルのみ対応しています。
- Column Category:カラムのデータモデル。
記入済みテンプレートファイルを保存し、Import Batch Settingポップアップにアップロード後、Importをクリックしてバッチ設定を完了します。
インポート後、Write Data テーブル内でさらにデータを調整できます。
ルールのテスト
EMQXダッシュボード内蔵のWebSocketクライアントを使って、Apache IoTDBシンクとルールの動作をテストできます。
ダッシュボード左メニューのDiagnose -> WebSocket Clientをクリックします。
現在のEMQXインスタンスの接続情報を入力します。
- ローカルでEMQXを実行している場合はデフォルト値を使用可能です。
- 認証設定などEMQXのデフォルト設定を変更している場合は、ユーザー名やパスワードの入力が必要です。
ConnectをクリックしてクライアントをEMQXに接続します。
下にスクロールしてパブリッシュエリアに移動し、メッセージ内にデバイスIDを指定して以下を入力します:
Topic:
root/sg27TIP
トピックが
rootで始まらない場合、自動的にroot.がプレフィックスされます。例えば、test/sg27にメッセージをパブリッシュすると、デバイス名はroot.test.sg27になります。ルールとトピックの設定が正しく、該当トピックのメッセージがシンクに転送されるようにしてください。Payload:
json{ "value": "37.6", "device_id": "root.sg27" }TIP
Write Dataテンプレートは以下の通りです:now, "temp", float, "${payload.value}"QoS:
2
Publishをクリックしてメッセージを送信します。
シンクとルールが正常に作成されていれば、メッセージは指定したApache IoTDBサーバーの時系列テーブルにパブリッシュされているはずです。
IoTDBのコマンドラインインターフェースを使ってメッセージを確認します。上記のDocker環境の場合、以下のコマンドでサーバーに接続できます:
shell$ docker exec -ti iotdb-service /iotdb/sbin/start-cli.sh -h iotdb-serviceコンソールで以下を入力します:
sqlIoTDB> select * from root.sg27以下のようにデータが表示されるはずです:
+------------------------+--------------+ | Time|root.sg27.temp| +------------------------+--------------+ |2023-05-05T14:26:44.743Z| 37.6| +------------------------+--------------+
高度な設定
このセクションでは、コネクターのパフォーマンスを最適化し、特定のシナリオに応じた動作をカスタマイズするための高度な設定オプションを説明します。コネクター作成時にAdvanced Settingsを展開し、ビジネスニーズに応じて以下の設定を行えます。
| 項目 | 説明 | 推奨値 |
|---|---|---|
| HTTP Pipelining | サーバーに対して連続してレスポンスを待たずに送信可能なHTTPリクエスト数を指定します。正の整数値で、1の場合は従来のリクエスト-レスポンスモデルとなり、各リクエスト送信後にレスポンスを待ちます。値を大きくすると複数リクエストをバッチ送信でき、ラウンドトリップ時間を削減しネットワークリソースを効率化します。 | 100 |
| Pool Type | EMQXとApache IoTDB間のコネクション管理・分配アルゴリズムを定義します。randomは利用可能な接続プールからランダムに接続を選択し、シンプルでバランスの取れた分配を実現します。hashはハッシュアルゴリズムを用いてリクエストを一貫して接続にマッピングし、クライアントIDやトピック名に基づくロードバランシングなど決定的な分配が必要な場合に適します。注意:適切なプールタイプはユースケースや求める分配特性によります。 | random |
| Connection Pool Size | Apache IoTDBサービスとの接続プールに保持可能な同時接続数を指定します。システムのスケーラビリティとパフォーマンス管理に役立ちます。 注意:適切なプールサイズはシステムリソース、ネットワークレイテンシ、ワークロードに依存します。大きすぎるとリソース枯渇、小さすぎるとスループット制限の原因となります。 | 8 |
| Connect Timeout | EMQXがApache IoTDB HTTPサーバーへの接続確立を試みる最大待機時間(秒)を指定します。 注意:適切なタイムアウト設定はシステムパフォーマンスとリソース利用のバランスに重要です。ネットワーク状況を考慮して最適値をテストしてください。 | 15 |
| HTTP Request Max Retries | EMQXとApache IoTDB間の通信でHTTPリクエストが失敗した場合の最大再試行回数を指定します。 | 2 |
| Start Timeout | 自動起動されたリソースが正常状態になるまで待機する最大時間(秒)を指定します。これにより、Apache IoTDBのデータベースインスタンスなど接続リソースが完全に稼働し、データ処理可能になるまで統合処理を進めないようにします。 | 5 |
| Buffer Pool Size | EMQXとApache IoTDB間のイグレス(送信)タイプのブリッジでデータフロー管理に割り当てるバッファワーカープロセス数を指定します。これらのワーカーはターゲットサービスに送信する前のデータを一時的に保持・処理します。イングレス(受信)専用のブリッジでは0に設定可能です。 | 18 |
| Request TTL | リクエストがバッファに入ってから有効とみなされる最大時間(秒)を指定します。この時間を超えてバッファ内にあるか、送信後にApache IoTDBから適時のレスポンスやアックが得られない場合、リクエストは期限切れとみなされます。 | 45 |
| Health Check Interval | Apache IoTDBへの接続の自動ヘルスチェックを行う間隔(秒)を指定します。 | 15 |
| Max Buffer Queue Size | Apache IoTDBデータ統合における各バッファワーカーがバッファ可能な最大バイト数を指定します。バッファワーカーはデータ送信前の一時保管を担い、システム性能やデータ転送要件に応じて調整してください。 | 265 |
| Query Mode | メッセージ送信要件に応じてasynchronousまたはsynchronousのクエリモードを選択可能です。非同期モードではIoTDBへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがIoTDB到着前にメッセージを受信する可能性があります。 | Async |
| Inflight Window | 「インフライトクエリ」とは開始されたがレスポンスやアックをまだ受け取っていないクエリを指します。ConnectorがApache IoTDBと通信する際に同時に存在可能なインフライトクエリの最大数を制御します。query_modeがasyncの場合、このパラメータは特に重要です。同一MQTTクライアントからのメッセージを厳密な順序で処理する必要がある場合は、この値を1に設定してください。 | 100 |
参考情報
EMQXはApache IoTDBとのデータ統合に関する豊富な学習リソースを提供しています。以下のリンクから詳細をご覧ください:
ブログ: