Stream MQTT Data into HStreamDB
TIP
The HStreamDB data integration is an EMQX Enterprise edition feature.
HStreamDB is an open-source streaming data platform that enables you to efficiently ingest, store, process, and distribute all real-time messages, events, and other data streams in one unified platform. Through EMQX's integration with HStreamDB, you can save MQTT messages and client events to HStreamDB, achieving large-scale IoT data collection, transmission, and storage, and enabling real-time processing, monitoring, and analysis of data streams using standard SQL and materialized views.
This page provides a comprehensive introduction to the data integration between EMQX and HStreamDB with practical instructions on creating and validating the data integration.
TIP
HStreamDB data integration is only supported in EMQX 5.2.0 and above.
How It Works
HStreamDB data integration is an out-of-the-box feature of EMQX that combines EMQX's device connectivity and message transmission capabilities with HStreamDB's robust data storage and processing capabilities. With the built-in rule engine component, the data streaming and processing process is simplified between the two platforms.
The diagram below illustrates a typical architecture of data integration between EMQX and HStreamDB:
EMQX forwards MQTT data to HStreamDB through the rule engine and configured Sink, and the complete process is as follows:
- Message publication and reception: IoT devices establish successful connections through the MQTT protocol and subsequently publish telemetry and status data to specific topics. When EMQX receives these messages, it initiates the matching process within its rule engine.
- Rule engine processes messages: Using the built-in rule engine, MQTT messages from specific sources can be processed based on topic matching. The rule engine matches corresponding rules and processes messages, such as data format conversion, filtering specific information, or enriching messages with context information.
- Data streaming into HStreamDB: The rule triggers the action of forwarding messages to HStreamDB, where the stream name, partition key, and record can be easily configured, facilitating subsequent data processing and analysis.
After MQTT message data is written to HStreamDB, you can engage in flexible application development, such as:
- Upon receiving specific MQTT messages, you can use HStreamDB's rule engine component to trigger corresponding actions or events, enabling cross-system and application event-driven functionality.
- Analyze MQTT data streams in real-time within HStreamDB, detect anomalies or specific event patterns, and trigger alert notifications or perform corresponding actions based on these conditions.
- Centralize data from multiple MQTT topics into a unified data stream and utilize HStreamDB's computational capabilities for real-time aggregation, calculation, and analysis to gain more comprehensive data insights.
Features and Benefits
The data integration with HStreamDB brings the following features and advantages to your business:
- Reliable IoT Data Message Delivery: EMQX can reliably batch and send MQTT messages to HStreamDB, enabling the integration of IoT devices with HStreamDB and application systems.
- MQTT Message Transformation: Using the rule engine, EMQX can filter and transform MQTT messages. Messages can undergo data extraction, filtering, enrichment, and transformation before being sent to HStreamDB.
- Large-Scale Data Stream Storage: HStreamDB supports the reliable storage of millions of data streams in a specially designed distributed, fault-tolerant log storage cluster. It can replay or push real-time data stream updates to applications as needed. Perfectly integrating with EMQX's message model, it achieves large-scale IoT data collection, transmission, and storage.
- Cluster and Scalability: Built with a cloud-native architecture, EMQX and HStreamDB support online scaling and dynamic expansion and contraction of clusters, allowing flexible horizontal scaling to meet growing business demands.
- Flexible Processing Capabilities: In HStreamDB, you can use familiar SQL to filter, transform, aggregate, and join multiple data streams. It also supports real-time processing, monitoring, and analysis of data streams using standard SQL and materialized views, providing real-time data insights.
- Processing Capabilities in High-Throughput Scenarios: HStreamDB data integration supports both synchronous and asynchronous write modes, allowing for a flexible balance between latency and throughput according to different scenarios.
Before You Start
This section describes the preparations you need to complete before you start to create an HStreamDB data integration, including how to start HStreamDB services and create streams.
The sub-sections below describe how to install and connect to HStreamDB on Linux/MacOS using Docker images. Make sure you have installed Docker and use Docker Compose v2 if possible. For other installation methods of HStreamDB and HStreamDB Platform, please refer to Quickstart with Docker-Compose and Getting Started with HStream Platform.
Prerequisites
- Knowledge about EMQX data integration rules
- Knowledge about data integration
Start HStreamDB Service and Create Streams
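The steps in this section can be sketched as follows, assuming you follow the HStreamDB Docker Compose quickstart and use the `hstream` CLI inside the HServer container (the container name is a placeholder; adjust it and the compose file to your deployment):

```shell
# Start the HStreamDB quickstart services (assumes a docker-compose.yaml
# from the "Quickstart with Docker-Compose" guide in the current directory)
docker compose up -d

# Open a shell inside the HServer container (replace the placeholder name)
docker exec -it <hserver-container> bash

# Create the two streams referenced later on this page:
# one for MQTT messages and one for client connect/disconnect events
hstream stream create mqtt_message
hstream stream create mqtt_connect

# Verify that both streams exist
hstream stream list
```

This is a minimal sketch, not a full quickstart; refer to the linked HStreamDB guides for the complete service topology.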
Create a Connector
This section demonstrates how to create a Connector to connect the Sink to the HStreamDB server.
The following steps assume that you run both EMQX and HStreamDB on the local machine. If you have HStreamDB and EMQX running remotely, adjust the settings accordingly.
- Enter the EMQX Dashboard and click Integration -> Connectors.
- Click Create in the top right corner of the page.
- On the Create Connector page, select HStreamDB and then click Next.
- In the Configuration step, configure the following information (fields marked with an asterisk are required):
  - Connector name: Enter a name for the connector, which should be a combination of upper and lower-case letters and numbers, for example: `my_hstreamdb`.
  - HStreamDB Server URL: `hstream://127.0.0.1:6570`, or use the actual HStreamDB address and port.
    - The scheme supports `http`, `https`, `hstream`, and `hstreams`.
    - For a TLS connection, the scheme needs to be `hstreams` or `https`, for example `hstreams://127.0.0.1:6570`.
  - HStreamDB Stream Name: Enter the name of the streams you created before.
    - For client message storage, enter `mqtt_message`.
    - For event recording, enter `mqtt_connect`.
  - HStreamDB Partition Key: Specify the partition key that determines where data is stored among HStreamDB's partitions or nodes. For example, you can enter `${topic}` to ensure that messages of the same topic are written into HStreamDB in order. If not specified, a default key is used and data is mapped to a default shard.
  - HStreamDB gRPC Timeout: Specify the maximum amount of time the system will wait for a response from the HStreamDB server when a gRPC request is made. The default value is `30` seconds.
  - Enable TLS: You can click the toggle switch to enable the TLS connection if required. When TLS is enabled, disable TLS Verify. Upload the certificates and key generated under the `tls-deploy/ca` directory:
    - Upload `ca/hstream.crt` to TLS Cert.
    - Upload `ca/hstream.key` to TLS Key.
    - Upload `ca/certs/root_ca.crt` to CA Cert.
- Advanced settings (optional): For details, see Features of Sink.
- Before clicking Create, you can click Test Connectivity to test if the connector can connect to the HStreamDB server.
- Click the Create button at the bottom to complete the creation of the connector. In the pop-up dialog, you can click Back to Connector List or click Create Rule to continue creating rules with Sinks to specify the data to be forwarded to the HStreamDB and to record client events. For detailed steps, see Create a Rule with HStreamDB Sink for Message Storage and Create a Rule with HStreamDB Sink for Events Recording.
Create a Rule with HStreamDB Sink for Message Storage
This section demonstrates how to create a rule in the Dashboard for processing messages from the source MQTT topic `t/#` and writing the processed data to the HStreamDB stream `mqtt_message` via the configured Sink.
Go to EMQX Dashboard, and click Integration -> Rules.
Click Create on the top right corner of the page.
Enter `my_rule` as the rule ID. Set the rule in the SQL Editor using the following statement, which means the MQTT messages under topic `t/#` will be saved to HStreamDB.

Note: If you want to specify your own SQL syntax, make sure that you have included all fields required by the Sink in the `SELECT` part.

```sql
SELECT * FROM "t/#"
```
TIP
If you are a beginner user, click SQL Examples and Enable Test to learn and test the SQL rule.
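As an illustration (this variant is not used in the later steps), a rule could also select only the fields that the Sink's record template on this page references, rather than all available fields:

```sql
-- Select only the fields used by the HStream Record Template below
SELECT id, topic, qos, payload FROM "t/#"
```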
Click the + Add Action button to define an action that the rule will trigger. With this action, EMQX sends the data processed by the rule to HStreamDB.
Select `HStreamDB` from the Type of Action dropdown list. Keep the Action dropdown with the default `Create Action` value. You can also select a Sink if you have created one. This demonstration will create a new Sink.

Enter a name for the Sink. The name should be a combination of upper/lower case letters and numbers.
Select the `my_hstreamdb` just created from the Connector dropdown box. You can also create a new Connector by clicking the button next to the dropdown box. For the configuration parameters, see Create a Connector.

Configure the HStream Record Template for forwarding messages to the specific topic by using the template below for data insert:

```json
{"id": ${id}, "topic": "${topic}", "qos": ${qos}, "payload": "${payload}"}
```
Advanced settings (optional): Choose whether to use sync or async query mode as needed. For details, see Features of Sink.
Before clicking Create, you can click Test Connectivity to test that the Sink can be connected to the HStreamDB server.
Click the Create button to complete the Sink configuration. A new Sink will be added to the Action Outputs.
Back on the Create Rule page, verify the configured information. Click the Create button to generate the rule.
You have now successfully created the rule for forwarding data and recording online/offline status through the HStreamDB Sink. You can see the newly created rule on the Integration -> Rules page. Click the Actions(Sink) tab and you can see the new HStreamDB Sink.
You can also click Integration -> Flow Designer to view the topology. You can see that the messages under topic `t/#` are sent and saved to HStreamDB after parsing by rule `my_rule`.
Create a Rule with HStreamDB Sink for Events Recording
This section demonstrates how to create a rule for recording the clients' online/offline status and writing the events data to the HStreamDB stream `mqtt_connect` via the configured Sink.
The rule creation steps are similar to those in Create a Rule with HStreamDB Sink for Message Storage, except for the SQL rule syntax and the Stream Record template.
The SQL rule syntax for online/offline status recording is as follows:
```sql
SELECT
  *
FROM
  "$events/client_connected", "$events/client_disconnected"
```
The Stream Record Template for the Sink is as follows:
```json
{"clientid": "${clientid}", "event_type": "${event}", "event_time": ${timestamp}}
```
Test the Rules
Use MQTTX to send a message to topic `t/1`, which will also trigger an online/offline event.

```bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello HStreamDB" }'
```
Check the running status of the two Sinks.
- For the Sink for message storage, there should be one new incoming and one new outgoing message. Check whether the data is written into the stream `mqtt_message`:

```bash
# Enter `Control-C` to stop after reading Stream `mqtt_message`
root@9c7ce2f51860:/# hstream stream read-stream mqtt_message
timestamp: "1693903488278", id: 1947758763121538-8589934593-0, key: "", record: {"id": 00060498A3B3C4F8F4400100127E0002, "topic": "t/1", "qos": 0, "payload": { "msg": "Hello HStreamDB" }}
^CRead Done.
```
- For the Sink used to record online/offline status, there should be two new events recorded: client connected and client disconnected. Check whether the status recording is written into the stream `mqtt_connect`:

```bash
# Enter `Control-C` to stop after reading Stream `mqtt_connect`
root@9c7ce2f51860:/# hstream stream read-stream mqtt_connect
timestamp: "1693903488274", id: 1947758827604597-8589934593-0, key: "", record: {"clientid": "emqx_c", "event_type": "client.connected", "event_time": 1693903488266}
timestamp: "1693903488294", id: 1947758827604597-8589934594-0, key: "", record: {"clientid": "emqx_c", "event_type": "client.disconnected", "event_time": 1693903488271}
^CRead Done.
```