Ingest MQTT Data into OpenTSDB
TIP
The OpenTSDB data integration is an EMQX Enterprise edition feature.
OpenTSDB is a scalable, distributed time series database. EMQX supports integration with OpenTSDB. You can save MQTT messages to OpenTSDB for subsequent analysis and retrieval.
This page provides a comprehensive introduction to the data integration between EMQX and OpenTSDB with practical instructions on creating and validating the data integration.
How It Works
OpenTSDB data integration is an out-of-the-box feature in EMQX that combines EMQX's real-time data capturing and transmission capabilities with OpenTSDB's data storage and analysis functionality. With a built-in rule engine component, the integration simplifies the process of ingesting data from EMQX to OpenTSDB for storage and analysis, eliminating the need for complex coding.
The diagram below illustrates a typical architecture of data integration between EMQX and OpenTSDB:
EMQX inserts device data into OpenTSDB through the rule engine and Sink. OpenTSDB provides extensive query capabilities, supporting the generation of reports, charts, and other data analysis results. Taking an industrial energy management scenario as an example, the workflow is as follows:
- Message publication and reception: Industrial devices connect to EMQX through the MQTT protocol and regularly publish energy consumption data, including production line identifiers and energy consumption values. When EMQX receives these messages, it initiates the matching process within its rule engine.
- Rule Engine Processes Messages: The built-in rule engine processes messages from specific sources based on topic matching. When a message arrives, it passes through the rule engine, which matches it with corresponding rules and processes the message data. This can include transforming data formats, filtering specific information, or enriching messages with context information.
- Data ingestion into OpenTSDB: Rules defined in the rule engine trigger operations to write messages to OpenTSDB.
After data is written to OpenTSDB, you can flexibly use the data, for example:
- Connect to visualization tools like Grafana to generate charts based on the data, displaying energy storage data.
- Connect to business systems for monitoring and alerting on the status of energy storage devices.
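To make the workflow above concrete, the following Python sketch mimics what the rule engine does conceptually: match the incoming topic against a filter such as `t/#`, then select the fields the Sink needs from the JSON payload. The helper names `topic_matches` and `process` are hypothetical and for illustration only; EMQX performs this matching and transformation internally.

```python
import json

def topic_matches(filter_, topic):
    """Minimal MQTT topic-filter match supporting '+' and '#' wildcards."""
    f_parts, t_parts = filter_.split("/"), topic.split("/")
    for i, f in enumerate(f_parts):
        if f == "#":
            return True  # '#' matches this level and everything below it
        if i >= len(t_parts) or (f != "+" and f != t_parts[i]):
            return False
    return len(f_parts) == len(t_parts)

def process(topic, payload):
    """Select the fields the OpenTSDB Sink needs, as a rule would."""
    if not topic_matches("t/#", topic):
        return None  # message does not match the rule's topic filter
    data = json.loads(payload)
    return {"metric": data["metric"], "tags": data["tags"], "value": data["value"]}

point = process("t/opents", '{"metric": "cpu", "tags": {"host": "serverA"}, "value": 12}')
print(point)  # {'metric': 'cpu', 'tags': {'host': 'serverA'}, 'value': 12}
```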
Features and Benefits
The OpenTSDB data integration offers the following features and advantages:
- Efficient Data Processing: EMQX can handle a massive number of IoT device connections and message throughput, while OpenTSDB excels in data writing, storage, and querying, providing outstanding performance to meet the data processing needs of IoT scenarios without overburdening the system.
- Message Transformation: Messages can undergo extensive processing and transformation through EMQX rules before being written into OpenTSDB.
- Large-Scale Data Storage: By integrating EMQX with OpenTSDB, a vast amount of device data can be directly stored in OpenTSDB. OpenTSDB is a database designed for storing and querying large-scale time-series data, capable of efficiently handling the massive volume of time-series data generated by IoT devices.
- Rich Query Capabilities: OpenTSDB's optimized storage structure and indexing enable rapid writing and querying of billions of data points, which is extremely beneficial for applications requiring real-time monitoring, analysis, and visualization of IoT device data.
- Scalability: Both EMQX and OpenTSDB are capable of cluster scaling, allowing flexible horizontal expansion of clusters as business needs grow.
Before You Start
This section describes the preparations you need to complete before you start to create the OpenTSDB data integration, including how to set up the OpenTSDB server.
Prerequisites
- Knowledge about EMQX data integration rules
- Knowledge about data integration
Install OpenTSDB
Install OpenTSDB via Docker, and then run the Docker image (only the x86 platform is currently supported):

```bash
docker pull petergrace/opentsdb-docker
docker run -d --name opentsdb -p 4242:4242 petergrace/opentsdb-docker
```
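Once the container is running, you can verify that OpenTSDB is reachable, for example by writing a test data point through its HTTP `/api/put` endpoint, which returns HTTP 204 on success. The sketch below uses only the Python standard library; `make_put_body` and `put_datapoint` are illustrative helper names, not part of any OpenTSDB client.

```python
import json
import urllib.request

def make_put_body(metric, value, tags, timestamp):
    """Build the JSON body expected by OpenTSDB's /api/put endpoint."""
    return json.dumps({"metric": metric, "timestamp": timestamp,
                       "value": value, "tags": tags})

def put_datapoint(body, host="http://localhost:4242"):
    """POST one data point; OpenTSDB answers 204 No Content on success."""
    req = urllib.request.Request(host + "/api/put", data=body.encode(),
                                 headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return resp.status

body = make_put_body("cpu", 12, {"host": "serverA"}, 1683532519)
print(body)
# {"metric": "cpu", "timestamp": 1683532519, "value": 12, "tags": {"host": "serverA"}}
```

Call `put_datapoint(body)` against your running container to perform the actual write.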
Create a Connector
This section demonstrates how to create a Connector to connect the Sink to the OpenTSDB server.
The following steps assume that you run both EMQX and OpenTSDB on the local machine. If you have OpenTSDB and EMQX running remotely, adjust the settings accordingly.
- Enter the EMQX Dashboard and click Integration -> Connectors.
- Click Create in the top right corner of the page.
- On the Create Connector page, select OpenTSDB and then click Next.
- In the Configuration step, configure the following information:
- Enter the connector name, which should be a combination of upper and lower case letters and numbers, for example: `my_opentsdb`.
- Enter `http://127.0.0.1:4242` as the Server Host, or the actual URL if the OpenTSDB server runs remotely.
- Leave other options as default.
- Advanced settings (optional): For details, see Features of Sink.
- Before clicking Create, you can click Test Connectivity to test if the connector can connect to the OpenTSDB server.
- Click the Create button at the bottom to complete the creation of the connector. In the pop-up dialog, you can click Back to Connector List or click Create Rule to continue creating rules and Sink to specify the data to be forwarded to OpenTSDB. For detailed steps, see Create a Rule with OpenTSDB Sink.
Create a Rule with OpenTSDB Sink
This section demonstrates how to create a rule in the Dashboard for processing messages from the source MQTT topic `t/#` and saving the processed data to OpenTSDB via a configured Sink.
Go to EMQX Dashboard, and click Integration -> Rules.
Click Create at the upper right corner of the page.
Input `my_rule` as the rule ID, and set the rules in the SQL Editor using the following statement, which means the MQTT messages under topic `t/#` will be saved to OpenTSDB.

Note: If you want to specify your own SQL syntax, make sure that you have included all fields required by the Sink in the `SELECT` part.

```sql
SELECT
  payload.metric as metric,
  payload.tags as tags,
  payload.value as value
FROM
  "t/#"
```
Note: If you are a beginner user, click SQL Examples and Enable Test to learn and test the SQL rule.
Click the + Add Action button to define an action to be triggered by the rule. With this action, EMQX sends the data processed by the rule to OpenTSDB.
Select `OpenTSDB` from the Type of Action dropdown list. Keep the Action dropdown at the default `Create Action` value. You can also select a Sink if you have created one. This demonstration will create a new Sink.

Enter a name for the Sink. The name should be a combination of upper and lower case letters and numbers.
Select the `my_opentsdb` Connector just created from the Connector dropdown box. You can also create a new Connector by clicking the button next to the dropdown box. For the configuration parameters, see Create a Connector.

Specify how data is written into OpenTSDB in the Write Data field to correctly convert MQTT messages into the format required by OpenTSDB. For example, the client reports the following data:
- Topic: `t/opents`

- Payload:

  ```json
  {
    "metric": "cpu",
    "tags": {
      "host": "serverA"
    },
    "value": 12
  }
  ```
Based on the provided Payload data format, configure the following format information:
- Timestamp: OpenTSDB requires a timestamp to record the time of the data point. If the MQTT message does not provide a timestamp, you can use the current time as the timestamp when configuring the Sink in EMQX, or you may need to modify the client's reported data format to include a timestamp field.
- Metric: In this example, `"metric": "cpu"` indicates that the metric name is `cpu`.
- Tags: Tags are used to describe additional information about the metric. Here, the tag is `"tags": {"host": "serverA"}`, indicating that this metric data comes from host `serverA`.
- Value: This is the actual metric value. In this example, it is `"value": 12`, indicating the metric value is 12.
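The conversion described above can be sketched in Python. This is not EMQX's implementation, just an illustration of what the Write Data mapping accomplishes, including falling back to the current time when the payload carries no timestamp; `to_opentsdb_point` is a hypothetical helper name.

```python
import json
import time

def to_opentsdb_point(payload):
    """Convert an MQTT payload like the example above into an OpenTSDB
    data point, using the current time when no 'timestamp' field is
    present in the payload."""
    data = json.loads(payload)
    return {
        "metric": data["metric"],
        "tags": data["tags"],
        "value": data["value"],
        "timestamp": data.get("timestamp", int(time.time())),
    }

point = to_opentsdb_point('{"metric": "cpu", "tags": {"host": "serverA"}, "value": 12}')
print(point["metric"], point["value"])  # cpu 12
```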
Advanced settings (optional): Choose whether to use sync or async query mode as needed. For details, see the relevant configuration information in Features of Sink.
Before clicking Create, you can click Test Connectivity to test whether the Sink can connect to the OpenTSDB server.
Click the Create button to complete the Sink configuration. A new Sink will be added to the Action Outputs.
Back on the Create Rule page, verify the configured information. Click the Create button to generate the rule.
You have now successfully created the rule for forwarding data through the OpenTSDB Sink. You can see the newly created rule on the Integration -> Rules page. Click the Actions (Sink) tab and you can see the new OpenTSDB Sink.
You can also click Integration -> Flow Designer to view the topology. It shows that the messages under topic `t/#` are sent and saved to OpenTSDB after being parsed by rule `my_rule`.
Test the Rule
Use MQTTX to publish a message on topic `t/opents`:

```bash
mqttx pub -i emqx_c -t t/opents -m '{"metric":"cpu","tags":{"host":"serverA"},"value":12}'
```
Check the running status of the Sink; there should be one new incoming and one new outgoing message.
Check whether the data is written into the OpenTSDB:
curl -X POST -H "Accept: Application/json" -H "Content-Type: application/json" http://localhost:4242/api/query -d '{
"start": "1h-ago",
"queries": [
{
"aggregator": "last",
"metric": "cpu",
"tags": {
"host": "*"
}
}
],
"showTSUIDs": "true",
"showQuery": "true",
"delete": "false"
}'
The formatted output of the query result is as follows:
```json
[
  {
    "metric": "cpu",
    "tags": {
      "host": "serverA"
    },
    "aggregateTags": [],
    "query": {
      "aggregator": "last",
      "metric": "cpu",
      "tsuids": null,
      "downsample": null,
      "rate": false,
      "filters": [
        {
          "tagk": "host",
          "filter": "*",
          "group_by": true,
          "type": "wildcard"
        }
      ],
      "percentiles": null,
      "index": 0,
      "rateOptions": null,
      "filterTagKs": [
        "AAAB"
      ],
      "explicitTags": false,
      "useFuzzyFilter": true,
      "preAggregate": false,
      "rollupUsage": null,
      "rollupTable": "raw",
      "showHistogramBuckets": false,
      "useMultiGets": true,
      "tags": {
        "host": "wildcard(*)"
      },
      "histogramQuery": false
    },
    "tsuids": [
      "000001000001000001"
    ],
    "dps": {
      "1683532519": 12
    }
  }
]
```
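If you want to consume the query result programmatically rather than read it by eye, the fields of interest are `metric`, `tags`, and the `dps` map of timestamp-to-value pairs. A minimal Python sketch, using a copy of the response trimmed to just those fields:

```python
import json

# A trimmed copy of the /api/query response shown above,
# keeping only the fields this sketch uses.
response = json.loads("""
[{"metric": "cpu",
  "tags": {"host": "serverA"},
  "aggregateTags": [],
  "dps": {"1683532519": 12}}]
""")

# Walk each returned series and print its data points.
for series in response:
    for ts, value in series["dps"].items():
        print(series["metric"], series["tags"], ts, value)
# cpu {'host': 'serverA'} 1683532519 12
```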