TDengineへのMQTTデータ取り込み
TDengineは、IoTおよび産業用IoT(IIoT)シナリオ向けに設計・最適化されたビッグデータプラットフォームです。中核には高性能な時系列データベースがあり、クラスター指向のアーキテクチャ、クラウドネイティブ設計、ミニマリスティックなアプローチが特徴です。EMQXはTDengineとの統合をサポートしており、多数のデバイスやデータコレクターからの大量データの送信、保存、分析、配信を可能にします。これにより、ビジネス運用状態のリアルタイム監視や早期警告を実現し、リアルタイムなビジネスインサイトを提供します。
本ページでは、EMQXとTDengine間のデータ統合について包括的に紹介し、データ統合の作成および検証方法を実践的に解説します。
動作概要
TDengineデータ統合はEMQXの組み込み機能です。組み込みのルールエンジンコンポーネントを利用することで、EMQXからTDengineへのデータ取り込みが簡素化され、複雑なコーディングが不要になります。EMQXはルールエンジンとSinkを介してデバイスデータをTDengineに転送します。TDengineデータ統合により、MQTTメッセージやクライアントイベントをTDengineに保存可能です。さらに、TDengine内のデータ更新や削除はイベントによってトリガーされ、デバイスのオンライン状態や過去のオンライン/オフラインイベントの記録も可能になります。
以下の図は、産業用IoTにおけるEMQXとTDengineのデータ統合の典型的なアーキテクチャを示しています。

産業用エネルギー消費管理シナリオを例に、ワークフローは以下の通りです:
- メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを通じてEMQXに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンはトピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などが含まれます。
- TDengineへのデータ取り込み:ルールエンジンで定義されたルールがトリガーとなり、メッセージをTDengineに書き込む操作を実行します。TDengine SinkはSQLテンプレートを提供し、特定のメッセージフィールドを対応するテーブルやカラムに柔軟に書き込む定義が可能です。
エネルギー消費データがTDengineに書き込まれた後は、標準SQLと強力な時系列拡張機能を使ってリアルタイムにデータ分析が可能です。多数のサードパーティのバッチ分析、リアルタイム分析、レポートツール、AI/MLツール、可視化ツールとシームレスに連携できます。例えば:
- Grafanaなどの可視化ツールに接続し、エネルギー消費データのグラフを生成・表示。
- ERPやPower BIなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を実施。
- ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を行い、データ駆動型のエネルギー管理を支援。
特長とメリット
TDengineデータ統合は以下の特長と利点をビジネスにもたらします:
- 効率的なデータ処理:EMQXは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。TDengineはデータの書き込み、保存、クエリに優れており、IoTシナリオのデータ処理ニーズをシステムに負荷をかけずに満たします。
- メッセージ変換:メッセージはEMQXルール内で豊富な処理や変換を経てからTDengineに書き込まれます。
- クラスターとスケーラビリティ:EMQXとTDengineはクラスター機能をサポートし、クラウドネイティブアーキテクチャ上に構築されています。これによりクラウドプラットフォームの弾力的なストレージ、計算、ネットワーク資源を最大限活用でき、ビジネスの成長に応じた柔軟な水平スケーリングが可能です。
- 高度なクエリ機能:TDengineはタイムスタンプデータの効率的なクエリと分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから正確なインサイトを抽出できます。
はじめる前に
このセクションでは、TDengineデータ統合の作成を開始する前に必要な準備、TDengineサーバーのセットアップやデータテーブルの作成方法について説明します。
前提条件
TDengineの起動とデータベース作成
TDengineを起動またはTDengineサービスに接続し、データベースを作成するには以下の2つの方法があります。
TDengineでのデータテーブル作成
メッセージ保存および状態記録用に、TDengineデータベース内に2つのデータテーブルを作成します。
- 以下のSQL文を使い、
t_mqtt_msgテーブルを作成します。このテーブルは各メッセージのクライアントID、トピック、ペイロード、作成時刻を保存します。
CREATE TABLE t_mqtt_msg (
ts timestamp,
msgid NCHAR(64),
mqtt_topic NCHAR(255),
qos TINYINT,
payload BINARY(1024),
arrived timestamp
);- 以下のSQL文を使い、
emqx_client_eventsテーブルを作成します。このテーブルは各イベントのクライアントID、イベント種別、作成時刻を保存します。
CREATE TABLE emqx_client_events (
ts timestamp,
clientid VARCHAR(255),
event VARCHAR(255)
);コネクターの作成
このセクションでは、SinkをTDengineサーバーに接続するためのコネクター作成方法を説明します。
EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。
ページ右上のCreateをクリックします。
Create ConnectorページでTDengineを選択し、Nextをクリックします。
Configurationステップで、接続先に応じて以下の情報を設定します。
詳細設定(任意):詳細はSinkの機能を参照してください。
Createをクリックする前に、Test ConnectivityをクリックしてコネクターがTDengineサーバーに接続できるか確認できます。
ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてSinkを使ったルール作成を続行できます。詳細はメッセージ保存用のTDengine Sinkルール作成およびイベント記録用のTDengine Sinkルール作成を参照してください。
メッセージ保存用のTDengine Sinkルール作成
このセクションでは、ダッシュボードでMQTTトピックs/#からのメッセージを処理し、処理済みデータを設定済みSink経由でTDengineのt_mqtt_msgテーブルに保存するルールの作成方法を示します。
EMQXダッシュボードでIntegration -> Rulesをクリックします。
ページ右上のCreateをクリックします。
ルールIDに
my_ruleを入力し、SQL Editorでメッセージ保存用のルールを作成します。例えば、以下のSQL文を入力するとトピックt/#配下のMQTTメッセージがTDengineに保存されます。注意:独自のSQL文を指定する場合は、Sinkが必要とするすべてのフィールドを
SELECT句に含めてください。sqlSELECT *, now_timestamp('millisecond') as ts FROM "t/#"TIP
初心者の方はSQL ExamplesやEnable TestをクリックしてSQLルールを学習・テストしてください。
- Add Actionボタンをクリックし、ルールによってトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをTDengineに送信します。
Type of Actionドロップダウンから
TDengineを選択します。ActionはデフォルトのCreate Actionのままにします。既存のTDengine Sinkがあれば選択可能ですが、この例では新規Sinkを作成します。Sinkの名前を入力します。英数字の組み合わせで指定してください。
Connectorドロップダウンから先ほど作成した
my_tdengineを選択します。隣のボタンをクリックして新規コネクターを作成することも可能です。設定パラメータの詳細はコネクター作成を参照してください。SinkのSQL Templateを設定します。以下のSQLを使ってデータ挿入を完了できます。CSVファイルによるバッチ設定もサポートしています。詳細はバッチ設定を参照してください。
TIP
EMQX 5.1.1で破壊的変更がありました。5.1.1以前は文字列型の値が自動的にクォートされていましたが、5.1.1以降はユーザーが手動でクォートする必要があります。
sqlINSERT INTO t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived) VALUES (${ts}, '${id}', '${topic}', ${qos}, '${payload}', ${timestamp})SQLテンプレート内でプレースホルダー変数が未定義の場合、SQL template上部のUndefined Vars as Nullスイッチでルールエンジンの動作を設定できます:
無効(デフォルト):ルールエンジンは文字列
undefinedをデータベースに挿入します。有効:変数が未定義の場合、ルールエンジンは
NULLを挿入します。TIP
可能な限りこのオプションは有効にしてください。無効にするのは後方互換性確保のためのみです。
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。
詳細設定(任意):必要に応じてsyncまたはasyncクエリモードを選択可能です。詳細はSinkの機能を参照してください。
Createをクリックする前に、Test ConnectivityでSinkがTDengineに接続できるかテストできます。
CreateボタンをクリックしてSink設定を完了します。新しいSinkがAction Outputsに追加されます。
Create Ruleページに戻り、設定内容を確認してCreateをクリックしルールを生成します。
これでTDengine Sink用のルールが正常に作成されました。Integration -> Rulesページで新規ルールを確認できます。**Actions(Sink)**タブをクリックすると新しいTDengine Sinkが表示されます。
また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/#配下のメッセージがルールmy_ruleによって解析され、TDengineに送信・保存されていることが確認できます。
バッチ設定
TDengineでは1つのデータエントリに数百のデータポイントを含むことがあり、SQL文の作成が困難になる場合があります。この問題に対応するため、EMQXはSQLのバッチ設定機能を提供しています。
SQLテンプレート編集時に、バッチ設定機能を使ってCSVファイルから挿入操作用のフィールドをインポートできます。
SQL Template下のBatch Settingボタンをクリックし、Import Batch Settingポップアップを開きます。
指示に従いバッチ設定テンプレートファイルをダウンロードし、テンプレート内のフィールドのキー・値ペアを記入します。テンプレートのデフォルト内容は以下の通りです:
Field Value Char Value 備考(任意) ts now FALSE 例示 msgid ${id} TRUE mqtt_topic ${topic} TRUE qos ${qos} FALSE temp ${payload.temp} FALSE hum ${payload.hum} FALSE status ${payload.status} FALSE - Field:フィールドキー。定数または
${var}形式のプレースホルダーをサポート。 - Value:フィールド値。定数または
${var}形式のプレースホルダーをサポート。SQLでは文字列型はクォートが必要ですが、テンプレートファイル内では不要です。文字列型かどうかはChar Value列で指定します。 - Char Value:フィールドが文字列型かどうかを指定し、インポート時にSQL生成でクォートを付加します。文字列型なら
TRUEまたは1、それ以外はFALSEまたは0を記入。 - 備考:CSVファイル内の注釈用で、EMQXへのインポート対象外です。
CSVファイルのバッチ設定データは2048行を超えないようにしてください。
- Field:フィールドキー。定数または
記入済みテンプレートファイルを保存し、Import Batch SettingポップアップにアップロードしてImportをクリックしバッチ設定を完了します。
インポート後、SQL Template内のSQLをさらに調整可能です。テーブル名の設定やSQLコードの整形などが行えます。
イベント記録用のTDengine Sinkルール作成
このセクションでは、クライアントのオンライン/オフライン状態を記録し、イベントデータを設定済みSink経由でTDengineのemqx_client_eventsテーブルに保存するルール作成方法を示します。
ルール作成手順はメッセージ保存用のTDengine Sinkルール作成とほぼ同様ですが、SQLルール構文とSQLテンプレートが異なります。
オンライン/オフライン状態記録用のSQLルール構文は以下の通りです:
SELECT
*,
now_timestamp('millisecond') as ts
FROM
"$events/client_connected", "$events/client_disconnected"Sink用のSQLテンプレートは以下の通りです:
注意:フィールドはクォートで囲まず、SQL文の末尾にセミコロン(;)を付けないでください。
INSERT INTO emqx_client_events(ts, clientid, event) VALUES (
${ts},
'${clientid}',
'${event}'
)ルールのテスト
MQTTXを使ってトピックt/1にメッセージを送信し、オンライン/オフラインイベントをトリガーします。
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello TDengine" }'2つのSinkの稼働状況を確認すると、新規の受信メッセージ1件、送信メッセージ1件、イベント記録2件があるはずです。
t_mqtt_msgデータテーブルにデータが書き込まれているか確認します。
taos> select * from t_mqtt_msg;
ts | msgid | mqtt_topic | qos | payload | arrived |
==============================================================================================================================================================
2023-02-13 06:10:53.787 | 0005F48EB5A83865F440000014F... | t/1 | 0 | { "msg": "hello TDengine" } | 2023-02-13 06:10:53.787 |
Query OK, 1 row(s) in set (0.002968s)emqx_client_eventsテーブル:
taos> select * from emqx_client_events;
ts | clientid | event |
============================================================================================
2023-02-13 06:10:53.777 | emqx_c | client.connected |
2023-02-13 06:10:53.791 | emqx_c | client.disconnected |
Query OK, 2 row(s) in set (0.002327s)
