Skip to content

TDengineへのMQTTデータ取り込み

TDengineは、IoTおよび産業用IoT(IIoT)シナリオ向けに設計・最適化されたビッグデータプラットフォームです。その中核には、高性能な時系列データベースがあり、クラスター指向のアーキテクチャ、クラウドネイティブ設計、ミニマリスティックなアプローチを特徴としています。EMQX CloudはTDengineとの連携をサポートしており、多数のデバイスやデータコレクターからの大量データの送信、保存、分析、配信を可能にします。これにより、ビジネス運用状態のリアルタイム監視や早期警告を実現し、リアルタイムのビジネスインサイトを提供します。

本ページでは、EMQX CloudとTDengineのデータ統合について包括的に紹介し、データ統合の作成および検証方法を実践的に解説します。

動作概要

TDengineデータ統合はEMQX Cloudに組み込まれた機能です。組み込みのルールエンジンコンポーネントを利用することで、EMQX CloudからTDengineへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。EMQX Cloudはルールエンジンとアクションを介してデバイスデータをTDengineへ転送します。TDengineデータ統合により、MQTTメッセージやクライアントイベントをTDengineに保存可能です。さらに、TDengine内のデータ更新や削除をイベントでトリガーでき、デバイスのオンライン状態や過去のオンライン/オフラインイベントの記録も実現します。

以下の図は、産業用IoTにおけるEMQX CloudとTDengineの典型的なデータ統合アーキテクチャを示しています。

EMQX Cloud-TDengine Integration

産業用エネルギー消費管理シナリオを例に、ワークフローは以下の通りです。

  1. メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを通じてEMQX Cloudに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQX Cloudがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンはトピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などが含まれます。
  3. TDengineへのデータ取り込み:ルールエンジンで定義されたルールがトリガーとなり、メッセージをTDengineに書き込む操作が実行されます。TDengineアクションはSQLテンプレートを提供し、特定のメッセージフィールドを対応するテーブルやカラムに柔軟に書き込むデータ形式を定義可能です。

エネルギー消費データがTDengineに書き込まれた後は、標準SQLと強力な時系列拡張機能を用いてリアルタイムにデータ分析が可能となり、多数のサードパーティのバッチ分析、リアルタイム分析、レポーティングツール、AI/MLツール、可視化ツールとシームレスに連携できます。例えば:

  • Grafanaなどの可視化ツールに接続し、エネルギー消費データのチャートを生成・表示。
  • ERPやPower BIなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を実施。
  • ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を行い、データ駆動型のエネルギー管理を促進。

特長と利点

TDengineデータ統合は以下の特長と利点をビジネスにもたらします。

  • 効率的なデータ処理:EMQX Cloudは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。TDengineはデータの書き込み、保存、クエリに優れており、IoTシナリオのデータ処理ニーズをシステムに過負荷をかけずに満たします。
  • メッセージ変換:メッセージはEMQX Cloudのルール内で豊富な処理や変換を経てTDengineに書き込まれます。
  • クラスターおよびスケーラビリティ:EMQX CloudとTDengineはクラスター機能をサポートし、クラウドネイティブアーキテクチャ上に構築されているため、クラウドプラットフォームの弾力的なストレージ、コンピューティング、ネットワークリソースを最大限に活用し、ビジネスの成長に応じた柔軟な水平スケーリングが可能です。
  • 高度なクエリ機能:TDengineは時刻データの効率的なクエリと分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから正確なインサイトを抽出できます。

はじめる前に

このセクションでは、TDengineデータ統合の作成を開始する前に必要な準備、TDengineサーバーのセットアップやデータテーブルの作成方法について説明します。

前提条件

ネットワーク設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

TDengineのインストール

Dockerを使ってTDengineをインストールし、Dockerイメージを起動します。

bash
# TDengineのDockerイメージを起動
docker run --name TDengine -p 6041:6041 tdengine/tdengine

# コンテナにアクセス
docker exec -it TDengine bash

# コンテナ内でTDengineサーバーを起動
taos

# データベースを作成し、選択
CREATE DATABASE mqtt;

use mqtt;

TDengineでデータテーブルを作成

TDengine用のデータブリッジを作成する前に、メッセージ保存用のデータテーブルをTDengineデータベースに作成する必要があります。

以下のSQL文を使って、TDengineデータベースにデータテーブルt_mqtt_msgを作成します。このテーブルは各メッセージのクライアントID、トピック、ペイロード、作成時間を保存します。

sql
   CREATE TABLE t_mqtt_msg (
       ts timestamp,
       msgid NCHAR(64),
       mqtt_topic NCHAR(255),
       qos TINYINT,
       payload BINARY(1024),
       arrived timestamp
     );

コネクターの作成

データ統合ルールを作成する前に、TDengineサーバーにアクセスするためのTDengineコネクターを作成します。

  1. デプロイメント画面に移動し、左側ナビゲーションメニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ永続化カテゴリの中からTDengineを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、同じくデータ永続化カテゴリの中からTDengineを選択します。

  2. コネクター名はシステムが自動生成します。

  3. 接続情報を入力します:

    • サーバーホストhttp://127.0.0.1:6041(TDengineサーバーがリモートで稼働している場合は実際のURLを入力)
    • ユーザー名root
    • パスワードtaosdata
    • 必要に応じて詳細設定を行います(任意)。
  4. テストボタンをクリックします。TDengineサービスにアクセス可能であれば、コネクター利用可能のメッセージが表示されます。

  5. 新規作成ボタンをクリックして作成を完了します。

これで、このコネクターを基にデータブリッジルールを作成できます。

ルールの作成

このセクションでは、EMQX Cloudコンソールを使ってTDengineルールを作成し、ルールにアクションを追加する方法を説明します。

  1. ルールエリアで新規ルールをクリックするか、先ほど作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. 利用する機能に基づき、SQLエディターでルールを設定します。ここでは、クライアントがtemp_hum/emqxトピックに温度と湿度のメッセージを送信した際にエンジンをトリガーするSQL例を示します。

    sql
     SELECT
       *,
       now_timestamp('millisecond')  as ts
     FROM
       "temp_hum/emqx"

    TIP

    初心者の方は、SQL例をクリックし、試してみるでSQLルールを学習・テストできます。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. データベース名mqttを入力します。

  6. アクションのSQLテンプレートを設定します。以下のSQLを使ってデータ挿入を行えます。CSVファイルによるバッチ設定もサポートしています。詳細はバッチ設定を参照してください。

    sql
    INSERT INTO t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived)
        VALUES (${ts}, '${id}', '${topic}', ${qos}, '${payload}', ${timestamp})

    SQLテンプレート内のプレースホルダー変数が未定義の場合、SQLテンプレート上部の未定義変数をNULLとして扱うスイッチでルールエンジンの動作を切り替えられます。

    • 無効(デフォルト):ルールエンジンは文字列undefinedをデータベースに挿入します。

    • 有効:変数が未定義の場合、ルールエンジンはNULLを挿入します。

      TIP

      可能な限りこのオプションは有効にしてください。無効にするのは後方互換性を確保する場合のみです。

  7. 詳細設定を展開し、同期/非同期モード、キューやバッチ、その他パラメータを必要に応じて設定します(任意)。

  8. 確定ボタンをクリックしてルール作成を完了します。

  9. 新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。

バッチ設定

TDengineでは1つのデータエントリに数百のデータポイントを含むことがあり、SQL文の作成が困難になる場合があります。これに対応するため、EMQX CloudはSQLのバッチ設定機能を提供しています。

SQLテンプレート編集時に、バッチ設定機能を使ってCSVファイルから挿入操作用のフィールドをインポートできます。

  1. SQLテンプレート下のバッチ設定ボタンをクリックし、バッチ設定インポートポップアップを開きます。

  2. 指示に従いバッチ設定テンプレートファイルをダウンロードし、テンプレート内のフィールドのキーと値のペアを記入します。テンプレートファイルのデフォルト内容は以下の通りです。

    FieldValueChar Value備考(任意)
    tsnowFALSE例示備考
    msgid${id}TRUE
    mqtt_topic${topic}TRUE
    qos${qos}FALSE
    temp${payload.temp}FALSE
    hum${payload.hum}FALSE
    status${payload.status}FALSE
    • Field:フィールドキー。定数または${var}形式のプレースホルダーをサポート。
    • Value:フィールド値。定数または${var}形式のプレースホルダーをサポート。SQLでは文字型はクォートで囲む必要がありますが、テンプレートファイルではクォート不要です。文字型かどうかはChar Value列で指定します。
    • Char Value:フィールドが文字型かどうかを指定し、インポート時にSQL生成時にクォートを付加します。文字型の場合はTRUEまたは1、それ以外はFALSEまたは0を記入してください。
    • 備考:CSVファイル内の注釈用で、EMQX Cloudにはインポートされません。

    CSVファイルのバッチ設定データは2048行を超えないようにしてください。

  3. 記入したテンプレートファイルを保存し、バッチ設定インポートポップアップにアップロード後、インポートをクリックしてバッチ設定を完了します。

  4. インポート後、SQLテンプレートでテーブル名の設定やSQLコードの整形など、さらにSQLを調整できます。

ルールのテスト

温湿度データの報告をシミュレートするには、MQTTXの利用を推奨しますが、他のクライアントでも問題ありません。

  1. MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • クライアントID:test_client

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. アクションの稼働状況を確認し、新規の受信メッセージ1件、新規の送信メッセージ1件があることを確認します。

  3. t_mqtt_msgデータテーブルにデータが書き込まれているか確認します。

bash
taos> select * from t_mqtt_msg;
           ts            |             msgid              |           mqtt_topic           | qos  |            payload             |         arrived         |
==============================================================================================================================================================
 2024-03-29 06:57:37.300 | 000614C727B230AE67180100069... | temp_hum/emqx                            |    1 | {
  "temp": "27.5",
  "hum"... | 2024-03-29 06:57:37.300 |
Query OK, 1 row(s) in set (0.002968s)