Skip to content

DynamoDB に MQTT データをストリームする

DynamoDB は、AWS 上のフルマネージドで高性能なサーバレスのキー・バリューストア型データベースサービスです。高速でスケーラブルかつ信頼性の高いデータストレージを必要とするアプリケーション向けに設計されています。EMQX は DynamoDB との統合をサポートしており、MQTT メッセージやクライアントイベントを DynamoDB に保存することで、IoT デバイスの登録・管理やデバイスデータの長期保存およびリアルタイム分析を実現します。DynamoDB データ統合を通じて、MQTT メッセージやクライアントイベントを DynamoDB に保存できるだけでなく、イベントにより DynamoDB 内のデータの更新や削除をトリガーすることも可能であり、デバイスのオンライン状態や接続履歴などの情報を記録できます。

本ページでは、DynamoDB データ統合の機能について詳しく紹介し、作成手順を実践的に解説します。内容は DynamoDB コネクターの作成、ルールの作成、ルールのテストを含み、MQTT プロトコルを通じてシミュレートされた温度・湿度データを EMQX Cloud に報告し、設定したデータ統合を介して DynamoDB に保存する方法を示します。

動作の仕組み

DynamoDB データ統合は、EMQX Cloud に標準搭載された機能であり、EMQX のデバイス接続およびメッセージ送信機能と DynamoDB の強力なデータストレージ機能を組み合わせています。組み込みのデータ統合コンポーネントにより、EMQX から DynamoDB へのデータ取り込みと管理が簡素化され、複雑なコーディングを必要としません。

以下の図は、EMQX と DynamoDB 間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Cloud Integration AWS DynamoDB

MQTT データを DynamoDB に取り込む流れは以下の通りです。

  1. メッセージのパブリッシュと受信:接続された車両、IIoT システム、エネルギー管理プラットフォームなどの IoT デバイスは、MQTT プロトコルを通じて EMQX に正常に接続し、特定のトピックに MQTT メッセージをパブリッシュします。EMQX はこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:メッセージが到着すると、ルールエンジンを通過し、EMQX で定義されたルールによって処理されます。ルールは事前定義された条件に基づき、DynamoDB にルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などの変換処理が適用されます。
  3. DynamoDB へのデータ取り込み:ルールエンジンが DynamoDB への保存対象メッセージを特定すると、DynamoDB への転送アクションをトリガーします。処理済みデータは DynamoDB データベースのコレクションにシームレスに書き込まれます。
  4. データの保存と活用:データが DynamoDB に保存されることで、企業はそのクエリ機能を活用してさまざまなユースケースに対応できます。例えば、コネクテッドビークル領域では、保存されたデータを用いて車両の状態管理、リアルタイムメトリクスに基づくルート最適化、資産追跡などを行えます。同様に IIoT 環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化に利用可能です。

特徴と利点

DynamoDB とのデータ統合は、効率的なデータ送信、保存、活用を実現するための多彩な特徴と利点を提供します。

  • リアルタイムデータストリーミング:EMQX Cloud はリアルタイムのデータストリーム処理に最適化されており、ソースシステムから DynamoDB への効率的かつ信頼性の高いデータ伝送を保証します。即時のインサイトとアクションが必要なユースケースに適しています。
  • 柔軟なデータ変換:EMQX Cloud は強力な SQL ベースのルールエンジンを提供し、DynamoDB に保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、拡充など多様なデータ変換機能をサポートし、ニーズに応じたデータ整形が可能です。
  • 柔軟なデータモデル:DynamoDB はキー・バリューおよびドキュメントデータモデルを採用しており、構造化されたデバイスイベントやメッセージデータの保存・管理に適しています。異なる MQTT メッセージ構造の保存も容易です。
  • 強力なスケーラビリティ:EMQX Cloud はクラスターのスケーラビリティを備え、デバイス接続数やメッセージ量に応じてシームレスに水平スケール可能です。DynamoDB はサーバやインフラ管理を不要とし、基盤リソースの管理とスケーリングを自動で行います。両者の組み合わせにより、高性能かつ高信頼なデータ保存とスケーラビリティを実現します。

はじめる前に

このセクションでは、EMQX Cloud で DynamoDB データ統合を作成するための準備作業を紹介します。

前提条件

ネットワークの設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

AWS DynamoDB インスタンスのセットアップ

AWS DynamoDB の作成は、クラウド上または Docker を利用してローカルにインストールする方法があります。

コンソールで DynamoDB インスタンスとテーブルを作成

DynamoDB インスタンスを初めて作成する場合は、AWS ヘルプドキュメントを参照してください。

  1. DynamoDB コンソールにアクセスし、temp_hum という名前でテーブルを作成します。
  2. テーブル名、パーティションキーなどの主要情報を入力し、その他は必要に応じてデフォルト設定のままにします。
  3. テーブルのステータスが「アクティブ」になれば、temp_hum テーブルの作成は成功です。

DynamoDB ローカルサーバーのインストールとテーブル作成

  1. docker-compose ファイル dynamo.yaml を用意し、DynamoDB ローカルサーバーをセットアップします。

    bash
    version: '3.8'
    services:
    dynamo:
        command: "-jar DynamoDBLocal.jar -sharedDb"
        image: "amazon/dynamodb-local:latest"
        container_name: dynamo
        ports:
        - "8000:8000"
        environment:
        AWS_ACCESS_KEY_ID: root 
        AWS_SECRET_ACCESS_KEY: public
        AWS_DEFAULT_REGION: us-west-2
  2. サーバーを起動します。

    bash
    docker-compose -f dynamo.yaml up
  3. テーブル定義を作成し、ホームディレクトリに temp_hum.json として保存します。

    bash
    {
        "TableName": "temp_hum",
        "KeySchema": [
            { "AttributeName": "id", "KeyType": "HASH" }
        ],
        "AttributeDefinitions": [
            { "AttributeName": "id", "AttributeType": "S" }
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        }
    }
  4. このファイルを使って新しいテーブルを作成します。

    bash
    docker run --rm -v ${HOME}:/dynamo_data -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb create-table --cli-input-json file:///dynamo_data/temp_hum.json --endpoint-url http://host.docker.internal:8000
  5. テーブルが正常に作成されたか確認します。

    bash
    docker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb list-tables --endpoint-url http://host.docker.internal:8000

    テーブル作成が成功していれば、以下の JSON が出力されます。

    bash
        {
            "TableNames": [
                "temp_hum"
            ]
        }

DynamoDB コネクターの作成

データ統合ルールを作成する前に、まず DynamoDB サーバーにアクセスするための DynamoDB コネクターを作成する必要があります。

  1. ご利用のデプロイメントにアクセスし、左側ナビゲーションメニューから Data Integration をクリックします。
  2. 初めてコネクターを作成する場合は、Data Persistence カテゴリの中から DynamoDB を選択します。すでにコネクターを作成済みの場合は、New Connector を選択し、Data Forward カテゴリから DynamoDB を選択します。
  3. New Connector ページで以下の項目を設定します:
    • DynamoDB Region:DynamoDB インスタンスが存在するリージョンを入力します。例:us-west-2
    • DynamoDB Server:DynamoDB サービスのエンドポイントを入力します。必ず "https://" プレフィックスを含めてください。LocalStack を使用する場合は http://localhost:8000 を指定します。
    • AWS Access Key IDアクセスキーIDを入力します。例:root
    • AWS Secret Access Keyシークレットアクセスキーを入力します。例:public
    • その他の設定はデフォルト値を使用するか、ビジネス要件に応じて設定してください。
  4. Test ボタンをクリックします。DynamoDB サービスにアクセス可能であれば、成功メッセージが表示されます。
  5. New ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定し、処理済みデータを DynamoDB に転送するアクションをルールに追加します。

  1. ルールエリアで New Rule をクリックするか、作成したコネクターの Actions 列にある新規ルールアイコンをクリックします。

  2. SQL editor にルールのマッチング SQL 文を入力します。以下の例は、temp_hum/emqx トピックに送信されたメッセージから報告時間 up_timestamp、クライアントID、メッセージ本文(ペイロード)を読み取り、温度と湿度を抽出します。

    sql
     SELECT 
     id as msgid,
     topic, 
     payload 
     
     FROM
     "temp_hum/emqx"

    オンライン/オフライン状態の記録用ルールを作成する場合は、以下の文を入力します。

    sql
     SELECT
     str(event) + timestamp as id, *
     FROM 
     "$events/client_connected", "$events/client_disconnected"

    TIP

    初心者の方は、SQL ExamplesTry It Out をクリックして SQL ルールを学習・テストできます。

  3. Next をクリックしてアクションを追加します。

  4. Connector ドロップダウンから先ほど作成したコネクターを選択します。

  5. 以下の情報を設定します:

    • Action Name:システムが自動的にアクション名を生成します。

    • Table Nametemp_hum と入力します。

    • Message Template:空欄の場合はメッセージ全体がデータベースに保存されます。テンプレートはプレースホルダーを含む有効な JSON で、テーブルの全てのキーを含める必要があります。例:{"id" : "${id}", "clientid" : "${clientid}", "data" : "${payload.data}"}

      SQL テンプレート内に未定義のプレースホルダー変数がある場合は、Message template 上部の Undefined Vars as Null スイッチでルールエンジンの動作を切り替えられます:

      • Disabled(デフォルト):未定義変数は文字列 undefined としてデータベースに挿入されます。

      • Enabled:未定義変数の場合、NULL をデータベースに挿入します。

        TIP

        可能な限りこのオプションは有効にしてください。無効化は後方互換性のためのみ推奨されます。

    • その他の設定はデフォルト値を使用するか、ビジネス要件に応じて設定してください。

  6. Confirm ボタンをクリックしてルール作成を完了します。

  7. Successful new rule ポップアップで Back to Rules をクリックし、データ統合の設定チェーンを完了します。

ルールのテスト

MQTTX を使って温度・湿度データの報告をシミュレートすることを推奨しますが、他のクライアントでも可能です。

  1. MQTTX でデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. メッセージが DynamoDB に転送されているか確認します。

    • NoSQL Workbench で結果を確認(任意)。

      NoSQL Workbench は Amazon DynamoDB 向けのクロスプラットフォームのクライアントサイド GUI アプリケーションです。DynamoDB に接続して Operation Builder ページで temp_hum テーブルを選択し、温度・湿度データの転送結果を確認できます。

    • データテーブルに書き込まれているか確認(任意)。

      bash
      docker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb scan --table-name=temp_hum --endpoint-url http://host.docker.internal:8000
  3. コンソールで運用データを確認します。ルールリストのルール ID をクリックすると、ルールの統計情報およびそのルールに紐づく全アクションの統計情報を閲覧できます。