Skip to content

将 MQTT 数据写入到 Apache Doris

注意

Apache Doris 数据集成功能仅适用于 EMQX 5.91 版本以及之后的专有版部署。

Apache Doris 是一款现代化的大规模并行处理(MPP)分析型数据库系统,具有高并发、高性能、易于使用等特点。它特别适用于实时分析和数据仓库等场景。您可以将 MQTT 数据集成至 Apache Doris,实现高效存储、实时分析和强大的数据可视化能力。

本页详细介绍了 EMQX Platform 与 Apache Doris 的数据集成并提供了实用的规则创建指导。

提示

EMQX Platform 中的 Apahe Doris 数据集成支持 Apache Doris 2.1.7 及之后版本。

工作原理

Apache Doris 数据集成是 EMQX Platform 中开箱即用的功能,通过简单的配置即可实现复杂的业务开发。在一个典型的物联网应用中,EMQX Platform 作为物联网平台,负责接入设备,进行消息传输,Apache Doris 作为数据存储平台,负责设备状态与元数据的存储,以及消息数据存储和数据分析等。

doris-integration

EMQX Platform 通过规则引擎与 Sink 将设备事件和数据转发至 Apache Doris,应用读取 Apache Doris 中数据即可感知设备状态,获取设备上下线记录,以及分析设备数据。其具体的工作流程如下:

  • 设备连接到 EMQX Platform:物联网设备通过 MQTT 协议连接成功后将触发上线事件,事件包含设备 ID、来源 IP 地址以及其他属性等信息。
  • 设备消息发布和接收:设备通过特定的主题发布遥测和状态数据,EMQX Platform 接收到消息后将在规则引擎中进行比对。
  • 规则引擎处理消息:通过内置的规则引擎,可以根据主题匹配处理特定来源的消息和事件。规则引擎会匹配对应的规则,并对消息和事件进行处理,例如转换数据格式、过滤掉特定信息或使用上下文信息丰富消息。
  • 写入到 Apache Doris:规则触发将消息写入到 Apache Doris 的操作。借助 SQL 模板,用户可以从规则处理结果中提取数据构造 SQL 发送给 Apache Doris 执行,实现将消息特定字段写入或更新到数据库对应表和列中。

事件和消息数据写入到 Apache Doris 后,您可以连接到 Apache Doris 读取数据,进行灵活的应用开发,例如:

  • 连接到可视化工具,例如 Grafana,根据数据生成图表,展示数据变化。
  • 连接到设备管理系统,查看设备列表与状态,并检测设备异常行为,及时排除潜在的问题。

特性与优势

在 EMQX Platform 中使用 Apache Doris 数据集成能够为您的业务带来以下特性与优势:

  • 灵活的事件处理:通过 EMQX Platform 规则引擎,Apache Doris 可以处理设备全生命周期事件,极大的方便开发实现物联网应用所需的各类管理与监控业务。您可以通过分析事件数据,及时发现设备故障、异常行为或趋势变化,以便采取适当的措施。
  • 消息转换:消息可以写入 Apache Doris 之前,通过 EMQX Platform 规则中进行丰富的处理和转换,方便后续的存储和使用。
  • 实时数据写入:Apache Doris 支持通过 HTTP 和 JDBC 接口进行实时数据写入。结合 EMQX Platform 使用时,MQTT 数据可以低延迟地直接写入 Doris 表,非常适合对实时查询和分析有高要求的场景。
  • 流式同步:Apache Doris 还支持从 Flink、Kafka 以及各类事务型数据库等数据源接入实时数据流。这使得用户可以构建统一的数据处理流水线,将 EMQX Platform 中的 MQTT 数据与其他流式数据源结合,实现全面的实时分析能力。
  • 标准 SQL 与生态兼容性:Doris 完全兼容 MySQL 语法并支持标准 SQL,使用户无需学习新语言即可执行强大的分析查询。同时,它能够轻松集成各类商业智能(BI)工具和客户端应用,用于构建仪表盘、生成报表以及实现自动化工作流程。
  • 运行时指标:支持查看每个 Sink 的运行时指标,例如消息总数、成功/失败计数、当前速率等。

通过灵活的事件处理、丰富的消息转换、灵活的数据操作以及实时监控与分析能力,您可以构建高效、可靠和可扩展的物联网应用,并在业务决策和优化方面受益。

准备工作

本节介绍了在 EMQX Platform 中创建 Apache Doris 数据集成之前需要做的准备工作,包括安装 Apache Doris 和创建数据表。

前置准备

网络设置

开始之前,您需要在 EMQX Platform 上创建一个部署(EMQX 群集)并配置网络。

  • 对于专有版部署用户: 请先创建 VPC 对等连接,创建完对等连接之后,可以通过内部网络 IP 登录 Platform Console 访问目标连接器。或者开通 NAT 网关,通过公网 IP 访问目标连接器。
  • 对于 BYOC 部署用户: 请在部署 BYOC 的 VPC 和目标连接器所在的 VPC 之间建立对等连接,创建完对等连接之后,可以通过内部网络 IP 访问目标连接器。如果您需要通过公共 IP 地址访问资源,请在公共云控制台中为部署 BYOC 的 VPC 配置 NAT 网关。

安装 Apache Doris 服务器

请按照官方指南使用 Docker Compose 在本地运行 Doris。

创建数据表

您可以使用 MySQL 客户端连接到 Doris 的 Frontend 并执行命令。参见官方文档

例如:

mysql -uroot -P9030 -h127.0.0.1

您需要在 Apache Doris 中创建数据库和两张数据表:

  • 数据表 emqx_messages 存储每条消息的发布者客户端 ID、主题、Payload 以及发布时间。

  • 数据表 emqx_client_events 存储上下线的客户端 ID、事件类型以及事件发生时间。

    sql
    create database mqtt;
    use mqtt;
    
    create table if not exists
      emqx_messages(
        clientid varchar,
        topic string,
        payload string,
        created_at datetime
      )
      properties (replication_num = 1);
    
    create table if not exists
      emqx_client_events(
        clientid varchar,
        event varchar,
        created_at datetime)
      properties (replication_num = 1);

创建连接器

您需要创建一个 Apache Doris 连接器,以便 EMQX Platform 与 Apache Doris 服务建立连接。

  1. 在部署菜单中选择数据集成,在数据持久化分类下选择 Doris。如果您已经创建了其他的连接器,点击新建连接器,然后在数据持久化分类下选择 Doris
  2. 新建连接器 页面中配置以下信息:
    • 连接器名称:系统将自动生成一个连接器的名称。
    • 服务器地址:填写 127.0.0.1:3306
    • 数据库名字:填写 mqtt
    • 用户名:填写 root
    • 密码:填写 public
  3. 根据需要配置高级设置选项(可选)。
  4. 点击测试连接按钮,如果 Doris 服务能够正常访问,则会返回成功提示。
  5. 点击新建按钮完成连接器创建。

创建消息存储 Sink 规则

本节演示了如何创建一条规则,用于处理来自源 MQTT 主题 t/# 的消息,并通过配置的动作将处理后的结果写入到 Apache Doris 的数据表 emqx_messages 中。

  1. 点击连接器列表操作列下的新建规则图标或在规则列表中点击新建规则进入新建规则步骤页。

  2. SQL 编辑器中输入规则,此处选择将 t/# 主题的 MQTT 消息存储至 Apache Doris,请确保规则选择出来的字段(SELECT 部分)包含所有 SQL 模板中用到的变量,此处规则 SQL 如下:

    sql
    SELECT 
      *
    FROM
      "t/#"

    TIP

    如果您初次使用 SQL,可以点击 SQL 示例启用调试来学习和测试规则 SQL 的结果。

  3. 点击下一步开始创建动作。

  4. 使用连接器下拉框中选择您之前创建的连接器。

  5. 配置 SQL 模板,使用如下 SQL 完成数据插入,此处为预处理 SQL,字段不应当包含引号,SQL 末尾不要带分号 (;):

    sql
    INSERT INTO emqx_messages(clientid, topic, payload, created_at) VALUES(
      ${clientid},
      ${topic},
      ${payload},
      FROM_UNIXTIME(${timestamp}/1000)
    )

    如果在模板中使用未定义的占位符变量,您可以切换未定义变量作为 NULL 开关(位于 SQL 模板 上方)来定义规则引擎的行为:

    • 关闭(默认):规则引擎可以将字符串 undefined 插入数据库。

    • 启用:允许规则引擎在变量未定义时将 NULL 插入数据库。

      TIP

      如果可能,应该始终启用此选项;禁用该选项仅用于确保向后兼容性。

  6. 备选动作(可选):如果您希望在消息投递失败时提升系统的可靠性,可以为动作配置一个或多个备选动作。当动作无法成功处理消息时,将触发这些备选动作。更多信息请参见:备选动作

  7. 展开高级设置,根据需要配置高级设置选项(可选),详细请参考高级设置

  8. 点击确认按钮完成动作的配置。

  9. 在弹出的成功创建规则提示框中点击返回规则列表,从而完成整个数据集成的配置。

创建事件记录 Sink 规则

本节展示如何创建用于记录客户端上/下线状态的规则,并通过配置的 Sink 将记录写入 Apache Doris 的数据表 emqx_client_events 中。除 SQL 模板与规则外,其他操作步骤与创建消息存储 Sink 规则章节完全相同。

您可以使用以下规则 SQL 创建规则:

sql
SELECT
  *
FROM 
  "$events/client/connected", "$events/client/disconnected"

您可以使用以下 SQL 模板创建实现设备上下线记录的 Sink,请注意字段不应当包含引号,SQL 末尾不要带分号 ;:

sql
INSERT INTO emqx_client_events(clientid, event, created_at) VALUES (
  ${clientid},
  ${event},
  FROM_UNIXTIME(${timestamp}/1000)
)

测试规则

使用 MQTTX 向 t/1 主题发布消息,此操作同时会触发上下线事件:

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello Apache Doris" }'

分别查看两个 Sink 运行统计,命中数和发送成功次数应各增加 1 次,上下线记录 Sink 命中和成功次数应增加 2 次。

查看数据是否已经写入表中,emqx_messages 表:

bash
mysql> select * from emqx_messages;
+----+----------+-------+--------------------------+---------------------+
| id | clientid | topic | payload                  | created_at          |
+----+----------+-------+--------------------------+---------------------+
|  1 | emqx_c   | t/1   | { "msg": "hello Apache Doris" } | 2022-12-09 08:44:07 |
+----+----------+-------+--------------------------+---------------------+
1 row in set (0.01 sec)

emqx_client_events 表:

bash
mysql> select * from emqx_client_events;
+----+----------+---------------------+---------------------+
| id | clientid | event               | created_at          |
+----+----------+---------------------+---------------------+
|  1 | emqx_c   | client.connected    | 2022-12-09 08:44:07 |
|  2 | emqx_c   | client.disconnected | 2022-12-09 08:44:07 |
+----+----------+---------------------+---------------------+
2 rows in set (0.00 sec)