# Save device data to HStreamDB using Data Integrations
HStreamDB is a streaming database designed for large-scale real-time data stream management, covering access, storage, processing, and distribution. It uses standard SQL (and its streaming extensions) as its primary interface language, focuses on real-time data, and aims to simplify the operation and management of data streams as well as the development of real-time applications.
In this article, we will simulate temperature and humidity data, report it to EMQX Cloud via the MQTT protocol, and then use EMQX Cloud Data Integrations to save the data to HStreamDB.
Before you start, you need to complete the following operations:
- A deployment (EMQX cluster) has already been created on EMQX Cloud.
- For Professional Plan users: please create a Peering Connection first. All IP addresses mentioned below refer to the internal network IP of the resource. (A Professional Plan deployment with a NAT gateway can also use a public IP to connect to resources.)
# HStreamDB Configuration
## HStreamDB Installation
To deploy HStreamDB, refer to the HStreamDB help documentation and follow the steps below.
### Create quick-start.yaml
```yaml
## quick-start.yaml
version: "3.5"

services:
  hserver0:
    image: hstreamdb/hstream:v0.8.0
    depends_on:
      - zookeeper
      - hstore
    ports:
      - "6570:6570"
    expose:
      - 6570
    networks:
      - hstream-network
    volumes:
      - data_store:/data/store
    command:
      - bash
      - "-c"
      - |
        set -e
        /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \
        /usr/local/bin/hstream-server \
          --host 0.0.0.0 --port 6570 \
          --internal-port 6571 \
          --server-id 100 \
          --address <server ip> \
          --zkuri zookeeper:2181 \
          --store-config /data/store/logdevice.conf \
          --store-admin-host hstore --store-admin-port 6440

  hserver1:
    image: hstreamdb/hstream:v0.8.0
    depends_on:
      - zookeeper
      - hstore
    ports:
      - "6572:6572"
    expose:
      - 6572
    networks:
      - hstream-network
    volumes:
      - data_store:/data/store
    command:
      - bash
      - "-c"
      - |
        set -e
        /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \
        /usr/local/bin/hstream-server \
          --host 0.0.0.0 --port 6572 \
          --internal-port 6573 \
          --server-id 101 \
          --address <server ip> \
          --zkuri zookeeper:2181 \
          --store-config /data/store/logdevice.conf \
          --store-admin-host hstore --store-admin-port 6440

  hstream-http-server:
    image: hstreamdb/hstream:v0.8.0
    depends_on:
      - hserver0
      - hserver1
    ports:
      - "6580:6580"
    expose:
      - 6580
    networks:
      - hstream-network
    command:
      - bash
      - "-c"
      - |
        set -e
        /usr/local/bin/hstream-http-server \
          -gRPCServerHost hserver \
          -httpServerPort 6580 \
          -gRPCServerPort 6570

  hstore:
    image: hstreamdb/hstream:v0.8.0
    networks:
      - hstream-network
    volumes:
      - data_store:/data/store
    command:
      - bash
      - "-c"
      - |
        set -ex
        /usr/local/bin/ld-dev-cluster --root /data/store \
          --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \
          --user-admin-port 6440 \
          --no-interactive

  zookeeper:
    image: zookeeper
    expose:
      - 2181
    networks:
      - hstream-network
    volumes:
      - data_zk_data:/data
      - data_zk_datalog:/datalog

networks:
  hstream-network:
    name: hstream-network

volumes:
  data_store:
    name: quickstart_data_store
  data_zk_data:
    name: quickstart_data_zk_data
  data_zk_datalog:
    name: quickstart_data_zk_datalog
```
### Start HStreamDB service
```bash
# Launch in the same folder
docker-compose -f quick-start.yaml up

# Or start in the background
docker-compose -f quick-start.yaml up -d
```
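Before moving on, you can optionally confirm that all services are up. A minimal sketch using standard Docker Compose commands (the service names come from the quick-start.yaml above):

```bash
# List the services defined in quick-start.yaml and their status
docker-compose -f quick-start.yaml ps

# Follow the logs of one HStream server node to confirm it started cleanly
docker-compose -f quick-start.yaml logs -f hserver0
```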
### Launch the SQL command line interface of HStreamDB
Use the following command to create the `hstream-client`:
```bash
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.8.0 hstream-client --port 6570 --client-id 1
```
You will enter the console; the client prints the HStream ASCII banner together with the following help information:

```
Command
  :h                           To show these help info
  :q                           To exit command line interface
  :help [sql_operation]        To show full usage of sql statement
```
### Stream creation
```sql
> CREATE STREAM temp_hum;
temp_hum
> SHOW STREAMS;
temp_hum
```
# EMQX Cloud Data Integration Configuration
## Create HStreamDB resource
Click on Data Integration in the left menu bar, find HStreamDB under Data Persistence, and click on New Resource.

Fill in the information about the HStreamDB database you have just created and click on Test. If an error occurs, you should check if the database configuration is correct.
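If the Test fails, the cause is often network reachability between the deployment and the HStreamDB server rather than the configuration values themselves. As a rough, hypothetical check, you can verify from a host on the same network that the HStreamDB server port is open (the `<server ip>` and port 6570 must match the address configured in quick-start.yaml):

```bash
# Check that the HStreamDB server port is reachable from the network
# (replace <server ip> with the address used in quick-start.yaml)
nc -vz <server ip> 6570
```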
## Create Rule
Once the resource is created, click on New Rule and enter the following rule-matching SQL statement. In the rule below, we read the message upload time `up_timestamp`, the client ID, and the temperature and humidity from the message body (payload) of messages published to the `temp_hum/emqx` topic.

```sql
SELECT
  timestamp as up_timestamp,
  clientid as client_id,
  payload.temp as temp,
  payload.hum as hum
FROM
  "temp_hum/emqx"
```
We can use the SQL test to test the rule and see the results.

## Add Action
Click Next to go to the Action screen, select the resource created in the first step, choose Data Persistence - Save Data to HStreamDB as the action type, and enter the parameters, which are defined in the following table.

| Parameter Name | Definition | Type |
| --- | --- | --- |
| Stream | Stream name; variables are not allowed | String |
| Ordering Key | Partition key; variables may be used | String |
| Enable Bulk Insertion | Enables or disables bulk writing; enabled by default | Boolean |
| Maximum number of batches | Maximum number of message entries in a batch | Integer |
| Max Batch Interval (ms) | Maximum interval between batches, in milliseconds | Integer |
| Message content template | Content of the message to be written | Binary |

Insert the message content template and click on Confirm.
{"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
## View Resource Detail
Once the action has been created, return to the list and click on the resource to view the resource details and rule monitoring information.
## Check Rules Monitoring
Click on the rule to open the rule details screen, where you can view the rule and action details.
# Test
## Use MQTT X to connect to the deployment
You need to replace `broker.emqx.io` with the connection address of your deployment and add the client authentication information in the EMQX Cloud Dashboard.
Topic: `temp_hum/emqx`

Payload:

```json
{
  "temp": "36.4",
  "hum": "23.5"
}
```
## View the data saving results
The data has been written to HStreamDB, and you can consume the messages with any supported consumption method.
This example uses fetcher, a consumer tool based on the HStream Golang SDK, to consume the data as follows.
```bash
./fetcher -s f1 -n temp_hum -p <server ip>:6570 -c cons1 -v
```