# Amazon Kinesis への MQTT データストリーミング

[AWS Kinesis](https://aws.amazon.com/cn/kinesis/) は、AWS 上のフルマネージドリアルタイムストリーミングデータ処理サービスであり、ストリーミングデータの収集、処理、分析を容易にします。あらゆる規模のストリーミングデータを経済的かつ効率的にリアルタイム処理でき、高い柔軟性を持ち、数十万のソースからの大量データを低レイテンシで処理可能です。

EMQX プラットフォームは [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/) とシームレスに連携でき、大量の IoT デバイスを接続してリアルタイムのメッセージ収集と送信を実現します。このデータ統合を通じて、Amazon Kinesis Data Streams と接続し、リアルタイムのデータ分析や複雑なストリーム処理を可能にします。

本ページでは、Kafka データ統合の機能紹介と作成手順を詳述します。内容は、Amazon Kinesis コネクターの作成、ルールの作成、ルールのテストを含み、MQTT プロトコルを介してシミュレートした温度・湿度データを EMQX プラットフォームに報告し、設定したデータ統合を通じて Amazon Kinesis に保存する方法を示します。

## 動作の仕組み

Amazon Kinesis データ統合は、EMQX の標準機能であり、MQTT データストリームを Amazon Kinesis とシームレスに統合し、IoT アプリケーション開発における豊富なサービスと機能を活用できるよう支援します。

![EMQX プラットフォームと Amazon Kinesis の統合](./_assets/data_integration_amazon_kinesis.png)

EMQX はルールエンジンと Sink を介して MQTT データを Amazon Kinesis に転送します。全体の流れは以下の通りです。

1. **IoT デバイスがメッセージをパブリッシュ**：デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュし、ルールエンジンをトリガーします。
2. **ルールエンジンがメッセージを処理**：組み込みのルールエンジンは特定のトピックに基づいて MQTT メッセージを処理します。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
3. **Amazon Kinesis へのブリッジング**：ルールによってトリガーされたアクションは、メッセージを Amazon Kinesis に転送します。パーティションキー、書き込み先のデータストリーム、メッセージ形式をカスタマイズ可能で、柔軟なデータ統合を実現します。

MQTT メッセージデータが Amazon Kinesis に書き込まれた後、以下のような柔軟なアプリケーション開発が可能です。

- リアルタイムデータ処理・分析：Amazon Kinesis の強力なデータ処理・分析ツールとストリーミング機能を活用し、メッセージデータのリアルタイム処理・分析を行い、有益なインサイトや意思決定支援を得られます。
- イベント駆動機能：Amazon のイベント処理をトリガーし、動的かつ柔軟な機能の起動と処理を実現します。
- データ保存・共有：メッセージデータを Amazon Kinesis のストレージサービスに送信し、大量データを安全に保存・管理します。これにより、他の Amazon サービスと連携してデータの共有や分析が可能となり、多様なビジネスニーズに対応します。

## 特徴と利点

EMQX プラットフォームと AWS Kinesis Data Streams のデータ統合は、以下の機能と利点をもたらします。

- **信頼性の高いデータ伝送と順序保証**：EMQX と AWS Kinesis Data Streams はともに信頼性の高いデータ伝送機構を提供します。EMQX は MQTT プロトコルを通じてメッセージの確実な送信を保証し、AWS Kinesis Data Streams はパーティションとシーケンス番号でメッセージの順序を保証します。これにより、デバイスから送信されたメッセージが正確に届き、正しい順序で処理されます。
- **リアルタイムデータ処理**：デバイスからの高頻度データは EMQX のルール SQL によるリアルタイムの一次処理を受け、MQTT メッセージのフィルタリング、抽出、付加、変換が容易に行えます。AWS Kinesis Data Streams へ送信後は、AWS Lambda や AWS 管理の Apache Flink と組み合わせてさらなるリアルタイム分析が可能です。
- **弾力的なスケーラビリティ対応**：EMQX は数百万の IoT デバイス接続を容易に実現し、弾力的なスケーラビリティを提供します。一方、AWS Kinesis Data Streams はオンデマンドの自動リソース割り当てと拡張を行います。両者を組み合わせたアプリケーションは接続数やデータ量の増加に応じてスケールし、ビジネスの成長に継続的に対応可能です。
- **パーシステンスなデータ保存**：AWS Kinesis Data Streams はパーシステンスなデータ保存機能を備え、毎秒数百万のデバイスデータストリームを信頼性高く保存します。必要に応じて過去データを取得でき、オフライン分析や処理も容易に行えます。

AWS Kinesis Data Streams を利用したストリーミングデータパイプラインの構築は、EMQX と AWS プラットフォームの統合の難易度を大幅に低減し、ユーザーにより豊かで柔軟なデータ処理ソリューションを提供します。これにより、EMQX ユーザーは AWS 上で機能的に充実した高性能なデータ駆動型アプリケーションを構築できます。

## はじめる前に

本節では、EMQX プラットフォームで AWS Kinesis データ統合を作成する前に必要な設定手順を説明します。ネットワーク設定、AWS リソースの作成、およびローカルテスト環境のオプション設定を行います。

### 前提条件

作業を進める前に、EMQX データ統合で使用する以下の概念に慣れていることを確認してください。

- EMQX の [ルール](./rules.md)
- [データ統合](./introduction.md) 機能

### ネットワーク設定

EMQX が AWS Kinesis と通信できるようにネットワークアクセスを設定する必要があります。以下の2つのサポートされる方法を説明します。

- **NAT ゲートウェイ**：デプロイメントに NAT ゲートウェイを設定し、パブリック IP を介して AWS サービスにアクセスします。設定方法は [NAT Gateway](../vas/nat-gateway.md) を参照してください。
- **AWS PrivateLink**：EMQX と AWS Kinesis 間に PrivateLink 接続を確立します。以下の手順に従い、VPC エンドポイントを作成し PrivateLink 接続を設定します。

#### Kinesis へのアクセス用インターフェイス VPC エンドポイントの作成

この節では、AWS でインターフェイス VPC エンドポイントを作成する手順を説明します。このエンドポイントにより、VPC と AWS Kinesis 間でパブリックインターネットを経由せずにプライベートかつ安全な接続が可能になります。

1. [Amazon VPC コンソール](https://console.aws.amazon.com/vpc/) を開きます。
2. ナビゲーションペインで **Endpoints** を選択します。
3. **Create endpoint** をクリックします。
4. **Type** で **AWS services** を選択します。
5. **Service name** で **Kinesis** サービスを選択します。
6. **VPC**、**Subnet(s)** を設定し、EMQX デプロイメントからのアクセスを許可するよう **Security Group** を設定します。
   - 選択したサブネットが EMQX デプロイメントと同じアベイラビリティゾーンにあることを確認してください。詳細は [EMQX プラットフォームコンソールから AZ ID を取得する](../deployments/privatelink-aws.md#obtain-az-id-from-eqmx-platform-console) を参照してください。
7. **Create Endpoint** をクリックし、ターゲットグループ設定時に使用する IP アドレスを控えます。

詳細は [AWS 公式ドキュメント](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html) を参照してください。

![VPC エンドポイント作成](./_assets/create_vpc_endpoint.png)

#### EMQX と Kinesis 間の PrivateLink 接続の作成

EMQX と AWS Kinesis 間でパブリックインターネットアクセスなしに安全かつプライベートな通信を実現するため、PrivateLink 接続を確立します。以下の手順で必要な AWS リソースを設定し、EMQX プラットフォームで PrivateLink を有効化します。

1. [AWS プラットフォームでの準備手順完了](../deployments/privatelink-aws.md#complete-preparatory-steps-on-the-aws-platform) のステップ1～3を完了します。

   - ステップ3「ロードバランシング用ターゲットグループの作成」では以下を設定します。

     - **基本設定**：

       - **ターゲットタイプの選択**：`IP addresses` を選択
       - **プロトコル: ポート**：`TCP : 443`
       - **VPC**：Kinesis エンドポイントと同じ VPC を選択

       ![ターゲットグループ作成](./_assets/create_target_group.png)

     - **ターゲット登録**：前節「Kinesis へのアクセス用インターフェイス VPC エンドポイントの作成」で控えたサブネットの IP アドレスを入力し、**Port** を `443` に設定。

2. [AWS プラットフォームでの準備手順完了](../deployments/privatelink-aws.md#complete-preparatory-steps-on-the-aws-platform) のステップ4に従い、ネットワークロードバランサー（NLB）を作成・設定します。

   - **アベイラビリティゾーン**が EMQX デプロイメントと一致していることを確認してください。
   - **リスナー**の **Protocol:Port** を `TCP:443` に設定。

     ![ロードバランサー設定](./_assets/configure_load_balancer.png)

3. [エンドポイントサービスの作成](../deployments/privatelink-aws.md#create-an-endpoint-service) に従います。

   - **利用可能なロードバランサー**で前ステップで作成した NLB を選択。

4. [EMQX プラットフォームで PrivateLink を有効化](../deployments/privatelink-aws.md#enable-privatelink-on-emqx-platform) に従います。

   - 生成された PrivateLink アドレスを保存してください。Kinesis コネクター設定時に必要です。

     ![PrivateLink ステータス](./_assets/privatelink_status.png)

### Amazon Kinesis Data Streams でストリームを作成

このステップでは、AWS マネジメントコンソールを使って Kinesis Data Stream を作成します（詳細はこの[チュートリアル](https://docs.aws.amazon.com/streams/latest/dev/how-do-i-create-a-stream.html)を参照）。このデータストリームは EMQX プラットフォームから送信されるメッセージの送信先となります。

1. AWS マネジメントコンソールにサインインし、Kinesis コンソールを開きます。
2. ナビゲーションバーのリージョンセレクターを展開し、リージョンを選択します。
3. **Create data stream** を選択します。
4. 「Create Kinesis stream」ページでデータストリーム名を入力し、**On-demand** または **Provisioned** キャパシティモードを選択します。

### Amazon Kinesis Data Streams をローカルでエミュレート（任意）

開発やテスト目的で、[LocalStack](https://www.localstack.cloud/) を使ってローカル環境で Kinesis サービスをエミュレートできます。これにより、クラウドに接続せずに AWS 互換サービスをローカルで実行可能です。

1. Docker イメージを使ってインストール・起動します。

   ```bash
   # LocalStack docker イメージをローカルで起動
   docker run --name localstack -p '4566:4566' -e 'KINESIS_LATENCY=0' -d localstack/localstack:2.1

   # コンテナにアクセス
   docker exec -it localstack bash
   ```

2. シャード数1のストリーム **my_stream** を作成します。

   ```bash
   awslocal kinesis create-stream --stream-name "my_stream" --shard-count 1
   ```

## Amazon Kinesis コネクターの作成

データ統合ルールを作成する前に、EMQX プラットフォームが Kinesis サービスにデータを送信できるよう、Amazon Kinesis コネクターを作成します。

1. ご自身のデプロイメントに移動し、左ナビゲーションメニューから **Data Integration** をクリックします。
2. 初めてコネクターを作成する場合は、**Data Forward** カテゴリの中から **Amazon Kinesis** を選択します。
   既存のコネクターがある場合は、**New Connector** をクリックし、**Data Forward** の中から **Amazon Kinesis** を選択します。
3. **New Connector** ページで以下の項目を設定します。
   - **Amazon Kinesis Endpoint**：
     - NAT ゲートウェイを使用する場合は、`https://kinesis.<region>.amazonaws.com` の形式でエンドポイントを入力します。`<region>` は AWS Kinesis サービスがホストされているリージョンに置き換えてください。
     - AWS PrivateLink を使用する場合は、Kinesis サービスの PrivateLink アドレスを入力します。`https://` で始まることを確認してください。
     - LocalStack を使用する場合は `http://localhost:4566` を使用します。
   - **AWS Access Key ID**：[アクセスキーID](https://docs.aws.amazon.com/powershell/latest/userguide/creds-idc.html) を入力します。LocalStack 使用時は任意の値で構いません。
   - **AWS Secret Access Key**：[シークレットアクセスキー](https://docs.aws.amazon.com/powershell/latest/userguide/creds-idc.html) を入力します。LocalStack 使用時は任意の値で構いません。
   - その他の設定はデフォルト値を使用するか、ビジネス要件に応じて設定してください。
4. **Test** ボタンをクリックし、Kinesis サービスへの接続を検証します。接続成功時は確認メッセージが表示されます。

   ![Kinesis コネクター](./_assets/kinesis_connector.png)

5. **New** をクリックしてコネクターの作成を完了します。

## ルールの作成

次に、書き込むデータを指定するルールを作成し、処理したデータを Amazon Kinesis に転送するアクションをルールに追加します。

1. ルールエリアで **New Rule** をクリックするか、作成したコネクターの **Actions** 列にある新規ルールアイコンをクリックします。
2. **SQL editor** にルールマッチング用の SQL 文を入力します。以下の例は、`temp_hum/emqx` トピックに送信されたメッセージから報告時刻 `up_timestamp`、クライアント ID、およびメッセージ本文（ペイロード）を読み取り、温度と湿度を抽出します。

   ```sql
   SELECT 
   timestamp,
   clientid, 
   payload.temp as temp, 
   payload.hum as hum
   
   FROM
   "temp_hum/emqx"
   ```

   **Enable Test** を使ってデータ入力をシミュレートし、結果をテストできます。
3. **Next** をクリックしてアクションを追加します。
4. **Connector** ドロップダウンから先ほど作成したコネクターを選択します。
5. 以下の情報を設定します。

   - **Action Name**：システムが自動的にアクション名を生成します。

   - **Amazon Kinesis Stream**：Amazon Kinesis Data Streams で作成したストリーム名を入力します。

   - **Partition Key**：このストリームに送信されるレコードに関連付けるパーティションキーを入力します。`${variable_name}` 形式のプレースホルダーが使用可能です（次のステップの例を参照）。

   - **Payload Template** フィールドは空白のままにするかテンプレートを定義します。
     - 空白の場合、MQTT メッセージのクライアント ID、トピック、ペイロードなどの可視入力を JSON 形式でエンコードします。
     - 定義したテンプレートを使用する場合、`${variable_name}` 形式のプレースホルダーは MQTT コンテキストの対応する値に置き換えられます。
       
       例えば、MQTT メッセージのトピックが `my/topic` の場合、`${topic}` はそれに置き換えられます。テンプレートは要件に応じて柔軟に調整可能です。転送したい温度と湿度の値だけを入力することもできます。

       ```bash
       {"timestamp": ${timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
       ```

   - 高度な設定（任意）：バッファキューやバッチモードの使用を必要に応じて選択します。

6. **Confirm** ボタンをクリックしてルール作成を完了します。
7. **Successful new rule** ポップアップで **Back to Rules** をクリックし、データ統合の一連の設定を完了します。

## ルールのテスト

[MQTTX](https://mqttx.app/) を使って温度・湿度データの報告をシミュレートすることを推奨しますが、他のクライアントでも構いません。

1. MQTTX でデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`
   - ペイロード：

     ```json
     {"temp":"23.5","hum":"32.6"}
     ```

2. コンソールで運用データを確認します。ルール一覧のルール ID をクリックすると、ルールの統計情報およびそのルール配下のすべてのアクションの統計が表示されます。
3. [Amazon Kinesis Data Viewer](https://docs.aws.amazon.com/streams/latest/dev/data-viewer.html) にアクセスし、レコード取得時にメッセージが表示されることを確認します。

    ![Amazon Kinesis 結果](./_assets/data_integration_amazon_kinesis_results.png)

4. LocalStack を使った確認

    LocalStack を使用している場合は、以下の手順で受信データを確認できます。

    - EMQX にメッセージを送信する前に、以下のコマンドで ShardIterator を取得します。

    ```bash
    awslocal kinesis get-shard-iterator --stream-name my_stream --shard-id shardId-000000000000 --shard-iterator-type LATEST
    {
    "ShardIterator": "AAAAAAAAAAG3YjBK9sp0uSIFGTPIYBI17bJ1RsqX4uJmRllBAZmFRnjq1kPLrgcyn7RVigmH+WsGciWpImxjXYLJhmqI2QO/DrlLfp6d1IyJFixg1s+MhtKoM6IOH0Tb2CPW9NwPYoT809x03n1zL8HbkXg7hpZjWXPmsEvkXjn4UCBf5dBerq7NLKS3RtAmOiXVN6skPpk="
    }
    ```

    - MQTTX で `temp_hum/emqx` トピックにメッセージを送信します。

    - 受信したデータを読み取り、デコードします。

    ```bash
    awslocal kinesis get-records n--shard-iterator="AAAAAAAAAAG3YjBK9sp0uSIFGTPIYBI17bJ1RsqX4uJmRllBAZmFRnjq1kPLrgcyn7RVigmH+WsGciWpImxjXYLJhmqI2QO/DrlLfp6d1IyJFixg1s+MhtKoM6IOH0Tb2CPW9NwPYoT809x03n1zL8HbkXg7hpZjWXPmsEvkXjn4UCBf5dBerq7NLKS3RtAmOiXVN6skPpk="
    {
        "Records": [
            {
                "SequenceNumber": "49642650476690467334495639799144299020426020544120356866",
                "ApproximateArrivalTimestamp": 1689389148.261,
                "Data": "eyAibXNnIjogImhlbGxvIEFtYXpvbiBLaW5lc2lzIiB9",
                "PartitionKey": "key",
                "EncryptionType": "NONE"
            }
        ],
        "NextShardIterator": "AAAAAAAAAAFj5M3+6XUECflJAlkoSNHV/LBciTYY9If2z1iP+egC/PtdVI2t1HCf3L0S6efAxb01UtvI+3ZSh6BO02+L0BxP5ssB6ONBPfFgqvUIjbfu0GOmzUaPiHTqS8nNjoBtqk0fkYFDOiATdCCnMSqZDVqvARng5oiObgigmxq8InciH+xry2vce1dF9+RRFkKLBc0=",
        "MillisBehindLatest": 0
    }

    echo 'eyAibXNnIjogImhlbGxvIEFtYXpvbiBLaW5lc2lzIiB9' | base64 -d
    {"temp":"23.5","hum":"32.6"}
    ```
