Skip to content

DynamoDBへのMQTTデータストリーミング

DynamoDBは、AWS上のフルマネージドで高性能なサーバーレスのキー・バリューストア型データベースサービスです。高速でスケーラブルかつ信頼性の高いデータストレージを必要とするアプリケーション向けに設計されています。EMQXはDynamoDBとの連携をサポートしており、MQTTメッセージやクライアントイベントをDynamoDBに保存することで、IoTデバイスの登録・管理やデバイスデータの長期保存およびリアルタイム分析を実現します。DynamoDBのデータ統合を通じて、MQTTメッセージやクライアントイベントをDynamoDBに格納できるほか、イベントに応じてDynamoDB内のデータの更新や削除をトリガーし、デバイスのオンライン状態や接続履歴などの情報を記録可能です。

本ページでは、DynamoDBデータ統合の機能概要を詳しく紹介し、作成手順を実践的に解説します。内容はDynamoDBコネクターの作成、ルールの作成、ルールのテストを含み、MQTTプロトコル経由でEMQXプラットフォームにシミュレートされた温度・湿度データを報告し、設定したデータ統合を通じてDynamoDBに保存する方法を示します。

動作の仕組み

DynamoDBデータ統合は、EMQXプラットフォームに標準搭載された機能であり、EMQXのデバイス接続とメッセージ送信機能をDynamoDBの強力なデータストレージ機能と組み合わせています。組み込みのデータ統合コンポーネントにより、EMQXからDynamoDBへのデータ取り込みと管理が簡素化され、複雑なコーディングを不要にします。

以下の図は、EMQXとDynamoDB間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Platform Integration AWS DynamoDB

MQTTデータをDynamoDBに取り込む流れは以下の通りです。

  1. メッセージのパブリッシュと受信:接続された車両、IIoTシステム、エネルギー管理プラットフォームなどのIoTデバイスは、MQTTプロトコルを通じてEMQXに正常に接続し、特定のトピックにMQTTメッセージをパブリッシュします。EMQXはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQXで定義されたルールに基づいて処理されます。ルールは事前に定義された条件に基づき、DynamoDBにルーティングすべきメッセージを判定します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などが適用されます。
  3. DynamoDBへのデータ取り込み:ルールエンジンがDynamoDBへの保存対象メッセージを特定すると、DynamoDBへの転送アクションをトリガーします。処理済みデータはDynamoDBのテーブルにシームレスに書き込まれます。
  4. データの保存と活用:データがDynamoDBに保存されることで、企業はそのクエリ機能を活用して様々なユースケースに対応可能です。例えば、コネクテッドカー分野では、車両の状態管理やリアルタイム指標に基づくルート最適化、資産追跡に利用できます。IIoT環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化などに活用されます。

特徴と利点

DynamoDBとのデータ統合は、効率的なデータ送信・保存・活用を実現するための多彩な特徴とメリットを提供します。

  • リアルタイムデータストリーミング:EMQXプラットフォームはリアルタイムデータストリーム処理に最適化されており、ソースシステムからDynamoDBへの効率的かつ信頼性の高いデータ送信を実現します。即時のインサイトやアクションが求められるユースケースに適しています。
  • 柔軟なデータ変換:EMQXプラットフォームは強力なSQLベースのルールエンジンを備え、DynamoDBに保存する前にデータを前処理可能です。フィルタリング、ルーティング、集約、拡充など多様なデータ変換機構をサポートし、ニーズに応じたデータ整形が可能です。
  • 柔軟なデータモデル:DynamoDBはキー・バリュー型およびドキュメント型のデータモデルを採用しており、構造化されたデバイスイベントやメッセージデータの保存・管理に適しています。異なるMQTTメッセージ構造の保存も容易です。
  • 強力なスケーラビリティ:EMQXプラットフォームはクラスターのスケーラビリティを備え、デバイス接続数やメッセージ量に応じてシームレスに水平スケール可能です。DynamoDBはサーバーやインフラ管理不要で、基盤リソースの管理とスケーリングを自動で行います。両者の組み合わせにより、高性能かつ高信頼のデータ保存とスケーラビリティを実現します。

はじめる前に

本節では、EMQXプラットフォームでDynamoDBデータ統合を作成するための準備作業を紹介します。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

AWS DynamoDBインスタンスのセットアップ

AWS DynamoDBの作成は、クラウド上またはDockerを利用してローカルに構築する方法があります。

コンソールでDynamoDBインスタンスとテーブルを作成

初めてDynamoDBインスタンスを作成する場合は、AWSのヘルプドキュメントを参照してください。

  1. DynamoDBコンソールにアクセスし、テーブル作成でtemp_humを指定します。
  2. テーブル名、パーティションキーなどの主要情報を入力し、その他は必要に応じてデフォルト設定のままにします。
  3. テーブルのステータスが「アクティブ」になれば、temp_humテーブルの作成が完了しています。

DynamoDBローカルサーバーのインストールとテーブル作成

  1. docker-composeファイルdynamo.yamlを用意し、DynamoDBローカルサーバーをセットアップします。

    bash
    version: '3.8'
    services:
    dynamo:
        command: "-jar DynamoDBLocal.jar -sharedDb"
        image: "amazon/dynamodb-local:latest"
        container_name: dynamo
        ports:
        - "8000:8000"
        environment:
        AWS_ACCESS_KEY_ID: root 
        AWS_SECRET_ACCESS_KEY: public
        AWS_DEFAULT_REGION: us-west-2
  2. サーバーを起動します。

    bash
    docker-compose -f dynamo.yaml up
  3. テーブル定義を作成し、ホームディレクトリにtemp_hum.jsonとして保存します。

    bash
    {
        "TableName": "temp_hum",
        "KeySchema": [
            { "AttributeName": "id", "KeyType": "HASH" }
        ],
        "AttributeDefinitions": [
            { "AttributeName": "id", "AttributeType": "S" }
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        }
    }
  4. このファイルを使って新しいテーブルを作成します。

    bash
    docker run --rm -v ${HOME}:/dynamo_data -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb create-table --cli-input-json file:///dynamo_data/temp_hum.json --endpoint-url http://host.docker.internal:8000
  5. テーブルが正常に作成されたか確認します。

    bash
    docker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb list-tables --endpoint-url http://host.docker.internal:8000

    正常に作成されていれば、以下のJSONが表示されます。

    bash
        {
            "TableNames": [
                "temp_hum"
            ]
        }

DynamoDBコネクターの作成

データ統合ルールを作成する前に、まずDynamoDBサーバーにアクセスするためのDynamoDBコネクターを作成します。

  1. デプロイメントに移動し、左側ナビゲーションメニューからデータ統合をクリックします。
  2. 初めてコネクターを作成する場合は、データ永続化カテゴリの中からDynamoDBを選択します。既にコネクターを作成済みの場合は、新規コネクターを選択し、データ転送カテゴリの中からDynamoDBを選択します。
  3. 新規コネクター画面で以下の項目を設定します。
    • DynamoDBリージョン:DynamoDBインスタンスが存在するリージョンを入力します。例:us-west-2
    • DynamoDBサーバー:DynamoDBサービスのエンドポイントを入力します。必ず「https://」のプレフィックスを含めてください。LocalStackを利用する場合はhttp://localhost:8000を指定します。
    • AWSアクセスキーIDアクセスキーIDを入力します。例:root
    • AWSシークレットアクセスキーシークレットアクセスキーを入力します。例:public
    • その他の設定はデフォルトのままか、ビジネス要件に応じて設定してください。
  4. テストボタンをクリックし、DynamoDBサービスにアクセス可能であれば成功メッセージが表示されます。
  5. 新規作成ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定するルールを作成し、処理済みデータをDynamoDBに転送するアクションを追加します。

  1. ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. SQLエディターにルールのマッチングSQL文を入力します。以下のSQL例は、temp_hum/emqxトピックに送信されたメッセージから報告時間up_timestamp、クライアントID、メッセージ本文(ペイロード)を読み取り、温度と湿度を抽出します。

    sql
     SELECT 
     id as msgid,
     topic, 
     payload 
     
     FROM
     "temp_hum/emqx"

    オンライン/オフライン状態の記録用ルールを作成する場合は、以下の文を入力します。

    sql
     SELECT
     str(event) + timestamp as id, *
     FROM 
     "$events/client_connected", "$events/client_disconnected"

    TIP

    初心者の方は、SQL例をクリックし、テストを有効化してSQLルールの学習とテストを行うことをおすすめします。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. 以下の情報を設定します。

    • アクション名:システムが自動生成します。

    • テーブル名temp_humと入力します。

    • メッセージテンプレート:空欄の場合はメッセージ全体がデータベースに保存されます。テンプレートは有効なJSONで、プレースホルダーを含めることができ、テーブルのすべてのキーが含まれている必要があります。例:{"id" : "${id}", "clientid" : "${clientid}", "data" : "${payload.data}"}

      SQLテンプレート内で未定義のプレースホルダー変数がある場合は、メッセージテンプレート上部の未定義変数をNULLとして扱うスイッチでルールエンジンの動作を切り替えられます。

      • 無効(デフォルト):ルールエンジンは文字列undefinedをデータベースに挿入します。

      • 有効:変数が未定義の場合、ルールエンジンはNULLをデータベースに挿入します。

        TIP

        可能な限りこのオプションは有効にしてください。無効にするのは後方互換性を確保する場合のみです。

    • その他の設定はデフォルトのままか、ビジネス要件に応じて設定してください。

  6. 確認ボタンをクリックしてルール作成を完了します。

  7. 新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。

ルールのテスト

MQTTXを使って温度・湿度データの報告をシミュレートすることを推奨しますが、他のクライアントでも構いません。

  1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. メッセージがDynamoDBに転送されているか確認します。

    • NoSQL Workbenchで結果を確認(任意)

      NoSQL WorkbenchはAmazon DynamoDB向けのクロスプラットフォームのクライアントGUIアプリケーションです。DynamoDBに接続してOperation Builderページに移動し、temp_humテーブルを選択すると、温度・湿度データの転送結果を確認できます。

    • データテーブルに書き込まれているか確認(任意)

      bash
      docker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb scan --table-name=temp_hum --endpoint-url http://host.docker.internal:8000
  3. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、そのルールの統計情報やルール配下のすべてのアクションの統計情報を閲覧できます。