# PostgreSQLへのMQTTデータ取り込み

[PostgreSQL](https://www.postgresql.org/)は、世界で最も高度なオープンソースのリレーショナルデータベースであり、シンプルなアプリケーションから複雑なデータ処理まで対応可能な強力なデータ処理能力を備えています。EMQXプラットフォームはPostgreSQLとの統合をサポートしており、IoTデバイスからのリアルタイムデータストリームを効率的に処理できます。この統合により、大規模なデータ保存、正確なクエリ、複雑なデータ関連分析が可能となり、データの整合性も確保されます。EMQXの効率的なメッセージルーティングとPostgreSQLの柔軟なデータモデルを活用することで、デバイスの状態監視、イベント追跡、操作監査が容易になり、ビジネスに深いデータ洞察と強力なビジネスインテリジェンスの支援を提供します。

本ページでは、EMQXプラットフォームとPostgreSQL間のデータ統合について、作成方法と検証手順を実践的に紹介します。

::: tip
本ページの内容はMatrixDBにも適用可能です。
:::

## 動作概要

PostgreSQLデータ統合は、EMQXプラットフォームに標準搭載された機能で、MQTTベースのIoTデータとPostgreSQLの強力なデータ保存機能を橋渡しします。組み込みの[ルールエンジン](./rules.md)コンポーネントにより、EMQXプラットフォームからPostgreSQLへのデータ取り込みと管理が簡素化され、複雑なコーディングを不要にします。

以下の図は、EMQXプラットフォームとPostgreSQL間の典型的なデータ統合アーキテクチャを示しています。

![EMQX Integration PostgreSQL](./_assets/data-integration-postgresql.png)

MQTTデータをPostgreSQLに取り込む流れは以下の通りです：

- **IoTデバイスがEMQXプラットフォームに接続**：IoTデバイスがMQTTプロトコルを通じて正常に接続されると、オンラインイベントがトリガーされます。このイベントにはデバイスID、送信元IPアドレスなどの情報が含まれます。
- **メッセージのパブリッシュと受信**：デバイスは特定のトピックにテレメトリや状態データをパブリッシュします。EMQXプラットフォームはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- **ルールエンジンによるメッセージ処理**：組み込みのルールエンジンは、特定のトピックに基づいてメッセージやイベントを処理します。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、メッセージへのコンテキスト情報の付加などを行います。
- **PostgreSQLへの書き込み**：ルールによりメッセージのPostgreSQLへの書き込みがトリガーされます。SQLテンプレートを活用し、ルール処理結果からデータを抽出してSQLを構築し、PostgreSQLに送信して実行します。これにより、メッセージの特定フィールドをデータベースの対応するテーブルやカラムに書き込んだり更新したりできます。

イベントやメッセージデータがPostgreSQLに書き込まれた後は、PostgreSQLに接続してデータを読み取り、柔軟なアプリケーション開発が可能です。例えば：

- Grafanaなどの可視化ツールに接続し、データに基づくグラフを生成してデータ変化を表示する。
- デバイス管理システムに接続し、デバイス一覧や状態を確認、異常なデバイス挙動を検知して潜在的な問題を速やかに解消する。

## 特長とメリット

PostgreSQLは豊富な機能を持つ人気のオープンソースリレーショナルデータベースです。PostgreSQLとのデータ統合により、以下の特長とメリットをビジネスにもたらします：

- **柔軟なイベント処理**：EMQXルールエンジンを通じて、PostgreSQLはデバイスのライフサイクルイベントを処理可能で、IoTアプリケーションに必要な各種管理・監視タスクの開発を大幅に促進します。イベントデータを分析することで、デバイスの故障や異常挙動、トレンド変化を迅速に検知し、適切な対応を取ることができます。
- **メッセージ変換**：EMQXプラットフォームのルールにより、メッセージはPostgreSQLに書き込まれる前に多様な処理や変換を受け、保存や利用がより便利になります。
- **柔軟なデータ操作**：PostgreSQLデータブリッジが提供するSQLテンプレートを活用し、特定フィールドのデータをPostgreSQLデータベースの対応テーブル・カラムに簡単に書き込みや更新ができ、柔軟なデータ保存と管理を実現します。
- **ビジネスプロセスの統合**：PostgreSQLデータブリッジにより、デバイスデータをPostgreSQLの豊富なエコシステムアプリケーションと連携可能です。ERP、CRM、その他カスタムビジネスシステムとの統合を促進し、高度なビジネスプロセスや自動化を実現します。
- **IoTとGIS技術の融合**：PostgreSQLはGISデータの保存・クエリ機能を備え、地理空間インデックス、ジオフェンシングやアラート、リアルタイム位置追跡、地理情報処理などをサポートします。EMQXの信頼性の高いメッセージ伝送能力と組み合わせることで、車両などのモバイルデバイスからの地理位置情報を効率的に処理・分析し、リアルタイム監視、インテリジェントな意思決定、ビジネス最適化を実現します。
- **ランタイムメトリクス**：各ルールのランタイムメトリクス（総メッセージ数、成功／失敗数、現在のレートなど）を閲覧可能です。

柔軟なイベント処理、多彩なメッセージ変換、柔軟なデータ操作、リアルタイムの監視・分析機能を通じて、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に貢献します。

## はじめる前に

本節では、EMQXプラットフォームでPostgreSQLデータ統合を作成するための準備作業を紹介します。

### 前提条件

- [データ統合](./introduction.md)に関する知識
- EMQXデータ統合の[ルール](./rules.md)に関する知識

### ネットワーク設定

<!--@include: ./network-setting.md-->

### PostgreSQLのインストール

1. DockerでPostgreSQLをインストールし、dockerイメージを起動します。

```bash
# PostgreSQLのdockerイメージを起動し、パスワードをpublicに設定
docker run --name PostgreSQL -p 5432:5432 -e POSTGRES_PASSWORD=public -d postgres

# コンテナにアクセス
docker exec -it PostgreSQL bash

# コンテナ内でPostgreSQLサーバーに接続し、設定したパスワードを入力
psql -U postgres -W

# データベースを作成し、選択する

CREATE DATABASE emqx_data;

\c emqx_data;
```

2. テーブルを作成します。以下のSQLコマンドで`temp_hum`テーブルを作成します。このテーブルはデバイスから報告される温度・湿度データの保存に使用します。

```sql
CREATE TABLE temp_hum (
  up_timestamp   TIMESTAMPTZ       NOT NULL,
  client_id      TEXT              NOT NULL,
  temp           DOUBLE PRECISION  NULL,
  hum            DOUBLE PRECISION  NULL
);
```

3. テストデータを挿入し、確認します。

```sql
INSERT INTO temp_hum(up_timestamp, client_id, temp, hum)
VALUES (to_timestamp(1603963414), 'temp_hum-001', 19.1, 55);
```

```bash
emqx_data=# SELECT * FROM temp_hum;
      up_timestamp      |  client_id   | temp | hum
------------------------+--------------+------+-----
 2020-10-29 09:23:34+00 | temp_hum-001 | 19.1 |  55
(1 row)
```

## コネクターの作成

データ統合ルールを作成する前に、PostgreSQLサーバーにアクセスするためのPostgreSQLコネクターを作成する必要があります。

1. デプロイメントにアクセスし、左側ナビゲーションメニューから**データ統合**をクリックします。

2. 初めてコネクターを作成する場合は、**データ永続化**カテゴリの中から**PostgreSQL**を選択します。すでにコネクターを作成済みの場合は、**新規コネクター**を選択し、続いて**データ永続化**カテゴリの**PostgreSQL**を選択します。

3. **コネクター名**：システムが自動的にコネクター名を生成します。

4. 接続情報を入力します：

   - **サーバーホスト**：サーバーのIPアドレスとポート番号
   - **データベース名**：`emqx_data`を入力
   - **ユーザー名**：`postgres`を入力
   - **パスワード**：`public`を入力

5. 暗号化接続を確立したい場合は、**TLSを有効にする**のトグルスイッチをクリックします。

6. 詳細設定（任意）

7. **テスト**ボタンをクリックし、PostgreSQLサービスにアクセス可能であれば成功メッセージが表示されます。

8. **新規作成**ボタンをクリックして作成を完了します。

## ルールの作成

次に、書き込むデータを指定し、処理済みデータをPostgreSQLに転送するためのアクションをルールに追加します。

1. ルールエリアで**新規ルール**をクリックするか、作成したコネクターの**アクション**列にある新規ルールアイコンをクリックします。

2. **SQLエディター**にルールのマッチングSQL文を入力します。以下のルールでは、メッセージが報告された時刻`up_timestamp`、クライアントID、`temp_hum/emqx`トピックからのメッセージ本文（ペイロード）内の温度と湿度をそれぞれ読み取ります。

   ```sql
    SELECT
      timestamp div 1000 as up_timestamp,
      clientid as client_id,
      payload.temp as temp,
      payload.hum as hum
    FROM
      "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は、**SQL例**をクリックし、**テストを有効にする**を使ってSQLルールを学習・検証できます。

   :::

3. **次へ**をクリックしてアクションを追加します。

4. **コネクター**のドロップダウンから先ほど作成したコネクターを選択します。

5. 利用する機能に基づいて**SQLテンプレート**を設定します。注意：これは事前処理されたSQLなので、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。

   ```sql
    INSERT INTO temp_hum(up_timestamp, client_id, temp, hum)
    VALUES (
      to_timestamp(${up_timestamp}),
      ${client_id},
      ${temp},
      ${hum}
    )
   ```

6. 詳細設定（任意）

7. **確認**ボタンをクリックしてルール作成を完了します。

8. **ルール作成成功**のポップアップで**ルールに戻る**をクリックし、データ統合の設定チェーンを完了します。

## ルールのテスト

温度・湿度データの報告をシミュレーションするために、[MQTTX](https://mqttx.app/)の使用を推奨しますが、他のクライアントでも構いません。

1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`

   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. データダンプ結果を確認します。

```bash
emqx_data=# SELECT * from temp_hum ORDER BY up_timestamp DESC LIMIT 10;
      up_timestamp      |  client_id   | temp | hum
------------------------+--------------+------+------
 2024-03-20 09:39:17+00 | test_client  | 27.5 | 41.8
 2020-10-29 09:23:34+00 | temp_hum-001 | 19.1 |   55
(2 rows)
```

3. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルールに属するすべてのアクションの統計情報を閲覧できます。
