DynamoDBへのMQTTデータストリーミング
DynamoDBは、AWS上のフルマネージドで高性能なサーバーレスのキー・バリューストア型データベースサービスです。高速でスケーラブルかつ信頼性の高いデータストレージを必要とするアプリケーション向けに設計されています。EMQXはDynamoDBとの連携をサポートしており、MQTTメッセージやクライアントイベントをDynamoDBに保存することで、IoTデバイスの登録・管理やデバイスデータの長期保存およびリアルタイム分析を実現します。DynamoDBのデータ統合を通じて、MQTTメッセージやクライアントイベントをDynamoDBに格納できるほか、イベントに応じてDynamoDB内のデータの更新や削除をトリガーし、デバイスのオンライン状態や接続履歴などの情報を記録可能です。
本ページでは、DynamoDBデータ統合の機能概要を詳しく紹介し、作成手順を実践的に解説します。内容はDynamoDBコネクターの作成、ルールの作成、ルールのテストを含み、MQTTプロトコル経由でEMQXプラットフォームにシミュレートされた温度・湿度データを報告し、設定したデータ統合を通じてDynamoDBに保存する方法を示します。
動作の仕組み
DynamoDBデータ統合は、EMQXプラットフォームに標準搭載された機能であり、EMQXのデバイス接続とメッセージ送信機能をDynamoDBの強力なデータストレージ機能と組み合わせています。組み込みのデータ統合コンポーネントにより、EMQXからDynamoDBへのデータ取り込みと管理が簡素化され、複雑なコーディングを不要にします。
以下の図は、EMQXとDynamoDB間のデータ統合の典型的なアーキテクチャを示しています。
MQTTデータをDynamoDBに取り込む流れは以下の通りです。
- メッセージのパブリッシュと受信:接続された車両、IIoTシステム、エネルギー管理プラットフォームなどのIoTデバイスは、MQTTプロトコルを通じてEMQXに正常に接続し、特定のトピックにMQTTメッセージをパブリッシュします。EMQXはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQXで定義されたルールに基づいて処理されます。ルールは事前に定義された条件に基づき、DynamoDBにルーティングすべきメッセージを判定します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などが適用されます。
- DynamoDBへのデータ取り込み:ルールエンジンがDynamoDBへの保存対象メッセージを特定すると、DynamoDBへの転送アクションをトリガーします。処理済みデータはDynamoDBのテーブルにシームレスに書き込まれます。
- データの保存と活用:データがDynamoDBに保存されることで、企業はそのクエリ機能を活用して様々なユースケースに対応可能です。例えば、コネクテッドカー分野では、車両の状態管理やリアルタイム指標に基づくルート最適化、資産追跡に利用できます。IIoT環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化などに活用されます。
特徴と利点
DynamoDBとのデータ統合は、効率的なデータ送信・保存・活用を実現するための多彩な特徴とメリットを提供します。
- リアルタイムデータストリーミング:EMQXプラットフォームはリアルタイムデータストリーム処理に最適化されており、ソースシステムからDynamoDBへの効率的かつ信頼性の高いデータ送信を実現します。即時のインサイトやアクションが求められるユースケースに適しています。
- 柔軟なデータ変換:EMQXプラットフォームは強力なSQLベースのルールエンジンを備え、DynamoDBに保存する前にデータを前処理可能です。フィルタリング、ルーティング、集約、拡充など多様なデータ変換機構をサポートし、ニーズに応じたデータ整形が可能です。
- 柔軟なデータモデル:DynamoDBはキー・バリュー型およびドキュメント型のデータモデルを採用しており、構造化されたデバイスイベントやメッセージデータの保存・管理に適しています。異なるMQTTメッセージ構造の保存も容易です。
- 強力なスケーラビリティ:EMQXプラットフォームはクラスターのスケーラビリティを備え、デバイス接続数やメッセージ量に応じてシームレスに水平スケール可能です。DynamoDBはサーバーやインフラ管理不要で、基盤リソースの管理とスケーリングを自動で行います。両者の組み合わせにより、高性能かつ高信頼のデータ保存とスケーラビリティを実現します。
はじめる前に
本節では、EMQXプラットフォームでDynamoDBデータ統合を作成するための準備作業を紹介します。
前提条件
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
AWS DynamoDBインスタンスのセットアップ
AWS DynamoDBの作成は、クラウド上またはDockerを利用してローカルに構築する方法があります。
コンソールでDynamoDBインスタンスとテーブルを作成
初めてDynamoDBインスタンスを作成する場合は、AWSのヘルプドキュメントを参照してください。
- DynamoDBコンソールにアクセスし、テーブル作成で
temp_hum
を指定します。 - テーブル名、パーティションキーなどの主要情報を入力し、その他は必要に応じてデフォルト設定のままにします。
- テーブルのステータスが「アクティブ」になれば、
temp_hum
テーブルの作成が完了しています。
DynamoDBローカルサーバーのインストールとテーブル作成
docker-composeファイル
dynamo.yaml
を用意し、DynamoDBローカルサーバーをセットアップします。bashversion: '3.8' services: dynamo: command: "-jar DynamoDBLocal.jar -sharedDb" image: "amazon/dynamodb-local:latest" container_name: dynamo ports: - "8000:8000" environment: AWS_ACCESS_KEY_ID: root AWS_SECRET_ACCESS_KEY: public AWS_DEFAULT_REGION: us-west-2
サーバーを起動します。
bashdocker-compose -f dynamo.yaml up
テーブル定義を作成し、ホームディレクトリに
temp_hum.json
として保存します。bash{ "TableName": "temp_hum", "KeySchema": [ { "AttributeName": "id", "KeyType": "HASH" } ], "AttributeDefinitions": [ { "AttributeName": "id", "AttributeType": "S" } ], "ProvisionedThroughput": { "ReadCapacityUnits": 5, "WriteCapacityUnits": 5 } }
このファイルを使って新しいテーブルを作成します。
bashdocker run --rm -v ${HOME}:/dynamo_data -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb create-table --cli-input-json file:///dynamo_data/temp_hum.json --endpoint-url http://host.docker.internal:8000
テーブルが正常に作成されたか確認します。
bashdocker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb list-tables --endpoint-url http://host.docker.internal:8000
正常に作成されていれば、以下のJSONが表示されます。
bash{ "TableNames": [ "temp_hum" ] }
DynamoDBコネクターの作成
データ統合ルールを作成する前に、まずDynamoDBサーバーにアクセスするためのDynamoDBコネクターを作成します。
- デプロイメントに移動し、左側ナビゲーションメニューからデータ統合をクリックします。
- 初めてコネクターを作成する場合は、データ永続化カテゴリの中からDynamoDBを選択します。既にコネクターを作成済みの場合は、新規コネクターを選択し、データ転送カテゴリの中からDynamoDBを選択します。
- 新規コネクター画面で以下の項目を設定します。
- DynamoDBリージョン:DynamoDBインスタンスが存在するリージョンを入力します。例:
us-west-2
- DynamoDBサーバー:DynamoDBサービスのエンドポイントを入力します。必ず「https://」のプレフィックスを含めてください。LocalStackを利用する場合は
http://localhost:8000
を指定します。 - AWSアクセスキーID:アクセスキーIDを入力します。例:
root
- AWSシークレットアクセスキー:シークレットアクセスキーを入力します。例:
public
- その他の設定はデフォルトのままか、ビジネス要件に応じて設定してください。
- DynamoDBリージョン:DynamoDBインスタンスが存在するリージョンを入力します。例:
- テストボタンをクリックし、DynamoDBサービスにアクセス可能であれば成功メッセージが表示されます。
- 新規作成ボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定するルールを作成し、処理済みデータをDynamoDBに転送するアクションを追加します。
ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
SQLエディターにルールのマッチングSQL文を入力します。以下のSQL例は、
temp_hum/emqx
トピックに送信されたメッセージから報告時間up_timestamp
、クライアントID、メッセージ本文(ペイロード)を読み取り、温度と湿度を抽出します。sqlSELECT id as msgid, topic, payload FROM "temp_hum/emqx"
オンライン/オフライン状態の記録用ルールを作成する場合は、以下の文を入力します。
sqlSELECT str(event) + timestamp as id, * FROM "$events/client_connected", "$events/client_disconnected"
TIP
初心者の方は、SQL例をクリックし、テストを有効化してSQLルールの学習とテストを行うことをおすすめします。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
以下の情報を設定します。
アクション名:システムが自動生成します。
テーブル名:
temp_hum
と入力します。メッセージテンプレート:空欄の場合はメッセージ全体がデータベースに保存されます。テンプレートは有効なJSONで、プレースホルダーを含めることができ、テーブルのすべてのキーが含まれている必要があります。例:
{"id" : "${id}", "clientid" : "${clientid}", "data" : "${payload.data}"}
SQLテンプレート内で未定義のプレースホルダー変数がある場合は、メッセージテンプレート上部の未定義変数をNULLとして扱うスイッチでルールエンジンの動作を切り替えられます。
無効(デフォルト):ルールエンジンは文字列
undefined
をデータベースに挿入します。有効:変数が未定義の場合、ルールエンジンは
NULL
をデータベースに挿入します。TIP
可能な限りこのオプションは有効にしてください。無効にするのは後方互換性を確保する場合のみです。
その他の設定はデフォルトのままか、ビジネス要件に応じて設定してください。
確認ボタンをクリックしてルール作成を完了します。
新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。
ルールのテスト
MQTTXを使って温度・湿度データの報告をシミュレートすることを推奨しますが、他のクライアントでも構いません。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
メッセージがDynamoDBに転送されているか確認します。
NoSQL Workbenchで結果を確認(任意)
NoSQL WorkbenchはAmazon DynamoDB向けのクロスプラットフォームのクライアントGUIアプリケーションです。DynamoDBに接続してOperation Builderページに移動し、
temp_hum
テーブルを選択すると、温度・湿度データの転送結果を確認できます。データテーブルに書き込まれているか確認(任意)
bashdocker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb scan --table-name=temp_hum --endpoint-url http://host.docker.internal:8000
コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、そのルールの統計情報やルール配下のすべてのアクションの統計情報を閲覧できます。