# クイックスタート：MQTTデータをEMQX Tablesに取り込む

本ガイドでは、EMQX ブローカーとEMQX Tablesを使って、外部データベースに依存せずにMQTTからデータベースへのパイプラインを構築する方法を説明します。EMQX Tablesは、EMQX Cloudプラットフォームに組み込まれた完全マネージドのネイティブ時系列データベースで、リアルタイムのIoTデータ処理に最適化されています。

本ガイドで学べる内容：

- EMQX ブローカーとEMQX Tablesのデプロイメント作成
- データ統合を使ったMQTTデータのEMQX Tablesへの取り込み
- 組み込みのData Explorerを使ったSQLによる時系列データのクエリ
- 必要に応じたカスタムデータベースとテーブルの作成

::: tip 注意

本ドキュメントのスクリーンショットは参考用です。

EMQX Cloudは継続的に進化しており、コンソールのUIは更新や改善が行われるため、スクリーンショットが最新のインターフェースと完全に一致しない場合があります。ただし、全体のワークフローや機能は一貫しています。

:::

## EMQX Tables 無料トライアル

EMQX Tablesでは、ネイティブなMQTTからデータベースへの取り込みと時系列分析を無料で評価できるトライアルを提供しています。

### クォータと期間

EMQX Tables 無料トライアルには以下が含まれます：

- 14日間の無料トライアル期間
- 100 GBのアウトバウンドトラフィック
- 100 GBのストレージ容量

### 有効期限

- 3日間連続でアクティブな接続がないトライアルデプロイメントは自動的に停止されます。
- トライアル終了時にデプロイメントは即時停止します。
- インスタンスは有効期限の3日後に削除されます。
- 削除後、すべてのデータは完全に消去されます。

トライアル終了後もEMQX Tablesを継続利用するには、トライアル期間内に有料プランへアップグレードしてください。

## ユースケース：スマートファクトリーモニタリング

実際の例として、工場のデバイスが定期的に以下の時系列テレメトリデータを報告するとします：

- `machine_id`：デバイス識別子
- `production_line`：関連する生産ライン
- `temperature`：温度測定値
- `vibration`：振動強度
- `machine_status`：稼働状態（例：稼働中、警告、エラー）
- `ts`：測定時刻のタイムスタンプ

このデータをMQTT経由で取り込み、EMQX Tablesにネイティブに保存して監視、分析、アラートに活用します。

::: tip 注意

サーバレスデプロイメントでもEMQX Tablesへのデータ取り込みをサポートしています。本ガイドの手順はDedicated Flexを例にしていますが、サーバレスの場合もデータ統合の手順は同じです。ただし、サーバレスはEMQX TablesへTLSを使ってパブリックインターネット経由で接続するため、ネットワークアソシエーションやNATゲートウェイは適用されません。

:::

## EMQX ブローカーとEMQX Tablesのデプロイメント作成

1. EMQX Cloudコンソールにログインします。

2. 新規または既存の[プロジェクト](../feature/project.md#project-management)を作成または選択します。

3. **EMQX Brokers**の下で、**+ New Deployment**をクリックします。

4. **Dedicated Flex**プランを選択し、仕様を設定します。

   - 必要に応じて**クラウドプロバイダーとリージョン**を選択します。
   - 以下のオプションはデフォルトのままにして、簡単にデモを進められます。
     - Tier
     - Smart Data Hubのサブスクライブ（任意）
     - デプロイメント名とプロジェクト
     - EMQXバージョン

   右下の**Deploy**をクリックします。

   ![create_broker_deployment](./_assets/create_broker_deployment.png)

   詳細は[Dedicated Flexデプロイメントの作成](../create/dedicated.md)を参照してください。

5. **EMQX Tables**の下で、**+ New Deployment**をクリックします。

6. デフォルトの**Starter**プランを選択し、ブローカーのデプロイメントと**同じクラウドプロバイダーとリージョン**を選択します。

7. （任意）**Network Association**で既存のネットワークをドロップダウンから選択します。同じクラウドプラットフォームかつ同じリージョンにあるため、ブローカーのネットワークがリストに表示されます。これを選択すると両デプロイメントが同じネットワークを共有し、プライベート接続で通信できます。

8. Tierを選択します。

9. （任意）**Deployment Name**を入力します。

10. **Deploy**をクリックします。詳細は[EMQX Tablesデプロイメントの作成](./emqx_tables_create_deployment.md)を参照してください。

   ![new_table_deployment](./_assets/new_table_deployment.png)

デプロイメントが作成されたら、プロジェクト内のデプロイメントカードをクリックしてデプロイメントに入れます。

EMQX Tablesのデプロイメントに入り、左メニューの**Data Explorer**をクリックすると、`public`というデフォルトデータベースが利用可能なことが確認できます。

## ブローカーとTables間の接続

接続方法はブローカーのデプロイメントタイプとネットワーク設定によって異なります：

| デプロイメントタイプ | 条件 | 接続方法 | 必要な対応 | 備考 |
|---|---|---|---|---|
| Dedicated / Dedicated Flex | Tablesと同じクラウドプラットフォーム、リージョン、ネットワーク | プライベート（安全で低レイテンシ） | 2つ目のデプロイメント作成時に**Network Association**で既存ネットワークを選択。プロジェクトの[ネットワーク管理](https://docs.emqx.com/en/cloud/latest/deployments/network_management.html#project-level-network-management)で共有ネットワークを確認可能。 | 各ネットワークは最大1つのブローカーと1つのTablesデプロイメントをサポート。 |
| Dedicated / Dedicated Flex | Tablesと異なるリージョンまたはネットワーク | TLSを使ったパブリックインターネット | ブローカーの**ネットワーク管理**設定で**NAT Gateway**を有効化。 | — |
| Serverless | — | TLSを使ったパブリックインターネット | 対応不要 | ネットワークアソシエーションとNAT Gatewayは適用外。 |

## データ取り込み用ルールの作成

ルールエンジンを使ってMQTTメッセージをEMQX Tablesに取り込み、永続化します。

1. Dedicated Flex（またはServerless）デプロイメントに入り、**Data Integration**に移動します。

2. 初めてコネクターを作成する場合は、コネクター一覧から**EMQX Tables**を選択します。すでにコネクターがある場合は、**+ New Connector**をクリックして**EMQX Tables**を選択します。

   ![create_connector](./_assets/create_connector.png)

3. **Quick Setup**（デフォルト）を選び、プロジェクト内のTablesデプロイメントを選択します。

   ![connector_quick_setup](./_assets/connector_quick_setup.png)

4. **Test**をクリックして接続を検証します。成功メッセージが表示されます。

5. **New**をクリックし、**New Rule**を選択してルール作成に進みます。

6. **SQL Editor**でSQLルールを定義します。

   **ルールSQL例**：

   ```sql
   SELECT
     timestamp as ts,
     payload.machine_id as machine_id,
     payload.production_line as production_line,
     payload.temperature as temperature,
     payload.vibration as vibration,
     payload.machine_status as machine_status
   FROM "factory/+/metrics"
   ```

   > このルールは、トピック `factory/+/metrics` にマッチするすべてのMQTTメッセージのペイロードからフィールドを抽出します。抽出した値には別名を付けて、ルールアクションのWrite Syntax設定で参照可能にします。`timestamp`フィールドは`ts`にマッピングされ、EMQX Tablesに記録する際の時間インデックスになります。

7. **Next**をクリックしてルールにアクションを追加します。アクション設定を行います：

   - **Connector**：作成したEMQX Tablesコネクターを選択

   - **Time Precision**：`millisecond`

   - **Write Syntax**：

     ```text
     machine_metrics,production_line=${production_line},machine_id=${machine_id} temperature=${temperature},vibration=${vibration},machine_status=${machine_status} ${ts}
     ```
     
     > この構文は、`machine_metrics`テーブルが存在しない場合は自動作成し、InfluxDB Line Protocol形式でデータを書き込みます。
     >
     >- **タグ**：`production_line`、`machine_id`（ディメンションかつ主キー）
     >- **フィールド**：`temperature`、`vibration`、`machine_status`（実際のメトリクス値）
     >- **タイムスタンプ**：`${ts}` はメッセージから抽出した`timestamp`を使い時系列を整合

8. **Confirm**をクリックしてルールを保存します。

   **Data Integration**ページに戻ると、作成したコネクター、ルール、アクションが表示されます。

   ![create_rule](./_assets/create_rule.png)

## MQTTメッセージのパブリッシュ

簡単にテストするには、Dedicated Flex（またはServerless）デプロイメント内の組み込み診断ツールで、左メニューの**Online Test**を使います。

1. **Online Test**でユーザー名とパスワード、または自動生成された認証情報でデプロイメントに接続します。

2. **Messages**セクションで以下の2つのメッセージを送信します：

::: tip

テーブルが事前に作成されていない場合、EMQX Cloudは**最初に正常に書き込まれたメッセージのデータ型を基に**自動でテーブルを作成します。一度テーブルとカラムが作成されると、以降の書き込みは同じデータ型を使う必要があり、異なる型の場合は書き込みが失敗します。

:::

   - **トピック**：`factory/A/metrics`

   - **ペイロード**：

     ```json
     {
       "machine_id": "M001",
       "production_line": "A",
       "temperature": 36.5,
       "vibration": 0.03,
       "machine_status": "running"
     }
     ```

     ```json
     {
       "machine_id": "M002",
       "production_line": "A",
       "temperature": 39.1,
       "vibration": 0.06,
       "machine_status": "warning"
     }
     ```

   ![publish_messages](./_assets/publish_messages.png)

## EMQX Tablesでのデータクエリ

1. **EMQX Tables**デプロイメントに入ります。

2. 左メニューの**Data Explorer**をクリックします。

3. エディターに以下のSQLを入力し、**Run Query**をクリックします：

   ```sql
   SELECT * FROM machine_metrics;
   ```

   取り込んだメッセージが表示されます。

   ![query_test_data](./_assets/query_test_data.png)

## 完成：ネイティブMQTTからDBへのパイプライン

これでライブパイプラインが完成しました：

**MQTTクライアント -> EMQX ブローカー -> ルールエンジン -> EMQX Tables -> SQL分析**

サードパーティのインフラは不要で、完全マネージドかつ時系列IoTワークロードに最適化されています。

次はGrafanaやStreamlitでメトリクスを可視化できます。詳細は[統合ガイド](./integration/emqx_tables_integration_guide.md)を参照してください。

## データベース機能のクイックガイド

EMQX Brokerからデフォルトの`public`データベースへのデータ取り込みに加え、EMQX Tablesではカスタムデータベースやテーブルを定義し、SQLで手動挿入やクエリが可能です。これによりテストや開発の柔軟性が向上します。

### カスタムデータベースの作成

デフォルトの`public`とは別にカスタムデータベースを作成できます。

1. デプロイメントの**Data Explorer**ページに移動します。

2. 以下のSQLを入力し、**Run Query**をクリックします：

   ```sql
   CREATE DATABASE factory WITH (ttl='7d');
   ```

これで、データ保持期間（TTL）が7日の`factory`という新しいデータベースが作成されます。

![create_custom_database](./_assets/create_custom_database.png)

### テーブルの作成

新しいデータベース内に、工場のメトリクスを保存する時系列テーブルを定義します。

**Data Explorer**で以下のSQLを入力し、**Run Query**をクリックします：

```sql
CREATE TABLE factory.machine_metrics (
    ts TIMESTAMP NOT NULL,
    production_line STRING,
    machine_id STRING,
    temperature DOUBLE,
    vibration DOUBLE,
    machine_status STRING DEFAULT 'running',
    TIME INDEX (ts),
    PRIMARY KEY (production_line, machine_id)
) WITH (
    ttl='7d'
);
```

このテーブルは`ts`を時間インデックスに、`production_line`と`machine_id`の複合主キーを持ちます。

### SQLを使ったデータ挿入

EMQX TablesはSQLベースとLine Protocolベースの両方のデータ取り込みをサポートします。`.txt`や`.lp`ファイルのアップロードによる書き込みも可能です。

**Data Explorer**で以下のコマンドを実行してサンプルデータを挿入します：

```sql
INSERT INTO factory.machine_metrics (ts, production_line, machine_id, temperature, vibration, machine_status)
VALUES
    (now(), 'A', 'M001', 36.5, 0.03, 'running'),
    (now(), 'A', 'M002', 39.1, 0.06, 'warning'),
    (now(), 'B', 'M010', 37.2, 0.02, 'running'),
    (now(), 'B', 'M011', 45.6, 0.12, 'error');
```

> `now()`は現在のタイムスタンプを挿入します。

### データのクエリ

データを確認するには**Data Explorer**を使用します。

#### クエリ例

すべてのレコードを表示：

```sql
SELECT * FROM factory.machine_metrics;
```

![query_all_records](./_assets/query_all_records.png)

直近60分の平均温度をラインと状態別に集計：

```sql
SELECT production_line, machine_status, AVG(temperature) AS avg_temp
FROM factory.machine_metrics
WHERE ts > now() - INTERVAL '60 minute'
GROUP BY production_line, machine_status;
```

![query_by_time_limits](./_assets/query_by_time_limits.png)

特定デバイスでフィルタリング：

```sql
SELECT ts, temperature
FROM machine_metrics
WHERE machine_id = 'M001'
ORDER BY ts DESC
LIMIT 10;
```

![query_filter_by_device](./_assets/query_filter_by_device.png)

### クイッククエリで高速アクセス

テーブルスキーマからSQLスニペットを素早く生成できます：

1. **Data Explorer**の左側スキーマパネルでカラムにカーソルを合わせます。
2. カラム横の**縦三点リーダー（︙）**をクリックします。
3. **Quick Query**を選択し、
   - **Query column**、**Query max**、**Query min**などのオプションを使います。
4. 生成されたSQLが自動的にエディターに表示されます。

![quick_query](./_assets/quick_query.png)

## リソース

サポートされている[SQL文と句](https://docs.greptime.com/reference/sql/overview/)についてはGreptimeのドキュメントを参照してください。

より詳細なデータクエリ手順は[EMQX Tablesでのデータクエリ](./emqx_tables_query_guide.md)をご覧ください。
