Apache IoTDB に MQTT データを取り込む
Apache IoTDB は、多種多様な IoT デバイスやシステムから生成される大量の時系列データを効率的に処理するために設計された高性能かつスケーラブルな時系列データベースです。
EMQX Cloud は Apache IoTDB とのシームレスなデータ統合を提供し、EMQX Cloud でリアルタイムに受信した MQTT メッセージを REST API V2 を通じて IoTDB に転送できます。この統合は一方向のデータフローをサポートし、MQTT データを IoTDB に書き込むことで効率的な時系列データの保存と分析を実現します。
本ページでは、EMQX Cloud と Apache IoTDB の統合方法を紹介し、統合の作成および検証手順をステップバイステップで解説します。
動作概要
Apache IoTDB データ統合は EMQX Cloud の組み込み機能であり、追加のコーディングなしに MQTT ベースの時系列データを Apache IoTDB に取り込むことを可能にします。EMQX Cloud の組み込み ルールエンジン を活用することで、データのフィルタリング、変換、転送を簡素化し、IoTDB での効率的な保存とクエリを実現します。
以下の図は、EMQX Cloud と IoTDB 間の典型的なデータ統合アーキテクチャを示しています。

データ統合のワークフローは以下の通りです:
- メッセージのパブリッシュと受信:デバイスは MQTT を介して EMQX Cloud に接続し、テレメトリデータ、ステータス更新、イベント情報を含むメッセージをパブリッシュします。ルールエンジンが受信メッセージを評価します。
- ルールベースの処理:定義されたルールにマッチしたメッセージが選択され、必要に応じてフィールドのフィルタリング、データ形式の変換、ペイロードの拡充などの変換が適用されます。
- データのバッファリング:IoTDB が一時的に利用できない場合に備え、EMQX Cloud はメモリ内でメッセージをバッファリングします。必要に応じてメモリ圧迫を避けるためにディスクにオフロードすることも可能です。統合や EMQX ノードが再起動するとバッファデータは保持されません。
- IoTDB へのデータ取り込み:マッチしたルールに対して、EMQX Cloud は IoTDB アクションをトリガーし、処理済みデータを IoTDB に転送して時系列データとして書き込みます。
- データの保存と活用:IoTDB に保存されたデータは、デバイス監視、資産追跡、予知保全、運用最適化などの下流アプリケーションでクエリや分析に利用できます。
特長とメリット
IoTDB とのデータ統合は、効果的なデータ処理と保存を実現するために以下のような特長とメリットを提供します:
ノーコードの IoT データパイプライン
EMQX と Apache IoTDB 間で、組み込みのルールとシンクを使い、カスタムコードや外部サービスなしで完全な MQTT から時系列データへのパイプラインを構築できます。
MQTT から IoTDB モデルへの柔軟なマッピング
ツリーモデルとテーブルモデルの両方をサポートし、デバイスのモデリングやクエリ要件に合った構造で MQTT データを IoTDB に書き込めます。
取り込みと保存の分離
EMQX はバースト的で高頻度な MQTT トラフィックを吸収し、IoTDB は耐久性の高い時系列保存に専念することで、システムの安定性とレジリエンスを向上させます。
本番対応のスケーラビリティ
デバイス数やデータ量に応じて水平スケール可能で、大規模な IoT、IIoT、エネルギー分野のシナリオに適しています。
分析に適した時系列データ
IoTDB に書き込まれたデータは直接クエリ、集計、分析できるほか、ビッグデータエンジンと連携して高度な分析や長期的なインサイト取得も可能です。
はじめる前に
このセクションでは、EMQX Cloud で Apache IoTDB データ統合を作成するための準備作業を紹介します。
前提条件
ネットワークの設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
Apache IoTDB サーバーの起動
ここでは Docker を使った Apache IoTDB サーバーの起動方法を紹介します。IoTDB の設定で enable_rest_service=true が有効になっていることを確認してください。
以下のコマンドを実行して、REST インターフェースが有効な Apache IoTDB サーバーを起動します:
docker run -d --name iotdb-service \
--hostname iotdb-service \
-p 6667:6667 \
-p 18080:18080 \
-e enable_rest_service=true \
-e cn_internal_address=iotdb-service \
-e cn_target_config_node_list=iotdb-service:10710 \
-e cn_internal_port=10710 \
-e cn_consensus_port=10720 \
-e dn_rpc_address=iotdb-service \
-e dn_internal_address=iotdb-service \
-e dn_target_config_node_list=iotdb-service:10710 \
-e dn_mpp_data_exchange_port=10740 \
-e dn_schema_region_consensus_port=10750 \
-e dn_data_region_consensus_port=10760 \
-e dn_rpc_port=6667 \
apache/iotdb:2.0.5-standalone詳細は Docker Hub の IoTDB 実行方法 を参照してください。
データベースの作成
IoTDB はツリーモデルとテーブルモデルの2つのデータモデルをサポートしています。データベース作成前に、コネクターおよびシンクで使用する SQL Dialect(Tree または Table)を確認し、それに応じたデータベースを作成してください。
- ツリーモデルの場合はデータベースのみ作成すればよいです。
- テーブルモデルの場合は、データベースを作成した後、データ取り込み用のテーブルを作成する必要があります。
詳細な手順は IoTDB ユーザーガイドをご参照ください:
IoTDB コネクターの作成
Apache IoTDB データ統合を作成するには、Apache IoTDB シンクを Apache IoTDB サーバーに接続するためのコネクターを作成する必要があります。
デプロイメントに移動し、左側のナビゲーションメニューから Data Integration をクリックします。
デプロイメントの Data Integration ページで Apache IoTDB を選択します。既に他のコネクターが存在する場合は、New Connector をクリックしてから Apache IoTDB を選択します。
コネクターの設定:
- Connector Name:コネクターの一意の名前を入力します。大文字・小文字の英数字の組み合わせを使用してください。例:
my_iotdb - Description:(任意)コネクターの簡単な説明
- Driver:IoTDB への接続に使用するプロトコルを選択します。
REST API:IoTDB REST サービスのエンドポイント(例:http://localhost:18080)を IoTDB REST Service Base URL に入力します。Thrift Protocol:IoTDB Thrift サーバーのアドレスを Server Host に入力します。
- SQL Dialect:EMQX がデバイスデータを IoTDB に書き込む際のデータモデルを選択します。
Tree Model:階層的な時系列パスとしてデータを書き込み、パスベースのデバイスや計測管理に適しています。Table Model:リレーショナルテーブルにデータを書き込み、デバイスタイプやカテゴリごとの管理に適しています。
- Database Name:SQL Dialect が
Table Modelの場合、接続するデータベース名を指定します。 - Username と Password:EMQX が Apache IoTDB サーバーに認証するための資格情報を入力します。
- IoTDB Version:Apache IoTDB のバージョンを選択します。
- Enable TLS:Apache IoTDB サーバーへの暗号化接続を確立する場合に有効にします。
- 任意のチューニング設定については、高度な設定 を参照してください。
- Connector Name:コネクターの一意の名前を入力します。大文字・小文字の英数字の組み合わせを使用してください。例:
(任意)Test Connectivity をクリックして、コネクターが Apache IoTDB サーバーに正常に接続できるか確認します。
New をクリックしてコネクターの作成を完了します。
表示されるダイアログで、Back to Connector List または Create Rule を選択して、ルールと Apache IoTDB シンクの設定を続けられます。詳細は Apache IoTDB シンク付きルールの作成 を参照してください。
Apache IoTDB シンク付きルールの作成
このセクションでは、EMQX Cloud でソース MQTT トピック temp_hum/emqx からメッセージを処理し、処理結果を設定済みの Apache IoTDB シンクを通じて Apache IoTDB に時系列データとして保存するルールの作成方法を示します。
ルールエリアの New Rule をクリックするか、作成したコネクターの Actions 列にある新規ルールアイコンをクリックします。
SQL editor にルールのマッチング SQL 文を入力します。以下のルールでは、メッセージの報告時刻
up_timestamp、クライアント ID、ペイロードをtemp_hum/emqxトピックから読み取ります。また、このトピックから温度と湿度を取得しています。sqlSELECT payload.temp as temp, payload.hum as hum, payload.device_id as device_id FROM "temp_hum/emqx"TIP
初心者の方は SQL Examples と Try It Out をクリックして、SQL ルールの学習とテストを行ってください。
処理結果を IoTDB に書き込むために、ルールに Apache IoTDB シンクを追加します。詳細は Apache IoTDB シンクの追加 を参照してください。
Confirm ボタンをクリックしてルール作成を完了します。
Successful new rule ポップアップで Back to Rules をクリックし、データ統合の設定チェーンを完了させます。
Apache IoTDB シンクの追加
New Rule ページで Next をクリックしてアクションを追加します。
Connector ドロップダウンから先ほど作成した
my_iotdbコネクターを選択します。シンクの設定を行います:
SQL Dialect:Apache IoTDB シンクが IoTDB にデータを書き込む方法を選択します。コネクターで選択した SQL Dialect と一致させる必要があります。
Tree Model:IoTDB の時系列パスとしてデータを書き込みます。各シンクレコードはデバイスパスに挿入され、計測はそのデバイス下の個別時系列として書き込まれます。このモデル選択時は Device ID フィールドを指定できます。Table Model:IoTDB のリレーショナルテーブルにデータを書き込みます。各シンクレコードは指定したテーブルの行として挿入され、フィールドはテーブルのカラムにマッピングされます。このモデル選択時は Table フィールドを指定する必要があります。
Device ID(任意):IoTDB インスタンスに転送・挿入する時系列データのデバイス名として使用する特定のデバイス ID を入力します。
TIP
空欄の場合でも、パブリッシュされたメッセージ内やルール内でデバイス ID を指定できます。例えば、JSON エンコードされたメッセージに
device_idフィールドが含まれていれば、その値が出力デバイス ID となります。ルールエンジンでこの情報を抽出するには、以下のような SQL を使用できます:sqlSELECT payload, `my_device` as payload.device_idただし、このフィールドで固定されたデバイス ID が優先されます。
Table:データを書き込む IoTDB テーブル名を指定します。
Align Timeseries:デフォルトでは無効です。有効にすると、グループ化されたアラインド時系列のタイムスタンプ列が IoTDB に一度だけ保存され、各時系列で重複して保存されることを防ぎます。
Write Data の設定で、MQTT メッセージから IoTDB データを生成する方法を指定します。
Write Data セクションでは、必要なだけ項目を含むテンプレートを定義でき、各行に必要なコンテキスト情報を指定します。このテンプレートを元に MQTT メッセージから IoTDB データが生成されます。データ書き込みテンプレートは CSV ファイルによる一括設定もサポートしています。詳細は 一括設定 を参照してください。
例えば、以下のテンプレートを考えます:
補足
Column Category は SQL Dialect で
Table Modelを選択した場合のみ表示されます。Column Category Timestamp Measurement Data Type Value field index INT32 ${index} temperature FLOAT ${temp} TimestampとValueはプレースホルダー構文をサポートし、変数で埋められます。Timestampが省略された場合は、現在のシステム時刻(ミリ秒単位)が自動的に設定されます。これに対応する MQTT メッセージ例:
json{ "index": "42", "temp": "32.67" }
高度な設定(任意):高度な設定 を参照してください。
(任意)Test Connectivity をクリックして、シンクが Apache IoTDB サーバーに接続できるかテストします。
一括設定
Apache IoTDB では、Cloud コンソール上で数百件のデータを同時に書き込む設定は困難な場合があります。これに対応するため、EMQX Cloud はデータ書き込みの一括設定機能を提供しています。
Write Data の設定時に、一括設定機能を使って CSV ファイルから挿入操作用のフィールドをインポートできます。
Write Data テーブルの Batch Setting ボタンをクリックし、Import Batch Setting ポップアップを開きます。
指示に従って一括設定テンプレートファイルをダウンロードし、テンプレートにデータ書き込み設定を記入します。デフォルトのテンプレート内容は以下の通りです:
補足
以下は
Table Model用のデフォルトテンプレートです。Tree Modelでは Column Category 列は表示されません。Column Category Timestamp Measurement Data Type Value 備考(任意) tag now clientid text ${clientid} field now temp float ${payload.temp} フィールド、値、データ型は必須。データ型は boolean, int32, int64, float, double, text が利用可能 attribute now hum text ${payload.hum} attribute now status text ${payload.status} - Column Category:カラムのデータモデル。
tag、field、attributeのいずれか。tagは文字列である必要があり、fieldまたはattributeが推奨されます。 - Timestamp:
${var}形式のプレースホルダーをサポートし、タイムスタンプ形式である必要があります。以下の特殊文字も使用可能です:- now:現在のミリ秒タイムスタンプ
- now_ms:現在のミリ秒タイムスタンプ
- now_us:現在のマイクロ秒タイムスタンプ
- now_ns:現在のナノ秒タイムスタンプ
- Measurement:フィールド名。定数または
${var}形式のプレースホルダーをサポート。 - Data Type:データ型。BOOLEAN、INT32、INT64、FLOAT、DOUBLE、TEXT から選択。
- Value:書き込むデータ値。定数または
${var}形式のプレースホルダーをサポートし、データ型と一致する必要があります。 - 備考:CSV ファイル内のメモ用で、EMQX Cloud にはインポートされません。
ファイルサイズは 1MB 以下、データ行数は 2000 行以下の CSV ファイルのみサポートされます。
- Column Category:カラムのデータモデル。
記入済みのテンプレートファイルを保存し、Import Batch Setting ポップアップにアップロードして Import をクリックし、一括設定を完了します。
インポート後、Write Data テーブルでさらにデータを調整できます。
ルールのテスト
温度・湿度データの報告をシミュレートするには MQTTX の使用を推奨しますが、他のクライアントでも構いません。
MQTTX を使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxペイロード:
json{ "temp": "27.5", "hum": "41.8", "device_id": "root.sg27" // デバイス ID }
メッセージが Apache IoTDB に転送されているか確認します。
IoTDB のコマンドラインインターフェースを使ってメッセージを確認します。上記のように Docker で起動している場合は、以下のコマンドでサーバーに接続できます:
bash$ docker exec -ti iotdb-service /iotdb/sbin/start-cli.sh -h iotdb-serviceデータをクエリします:
bashIoTDB> select * from root.sg27 +------------------------+-------------+--------------+ | Time|root.sg27.hum|root.sg27.temp| +------------------------+-------------+--------------+ |2024-03-25T08:45:19.541Z| 41.8| 27.5| +------------------------+-------------+--------------+ Total line number = 1 It costs 0.166s
コンソールで運用データを確認します。ルール一覧のルール ID をクリックすると、ルールの統計情報およびそのルール配下のすべてのアクションの統計が表示されます。