# TimescaleDBへMQTTデータをストリーム送信する

[TimescaleDB](https://www.amazon.cn/en/TimescaleDB/)（Timescale）は、時系列データの保存と解析に特化したデータベースです。優れたデータスループットと信頼性の高いパフォーマンスにより、IoT（Internet of Things）分野に最適な選択肢であり、IoTアプリケーション向けに効率的かつスケーラブルなデータ保存と解析ソリューションを提供します。

本ページでは、TimescaleDBデータ統合の機能について詳しく紹介し、作成方法の実践的な手順を解説します。内容はTimescaleDBコネクターの作成、ルールの作成、ルールのテストを含みます。シミュレートした温度・湿度データをMQTTプロトコル経由でEMQXプラットフォームに報告し、設定したデータ統合を通じてTimescaleDBにデータを保存する方法を示します。

## 動作概要

TimescaleDBデータ統合は、EMQXに組み込まれた機能であり、EMQXのリアルタイムデータキャプチャと転送機能とTimescaleDBのデータ保存・解析機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQXからTimescaleDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。

以下の図は、産業用IoTにおけるEMQXとTimescaleDBのデータ統合の典型的なアーキテクチャを示しています。

![EMQX Platform Integration TimescaleDB](./_assets/data_integration_timescaledb.png)

EMQXとTimescaleDBは、エネルギー消費データをリアルタイムで効率的に収集・解析するためのスケーラブルなIoTプラットフォームを提供します。このアーキテクチャでは、EMQXがデバイスの接続、メッセージ転送、データルーティングを担当するIoTプラットフォームとして機能し、TimescaleDBはデータ保存と解析プラットフォームとして役割を担います。

EMQXはルールエンジンとSinkを通じてデバイスデータをTimescaleDBに転送します。TimescaleDBはSQL文でデータを解析し、レポートやグラフなどの解析結果を生成し、TimescaleDBの可視化ツールを通じてユーザーに表示します。ワークフローは以下の通りです。

1. **メッセージのパブリッシュと受信**：産業用デバイスはMQTTプロトコルを介してEMQXに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **ルールエンジンによるメッセージ処理**：組み込みのルールエンジンは、トピックマッチングに基づき特定の送信元からのメッセージを処理します。メッセージが到着すると、ルールエンジンが対応するルールと照合し、メッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などが含まれます。
3. **TimescaleDBへのデータ取り込み**：ルールエンジンで定義されたルールがメッセージをTimescaleDBに書き込む操作をトリガーします。TimescaleDB SinkはSQLテンプレートを提供し、特定のメッセージフィールドをTimescaleDBの対応テーブルやカラムに柔軟に書き込むことが可能です。

エネルギー消費データがTimescaleDBに書き込まれた後は、SQL文を用いて柔軟にデータ解析が行えます。例えば：

- Grafanaなどの可視化ツールに接続し、グラフを生成してエネルギー消費データを表示する。
- ERPなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を行う。
- 業務システムに接続し、リアルタイムのエネルギー使用分析を実施し、データ駆動型のエネルギー管理を支援する。

## 特長と利点

EMQXにおけるTimescaleDBデータ統合は、以下の特長と利点をビジネスにもたらします。

- **効率的なデータ処理**：EMQXは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。TimescaleDBはデータ書き込み、保存、クエリに優れており、IoTシナリオのデータ処理要件をシステムに過度な負担をかけずに満たします。
- **メッセージ変換**：メッセージはEMQXのルール内で豊富な処理や変換を経てからTimescaleDBに書き込まれます。
- **効率的な保存とスケーラビリティ**：EMQXとTimescaleDBは共にクラスターのスケーリング機能を備え、ビジネスの成長に応じて柔軟に水平スケールが可能です。
- **高度なクエリ機能**：TimescaleDBはタイムスタンプデータの効率的なクエリ・解析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから正確な洞察を引き出せます。

## はじめる前に

本節では、EMQXプラットフォームでTimescaleDBデータ統合を作成するための準備作業を紹介します。

### 前提条件

- [ルール](./rules.md)の理解
- [データ統合](./introduction.md)の理解

### ネットワーク設定

<!--@include: ./network-setting.md-->

### Timescaleのインストールとデータテーブルの作成

EMQXプラットフォームは、自身で構築したTimescaleDBまたはクラウド上のTimescaleサービスとの統合をサポートします。Timescaleサービスをクラウドサービスとして利用するか、Dockerを使ってTimescaleDBインスタンスをデプロイできます。

#### クラウド上でTimescaleサービスインスタンスとテーブルを作成する

TimescaleDB Cloudを初めて利用する場合は、[ヘルプドキュメント](https://docs.timescale.com/)を参照してください。

1. Timescaleアカウントをお持ちでない場合は、[Timescaleアカウントの作成](https://docs.timescale.com/getting-started/latest/services/#create-your-timescale-account)を参考にアカウントを作成します。
2. Timescaleポータルにログインし、[Timescaleサービスの作成](https://docs.timescale.com/getting-started/latest/services/#create-your-first-service)を行います。サービスのパスワードを控えておきます。
3. サービス概要ページから接続情報を取得します。EMQXで必要な項目は**データベース名**、**ホスト**、**ポート**、**ユーザー名**です。
4. psqlクライアントでサービスに接続します。

    ```bash
    # サービスURLで接続
    psql "postgres://tsdbadmin@xxxxx.xxxxx.tsdb.cloud.timescale.com:xxxxx/tsdb?sslmode=require"
    ```

5. クライアントからのメッセージデータを保存するためのテーブル`temp_hum`を作成します。

    ```bash
    CREATE TABLE temp_hum (
    up_timestamp   TIMESTAMPTZ       NOT NULL,
    client_id      TEXT              NOT NULL,
    temp           DOUBLE PRECISION  NULL,
    hum            DOUBLE PRECISION  NULL
    );

    SELECT create_hypertable('temp_hum', 'up_timestamp');
    ```

    テストデータを挿入し、確認します。

    ```bash
    INSERT INTO temp_hum(up_timestamp, client_id, temp, hum) values (to_timestamp(1603963414), 'temp_hum-001', 19.1, 55);

    SELECT * from temp_hum;
    ```

    テーブル作成後、サービスの**Explorer**タブで`temp_hum`テーブルの情報を確認できます。

6. TimescaleインスタンスのMax_Connectionsの最適化

    Timescaleインスタンス詳細ページの`Operations` -> `Database Parameters` -> `Common Parameters`に移動し、最大接続数を100以上に増やします。

    ![EMQX Platform Integration TimescaleDB](./_assets/data_integration_timescaledb_max_conn.png)

#### DockerでTimescaleDBをインストールしテーブルを作成する

1. Docker環境がない場合は、[Dockerのインストール](https://docs.docker.com/get-docker/)を参照してください。

2. DockerでTimescaleDBコンテナを作成し、POSTGRES_PASSWORD環境変数でデータベースのパスワードを設定します。

    ```bash
    docker run -d --name timescaledb \
    -p 5432:5432 \
    -e POSTGRES_PASSWORD=public \
    timescale/timescaledb:latest-pg13
    ```

3. クライアントデータを保存するデータベースを作成します。

    ```bash
    docker exec -it timescaledb psql -U postgres

    ## tsdbデータベースを作成
    CREATE database tsdb;

    \c tsdb;
    ```

4. クライアントからのメッセージデータを保存するテーブル`temp_hum`を作成します。デバイスから報告される温度・湿度データを保存するためのテーブルです。

    ```bash
    CREATE TABLE temp_hum (
    up_timestamp   TIMESTAMPTZ       NOT NULL,
    client_id      TEXT              NOT NULL,
    temp           DOUBLE PRECISION  NULL,
    hum            DOUBLE PRECISION  NULL
    );
    
    SELECT create_hypertable('temp_hum', 'up_timestamp');
    ```

    テストデータを挿入し、確認します。

    ```bash
    INSERT INTO temp_hum(up_timestamp, client_id, temp, hum) values (to_timestamp(1603963414), 'temp_hum-001', 19.1, 55);
    
    SELECT * from temp_hum;
    ```

## TimescaleDBコネクターを作成する

データ統合ルールを作成する前に、TimescaleDBサーバーにアクセスするためのコネクターを作成します。

1. デプロイメントに移動し、左側ナビゲーションメニューから**データ統合**をクリックします。
2. 初めてコネクターを作成する場合は、**データ永続化**カテゴリの中から**TimescaleDB**を選択します。既にコネクターを作成済みの場合は、**新規コネクター**を選択し、**データ転送**カテゴリの中から**TimescaleDB**を選択します。
3. **新規コネクター**ページで以下のオプションを設定します。
   - **サーバーホスト**：TimescaleDBサービスのホストとポートを入力します。Docker利用時は`http://<サーバーアドレス>:5432`を使用します。
   - **データベース名**：TimescaleDBのデータベース名を入力します。
   - **ユーザー名とパスワード**：TimescaleDBのユーザー名とパスワードを入力します。Dockerでデプロイした場合のデフォルトは以下です。
     - データベース名：`tsdb`
     - ユーザー名：`postgres`
     - パスワード：`public`
   - **詳細設定**：接続プールサイズはTimescaleDB Cloudを利用する場合、1に設定します。
   - その他の設定はデフォルト値を使用するか、ビジネス要件に応じて調整してください。
4. **テスト**ボタンをクリックし、TimescaleDBサービスにアクセス可能か確認します。成功すると成功メッセージが表示されます。
5. **新規作成**ボタンをクリックして作成を完了します。

## ルールを作成する

次に、書き込むデータを指定するルールを作成し、処理済みデータをTimescaleDBに転送するアクションをルールに追加します。

1. ルールエリアで**新規ルール**をクリックするか、作成したコネクターの**アクション**列にある新規ルールアイコンをクリックします。

2. **SQLエディター**にルールのマッチングSQL文を入力します。以下のSQL例は、`temp_hum/emqx`トピックに送信されたメッセージから報告時刻`up_timestamp`、クライアントID、メッセージ本文（ペイロード）を読み取り、温度と湿度を抽出します。

   ```sql
    SELECT 
    timestamp div 1000 AS up_timestamp, 
    clientid AS client_id, 
    payload.temp AS temp, 
    payload.hum AS hum

    FROM
    "temp_hum/emqx"
   ```

3. **テスト有効化**を使ってデータ入力をシミュレートし、結果をテストできます。

4. **次へ**をクリックしてアクションを追加します。

5. **コネクター**のドロップダウンから先ほど作成したコネクターを選択します。

6. SQLテンプレートに以下のデータを入力し、SQLテンプレートに挿入します。

   ```sql
    INSERT INTO temp_hum(up_timestamp, client_id, temp, hum) VALUES (to_timestamp(${up_timestamp}), ${client_id}, ${temp}, ${hum})
   ```

    詳細設定（任意）については、[詳細設定](https://docs.emqx.com/en/enterprise/latest/data-integration/data-bridge-timescale.html#advanced-configurations)を参照してください。

7. **確定**ボタンをクリックしてルール作成を完了します。

8. **新規ルール成功**のポップアップで**ルールに戻る**をクリックし、データ統合設定の一連の流れを完了します。

## ルールをテストする

温度・湿度データの報告をシミュレートするには、[MQTTX](https://mqttx.app/)の使用を推奨しますが、他の任意のクライアントでも構いません。

1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`

   - ペイロード：

     ```json
     {
       "temp": 27.5,
       "hum": 41.8
     }
     ```

2. メッセージがTimescaleDBに転送されているか確認します。

    ターミナルで以下のコマンドを実行し、Timescaleデータベースに取り込まれたデータを確認します。

    ```bash
    SELECT * from temp_hum order by up_timestamp desc limit 10;
    ```

    TimescaleDBの`temp_hum`テーブルを確認し、新しいレコードが挿入されていることを検証してください。

3. コンソールで運用データを確認します。ルール一覧でルールIDをクリックすると、ルールの統計情報およびそのルールに属するすべてのアクションの統計情報が表示されます。
