TDengineへのMQTTデータ取り込み
TDengineは、IoTおよび産業用IoT(IIoT)シナリオ向けに設計・最適化されたビッグデータプラットフォームです。その中核には、高性能な時系列データベースがあり、クラスター指向のアーキテクチャ、クラウドネイティブ設計、ミニマリスティックなアプローチを特徴としています。EMQX CloudはTDengineとの連携をサポートしており、多数のデバイスやデータコレクターからの大量データの送信、保存、分析、配信を可能にします。これにより、ビジネス運用状態のリアルタイム監視や早期警告を実現し、リアルタイムのビジネスインサイトを提供します。
本ページでは、EMQX CloudとTDengineのデータ統合について包括的に紹介し、データ統合の作成および検証方法を実践的に解説します。
動作概要
TDengineデータ統合はEMQX Cloudに組み込まれた機能です。組み込みのルールエンジンコンポーネントを利用することで、EMQX CloudからTDengineへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。EMQX Cloudはルールエンジンとアクションを介してデバイスデータをTDengineへ転送します。TDengineデータ統合により、MQTTメッセージやクライアントイベントをTDengineに保存可能です。さらに、TDengine内のデータ更新や削除をイベントでトリガーでき、デバイスのオンライン状態や過去のオンライン/オフラインイベントの記録も実現します。
以下の図は、産業用IoTにおけるEMQX CloudとTDengineの典型的なデータ統合アーキテクチャを示しています。

産業用エネルギー消費管理シナリオを例に、ワークフローは以下の通りです。
- メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを通じてEMQX Cloudに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQX Cloudがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンはトピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などが含まれます。
- TDengineへのデータ取り込み:ルールエンジンで定義されたルールがトリガーとなり、メッセージをTDengineに書き込む操作が実行されます。TDengineアクションはSQLテンプレートを提供し、特定のメッセージフィールドを対応するテーブルやカラムに柔軟に書き込むデータ形式を定義可能です。
エネルギー消費データがTDengineに書き込まれた後は、標準SQLと強力な時系列拡張機能を用いてリアルタイムにデータ分析が可能となり、多数のサードパーティのバッチ分析、リアルタイム分析、レポーティングツール、AI/MLツール、可視化ツールとシームレスに連携できます。例えば:
- Grafanaなどの可視化ツールに接続し、エネルギー消費データのチャートを生成・表示。
- ERPやPower BIなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を実施。
- ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を行い、データ駆動型のエネルギー管理を促進。
特長と利点
TDengineデータ統合は以下の特長と利点をビジネスにもたらします。
- 効率的なデータ処理:EMQX Cloudは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。TDengineはデータの書き込み、保存、クエリに優れており、IoTシナリオのデータ処理ニーズをシステムに過負荷をかけずに満たします。
- メッセージ変換:メッセージはEMQX Cloudのルール内で豊富な処理や変換を経てTDengineに書き込まれます。
- クラスターおよびスケーラビリティ:EMQX CloudとTDengineはクラスター機能をサポートし、クラウドネイティブアーキテクチャ上に構築されているため、クラウドプラットフォームの弾力的なストレージ、コンピューティング、ネットワークリソースを最大限に活用し、ビジネスの成長に応じた柔軟な水平スケーリングが可能です。
- 高度なクエリ機能:TDengineは時刻データの効率的なクエリと分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから正確なインサイトを抽出できます。
はじめる前に
このセクションでは、TDengineデータ統合の作成を開始する前に必要な準備、TDengineサーバーのセットアップやデータテーブルの作成方法について説明します。
前提条件
ネットワーク設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
TDengineのインストール
Dockerを使ってTDengineをインストールし、Dockerイメージを起動します。
# TDengineのDockerイメージを起動
docker run --name TDengine -p 6041:6041 tdengine/tdengine
# コンテナにアクセス
docker exec -it TDengine bash
# コンテナ内でTDengineサーバーを起動
taos
# データベースを作成し、選択
CREATE DATABASE mqtt;
use mqtt;TDengineでデータテーブルを作成
TDengine用のデータブリッジを作成する前に、メッセージ保存用のデータテーブルをTDengineデータベースに作成する必要があります。
以下のSQL文を使って、TDengineデータベースにデータテーブルt_mqtt_msgを作成します。このテーブルは各メッセージのクライアントID、トピック、ペイロード、作成時間を保存します。
CREATE TABLE t_mqtt_msg (
ts timestamp,
msgid NCHAR(64),
mqtt_topic NCHAR(255),
qos TINYINT,
payload BINARY(1024),
arrived timestamp
);コネクターの作成
データ統合ルールを作成する前に、TDengineサーバーにアクセスするためのTDengineコネクターを作成します。
デプロイメント画面に移動し、左側ナビゲーションメニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ永続化カテゴリの中からTDengineを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、同じくデータ永続化カテゴリの中からTDengineを選択します。
コネクター名はシステムが自動生成します。
接続情報を入力します:
- サーバーホスト:
http://127.0.0.1:6041(TDengineサーバーがリモートで稼働している場合は実際のURLを入力) - ユーザー名:
root - パスワード:
taosdata - 必要に応じて詳細設定を行います(任意)。
- サーバーホスト:
テストボタンをクリックします。TDengineサービスにアクセス可能であれば、コネクター利用可能のメッセージが表示されます。
新規作成ボタンをクリックして作成を完了します。
これで、このコネクターを基にデータブリッジルールを作成できます。
ルールの作成
このセクションでは、EMQX Cloudコンソールを使ってTDengineルールを作成し、ルールにアクションを追加する方法を説明します。
ルールエリアで新規ルールをクリックするか、先ほど作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
利用する機能に基づき、SQLエディターでルールを設定します。ここでは、クライアントが
temp_hum/emqxトピックに温度と湿度のメッセージを送信した際にエンジンをトリガーするSQL例を示します。sqlSELECT *, now_timestamp('millisecond') as ts FROM "temp_hum/emqx"TIP
初心者の方は、SQL例をクリックし、試してみるでSQLルールを学習・テストできます。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
データベース名に
mqttを入力します。アクションのSQLテンプレートを設定します。以下のSQLを使ってデータ挿入を行えます。CSVファイルによるバッチ設定もサポートしています。詳細はバッチ設定を参照してください。
sqlINSERT INTO t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived) VALUES (${ts}, '${id}', '${topic}', ${qos}, '${payload}', ${timestamp})SQLテンプレート内のプレースホルダー変数が未定義の場合、SQLテンプレート上部の未定義変数をNULLとして扱うスイッチでルールエンジンの動作を切り替えられます。
無効(デフォルト):ルールエンジンは文字列
undefinedをデータベースに挿入します。有効:変数が未定義の場合、ルールエンジンは
NULLを挿入します。TIP
可能な限りこのオプションは有効にしてください。無効にするのは後方互換性を確保する場合のみです。
詳細設定を展開し、同期/非同期モード、キューやバッチ、その他パラメータを必要に応じて設定します(任意)。
確定ボタンをクリックしてルール作成を完了します。
新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。
バッチ設定
TDengineでは1つのデータエントリに数百のデータポイントを含むことがあり、SQL文の作成が困難になる場合があります。これに対応するため、EMQX CloudはSQLのバッチ設定機能を提供しています。
SQLテンプレート編集時に、バッチ設定機能を使ってCSVファイルから挿入操作用のフィールドをインポートできます。
SQLテンプレート下のバッチ設定ボタンをクリックし、バッチ設定インポートポップアップを開きます。
指示に従いバッチ設定テンプレートファイルをダウンロードし、テンプレート内のフィールドのキーと値のペアを記入します。テンプレートファイルのデフォルト内容は以下の通りです。
Field Value Char Value 備考(任意) ts now FALSE 例示備考 msgid ${id} TRUE mqtt_topic ${topic} TRUE qos ${qos} FALSE temp ${payload.temp} FALSE hum ${payload.hum} FALSE status ${payload.status} FALSE - Field:フィールドキー。定数または
${var}形式のプレースホルダーをサポート。 - Value:フィールド値。定数または
${var}形式のプレースホルダーをサポート。SQLでは文字型はクォートで囲む必要がありますが、テンプレートファイルではクォート不要です。文字型かどうかはChar Value列で指定します。 - Char Value:フィールドが文字型かどうかを指定し、インポート時にSQL生成時にクォートを付加します。文字型の場合は
TRUEまたは1、それ以外はFALSEまたは0を記入してください。 - 備考:CSVファイル内の注釈用で、EMQX Cloudにはインポートされません。
CSVファイルのバッチ設定データは2048行を超えないようにしてください。
- Field:フィールドキー。定数または
記入したテンプレートファイルを保存し、バッチ設定インポートポップアップにアップロード後、インポートをクリックしてバッチ設定を完了します。
インポート後、SQLテンプレートでテーブル名の設定やSQLコードの整形など、さらにSQLを調整できます。
ルールのテスト
温湿度データの報告をシミュレートするには、MQTTXの利用を推奨しますが、他のクライアントでも問題ありません。
MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxクライアントID:
test_clientペイロード:
json{ "temp": "27.5", "hum": "41.8" }
アクションの稼働状況を確認し、新規の受信メッセージ1件、新規の送信メッセージ1件があることを確認します。
t_mqtt_msgデータテーブルにデータが書き込まれているか確認します。
taos> select * from t_mqtt_msg;
ts | msgid | mqtt_topic | qos | payload | arrived |
==============================================================================================================================================================
2024-03-29 06:57:37.300 | 000614C727B230AE67180100069... | temp_hum/emqx | 1 | {
"temp": "27.5",
"hum"... | 2024-03-29 06:57:37.300 |
Query OK, 1 row(s) in set (0.002968s)