# DynamoDBへのMQTTデータストリーミング

[DynamoDB](https://www.amazonaws.cn/en/dynamodb/)は、AWS上のフルマネージドで高性能なサーバーレスのキー・バリューストア型データベースサービスです。高速でスケーラブルかつ信頼性の高いデータストレージを必要とするアプリケーション向けに設計されています。EMQXはDynamoDBとの連携をサポートしており、MQTTメッセージやクライアントイベントをDynamoDBに保存することで、IoTデバイスの登録・管理やデバイスデータの長期保存およびリアルタイム分析を実現します。DynamoDBのデータ統合を通じて、MQTTメッセージやクライアントイベントをDynamoDBに格納できるほか、イベントに応じてDynamoDB内のデータの更新や削除をトリガーし、デバイスのオンライン状態や接続履歴などの情報を記録可能です。

本ページでは、DynamoDBデータ統合の機能概要を詳しく紹介し、作成手順を実践的に解説します。内容はDynamoDBコネクターの作成、ルールの作成、ルールのテストを含み、MQTTプロトコル経由でEMQXプラットフォームにシミュレートされた温度・湿度データを報告し、設定したデータ統合を通じてDynamoDBに保存する方法を示します。

## 動作の仕組み

DynamoDBデータ統合は、EMQXプラットフォームに標準搭載された機能であり、EMQXのデバイス接続とメッセージ送信機能をDynamoDBの強力なデータストレージ機能と組み合わせています。組み込みのデータ統合コンポーネントにより、EMQXからDynamoDBへのデータ取り込みと管理が簡素化され、複雑なコーディングを不要にします。

以下の図は、EMQXとDynamoDB間のデータ統合の典型的なアーキテクチャを示しています。

![EMQX Platform Integration AWS DynamoDB](./_assets/data_integration_aws_dynamodb.png)

MQTTデータをDynamoDBに取り込む流れは以下の通りです。

1. **メッセージのパブリッシュと受信**：接続された車両、IIoTシステム、エネルギー管理プラットフォームなどのIoTデバイスは、MQTTプロトコルを通じてEMQXに正常に接続し、特定のトピックにMQTTメッセージをパブリッシュします。EMQXはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **メッセージデータの処理**：メッセージが到着するとルールエンジンを通過し、EMQXで定義されたルールに基づいて処理されます。ルールは事前に定義された条件に基づき、DynamoDBにルーティングすべきメッセージを判定します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などが適用されます。
3. **DynamoDBへのデータ取り込み**：ルールエンジンがDynamoDBへの保存対象メッセージを特定すると、DynamoDBへの転送アクションをトリガーします。処理済みデータはDynamoDBのテーブルにシームレスに書き込まれます。
4. **データの保存と活用**：データがDynamoDBに保存されることで、企業はそのクエリ機能を活用して様々なユースケースに対応可能です。例えば、コネクテッドカー分野では、車両の状態管理やリアルタイム指標に基づくルート最適化、資産追跡に利用できます。IIoT環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化などに活用されます。

## 特徴と利点

DynamoDBとのデータ統合は、効率的なデータ送信・保存・活用を実現するための多彩な特徴とメリットを提供します。

- **リアルタイムデータストリーミング**：EMQXプラットフォームはリアルタイムデータストリーム処理に最適化されており、ソースシステムからDynamoDBへの効率的かつ信頼性の高いデータ送信を実現します。即時のインサイトやアクションが求められるユースケースに適しています。
- **柔軟なデータ変換**：EMQXプラットフォームは強力なSQLベースのルールエンジンを備え、DynamoDBに保存する前にデータを前処理可能です。フィルタリング、ルーティング、集約、拡充など多様なデータ変換機構をサポートし、ニーズに応じたデータ整形が可能です。
- **柔軟なデータモデル**：DynamoDBはキー・バリュー型およびドキュメント型のデータモデルを採用しており、構造化されたデバイスイベントやメッセージデータの保存・管理に適しています。異なるMQTTメッセージ構造の保存も容易です。
- **強力なスケーラビリティ**：EMQXプラットフォームはクラスターのスケーラビリティを備え、デバイス接続数やメッセージ量に応じてシームレスに水平スケール可能です。DynamoDBはサーバーやインフラ管理不要で、基盤リソースの管理とスケーリングを自動で行います。両者の組み合わせにより、高性能かつ高信頼のデータ保存とスケーラビリティを実現します。

## はじめる前に

本節では、EMQXプラットフォームでDynamoDBデータ統合を作成するための準備作業を紹介します。

### 前提条件

- [ルール](./rules.md)の理解
- [データ統合](./introduction.md)の理解

### ネットワーク設定

<!--@include: ./network-setting.md-->

### AWS DynamoDBインスタンスのセットアップ

AWS DynamoDBの作成は、クラウド上またはDockerを利用してローカルに構築する方法があります。

#### コンソールでDynamoDBインスタンスとテーブルを作成

初めてDynamoDBインスタンスを作成する場合は、[AWSのヘルプドキュメント](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStartedDynamoDB.html)を参照してください。

1. DynamoDBコンソールにアクセスし、テーブル作成で`temp_hum`を指定します。
2. テーブル名、パーティションキーなどの主要情報を入力し、その他は必要に応じてデフォルト設定のままにします。
3. テーブルのステータスが「アクティブ」になれば、`temp_hum`テーブルの作成が完了しています。

#### DynamoDBローカルサーバーのインストールとテーブル作成

1. docker-composeファイル`dynamo.yaml`を用意し、DynamoDBローカルサーバーをセットアップします。

    ```bash
    version: '3.8'
    services:
    dynamo:
        command: "-jar DynamoDBLocal.jar -sharedDb"
        image: "amazon/dynamodb-local:latest"
        container_name: dynamo
        ports:
        - "8000:8000"
        environment:
        AWS_ACCESS_KEY_ID: root 
        AWS_SECRET_ACCESS_KEY: public
        AWS_DEFAULT_REGION: us-west-2
    ```

2. サーバーを起動します。

    ```bash
    docker-compose -f dynamo.yaml up
    ```

3. テーブル定義を作成し、ホームディレクトリに`temp_hum.json`として保存します。

    ```bash
    {
        "TableName": "temp_hum",
        "KeySchema": [
            { "AttributeName": "id", "KeyType": "HASH" }
        ],
        "AttributeDefinitions": [
            { "AttributeName": "id", "AttributeType": "S" }
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        }
    }
    ```

4. このファイルを使って新しいテーブルを作成します。

    ```bash
    docker run --rm -v ${HOME}:/dynamo_data -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb create-table --cli-input-json file:///dynamo_data/temp_hum.json --endpoint-url http://host.docker.internal:8000
    ```

5. テーブルが正常に作成されたか確認します。

    ```bash
    docker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb list-tables --endpoint-url http://host.docker.internal:8000
    ```

    正常に作成されていれば、以下のJSONが表示されます。

    ```bash
        {
            "TableNames": [
                "temp_hum"
            ]
        }
    ```

## DynamoDBコネクターの作成

データ統合ルールを作成する前に、まずDynamoDBサーバーにアクセスするためのDynamoDBコネクターを作成します。

1. デプロイメントに移動し、左側ナビゲーションメニューから**データ統合**をクリックします。
2. 初めてコネクターを作成する場合は、**データ永続化**カテゴリの中から**DynamoDB**を選択します。既にコネクターを作成済みの場合は、**新規コネクター**を選択し、**データ転送**カテゴリの中から**DynamoDB**を選択します。
3. **新規コネクター**画面で以下の項目を設定します。
   - **DynamoDBリージョン**：DynamoDBインスタンスが存在するリージョンを入力します。例：`us-west-2`
   - **DynamoDBサーバー**：DynamoDBサービスの[エンドポイント](https://docs.aws.amazon.com/general/latest/gr/ddb.html)を入力します。必ず「https://」のプレフィックスを含めてください。LocalStackを利用する場合は`http://localhost:8000`を指定します。
   - **AWSアクセスキーID**：[アクセスキーID](https://docs.aws.amazon.com/powershell/latest/userguide/creds-idc.html)を入力します。例：`root`
   - **AWSシークレットアクセスキー**：[シークレットアクセスキー](https://docs.aws.amazon.com/powershell/latest/userguide/creds-idc.html)を入力します。例：`public`
   - その他の設定はデフォルトのままか、ビジネス要件に応じて設定してください。
4. **テスト**ボタンをクリックし、DynamoDBサービスにアクセス可能であれば成功メッセージが表示されます。
5. **新規作成**ボタンをクリックして作成を完了します。

## ルールの作成

次に、書き込むデータを指定するルールを作成し、処理済みデータをDynamoDBに転送するアクションを追加します。

1. ルールエリアで**新規ルール**をクリックするか、作成したコネクターの**アクション**列にある新規ルールアイコンをクリックします。

2. **SQLエディター**にルールのマッチングSQL文を入力します。以下のSQL例は、`temp_hum/emqx`トピックに送信されたメッセージから報告時間`up_timestamp`、クライアントID、メッセージ本文（ペイロード）を読み取り、温度と湿度を抽出します。

   ```sql
    SELECT 
    id as msgid,
    topic, 
    payload 
    
    FROM
    "temp_hum/emqx"
   ```

   オンライン／オフライン状態の記録用ルールを作成する場合は、以下の文を入力します。

   ```sql
    SELECT
    str(event) + timestamp as id, *
    FROM 
    "$events/client_connected", "$events/client_disconnected"
   ```

   ::: tip

   初心者の方は、**SQL例**をクリックし、**テストを有効化**してSQLルールの学習とテストを行うことをおすすめします。

   :::

3. **次へ**をクリックしてアクションを追加します。

4. **コネクター**のドロップダウンから先ほど作成したコネクターを選択します。

5. 以下の情報を設定します。

   - **アクション名**：システムが自動生成します。

   - **テーブル名**：`temp_hum`と入力します。

   - **メッセージテンプレート**：空欄の場合はメッセージ全体がデータベースに保存されます。テンプレートは有効なJSONで、プレースホルダーを含めることができ、テーブルのすべてのキーが含まれている必要があります。例：`{"id" : "${id}", "clientid" : "${clientid}", "data" : "${payload.data}"}`

     SQLテンプレート内で未定義のプレースホルダー変数がある場合は、**メッセージテンプレート**上部の**未定義変数をNULLとして扱う**スイッチでルールエンジンの動作を切り替えられます。

     - **無効**（デフォルト）：ルールエンジンは文字列`undefined`をデータベースに挿入します。

     - **有効**：変数が未定義の場合、ルールエンジンは`NULL`をデータベースに挿入します。

       ::: tip

       可能な限りこのオプションは有効にしてください。無効にするのは後方互換性を確保する場合のみです。

       :::

   - その他の設定はデフォルトのままか、ビジネス要件に応じて設定してください。

6. **確認**ボタンをクリックしてルール作成を完了します。

7. **新規ルール作成成功**のポップアップで**ルールに戻る**をクリックし、データ統合設定の一連の流れを完了します。

## ルールのテスト

[MQTTX](https://mqttx.app/)を使って温度・湿度データの報告をシミュレートすることを推奨しますが、他のクライアントでも構いません。

1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`

   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. メッセージがDynamoDBに転送されているか確認します。

   - NoSQL Workbenchで結果を確認（任意）

     **[NoSQL Workbench](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/workbench.settingup.html)**はAmazon DynamoDB向けのクロスプラットフォームのクライアントGUIアプリケーションです。DynamoDBに接続して**Operation Builder**ページに移動し、`temp_hum`テーブルを選択すると、温度・湿度データの転送結果を確認できます。

   - データテーブルに書き込まれているか確認（任意）

     ```bash
     docker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb scan --table-name=temp_hum --endpoint-url http://host.docker.internal:8000
     ```

3. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、そのルールの統計情報やルール配下のすべてのアクションの統計情報を閲覧できます。
