DynamoDB に MQTT データをストリームする
DynamoDB は、AWS 上のフルマネージドで高性能なサーバレスのキー・バリューストア型データベースサービスです。高速でスケーラブルかつ信頼性の高いデータストレージを必要とするアプリケーション向けに設計されています。EMQX は DynamoDB との統合をサポートしており、MQTT メッセージやクライアントイベントを DynamoDB に保存することで、IoT デバイスの登録・管理やデバイスデータの長期保存およびリアルタイム分析を実現します。DynamoDB データ統合を通じて、MQTT メッセージやクライアントイベントを DynamoDB に保存できるだけでなく、イベントにより DynamoDB 内のデータの更新や削除をトリガーすることも可能であり、デバイスのオンライン状態や接続履歴などの情報を記録できます。
本ページでは、DynamoDB データ統合の機能について詳しく紹介し、作成手順を実践的に解説します。内容は DynamoDB コネクターの作成、ルールの作成、ルールのテストを含み、MQTT プロトコルを通じてシミュレートされた温度・湿度データを EMQX Cloud に報告し、設定したデータ統合を介して DynamoDB に保存する方法を示します。
動作の仕組み
DynamoDB データ統合は、EMQX Cloud に標準搭載された機能であり、EMQX のデバイス接続およびメッセージ送信機能と DynamoDB の強力なデータストレージ機能を組み合わせています。組み込みのデータ統合コンポーネントにより、EMQX から DynamoDB へのデータ取り込みと管理が簡素化され、複雑なコーディングを必要としません。
以下の図は、EMQX と DynamoDB 間のデータ統合の典型的なアーキテクチャを示しています。

MQTT データを DynamoDB に取り込む流れは以下の通りです。
- メッセージのパブリッシュと受信:接続された車両、IIoT システム、エネルギー管理プラットフォームなどの IoT デバイスは、MQTT プロトコルを通じて EMQX に正常に接続し、特定のトピックに MQTT メッセージをパブリッシュします。EMQX はこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着すると、ルールエンジンを通過し、EMQX で定義されたルールによって処理されます。ルールは事前定義された条件に基づき、DynamoDB にルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などの変換処理が適用されます。
- DynamoDB へのデータ取り込み:ルールエンジンが DynamoDB への保存対象メッセージを特定すると、DynamoDB への転送アクションをトリガーします。処理済みデータは DynamoDB データベースのコレクションにシームレスに書き込まれます。
- データの保存と活用:データが DynamoDB に保存されることで、企業はそのクエリ機能を活用してさまざまなユースケースに対応できます。例えば、コネクテッドビークル領域では、保存されたデータを用いて車両の状態管理、リアルタイムメトリクスに基づくルート最適化、資産追跡などを行えます。同様に IIoT 環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化に利用可能です。
特徴と利点
DynamoDB とのデータ統合は、効率的なデータ送信、保存、活用を実現するための多彩な特徴と利点を提供します。
- リアルタイムデータストリーミング:EMQX Cloud はリアルタイムのデータストリーム処理に最適化されており、ソースシステムから DynamoDB への効率的かつ信頼性の高いデータ伝送を保証します。即時のインサイトとアクションが必要なユースケースに適しています。
- 柔軟なデータ変換:EMQX Cloud は強力な SQL ベースのルールエンジンを提供し、DynamoDB に保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、拡充など多様なデータ変換機能をサポートし、ニーズに応じたデータ整形が可能です。
- 柔軟なデータモデル:DynamoDB はキー・バリューおよびドキュメントデータモデルを採用しており、構造化されたデバイスイベントやメッセージデータの保存・管理に適しています。異なる MQTT メッセージ構造の保存も容易です。
- 強力なスケーラビリティ:EMQX Cloud はクラスターのスケーラビリティを備え、デバイス接続数やメッセージ量に応じてシームレスに水平スケール可能です。DynamoDB はサーバやインフラ管理を不要とし、基盤リソースの管理とスケーリングを自動で行います。両者の組み合わせにより、高性能かつ高信頼なデータ保存とスケーラビリティを実現します。
はじめる前に
このセクションでは、EMQX Cloud で DynamoDB データ統合を作成するための準備作業を紹介します。
前提条件
ネットワークの設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
AWS DynamoDB インスタンスのセットアップ
AWS DynamoDB の作成は、クラウド上または Docker を利用してローカルにインストールする方法があります。
コンソールで DynamoDB インスタンスとテーブルを作成
DynamoDB インスタンスを初めて作成する場合は、AWS ヘルプドキュメントを参照してください。
- DynamoDB コンソールにアクセスし、
temp_humという名前でテーブルを作成します。 - テーブル名、パーティションキーなどの主要情報を入力し、その他は必要に応じてデフォルト設定のままにします。
- テーブルのステータスが「アクティブ」になれば、
temp_humテーブルの作成は成功です。
DynamoDB ローカルサーバーのインストールとテーブル作成
docker-compose ファイル
dynamo.yamlを用意し、DynamoDB ローカルサーバーをセットアップします。bashversion: '3.8' services: dynamo: command: "-jar DynamoDBLocal.jar -sharedDb" image: "amazon/dynamodb-local:latest" container_name: dynamo ports: - "8000:8000" environment: AWS_ACCESS_KEY_ID: root AWS_SECRET_ACCESS_KEY: public AWS_DEFAULT_REGION: us-west-2サーバーを起動します。
bashdocker-compose -f dynamo.yaml upテーブル定義を作成し、ホームディレクトリに
temp_hum.jsonとして保存します。bash{ "TableName": "temp_hum", "KeySchema": [ { "AttributeName": "id", "KeyType": "HASH" } ], "AttributeDefinitions": [ { "AttributeName": "id", "AttributeType": "S" } ], "ProvisionedThroughput": { "ReadCapacityUnits": 5, "WriteCapacityUnits": 5 } }このファイルを使って新しいテーブルを作成します。
bashdocker run --rm -v ${HOME}:/dynamo_data -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb create-table --cli-input-json file:///dynamo_data/temp_hum.json --endpoint-url http://host.docker.internal:8000テーブルが正常に作成されたか確認します。
bashdocker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb list-tables --endpoint-url http://host.docker.internal:8000テーブル作成が成功していれば、以下の JSON が出力されます。
bash{ "TableNames": [ "temp_hum" ] }
DynamoDB コネクターの作成
データ統合ルールを作成する前に、まず DynamoDB サーバーにアクセスするための DynamoDB コネクターを作成する必要があります。
- ご利用のデプロイメントにアクセスし、左側ナビゲーションメニューから Data Integration をクリックします。
- 初めてコネクターを作成する場合は、Data Persistence カテゴリの中から DynamoDB を選択します。すでにコネクターを作成済みの場合は、New Connector を選択し、Data Forward カテゴリから DynamoDB を選択します。
- New Connector ページで以下の項目を設定します:
- DynamoDB Region:DynamoDB インスタンスが存在するリージョンを入力します。例:
us-west-2。 - DynamoDB Server:DynamoDB サービスのエンドポイントを入力します。必ず "https://" プレフィックスを含めてください。LocalStack を使用する場合は
http://localhost:8000を指定します。 - AWS Access Key ID:アクセスキーIDを入力します。例:
root。 - AWS Secret Access Key:シークレットアクセスキーを入力します。例:
public。 - その他の設定はデフォルト値を使用するか、ビジネス要件に応じて設定してください。
- DynamoDB Region:DynamoDB インスタンスが存在するリージョンを入力します。例:
- Test ボタンをクリックします。DynamoDB サービスにアクセス可能であれば、成功メッセージが表示されます。
- New ボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定し、処理済みデータを DynamoDB に転送するアクションをルールに追加します。
ルールエリアで New Rule をクリックするか、作成したコネクターの Actions 列にある新規ルールアイコンをクリックします。
SQL editor にルールのマッチング SQL 文を入力します。以下の例は、
temp_hum/emqxトピックに送信されたメッセージから報告時間up_timestamp、クライアントID、メッセージ本文(ペイロード)を読み取り、温度と湿度を抽出します。sqlSELECT id as msgid, topic, payload FROM "temp_hum/emqx"オンライン/オフライン状態の記録用ルールを作成する場合は、以下の文を入力します。
sqlSELECT str(event) + timestamp as id, * FROM "$events/client_connected", "$events/client_disconnected"TIP
初心者の方は、SQL Examples と Try It Out をクリックして SQL ルールを学習・テストできます。
Next をクリックしてアクションを追加します。
Connector ドロップダウンから先ほど作成したコネクターを選択します。
以下の情報を設定します:
Action Name:システムが自動的にアクション名を生成します。
Table Name:
temp_humと入力します。Message Template:空欄の場合はメッセージ全体がデータベースに保存されます。テンプレートはプレースホルダーを含む有効な JSON で、テーブルの全てのキーを含める必要があります。例:
{"id" : "${id}", "clientid" : "${clientid}", "data" : "${payload.data}"}。SQL テンプレート内に未定義のプレースホルダー変数がある場合は、Message template 上部の Undefined Vars as Null スイッチでルールエンジンの動作を切り替えられます:
Disabled(デフォルト):未定義変数は文字列
undefinedとしてデータベースに挿入されます。Enabled:未定義変数の場合、
NULLをデータベースに挿入します。TIP
可能な限りこのオプションは有効にしてください。無効化は後方互換性のためのみ推奨されます。
その他の設定はデフォルト値を使用するか、ビジネス要件に応じて設定してください。
Confirm ボタンをクリックしてルール作成を完了します。
Successful new rule ポップアップで Back to Rules をクリックし、データ統合の設定チェーンを完了します。
ルールのテスト
MQTTX を使って温度・湿度データの報告をシミュレートすることを推奨しますが、他のクライアントでも可能です。
MQTTX でデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxペイロード:
json{ "temp": "27.5", "hum": "41.8" }
メッセージが DynamoDB に転送されているか確認します。
NoSQL Workbench で結果を確認(任意)。
NoSQL Workbench は Amazon DynamoDB 向けのクロスプラットフォームのクライアントサイド GUI アプリケーションです。DynamoDB に接続して Operation Builder ページで
temp_humテーブルを選択し、温度・湿度データの転送結果を確認できます。データテーブルに書き込まれているか確認(任意)。
bashdocker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb scan --table-name=temp_hum --endpoint-url http://host.docker.internal:8000
コンソールで運用データを確認します。ルールリストのルール ID をクリックすると、ルールの統計情報およびそのルールに紐づく全アクションの統計情報を閲覧できます。