Ingest MQTT Data into OpenTSDB
OpenTSDB is a scalable, distributed time series database. EMQX supports integration with OpenTSDB. You can save MQTT messages to OpenTSDB for subsequent analysis and retrieval.
This page provides a comprehensive introduction to the data integration between EMQX and OpenTSDB with practical instructions on creating and validating the data integration.
How It Works
OpenTSDB data integration is an out-of-the-box feature in EMQX that combines EMQX's real-time data capturing and transmission capabilities with OpenTSDB's data storage and analysis functionality. With a built-in rule engine component, the integration simplifies the process of ingesting data from EMQX to OpenTSDB for storage and analysis, eliminating the need for complex coding.
The diagram below illustrates a typical architecture of data integration between EMQX and OpenTSDB:
EMQX writes device data into OpenTSDB through the rule engine and a Sink. OpenTSDB provides extensive query capabilities, supporting the generation of reports, charts, and other data analysis results. Taking an industrial energy management scenario as an example, the workflow is as follows:
- Message publication and reception: Industrial devices connect to EMQX over MQTT and regularly publish energy consumption data, including production line identifiers and energy consumption values. When EMQX receives these messages, it starts matching them against the rules in its rule engine.
- Rule engine processing: The built-in rule engine processes messages from specific sources based on topic matching. When a message arrives, the rule engine matches it against the corresponding rules and processes the message data, for example by transforming data formats, filtering specific information, or enriching messages with context information.
- Data ingestion into OpenTSDB: Rules defined in the rule engine trigger actions that write the processed messages to OpenTSDB.
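The topic matching in the second step follows the MQTT topic-filter rules. The following Python sketch is not EMQX's implementation; topic_matches is a hypothetical helper that applies the standard "+" and "#" wildcard semantics, which is how a rule reading FROM "t/#" selects a message published to t/opents.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter` (hypothetical helper)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    # Per the MQTT spec, filters starting with a wildcard do not match
    # topics that begin with "$" (for example, "$SYS/...").
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False

    for i, level in enumerate(filter_levels):
        if level == "#":
            # "#" is only valid as the last level (not validated here);
            # it matches the parent level and any number of child levels.
            return True
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # "+" matches any single level; other levels must match exactly.
            return False

    # All filter levels matched; the topic must not have extra levels.
    return len(filter_levels) == len(topic_levels)
```

For example, topic_matches("t/#", "t/opents") returns True, while topic_matches("t/+", "t/a/b") returns False because "+" matches exactly one level.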
After data is written to OpenTSDB, you can flexibly use the data, for example:
- Connect to visualization tools like Grafana to generate charts based on the data, displaying energy storage data.
- Connect to business systems for monitoring and alerting on the status of energy storage devices.
Features and Benefits
The OpenTSDB data integration offers the following features and advantages:
- Efficient Data Processing: EMQX can handle a massive number of IoT device connections and message throughput, while OpenTSDB excels in data writing, storage, and querying, providing outstanding performance to meet the data processing needs of IoT scenarios without overburdening the system.
- Message Transformation: Messages can undergo extensive processing and transformation through EMQX rules before being written into OpenTSDB.
- Large-Scale Data Storage: By integrating EMQX with OpenTSDB, a vast amount of device data can be directly stored in OpenTSDB. OpenTSDB is a database designed for storing and querying large-scale time-series data, capable of efficiently handling the massive volume of time-series data generated by IoT devices.
- Rich Query Capabilities: OpenTSDB's optimized storage structure and indexing enable rapid writing and querying of billions of data points, which is extremely beneficial for applications requiring real-time monitoring, analysis, and visualization of IoT device data.
- Scalability: Both EMQX and OpenTSDB are capable of cluster scaling, allowing flexible horizontal expansion of clusters as business needs grow.
Before You Start
This section describes the preparations you need to complete before you start to create the OpenTSDB data integration, including how to set up the OpenTSDB server.
Prerequisites
- Knowledge about EMQX data integration rules
- Knowledge about data integration
Install OpenTSDB
Pull the OpenTSDB Docker image and start a container that exposes the OpenTSDB HTTP API on port 4242:
docker pull petergrace/opentsdb-docker
docker run -d --name opentsdb -p 4242:4242 petergrace/opentsdb-docker
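The OpenTSDB container may take some time to become ready after docker run. A minimal Python sketch for checking this, assuming the port mapping above, polls OpenTSDB's HTTP /api/version endpoint; opentsdb_version and wait_for_opentsdb are hypothetical helpers that use only the standard library.

```python
import json
import time
import urllib.request


def opentsdb_version(base_url: str = "http://127.0.0.1:4242", timeout: float = 3.0):
    """Return the parsed /api/version response, or None if OpenTSDB is not reachable."""
    try:
        with urllib.request.urlopen(f"{base_url}/api/version", timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except (OSError, ValueError):
        # URLError, HTTPError, and timeouts are OSError subclasses;
        # a non-JSON body raises ValueError. Treat both as "not ready yet".
        return None


def wait_for_opentsdb(base_url: str = "http://127.0.0.1:4242",
                      attempts: int = 30, delay: float = 2.0) -> bool:
    """Poll OpenTSDB until it answers or the attempts run out."""
    for attempt in range(attempts):
        if opentsdb_version(base_url) is not None:
            return True
        if attempt < attempts - 1:
            time.sleep(delay)
    return False
```

Calling wait_for_opentsdb() before creating the Connector can help avoid a failed connectivity test while the container is still starting.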
Create Connector
This section demonstrates how to create a rule that specifies the data to be saved into OpenTSDB. It assumes that you run both EMQX and OpenTSDB on the local machine. If EMQX and OpenTSDB run remotely, adjust the settings accordingly. The data reported by the client in this tutorial is as follows:
- Topic: t/opents
- Payload:
  {
    "metric": "cpu",
    "tags": {
      "host": "serverA"
    },
    "value": 12
  }
Go to EMQX Dashboard, and click Integration -> Rules.
Click Create at the upper right corner of the page.
Input my_rule as the rule ID, and enter the following statement in the SQL Editor. It means that MQTT messages under topic t/# will be saved to OpenTSDB.
SELECT payload.metric as metric, payload.tags as tags, payload.value as value FROM "t/#"
Note: If you want to specify your own SQL syntax, make sure that you have included all fields required by the Sink in the SELECT part.
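For the tutorial payload, the rule's SELECT clause extracts three top-level fields. The Python sketch below approximates that step outside EMQX so you can check that a payload carries every field the SELECT references; apply_rule and REQUIRED_FIELDS are hypothetical names, and the sketch deliberately raises ValueError when a field is missing.

```python
import json

# Fields produced by this tutorial's SELECT clause
# (assumed here to be what the OpenTSDB Sink consumes).
REQUIRED_FIELDS = ("metric", "tags", "value")


def apply_rule(payload: str) -> dict:
    """Approximate: SELECT payload.metric as metric, payload.tags as tags,
    payload.value as value (hypothetical helper, not EMQX code)."""
    data = json.loads(payload)
    missing = [field for field in REQUIRED_FIELDS if field not in data]
    if missing:
        raise ValueError(f"payload is missing fields: {missing}")
    # Each "payload.<field> as <field>" copies one top-level JSON key.
    return {field: data[field] for field in REQUIRED_FIELDS}


row = apply_rule('{"metric":"cpu","tags":{"host":"serverA"},"value":12}')
print(row)  # {'metric': 'cpu', 'tags': {'host': 'serverA'}, 'value': 12}
```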
Note: If you are a beginner user, click SQL Examples and Enable Test to learn and test the SQL rule.
Click the + Add Action button to define an action that will be triggered by the rule. With this action, EMQX sends the data processed by the rule to OpenTSDB.
Select OpenTSDB from the Type of Action dropdown list. Keep the Action dropdown with the default Create Action value. You can also select a Sink if you have created one. This demonstration creates a new Sink.
Enter a name for the Connector. The name should be a combination of upper/lower case letters and numbers.
Enter the connection information:
- URL: Input http://127.0.0.1:4242, or the actual URL if the OpenTSDB server runs remotely.
- Leave other options as default.
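When troubleshooting, it can help to write one data point to the same URL by hand and compare it with what the Sink stores. The sketch below assumes the Sink's writes are equivalent to a call to OpenTSDB's HTTP /api/put endpoint, which accepts a JSON array of data points with metric, timestamp, value, and tags; build_put_request is a hypothetical helper, and the request is only built, not sent.

```python
import json
import time
import urllib.request


def build_put_request(row: dict, base_url: str = "http://127.0.0.1:4242", timestamp=None):
    """Build (but do not send) an OpenTSDB /api/put request for one rule output row.

    Assumption: this mirrors what the Sink writes; it is not EMQX code.
    """
    point = {
        "metric": row["metric"],
        # OpenTSDB accepts Unix epoch seconds (or milliseconds) as the timestamp.
        "timestamp": int(time.time()) if timestamp is None else timestamp,
        "value": row["value"],
        # OpenTSDB requires at least one tag per data point.
        "tags": row["tags"],
    }
    return urllib.request.Request(
        f"{base_url}/api/put",
        data=json.dumps([point]).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_put_request({"metric": "cpu", "tags": {"host": "serverA"}, "value": 12})
```

Passing req to urllib.request.urlopen sends it; by default OpenTSDB answers a successful /api/put with HTTP 204 No Content.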
Advanced settings (optional): Choose whether to use sync or async query mode as needed. For details, see the relevant configuration information in Features of Sink.
Before clicking Create, you can click Test Connectivity to check that the Sink can connect to the OpenTSDB server.
Click the Create button to complete the Sink configuration. A new Sink will be added to the Action Outputs.
Back on the Create Rule page, verify the configured information. Click the Create button to generate the rule.
You have now successfully created the rule for forwarding data through the OpenTSDB Sink. You can see the newly created rule on the Integration -> Rules page. Click the Actions (Sink) tab to see the new OpenTSDB Sink.
You can also click Integration -> Flow Designer to view the topology, where you can see that messages under topic t/# are sent to and saved in OpenTSDB after being parsed by rule my_rule.
Test the Rule
Use MQTTX to publish a message on topic t/opents.
mqttx pub -i emqx_c -t t/opents -m '{"metric":"cpu","tags":{"host":"serverA"},"value":12}'
Check the running status of the Sink: there should be one new incoming message and one new outgoing message.
Check whether the data has been written into OpenTSDB:
curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:4242/api/query -d '{
  "start": "1h-ago",
  "queries": [
    {
      "aggregator": "last",
      "metric": "cpu",
      "tags": {
        "host": "*"
      }
    }
  ],
  "showTSUIDs": "true",
  "showQuery": "true",
  "delete": "false"
}'
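The same query can be issued from Python with only the standard library. This sketch mirrors the curl command but sends JSON booleans instead of the quoted strings and omits delete, which OpenTSDB defaults to false; build_query_request and run_query are hypothetical helpers.

```python
import json
import urllib.request


def build_query_request(metric: str, base_url: str = "http://127.0.0.1:4242",
                        start: str = "1h-ago", host: str = "*"):
    """Build (but do not send) the /api/query request used in the curl example."""
    body = {
        "start": start,
        "queries": [
            # "last" returns the most recent value of each matching series.
            {"aggregator": "last", "metric": metric, "tags": {"host": host}},
        ],
        "showTSUIDs": True,
        "showQuery": True,
    }
    return urllib.request.Request(
        f"{base_url}/api/query",
        data=json.dumps(body).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def run_query(req: urllib.request.Request, timeout: float = 5.0):
    """Send the request and return the decoded JSON response."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With OpenTSDB running locally, run_query(build_query_request("cpu")) should return a list shaped like the output below.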
The formatted output of the query result is as follows:
[
  {
    "metric": "cpu",
    "tags": {
      "host": "serverA"
    },
    "aggregateTags": [],
    "query": {
      "aggregator": "last",
      "metric": "cpu",
      "tsuids": null,
      "downsample": null,
      "rate": false,
      "filters": [
        {
          "tagk": "host",
          "filter": "*",
          "group_by": true,
          "type": "wildcard"
        }
      ],
      "percentiles": null,
      "index": 0,
      "rateOptions": null,
      "filterTagKs": [
        "AAAB"
      ],
      "explicitTags": false,
      "useFuzzyFilter": true,
      "preAggregate": false,
      "rollupUsage": null,
      "rollupTable": "raw",
      "showHistogramBuckets": false,
      "useMultiGets": true,
      "tags": {
        "host": "wildcard(*)"
      },
      "histogramQuery": false
    },
    "tsuids": [
      "000001000001000001"
    ],
    "dps": {
      "1683532519": 12
    }
  }
]
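To check the result programmatically rather than by eye, the response can be reduced to the latest value per series. This Python sketch assumes the default response shape shown above, where dps maps Unix timestamps (as strings) to values; latest_points is a hypothetical helper.

```python
import json


def latest_points(response_text: str) -> dict:
    """Map each (metric, host) series to its most recent (timestamp, value)."""
    result = {}
    for series in json.loads(response_text):
        dps = series.get("dps", {})
        if not dps:
            # No data points in the queried time range for this series.
            continue
        # Keys are timestamps as strings; compare them numerically.
        ts = max(dps, key=int)
        result[(series["metric"], series["tags"].get("host"))] = (int(ts), dps[ts])
    return result
```

For the response above it returns {("cpu", "serverA"): (1683532519, 12)}, matching the value published with MQTTX.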