Skip to content

Flow 设计器

注意

Flow 设计器不适用于 EMQX Serverless 版部署。

Flow 设计器是一个强大的可视化工具,您可以用它来创建和编辑数据处理流程 (Flow) ,使数据处理和集成的配置变得更加简单和高效。您还可以对创建的数据处理流程进行测试。

在 Flow 设计器中创建的规则和在数据集成中创建的表单规则是互通的。这意味着,您可以在 Flow 设计器中创建规则,并在数据集成页面查看对应的规则 SQL 及相关配置;或者,您也可以在数据集成的 SQL 编辑器中创建规则,然后在 Flow 设计器中查看规则的数据流处理配置。

flow_designer_overview

主要功能

想要访问 Flow 设计器,您可以在 EMQX Platform 控制台中进入您的部署,在左侧菜单中点击数据集成 -> Flow 设计器。如果您已经创建了规则或数据集成,您可以看到由多个节点组成的有向无环图,每个节点代表一个数据处理步骤,例如从某个主题、事件或数据源(Source)中读取数据、通过规则转换数据,再使用 Sink 转发数据。

该页面展示了通过规则引擎和 Flow 设计器创建的所有数据处理流程。您可以直观地查看数据流向,即数据如何从设备或客户端经过规则处理后流向外部数据系统,或者从外部数据系统经过规则处理后流向设备。当有规则和数据集成发生变化时,刷新该页面就能看到最新的状态。

点击新建 Flow 按钮,可进入 Flow 创建页面进行可视化配置。通过拖拽的方式选择每个步骤所需的节点,并将节点连线即可构建数据处理流程。

drag_node

Source

输入动作支持从消息、事件或外部数据系统中获取数据。每个 Flow 至少需要一个 Source 节点,且支持多个数据输入节点。当前支持的 Source 节点包括:

Processing

规则处理使用函数和过滤器节点对数据进行处理和筛选。这一步是可选的,每个 Flow 最多可包含一个函数和一个过滤节点:

  • 数据处理:支持规则引擎的 SQL 内置函数,详见内置 SQL 函数
  • 过滤器:支持对数据字段进行比较筛选,支持的运算符包括 >, <, <=, >=, <>, !=, =, =~

除了可视化表单编辑外,数据处理节点还支持切换到表达式模式,使用规则 SQL 语法进行编辑。过滤节点必须连接在数据处理节点之后,即数据必须先经过处理,然后才能进行过滤。

Sink

输出动作将数据源以及处理结果输出到特定节点或外部数据系统中。同一个 Flow 中至少包含一个 Sink 节点,支持的 Sink 节点包括:

  • 消息重发布:将消息发布到本地指定 MQTT 主题。
  • 外部数据系统:支持 40 余种数据系统,如 MySQL 和 Kafka,具体参考数据集成

Flow 编辑与测试

Flow 创建时系统将随机产生一个编号,点击编号旁边的编辑图标可以修改 Flow 的名称和描述。

将鼠标移动到 Flow 中的节点上,点击节点右上角的删除图标便可以删除该节点。点击节点可以进入编辑模式,您可以修改此节点中配置的信息并保存修改,最后点击保存以保存整个 Flow。点击开始测试按钮可以输入模拟数据或通过真实客户端测试 Flow 是否能正确执行。

特性与优势

Flow 设计器是一个功能丰富且易于使用的工具,有助于用户更加高效地处理和集成数据,推动业务创新,并提高数据管理的可视性和控制性。其主要特点和优势包括:

  • 直观的可视化界面: 用户可以利用直观的拖拽和连接功能,轻松创建、调整和定制数据处理流程,即使没有编程经验的用户也可以快速上手,设计复杂的数据集成逻辑。
  • 快速实时处理: 用户能够在几分钟内建立消息和事件的实时处理流程,有助于企业更迅速地响应不断涌现的数据和事件,支持实时业务需求。
  • 广泛的集成能力: 支持与超过 40 种数据系统无缝集成,为用户提供了灵活的数据连接和交换选项。
  • 一体化管理和监控: 用户可以通过统一的视图清晰地管理整个数据集成流程,了解每个处理节点的状态和性能,有助于实时监控和追踪数据流,以确保数据的高可靠性和完整性。
  • EMQX 数据处理能力: Flow 设计器底层利用规则 SQL 和 Sink/Source,继承了 EMQX 强大的数据处理和性能优势。用户可以根据需要在 UI 和 SQL 编辑器之间自由切换,从而在保留 SQL 编辑的灵活性的同时,提供了更简单、更快速的上手体验。这意味着用户无需深入学习 EMQX 规则 SQL 语法,也能够利用其强大的数据处理功能,实现业务创新和数据驱动决策。

快速体验

本节通过一个使用场景为您演示如何在 Flow 设计器中快速创建一个 Flow,并对创建的 Flow 进行测试。

该演示将创建一个数据处理流程用于接收设备温度过高的告警,通过 MQTT 主题接收设备温湿度传感器数据,设置数据过滤和转换规则,并在温度超过 40°C 时重发布告警信息到新主题 alert。最后,通过测试验证规则的有效性以及数据处理的结果。

场景描述

假设有一个设备里面包含温湿度传感器,每隔 5 秒发送一次数据到 MQTT 主题 sensor/temperature。EMQX Platform 规则引擎将处理这些数据,步骤包括:

  1. 数据过滤:只处理温度大于 40°C 的数据。
  2. 数据转换:
    • 提取设备 ID。
    • 提取温度信息。
    • 使用内置函数将 payload 中的时间戳信息转换为日期。
  3. 消息重发布:将处理后的数据整理为一条告警信息发布到新的主题 alert

重发布的数据样例:

json
{
  "device_id": "device123",
  "temperature": 22.5,
  "humidity": 60
}

创建 Flow

  1. 点击 Flow 设计器页面上的创建 Flow 按钮。

  2. Source 中拖动一个消息节点到画布上,并配置一个消息源主题,例如 sensor/temperature,然后点击保存。这一步通过主题指定客户端发布的消息来源。

    message_node

  3. Processing 中拖动一个数据处理节点到画布上,并配置数据处理规则,从消息中提取以下字段:

    • payload.device_id:设置别名为 device_id
    • payload.temperature:设置别名为 temperature
    • timestamp:使用时间与日期函数中的 format_date 函数将发布过来的消息时间戳数值转换为可读的日期时间格式。设置别名为 date
      • 时间单位:选择 milisecond
      • 时区偏移:输入 +08:00
      • 日期格式:输入 %Y-%m-%d %H:%M:%S.%6N%z。参考日期与时间转换函数
      • 时间戳:输入 timestamp

    完成后点击保存

    data_processing_node

  4. Processing 中再拖动一个过滤器节点,并配置一个过滤条件以实现一个数据过滤规则。添加一个过滤项并输入 payload.temperature,选择操作符 >=,输入 40,然后点击保存

    filter_rule

  5. Sink 中选择一个消息重新发布节点,并配置转发消息的主题,这里设置为 alert。通过设置以下 payload 将处理和转换后的数据整理为一条告警信息:

    bash
    ${device_id} device reported a high temperature of ${temperature}°C at ${date}.

    点击保存

    republish_node

  6. 您可以在页面上看到新建的 Flow。击页面右上角的保存以保存 Flow。

    flow_created

    Flow 和表单规则是互通的,您也可以在数据集成页面的规则区域看到刚才创建的规则。点击规则 ID 可以查看 规则 SQL 和相关配置。

    rule_in_sql_editor

测试 Flow

  1. 在 Flow 设计器中,点击任意节点会打开编辑面板,点击面板底部的编辑 Flow 按钮。

  2. 点击保存按钮旁的开始测试,将出现一个底部测试面板。您可以点击输入模拟数据,在弹出的页面中输入模拟数据,也可以直接使用一个真实的客户端发布消息来查看结果。该演示将使用 MQTTX,发布一条真实数据。

    start_test

  3. 打开 MQTTX Web,点击新建连接创建一个客户端连接作为发布者,并配置以下字段:

    • 名称:输入 device1
    • 服务器地址:在部署的概览 -> MQTT 连接信息区域找到连接地址并输入。
    • 端口:输入 8084
    • 用户名密码:在部署的访问控制 -> 认证中配置认证信息,并在此输入对应的认证信息。

    其余设置保持默认,点击连接

  4. 创建一个新的订阅,并将主题设置为 alert

  5. 发布一条温度低于 40°C 的消息。可以看到消息没有达到条件,规则 SQL 没有执行结果。

    message_publish_1

  6. 发布一条温度高于 40°C 的消息。可以看到 alert 主题接收到了告警信息。

    message_publish_2

  7. 回到测试页面,同样可以查看到测试结果成功。

    test_success

    如果测试失败,将显示相应的错误信息。

    test_fail