Skip to content

将 MQTT 数据写入到 HStreamDB

TIP

EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用

HStreamDB 是一个开源的流数据平台,使您能够在一个统一的平台中高效地摄取、存储、处理和分发所有实时消息、事件和其他数据流。通过 EMQX 与 HStreamDB 的集成,您可以将 MQTT 消息和客户端事件保存到 HStreamDB 中,实现大规模物联网数据的采集、传输与存储,并使用标准 SQL 和物化视图对数据流进行实时处理以及监测和分析。

本页详细介绍了 EMQX 与 HStreamDB 的数据集成并提供了实用的规则和 Sink 创建指导。

TIP

仅 EMQX 5.2.0 及以上版本支持 HStreamDB Sink 功能。

工作原理

HStreamDB 数据集成是 EMQX 的即开即用功能,结合了 EMQX 的设备连接和消息传输能力以及 HStreamDB 强大的数据存储和处理能力。内置的规则引擎组件简化了两个平台之间的数据流和处理过程。

下图展示了 EMQX 和 HStreamDB 之间的数据集成的典型架构:

EMQX-HStreamDB 集成

EMQX 通过规则引擎和配置的 Sink 将 MQTT 数据转发到 Apache HStreamDB,整个过程如下:

  1. 消息发布和接收:物联网设备通过 MQTT 协议建立成功连接,随后发布遥测和状态数据到特定主题。当 EMQX 接收到这些消息时,它将在其规则引擎中启动匹配过程。
  2. 规则引擎处理消息:使用内置的规则引擎,可以根据主题匹配处理来自特定来源的 MQTT 消息。规则引擎匹配相应规则并处理消息,例如数据格式转换、过滤特定信息或用上下文信息丰富消息。
  3. 数据流入 HStreamDB:规则触发将消息转发到 HStreamDB 的动作,可以轻松配置数据到 HStreamDB 流名称、分区键和记录,便于后续的数据处理和分析。

在 MQTT 消息数据写入 HStreamDB 后,您可以进行灵活的应用程序开发,例如:

  • 在接收到特定 MQTT 消息时,可以使用 HStreamDB 的规则引擎组件触发相应的动作或事件,实现跨系统和应用的事件驱动功能。
  • 在 HStreamDB 中实时分析 MQTT 数据流,检测异常或特定事件模式,并根据这些条件触发警报通知或执行相应动作。
  • 将多个 MQTT 主题的数据集中到一个统一的数据流中,并利用 HStreamDB 的计算能力进行实时聚合、计算和分析,以获得更全面的数据洞察。

特性与优势

与 HStreamDB 的数据集成为您的业务带来以下特性和优势:

  • 可靠的物联网数据消息传递:EMQX 能够可靠地批量发送 MQTT 消息到 HStreamDB,使物联网设备与 HStreamDB 和应用系统集成。
  • MQTT 消息转换:使用规则引擎,EMQX 可以过滤和转换 MQTT 消息。在发送到 HStreamDB 之前,消息可以经过数据提取、过滤、丰富和转换。
  • 大规模数据流存储:HStreamDB 支持在专门设计的分布式、容错的日志存储集群中可靠地存储数百万数据流,并在需要的时候重放或推送实时数据流的更新到应用中。能够与 EMQX 消息模型完美结合,实现大规模物联网数据采集传输与存储。
  • 集群和可扩展性:EMQX 和 HStreamDB 采用云原生架构构建,支持集群在线伸缩、动态扩缩容,随着业务增长灵活地水平扩展以满足不断扩大的需求。
  • 灵活的处理能力:在 HStreamDB 可以使用熟悉的 SQL 来过滤、转换、聚合以及连接多个数据流,也支持使用标准 SQL 和物化视图进行数据流实时处理以及监测和分析,获取实时数据洞察。
  • 高吞吐量场景中的处理能力:HStreamDB Sink 支持同步和异步写入模式,允许根据不同场景在延迟和吞吐量之间灵活平衡。

准备工作

本节介绍了在 EMQX 中创建 HStreamDB 数据集成之前需要做的准备工作,包括如何设置 HStreamDB 服务器并创建 Stream。

以下小节描述如何使用 Docker 镜像在 Linux/MacOS 安装启动 HStreamDB,因此请确保 Docker 已安装并尽可能使用 Docker Compose v2。关于其他 HStreamDB 的安装方式及 HStreamDB Platform,请参阅使用 Docker-Compose 快速开始以及开始使用 HStream Platform

本教程假设您在本地机器上同时运行 EMQX 和容器内的 HStreamDB。 如果您有远程运行的 EMQX 和 HStreamDB,请相应地调整设置。

前置准备

启动 HStreamDB 服务并创建 Stream

创建连接器

在创建 RocketMQ Sink 之前,您需要创建一个 HStreamDB 连接器,以便 EMQX 与 HStreamDB 服务建立连接。以下示例假定您在本地机器上同时运行 EMQX 和 HStreamDB。如果您在远程运行 HStreamDB 和 EMQX,请相应地调整设置。

  1. 转到 Dashboard 集成 -> 连接器 页面。点击页面右上角的创建

  2. 在连接器类型中选择 HStreamDB,点击下一步

  3. 配置 步骤,配置以下信息(带星号字段为必填字段。):

    • 连接器名称:应为大写和小写字母及数字的组合,例如:my_hstreamdb
    • 服务器地址hstream://127.0.0.1:6570,或使用实际的 HStreamDB 地址和端口。
      • schema 支持 httphttpshstreamhstreams
      • 对与 TLS 连接,scheme 需要使用 hstreamshttps,如 hstreams://127.0.0.1:6570
    • HStreamDB 流名称: 需要写入的 Stream 名,如 mqtt_message (用于消息存储)或 mqtt_connect (用于事件记录)。
    • HStreamDB 分区键:指定用于确定数据将存储在 HStreamDB 的哪个分区或节点内的分区键。例如,您可以输入 ${topic} 以确保相同主题的消息被有序写入 HStreamDB。如果未指定,将使用默认键,数据将被映射到某个默认的分片。
    • HStreamDB gRPC 超时:指定当发出 gRPC 请求到 HStreamDB 服务器时,系统将等待响应的最长时间。默认值是 30 秒。
    • 启用 TLS: 启用 TLS 连接时,关闭验证服务器证书
      • tls-deploy/ca 目录下生成的证书及私钥文件: ca/certs/root_ca.crtca/hstream.crtca/hstream.key 分别填入 CA CertTLS CertTLS Key
  4. 高级配置(可选):详细请参考 Sink 的特性

  5. 点击创建按钮完成连接器创建。

  6. 在弹出的创建成功对话框中您可以点击创建规则,继续创建规则以指定需要写入 HStreamDB 的数据和需要记录的客户端事件。您也可以按照创建消息存储 Sink 规则创建事件记录 Sink 规则章节的步骤来创建规则。

创建消息存储 Sink 规则

本节演示了如何在 Dashboard 中创建一条规则,用于处理来自源 MQTT 主题 t/# 的消息,并通过配置的 Sink 将处理后的数据写入到 HStream 的 Stream mqtt_message

  1. 转到 Dashboard 集成 -> 规则页面。

  2. 点击页面右上角的创建

  3. 输入规则 ID my_rule。在 SQL 编辑器中输入规则以实现对指定主题消息的转发。例如将 t/# 主题的 MQTT 消息存储至 HStreamDB,输入以下 SQL 语句: 注意:如果您希望制定自己的 SQL 语句,需要确保规则选出的字段(SELECT 部分)包含之后配置的 HStream Record 模板中用到的所有变量。

    sql
     SELECT
       *
     FROM
       "t/#"

    TIP

    如果您初次使用 SQL,可以点击 SQL 示例启用调试来学习和测试规则 SQL 的结果。

  4. 点击右侧的添加动作按钮,为规则在被触发的情况下指定一个动作。在动作类型下拉框中选择 HStreamDB,保持动作下拉框为默认的创建动作选项,您也可以选择一个之前已经创建好的 HStreamDB Sink。此处我们创建一个全新的 Sink 并添加到规则中。

  5. 输入 Sink 名称,名称应为大/小写字母和数字的组合。

  6. 连接器下拉框中选择刚刚创建的 my_hstreamdb。您也可以通过点击下拉框旁边的按钮创建一个新的连接器。有关配置参数,请参见创建连接器

  7. 配置 HStream Record 模板以实现对指定主题消息的转发。使用如下 HRecord 模板完成数据插入:

    json
    {"id": ${id}, "topic": "${topic}", "qos": ${qos}, "payload": "${payload}"}
  8. 高级配置(可选),根据情况配置同步/异步模式,队列等参数,详细请参考 Sink 的特性

  9. 点击添加按钮完成 Sink 创建,新建的 Sink 将被添加到动作输出列表中。

  10. 回到创建规则页面,对配置的信息进行确认,点击创建。一条规则应该出现在规则列表中。

现在您已成功创建了通过 HStreamDB Sink 将数据转发到 HStreamDB 的规则,同时在规则页面的动作(Sink) 标签页看到新建的 HStreamDB Sink。

您还可以点击 集成 -> Flow 设计器可以查看拓扑,通过拓扑可以直观的看到,主题 t/# 下的消息在经过规则 my_rule 解析后被发送到 HStreamDB 中。

创建上下线记录 Sink 规则

本节展示如何创建用于记录客户端上/下线状态的规则,并通过配置的 Sink 将记录写入到 HStreamDB 的 Stream mqtt_connect 中。

注意:除规则 SQL 和 Sink 的 Stream Record 模板设置不同外,其他操作步骤与创建消息存储 Sink 规则章节完全相同。

规则 SQL 模版设置如下:

sql
SELECT
  *
FROM
  "$events/client_connected", "$events/client_disconnected"

Sink 的 Stream Record 模板设置如下:

sql
{"clientid": "${clientid}", "event_type": "${event}", "event_time": ${timestamp}}

测试规则

使用 MQTTX 向 t/1 主题发布消息。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello HStreamDB" }'

查看 HStreamDB Sink 运行统计。

  • 用于消息存储的 Sink ,命中、发送成功次数均 +1。查看数据是否已经写入流 mqtt_message 中:
bash
# 读取 Stream `mqtt_message` 之后按 `Control-C` 停止
root@9c7ce2f51860:/# hstream stream read-stream mqtt_message
timestamp: "1693903488278", id: 1947758763121538-8589934593-0, key: "", record: {"id": 00060498A3B3C4F8F4400100127E0002, "topic": "t/1", "qos": 0, "payload": { "msg": "Hello HStreamDB" }}
^CRead Done.
  • 用于存储上下线事件的 HStreamDB Sink ,命中、发送次数均 +2,即一次上线和一次下线。查看设备状态是否已经写入流 mqtt_connect 中:
bash
# 读取 Stream `mqtt_connect` 之后按 `Control-C` 停止
root@9c7ce2f51860:/# hstream stream read-stream mqtt_connect
timestamp: "1693903488274", id: 1947758827604597-8589934593-0, key: "", record: {"clientid": "emqx_c", "event_type": "client.connected", "event_time": 1693903488266}
timestamp: "1693903488294", id: 1947758827604597-8589934594-0, key: "", record: {"clientid": "emqx_c", "event_type": "client.disconnected", "event_time": 1693903488271}
^CRead Done.