# Save device data to HStreamDB using Data Integrations

HStreamDB is a streaming database designed for managing large-scale real-time data streams, including access, storage, processing, and distribution. It uses standard SQL (with streaming extensions) as its main interface language and focuses on real-time processing, aiming to simplify the operation and management of data streams and the development of real-time applications.

In this article, we will simulate temperature and humidity data, report it to EMQX Cloud via the MQTT protocol, and then use EMQX Cloud Data Integrations to save the data to HStreamDB.

Before you start, complete the following steps:

  • Create a deployment (EMQX cluster) on EMQX Cloud.
  • For Professional Plan users: complete Peering Connection Creation first. All IPs mentioned below refer to the intranet IP of the resource. (Professional Plan deployments with a NAT gateway can also use a public IP to connect to resources.)

# HStreamDB Configuration

  1. HStreamDB Installation

    To deploy HStreamDB, refer to the HStreamDB Help. This article uses the following Docker Compose file to start a small cluster.

    Create quick-start.yaml

    ## quick-start.yaml

    version: "3.5"

    services:
      hserver0:
        image: hstreamdb/hstream:v0.8.0
        depends_on:
          - zookeeper
          - hstore
        ports:
          - "6570:6570"
        expose:
          - 6570
        networks:
          - hstream-network
        volumes:
          - data_store:/data/store
        command:
          - bash
          - "-c"
          - |
            set -e
            /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \
            /usr/local/bin/hstream-server \
            --host 0.0.0.0 --port 6570 \
            --internal-port 6571 \
            --server-id 100 \
            --address <server ip> \
            --zkuri zookeeper:2181 \
            --store-config /data/store/logdevice.conf \
            --store-admin-host hstore --store-admin-port 6440

      hserver1:
        image: hstreamdb/hstream:v0.8.0
        depends_on:
          - zookeeper
          - hstore
        ports:
          - "6572:6572"
        expose:
          - 6572
        networks:
          - hstream-network
        volumes:
          - data_store:/data/store
        command:
          - bash
          - "-c"
          - |
            set -e
            /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \
            /usr/local/bin/hstream-server \
            --host 0.0.0.0 --port 6572 \
            --internal-port 6573 \
            --server-id 101 \
            --address <server ip> \
            --zkuri zookeeper:2181 \
            --store-config /data/store/logdevice.conf \
            --store-admin-host hstore --store-admin-port 6440

      hstream-http-server:
        image: hstreamdb/hstream:v0.8.0
        depends_on:
          - hserver0
          - hserver1
        ports:
          - "6580:6580"
        expose:
          - 6580
        networks:
          - hstream-network
        command:
          - bash
          - "-c"
          - |
            set -e
            # Point the HTTP server at hserver0, which listens on port 6570
            /usr/local/bin/hstream-http-server \
              -gRPCServerHost hserver0 \
              -httpServerPort 6580 \
              -gRPCServerPort 6570

      hstore:
        image: hstreamdb/hstream:v0.8.0
        networks:
          - hstream-network
        volumes:
          - data_store:/data/store
        command:
          - bash
          - "-c"
          - |
            set -ex
            /usr/local/bin/ld-dev-cluster --root /data/store \
            --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \
            --user-admin-port 6440 \
            --no-interactive

      zookeeper:
        image: zookeeper
        expose:
          - 2181
        networks:
          - hstream-network
        volumes:
          - data_zk_data:/data
          - data_zk_datalog:/datalog

    networks:
      hstream-network:
        name: hstream-network

    volumes:
      data_store:
        name: quickstart_data_store
      data_zk_data:
        name: quickstart_data_zk_data
      data_zk_datalog:
        name: quickstart_data_zk_datalog
    

    Start HStreamDB service

    # Start in the foreground (run from the folder that contains quick-start.yaml)
    docker-compose -f quick-start.yaml up

    # Or start in the background (detached mode)
    docker-compose -f quick-start.yaml up -d
    
  2. Launching the SQL command line interface of HStreamDB

    Use the following command to start hstream-client:

    docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.8.0 hstream-client --port 6570 --client-id 1
    

    Once connected, the console shows the following banner and help text.

        __  _________________  _________    __  ___
       / / / / ___/_  __/ __ \/ ____/   |  /  |/  /
      / /_/ /\__ \ / / / /_/ / __/ / /| | / /|_/ /
     / __  /___/ // / / _, _/ /___/ ___ |/ /  / /
    /_/ /_//____//_/ /_/ |_/_____/_/  |_/_/  /_/
    
    
    Command
    :h                           To show these help info
    :q                           To exit command line interface
    :help [sql_operation]        To show full usage of sql statement
    
  3. Stream creation

    > CREATE STREAM temp_hum;
    temp_hum
    > SHOW STREAMS;
    temp_hum
    
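
Before moving on to EMQX Cloud, it can help to confirm that the ports published in quick-start.yaml actually accept TCP connections from the machine that will reach HStreamDB. The following is a minimal sketch using only the Python standard library. The helper names `port_is_open` and `check_hstream` are hypothetical, and a successful TCP connection only shows that the port is reachable, not that HStreamDB is healthy.

```python
import socket

# Ports published by the services in quick-start.yaml in this article.
HSTREAM_PORTS = {"hserver0": 6570, "hserver1": 6572, "hstream-http-server": 6580}


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False


def check_hstream(host: str) -> dict:
    """Map each service name to whether its published port accepts connections."""
    return {name: port_is_open(host, port) for name, port in HSTREAM_PORTS.items()}
```

Calling `check_hstream("<server ip>")` with the address used in quick-start.yaml returns one boolean per service; a `False` entry suggests checking the container status or firewall rules before creating the resource in EMQX Cloud.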

# EMQX Cloud Data Integration Configuration

  1. Create HStreamDB resource

    Click on Data Integration in the left menu bar, find HStreamDB under Data Persistence, and click on New Resource.

    data_integration_hstreamdb

    Fill in the connection information for the HStreamDB instance you have just created and click Test. If an error occurs, check that the configuration is correct.

    hstreamdb_resource

  2. Create Rule

    Once the resource is created, click New Rule and enter the following SQL statement. This rule reads messages from the temp_hum/emqx topic and selects the message upload time (up_timestamp), the client ID, and the temperature and humidity values from the message body (payload).

    SELECT
      timestamp as up_timestamp,
      clientid as client_id,
      payload.temp as temp,
      payload.hum as hum
    FROM
      "temp_hum/emqx"
    

    Use the SQL test to run the rule against a sample message and check the results.

    hstreamdb_rule

  3. Add Action

    Click Next to go to the action screen, select the resource created in the first step, select Data Persistence - Save Data to HStreamDB as the action type, and enter the parameters defined in the following table.

    | Parameter Name | Definition | Type |
    | --- | --- | --- |
    | Stream | Stream name; variables are not allowed | String |
    | Ordering Key | Partition key; variables can be used | String |
    | Enable Bulk Insertion | Enables or disables bulk writing; enabled by default | Boolean |
    | Maximum number of batches | Maximum number of message entries in a batch | Integer |
    | Max Batch Interval (ms) | Maximum interval between batches, in milliseconds | Integer |
    | Message content template | Content of the message to be written | Binary |

    Insert the message content template and click on Confirm.

    {"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}} 
    

    hstreamdb_action

  4. View Resource Detail

    Once the action has been created, return to the list and click on the resource to view the resource details and rule monitoring information.

    hstreamdb_resource_details

  5. Check Rules Monitoring

    Click on the rule to open the rule details screen, where you can view the rule and action details.

    hstreamdb_rule_details
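
To see how the rule and the message content template fit together, the following sketch mimics both steps locally in Python. It is not how EMQX Cloud implements them: `apply_rule` and `render` are hypothetical helpers, the sample timestamp and client ID are made-up values, and the sketch assumes each `${name}` placeholder is replaced by the plain text of its value with no quoting added.

```python
import json
from string import Template

# Message content template from the Add Action step.
ACTION_TEMPLATE = (
    '{"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, '
    '"temp": ${temp}, "hum": ${hum}}'
)


def apply_rule(message: dict) -> dict:
    """Mimic the rule SQL: pick and rename fields from one MQTT message."""
    payload = json.loads(message["payload"])
    return {
        "up_timestamp": message["timestamp"],
        "client_id": message["clientid"],
        "temp": payload["temp"],
        "hum": payload["hum"],
    }


def render(template: str, fields: dict) -> str:
    """Replace each ${name} placeholder with the plain text of its value (assumption)."""
    return Template(template).substitute(fields)


# Hypothetical message; the timestamp and client ID are sample values.
message = {
    "timestamp": 1654567890123,
    "clientid": "emqx_c",
    "topic": "temp_hum/emqx",
    "payload": '{"temp": "36.4", "hum": "23.5"}',
}

record = render(ACTION_TEMPLATE, apply_rule(message))
print(record)
```

Under the stated assumption, the client ID is written without quotes, so the rendered record is not strict JSON. If downstream consumers need to parse the record as JSON, wrapping that placeholder in quotes in the template (`"${client_id}"`) would be one way to handle it.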

# Test

  1. Use MQTT X to connect to the deployment

    Replace broker.emqx.io with the connection address of the deployment you created, and add the client authentication information in the EMQX Cloud Dashboard.

    • topic: temp_hum/emqx

    • payload:

      {
         "temp": "36.4",
         "hum": "23.5"
      }
      

    hstreamdb_mqttx

  2. View the data saving results

    The data is now written to HStreamDB, and you can read it with any consumer.

    This example uses fetcher, a consumption tool based on the HStream Golang SDK. The result is as follows.

    ./fetcher -s f1 -n temp_hum -p <server ip>:6570 -c cons1 -v
    

    hstreamdb_result
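
As an alternative to sending the test message by hand in MQTT X, the simulated readings can be generated with a short script. The sketch below is one possible approach, not part of EMQX Cloud: `make_payload` and `publish_reading` are hypothetical helpers, the value ranges are arbitrary, and publishing relies on the third-party paho-mqtt package. Port 1883 is the common default for MQTT over TCP; use the address, port, and credentials shown for your own deployment.

```python
import json
import random

TOPIC = "temp_hum/emqx"  # topic matched by the rule in this article


def make_payload(rng: random.Random) -> str:
    """Build one simulated reading in the same shape as the MQTT X example."""
    temp = round(rng.uniform(20.0, 40.0), 1)  # arbitrary range, for illustration
    hum = round(rng.uniform(20.0, 80.0), 1)   # arbitrary range, for illustration
    # The sample payload in this article sends both values as strings.
    return json.dumps({"temp": str(temp), "hum": str(hum)})


def publish_reading(host: str, username: str, password: str, port: int = 1883) -> str:
    """Publish one simulated reading and return the payload that was sent."""
    # Imported here so make_payload can be used without paho-mqtt installed.
    import paho.mqtt.publish as publish

    payload = make_payload(random.Random())
    publish.single(
        TOPIC,
        payload=payload,
        hostname=host,
        port=port,
        auth={"username": username, "password": password},
    )
    return payload
```

Calling `publish_reading` with the deployment connection address and the client credentials added in the Dashboard sends one message; running it in a loop with a short sleep produces a steady stream that fetcher should then display.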