# Amazon S3 Tables への MQTT データ取り込み

::: tip 注意

Amazon S3 Tables のデータ統合は、EMQX バージョン 5.91 以降の専用版で利用可能です。

:::

[Amazon S3 Tables](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables.html) は、分析ワークロードに最適化された専用のストレージソリューションです。IoT センサーの計測値などの表形式データを Apache Iceberg フォーマットで高性能かつスケーラブルかつ安全に保存できます。

EMQX プラットフォームは Amazon S3 Tables とのシームレスな統合をサポートし、MQTT メッセージを効率的に S3 テーブルバケットに保存できます。この統合により、柔軟かつスケーラブルな IoT データ保存が可能となり、Amazon Athena、Amazon Redshift、Amazon EMR などの AWS サービスを用いた高度な分析や処理を促進します。

本ページでは、EMQX プラットフォームと Amazon S3 Tables のデータ統合について詳しく解説し、ルール作成の実践的な手順を紹介します。

## 動作概要

EMQX プラットフォームは Amazon S3 Tables と連携し、MQTT データをリアルタイムかつ構造化された形で Amazon S3 に取り込み、長期保存および分析を可能にします。この統合は EMQX のルールエンジンと S3 Tables アクションを活用し、MQTT メッセージを変換して Apache Iceberg フォーマットのテーブルに直接ストリーム書き込みします。

典型的な IoT シナリオでは以下のように動作します：

- **EMQX プラットフォーム** は MQTT ブローカーとして機能し、デバイスの接続管理、メッセージルーティング、データ処理を担当します。
- **Amazon S3 Tables** は MQTT メッセージデータを表形式で耐久的かつクエリ可能なストレージとして提供します。
- **Amazon Athena** は Iceberg テーブルの定義と保存データに対する SQL クエリ実行を担当します。

![emqx-integration-s3-tables](./_assets/emqx-integration-s3-tables.png)

ワークフローは以下の通りです：

1. **デバイスの EMQX プラットフォームへの接続**：IoT デバイスが MQTT 経由で EMQX に接続し、テレメトリデータをパブリッシュし始めます。
2. **メッセージルーティングとルールマッチング**：EMQX の組み込みルールエンジンが受信した MQTT メッセージを定義済みトピックにマッチングし、特定のフィールドや値を抽出します。
3. **データ変換**：EMQX のルールでメッセージペイロードをフィルタリング、変換、または拡張し、ターゲットの Iceberg テーブルのスキーマに合わせます。
4. **Amazon S3 Tables への書き込み**：ルールが S3 Tables アクションをトリガーし、変換済みデータをバッチ処理して Iceberg 互換の書き込み API を使い Amazon S3 Tables に送信します。データは Iceberg テーブルのパーティション下に Parquet ファイルとして永続化されます。
5. **クエリと分析**：取り込んだデータは Amazon Athena でクエリ可能となり、他のデータセットとの結合や Redshift Spectrum、Amazon EMR、Presto、Trino などのサードパーティ分析エンジンでの分析が可能です。

## 特長と利点

Amazon S3 Tables データ統合を利用することで、以下の特長と利点が得られます：

- **リアルタイムストリーム処理**：EMQX のルールエンジンにより、MQTT メッセージのリアルタイム抽出・変換・条件付きルーティングが可能で、S3 Tables への配信前に柔軟な処理が行えます。
- **Iceberg ベースの S3 ストレージ**：メッセージは Apache Iceberg テーブルに書き込まれ、従来のデータベースを不要にしつつ SQL ライクなアクセスが可能です。
- **分析ツールとの簡単な連携**：データが S3 Tables に入ると、Amazon Athena（SQL）、Amazon EMR、Redshift Spectrum、Presto、Trino、Snowflake などのツールでクエリや分析が行えます。
- **柔軟かつコスト効率の良いストレージ**：Amazon S3 は高耐久で低コストのオブジェクトストレージを提供し、アーカイブ、コンプライアンス、時系列分析に最適です。

## はじめる前に

このセクションでは、EMQX プラットフォームで Amazon S3 Tables 統合を作成するための準備を紹介します。

### 前提条件

以下の内容に慣れていることを確認してください：

#### EMQX の概念：

- [ルールエンジン](./rules.md)：MQTT メッセージからデータ抽出・変換のロジックを定義する方法を理解してください。
- [データ統合](./introduction.md)：EMQX のコネクターとシンクの概念を理解してください。

#### AWS の概念：

AWS S3 Tables に不慣れな場合は、以下の主要用語を確認してください：

- **テーブルバケット**：S3 Tables で Iceberg ベースのテーブルデータとメタデータを格納する専用の S3 バケット。
- **Amazon Athena**：Amazon S3 上のデータに対して直接 SQL クエリを実行できるサーバレスクエリエンジン。`CREATE TABLE` などの DDL ステートメントでスキーマや構造を定義可能。
- **カタログ**：Athena のメタデータコンテナで、データベース（ネームスペース）やテーブルを管理。
- **データベース（ネームスペース）**：カタログ内のテーブルを論理的にグループ化したもの。
- **Iceberg テーブル**：高性能でトランザクション対応のデータレイク向けテーブルフォーマット。スキーマ進化、パーティションプルーニング、タイムトラベルクエリをサポート。

### ネットワーク設定

<!--@include: ./network-setting.md-->

### S3 Tables バケットの準備

MQTT データの送信先となる AWS S3 Tables の準備が必要です。以下を用意してください：

- 実際のデータファイルを格納するテーブルバケット
- 関連テーブルを論理的にまとめるネームスペース
- 構造化された MQTT データを受け取る Iceberg ベースのテーブル

1. AWS マネジメントコンソールにログインします。

2. S3 サービスに移動し、左側ナビゲーションペインで **Table buckets** をクリックします。

3. **Create table bucket** をクリックし、テーブルバケット名（例：`mybucket`）を入力して **Create table bucket** をクリックします。

4. バケット作成後、クリックしてテーブル一覧に移動します。

5. **Create table with Athena** をクリックします。ポップアップでネームスペースの指定を求められます。

6. **Create a namespace** を選択し、ネームスペース名を入力して作成を確定します。

7. ネームスペース作成後、再度 **Create table with Athena** をクリックします。

8. Iceberg テーブルのスキーマを定義します：

   - **Query table with Athena** をクリックし、**Query editor** で：

     - カタログセレクターからカタログ（例：バケット名が `mybucket` の場合は `s3tablescatalog/mybucket`）を選択。
     - データベースセレクターから作成したネームスペースを選択。

   - 以下の DDL を実行し、テーブルタイプが `ICEBERG` であることを指定します。例：

     ```sql
     CREATE TABLE testtable (
       c_str string,
       c_long int )
     TBLPROPERTIES ('table_type' = 'ICEBERG');
     ```

     これにより、EMQX からの構造化 MQTT データを格納する Iceberg ベースのテーブルが定義されます。

9. テーブルの検証として、以下を実行してテーブルが空であることを確認します：

   ```sql
   select * from testtable
   ```

   ::: tip

   Athena で SQL を実行する際は、必ず正しいカタログとデータベース（ネームスペース）が選択されていることを確認してください。これにより、意図した S3 テーブルバケット内にテーブルが作成されます。

   :::

## コネクターの作成

S3 Tables ルールを追加する前に、EMQX プラットフォームが S3 Tables にデータ送信できるよう対応するコネクターを作成します。

1. デプロイメントに移動し、左ナビゲーションメニューから **Data Integration** をクリックします。

2. 初めてコネクターを作成する場合は、**Data Persistence** カテゴリの中から **S3 Tables** を選択します。既にコネクターを作成済みの場合は、**New Connector** を選択し、同じく **Data Persistence** の中から **S3 Tables** を選択します。

3. **New Connector** ページで以下を設定します：

   - **Connector Name**：システムが自動的にコネクター名を生成します。
   - **S3Tables ARN**：S3 テーブルバケットの Amazon リソースネーム（ARN）を入力します。AWS コンソールの Table buckets セクションで確認可能です。
   - **Access Key ID** と **Secret Access Key**：S3 Tables および Athena へのアクセス権限を持つ IAM ユーザーまたはロールの AWS アクセス認証情報を入力します。
   - **Enable TLS**：S3 Tables への接続時に TLS を有効または無効にします。TLS 接続オプションの詳細は [TLS for External Resource Access](https://docs.emqx.com/en/emqx/latest/network/overview.html#tls-for-external-resource-access) を参照してください。
   - **Health Check Timeout**：コネクターが S3 Tables との接続の自動ヘルスチェックを行う際のタイムアウト時間を指定します。

4. その他の設定はデフォルト値を使用します。

5. **New** をクリックする前に、**Test** ボタンでコネクターが S3 Tables サーバーに接続できるかテスト可能です。

6. ページ下部の **New** ボタンをクリックし、コネクター作成を完了します。

これでコネクター作成が完了し、続いて S3 Tables サービスに書き込むデータを指定するルールとアクションの作成に進みます。

## ルールの作成

このセクションでは、ソース MQTT トピック `t/#` からメッセージを処理し、処理結果を設定済みの S3 Tables の `mybucket` バケットに書き込むルール作成手順を示します。

1. ルールエリアで **New Rule** をクリックするか、作成したコネクターの **Actions** 列にある新規ルールアイコンをクリックします。

2. **SQL Editor** に以下のルール SQL を入力します：

   ::: tip

   出力フィールドが Iceberg テーブルで定義したスキーマと一致していることを必ず確認してください。必須カラムが欠落または誤った名前の場合、データのテーブル追加に失敗する可能性があります。

   :::

   ```sql
   SELECT
     payload.str as c_str,
     payload.int as c_long
   FROM
       "t/#"
   ```

   ::: tip

   SQL に不慣れな場合は、**SQL Examples** をクリックし、**Enable Debug** を有効にしてルール SQL の結果を学習・テストできます。

   :::

3. **Next** をクリックしてアクションを追加します。

4. **Connector** ドロップダウンから先ほど作成したコネクターを選択します。

5. アクション設定を行います：

   - **Namespace**：テーブルが存在するネームスペースを指定します。複数セグメントの場合はドット区切りで記述（例：`my.name.space`）。
   - **Table**：データを追加する Iceberg テーブル名を指定（例：`testtable`）。
   - **Max Records**：書き込み前にバッチ処理する最大レコード数。到達時にバッチがフラッシュされ即時アップロードされます。
   - **Time Interval**：レコード数が最大に達しなくても、指定ミリ秒経過後にデータをフラッシュする最大待機時間。

6. **フォールバックアクション（任意）**：メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細は [Fallback Actions](./actions.md#fallback-actions) を参照してください。

7. **Advanced Settings** を展開し、必要に応じて詳細設定を行います（任意）。

8. **Confirm** ボタンをクリックしてルール設定を完了します。

9. **Successful new rule** ポップアップで **Back to Rules** をクリックし、データ統合設定の全工程が完了します。

## ルールのテスト

このセクションでは、S3 Tables アクションを設定したルールの動作確認方法を示します。

1. MQTTX を使い、トピック `t/1` にメッセージをパブリッシュします：

   ```bash
   mqttx pub -i emqx_c -t t/1 -m '{ "str": "hello S3 Tables", "int": 123 }'
   ```

   このメッセージは `payload.str` と `payload.int` フィールドを含み、先に定義したルール SQL とテーブルスキーマに対応しています。

2. **Rules** ページでルールのメトリクスとアクションのステータスを監視します。新規の受信メッセージと送信メッセージがそれぞれ 1 件ずつ表示されるはずです。

3. Athena クエリエディターを開き、正しい **Catalog**（例：`s3tablescatalog/mybucket`）と **Database**（ネームスペース）が選択されていることを確認します。

4. 以下の SQL クエリを実行します：

   ```sql
   SELECT * FROM testtable
   ```

   以下のような行が表示されるはずです：

   | c_str           | c_long |
   | --------------- | ------ |
   | hello S3 Tables | 123    |
