# TDengineへのMQTTデータ取り込み

[TDengine](https://tdengine.com/)は、IoTおよび産業用IoT（IIoT）シナリオ向けに設計・最適化されたビッグデータプラットフォームです。中核には、高性能な時系列データベースがあり、クラスター指向のアーキテクチャ、クラウドネイティブ設計、ミニマリスティックなアプローチが特徴です。EMQXプラットフォームはTDengineとの連携をサポートしており、多数のデバイスやデータコレクターからの大量データの送信、保存、分析、配信を可能にします。これにより、ビジネス運用状態のリアルタイム監視や早期警告を実現し、リアルタイムのビジネスインサイトを提供します。

本ページでは、EMQXプラットフォームとTDengine間のデータ統合について包括的に紹介し、データ統合の作成と検証に関する実践的な手順を説明します。

## 動作の仕組み

TDengineデータ統合はEMQXプラットフォームに組み込まれた機能です。組み込みの[ルールエンジン](./rules.md)コンポーネントにより、EMQXプラットフォームからTDengineへのデータ取り込みが簡素化され、複雑なコーディングが不要になります。EMQXプラットフォームはルールエンジンとアクションを通じてデバイスデータをTDengineに転送します。TDengineデータ統合により、MQTTメッセージやクライアントイベントをTDengineに保存可能です。さらに、TDengine内のデータ更新や削除はイベントによってトリガーされ、デバイスのオンライン状態や過去のオンライン／オフラインイベントの記録も実現します。

以下の図は、産業用IoTにおけるEMQXプラットフォームとTDengineの典型的なデータ統合アーキテクチャを示しています。

![EMQX Platform-TDengine Integration](./_assets/data_integration_tdengine.jpg)

産業用エネルギー消費管理シナリオを例に、ワークフローは次の通りです。

1. **メッセージのパブリッシュと受信**：産業用デバイスはMQTTプロトコルを通じてEMQXプラットフォームに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQXプラットフォームがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **ルールエンジンによるメッセージ処理**：組み込みのルールエンジンはトピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。ここではデータ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などが可能です。
3. **TDengineへのデータ取り込み**：ルールエンジンで定義されたルールがトリガーとなり、メッセージをTDengineに書き込む操作が実行されます。TDengineアクションはSQLテンプレートを提供し、特定のメッセージフィールドをTDengineの対応テーブルやカラムに柔軟に書き込むデータ形式を定義できます。

エネルギー消費データがTDengineに書き込まれた後は、標準SQLおよび強力な時系列拡張機能を用いてリアルタイムにデータ分析が可能です。多数のサードパーティ製バッチ分析、リアルタイム分析、レポーティングツール、AI/MLツール、可視化ツールとシームレスに連携できます。例えば：

- Grafanaなどの可視化ツールに接続し、エネルギー消費データのグラフ表示を生成。
- ERPやPower BIなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を実施。
- ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を行い、データ駆動型のエネルギー管理を支援。

## 特長とメリット

TDengineデータ統合は、ビジネスに以下の特長と利点をもたらします。

- **効率的なデータ処理**：EMQXプラットフォームは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。TDengineはデータ書き込み、保存、クエリに優れ、IoTシナリオのデータ処理要件をシステムに過負荷をかけずに満たします。
- **メッセージ変換**：メッセージはEMQXプラットフォームのルール内で豊富な処理・変換が可能であり、その後TDengineに書き込まれます。
- **クラスターとスケーラビリティ**：EMQXプラットフォームとTDengineはクラスター機能をサポートし、クラウドネイティブアーキテクチャ上に構築されています。これによりクラウドプラットフォームの弾力的なストレージ、計算、ネットワークリソースを最大限活用でき、ビジネスの成長に応じて柔軟な水平スケーリングが可能です。
- **高度なクエリ機能**：TDengineはタイムスタンプデータの効率的なクエリと分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから正確なインサイトを抽出できます。

## はじめる前に

本セクションでは、TDengineデータ統合の作成を開始する前に必要な準備、TDengineサーバーのセットアップやデータテーブルの作成方法について説明します。

### 前提条件

- [データ統合](./introduction.md)の知識
- データ統合の[ルール](./rules.md)の知識

### ネットワーク設定

<!--@include: ./network-setting.md-->

### TDengineのインストール

Dockerを使ってTDengineをインストールし、Dockerイメージを起動します。

```bash
# TDengineのDockerイメージを起動
docker run --name TDengine -p 6041:6041 tdengine/tdengine

# コンテナにアクセス
docker exec -it TDengine bash

# コンテナ内でTDengineサーバーを起動
taos

# データベースを作成し、選択
CREATE DATABASE mqtt;

use mqtt;
```

### TDengineでのデータテーブル作成

TDengine用のデータブリッジを作成する前に、メッセージ保存用のデータテーブルをTDengineデータベースに作成する必要があります。

以下のSQL文を使い、TDengineデータベースに`t_mqtt_msg`テーブルを作成します。このテーブルは各メッセージのクライアントID、トピック、ペイロード、作成時刻を保存します。

```sql
   CREATE TABLE t_mqtt_msg (
       ts timestamp,
       msgid NCHAR(64),
       mqtt_topic NCHAR(255),
       qos TINYINT,
       payload BINARY(1024),
       arrived timestamp
     );
```

## コネクターの作成

データ統合ルールを作成する前に、TDengineサーバーにアクセスするためのTDengineコネクターを作成します。

1. デプロイメントに移動し、左側メニューから**データ統合**をクリックします。初めてコネクターを作成する場合は、**データ永続化**カテゴリの下にある**TDengine**を選択します。すでにコネクターを作成済みの場合は、**新規コネクター**を選択し、同じく**データ永続化**カテゴリの**TDengine**を選択します。

2. **コネクター名**：システムが自動的にコネクター名を生成します。

3. 接続情報を入力します：

    - **サーバーホスト**：`http://127.0.0.1:6041` またはTDengineサーバーがリモートで稼働している場合は実際のURLを入力。
    - **ユーザー名**：`root`
    - **パスワード**：`taosdata`
    - ビジネス要件に応じて詳細設定を構成（任意）

4. **テスト**ボタンをクリックします。TDengineサービスにアクセス可能であれば、**コネクター利用可能**のメッセージが返されます。

5. **新規作成**ボタンをクリックして作成を完了します。

これで、このコネクターを基にデータブリッジルールを作成できます。

## ルールの作成

本セクションでは、EMQXプラットフォームコンソールを使ってTDengineルールを作成し、ルールにアクションを追加する方法を示します。

1. ルールエリアの**新規ルール**をクリックするか、先ほど作成したコネクターの**アクション**列にある新規ルールアイコンをクリックします。

2. 利用したい機能に基づき、**SQLエディター**でルールを設定します。ここでは、クライアントが`temp_hum/emqx`トピックに温湿度メッセージを送信した際にエンジンをトリガーするSQLを作成します。以下のようなSQLが必要です。

   ```sql
    SELECT
      *,
      now_timestamp('millisecond')  as ts
    FROM
      "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は、**SQL例**をクリックし、**テスト有効化**を使ってSQLルールの学習とテストを行うことをおすすめします。

   :::

3. **次へ**をクリックしてアクションを追加します。

4. **コネクター**のドロップダウンから先ほど作成したコネクターを選択します。

5. **データベース名**に`mqtt`を入力します。

6. アクションの**SQLテンプレート**を設定します。以下のSQLを使ってデータ挿入を完了できます。CSVファイルによるバッチ設定もサポートしています。詳細は[バッチ設定](#batch-setting)を参照してください。

   ```sql
   INSERT INTO t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived)
       VALUES (${ts}, '${id}', '${topic}', ${qos}, '${payload}', ${timestamp})
   ```

   SQLテンプレート内でプレースホルダー変数が未定義の場合、**SQLテンプレート**上の**未定義変数をNULLとして扱う**スイッチを切り替えてルールエンジンの動作を定義できます。

   - **無効**（デフォルト）：ルールエンジンは文字列`undefined`をデータベースに挿入します。
   - **有効**：変数が未定義の場合、ルールエンジンは`NULL`をデータベースに挿入します。

     ::: tip

     可能な限りこのオプションは常に有効にすることを推奨します。無効化は互換性維持のための例外的な用途のみです。

     :::

7. **詳細設定**を展開し、同期／非同期モード、キューやバッチ、その他パラメータを適宜設定します（任意）。

8. **確定**ボタンをクリックしてルール作成を完了します。

9. **新規ルール作成成功**のポップアップで**ルールに戻る**をクリックし、データ統合設定の一連の流れを完了します。

### バッチ設定

TDengineでは単一のデータエントリに数百のデータポイントを含むことがあり、SQL文の記述が困難になる場合があります。これに対応するため、EMQXプラットフォームはSQLのバッチ設定機能を提供しています。

SQLテンプレート編集時に、バッチ設定機能を使ってCSVファイルから挿入操作用のフィールドをインポートできます。

1. **SQLテンプレート**下の**バッチ設定**ボタンをクリックし、**バッチ設定インポート**ポップアップを開きます。

2. 指示に従い、バッチ設定テンプレートファイルをダウンロードし、テンプレート内のフィールドのキー・バリューを記入します。デフォルトのテンプレート内容は以下の通りです。

   | フィールド | 値               | 文字列値 | 備考（任意）       |
   | ---------- | ---------------- | -------- | ------------------ |
   | ts         | now              | FALSE    | 例の備考           |
   | msgid      | ${id}            | TRUE     |                    |
   | mqtt_topic | ${topic}         | TRUE     |                    |
   | qos        | ${qos}           | FALSE    |                    |
   | temp       | ${payload.temp}  | FALSE    |                    |
   | hum        | ${payload.hum}   | FALSE    |                    |
   | status     | ${payload.status}| FALSE    |                    |

   - **フィールド**：フィールドキー。定数または`${var}`形式のプレースホルダーをサポート。
   - **値**：フィールド値。定数または`${var}`形式のプレースホルダーをサポート。SQLでは文字列型は引用符で囲む必要がありますが、テンプレートファイルでは不要です。文字列型かどうかは`文字列値`列で指定します。
   - **文字列値**：フィールドが文字列型かどうかを指定し、インポート時にSQL生成時に引用符を付加します。文字列型の場合は`TRUE`または`1`、そうでなければ`FALSE`または`0`を記入。
   - **備考**：CSVファイル内の注釈用で、EMQXプラットフォームにはインポートされません。

   CSVファイルのバッチ設定データは2048行を超えないようにしてください。

3. 記入したテンプレートファイルを保存し、**バッチ設定インポート**ポップアップにアップロード後、**インポート**をクリックしてバッチ設定を完了します。

4. インポート後、**SQLテンプレート**内でテーブル名の設定やSQLコードの整形など、さらにSQLを調整可能です。

## ルールのテスト

温湿度データの送信シミュレーションには[MQTTX](https://mqttx.app/)の利用を推奨しますが、他の任意のクライアントでも構いません。

1. MQTTXを使い、デプロイメントに接続して以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`
   - クライアントID：`test_client`
   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. アクションの稼働状況を確認し、新規の受信メッセージ1件と送信メッセージ1件があることを確認します。

3. `t_mqtt_msg`データテーブルにデータが書き込まれているか確認します。

```bash
taos> select * from t_mqtt_msg;
           ts            |             msgid              |           mqtt_topic           | qos  |            payload             |         arrived         |
==============================================================================================================================================================
 2024-03-29 06:57:37.300 | 000614C727B230AE67180100069... | temp_hum/emqx                            |    1 | {
  "temp": "27.5",
  "hum"... | 2024-03-29 06:57:37.300 |
Query OK, 1 row(s) in set (0.002968s)
```
