# Amazon S3へのMQTTデータ取り込み

[Amazon S3](https://aws.amazon.com/s3/)は、高い信頼性、安定性、およびセキュリティを備えたインターネットベースのストレージサービスであり、迅速なデプロイと使いやすさが特徴です。EMQX Platformは、MQTTメッセージを効率的にAmazon S3バケットに保存することができ、柔軟なIoTデータストレージ機能を実現します。

本ページでは、EMQX PlatformとAmazon S3間のデータ統合について詳しく紹介し、ルールおよびSinkの作成方法について実践的なガイドを提供します。

## 動作概要

EMQX PlatformにおけるAmazon S3データ統合は、複雑なビジネス開発に対応可能なすぐに使える機能として提供されています。典型的なIoTアプリケーションでは、EMQX Platformがデバイス接続およびメッセージ送受信を担うIoTプラットフォームとして機能し、Amazon S3がメッセージデータの保存を担当するデータストレージプラットフォームとして利用されます。

![EMQX Platform-integration-s3](./_assets/data-integration-s3.jpg)

EMQX PlatformはルールとSinkを利用してデバイスのイベントやデータをAmazon S3へ転送します。アプリケーションはAmazon S3からデータを読み出し、さらなるデータ活用を行います。具体的なワークフローは以下の通りです：

1. **デバイスのEMQX Platformへの接続**：IoTデバイスはMQTTプロトコルで正常に接続されるとオンラインイベントをトリガーします。イベントにはデバイスID、送信元IPアドレスなどのプロパティ情報が含まれます。
2. **デバイスからのメッセージパブリッシュと受信**：デバイスは特定のトピックを通じてテレメトリやステータスデータをパブリッシュします。EMQX Platformはメッセージを受信し、ルールエンジン内で比較処理を行います。
3. **ルールエンジンによるメッセージ処理**：組み込みのルールエンジンは、トピックマッチングに基づき特定のソースからのメッセージやイベントを処理します。対応するルールをマッチングし、データフォーマット変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などを行います。
4. **Amazon S3への書き込み**：ルールはメッセージをS3に書き込むアクションをトリガーします。Amazon S3 Sinkを使用して処理結果からデータを抽出し、S3へ送信します。メッセージはテキストまたはバイナリ形式で保存可能であり、メッセージ内容やSink設定に応じて複数行の構造化データを1つのCSVファイルにまとめることもできます。

イベントやメッセージデータがAmazon S3に書き込まれた後は、Amazon S3に接続してデータを読み出し、以下のような柔軟なアプリケーション開発が可能です：

- データアーカイブ：デバイスメッセージをAmazon S3のオブジェクトとして長期保存し、コンプライアンス要件やビジネスニーズに対応。
- データ分析：S3から分析サービス（例：Snowflake）にデータを取り込み、予知保全やデバイス効率評価などのデータ分析を実施。

## 特徴と利点

EMQX PlatformのAmazon S3データ統合を利用すると、以下の特徴と利点をビジネスにもたらします：

- **メッセージ変換**：メッセージはEMQX Platformのルール内で高度な処理・変換が可能であり、Amazon S3への保存や後続利用を容易にします。
- **柔軟なデータ操作**：S3 Sinkにより、特定のデータフィールドをAmazon S3バケットに簡単に書き込め、バケット名やオブジェクトキーの動的設定もサポートします。
- **統合されたビジネスプロセス**：S3 Sinkを通じてデバイスデータをAmazon S3の豊富なエコシステムと連携でき、データ分析やアーカイブなど多様なビジネスシナリオを実現します。
- **低コストの長期保存**：データベースと比較して、Amazon S3は高可用性かつ信頼性の高いコスト効率の良いオブジェクトストレージサービスであり、長期保存に適しています。

これらの特徴により、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に役立てることができます。

## はじめる前に

このセクションでは、EMQX PlatformでAmazon S3 Sinkを作成する前の準備について説明します。

### 前提条件

- [ルール](./rules.md)の理解
- [データ統合](./introduction.md)の理解

### ネットワーク設定

EMQXがAmazon S3にパブリックネットワーク経由でアクセスするため、デプロイ環境で[NAT Gateway](../vas/nat-gateway.md)を有効にする必要があります。トップメニューバーの**VAS**をクリックし、NAT Gatewayカードを選択するか、デプロイ概要ページの下部タブバーで**Enable NAT Gateway service**を選択してください。

### S3バケットの準備

EMQX PlatformはAmazon S3およびその他のS3互換ストレージサービスをサポートしています。ここではAWSクラウドサービスを使ってS3バケットを作成します。

1. [AWS S3コンソール](https://console.amazonaws.cn/s3/home)で**Create bucket**ボタンをクリックし、バケット名やリージョンなどの必要情報を入力してS3バケットを作成します。詳細な操作は[AWSドキュメント](https://docs.amazonaws.cn/AmazonS3/latest/userguide/creating-bucket.html)を参照してください。
2. バケットの権限設定。バケット作成後、バケットを選択し**Permissions**タブをクリックします。ニーズに応じてバケットをパブリック読み書き、プライベート、その他の権限に設定可能です。以下のJSONを参考に設定してください：

   ```json
   {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Sid": "Stmt1ListBucket",
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": ["arn:aws:s3:::emqx-cloud-s3-connector-test"]
      },
      {
        "Sid": "Stmt2GetAndPutObject",
        "Effect": "Allow",
        "Action": ["s3:GetObject", "s3:PutObject"],
        "Resource": ["arn:aws:s3:::emqx-cloud-s3-connector-test/*"]
      },
      {
        "Effect": "Allow",
        "Action": "s3:ListAllMyBuckets",
        "Resource": "*"
      }
    ]
   }
   ```

3. アクセスキーの取得。AWSコンソールで**IAM**サービスを検索・選択し、S3用の新規ユーザーを作成してAccess KeyとSecret Keyを取得します。

Amazon S3バケットの作成と設定が完了したら、EMQX PlatformでAmazon S3 Sinkを作成する準備が整います。

## コネクターの作成

データ統合ルールを作成する前に、対応するコネクターを作成する必要があります。

1. 初めてコネクターを作成する場合は、**Data Persistence**カテゴリの中から**Amazon S3**を選択します。既にコネクターを作成済みの場合は、**New Connector**を選択し、続けて**Data Persistence**カテゴリの中から**Amazon S3**を選択してください。
2. **Connector Name**：システムが自動的にコネクター名を生成します。
3. 接続情報を入力します：
   - **Host**：リージョンにより異なり、`s3.{region}.amazonaws.com`の形式です。
   - **Port**：`443`を入力します。
   - **Access Key ID**および**Secret Access Key**：AWSで作成したアクセスキーを入力します。
   - **Enable TLS**をクリックして有効化し、**TLS Verify**はオフにします。
4. **Test**ボタンをクリックします。Amazon S3サービスにアクセス可能であれば成功メッセージが表示されます。
5. **New**ボタンをクリックして作成を完了します。

これでコネクターの作成が完了し、続いてS3サービスに書き込むデータを指定するルールとSinkの作成に進みます。

## ルールの作成

このセクションでは、EMQX PlatformでソースMQTTトピック`t/#`からのメッセージを処理し、処理結果を設定済みのSinkを通じてS3の`emqx-cloud-s3-connector-test`バケットに書き込むルールの作成方法を示します。

1. ルールエリアの**New Rule**をクリックするか、作成したコネクターの**Actions**列にある新規ルールアイコンをクリックします。

2. **SQL editor**にルールのマッチングSQL文を入力します。

   ```sql
   SELECT
     *
   FROM
       "t/#"
   ```

   ::: tip

   SQLに不慣れな場合は、**SQL Examples**や**Enable Debug**をクリックしてルールSQLの学習やテストが可能です。

   :::

3. **Next**をクリックしてAmazon S3 Sinkを含むアクションを追加します。

4. **Connector**ドロップダウンボックスから先ほど作成したコネクターを選択します。

5. **Bucket**に`emqx-cloud-s3-connector-test`を入力します。このフィールドは`${var}`形式のプレースホルダーもサポートしますが、対応するバケット名が事前にS3に作成されている必要があります。

6. 必要に応じて**ACL**を選択し、アップロードされるオブジェクトのアクセス権限を指定します。

7. **Upload Method**を選択します。2つの方法の違いは以下の通りです：

   - **Direct Upload**：ルールがトリガーされるたびに、設定済みのオブジェクトキーと内容に従ってデータを直接S3にアップロードします。バイナリや大きなテキストデータの保存に適していますが、多数のファイルが生成される可能性があります。
   - **Aggregated Upload**：複数のルールトリガー結果を1つのファイル（例：CSVファイル）にまとめてS3にアップロードします。構造化データの保存に適しており、ファイル数を減らし書き込み効率を向上させます。

   各方法で設定項目が異なります。選択した方法に応じて設定してください：

   :::: tabs type

   ::: tab Direct Upload

   Direct Uploadでは以下の項目を設定します：

   - **Object Key**：バケット内のアップロード先オブジェクトの場所を定義します。`${var}`形式のプレースホルダーをサポートし、`/`で保存ディレクトリを指定可能です。管理や識別のためオブジェクトのサフィックス設定も必要です。ここでは`msgs/${clientid}_${timestamp}.json`と入力します。`${clientid}`はクライアントID、`${timestamp}`はメッセージのタイムスタンプで、各デバイスのメッセージを別オブジェクトに書き込みます。
   - **Object Content**：デフォルトはすべてのフィールドを含むJSONテキスト形式です。`${var}`形式のプレースホルダーをサポートし、ここでは`${payload}`を入力してメッセージ本文をオブジェクト内容として使用します。オブジェクトの保存形式はメッセージ本文の形式に依存し、圧縮ファイルや画像、その他バイナリ形式も対応可能です。

   :::

   ::: tab Aggregate Upload

   Aggregate Uploadでは以下のパラメータを設定します：

   - **Object Key**：オブジェクトの保存パスを指定します。以下の変数が使用可能です：

     - **`${action}`**：アクション名（必須）
     - **`${node}`**：アップロードを実行するEMQX Platformのノード名（必須）
     - **`${datetime.{format}}`**：集約開始日時。`{format}`でフォーマット指定（必須）：
       - **`${datetime.rfc3339utc}`**：UTC形式のRFC3339日時
       - **`${datetime.rfc3339}`**：ローカルタイムゾーン形式のRFC3339日時
       - **`${datetime.unix}`**：Unixタイムスタンプ
     - **`${datetime_until.{format}}`**：集約終了日時。フォーマットは上記と同様。
     - **`${sequence}`**：同一時間間隔内の集約アップロードの連番（必須）

     必須のプレースホルダーがテンプレートに含まれていない場合、これらは自動的にS3オブジェクトキーのパスサフィックスとして追加され、重複を防ぎます。その他のプレースホルダーは無効とみなされます。

   - **Aggregation Type**：現在はCSVのみサポート。データはカンマ区切りCSV形式でS3に書き込まれます。

   - **Column Order**：ルール結果のカラム順をドロップダウンで調整可能。生成されるCSVファイルは選択したカラム順にソートされ、未選択カラムはアルファベット順で続きます。

   - **Max Records**：最大レコード数に達すると単一ファイルの集約が完了しアップロードされ、時間間隔がリセットされます。

   - **Time Interval**：時間間隔に達すると最大レコード数に満たなくても単一ファイルの集約が完了しアップロードされ、最大レコード数がリセットされます。
   - **Min Part Size**：集約完了後のパートアップロードの最小チャンクサイズ。アップロード対象データはこのサイズに達するまでメモリに蓄積されます。
   - **Max Part Size**：パートアップロードの最大チャンクサイズ。S3 Sinkはこのサイズを超えるパートのアップロードを試みません。

   :::

   ::::

8. **Advanced Settings**を展開し、必要に応じて詳細設定を行います（任意）。詳細は[詳細設定](#advanced-settings)を参照してください。

9. **Confirm**ボタンをクリックしてルール作成を完了します。

10. **Successful new rule**ポップアップで**Back to Rules**をクリックし、データ統合の設定チェーンが完了します。

## ルールのテスト

このセクションでは、Direct Upload方式で設定したルールのテスト方法を示します。

MQTTXを使ってトピック`t/1`にメッセージをパブリッシュします：

```bash
mqttx pub -i EMQX Platform_c -t t/1 -m '{ "msg": "hello S3" }'
```

数件メッセージを送信した後、MinIOコンソールまたはAmazon S3コンソールで結果を確認します。

AWSマネジメントコンソールにログインし、Amazon S3コンソールを開きます：<https://console.aws.amazon.com/s3/>。

バケット一覧から`emqx-cloud-s3-connector-test`バケットを選択してバケット内に入り、オブジェクト一覧で直前にパブリッシュしたメッセージが`msg`オブジェクトとして正常に書き込まれていることを確認します。オブジェクトのチェックボックスを選択し、**Download**を選んでローカルにダウンロードし内容を確認できます。

## 詳細設定

このセクションでは、S3 Sinkの詳細設定オプションについて説明します。ダッシュボードでSinkを設定する際に、**Advanced Settings**を展開して以下のパラメータをニーズに応じて調整可能です。

| 項目名                     | 説明                                                                                              | デフォルト値  |
| -------------------------- | ------------------------------------------------------------------------------------------------ | ------------ |
| **Buffer Pool Size**       | EMQX PlatformとS3間のデータフローを管理するバッファワーカープロセスの数を指定します。これらのワーカーはデータを一時的に保存・処理し、ターゲットサービスへ送信する前にパフォーマンス最適化とスムーズなデータ伝送を担います。 | `16`         |
| **Request TTL**            | 「Request TTL」（Time To Live）は、リクエストがバッファに入ってから有効とみなされる最大時間（秒）を指定します。このタイマーはリクエストがバッファに入った時点からカウント開始されます。TTLを超えてバッファに滞留するか、送信後にS3からの応答やアックがタイムリーに返ってこない場合、リクエストは期限切れと判断されます。 |              |
| **Health Check Interval**  | SinkがS3との接続状態を自動的にヘルスチェックする間隔（秒）を指定します。                         | `15`         |
| **Max Buffer Queue Size**  | S3 Sinkの各バッファワーカーがバッファリング可能な最大バイト数を指定します。バッファワーカーはデータを一時保存し、効率的にS3へ送信するための中継役を果たします。システム性能やデータ伝送要件に応じて調整してください。 | `256`        |
| **Query Mode**             | メッセージ送信を最適化するため、`synchronous`または`asynchronous`のリクエストモードを選択可能です。非同期モードではS3への書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがメッセージを受信してからS3に到達するまでのタイムラグが発生する可能性があります。 | `Asynchronous` |
| **In-flight Window**       | 「In-flight queue requests」は、送信済みだがまだ応答やアックを受け取っていないリクエストを指します。この設定はSinkがS3と通信中に同時に存在可能なin-flightリクエストの最大数を制御します。<br/>**Request Mode**が`asynchronous`の場合、このパラメータは特に重要です。同一MQTTクライアントからのメッセージを厳密に順序処理する必要がある場合は、この値を`1`に設定してください。 | `100`        |
