Skip to content

集成 HStreamDB

HStreamDB 是一款专为流式数据设计的, 针对大规模实时数据流的接入、存储、处理、 分发等环节进行全生命周期管理的流数据库。它使用标准 SQL (及其流式拓展)作为主要接口语言,以实时性作为主要特征,旨在简化数据流的运维管理以及实时应用的开发

image

更多详细信息,请参考HStream官网

创建 HStreamDB 服务

部署参考文档,可使用 docker 本地部署,或云主机部署。 使用命令创建出 hstream-client :

bash
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.8.0 hstream-client --port 6570 --client-id 1

进入控制台

bash
      __  _________________  _________    __  ___
     / / / / ___/_  __/ __ \/ ____/   |  /  |/  /
    / /_/ /\__ \ / / / /_/ / __/ / /| | / /|_/ /
   / __  /___/ // / / _, _/ /___/ ___ |/ /  / /
  /_/ /_//____//_/ /_/ |_/_____/_/  |_/_/  /_/


Command
  :h                           To show these help info
  :q                           To exit command line interface
  :help [sql_operation]        To show full usage of sql statement

创建 stream:

SQL
> CREATE STREAM demo_stream;
demo_stream
> SHOW STREAMS;
demo_stream
>

创建 HStreamDB 资源

进入 EMQX Dashboard,点击右侧 规则引擎 > 资源 > 创建,选取 HStreamDB 资源,输入资源地址与链接池。

image

创建规则

点击,规则引擎 > 规则 > 创建。 编辑规则 SQL:

SQL
SELECT

  payload

FROM

  "#"

文档中的规则 SQL 仅作为示例,请按照业务设计编写 SQL。

点击添加动作,选择数据持久化,保存数据到 HSTreamDB。 选择上一步中创建的资源,并输入参数,参数定义见下表:

参数名定义类型
StreamStream 名称,不可使用变量String
Ordering Key分区键,可使用变量String
启用批量插入开启或关闭批量写入,默认开启Boolean
最大批量数批量最大消息条目数量Integer
最大批量间隔(ms)批量最大间隔,单位毫秒Integer
消息内容模板写入的消息报文内容Binary

点击确定,创建。

image

现在使用 MQTT 桌面客户端 MQTTX 连接至 EMQX,发送一条数据。

image

点击规则监控

image

这时数据已经写入 HStreamDB,使用任意消费方式,将消息消费出来。文档中使用的是基于 HStream golang SDK 编写的简单消费工具 fetcher,读者可自行按照熟悉的编程语言编写消费端。可见消费日志如下:

shell
./fetcher -s f1 -n demo_stream -p 127.0.0.1:6570 -c cons1 -v
shell
{"level":"info","ts":1656311005.5250711,"msg":"[f1]","recordId":"[BatchId: 8589934593, BatchIndex: 0, ShardId: 1317059070792293]","payload":"Hello HSreamDB !"}

消息已经成功写入,并被消费完成。