# Apache IoTDBへのMQTTデータ取り込み

[Apache IoTDB](https://iotdb.apache.org/) は、多種多様なIoTデバイスやシステムから生成される膨大な時系列データを処理するために設計された高性能かつスケーラブルな時系列データベースです。  
EMQXはApache IoTDBとのデータ連携をサポートしており、軽量なMQTTプロトコルを介してデータをApache IoTDBの[REST API V2](https://iotdb.apache.org/UserGuide/latest/API/RestServiceV2.html)にシームレスに転送できます。  
このデータ連携は一方向のデータフローを保証し、EMQXの卓越したリアルタイムデータ取り込み能力とIoTDBの時系列データ保存およびクエリ性能を活用して、MQTTメッセージをIoTDBデータベースに書き込みます。  
この強力な組み合わせは、IoTデータを効果的に管理したい企業にとって堅実な基盤となります。

本ページでは、EMQXとApache IoTDB間のデータ連携について、作成方法や検証方法を含めて包括的に紹介します。

## 動作概要

Apache IoTDBデータ連携は、EMQXに標準搭載された機能であり、生のMQTTベースの時系列データとIoTDBの強力なデータ保存機能を橋渡しします。  
組み込みの[ルールエンジン](./rules.md)コンポーネントにより、EMQXからIoTDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。

以下の図は、EMQXとIoTDB間のデータ連携の典型的なアーキテクチャを示しています。<!-- この画像はIoTDBに特化したものに修正が必要です -->

<img src="./assets/IoTDB_bridge_architecture.png" alt="IoTDB_bridge_architecture" style="zoom:67%;" />

データ連携のワークフローは以下の通りです：

1. **メッセージのパブリッシュと受信**：接続された車両、IIoTシステム、エネルギー管理プラットフォームなどのデバイスは、MQTTプロトコルを通じてEMQXに正常に接続し、運用状態や計測値、トリガーされたイベントに基づいてMQTTメッセージを送信します。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理が開始されます。  
2. **メッセージデータ処理**：メッセージが到着するとルールエンジンを通過し、EMQXで定義されたルールにより処理されます。ルールは事前定義された条件に基づき、どのメッセージをIoTDBにルーティングするかを決定します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。  
3. **データバッファリング**：IoTDBが利用できない場合のデータ損失を防ぐため、EMQXはインメモリのメッセージバッファを提供します。データは一時的にバッファに保持され、メモリ過負荷を防ぐためにディスクにオフロードされることもあります。ただし、データ連携やEMQXノードの再起動時にはデータは保持されません。  
4. **IoTDBへのデータ取り込み**：ルールエンジンがIoTDB保存対象のメッセージを特定すると、メッセージをIoTDBへ転送するアクションがトリガーされます。処理済みデータは時系列データとしてIoTDBにシームレスに書き込まれます。  
5. **データ保存と活用**：データがIoTDBに保存された後、企業はそのクエリ機能を活用して様々なユースケースに対応できます。例えば、コネクテッドビークル分野では、車両の健康状態の把握、リアルタイムメトリクスに基づくルート最適化、資産追跡などに利用されます。IIoT環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化などに活用可能です。

## 特長とメリット

IoTDBとのデータ連携は、効果的なデータ処理と保存を実現するための多彩な特長とメリットを提供します：

- **効率的なデータ収集**

  EMQXとIoTDBを連携させることで、リソースが限られたIoTデバイスから軽量なMQTTメッセージングプロトコルを通じてIoT時系列データを効率的に収集し、データベースに取り込むことができ、信頼性と効率性の高いデータ収集を実現します。

- **柔軟なデータ変換**

  EMQXは強力なSQLベースのルールエンジンを提供し、IoTDBに保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、エンリッチメントなど多様なデータ変換機能をサポートし、組織のニーズに応じてデータを整形可能です。

- **スケーラビリティと高スループット**

  EMQXは水平スケーラビリティを考慮して設計されており、増加するIoTデバイス群からのメッセージトラフィックを容易に処理します。拡大するデータ量に柔軟に対応し、高い同時アクセスをサポートします。その結果、IoT時系列ワークロードは、IoT展開が未曾有の規模に拡大しても、データ取り込み、保存、処理の増大する要求を難なく管理できます。

- **最適化された時系列ストレージ**

  IoTDBはタイムスタンプ付きデータの最適化された保存を提供します。時間パーティショニング、圧縮、データ保持ポリシーを活用し、大量の時系列データを効率的に保存・管理します。これにより、ストレージのフットプリントを最小限に抑えつつ高性能を維持し、大量の時系列データを生成するIoTワークロードに不可欠な性能を実現します。

- **高速かつ複雑なクエリ処理**

  IoTDBは豊富なクエリセマンティクスを持ち、デバイスやセンサー間の時系列データの時間整合、時系列フィールドの計算（周波数領域変換）、時間軸における豊富な集約関数をサポートします。さらにApache Hadoop、Spark、Flinkとの深い統合により、より強力な分析機能を提供します。EMQXはIoTDBとシームレスに統合し、MQTTデータの保存と分析の統一ソリューションを実現します。

## はじめる前に

このセクションでは、EMQXダッシュボードでApache IoTDBデータ連携を作成する前に必要な準備について説明します。

### 前提条件

- EMQXのデータ連携[ルール](./rules.md)に関する知識  
- [データ連携](./data-bridges.md)に関する知識

### Apache IoTDBサーバーの起動

ここでは、[Docker](https://www.docker.com/)を使ってApache IoTDBサーバーを起動する方法を紹介します。IoTDBの設定で`enable_rest_service=true`が有効になっていることを確認してください。

以下のコマンドを実行して、RESTインターフェースを有効にしたApache IoTDBサーバーを起動します：

```bash
docker run -d --name iotdb-service \
              --hostname iotdb-service \
              -p 6667:6667 \
              -p 18080:18080 \
              -e enable_rest_service=true \
              -e cn_internal_address=iotdb-service \
              -e cn_target_config_node_list=iotdb-service:10710 \
              -e cn_internal_port=10710 \
              -e cn_consensus_port=10720 \
              -e dn_rpc_address=iotdb-service \
              -e dn_internal_address=iotdb-service \
              -e dn_target_config_node_list=iotdb-service:10710 \
              -e dn_mpp_data_exchange_port=10740 \
              -e dn_schema_region_consensus_port=10750 \
              -e dn_data_region_consensus_port=10760 \
              -e dn_rpc_port=6667 \
              apache/iotdb:1.1.0-standalone
```

Docker Hubの[IoTDBのDocker実行に関する情報](https://hub.docker.com/r/apache/iotdb)も参照してください。

## コネクターの作成

Apache IoTDBデータ連携を作成するには、Apache IoTDB SinkとApache IoTDBサーバーを接続するコネクターを作成する必要があります。

EMQXはREST APIまたはThriftプロトコルを介したIoTDBとの通信をサポートしています。

1. EMQXダッシュボードにアクセスし、**Integrations** -> **Connectors**をクリックします。  
2. ページ右上の**Create**をクリックします。  
3. **Create Connector**ページで、コネクタータイプとして**Apache IoTDB**を選択し、**Next**をクリックします。  
4. コネクターの名前と説明を入力します。名前は大文字・小文字の英数字の組み合わせで構いません。例：`my_iotdb`。  
5. **Driver**ドロップダウンからドライバーを選択します。  
   - `REST API`を選択した場合、**IoTDB REST Service Base URL**に`http://localhost:18080`を入力します。  
   - Thriftプロトコルを使用する場合は`Thrift Protocol`を選択し、**Server Host**にIoTDB Thriftサーバーのアドレスを入力します。  
6. コネクターがApache IoTDBサーバーにアクセスするためのユーザー名とパスワードを入力します。  
7. **IoTDB Version**ドロップダウンから接続したいIoTDBのバージョンを選択します。  
8. 他のオプションはデフォルトのままで構いません。**Advanced Settings**（任意）の設定は[高度な設定](#advanced-configurations)を参照してください。  
9. **Create**をクリックする前に、**Test Connectivity**をクリックしてコネクターがApache IoTDBに接続できるかテストできます。  
10. **Create**をクリックしてコネクター作成を完了します。ポップアップで**Back to Connector List**をクリックするか、**Create Rule**をクリックしてルールとSinkの作成に進むことができます。詳細は[ルールとApache IoTDB Sinkの作成](#create-a-rule-and-apache-iotdb-sink)を参照してください。

## Apache IoTDB Sinkを用いたルールの作成

このセクションでは、EMQXでルールを作成し、ソースMQTTトピック`root/#`からのメッセージを処理し、処理結果を設定済みのApache IoTDB Sinkを通じてApache IoTDBに時系列データとして保存する方法を示します。

1. EMQXダッシュボードで、**Integration -> Rules**をクリックします。  
2. ページ右上の**Create**をクリックします。  
3. ルールIDを入力します。例：`my_rule`。  
4. SQLエディタに以下のステートメントを入力します。これはトピックパターン`root/#`にマッチするMQTTメッセージを転送します：

   ```sql
   SELECT
     *
   FROM
     "root/#"
   ```

   ::: tip

   初心者の方は、**SQL Examples**や**Enable Test**をクリックしてSQLルールの学習やテストが可能です。

   :::

5. **Add Action**ボタンをクリックし、ルールでトリガーされるアクションを定義します。**Type of Action**ドロップダウンから`Apache IoTDB`を選択します。これにより、EMQXはルールで処理したデータをApache IoTDBに送信します。  
6. **Action**ドロップダウンは`Create Action`のままにするか、既存のApache IoTDB Sinkを選択できます。ここでは新しいSinkを作成し、ルールに追加します。  
7. Sinkの名前と説明を入力します。  
8. **Connector**ドロップダウンから先ほど作成した`my_iotdb`コネクターを選択します。隣のボタンから新規コネクター作成も可能です。設定パラメータは[コネクターの作成](#create-connector)を参照してください。  
9. Sinkの設定情報を以下のように構成します：

    * **Device ID**（任意）：IoTDBインスタンスに時系列データを転送・挿入する際のデバイス名として使用する特定のデバイスIDを入力します。

      :::tip

      空欄の場合、デバイスIDはパブリッシュされたメッセージ内やルール内で指定可能です。例えば、JSONエンコードされたメッセージに`device_id`フィールドがあれば、その値が出力デバイスIDとなります。ルールエンジンでこの情報を抽出するには、以下のようなSQLを使えます：

      ```sql
      SELECT
       payload,
       `my_device` as payload.device_id
      ```

      ただし、このフィールドで固定したデバイスIDが優先されます。

      :::

    - **Align Timeseries**：デフォルトは無効です。有効にすると、グループ化されたアラインド時系列のタイムスタンプ列がIoTDB内で一度だけ保存され、グループ内の各時系列で重複保存されません。詳細は[Aligned timeseries](https://iotdb.apache.org/UserGuide/V1.1.x/Data-Concept/Data-Model-and-Terminology.html#aligned-timeseries)を参照してください。

10. **Write Data**を設定し、MQTTメッセージからIoTDBデータを生成する方法を指定します。

    **Write Data**セクションでテンプレートを定義でき、必要な項目を複数含めて各行に必要なコンテキスト情報を記述します。このテンプレートを用いてMQTTメッセージに適用し、IoTDBデータを生成します。テンプレートはCSVファイルによる一括設定もサポートします。詳細は[一括設定](#batch-setting)を参照してください。

    例として以下のテンプレートを考えます：

       | Timestamp | Measurement | Data Type | Value    |
       | --------- | ----------- | --------- | -------- |
       |           | index       | INT32     | ${index} |
       |           | temperature | FLOAT     | ${temp}  |

    :::tip

    `Timestamp`と`Value`はプレースホルダー構文をサポートし、変数で埋められます。

    `Timestamp`が省略された場合は、現在のシステム時刻（ミリ秒単位）で自動的に埋められます。

    :::

    例えば、MQTTメッセージは以下のように構成できます：

      ```json
      {
        "index": "42",
        "temp": "32.67"
      }
      ```

11. **フォールバックアクション（任意）**：メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。これらはプライマリSinkがメッセージ処理に失敗した際にトリガーされます。詳細は[フォールバックアクション](./data-bridges.md#fallback-actions)を参照してください。

12. **高度な設定（任意）**：[高度な設定](#advanced-configurations)を参照してください。

13. **Create**をクリックする前に、**Test Connectivity**をクリックしてSinkがApache IoTDBサーバーに接続できるかテストできます。

14. **Create**をクリックしてSink作成を完了します。**Create Rule**ページに戻ると、**Action Outputs**タブに新しいSinkが表示されます。

15. **Create Rule**ページで設定内容を確認し、**Create**ボタンをクリックしてルールを生成します。

これでルールが正常に作成され、**Rule**ページに新しいルールが表示されます。**Actions(Sink)**タブをクリックすると、新しいApache IoTDB Sinkが確認できます。

**Integration** -> **Flow Designer**をクリックするとトポロジーが表示され、トピック`root/#`のメッセージがルール`my_rule`で解析されてApache IoTDBに転送されていることがわかります。

### 一括設定

Apache IoTDBでは、ダッシュボード上で数百件のデータを同時に書き込む設定は困難です。これに対応するため、EMQXはデータ書き込みの一括設定機能を提供しています。

**Write Data**の設定時に、一括設定機能を使ってCSVファイルから挿入操作のフィールドをインポートできます。

1. **Write Data**テーブルの**Batch Setting**ボタンをクリックし、**Import Batch Setting**ポップアップを開きます。

2. 指示に従い、一括設定テンプレートファイルをダウンロードし、テンプレートにデータ書き込み設定を記入します。デフォルトのテンプレート内容は以下の通りです：

   | Timestamp | Measurement | Data Type | Value             | 備考（任意）                                               |
   | --------- | ----------- | --------- | ----------------- | ---------------------------------------------------------- |
   | now       | temp        | float     | ${payload.temp}   | フィールド、値、データ型は必須。データ型はboolean, int32, int64, float, double, textが利用可能 |
   | now       | hum         | float     | ${payload.hum}    |                                                            |
   | now       | status      | boolean   | ${payload.status} |                                                            |
   | now       | clientid    | test      | ${clientid}       |                                                            |

   - **Timestamp**：`${var}`形式のプレースホルダーをサポートし、タイムスタンプ形式が必要です。以下の特殊文字も使用可能です：  
     - now：現在のミリ秒タイムスタンプ  
     - now_ms：現在のミリ秒タイムスタンプ  
     - now_us：現在のマイクロ秒タイムスタンプ  
     - now_ns：現在のナノ秒タイムスタンプ  
   - **Measurement**：フィールド名  
   - **Data Type**：データ型（boolean, int32, int64, float, double, text）  
   - **Value**：書き込むデータ値。定数または`${var}`形式のプレースホルダーをサポートし、データ型と一致する必要があります。  
   - **備考**：CSVファイル内のメモ用で、EMQXにはインポートされません。  

   1MB以下、かつ2000行以内のCSVファイルのみサポートされます。

3. 記入済みテンプレートファイルを保存し、**Import Batch Setting**ポップアップにアップロードして**Import**をクリックし、一括設定を完了します。

4. インポート後、**Write Data**テーブル内のデータをさらに調整可能です。

## ルールのテスト

EMQXダッシュボード内蔵のWebSocketクライアントを使って、Apache IoTDB Sinkとルールの動作をテストできます。

1. ダッシュボード左メニューの**Diagnose** -> **WebSocket Client**をクリックします。  
2. 現在のEMQXインスタンスの接続情報を入力します。  
   - ローカルでEMQXを実行している場合はデフォルト値を使用可能です。  
   - 認証設定などを変更している場合は、ユーザー名とパスワードの入力が必要です。  
3. **Connect**をクリックしてクライアントをEMQXインスタンスに接続します。  
4. ページ下部のパブリッシュエリアにスクロールし、メッセージ内にデバイスIDを指定して以下を入力します：

   - **Topic**：`root/sg27`

     :::tip

     トピックが`root`で始まらない場合、自動的に`root`がプレフィックスされます。例えば、`test/sg27`にメッセージをパブリッシュすると、デバイス名は`root.test.sg27`になります。ルールとトピックの設定が正しく行われ、該当トピックのメッセージがSinkに転送されるようにしてください。

     :::

   - **Payload**：

     ```json
      {
       "value": "37.6",
       "device_id": "root.sg27"
      }
     ```

     ::: tip

     `Write Data`テンプレートは以下の通りです：  
     ```now, "temp", float, "${payload.value}"```

     :::

   - **QoS**：`2`

5. **Publish**をクリックしてメッセージを送信します。

   Sinkとルールが正常に作成されていれば、メッセージは指定されたApache IoTDBサーバーの時系列テーブルにパブリッシュされているはずです。

6. IoTDBのコマンドラインインターフェースでメッセージを確認します。上記のDocker環境を利用している場合、以下のコマンドでサーバーに接続可能です：

   ```shell
       $ docker exec -ti iotdb-service /iotdb/sbin/start-cli.sh -h iotdb-service
   ```

7. コンソールで以下を入力します：

   ```sql
   IoTDB> select * from root.sg27
   ```

   以下のようにデータが表示されるはずです：

   ```
   +------------------------+--------------+
   |                    Time|root.sg27.temp|
   +------------------------+--------------+
   |2023-05-05T14:26:44.743Z|          37.6|
   +------------------------+--------------+
   ```

## 高度な設定

このセクションでは、コネクターのパフォーマンスを最適化し、特定のシナリオに応じて動作をカスタマイズするための高度な設定オプションを説明します。コネクター作成時に**Advanced Settings**を展開し、ビジネスニーズに応じて以下の設定を行えます。

| 項目                     | 説明                                                         | 推奨値             |
| ------------------------ | ------------------------------------------------------------ | ------------------ |
| HTTP Pipelining          | サーバーに対して応答を待たずに連続して送信可能なHTTPリクエスト数を指定します。正の整数値で最大リクエスト数を表します。<br />`1`の場合は従来のリクエスト-レスポンスモデルで、各リクエスト送信後に応答を待ちます。値を大きくすると複数リクエストをバッチ送信し、往復時間を削減しネットワークリソースを効率的に利用します。 | `100`              |
| Pool Type                | EMQXとApache IoTDB間のコネクション管理・分配戦略を指定します。<br />`random`は利用可能なコネクションプールからランダムに接続を選択し、シンプルで均等な分配を実現します。<br />`hash`はハッシュアルゴリズムを用いてリクエストを一貫して特定のコネクションにマッピングし、クライアントIDやトピック名に基づくロードバランシングなど決定的な分配が必要な場合に適します。<br />**注意**：適切なプールタイプはユースケースと分配特性に依存します。 | `random`           |
| Connection Pool Size     | Apache IoTDBサービスとの接続プールで維持可能な同時接続数を指定します。システムのスケーラビリティとパフォーマンス管理に役立ちます。<br />**注意**：適切な接続プールサイズはシステムリソース、ネットワークレイテンシ、アプリケーションのワークロードに依存します。大きすぎるとリソース枯渇、小さすぎるとスループット制限の可能性があります。 | `8`                |
| Connect Timeout          | EMQXがApache IoTDB HTTPサーバーへの接続確立を試みる際の最大待機時間（秒）を指定します。<br />**注意**：適切なタイムアウト設定はシステム性能とリソース利用のバランスに重要です。様々なネットワーク条件でテストし最適値を見つけることを推奨します。 | `15`               |
| HTTP Request Max Retries | EMQXとApache IoTDB間の通信でHTTPリクエストが失敗した場合に再試行する最大回数を指定します。 | `2`                |
| Start Timeout            | コネクターが自動起動したリソースの正常状態到達を待つ最大時間（秒）を指定します。これにより、Apache IoTDBのデータベースインスタンスなど接続リソースが完全に稼働しデータ処理準備が整うまで操作を進めないようにします。 | `5`                |
| Buffer Pool Size         | EMQXとApache IoTDB間のイグレス型ブリッジでデータフロー管理に割り当てるバッファワーカープロセス数を指定します。これらのワーカーはデータ送信前の一時保存・処理を担当し、パフォーマンス最適化とスムーズなデータ伝送に寄与します。イングレス（インバウンド）専用のブリッジでは無効（`0`）に設定可能です。 | `18`               |
| Request TTL              | バッファに入ったリクエストが有効とみなされる最大期間（秒）を指定します。バッファ投入時からカウントを開始し、この期間を超えるか、送信後にApache IoTDBからの応答やアックが得られない場合、リクエストは期限切れと判断されます。 | `45`               |
| Health Check Interval    | コネクターがApache IoTDB接続の自動ヘルスチェックを行う間隔（秒）を指定します。 | `15`               |
| Max Buffer Queue Size    | Apache IoTDBデータ連携における各バッファワーカーがバッファリング可能な最大バイト数を指定します。バッファワーカーはデータ送信前の一時保存を担い、システム性能やデータ転送要件に応じて調整してください。 | `265`              |
| Query Mode               | メッセージ送信要件に応じて`asynchronous`または`synchronous`のクエリモードを選択可能です。非同期モードではIoTDBへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしません。ただし、クライアントがIoTDB到着前にメッセージを受信する可能性があります。 | `Async`            |
| Inflight Window          | 「インフライトクエリ」とは、開始されたが応答やアックをまだ受け取っていないクエリを指します。コネクターがApache IoTDBと通信する際に同時に存在可能なインフライトクエリの最大数を制御します。<br />`query_mode`が`async`の場合、このパラメータは特に重要です。同一MQTTクライアントからのメッセージを厳密に順序処理したい場合は、値を1に設定してください。 | `100`              |

## さらに詳しく

EMQXはApache IoTDBとのデータ連携に関する豊富な学習リソースを提供しています。以下のリンクから詳細を学べます：

**ブログ：**

[Time-Series Database (TSDB) for IoT: The Missing Piece](https://www.emqx.com/en/blog/time-series-database-for-iot-the-missing-piece)
