将 MQTT 数据写入到 HStreamDB
TIP
HStreamDB 数据集成是 EMQX 企业版功能。
HStreamDB 是一个开源的流数据平台,使您能够在一个统一的平台中高效地摄取、存储、处理和分发所有实时消息、事件和其他数据流。通过 EMQX 与 HStreamDB 的集成,您可以将 MQTT 消息和客户端事件保存到 HStreamDB 中,实现大规模物联网数据的采集、传输与存储,并使用标准 SQL 和物化视图对数据流进行实时处理以及监测和分析。
本页详细介绍了 EMQX 与 HStreamDB 的数据集成并提供了实用的规则和 Sink 创建指导。
TIP
仅 EMQX 5.2.0 及以上版本支持 HStreamDB Sink 功能。
工作原理
HStreamDB 数据集成是 EMQX 的即开即用功能,结合了 EMQX 的设备连接和消息传输能力以及 HStreamDB 强大的数据存储和处理能力。内置的规则引擎组件简化了两个平台之间的数据流和处理过程。
下图展示了 EMQX 和 HStreamDB 之间的数据集成的典型架构:
EMQX 通过规则引擎和配置的 Sink 将 MQTT 数据转发到 Apache HStreamDB,整个过程如下:
- 消息发布和接收:物联网设备通过 MQTT 协议建立成功连接,随后发布遥测和状态数据到特定主题。当 EMQX 接收到这些消息时,它将在其规则引擎中启动匹配过程。
- 规则引擎处理消息:使用内置的规则引擎,可以根据主题匹配处理来自特定来源的 MQTT 消息。规则引擎匹配相应规则并处理消息,例如数据格式转换、过滤特定信息或用上下文信息丰富消息。
- 数据流入 HStreamDB:规则触发将消息转发到 HStreamDB 的动作,可以轻松配置数据到 HStreamDB 流名称、分区键和记录,便于后续的数据处理和分析。
在 MQTT 消息数据写入 HStreamDB 后,您可以进行灵活的应用程序开发,例如:
- 在接收到特定 MQTT 消息时,可以使用 HStreamDB 的规则引擎组件触发相应的动作或事件,实现跨系统和应用的事件驱动功能。
- 在 HStreamDB 中实时分析 MQTT 数据流,检测异常或特定事件模式,并根据这些条件触发警报通知或执行相应动作。
- 将多个 MQTT 主题的数据集中到一个统一的数据流中,并利用 HStreamDB 的计算能力进行实时聚合、计算和分析,以获得更全面的数据洞察。
特性与优势
与 HStreamDB 的数据集成为您的业务带来以下特性和优势:
- 可靠的物联网数据消息传递:EMQX 能够可靠地批量发送 MQTT 消息到 HStreamDB,使物联网设备与 HStreamDB 和应用系统集成。
- MQTT 消息转换:使用规则引擎,EMQX 可以过滤和转换 MQTT 消息。在发送到 HStreamDB 之前,消息可以经过数据提取、过滤、丰富和转换。
- 大规模数据流存储:HStreamDB 支持在专门设计的分布式、容错的日志存储集群中可靠地存储数百万数据流,并在需要的时候重放或推送实时数据流的更新到应用中。能够与 EMQX 消息模型完美结合,实现大规模物联网数据采集传输与存储。
- 集群和可扩展性:EMQX 和 HStreamDB 采用云原生架构构建,支持集群在线伸缩、动态扩缩容,随着业务增长灵活地水平扩展以满足不断扩大的需求。
- 灵活的处理能力:在 HStreamDB 可以使用熟悉的 SQL 来过滤、转换、聚合以及连接多个数据流,也支持使用标准 SQL 和物化视图进行数据流实时处理以及监测和分析,获取实时数据洞察。
- 高吞吐量场景中的处理能力:HStreamDB Sink 支持同步和异步写入模式,允许根据不同场景在延迟和吞吐量之间灵活平衡。
准备工作
本节介绍了在 EMQX 中创建 HStreamDB 数据集成之前需要做的准备工作,包括如何设置 HStreamDB 服务器并创建 Stream。
以下小节描述如何使用 Docker 镜像在 Linux/MacOS 安装启动 HStreamDB,因此请确保 Docker 已安装并尽可能使用 Docker Compose v2。关于其他 HStreamDB 的安装方式及 HStreamDB Platform,请参阅使用 Docker-Compose 快速开始以及开始使用 HStream Platform。
本教程假设您在本地机器上同时运行 EMQX 和容器内的 HStreamDB。 如果您有远程运行的 EMQX 和 HStreamDB,请相应地调整设置。
前置准备
启动 HStreamDB 服务并创建 Stream
创建连接器
在创建 RocketMQ Sink 之前,您需要创建一个 HStreamDB 连接器,以便 EMQX 与 HStreamDB 服务建立连接。以下示例假定您在本地机器上同时运行 EMQX 和 HStreamDB。如果您在远程运行 HStreamDB 和 EMQX,请相应地调整设置。
转到 Dashboard 集成 -> 连接器 页面。点击页面右上角的创建。
在连接器类型中选择 HStreamDB,点击下一步。
在 配置 步骤,配置以下信息(带星号字段为必填字段。):
- 连接器名称:应为大写和小写字母及数字的组合,例如:
my_hstreamdb
。 - 服务器地址:
hstream://127.0.0.1:6570
,或使用实际的 HStreamDB 地址和端口。- schema 支持
http
、https
、hstream
、hstreams
。 - 对与 TLS 连接,scheme 需要使用
hstreams
或https
,如hstreams://127.0.0.1:6570
。
- schema 支持
- HStreamDB 流名称: 需要写入的 Stream 名,如
mqtt_message
(用于消息存储)或mqtt_connect
(用于事件记录)。 - HStreamDB 分区键:指定用于确定数据将存储在 HStreamDB 的哪个分区或节点内的分区键。例如,您可以输入
${topic}
以确保相同主题的消息被有序写入 HStreamDB。如果未指定,将使用默认键,数据将被映射到某个默认的分片。 - HStreamDB gRPC 超时:指定当发出 gRPC 请求到 HStreamDB 服务器时,系统将等待响应的最长时间。默认值是
30
秒。 - 启用 TLS: 启用 TLS 连接时,关闭验证服务器证书。
tls-deploy/ca
目录下生成的证书及私钥文件:ca/certs/root_ca.crt
,ca/hstream.crt
,ca/hstream.key
分别填入CA Cert
,TLS Cert
,TLS Key
。
- 连接器名称:应为大写和小写字母及数字的组合,例如:
高级配置(可选):详细请参考 Sink 的特性。
点击创建按钮完成连接器创建。
在弹出的创建成功对话框中您可以点击创建规则,继续创建规则以指定需要写入 HStreamDB 的数据和需要记录的客户端事件。您也可以按照创建消息存储 Sink 规则和创建事件记录 Sink 规则章节的步骤来创建规则。
创建消息存储 Sink 规则
本节演示了如何在 Dashboard 中创建一条规则,用于处理来自源 MQTT 主题 t/#
的消息,并通过配置的 Sink 将处理后的数据写入到 HStream 的 Stream mqtt_message
。
转到 Dashboard 集成 -> 规则页面。
点击页面右上角的创建。
输入规则 ID
my_rule
。在 SQL 编辑器中输入规则以实现对指定主题消息的转发。例如将t/#
主题的 MQTT 消息存储至 HStreamDB,输入以下 SQL 语句: 注意:如果您希望制定自己的 SQL 语句,需要确保规则选出的字段(SELECT 部分)包含之后配置的 HStream Record 模板中用到的所有变量。sqlSELECT * FROM "t/#"
TIP
如果您初次使用 SQL,可以点击 SQL 示例 和启用调试来学习和测试规则 SQL 的结果。
点击右侧的添加动作按钮,为规则在被触发的情况下指定一个动作。在动作类型下拉框中选择
HStreamDB
,保持动作下拉框为默认的创建动作
选项,您也可以选择一个之前已经创建好的 HStreamDB Sink。此处我们创建一个全新的 Sink 并添加到规则中。输入 Sink 名称,名称应为大/小写字母和数字的组合。
从连接器下拉框中选择刚刚创建的
my_hstreamdb
。您也可以通过点击下拉框旁边的按钮创建一个新的连接器。有关配置参数,请参见创建连接器。配置 HStream Record 模板以实现对指定主题消息的转发。使用如下 HRecord 模板完成数据插入:
json{"id": ${id}, "topic": "${topic}", "qos": ${qos}, "payload": "${payload}"}
高级配置(可选),根据情况配置同步/异步模式,队列等参数,详细请参考 Sink 的特性。
点击添加按钮完成 Sink 创建,新建的 Sink 将被添加到动作输出列表中。
回到创建规则页面,对配置的信息进行确认,点击创建。一条规则应该出现在规则列表中。
现在您已成功创建了通过 HStreamDB Sink 将数据转发到 HStreamDB 的规则,同时在规则页面的动作(Sink) 标签页看到新建的 HStreamDB Sink。
您还可以点击 集成 -> Flow 设计器可以查看拓扑,通过拓扑可以直观的看到,主题 t/#
下的消息在经过规则 my_rule
解析后被发送到 HStreamDB 中。
创建上下线记录 Sink 规则
本节展示如何创建用于记录客户端上/下线状态的规则,并通过配置的 Sink 将记录写入到 HStreamDB 的 Stream mqtt_connect
中。
注意:除规则 SQL 和 Sink 的 Stream Record 模板设置不同外,其他操作步骤与创建消息存储 Sink 规则章节完全相同。
规则 SQL 模版设置如下:
SELECT
*
FROM
"$events/client_connected", "$events/client_disconnected"
Sink 的 Stream Record 模板设置如下:
{"clientid": "${clientid}", "event_type": "${event}", "event_time": ${timestamp}}
测试规则
使用 MQTTX 向 t/1
主题发布消息。
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello HStreamDB" }'
查看 HStreamDB Sink 运行统计。
- 用于消息存储的 Sink ,命中、发送成功次数均 +1。查看数据是否已经写入流
mqtt_message
中:
# 读取 Stream `mqtt_message` 之后按 `Control-C` 停止
root@9c7ce2f51860:/# hstream stream read-stream mqtt_message
timestamp: "1693903488278", id: 1947758763121538-8589934593-0, key: "", record: {"id": 00060498A3B3C4F8F4400100127E0002, "topic": "t/1", "qos": 0, "payload": { "msg": "Hello HStreamDB" }}
^CRead Done.
- 用于存储上下线事件的 HStreamDB Sink ,命中、发送次数均 +2,即一次上线和一次下线。查看设备状态是否已经写入流
mqtt_connect
中:
# 读取 Stream `mqtt_connect` 之后按 `Control-C` 停止
root@9c7ce2f51860:/# hstream stream read-stream mqtt_connect
timestamp: "1693903488274", id: 1947758827604597-8589934593-0, key: "", record: {"clientid": "emqx_c", "event_type": "client.connected", "event_time": 1693903488266}
timestamp: "1693903488294", id: 1947758827604597-8589934594-0, key: "", record: {"clientid": "emqx_c", "event_type": "client.disconnected", "event_time": 1693903488271}
^CRead Done.