Skip to content

TDengineへのMQTTデータ取り込み

TDengineは、IoTおよび産業用IoT(IIoT)シナリオ向けに設計・最適化されたビッグデータプラットフォームです。中核には、高性能な時系列データベースがあり、クラスター指向のアーキテクチャ、クラウドネイティブ設計、ミニマリスティックなアプローチが特徴です。EMQXプラットフォームはTDengineとの連携をサポートしており、多数のデバイスやデータコレクターからの大量データの送信、保存、分析、配信を可能にします。これにより、ビジネス運用状態のリアルタイム監視や早期警告を実現し、リアルタイムのビジネスインサイトを提供します。

本ページでは、EMQXプラットフォームとTDengine間のデータ統合について包括的に紹介し、データ統合の作成と検証に関する実践的な手順を説明します。

動作の仕組み

TDengineデータ統合はEMQXプラットフォームに組み込まれた機能です。組み込みのルールエンジンコンポーネントにより、EMQXプラットフォームからTDengineへのデータ取り込みが簡素化され、複雑なコーディングが不要になります。EMQXプラットフォームはルールエンジンとアクションを通じてデバイスデータをTDengineに転送します。TDengineデータ統合により、MQTTメッセージやクライアントイベントをTDengineに保存可能です。さらに、TDengine内のデータ更新や削除はイベントによってトリガーされ、デバイスのオンライン状態や過去のオンライン/オフラインイベントの記録も実現します。

以下の図は、産業用IoTにおけるEMQXプラットフォームとTDengineの典型的なデータ統合アーキテクチャを示しています。

EMQX Platform-TDengine Integration

産業用エネルギー消費管理シナリオを例に、ワークフローは次の通りです。

  1. メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを通じてEMQXプラットフォームに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQXプラットフォームがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンはトピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。ここではデータ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などが可能です。
  3. TDengineへのデータ取り込み:ルールエンジンで定義されたルールがトリガーとなり、メッセージをTDengineに書き込む操作が実行されます。TDengineアクションはSQLテンプレートを提供し、特定のメッセージフィールドをTDengineの対応テーブルやカラムに柔軟に書き込むデータ形式を定義できます。

エネルギー消費データがTDengineに書き込まれた後は、標準SQLおよび強力な時系列拡張機能を用いてリアルタイムにデータ分析が可能です。多数のサードパーティ製バッチ分析、リアルタイム分析、レポーティングツール、AI/MLツール、可視化ツールとシームレスに連携できます。例えば:

  • Grafanaなどの可視化ツールに接続し、エネルギー消費データのグラフ表示を生成。
  • ERPやPower BIなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を実施。
  • ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を行い、データ駆動型のエネルギー管理を支援。

特長とメリット

TDengineデータ統合は、ビジネスに以下の特長と利点をもたらします。

  • 効率的なデータ処理:EMQXプラットフォームは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。TDengineはデータ書き込み、保存、クエリに優れ、IoTシナリオのデータ処理要件をシステムに過負荷をかけずに満たします。
  • メッセージ変換:メッセージはEMQXプラットフォームのルール内で豊富な処理・変換が可能であり、その後TDengineに書き込まれます。
  • クラスターとスケーラビリティ:EMQXプラットフォームとTDengineはクラスター機能をサポートし、クラウドネイティブアーキテクチャ上に構築されています。これによりクラウドプラットフォームの弾力的なストレージ、計算、ネットワークリソースを最大限活用でき、ビジネスの成長に応じて柔軟な水平スケーリングが可能です。
  • 高度なクエリ機能:TDengineはタイムスタンプデータの効率的なクエリと分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから正確なインサイトを抽出できます。

はじめる前に

本セクションでは、TDengineデータ統合の作成を開始する前に必要な準備、TDengineサーバーのセットアップやデータテーブルの作成方法について説明します。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

TDengineのインストール

Dockerを使ってTDengineをインストールし、Dockerイメージを起動します。

bash
# TDengineのDockerイメージを起動
docker run --name TDengine -p 6041:6041 tdengine/tdengine

# コンテナにアクセス
docker exec -it TDengine bash

# コンテナ内でTDengineサーバーを起動
taos

# データベースを作成し、選択
CREATE DATABASE mqtt;

use mqtt;

TDengineでのデータテーブル作成

TDengine用のデータブリッジを作成する前に、メッセージ保存用のデータテーブルをTDengineデータベースに作成する必要があります。

以下のSQL文を使い、TDengineデータベースにt_mqtt_msgテーブルを作成します。このテーブルは各メッセージのクライアントID、トピック、ペイロード、作成時刻を保存します。

sql
   CREATE TABLE t_mqtt_msg (
       ts timestamp,
       msgid NCHAR(64),
       mqtt_topic NCHAR(255),
       qos TINYINT,
       payload BINARY(1024),
       arrived timestamp
     );

コネクターの作成

データ統合ルールを作成する前に、TDengineサーバーにアクセスするためのTDengineコネクターを作成します。

  1. デプロイメントに移動し、左側メニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるTDengineを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、同じくデータ永続化カテゴリのTDengineを選択します。

  2. コネクター名:システムが自動的にコネクター名を生成します。

  3. 接続情報を入力します:

    • サーバーホストhttp://127.0.0.1:6041 またはTDengineサーバーがリモートで稼働している場合は実際のURLを入力。
    • ユーザー名root
    • パスワードtaosdata
    • ビジネス要件に応じて詳細設定を構成(任意)
  4. テストボタンをクリックします。TDengineサービスにアクセス可能であれば、コネクター利用可能のメッセージが返されます。

  5. 新規作成ボタンをクリックして作成を完了します。

これで、このコネクターを基にデータブリッジルールを作成できます。

ルールの作成

本セクションでは、EMQXプラットフォームコンソールを使ってTDengineルールを作成し、ルールにアクションを追加する方法を示します。

  1. ルールエリアの新規ルールをクリックするか、先ほど作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. 利用したい機能に基づき、SQLエディターでルールを設定します。ここでは、クライアントがtemp_hum/emqxトピックに温湿度メッセージを送信した際にエンジンをトリガーするSQLを作成します。以下のようなSQLが必要です。

    sql
     SELECT
       *,
       now_timestamp('millisecond')  as ts
     FROM
       "temp_hum/emqx"

    TIP

    初心者の方は、SQL例をクリックし、テスト有効化を使ってSQLルールの学習とテストを行うことをおすすめします。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. データベース名mqttを入力します。

  6. アクションのSQLテンプレートを設定します。以下のSQLを使ってデータ挿入を完了できます。CSVファイルによるバッチ設定もサポートしています。詳細はバッチ設定を参照してください。

    sql
    INSERT INTO t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived)
        VALUES (${ts}, '${id}', '${topic}', ${qos}, '${payload}', ${timestamp})

    SQLテンプレート内でプレースホルダー変数が未定義の場合、SQLテンプレート上の未定義変数をNULLとして扱うスイッチを切り替えてルールエンジンの動作を定義できます。

    • 無効(デフォルト):ルールエンジンは文字列undefinedをデータベースに挿入します。

    • 有効:変数が未定義の場合、ルールエンジンはNULLをデータベースに挿入します。

      TIP

      可能な限りこのオプションは常に有効にすることを推奨します。無効化は互換性維持のための例外的な用途のみです。

  7. 詳細設定を展開し、同期/非同期モード、キューやバッチ、その他パラメータを適宜設定します(任意)。

  8. 確定ボタンをクリックしてルール作成を完了します。

  9. 新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。

バッチ設定

TDengineでは単一のデータエントリに数百のデータポイントを含むことがあり、SQL文の記述が困難になる場合があります。これに対応するため、EMQXプラットフォームはSQLのバッチ設定機能を提供しています。

SQLテンプレート編集時に、バッチ設定機能を使ってCSVファイルから挿入操作用のフィールドをインポートできます。

  1. SQLテンプレート下のバッチ設定ボタンをクリックし、バッチ設定インポートポップアップを開きます。

  2. 指示に従い、バッチ設定テンプレートファイルをダウンロードし、テンプレート内のフィールドのキー・バリューを記入します。デフォルトのテンプレート内容は以下の通りです。

    フィールド文字列値備考(任意)
    tsnowFALSE例の備考
    msgid${id}TRUE
    mqtt_topic${topic}TRUE
    qos${qos}FALSE
    temp${payload.temp}FALSE
    hum${payload.hum}FALSE
    status${payload.status}FALSE
    • フィールド:フィールドキー。定数または${var}形式のプレースホルダーをサポート。
    • :フィールド値。定数または${var}形式のプレースホルダーをサポート。SQLでは文字列型は引用符で囲む必要がありますが、テンプレートファイルでは不要です。文字列型かどうかは文字列値列で指定します。
    • 文字列値:フィールドが文字列型かどうかを指定し、インポート時にSQL生成時に引用符を付加します。文字列型の場合はTRUEまたは1、そうでなければFALSEまたは0を記入。
    • 備考:CSVファイル内の注釈用で、EMQXプラットフォームにはインポートされません。

    CSVファイルのバッチ設定データは2048行を超えないようにしてください。

  3. 記入したテンプレートファイルを保存し、バッチ設定インポートポップアップにアップロード後、インポートをクリックしてバッチ設定を完了します。

  4. インポート後、SQLテンプレート内でテーブル名の設定やSQLコードの整形など、さらにSQLを調整可能です。

ルールのテスト

温湿度データの送信シミュレーションにはMQTTXの利用を推奨しますが、他の任意のクライアントでも構いません。

  1. MQTTXを使い、デプロイメントに接続して以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • クライアントID:test_client

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. アクションの稼働状況を確認し、新規の受信メッセージ1件と送信メッセージ1件があることを確認します。

  3. t_mqtt_msgデータテーブルにデータが書き込まれているか確認します。

bash
taos> select * from t_mqtt_msg;
           ts            |             msgid              |           mqtt_topic           | qos  |            payload             |         arrived         |
==============================================================================================================================================================
 2024-03-29 06:57:37.300 | 000614C727B230AE67180100069... | temp_hum/emqx                            |    1 | {
  "temp": "27.5",
  "hum"... | 2024-03-29 06:57:37.300 |
Query OK, 1 row(s) in set (0.002968s)