Skip to content

TDengineへのMQTTデータ取り込み

TDengineは、IoT(モノのインターネット)および産業用IoT(IIoT)シナリオに特化して設計・最適化されたビッグデータプラットフォームです。中核には高性能な時系列データベースがあり、クラスター指向のアーキテクチャ、クラウドネイティブ設計、ミニマリスティックなアプローチが特徴です。EMQXはTDengineとの統合をサポートしており、多数のデバイスやデータコレクターからの大量データの送信、保存、分析、配信を可能にします。これにより、ビジネス運用状態のリアルタイム監視や早期警告が実現し、リアルタイムのビジネスインサイトを提供します。

本ページでは、EMQXとTDengine間のデータ統合について包括的に紹介し、実際の作成および検証手順を解説します。

動作概要

TDengineデータ統合はEMQXに組み込まれた機能です。組み込みのルールエンジンコンポーネントを利用することで、EMQXからTDengineへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。EMQXはルールエンジンとSinkを通じてデバイスデータをTDengineに転送します。TDengineデータ統合により、MQTTメッセージやクライアントイベントをTDengineに保存可能です。さらに、TDengine内のデータ更新や削除をイベントでトリガーでき、デバイスのオンライン状態や過去のオンライン/オフラインイベントの記録が可能になります。

以下の図は、産業用IoTにおけるEMQXとTDengineの典型的なデータ統合アーキテクチャを示しています。

EMQX Integration TDengine

産業用エネルギー消費管理シナリオを例に、ワークフローは以下の通りです:

  1. メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルでEMQXに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などが含まれます。
  3. TDengineへのデータ取り込み:ルールエンジンで定義されたルールがメッセージをTDengineに書き込む操作をトリガーします。TDengine SinkはSQLテンプレートを提供し、特定のメッセージフィールドをTDengineの対応テーブルおよびカラムに柔軟に書き込むデータ形式を定義可能です。

エネルギー消費データがTDengineに書き込まれた後は、標準SQLと強力な時系列拡張機能を用いてリアルタイムにデータ分析が可能となり、多数のサードパーティのバッチ分析、リアルタイム分析、レポートツール、AI/MLツール、可視化ツールとシームレスに連携できます。例えば:

  • Grafanaなどの可視化ツールに接続し、エネルギー消費データのチャート表示を行う。
  • ERPやPower BIなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を行う。
  • ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を実施してデータ駆動のエネルギー管理を促進する。

特長と利点

TDengineデータ統合は以下の特長と利点をビジネスにもたらします:

  • 効率的なデータ処理:EMQXは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。TDengineはデータ書き込み、保存、クエリに優れ、IoTシナリオのデータ処理ニーズをシステム負荷を抑えつつ満たします。
  • メッセージ変換:メッセージはEMQXルール内で豊富な処理や変換を経てからTDengineに書き込まれます。
  • クラスターおよびスケーラビリティ:EMQXとTDengineはクラスター機能をサポートし、クラウドネイティブアーキテクチャ上に構築されています。クラウドプラットフォームの弾力的なストレージ、コンピューティング、ネットワーク資源をフル活用し、ビジネスの成長に合わせた柔軟な水平スケーリングが可能です。
  • 高度なクエリ機能:TDengineはタイムスタンプデータの効率的なクエリおよび分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから正確なインサイトを抽出できます。

はじめる前に

本節では、TDengineデータ統合の作成を始める前に必要な準備、TDengineサーバーのセットアップやデータテーブルの作成方法について説明します。

前提条件

TDengineの起動およびデータベース作成

TDengineを起動またはTDengineサービスに接続し、データベースを作成するには以下の2つの方法があります。

TDengineでデータテーブルを作成

メッセージ保存と状態記録のために、TDengineデータベース内に2つのデータテーブルを作成します。

  1. 以下のSQL文で、クライアントID、トピック、ペイロード、作成時間を保存するデータテーブル t_mqtt_msg を作成します。
sql
   CREATE TABLE t_mqtt_msg (
       ts timestamp,
       msgid NCHAR(64),
       mqtt_topic NCHAR(255),
       qos TINYINT,
       payload BINARY(1024),
       arrived timestamp
     );
  1. 以下のSQL文で、クライアントID、イベントタイプ、作成時間を保存するデータテーブル emqx_client_events を作成します。
sql
     CREATE TABLE emqx_client_events (
         ts timestamp,
         clientid VARCHAR(255),
         event VARCHAR(255)
       );

コネクターの作成

本節では、SinkをTDengineサーバーに接続するためのコネクター作成方法を示します。

  1. EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。

  2. 画面右上のCreateをクリックします。

  3. Create ConnectorページでTDengineを選択し、Nextをクリックします。

  4. Configurationステップで、接続先に応じて以下の情報を設定します。

  5. 詳細設定(任意):詳細はSinkの機能を参照してください。

  6. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがTDengineサーバーに接続可能かテストできます。

  7. 画面下のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてSinkを使ったルール作成に進めます。詳細はメッセージ保存用のTDengine Sinkを使ったルール作成およびイベント記録用のTDengine Sinkを使ったルール作成を参照してください。

メッセージ保存用のTDengine Sinkを使ったルール作成

本節では、ダッシュボードでMQTTトピック t/# からのメッセージを処理し、処理済みデータを設定済みSink経由でTDengineのデータテーブル t_mqtt_msg に保存するルールの作成方法を示します。

  1. EMQXダッシュボードに入り、Integration -> Rulesをクリックします。

  2. 画面右上のCreateをクリックします。

  3. ルールIDに my_rule と入力し、SQL Editorでメッセージ保存用のルールを作成します。例えば以下のステートメントはトピック t/# 配下のMQTTメッセージをTDengineに保存します。

    注意:独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドをSELECT句に含めてください。

    sql
      SELECT
        *,
        now_timestamp('millisecond')  as ts
      FROM
        "t/#"

    TIP

    初心者の方はSQL ExamplesEnable TestをクリックしてSQLルールの学習やテストを行うことを推奨します。

    • Add Actionボタンをクリックして、ルール発動時にトリガーされるアクションを定義します。このアクションによりEMQXはルールで処理したデータをTDengineに送信します。
  4. Type of ActionドロップダウンリストからTDengineを選択します。ActionはデフォルトのCreate Actionのままにします。既に作成済みのTDengine Sinkを選択することも可能ですが、本デモでは新規Sinkを作成します。

  5. Sinkの名前を入力します。英数字の組み合わせで指定してください。

  6. Connectorドロップダウンから先ほど作成したmy_tdengineを選択します。隣のボタンから新規コネクター作成も可能です。設定パラメータはコネクターの作成を参照してください。

  7. SinkのSQL Templateを設定します。以下のSQLを使ってデータ挿入を完了できます。CSVファイルによるバッチ設定もサポートしています。詳細はバッチ設定を参照してください。

    TIP

    EMQX 5.1.1で破壊的変更があります。それ以前のバージョンでは文字列型の値は自動的に引用符で囲まれていましたが、5.1.1以降はユーザーが手動で引用符を付ける必要があります。

    sql
    INSERT INTO t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived) 
        VALUES (${ts}, '${id}', '${topic}', ${qos}, '${payload}', ${timestamp})

    SQLテンプレート内でプレースホルダー変数が未定義の場合、SQL template上部のUndefined Vars as Nullスイッチでルールエンジンの挙動を切り替えられます:

    • Disabled(デフォルト):ルールエンジンは文字列undefinedをデータベースに挿入します。

    • Enabled:変数が未定義の場合、ルールエンジンはNULLを挿入します。

      TIP

      可能な限りこのオプションは有効にしてください。無効化は後方互換性確保のためのみ推奨されます。

  8. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。詳細はフォールバックアクションを参照してください。

  9. 詳細設定(任意):必要に応じてsyncまたはasyncクエリモードを選択します。詳細はSinkの機能を参照してください。

  10. Createをクリックする前に、Test ConnectivityをクリックしてSinkがTDengineに接続可能かテストできます。

  11. CreateボタンをクリックしてSink設定を完了します。新しいSinkがAction Outputsに追加されます。

  12. Create Ruleページに戻り、設定内容を確認してCreateをクリックしルールを生成します。

これでTDengine Sink用のルールが正常に作成されました。Integration -> Rulesページで新規ルールを確認できます。**Actions(Sink)**タブをクリックすると新しいTDengine Sinkが表示されます。

また、Integration -> Flow Designerをクリックしてトポロジーを確認すると、トピック t/# 配下のメッセージがルールmy_ruleで解析され、TDengineに送信・保存されていることがわかります。

バッチ設定

TDengineでは1つのデータエントリーに数百のデータポイントを含むことがあり、SQL文の記述が複雑になる場合があります。この課題を解決するため、EMQXはSQLのバッチ設定機能を提供しています。

SQLテンプレート編集時に、バッチ設定機能を使ってCSVファイルから挿入操作用のフィールドをインポート可能です。

  1. SQL Template下のBatch Settingボタンをクリックし、Import Batch Settingポップアップを開きます。

  2. 指示に従いバッチ設定テンプレートファイルをダウンロードし、テンプレート内のフィールドのキーと値のペアを入力します。テンプレートファイルのデフォルト内容は以下の通りです:

    FieldValueChar Value備考(任意)
    tsnowFALSE
    msgid${id}TRUE
    mqtt_topic${topic}TRUE
    qos${qos}FALSE
    temp${payload.temp}FALSE
    hum${payload.hum}FALSE
    status${payload.status}FALSE
    • Field:フィールドキー。定数または${var}形式のプレースホルダーをサポート。
    • Value:フィールド値。定数または${var}形式のプレースホルダーをサポート。SQLでは文字列型は引用符で囲む必要がありますが、テンプレートファイルでは不要です。文字列型かどうかはChar Value列で指定します。
    • Char Value:フィールドが文字列型かどうかを指定し、インポート時にSQL生成時に引用符を付加します。文字列型ならTRUEまたは1、そうでなければFALSEまたは0を記入します。
    • 備考:CSVファイル内のメモ用で、EMQXへのインポート対象ではありません。

    CSVファイルのデータ行数は2048行を超えないようにしてください。

  3. 入力済みテンプレートファイルを保存し、Import Batch SettingポップアップにアップロードしてImportをクリックしバッチ設定を完了します。

  4. インポート後、SQL Template内でテーブル名設定やSQLコードのフォーマットなどをさらに調整可能です。

イベント記録用のTDengine Sinkを使ったルール作成

本節では、クライアントのオンライン/オフライン状態を記録し、イベントデータを設定済みSink経由でTDengineテーブル emqx_client_events に保存するルール作成方法を示します。

ルール作成手順はメッセージ保存用のTDengine Sinkを使ったルール作成とほぼ同様ですが、SQLルール文とSQLテンプレートが異なります。

オンライン/オフライン状態記録用のSQLルール文は以下の通りです:

sql
SELECT
      *,
      now_timestamp('millisecond')  as ts
    FROM 
      "$events/client_connected", "$events/client_disconnected"

Sink用のSQLテンプレートは以下の通りです:

注意:フィールドは引用符で囲まず、SQL文の末尾にセミコロン(;)を付けないでください。

sql
INSERT INTO emqx_client_events(ts, clientid, event) VALUES (
      ${ts},
      '${clientid}',
      '${event}'
    )

ルールのテスト

MQTTXを使ってトピック t/1 にメッセージを送信し、オンライン/オフラインイベントをトリガーします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello TDengine" }'

2つのSinkの稼働状況を確認すると、新規の受信メッセージ1件、送信メッセージ1件、イベントレコード2件があるはずです。

t_mqtt_msgデータテーブルにデータが書き込まれているか確認します。

bash
taos> select * from t_mqtt_msg;
           ts            |             msgid              |           mqtt_topic           | qos  |            payload             |         arrived         |
==============================================================================================================================================================
 2023-02-13 06:10:53.787 | 0005F48EB5A83865F440000014F... | t/1                            |    0 | { "msg": "hello TDengine" }    | 2023-02-13 06:10:53.787 |
Query OK, 1 row(s) in set (0.002968s)

emqx_client_eventsテーブル:

bash
taos> select * from emqx_client_events;
           ts            |            clientid            |             event              |
============================================================================================
 2023-02-13 06:10:53.777 | emqx_c                         | client.connected               |
 2023-02-13 06:10:53.791 | emqx_c                         | client.disconnected            |
Query OK, 2 row(s) in set (0.002327s)