# EMQX TablesへのMQTTデータ取り込み

EMQX Tablesは、EMQX Cloudに組み込まれたネイティブでフルマネージドの時系列データストレージサービスです。MQTTデータの高スループットかつ低レイテンシな取り込みと分析に最適化されており、IoTユースケースに理想的です。

[GreptimeDB](https://greptime.com/)を基盤としており、EMQX ブローカーとシームレスに統合され、[InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.5/reference/syntax/line-protocol/)をサポートすることで、テレメトリデータの効率的な保存、クエリ、可視化を可能にします。

詳細は[EMQX Tables概要](../emqx_tables/emqx_tables_overview.md)をご覧ください。

本ガイドでは、以下の手順でMQTTデータをEMQX Tablesに取り込む方法を説明します。

- コネクターの作成
- Line Protocolライターを用いたルールの作成
- データ取り込みのテストと結果のクエリ

## 前提条件

以下のデプロイメントを作成済みであること：

- EMQX ブローカーのデプロイメント：[サーバレス](../create/serverless.md)、[専用]または[専用Flex](../create/dedicated.md)
- [EMQX Tablesのデプロイメント](../emqx_tables/emqx_tables_create_deployment.md)

以下に習熟していること：

- [EMQXルール](./rules.md)
- [EMQXにおけるデータ統合](./introduction.md)
- [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.5/reference/syntax/line-protocol/)

## ブローカーとTables間の接続性

接続方法はブローカーのデプロイメントタイプとネットワーク構成によって異なります。

| デプロイメントタイプ | 条件 | 接続方法 | 必要な対応 |
|---|---|---|---|
| 専用 / 専用Flex | Tablesと同じクラウドプラットフォーム、リージョン、ネットワーク | プライベート（セキュアかつ低レイテンシ） | [プロジェクトレベルネットワーク管理](https://docs.emqx.com/en/cloud/latest/deployments/network_management.html#project-level-network-management)でネットワーク共有を設定 |
| 専用 / 専用Flex | Tablesと異なるリージョンまたはネットワーク | TLS経由のパブリックインターネット | ブローカーの**ネットワーク管理**設定で**NATゲートウェイ**を有効化 |
| サーバレス | — | TLS経由のパブリックインターネット | 対応不要。ネットワーク関連設定やNATゲートウェイは不要 |

## EMQX Tablesコネクターの作成

データを書き込む前に、EMQX Tablesへのコネクターを作成します。

1. EMQX ブローカーのデプロイメントにアクセスし、左メニューから**データ統合**をクリックします。

2. 初めてコネクターを作成する場合は**EMQX Tables**を探します。既にコネクターを作成済みの場合は、**+ 新規コネクター**をクリックし、**EMQX Tables**を選択します。

3. **新規コネクター**ページで、**コネクター名**は自動生成されます。以下の2つのセットアップモードから選択します。

   :::: tabs type:card

   ::: tab クイックセットアップ（推奨）

   クイックセットアップを使うと接続情報が自動入力されます。ドロップダウンには同一プロジェクト内のすべてのEMQX Tablesデプロイメントが表示されます。

   1. **クイックセットアップ**を選択（デフォルト）。
   2. **EMQX Tablesデプロイメントの選択**：ドロップダウンからTablesデプロイメントを選択します。選択したデプロイメントがブローカーと異なるネットワークまたはリージョンにある場合、ブローカー側で**NATゲートウェイ**を有効化する必要がある旨の通知が表示されます。
   3. **データベース名**：デフォルトの`public`データベースを使用するか、カスタムデータベース名を指定します。指定する場合は、EMQX Tablesデプロイメント内に既に作成されている必要があります。詳細は[カスタムデータベースの作成](../emqx_tables/emqx_tables_quick_start.md#create-a-custom-database)を参照してください。

   ![connector_quick_setup](./_assets/connector_quick_setup.png)

   :::

   ::: tab カスタムセットアップ

   EMQX Tablesデプロイメントが異なるプロジェクトにある場合や、接続設定を手動で行う必要がある場合は**カスタムセットアップ**を使用します。

   > 専用および専用Flexデプロイメントの場合、ブローカーの**ネットワーク管理**でパブリックアクセス用に**NATゲートウェイ**を有効化する必要があります。

   EMQX Tablesデプロイメントの**デプロイメント概要**ページの**接続情報**欄から情報をコピーして入力してください。

   | 項目             | 説明                                                  | 例値                                                |
   | ----------------- | ----------------------------------------------------- | --------------------------------------------------- |
   | **パブリックホスト**   | ホストアドレスとポート。パブリックインターネット接続の場合、TLSを有効にした状態でポート`5001`（gRPCS）を使用することを推奨。TLS無効時はポート`4001`（gRPC）も利用可能。 | `your-emqx-tables-public-host:5001`                 |
   | **データベース名** | 対象データベース名                                    | `public`またはカスタムデータベース名（既に作成済みであること。詳細は[カスタムデータベースの作成](../emqx_tables/emqx_tables_quick_start.md#create-a-custom-database)） |
   | **ユーザー名**      | EMQX Tablesデプロイメントのユーザー名                | 自動生成                                             |
   | **パスワード**      | EMQX Tablesデプロイメントのパスワード                | 自動生成                                             |

   ![emqx_tables_connector_custom_setup](./_assets/emqx_tables_connector_custom_setup.png)

   :::

   ::::

4. **テスト**をクリックして接続を検証します。EMQX Tablesサービスにアクセス可能であれば成功メッセージが表示されます。

5. **新規作成**をクリックして作成を完了します。これでこのコネクターを使ったルール作成に進めます。

## EMQX Tablesへのデータ取り込み用ルールの作成

次に、書き込むデータを指定し、EMQX Tablesへ書き込むアクションを追加するルールを作成します。

1. **ルール**セクションで**新規ルール**をクリックするか、コネクター横の**アクション**アイコンを使います。

2. **SQLエディター**でSQLルールを定義します。この例では、クライアントが`temp_hum/emqx`トピックに温度と湿度のメッセージを送信した際にエンジンをトリガーすることを目的としています。以下のようにSQLを設定します。

   ```sql
     SELECT
       timestamp,
       payload.location as location,
       payload.temp as temp,
       payload.hum as hum
     FROM "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は**Try It Out**をクリックしてSQLルールの学習とテストが可能です。

   :::

3. **次へ**をクリックしてルールにアクションを追加します。

4. **コネクター**のドロップダウンから先ほど作成したコネクターを選択します。

5. **時間精度**は`millisecond`（デフォルト）に設定します。

6. **書き込み構文**を設定し、EMQX Tablesにデータを解析・書き込みするためのLine Protocolフォーマットを定義します。

   測定値、タグセット、フィールドセット、タイムスタンプを含むテキストベースのフォーマットを指定し、サポートされるプレースホルダーを利用します。詳細は[InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/)および[InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1/write_protocols/line_protocol_tutorial/)を参照してください。

   例：

   ```
    temp_hum,location=${location} temp=${temp},hum=${hum} ${timestamp}
   ```

   ::: tip

   - 符号付き整数型の値を書き込む場合は、プレースホルダーの後に`i`を付けます。例：`${payload.int}i`。詳細は[InfluxDB 1.8 整数値の書き込み](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/#write-the-field-value-1-as-an-integer-to-influxdb)を参照してください。
   - 符号なし整数型の値を書く場合は、プレースホルダーの後に`u`を付けます。例：`${payload.int}u`。詳細は同上リンクを参照してください。

   :::

7. **確定**をクリックしてルールを保存します。

8. **新規ルール作成成功**のポップアップで**ルールに戻る**をクリックし、ルール作成を完了します。

![emqx_tables_rule_action](./_assets/emqx_tables_rule_action.png)

## ルールのテストとデータのクエリ

[MQTTX](https://mqttx.app/)などのクライアントツールを使って温度・湿度データの送信をシミュレートすることを推奨します。簡単なデモとしては、ブローカーの左メニューにある**オンラインテスト**機能を使うことも可能です。

1. **オンラインテスト**で、ユーザー名とパスワードまたは自動生成された認証情報を使ってデプロイメントに接続します。

2. **メッセージ**欄で以下のメッセージを送信します。

   - **トピック**：`temp_hum/emqx`

   - **ペイロード**：

     ```json
     {
       "temp": 27.5,
       "hum": 41.8,
       "location": "Prague"
     }
     ```

   ![emqx_tables_online_test](./_assets/emqx_tables_online_test.png)

3. EMQX Tablesのデプロイメントに移動し、左メニューから**データエクスプローラー**をクリックします。

4. 以下のSQLを実行して、`public`テーブルに取り込まれたデータをクエリします。

   ```sql
   select * from "temp_hum"
   ```

   **クエリ結果**テーブルに1件のレコードが表示されるはずです。

   ![emqx_tables_query](./_assets/emqx_tables_query.png)

5. EMQX ブローカーのデプロイメントでルール統計を確認します。ルール一覧のルールIDをクリックすると、そのルールおよび関連アクションの統計情報が表示されます。

   ![emqx_tables_rule_statistics](./_assets/emqx_tables_rule_statistics.png)
