Stream MQTT Data into DynamoDB
DynamoDB is a fully managed, high-performance, serverless key-value and document database service on AWS. It is designed for applications that require fast, scalable, and reliable data storage. EMQX supports integration with DynamoDB, enabling you to save MQTT messages and client events to DynamoDB. This facilitates the registration and management of IoT devices, as well as the long-term storage and real-time analysis of device data. Through the DynamoDB data integration, MQTT messages and client events can be stored in DynamoDB, and events can also trigger updates or deletions of data, enabling the recording of information such as device online status and connection history.
This page provides a detailed introduction to the functional features of DynamoDB Data Integration and offers practical guidance for creating it. The content includes creating DynamoDB connectors, creating rules, and testing rules. It demonstrates how to report simulated temperature and humidity data to EMQX Platform via the MQTT protocol and store the data in DynamoDB through configured data integration.
How It Works
DynamoDB data integration is an out-of-the-box feature in EMQX Platform that combines EMQX's device connectivity and message transmission capabilities with DynamoDB's powerful data storage capabilities. With a built-in data integration component, the integration simplifies the process of ingesting data from EMQX to DynamoDB for storage and management, eliminating the need for complex coding.
The diagram below illustrates a typical architecture of data integration between EMQX and DynamoDB:
Ingesting MQTT data into DynamoDB works as follows:
- Message publication and reception: IoT devices, whether they are part of connected vehicles, IIoT systems, or energy management platforms, establish successful connections to EMQX through the MQTT protocol and publish MQTT messages to specific topics. When EMQX receives these messages, it initiates the matching process within its rules engine.
- Message data processing: When a message arrives, it passes through the rule engine and is then processed by the rule defined in EMQX. The rules, based on predefined criteria, determine which messages need to be routed to DynamoDB. If any rules specify payload transformations, those transformations are applied, such as converting data formats, filtering out specific information, or enriching the payload with additional context.
- Data ingestion into DynamoDB: Once the rule engine identifies a message for DynamoDB storage, it triggers an action that forwards the message to DynamoDB. Processed data is seamlessly written into the target table of the DynamoDB database.
- Data storage and utilization: With the data now stored in DynamoDB, businesses can harness its querying power for various use cases. For instance, in the realm of connected vehicles, this stored data can inform fleet management systems about vehicle health, optimize route planning based on real-time metrics, or track assets. Similarly, in IIoT settings, the data might be used to monitor machinery health, forecast maintenance, or optimize production schedules.
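As a rough end-to-end sketch of this flow, the commands below publish a reading over MQTT and then read the stored result back from DynamoDB. The broker address and credentials are placeholders, the `temp_hum` table and matching rule are configured later on this page, and `mosquitto_pub` plus the AWS CLI are assumed to be available:

```bash
# Publish a temperature/humidity reading to a topic matched by an EMQX rule
# (broker address and credentials are placeholders; substitute your deployment's).
mosquitto_pub -h <your-deployment-address> -p 1883 \
  -u <username> -P <password> \
  -t temp_hum/emqx \
  -m '{"temp": "27.5", "hum": "41.8"}'

# After the rule's action fires, read the processed record back from DynamoDB.
aws dynamodb scan --table-name temp_hum
```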
Features and Advantages
The data integration with DynamoDB offers a range of features and benefits tailored to ensure efficient data transmission, storage, and utilization:
- Real-time Data Streaming: EMQX Platform is built for handling real-time data streams, ensuring efficient and reliable data transmission from source systems to DynamoDB. It enables organizations to capture and analyze data in real-time, making it ideal for use cases requiring immediate insights and actions.
- Flexibility in Data Transformation: EMQX Platform provides a powerful SQL-based Rule Engine, allowing organizations to pre-process data before storing it in DynamoDB. It supports various data transformation mechanisms, such as filtering, routing, aggregation, and enrichment, enabling organizations to shape the data according to their needs.
- Flexible Data Model: DynamoDB uses key-value and document data models, suitable for storing and managing structured device events and message data, allowing different MQTT message structures to be stored with ease (see the sketch after this list).
- Powerful Scalability: EMQX Platform offers cluster scalability, capable of seamless horizontal scaling based on device connections and message volume; DynamoDB, requiring no server or infrastructure management, automatically handles underlying resource management and scaling. The combination of both provides high-performance and highly reliable data storage and scalability.
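To make the flexible data model concrete, the sketch below stores two differently shaped items in the same table with the AWS CLI; only the partition key `id` is fixed by the schema, so a message record and an event record can coexist. The `temp_hum` table and local endpoint come from the setup later on this page, and the item values are illustrative:

```bash
# Two items with different attribute sets can live in one DynamoDB table;
# only the partition key "id" is required by the table schema.
aws dynamodb put-item --table-name temp_hum \
  --item '{"id": {"S": "msg-1"}, "temp": {"S": "27.5"}, "hum": {"S": "41.8"}}' \
  --endpoint-url http://localhost:8000

aws dynamodb put-item --table-name temp_hum \
  --item '{"id": {"S": "evt-1"}, "event": {"S": "client_connected"}, "clientid": {"S": "emqx_c"}}' \
  --endpoint-url http://localhost:8000
```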
Before You Start
This section introduces the preparatory work needed to create DynamoDB Data Integration in EMQX Platform.
Prerequisites
- Understand rules.
- Understand data integration.
Set up Network
Before you start, you need to create a deployment (EMQX cluster) on the EMQX Platform and configure the network.
- For Dedicated/Premium deployment users: First, create a VPC Peering Connection. After establishing the peering connection, you can log in to the Platform Console via the internal network IP to access the target connector. Alternatively, set up a NAT Gateway to access the target connector through a public IP.
- For BYOC (Bring Your Own Cloud) deployment users: Establish a peering connection between the VPC where BYOC is deployed and the VPC where the target connector is located. After creating the peering connection, you can access the target connector via the internal network IP. If you need to access resources via a public IP address, configure a NAT gateway for the VPC where BYOC is deployed in the public cloud console.
Set up AWS DynamoDB Instance
You can either create the DynamoDB table in the AWS cloud console or run a local DynamoDB server using Docker.
Create DynamoDB Instance and Table in Console
If you are creating a DynamoDB instance for the first time, you can refer to the AWS help document.
- Go to the DynamoDB console and click Create Table.
- Fill in the key information, such as the table name `temp_hum` and the partition key; other settings can keep their defaults or be configured according to your actual needs.
- Once the table status becomes Active, the table `temp_hum` has been created successfully.
Install DynamoDB Local Server and Create Table
Prepare a docker-compose file, `dynamo.yaml`, to set up the DynamoDB local server.
```yaml
version: '3.8'

services:
  dynamo:
    command: "-jar DynamoDBLocal.jar -sharedDb"
    image: "amazon/dynamodb-local:latest"
    container_name: dynamo
    ports:
      - "8000:8000"
    environment:
      AWS_ACCESS_KEY_ID: root
      AWS_SECRET_ACCESS_KEY: public
      AWS_DEFAULT_REGION: us-west-2
```
Start the server.
```bash
docker-compose -f dynamo.yaml up
```
Prepare a table definition and save it to your home directory as temp_hum.json.
bash{ "TableName": "temp_hum", "KeySchema": [ { "AttributeName": "id", "KeyType": "HASH" } ], "AttributeDefinitions": [ { "AttributeName": "id", "AttributeType": "S" } ], "ProvisionedThroughput": { "ReadCapacityUnits": 5, "WriteCapacityUnits": 5 } }
Create a new table via this file.
```bash
docker run --rm -v ${HOME}:/dynamo_data \
  -e AWS_ACCESS_KEY_ID=root \
  -e AWS_SECRET_ACCESS_KEY=public \
  -e AWS_DEFAULT_REGION=us-west-2 \
  amazon/aws-cli dynamodb create-table \
  --cli-input-json file:///dynamo_data/temp_hum.json \
  --endpoint-url http://host.docker.internal:8000
```
Check if the table was created successfully.
```bash
docker run --rm \
  -e AWS_ACCESS_KEY_ID=root \
  -e AWS_SECRET_ACCESS_KEY=public \
  -e AWS_DEFAULT_REGION=us-west-2 \
  amazon/aws-cli dynamodb list-tables \
  --endpoint-url http://host.docker.internal:8000
```
The following JSON will be printed if the table was created successfully.
bash{ "TableNames": [ "temp_hum" ] }
Create a DynamoDB Connector
Before creating data integration rules, you need to first create a DynamoDB connector to access the DynamoDB server.
- Go to your deployment. Click Data Integration from the left-navigation menu.
- If this is your first time creating a connector, select DynamoDB under the Data Persistence category. If you have already created connectors, select New Connector and then select DynamoDB under the Data Forward category.
- On the New Connector page, configure the following options:
  - DynamoDB Region: Enter the region where the DynamoDB instance is located, for example, `us-west-2` in this case.
  - DynamoDB Server: Enter the endpoint of the DynamoDB service. Make sure to include the `https://` prefix. If you are using the local server set up above, use `http://localhost:8000`.
  - AWS Access Key ID: Enter the access key ID, for example, `root` in this case.
  - AWS Secret Access Key: Enter the secret access key, for example, `public` in this case.
  - Use default values for other settings, or configure them according to your business needs.
- Click the Test button. If the DynamoDB service is accessible, a success prompt will be returned.
- Click the New button to complete the creation.
Create a Rule
Next, you need to create a rule to specify the data to be written and add corresponding actions in the rule to forward the processed data to DynamoDB.
Click New Rule in the Rules area or click the New Rule icon in the Actions column of the connector you just created.
Enter the rule matching SQL statement in the SQL editor. The following SQL example reads the message ID, topic, and payload from messages sent to the `temp_hum/emqx` topic:

```sql
SELECT
  id as msgid,
  topic,
  payload
FROM
  "temp_hum/emqx"
```

To create a rule that records online/offline status, enter the following statement instead:

```sql
SELECT
  str(event) + timestamp as id, *
FROM
  "$events/client_connected", "$events/client_disconnected"
```
TIP
If you are new to rules, click SQL Examples and Enable Test to learn and test the SQL rule.
Click Next to add an action.
Select the connector you just created from the Connector dropdown box.
Configure the following information:
- Action Name: The system will automatically generate an action name.
- Table Name: Input `temp_hum`.
- Message Template: When this value is empty, the whole message will be stored in the database. The template can be any valid JSON with placeholders; make sure all keys required by the table are included, for example: `{"id" : "${id}", "clientid" : "${clientid}", "data" : "${payload.data}"}`. (A sketch of the item this template produces follows at the end of this section.)
- Undefined Vars as Null: If a placeholder variable is undefined in the SQL template, you can toggle this switch above the Message Template field to define the rule engine behavior:
  - Disabled (default): The rule engine inserts the string `undefined` into the database for undefined variables.
  - Enabled: The rule engine inserts `NULL` into the database when a variable is undefined.

TIP
If possible, this option should always be enabled; disabling it is only meant to ensure backward compatibility.
Use default values for other settings, or configure them according to your business needs.
Click the Confirm button to complete the rule creation.
In the Successful new rule pop-up, click Back to Rules, thus completing the entire data integration configuration chain.
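As referenced in the Message Template step above, the following is a rough sketch of the item that the example template would produce, expressed as an equivalent AWS CLI write against the local server; the `id`, `clientid`, and `data` values are illustrative placeholders, not output captured from EMQX:

```bash
# Roughly the item the action writes when the rule output contains
# id, clientid, and payload.data (all values below are illustrative).
aws dynamodb put-item \
  --table-name temp_hum \
  --item '{
    "id":       {"S": "0005F9F6B2D0A2E6F443000010A60002"},
    "clientid": {"S": "emqx_c"},
    "data":     {"S": "example-value"}
  }' \
  --endpoint-url http://localhost:8000
```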
Test the Rule
It is recommended to use MQTTX to simulate temperature and humidity data reporting, but you can also use any other client.
Use MQTTX to connect to the deployment and send messages to the following topic.

- Topic: `temp_hum/emqx`
- Payload:

```json
{
  "temp": "27.5",
  "hum": "41.8"
}
```
Check if the message has been forwarded to DynamoDB.
View results in NoSQL Workbench (Optional).
NoSQL Workbench for Amazon DynamoDB is a cross-platform, client-side GUI application for modern database development and operations. You can use it to connect to your DynamoDB instance. Go to the Operation Builder page and select the table `temp_hum`, where you can see the results of the temperature and humidity data forwarding.
Check whether the data is written into the data table (Optional).
```bash
docker run --rm \
  -e AWS_ACCESS_KEY_ID=root \
  -e AWS_SECRET_ACCESS_KEY=public \
  -e AWS_DEFAULT_REGION=us-west-2 \
  amazon/aws-cli dynamodb scan \
  --table-name=temp_hum \
  --endpoint-url http://host.docker.internal:8000
```
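To fetch a single record by key instead of scanning the whole table, a `get-item` call works too; the key value below is a placeholder for an actual message ID produced by the rule:

```bash
# Fetch one item by its partition key (replace <message-id> with a real ID).
docker run --rm -e AWS_ACCESS_KEY_ID=root -e AWS_SECRET_ACCESS_KEY=public \
  -e AWS_DEFAULT_REGION=us-west-2 amazon/aws-cli dynamodb get-item \
  --table-name temp_hum \
  --key '{"id": {"S": "<message-id>"}}' \
  --endpoint-url http://host.docker.internal:8000
```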
View operational data in the console. Click the rule ID in the rule list to see statistics for the rule and for all actions under it.