TDengineへのMQTTデータ取り込み
TDengineは、IoTおよび産業用IoT(IIoT)シナリオ向けに設計・最適化されたビッグデータプラットフォームです。中核には、高性能な時系列データベースがあり、クラスター指向のアーキテクチャ、クラウドネイティブ設計、ミニマリスティックなアプローチが特徴です。EMQXプラットフォームはTDengineとの連携をサポートしており、多数のデバイスやデータコレクターからの大量データの送信、保存、分析、配信を可能にします。これにより、ビジネス運用状態のリアルタイム監視や早期警告を実現し、リアルタイムのビジネスインサイトを提供します。
本ページでは、EMQXプラットフォームとTDengine間のデータ統合について包括的に紹介し、データ統合の作成と検証に関する実践的な手順を説明します。
動作の仕組み
TDengineデータ統合はEMQXプラットフォームに組み込まれた機能です。組み込みのルールエンジンコンポーネントにより、EMQXプラットフォームからTDengineへのデータ取り込みが簡素化され、複雑なコーディングが不要になります。EMQXプラットフォームはルールエンジンとアクションを通じてデバイスデータをTDengineに転送します。TDengineデータ統合により、MQTTメッセージやクライアントイベントをTDengineに保存可能です。さらに、TDengine内のデータ更新や削除はイベントによってトリガーされ、デバイスのオンライン状態や過去のオンライン/オフラインイベントの記録も実現します。
以下の図は、産業用IoTにおけるEMQXプラットフォームとTDengineの典型的なデータ統合アーキテクチャを示しています。
産業用エネルギー消費管理シナリオを例に、ワークフローは次の通りです。
- メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを通じてEMQXプラットフォームに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQXプラットフォームがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンはトピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。ここではデータ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などが可能です。
- TDengineへのデータ取り込み:ルールエンジンで定義されたルールがトリガーとなり、メッセージをTDengineに書き込む操作が実行されます。TDengineアクションはSQLテンプレートを提供し、特定のメッセージフィールドをTDengineの対応テーブルやカラムに柔軟に書き込むデータ形式を定義できます。
エネルギー消費データがTDengineに書き込まれた後は、標準SQLおよび強力な時系列拡張機能を用いてリアルタイムにデータ分析が可能です。多数のサードパーティ製バッチ分析、リアルタイム分析、レポーティングツール、AI/MLツール、可視化ツールとシームレスに連携できます。例えば:
- Grafanaなどの可視化ツールに接続し、エネルギー消費データのグラフ表示を生成。
- ERPやPower BIなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を実施。
- ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を行い、データ駆動型のエネルギー管理を支援。
特長とメリット
TDengineデータ統合は、ビジネスに以下の特長と利点をもたらします。
- 効率的なデータ処理:EMQXプラットフォームは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。TDengineはデータ書き込み、保存、クエリに優れ、IoTシナリオのデータ処理要件をシステムに過負荷をかけずに満たします。
- メッセージ変換:メッセージはEMQXプラットフォームのルール内で豊富な処理・変換が可能であり、その後TDengineに書き込まれます。
- クラスターとスケーラビリティ:EMQXプラットフォームとTDengineはクラスター機能をサポートし、クラウドネイティブアーキテクチャ上に構築されています。これによりクラウドプラットフォームの弾力的なストレージ、計算、ネットワークリソースを最大限活用でき、ビジネスの成長に応じて柔軟な水平スケーリングが可能です。
- 高度なクエリ機能:TDengineはタイムスタンプデータの効率的なクエリと分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから正確なインサイトを抽出できます。
はじめる前に
本セクションでは、TDengineデータ統合の作成を開始する前に必要な準備、TDengineサーバーのセットアップやデータテーブルの作成方法について説明します。
前提条件
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
TDengineのインストール
Dockerを使ってTDengineをインストールし、Dockerイメージを起動します。
# TDengineのDockerイメージを起動
docker run --name TDengine -p 6041:6041 tdengine/tdengine
# コンテナにアクセス
docker exec -it TDengine bash
# コンテナ内でTDengineサーバーを起動
taos
# データベースを作成し、選択
CREATE DATABASE mqtt;
use mqtt;
TDengineでのデータテーブル作成
TDengine用のデータブリッジを作成する前に、メッセージ保存用のデータテーブルをTDengineデータベースに作成する必要があります。
以下のSQL文を使い、TDengineデータベースにt_mqtt_msg
テーブルを作成します。このテーブルは各メッセージのクライアントID、トピック、ペイロード、作成時刻を保存します。
CREATE TABLE t_mqtt_msg (
ts timestamp,
msgid NCHAR(64),
mqtt_topic NCHAR(255),
qos TINYINT,
payload BINARY(1024),
arrived timestamp
);
コネクターの作成
データ統合ルールを作成する前に、TDengineサーバーにアクセスするためのTDengineコネクターを作成します。
デプロイメントに移動し、左側メニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるTDengineを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、同じくデータ永続化カテゴリのTDengineを選択します。
コネクター名:システムが自動的にコネクター名を生成します。
接続情報を入力します:
- サーバーホスト:
http://127.0.0.1:6041
またはTDengineサーバーがリモートで稼働している場合は実際のURLを入力。 - ユーザー名:
root
- パスワード:
taosdata
- ビジネス要件に応じて詳細設定を構成(任意)
- サーバーホスト:
テストボタンをクリックします。TDengineサービスにアクセス可能であれば、コネクター利用可能のメッセージが返されます。
新規作成ボタンをクリックして作成を完了します。
これで、このコネクターを基にデータブリッジルールを作成できます。
ルールの作成
本セクションでは、EMQXプラットフォームコンソールを使ってTDengineルールを作成し、ルールにアクションを追加する方法を示します。
ルールエリアの新規ルールをクリックするか、先ほど作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
利用したい機能に基づき、SQLエディターでルールを設定します。ここでは、クライアントが
temp_hum/emqx
トピックに温湿度メッセージを送信した際にエンジンをトリガーするSQLを作成します。以下のようなSQLが必要です。sqlSELECT *, now_timestamp('millisecond') as ts FROM "temp_hum/emqx"
TIP
初心者の方は、SQL例をクリックし、テスト有効化を使ってSQLルールの学習とテストを行うことをおすすめします。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
データベース名に
mqtt
を入力します。アクションのSQLテンプレートを設定します。以下のSQLを使ってデータ挿入を完了できます。CSVファイルによるバッチ設定もサポートしています。詳細はバッチ設定を参照してください。
sqlINSERT INTO t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived) VALUES (${ts}, '${id}', '${topic}', ${qos}, '${payload}', ${timestamp})
SQLテンプレート内でプレースホルダー変数が未定義の場合、SQLテンプレート上の未定義変数をNULLとして扱うスイッチを切り替えてルールエンジンの動作を定義できます。
無効(デフォルト):ルールエンジンは文字列
undefined
をデータベースに挿入します。有効:変数が未定義の場合、ルールエンジンは
NULL
をデータベースに挿入します。TIP
可能な限りこのオプションは常に有効にすることを推奨します。無効化は互換性維持のための例外的な用途のみです。
詳細設定を展開し、同期/非同期モード、キューやバッチ、その他パラメータを適宜設定します(任意)。
確定ボタンをクリックしてルール作成を完了します。
新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。
バッチ設定
TDengineでは単一のデータエントリに数百のデータポイントを含むことがあり、SQL文の記述が困難になる場合があります。これに対応するため、EMQXプラットフォームはSQLのバッチ設定機能を提供しています。
SQLテンプレート編集時に、バッチ設定機能を使ってCSVファイルから挿入操作用のフィールドをインポートできます。
SQLテンプレート下のバッチ設定ボタンをクリックし、バッチ設定インポートポップアップを開きます。
指示に従い、バッチ設定テンプレートファイルをダウンロードし、テンプレート内のフィールドのキー・バリューを記入します。デフォルトのテンプレート内容は以下の通りです。
フィールド 値 文字列値 備考(任意) ts now FALSE 例の備考 msgid ${id} TRUE mqtt_topic ${topic} TRUE qos ${qos} FALSE temp ${payload.temp} FALSE hum ${payload.hum} FALSE status ${payload.status} FALSE - フィールド:フィールドキー。定数または
${var}
形式のプレースホルダーをサポート。 - 値:フィールド値。定数または
${var}
形式のプレースホルダーをサポート。SQLでは文字列型は引用符で囲む必要がありますが、テンプレートファイルでは不要です。文字列型かどうかは文字列値
列で指定します。 - 文字列値:フィールドが文字列型かどうかを指定し、インポート時にSQL生成時に引用符を付加します。文字列型の場合は
TRUE
または1
、そうでなければFALSE
または0
を記入。 - 備考:CSVファイル内の注釈用で、EMQXプラットフォームにはインポートされません。
CSVファイルのバッチ設定データは2048行を超えないようにしてください。
- フィールド:フィールドキー。定数または
記入したテンプレートファイルを保存し、バッチ設定インポートポップアップにアップロード後、インポートをクリックしてバッチ設定を完了します。
インポート後、SQLテンプレート内でテーブル名の設定やSQLコードの整形など、さらにSQLを調整可能です。
ルールのテスト
温湿度データの送信シミュレーションにはMQTTXの利用を推奨しますが、他の任意のクライアントでも構いません。
MQTTXを使い、デプロイメントに接続して以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
クライアントID:
test_client
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
アクションの稼働状況を確認し、新規の受信メッセージ1件と送信メッセージ1件があることを確認します。
t_mqtt_msg
データテーブルにデータが書き込まれているか確認します。
taos> select * from t_mqtt_msg;
ts | msgid | mqtt_topic | qos | payload | arrived |
==============================================================================================================================================================
2024-03-29 06:57:37.300 | 000614C727B230AE67180100069... | temp_hum/emqx | 1 | {
"temp": "27.5",
"hum"... | 2024-03-29 06:57:37.300 |
Query OK, 1 row(s) in set (0.002968s)