Skip to content

TDengineへのMQTTデータ取り込み

TDengineは、IoTおよび産業用IoT(IIoT)シナリオ向けに設計・最適化されたビッグデータプラットフォームです。中核には高性能な時系列データベースがあり、クラスター指向のアーキテクチャ、クラウドネイティブ設計、ミニマリスティックなアプローチが特徴です。EMQXはTDengineとの統合をサポートしており、多数のデバイスやデータコレクターからの大量データの送信、保存、分析、配信を可能にします。これにより、ビジネス運用状態のリアルタイム監視や早期警告を実現し、リアルタイムなビジネスインサイトを提供します。

本ページでは、EMQXとTDengine間のデータ統合について包括的に紹介し、データ統合の作成および検証方法を実践的に解説します。

動作概要

TDengineデータ統合はEMQXの組み込み機能です。組み込みのルールエンジンコンポーネントを利用することで、EMQXからTDengineへのデータ取り込みが簡素化され、複雑なコーディングが不要になります。EMQXはルールエンジンとSinkを介してデバイスデータをTDengineに転送します。TDengineデータ統合により、MQTTメッセージやクライアントイベントをTDengineに保存可能です。さらに、TDengine内のデータ更新や削除はイベントによってトリガーされ、デバイスのオンライン状態や過去のオンライン/オフラインイベントの記録も可能になります。

以下の図は、産業用IoTにおけるEMQXとTDengineのデータ統合の典型的なアーキテクチャを示しています。

EMQX Integration TDengine

産業用エネルギー消費管理シナリオを例に、ワークフローは以下の通りです:

  1. メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを通じてEMQXに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンはトピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などが含まれます。
  3. TDengineへのデータ取り込み:ルールエンジンで定義されたルールがトリガーとなり、メッセージをTDengineに書き込む操作を実行します。TDengine SinkはSQLテンプレートを提供し、特定のメッセージフィールドを対応するテーブルやカラムに柔軟に書き込む定義が可能です。

エネルギー消費データがTDengineに書き込まれた後は、標準SQLと強力な時系列拡張機能を使ってリアルタイムにデータ分析が可能です。多数のサードパーティのバッチ分析、リアルタイム分析、レポートツール、AI/MLツール、可視化ツールとシームレスに連携できます。例えば:

  • Grafanaなどの可視化ツールに接続し、エネルギー消費データのグラフを生成・表示。
  • ERPやPower BIなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を実施。
  • ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を行い、データ駆動型のエネルギー管理を支援。

特長とメリット

TDengineデータ統合は以下の特長と利点をビジネスにもたらします:

  • 効率的なデータ処理:EMQXは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。TDengineはデータの書き込み、保存、クエリに優れており、IoTシナリオのデータ処理ニーズをシステムに負荷をかけずに満たします。
  • メッセージ変換:メッセージはEMQXルール内で豊富な処理や変換を経てからTDengineに書き込まれます。
  • クラスターとスケーラビリティ:EMQXとTDengineはクラスター機能をサポートし、クラウドネイティブアーキテクチャ上に構築されています。これによりクラウドプラットフォームの弾力的なストレージ、計算、ネットワーク資源を最大限活用でき、ビジネスの成長に応じた柔軟な水平スケーリングが可能です。
  • 高度なクエリ機能:TDengineはタイムスタンプデータの効率的なクエリと分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから正確なインサイトを抽出できます。

はじめる前に

このセクションでは、TDengineデータ統合の作成を開始する前に必要な準備、TDengineサーバーのセットアップやデータテーブルの作成方法について説明します。

前提条件

TDengineの起動とデータベース作成

TDengineを起動またはTDengineサービスに接続し、データベースを作成するには以下の2つの方法があります。

TDengineでのデータテーブル作成

メッセージ保存および状態記録用に、TDengineデータベース内に2つのデータテーブルを作成します。

  1. 以下のSQL文を使い、t_mqtt_msgテーブルを作成します。このテーブルは各メッセージのクライアントID、トピック、ペイロード、作成時刻を保存します。
sql
   CREATE TABLE t_mqtt_msg (
       ts timestamp,
       msgid NCHAR(64),
       mqtt_topic NCHAR(255),
       qos TINYINT,
       payload BINARY(1024),
       arrived timestamp
     );
  1. 以下のSQL文を使い、emqx_client_eventsテーブルを作成します。このテーブルは各イベントのクライアントID、イベント種別、作成時刻を保存します。
sql
     CREATE TABLE emqx_client_events (
         ts timestamp,
         clientid VARCHAR(255),
         event VARCHAR(255)
       );

コネクターの作成

このセクションでは、SinkをTDengineサーバーに接続するためのコネクター作成方法を説明します。

  1. EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。

  2. ページ右上のCreateをクリックします。

  3. Create ConnectorページでTDengineを選択し、Nextをクリックします。

  4. Configurationステップで、接続先に応じて以下の情報を設定します。

  5. 詳細設定(任意):詳細はSinkの機能を参照してください。

  6. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがTDengineサーバーに接続できるか確認できます。

  7. ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてSinkを使ったルール作成を続行できます。詳細はメッセージ保存用のTDengine Sinkルール作成およびイベント記録用のTDengine Sinkルール作成を参照してください。

メッセージ保存用のTDengine Sinkルール作成

このセクションでは、ダッシュボードでMQTTトピックs/#からのメッセージを処理し、処理済みデータを設定済みSink経由でTDengineのt_mqtt_msgテーブルに保存するルールの作成方法を示します。

  1. EMQXダッシュボードでIntegration -> Rulesをクリックします。

  2. ページ右上のCreateをクリックします。

  3. ルールIDにmy_ruleを入力し、SQL Editorでメッセージ保存用のルールを作成します。例えば、以下のSQL文を入力するとトピックt/#配下のMQTTメッセージがTDengineに保存されます。

    注意:独自のSQL文を指定する場合は、Sinkが必要とするすべてのフィールドをSELECT句に含めてください。

    sql
      SELECT
        *,
        now_timestamp('millisecond')  as ts
      FROM
        "t/#"

    TIP

    初心者の方はSQL ExamplesEnable TestをクリックしてSQLルールを学習・テストしてください。

    • Add Actionボタンをクリックし、ルールによってトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをTDengineに送信します。
  4. Type of ActionドロップダウンからTDengineを選択します。ActionはデフォルトのCreate Actionのままにします。既存のTDengine Sinkがあれば選択可能ですが、この例では新規Sinkを作成します。

  5. Sinkの名前を入力します。英数字の組み合わせで指定してください。

  6. Connectorドロップダウンから先ほど作成したmy_tdengineを選択します。隣のボタンをクリックして新規コネクターを作成することも可能です。設定パラメータの詳細はコネクター作成を参照してください。

  7. SinkのSQL Templateを設定します。以下のSQLを使ってデータ挿入を完了できます。CSVファイルによるバッチ設定もサポートしています。詳細はバッチ設定を参照してください。

    TIP

    EMQX 5.1.1で破壊的変更がありました。5.1.1以前は文字列型の値が自動的にクォートされていましたが、5.1.1以降はユーザーが手動でクォートする必要があります。

    sql
    INSERT INTO t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived) 
        VALUES (${ts}, '${id}', '${topic}', ${qos}, '${payload}', ${timestamp})

    SQLテンプレート内でプレースホルダー変数が未定義の場合、SQL template上部のUndefined Vars as Nullスイッチでルールエンジンの動作を設定できます:

    • 無効(デフォルト):ルールエンジンは文字列undefinedをデータベースに挿入します。

    • 有効:変数が未定義の場合、ルールエンジンはNULLを挿入します。

      TIP

      可能な限りこのオプションは有効にしてください。無効にするのは後方互換性確保のためのみです。

  8. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。

  9. 詳細設定(任意):必要に応じてsyncまたはasyncクエリモードを選択可能です。詳細はSinkの機能を参照してください。

  10. Createをクリックする前に、Test ConnectivityでSinkがTDengineに接続できるかテストできます。

  11. CreateボタンをクリックしてSink設定を完了します。新しいSinkがAction Outputsに追加されます。

  12. Create Ruleページに戻り、設定内容を確認してCreateをクリックしルールを生成します。

これでTDengine Sink用のルールが正常に作成されました。Integration -> Rulesページで新規ルールを確認できます。**Actions(Sink)**タブをクリックすると新しいTDengine Sinkが表示されます。

また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/#配下のメッセージがルールmy_ruleによって解析され、TDengineに送信・保存されていることが確認できます。

バッチ設定

TDengineでは1つのデータエントリに数百のデータポイントを含むことがあり、SQL文の作成が困難になる場合があります。この問題に対応するため、EMQXはSQLのバッチ設定機能を提供しています。

SQLテンプレート編集時に、バッチ設定機能を使ってCSVファイルから挿入操作用のフィールドをインポートできます。

  1. SQL Template下のBatch Settingボタンをクリックし、Import Batch Settingポップアップを開きます。

  2. 指示に従いバッチ設定テンプレートファイルをダウンロードし、テンプレート内のフィールドのキー・値ペアを記入します。テンプレートのデフォルト内容は以下の通りです:

    FieldValueChar Value備考(任意)
    tsnowFALSE例示
    msgid${id}TRUE
    mqtt_topic${topic}TRUE
    qos${qos}FALSE
    temp${payload.temp}FALSE
    hum${payload.hum}FALSE
    status${payload.status}FALSE
    • Field:フィールドキー。定数または${var}形式のプレースホルダーをサポート。
    • Value:フィールド値。定数または${var}形式のプレースホルダーをサポート。SQLでは文字列型はクォートが必要ですが、テンプレートファイル内では不要です。文字列型かどうかはChar Value列で指定します。
    • Char Value:フィールドが文字列型かどうかを指定し、インポート時にSQL生成でクォートを付加します。文字列型ならTRUEまたは1、それ以外はFALSEまたは0を記入。
    • 備考:CSVファイル内の注釈用で、EMQXへのインポート対象外です。

    CSVファイルのバッチ設定データは2048行を超えないようにしてください。

  3. 記入済みテンプレートファイルを保存し、Import Batch SettingポップアップにアップロードしてImportをクリックしバッチ設定を完了します。

  4. インポート後、SQL Template内のSQLをさらに調整可能です。テーブル名の設定やSQLコードの整形などが行えます。

イベント記録用のTDengine Sinkルール作成

このセクションでは、クライアントのオンライン/オフライン状態を記録し、イベントデータを設定済みSink経由でTDengineのemqx_client_eventsテーブルに保存するルール作成方法を示します。

ルール作成手順はメッセージ保存用のTDengine Sinkルール作成とほぼ同様ですが、SQLルール構文とSQLテンプレートが異なります。

オンライン/オフライン状態記録用のSQLルール構文は以下の通りです:

sql
SELECT
      *,
      now_timestamp('millisecond')  as ts
    FROM 
      "$events/client_connected", "$events/client_disconnected"

Sink用のSQLテンプレートは以下の通りです:

注意:フィールドはクォートで囲まず、SQL文の末尾にセミコロン(;)を付けないでください。

sql
INSERT INTO emqx_client_events(ts, clientid, event) VALUES (
      ${ts},
      '${clientid}',
      '${event}'
    )

ルールのテスト

MQTTXを使ってトピックt/1にメッセージを送信し、オンライン/オフラインイベントをトリガーします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello TDengine" }'

2つのSinkの稼働状況を確認すると、新規の受信メッセージ1件、送信メッセージ1件、イベント記録2件があるはずです。

t_mqtt_msgデータテーブルにデータが書き込まれているか確認します。

bash
taos> select * from t_mqtt_msg;
           ts            |             msgid              |           mqtt_topic           | qos  |            payload             |         arrived         |
==============================================================================================================================================================
 2023-02-13 06:10:53.787 | 0005F48EB5A83865F440000014F... | t/1                            |    0 | { "msg": "hello TDengine" }    | 2023-02-13 06:10:53.787 |
Query OK, 1 row(s) in set (0.002968s)

emqx_client_eventsテーブル:

bash
taos> select * from emqx_client_events;
           ts            |            clientid            |             event              |
============================================================================================
 2023-02-13 06:10:53.777 | emqx_c                         | client.connected               |
 2023-02-13 06:10:53.791 | emqx_c                         | client.disconnected            |
Query OK, 2 row(s) in set (0.002327s)