DynamoDBへのMQTTデータ取り込み
DynamoDBは、AWS上のフルマネージドで高性能なサーバーレスのキー・バリューストア型データベースサービスです。高速でスケーラブルかつ信頼性の高いデータストレージを必要とするアプリケーション向けに設計されています。EMQXはDynamoDBとの統合をサポートしており、MQTTメッセージやクライアントイベントをDynamoDBに保存することで、IoTデバイスの登録・管理やデバイスデータの長期保存およびリアルタイム分析を実現します。DynamoDBのデータ統合を通じて、MQTTメッセージやクライアントイベントをDynamoDBに格納できるほか、イベントに応じてDynamoDB内のデータの更新や削除をトリガーすることも可能であり、デバイスのオンライン状態や接続履歴などの情報を記録できます。
本ページでは、EMQXとDynamoDB間のデータ統合について包括的に紹介し、データ統合の作成および検証方法を実践的に解説します。
動作の仕組み
DynamoDBデータ統合は、EMQXに標準搭載された機能であり、EMQXのデバイス接続およびメッセージ送受信機能とDynamoDBの強力なデータストレージ機能を組み合わせています。組み込みのルールエンジンコンポーネントを活用することで、EMQXからDynamoDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。
以下の図は、EMQXとDynamoDB間のデータ統合の典型的なアーキテクチャを示しています。

MQTTデータをDynamoDBに取り込む流れは次の通りです。
- メッセージのパブリッシュと受信:接続された車両、IIoTシステム、エネルギー管理プラットフォームなどのIoTデバイスは、MQTTプロトコルを通じてEMQXに正常に接続し、特定のトピックにMQTTメッセージをパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着すると、ルールエンジンを通過し、EMQXで定義されたルールによって処理されます。ルールは事前定義された条件に基づき、どのメッセージをDynamoDBにルーティングするかを判断します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などの変換が適用されます。
- DynamoDBへのデータ取り込み:ルールエンジンがDynamoDBへの保存対象メッセージを特定すると、メッセージをDynamoDBに転送するアクションをトリガーします。処理済みデータはDynamoDBのコレクションにシームレスに書き込まれます。
- データの保存と活用:DynamoDBにデータが保存されることで、企業はそのクエリ機能を活用して様々なユースケースに対応できます。例えば、コネクテッドビークル分野では、保存されたデータを基に車両の状態管理、リアルタイム指標に基づくルート最適化、資産追跡が可能です。同様にIIoT環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化に活用されます。
特長とメリット
DynamoDBとのデータ統合は、効率的なデータ送信、保存、活用を実現するために以下の特長とメリットを備えています。
- リアルタイムデータストリーミング:EMQXはリアルタイムデータストリーム処理に最適化されており、ソースシステムからDynamoDBへの効率的かつ信頼性の高いデータ送信を保証します。即時の洞察とアクションを必要とするユースケースに適しています。
- 柔軟なデータ変換:EMQXは強力なSQLベースのルールエンジンを提供し、DynamoDBに保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、拡充など多様なデータ変換機能をサポートし、用途に応じたデータ整形が可能です。
- 柔軟なデータモデル:DynamoDBはキー・バリュー型およびドキュメント型のデータモデルを採用しており、構造化されたデバイスイベントやメッセージデータの保存・管理に適しています。異なるMQTTメッセージ構造の保存も容易です。
- 強力なスケーラビリティ:EMQXはクラスターのスケーラビリティを提供し、デバイス接続数やメッセージ量に応じてシームレスな水平スケールが可能です。DynamoDBはサーバーやインフラ管理を不要とし、基盤リソースの管理とスケーリングを自動で行います。両者の組み合わせにより、高性能かつ高信頼のデータストレージとスケーラビリティを実現します。
はじめる前に
本節では、DynamoDBデータ統合を作成する前に必要な準備について説明します。DynamoDBサーバーのインストール方法とデータテーブルの作成方法を含みます。
前提条件
DynamoDBローカルサーバーのインストールとテーブル作成
以下のコマンドを使ってDynamoDBサーバーをローカルで起動します。
- Access Key ID:
root - Secret Access Key:
public - Region:
us-west-2
bashdocker run -d -p 8000:8000 --name dynamodb-local \ -e AWS_ACCESS_KEY_ID=root \ -e AWS_SECRET_ACCESS_KEY=public \ -e AWS_DEFAULT_REGION=us-west-2 \ amazon/dynamodb-local:2.4.0- Access Key ID:
テーブル定義ファイルを作成し、カレントディレクトリに
mqtt_msg.jsonという名前で保存します。テーブル定義は以下の通りです。device_idをハッシュキー(パーティションキー)として定義timestampをレンジキー(ソートキー)として定義device_id属性は文字列(S)型timestamp属性は数値(N)型
json{ "TableName": "mqtt_msg", "AttributeDefinitions": [ { "AttributeName": "device_id", "AttributeType": "S" }, { "AttributeName": "timestamp", "AttributeType": "N" } ], "KeySchema": [ { "AttributeName": "device_id", "KeyType": "HASH" }, { "AttributeName": "timestamp", "KeyType": "RANGE" } ], "ProvisionedThroughput": { "ReadCapacityUnits": 5, "WriteCapacityUnits": 5 } }Dockerを使って
aws-cliコマンドを実行し、ファイルを用いて新しいテーブルを作成します。bashdocker run --rm -v $PWD:/dynamo_data \ -e AWS_ACCESS_KEY_ID=root \ -e AWS_SECRET_ACCESS_KEY=public \ -e AWS_DEFAULT_REGION=us-west-2 \ amazon/aws-cli:2.15.57 dynamodb create-table \ --cli-input-json file:///dynamo_data/mqtt_msg.json \ --endpoint-url http://host.docker.internal:8000Dockerを使って
aws-cliコマンドを実行し、テーブル作成が成功したかを確認します。bashdocker run --rm \ -e AWS_ACCESS_KEY_ID=root \ -e AWS_SECRET_ACCESS_KEY=public \ -e AWS_DEFAULT_REGION=us-west-2 \ amazon/aws-cli:2.15.57 dynamodb list-tables \ --endpoint-url http://host.docker.internal:8000テーブル作成が成功していれば、以下のJSONが表示されます。
json{ "TableNames": [ "mqtt_msg" ] }
コネクターの作成
この節では、SinkをDynamoDBサーバーに接続するためのコネクター作成方法を説明します。
以下の手順は、EMQXとDynamoDBの両方をローカルマシンで実行していることを前提としています。リモートで実行している場合は設定を適宜調整してください。
- EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。
- ページ右上のCreateをクリックします。
- Create ConnectorページでDynamoDBを選択し、Nextをクリックします。
- Configurationステップで以下の情報を設定します。
- Connector name:コネクター名を入力します。大文字・小文字の英数字の組み合わせで、例:
my_dynamodb - DynamoDB Region:
us-west-2を入力 - DynamoDB Endpoint:
http://127.0.0.1:8000(DynamoDBサーバーがリモートの場合は実際のURLを入力) - AWS Access Key ID:
root - AWS Secret Access Key:
public
- Connector name:コネクター名を入力します。大文字・小文字の英数字の組み合わせで、例:
- 高度な設定(任意):詳細はSinkの特長を参照してください。
- Createをクリックする前に、Test ConnectivityをクリックしてコネクターがDynamoDBサーバーに接続できるかテストできます。
- ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてSinkを使ったルール作成に進むことができます。詳細はメッセージ保存用DynamoDB Sinkのルール作成およびイベント記録用DynamoDB Sinkのルール作成を参照してください。
メッセージ保存用DynamoDB Sinkのルール作成
この節では、ダッシュボード上でMQTTのソーストピックt/#からのメッセージを処理し、処理済みデータを設定済みのSinkを介してDynamoDBテーブルmqtt_msgに書き込むルールの作成方法を示します。
EMQXダッシュボードにアクセスし、Integration -> Rulesをクリックします。
ページ右上のCreateをクリックします。
ルールIDに
my_ruleを入力します。メッセージ保存用ルールを作成するため、SQL Editorに以下の文を入力します。これはトピックt/#配下のMQTTメッセージをDynamoDBに保存することを意味します。注意:独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドを
SELECT部分に含めていることを確認してください。sqlSELECT * FROM "t/#"TIP
初心者の方は、SQL Examplesをクリックし、Enable Testを有効にしてSQLルールを学習・テストしてください。
- Add Actionボタンをクリックし、ルールによってトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをDynamoDBに送信します。
Type of Actionのドロップダウンから
DynamoDBを選択します。ActionはデフォルトのCreate Actionのままにします。既に作成済みのSinkがあれば選択可能です。本デモでは新規Sinkを作成します。Sinkの名前を入力します。名前は大文字・小文字の英数字の組み合わせとしてください。
Connectorドロップダウンから先ほど作成した
my_dynamodbを選択します。隣のボタンをクリックして新規コネクターを作成することも可能です。設定パラメータの詳細はコネクターの作成を参照してください。以下の設定を行います。
Table:先に作成したテーブル名
mqtt_msgを入力Hash Key:
${clientid}を入力し、クライアントIDをハッシュキーとして使用Range Key(任意):
${timestamp}を入力し、メッセージのタイムスタンプをレンジキーとして使用Message Template:デフォルトで空欄のままにします。
TIP
この値が空欄の場合、メッセージ全体がデータベースに保存されます。実際の値はJSONテンプレートデータです。
SQLテンプレート内でプレースホルダー変数が未定義の場合、Message template上部のUndefined Vars as Nullスイッチを切り替えてルールエンジンの動作を設定できます。
無効(デフォルト):ルールエンジンは文字列
undefinedをデータベースに挿入します。有効:変数が未定義の場合、ルールエンジンは
NULLをデータベースに挿入します。TIP
可能な限りこのオプションは常に有効にしてください。無効にするのは後方互換性を確保する場合のみです。
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。これらはプライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細はフォールバックアクションを参照してください。
高度な設定(任意):必要に応じてsyncまたはasyncクエリモードを選択します。詳細はSinkの特長を参照してください。
Createをクリックする前に、Test ConnectivityをクリックしてSinkがサーバーに接続できるかテストできます。
CreateボタンをクリックしてSink設定を完了します。新しいSinkがAction Outputsに追加されます。
Create Ruleページに戻り、設定内容を確認してCreateボタンをクリックしルールを生成します。
これでDynamoDB Sinkを通じてデータを転送するルールが正常に作成されました。Integration -> Rulesページで新規作成されたルールを確認できます。**Actions(Sink)**タブをクリックすると、新しいDynamoDB Sinkが表示されます。
また、Integration -> Flow Designerをクリックしてトポロジーを確認すると、トピックt/#配下のメッセージがルールmy_ruleで解析され、DynamoDBに送信・保存されていることがわかります。
イベント記録用DynamoDB Sinkのルール作成
この節では、クライアントのオンライン/オフライン状態を記録し、イベントデータを設定済みのSinkを介してDynamoDBテーブルmqtt_msgに書き込むルールの作成方法を示します。
TIP
便宜上、オンライン/オフラインイベントの受信にmqtt_msgトピックを再利用します。
ルールおよびアクション作成手順はメッセージ保存用DynamoDB Sinkのルール作成とほぼ同様ですが、SQLルール構文が異なります。
オンライン/オフライン状態記録用のSQLルール構文は以下の通りです。
SELECT
str(event) + timestamp as id, *
FROM
"$events/client_connected", "$events/client_disconnected"ルールのテスト
MQTT Xを使ってトピックt/1にメッセージを送信し、オンライン/オフラインイベントをトリガーします。
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello DynamoDB" }'Sinkの稼働状況を確認すると、1件の新規受信メッセージと1件の新規送信メッセージ、2件のイベント記録があるはずです。
データがmqtt_msgデータテーブルに書き込まれているか確認します。
docker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb scan --table-name=mqtt_msg --endpoint-url http://host.docker.internal:8000出力例は以下の通りです。
{
"Items": [
{
"metadata": {
"S": "{\"rule_id\":\"90d98f59\"}"
},
"peerhost": {
"S": "127.0.0.1"
},
"clientid": {
"S": "emqx_c"
},
"flags": {
"S": "{\"retain\":false,\"dup\":false}"
},
"node": {
"S": "emqx@127.0.0.1"
},
"qos": {
"N": "0"
},
"payload": {
"S": "{ \"msg\": \"hello DynamoDB\" }"
},
"pub_props": {
"S": "{\"User-Property\":{}}"
},
"publish_received_at": {
"N": "1678263363503"
},
"topic": {
"S": "t/1"
},
"id": {
"S": "0005F65F239F03FEF44300000BB40002"
},
"event": {
"S": "message.publish"
},
"username": {
"S": "undefined"
},
"timestamp": {
"N": "1678263363503"
}
},
{
"conn_props": {
"S": "{\"User-Property\":{},\"Request-Problem-Information\":1}"
},
"peername": {
"S": "127.0.0.1:59582"
},
"metadata": {
"S": "{\"rule_id\":\"703890a5\"}"
},
"clientid": {
"S": "emqx_c"
},
"is_bridge": {
"S": "false"
},
"keepalive": {
"N": "30"
},
"proto_ver": {
"N": "5"
},
"proto_name": {
"S": "MQTT"
},
"connected_at": {
"N": "1678263363499"
},
"receive_maximum": {
"N": "32"
},
"sockname": {
"S": "127.0.0.1:1883"
},
"mountpoint": {
"S": "undefined"
},
"node": {
"S": "emqx@127.0.0.1"
},
"id": {
"S": "client.connected1678263363499"
},
"expiry_interval": {
"N": "0"
},
"event": {
"S": "client.connected"
},
"username": {
"S": "undefined"
},
"timestamp": {
"N": "1678263363499"
},
"clean_start": {
"S": "true"
}
},
{
"reason": {
"S": "normal"
},
"peername": {
"S": "127.0.0.1:59582"
},
"metadata": {
"S": "{\"rule_id\":\"703890a5\"}"
},
"clientid": {
"S": "emqx_c"
},
"proto_ver": {
"N": "5"
},
"proto_name": {
"S": "MQTT"
},
"sockname": {
"S": "127.0.0.1:1883"
},
"disconn_props": {
"S": "{\"User-Property\":{}}"
},
"node": {
"S": "emqx@127.0.0.1"
},
"id": {
"S": "client.disconnected1678263363503"
},
"event": {
"S": "client.disconnected"
},
"disconnected_at": {
"N": "1678263363503"
},
"username": {
"S": "undefined"
},
"timestamp": {
"N": "1678263363503"
}
}
],
"Count": 3,
"ScannedCount": 3,
"ConsumedCapacity": null
}