# Amazon S3 Tables への MQTT データ取り込み

[Amazon S3 Tables](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables.html) は、分析ワークロードに最適化された専用のストレージソリューションです。Apache Iceberg フォーマットで IoT センサーの読み取り値などの表形式データを、高性能かつスケーラブルかつ安全に保存できます。

EMQX は Amazon S3 Tables とのシームレスな連携をサポートし、MQTT メッセージを効率的に S3 テーブルバケットに保存できるようになりました。この連携により、柔軟かつスケーラブルな IoT データストレージが可能となり、Amazon Athena、Amazon Redshift、Amazon EMR などの AWS サービスを用いた高度な分析や処理を支援します。

本ページでは EMQX と Amazon S3 Tables 間のデータ統合について詳しく解説し、ルールおよび Sink の作成方法を実践的に案内します。

## 動作概要

EMQX は Amazon S3 Tables と連携し、MQTT データをリアルタイムかつ構造化された形で Amazon S3 に取り込み、長期保存および分析に活用できるようにします。この連携は EMQX のルールエンジンと S3 Tables Sink を活用し、MQTT メッセージを Apache Iceberg フォーマットのテーブルに変換して直接 S3 テーブルバケットにストリーミングします。

一般的な IoT シナリオでは以下のように動作します。

- **EMQX** は MQTT ブローカーとして、デバイスの接続管理、メッセージのルーティング、データ処理を担当します。
- **Amazon S3 Tables** は MQTT メッセージデータを表形式で耐久的かつクエリ可能なストレージとして保存します。
- **Amazon Athena** は Iceberg テーブルの定義や保存データへの SQL クエリ実行に利用されます。

![emqx-integration-s3-tables](./assets/emqx-integration-s3-tables.png)

ワークフローは以下の通りです。

1. **デバイスの EMQX への接続**：IoT デバイスが MQTT 経由で EMQX に接続し、テレメトリデータをパブリッシュします。
2. **メッセージルーティングとルールマッチング**：EMQX は組み込みのルールエンジンを使い、受信した MQTT メッセージを定義済みトピックにマッチさせ、特定のフィールドや値を抽出します。
3. **データ変換**：EMQX のルールでメッセージペイロードをフィルタリング、変換、または強化し、ターゲットとなる Iceberg テーブルのスキーマに合わせます。
4. **Amazon S3 Tables への書き込み**：ルールが S3 Tables Sink アクションをトリガーし、変換済みデータをバッチ処理して Iceberg 互換の書き込み API を用いて Amazon S3 Tables に送信します。データは Iceberg テーブルのパーティション下に Parquet ファイルとして永続化されます。
5. **クエリと分析**：取り込んだデータは Amazon Athena でクエリ可能となり、他のデータセットと結合したり、Redshift Spectrum、Amazon EMR、Presto、Trino などのサードパーティ分析エンジンで分析可能です。

## 特長と利点

EMQX で Amazon S3 Tables データ統合を利用することで、以下の特長と利点を得られます。

- **リアルタイムストリーム処理**：EMQX のルールエンジンにより、MQTT メッセージをリアルタイムに抽出・変換・条件付きルーティングし、S3 Tables に配信できます。
- **Iceberg ベースの S3 ストレージ**：メッセージは Apache Iceberg テーブルに書き込まれ、従来のデータベースを不要にしつつ、SQL ライクなアクセスパターンを実現します。
- **分析ツールとの簡単連携**：S3 Tables にデータがあれば、Amazon Athena（SQL）、Amazon EMR、Redshift Spectrum、Presto、Trino、Snowflake などでクエリや分析が可能です。
- **柔軟かつコスト効率の良いストレージ**：Amazon S3 は高耐久かつ低コストのオブジェクトストレージを提供し、アーカイブ、コンプライアンス、デバイス生成データの時系列分析に最適です。

## はじめる前に

このセクションでは、EMQX で Amazon S3 Tables Sink を作成するための準備について説明します。

### 前提条件

作業を進める前に、以下の内容を理解していることを確認してください。

#### EMQX の概念：

- [ルールエンジン](./rules.md)：MQTT メッセージからデータを抽出・変換するロジックを定義する方法を理解してください。
- [データ統合](./data-bridges.md)：EMQX のコネクターおよび Sink の概念を理解してください。

#### AWS の概念：

AWS S3 Tables に不慣れな場合は、以下の主要用語を確認してください。

- **テーブルバケット**：S3 Tables で Iceberg ベースのテーブルデータとメタデータを保存するための特殊な S3 バケット。
- **Amazon Athena**：Amazon S3 に保存されたデータに対して直接 SQL クエリを実行できるサーバーレスクエリエンジン。DDL ステートメント（例：`CREATE TABLE`）を使ってスキーマや構造を定義可能。
- **カタログ**：Athena のメタデータコンテナで、データベース（ネームスペース）やテーブルを管理。
- **データベース（ネームスペース）**：カタログ内のテーブルを論理的にグループ化したもの。
- **Iceberg テーブル**：高性能でトランザクション対応のデータレイク向けテーブルフォーマット。スキーマ進化、パーティションプルーニング、タイムトラベルクエリをサポート。

### S3 Tables バケットの準備

EMQX で Sink を作成する前に、AWS S3 Tables 側で MQTT データの保存先を準備する必要があります。以下を用意してください。

- 実際のデータファイルを保存するテーブルバケット
- 関連テーブルを論理的にまとめるネームスペース
- 構造化された MQTT データを受け取る Iceberg ベースのテーブル

1. AWS マネジメントコンソールにログインします。

2. S3 サービスに移動し、左側のナビゲーションペインで **Table buckets** をクリックします。

3. **Create table bucket** をクリックし、テーブルバケット名（例：`mybucket`）を入力して **Create table bucket** をクリックします。

4. バケット作成後、そのバケットをクリックしてテーブル一覧画面に移動します。

5. **Create table with Athena** をクリックすると、ネームスペースの入力を求めるポップアップが表示されます。

6. **Create a namespace** を選択し、ネームスペース名を入力して作成を確定します。

7. ネームスペース作成後、再度 **Create table with Athena** をクリックして続行します。

8. Iceberg テーブルのスキーマを定義します。

   - **Query table with Athena** をクリックし、**Query editor** で以下を設定します。

     - **Catalog** セレクターからカタログ（例：バケット名が `mybucket` の場合は `s3tablescatalog/mybucket`）を選択。
     - **Database** セレクターから先ほど作成したネームスペースを選択。

   - 以下の DDL を実行してテーブルを作成し、テーブルタイプが `ICEBERG` に設定されていることを確認します。例：

     ```sql
     CREATE TABLE testtable (
       c_str string,
       c_long int )
     TBLPROPERTIES ('table_type' = 'ICEBERG');
     ```

     これは EMQX からの構造化 MQTT データを保存する Iceberg ベースのテーブルを定義します。

9. テーブルの検証として、以下を実行しテーブルが作成されており空であることを確認します。

   ```sql
   select * from testtable
   ```

   ::: tip

   Athena で SQL を実行する際は、必ず正しいカタログとデータベース（ネームスペース）が選択されていることを確認してください。これにより、意図した S3 テーブルバケット内にテーブルが作成されます。

   :::

## コネクターの作成

S3 Tables Sink を追加する前に、対応するコネクターを作成します。

1. ダッシュボードの **Integration** -> **Connector** ページに移動します。

2. 右上の **Create** ボタンをクリックします。

3. コネクタータイプとして **S3 Tables** を選択し、次へ進みます。

4. コネクター名を入力します。英数字の大文字・小文字の組み合わせで、ここでは `my-s3-tables` と入力します。

5. 必要な接続情報を入力します。

   - **S3Tables ARN**：S3 テーブルバケットの Amazon リソースネーム（ARN）を入力します。AWS コンソールの Table buckets セクションで確認可能です。
   - **Access Key ID** と **Secret Access Key**：S3 Tables と Athena へのアクセス権限を持つ IAM ユーザーまたはロールの AWS アクセス認証情報を入力します。
   - **Enable TLS**：S3 Tables への接続時は TLS がデフォルトで有効です。TLS 接続オプションの詳細は [TLS for External Resource Access](../network/overview.md#enable-tls-encryption-for-accessing-external-resources) を参照してください。
   - **Health Check Timeout**：コネクターが S3 Tables との接続状態を自動的にヘルスチェックする際のタイムアウト時間を指定します。

7. 残りの設定はデフォルト値を使用します。

8. **Create** をクリックする前に、**Test Connectivity** をクリックして S3 Tables サービスへの接続確認が可能です。

9. 最後に **Create** ボタンをクリックしてコネクター作成を完了します。

これでコネクター作成が完了しました。次にルールと Sink を作成し、S3 Tables へのデータ書き込みを指定します。

## Amazon S3 Tables Sink を用いたルールの作成

このセクションでは、EMQX でソース MQTT トピック `t/#` からメッセージを処理し、処理結果を S3 Tables の `mybucket` バケットに書き込むルールの作成方法を示します。

1. ダッシュボードの **Integration** -> **Rules** ページに移動します。

2. 右上の **Create** ボタンをクリックします。

3. ルール ID に `my_rule` を入力し、SQL エディターに以下のルール SQL を入力します。

   ```sql
   SELECT
     payload.str as c_str,
     payload.int as c_long
   FROM
       "t/#"
   ```

   ::: tip

   SQL に不慣れな場合は、**SQL Examples** や **Enable Debug** をクリックしてルール SQL の学習や結果のテストが可能です。

   :::

   ::: tip

   出力フィールドが Iceberg テーブルのスキーマと一致していることを必ず確認してください。必須カラムが欠落または誤った名前の場合、データのテーブルへの追加に失敗する可能性があります。

   :::

4. アクションを追加し、**Action Type** ドロップダウンから `S3 Tables` を選択します。アクションのドロップダウンはデフォルトの `create action` のままにするか、既存の S3 Tables アクションを選択してください。ここでは新しい Sink を作成してルールに追加します。

5. Sink 名と任意で説明を入力します。

6. **Connector** ドロップダウンから先ほど作成した `my-s3-tables` コネクターを選択します。新しいコネクターを素早く定義したい場合は、ドロップダウン横の **Create** ボタンをクリックしてください。設定パラメーターは [コネクターの作成](#コネクターの作成) を参照してください。

7. Sink の設定を行います。

   - **Namespace**：テーブルが存在するネームスペース。複数セグメントの場合はドット区切りで指定（例：`my.name.space`）。
   - **Table**：データを追加する Iceberg テーブル名（例：`testtable`）。
   - **Max Records**：S3 へ書き込む前にバッチ処理する最大レコード数。到達すると即座にバッチをフラッシュしてアップロードします。
   - **Time Interval**：Max Records に達していなくても、指定した最大待機時間（ミリ秒）経過後にデータをフラッシュします。

8. **フォールバックアクション（任意）**：メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。詳細は [フォールバックアクション](./data-bridges.md#fallback-actions) を参照してください。

9. **詳細設定** を展開し、必要に応じて高度な設定オプションを調整します（任意）。詳細は [詳細設定](#advanced-settings) を参照してください。

10. 残りの設定はデフォルト値を使用し、**Create** ボタンをクリックして Sink 作成を完了します。作成成功後、ルール作成画面に戻り、新しい Sink がルールのアクションに追加されます。

11. ルール作成画面で **Create** ボタンをクリックし、ルール作成全体を完了します。

これでルール作成が正常に完了しました。**Rules** ページで新規作成したルールを確認でき、**Actions (Sink)** タブで新しい S3 Tables Sink を確認できます。

また、**Integration** -> **Flow Designer** をクリックするとトポロジーを視覚的に確認でき、トピック `t/#` のメッセージがルール `my_rule` によって解析され、S3 Tables に書き込まれる流れを把握できます。

## ルールのテスト

このセクションでは、S3 Tables Sink を設定したルールのテスト方法を説明します。

1. MQTTX を使い、トピック `t/1` にメッセージをパブリッシュします。

   ```bash
   mqttx pub -i emqx_c -t t/1 -m '{ "str": "hello S3 Tables", "int": 123 }'
   ```

   このメッセージは `payload.str` と `payload.int` フィールドを含み、ルール SQL とテーブルスキーマに合致しています。

2. **Rules** ページでルールのメトリクスおよび Sink の状態を監視します。新規の受信メッセージと送信メッセージがそれぞれ 1 件ずつ増えているはずです。

3. Athena のクエリエディターを開き、正しい **Catalog**（例：`s3tablescatalog/mybucket`）と **Database**（ネームスペース）が選択されていることを確認します。

4. 以下の SQL クエリを実行します。

   ```sql
   SELECT * FROM testtable
   ```

   以下のような行が表示されるはずです。

   | c_str           | c_long |
   | --------------- | ------ |
   | hello S3 Tables | 123    |

## 詳細設定

このセクションでは、S3 Tables Sink の詳細設定オプションについて説明します。ダッシュボードの Sink 設定画面で **Advanced Settings** を展開し、用途に応じて以下のパラメーターを調整できます。

| フィールド名               | 説明                                                                                                          | デフォルト値  |
| ------------------------- | ------------------------------------------------------------------------------------------------------------- | ------------ |
| **Buffer Pool Size**      | EMQX と S3 Tables 間のデータフローを管理するバッファワーカープロセスの数を指定します。これらのワーカーはデータを一時的に保存・処理し、ターゲットサービスへの送信を最適化しスムーズなデータ転送を支えます。 | `16`         |
| **Request TTL**           | バッファに入ったリクエストが有効とみなされる最大時間（秒）を指定します。リクエストがバッファ内にこの TTL を超えて滞留するか、送信後に S3 Tables からタイムリーな応答やアックが得られない場合、リクエストは期限切れとみなされます。 |              |
| **Health Check Interval** | Sink が S3 Tables との接続状態を自動的にヘルスチェックする間隔（秒）を指定します。                                                         | `15`         |
| **Max Buffer Queue Size** | S3 Tables Sink の各バッファワーカーがバッファリング可能な最大バイト数を指定します。ワーカーはデータを一時保存し、効率的なデータストリーム処理を実現します。システム性能やデータ転送要件に応じて調整してください。 | `256`        |
| **Query Mode**            | メッセージ送信を最適化するため、`synchronous`（同期）または `asynchronous`（非同期）モードを選択可能です。非同期モードでは S3 Tables への書き込みが MQTT メッセージのパブリッシュ処理をブロックしませんが、クライアントがメッセージ到着前に受信する可能性があります。 | `Asynchronous` |
| **In-flight Window**      | 「インフライトキューリクエスト」とは、送信済みで応答やアックをまだ受け取っていないリクエストを指します。この設定は Sink と S3 Tables 間の通信で同時に存在可能なインフライトリクエストの最大数を制御します。<br/>`Request Mode` が `asynchronous` の場合、同一 MQTT クライアントからのメッセージを厳密に順次処理したい場合は、この値を `1` に設定してください。 | `100`        |
