# DynamoDB Backend
TIP
Starting with EMQX 3.1, a powerful rule engine is available to replace plugins, and we recommend using it. See Save data to DynamoDB to set up saving data to DynamoDB in the rule engine.
# Configure DynamoDB Cluster
Config file: etc/plugins/emqx_backend_dynamo.conf

    ## DynamoDB Region
    backend.dynamo.region = us-west-2

    ## DynamoDB Server
    backend.dynamo.pool1.server = http://localhost:8000

    ## DynamoDB Pool Size
    backend.dynamo.pool1.pool_size = 8

    ## AWS ACCESS KEY ID
    backend.dynamo.pool1.aws_access_key_id = AKIAU5IM2XOC7AQWG7HK

    ## AWS SECRET ACCESS KEY
    backend.dynamo.pool1.aws_secret_access_key = TZt7XoRi+vtCJYQ9YsAinh19jR1rngm/hxZMWR2P

    ## DynamoDB Backend Hooks
    backend.dynamo.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
    backend.dynamo.hook.session.created.1 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
    backend.dynamo.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
    backend.dynamo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
    backend.dynamo.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
    backend.dynamo.hook.session.unsubscribed.1= {"topic": "#", "action": {"function": "on_acked_delete"}, "pool": "pool1"}
    backend.dynamo.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
    backend.dynamo.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
    backend.dynamo.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
    backend.dynamo.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}

    # backend.dynamo.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
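Each hook line in the configuration above has the shape `backend.dynamo.hook.<hook>.<n> = <JSON>`. The sketch below shows one way to split such a line into its parts, which can help when checking a configuration by hand. `parse_hook_line` is a hypothetical helper written for this illustration; it is not how EMQX itself reads the file.

```python
import json


def parse_hook_line(line):
    """Split one `backend.dynamo.hook.<hook>.<n> = <json>` line into its parts."""
    key, _, value = line.partition("=")
    parts = key.strip().split(".")
    # parts looks like ["backend", "dynamo", "hook", <hook name...>, <index>]
    spec = json.loads(value.strip())
    return {
        "hook": ".".join(parts[3:-1]),
        "index": int(parts[-1]),
        "topic": spec.get("topic"),  # None when the hook has no topic filter
        "function": spec["action"]["function"],
        "pool": spec["pool"],
    }


line = (
    'backend.dynamo.hook.session.subscribed.2 = '
    '{"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}'
)
print(parse_hook_line(line))
```

The numeric suffix distinguishes several actions registered on the same hook, as with the three `message.publish` entries.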
# Description of DynamoDB Persistence Hooks
| Hook | Topic | Action | Description |
|---|---|---|---|
| client.connected | | on_client_connected | Store client connected state |
| client.connected | | on_subscribe_lookup | Subscribed topics |
| client.disconnected | | on_client_disconnected | Store client disconnected state |
| session.subscribed | # | on_message_fetch_for_queue | Fetch offline messages |
| session.subscribed | # | on_retain_lookup | Lookup retained messages |
| message.publish | # | on_message_publish | Store published messages |
| message.publish | # | on_message_retain | Store retained messages |
| message.publish | # | on_retain_delete | Delete retained messages |
| message.acked | # | on_message_acked_for_queue | Process ACK |
# Create DynamoDB DB
    ./test/dynamo_test.sh
TIP
You can choose any name for the database.
# DynamoDB Client Connection Table
mqtt_client stores client connection states:

    {
        "TableName": "mqtt_client",
        "KeySchema": [
            { "AttributeName": "clientid", "KeyType": "HASH" }
        ],
        "AttributeDefinitions": [
            { "AttributeName": "clientid", "AttributeType": "S" }
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        }
    }
Query the client connection state:

    aws dynamodb scan --table-name mqtt_client --region us-west-2 --endpoint-url http://localhost:8000
    {
        "Items": [
            {
                "offline_at": { "N": "0" },
                "node": { "S": "emqx@127.0.0.1" },
                "clientid": { "S": "mqttjs_384b9c73a9" },
                "connect_state": { "N": "1" },
                "online_at": { "N": "1562224940" }
            }
        ],
        "Count": 1,
        "ScannedCount": 1,
        "ConsumedCapacity": null
    }
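The scan output wraps every attribute in a DynamoDB type tag (`S` for string, `N` for number sent as a string, `SS` for string set). As a minimal sketch, the hypothetical helpers `decode_value` and `decode_item` below flatten such an item into a plain dictionary. Only the three type tags that appear on this page are handled; the other DynamoDB types are out of scope here.

```python
def decode_value(attr):
    """Convert one tagged value such as {"N": "1"} into a plain Python value."""
    (type_tag, raw), = attr.items()  # each attribute carries exactly one type tag
    if type_tag == "S":
        return raw
    if type_tag == "N":
        try:
            return int(raw)
        except ValueError:
            return float(raw)
    if type_tag == "SS":
        return list(raw)
    raise ValueError(f"unsupported type tag: {type_tag}")


def decode_item(item):
    """Decode every attribute of a scanned item."""
    return {name: decode_value(attr) for name, attr in item.items()}


item = {
    "offline_at": {"N": "0"},
    "node": {"S": "emqx@127.0.0.1"},
    "clientid": {"S": "mqttjs_384b9c73a9"},
    "connect_state": {"N": "1"},
    "online_at": {"N": "1562224940"},
}
print(decode_item(item))
```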
# DynamoDB Subscription Table
mqtt_sub table stores MQTT subscriptions of clients:

    {
        "TableName": "mqtt_sub",
        "KeySchema": [
            { "AttributeName": "clientid", "KeyType": "HASH" },
            { "AttributeName": "topic", "KeyType": "RANGE" }
        ],
        "AttributeDefinitions": [
            { "AttributeName": "clientid", "AttributeType": "S" },
            { "AttributeName": "topic", "AttributeType": "S" }
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        }
    }
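The table definitions on this page share one shape: a string hash key, an optional string range key, and a provisioned throughput of 5 read and 5 write units. The sketch below generates that JSON from a few parameters. `table_definition` is a hypothetical helper for illustration and is not part of the plugin or of `dynamo_test.sh`.

```python
import json


def table_definition(name, hash_key, range_key=None, read_units=5, write_units=5):
    """Build a CreateTable-style definition with string-typed key attributes."""
    key_schema = [{"AttributeName": hash_key, "KeyType": "HASH"}]
    attributes = [{"AttributeName": hash_key, "AttributeType": "S"}]
    if range_key is not None:
        key_schema.append({"AttributeName": range_key, "KeyType": "RANGE"})
        attributes.append({"AttributeName": range_key, "AttributeType": "S"})
    return {
        "TableName": name,
        "KeySchema": key_schema,
        "AttributeDefinitions": attributes,
        "ProvisionedThroughput": {
            "ReadCapacityUnits": read_units,
            "WriteCapacityUnits": write_units,
        },
    }


print(json.dumps(table_definition("mqtt_sub", "clientid", "topic"), indent=4))
```

If you save the printed JSON to a file (for example a file named `mqtt_sub.json`, a name chosen here only for illustration), one way to create the table is `aws dynamodb create-table --cli-input-json file://mqtt_sub.json --endpoint-url http://localhost:8000`.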
Query topics subscribed by the client named "test-dynamo":

    aws dynamodb scan --table-name mqtt_sub --region us-west-2 --endpoint-url http://localhost:8000
    {
        "Items": [
            { "qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub" }, "clientid": { "S": "test-dynamo" } },
            { "qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub-1" }, "clientid": { "S": "test-dynamo" } },
            { "qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub-2" }, "clientid": { "S": "test-dynamo" } }
        ],
        "Count": 3,
        "ScannedCount": 3,
        "ConsumedCapacity": null
    }
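A scan returns the subscriptions of every client, so if the table holds more than one client the result has to be narrowed down afterwards. The sketch below filters a scan result by `clientid` on the client side. `topics_for_client` is a hypothetical helper, and the second client in the sample data is invented for the example.

```python
def topics_for_client(scan_result, clientid):
    """Return sorted (topic, qos) pairs for one client from a scan result."""
    return sorted(
        (item["topic"]["S"], int(item["qos"]["N"]))
        for item in scan_result["Items"]
        if item["clientid"]["S"] == clientid
    )


scan_result = {
    "Items": [
        {"qos": {"N": "2"}, "topic": {"S": "test-dynamo-sub"}, "clientid": {"S": "test-dynamo"}},
        {"qos": {"N": "2"}, "topic": {"S": "test-dynamo-sub-1"}, "clientid": {"S": "test-dynamo"}},
        {"qos": {"N": "2"}, "topic": {"S": "test-dynamo-sub-2"}, "clientid": {"S": "test-dynamo"}},
        # Invented entry for another client, to show the filter at work.
        {"qos": {"N": "0"}, "topic": {"S": "other"}, "clientid": {"S": "another-client"}},
    ]
}
print(topics_for_client(scan_result, "test-dynamo"))
```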
# DynamoDB Message Table
mqtt_msg stores MQTT messages:

    {
        "TableName": "mqtt_msg",
        "KeySchema": [
            { "AttributeName": "msgid", "KeyType": "HASH" }
        ],
        "AttributeDefinitions": [
            { "AttributeName": "msgid", "AttributeType": "S" }
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        }
    }
mqtt_topic_msg_map stores the mapping between topics and messages:

    {
        "TableName": "mqtt_topic_msg_map",
        "KeySchema": [
            { "AttributeName": "topic", "KeyType": "HASH" }
        ],
        "AttributeDefinitions": [
            { "AttributeName": "topic", "AttributeType": "S" }
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        }
    }
Query mqtt_msg and mqtt_topic_msg_map after a client publishes a message to the "test" topic:
Query mqtt_msg:

    aws dynamodb scan --table-name mqtt_msg --region us-west-2 --endpoint-url http://localhost:8000
    {
        "Items": [
            {
                "arrived": { "N": "1562308553" },
                "qos": { "N": "1" },
                "sender": { "S": "mqttjs_231b962d5c" },
                "payload": { "S": "{ \"msg\": \"Hello, World!\" }" },
                "retain": { "N": "0" },
                "msgid": { "S": "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" },
                "topic": { "S": "test" }
            }
        ],
        "Count": 1,
        "ScannedCount": 1,
        "ConsumedCapacity": null
    }
Query mqtt_topic_msg_map:

    aws dynamodb scan --table-name mqtt_topic_msg_map --region us-west-2 --endpoint-url http://localhost:8000
    {
        "Items": [
            {
                "topic": { "S": "test" },
                "MsgId": { "SS": [ "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" ] }
            }
        ],
        "Count": 1,
        "ScannedCount": 1,
        "ConsumedCapacity": null
    }
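The two scans fit together: mqtt_topic_msg_map lists the message IDs for a topic in its `MsgId` string set, and each ID is the hash key of an item in mqtt_msg. As a rough sketch of that relationship, the hypothetical helper `messages_for_topic` below joins the two scan results in memory. It illustrates the data layout only and makes no claim about how the plugin performs the lookup.

```python
def messages_for_topic(topic, topic_map_items, msg_items):
    """Resolve the message IDs mapped to a topic into full message items."""
    by_id = {msg["msgid"]["S"]: msg for msg in msg_items}
    msg_ids = []
    for entry in topic_map_items:
        if entry["topic"]["S"] == topic:
            msg_ids.extend(entry["MsgId"]["SS"])
    # Skip IDs that have no matching item in mqtt_msg.
    return [by_id[msg_id] for msg_id in msg_ids if msg_id in by_id]


MSG_ID = "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD"
topic_map_items = [{"topic": {"S": "test"}, "MsgId": {"SS": [MSG_ID]}}]
msg_items = [
    {
        "msgid": {"S": MSG_ID},
        "topic": {"S": "test"},
        "qos": {"N": "1"},
        "payload": {"S": '{ "msg": "Hello, World!" }'},
    }
]

for msg in messages_for_topic("test", topic_map_items, msg_items):
    print(msg["payload"]["S"])
```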
# DynamoDB Retained Message Table
mqtt_retain stores retained messages:

    {
        "TableName": "mqtt_retain",
        "KeySchema": [
            { "AttributeName": "topic", "KeyType": "HASH" }
        ],
        "AttributeDefinitions": [
            { "AttributeName": "topic", "AttributeType": "S" }
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        }
    }
Query mqtt_retain after a client publishes a message to the "test" topic:

    {
        "Items": [
            {
                "arrived": { "N": "1562312113" },
                "qos": { "N": "1" },
                "sender": { "S": "mqttjs_d0513acfce" },
                "payload": { "S": "test" },
                "retain": { "N": "1" },
                "msgid": { "S": "Mjg4MTk1NzE3MTY4MjYxMjA5MDExMDg0NTk5ODgzMjAyNTH" },
                "topic": { "S": "testtopic" }
            }
        ],
        "Count": 1,
        "ScannedCount": 1,
        "ConsumedCapacity": null
    }
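The `arrived` attribute looks like a Unix timestamp in seconds. That reading is an assumption based on the magnitude of the sample values, not something this page states. Under that assumption, a short sketch for turning it into a readable UTC time (`arrived_at` is a hypothetical helper):

```python
from datetime import datetime, timezone


def arrived_at(item):
    """Interpret the item's `arrived` attribute as Unix seconds (assumed) in UTC."""
    seconds = int(item["arrived"]["N"])
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


retained = {"arrived": {"N": "1562312113"}, "topic": {"S": "testtopic"}}
print(arrived_at(retained).isoformat())
```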
# DynamoDB Acknowledgement Table
mqtt_acked stores acknowledgements from the clients:

    {
        "TableName": "mqtt_acked",
        "KeySchema": [
            { "AttributeName": "topic", "KeyType": "HASH" },
            { "AttributeName": "clientid", "KeyType": "RANGE" }
        ],
        "AttributeDefinitions": [
            { "AttributeName": "topic", "AttributeType": "S" },
            { "AttributeName": "clientid", "AttributeType": "S" }
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        }
    }
Query mqtt_acked after a client publishes a message to the "test" topic:

    {
        "Items": [
            {
                "topic": { "S": "test" },
                "msgid": { "S": "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" },
                "clientid": { "S": "mqttjs_861e582a70" }
            }
        ],
        "Count": 1,
        "ScannedCount": 1,
        "ConsumedCapacity": null
    }
# Enable DynamoDB Backend
    ./bin/emqx_ctl plugins load emqx_backend_dynamo