Flow 设计器
注意
Flow 设计器不适用于 EMQX Serverless 版部署。
Flow 设计器是一个强大的可视化工具,您可以用它来创建和编辑数据处理流程 (Flow) ,使数据处理和集成的配置变得更加简单和高效。您还可以对创建的数据处理流程进行测试。
在 Flow 设计器中创建的规则和在数据集成中创建的表单规则是互通的。这意味着,您可以在 Flow 设计器中创建规则,并在数据集成页面查看对应的规则 SQL 及相关配置;或者,您也可以在数据集成的 SQL 编辑器中创建规则,然后在 Flow 设计器中查看规则的数据流处理配置。
主要功能
想要访问 Flow 设计器,您可以在 EMQX Platform 控制台中进入您的部署,在左侧菜单中点击数据集成 -> Flow 设计器。如果您已经创建了规则或数据集成,您可以看到由多个节点组成的有向无环图,每个节点代表一个数据处理步骤,例如从某个主题、事件或数据源(Source)中读取数据、通过规则转换数据,再使用 Sink 转发数据。
该页面展示了通过规则引擎和 Flow 设计器创建的所有数据处理流程。您可以直观地查看数据流向,即数据如何从设备或客户端经过规则处理后流向外部数据系统,或者从外部数据系统经过规则处理后流向设备。当有规则和数据集成发生变化时,刷新该页面就能看到最新的状态。
点击新建 Flow 按钮,可进入 Flow 创建页面进行可视化配置。通过拖拽的方式选择每个步骤所需的节点,并将节点连线即可构建数据处理流程。
Source
输入动作支持从消息、事件或外部数据系统中获取数据。每个 Flow 至少需要一个 Source 节点,且支持多个数据输入节点。当前支持的 Source 节点包括:
- 消息:通过主题和主题通配符指定客户端发布的消息。
- 事件:支持 EMQX 内的所有客户端事件,详见客户端事件。
- 外部数据系统:
Processing
规则处理使用函数和过滤器节点对数据进行处理和筛选。这一步是可选的,每个 Flow 最多可包含一个函数和一个过滤节点:
- 数据处理:支持规则引擎的 SQL 内置函数,详见内置 SQL 函数。
- 过滤器:支持对数据字段进行比较筛选,支持的运算符包括
>, <, <=, >=, <>, !=, =, =~
。
除了可视化表单编辑外,数据处理节点还支持切换到表达式模式,使用规则 SQL 语法进行编辑。过滤节点必须连接在数据处理节点之后,即数据必须先经过处理,然后才能进行过滤。
Sink
输出动作将数据源以及处理结果输出到特定节点或外部数据系统中。同一个 Flow 中至少包含一个 Sink 节点,支持的 Sink 节点包括:
- 消息重发布:将消息发布到本地指定 MQTT 主题。
- 外部数据系统:支持 40 余种数据系统,如 MySQL 和 Kafka,具体参考数据集成。
Flow 编辑与测试
Flow 创建时系统将随机产生一个编号,点击编号旁边的编辑图标可以修改 Flow 的名称和描述。
将鼠标移动到 Flow 中的节点上,点击节点右上角的删除图标便可以删除该节点。点击节点可以进入编辑模式,您可以修改此节点中配置的信息并保存修改,最后点击保存以保存整个 Flow。点击开始测试按钮可以输入模拟数据或通过真实客户端测试 Flow 是否能正确执行。
特性与优势
Flow 设计器是一个功能丰富且易于使用的工具,有助于用户更加高效地处理和集成数据,推动业务创新,并提高数据管理的可视性和控制性。其主要特点和优势包括:
- 直观的可视化界面: 用户可以利用直观的拖拽和连接功能,轻松创建、调整和定制数据处理流程,即使没有编程经验的用户也可以快速上手,设计复杂的数据集成逻辑。
- 快速实时处理: 用户能够在几分钟内建立消息和事件的实时处理流程,有助于企业更迅速地响应不断涌现的数据和事件,支持实时业务需求。
- 广泛的集成能力: 支持与超过 40 种数据系统无缝集成,为用户提供了灵活的数据连接和交换选项。
- 一体化管理和监控: 用户可以通过统一的视图清晰地管理整个数据集成流程,了解每个处理节点的状态和性能,有助于实时监控和追踪数据流,以确保数据的高可靠性和完整性。
- EMQX 数据处理能力: Flow 设计器底层利用规则 SQL 和 Sink/Source,继承了 EMQX 强大的数据处理和性能优势。用户可以根据需要在 UI 和 SQL 编辑器之间自由切换,从而在保留 SQL 编辑的灵活性的同时,提供了更简单、更快速的上手体验。这意味着用户无需深入学习 EMQX 规则 SQL 语法,也能够利用其强大的数据处理功能,实现业务创新和数据驱动决策。
快速体验
本节通过一个使用场景为您演示如何在 Flow 设计器中快速创建一个 Flow,并对创建的 Flow 进行测试。
该演示将创建一个数据处理流程用于接收设备温度过高的告警,通过 MQTT 主题接收设备温湿度传感器数据,设置数据过滤和转换规则,并在温度超过 40°C 时重发布告警信息到新主题 alert
。最后,通过测试验证规则的有效性以及数据处理的结果。
场景描述
假设有一个设备里面包含温湿度传感器,每隔 5 秒发送一次数据到 MQTT 主题 sensor/temperature
。EMQX Platform 规则引擎将处理这些数据,步骤包括:
- 数据过滤:只处理温度大于 40°C 的数据。
- 数据转换:
- 提取设备 ID。
- 提取温度信息。
- 使用内置函数将 payload 中的时间戳信息转换为日期。
- 消息重发布:将处理后的数据整理为一条告警信息发布到新的主题
alert
。
重发布的数据样例:
{
"device_id": "device123",
"temperature": 22.5,
"humidity": 60
}
创建 Flow
点击 Flow 设计器页面上的创建 Flow 按钮。
从 Source 中拖动一个消息节点到画布上,并配置一个消息源主题,例如
sensor/temperature
,然后点击保存。这一步通过主题指定客户端发布的消息来源。从 Processing 中拖动一个数据处理节点到画布上,并配置数据处理规则,从消息中提取以下字段:
payload.device_id
:设置别名为device_id
。payload.temperature
:设置别名为temperature
。timestamp
:使用时间与日期函数
中的format_date
函数将发布过来的消息时间戳数值转换为可读的日期时间格式。设置别名为date
。时间单位
:选择milisecond
。时区偏移
:输入+08:00
。日期格式
:输入%Y-%m-%d %H:%M:%S.%6N%z
。参考日期与时间转换函数。时间戳
:输入timestamp
。
完成后点击保存。
从 Processing 中再拖动一个过滤器节点,并配置一个过滤条件以实现一个数据过滤规则。添加一个过滤项并输入
payload.temperature
,选择操作符>=
,输入40
,然后点击保存。从 Sink 中选择一个消息重新发布节点,并配置转发消息的主题,这里设置为
alert
。通过设置以下 payload 将处理和转换后的数据整理为一条告警信息:bash${device_id} device reported a high temperature of ${temperature}°C at ${date}.
点击保存。
您可以在页面上看到新建的 Flow。击页面右上角的保存以保存 Flow。
Flow 和表单规则是互通的,您也可以在数据集成页面的规则区域看到刚才创建的规则。点击规则 ID 可以查看 规则 SQL 和相关配置。
测试 Flow
在 Flow 设计器中,点击任意节点会打开编辑面板,点击面板底部的编辑 Flow 按钮。
点击保存按钮旁的开始测试,将出现一个底部测试面板。您可以点击输入模拟数据,在弹出的页面中输入模拟数据,也可以直接使用一个真实的客户端发布消息来查看结果。该演示将使用 MQTTX,发布一条真实数据。
打开 MQTTX Web,点击新建连接创建一个客户端连接作为发布者,并配置以下字段:
- 名称:输入
device1
。 - 服务器地址:在部署的概览 -> MQTT 连接信息区域找到连接地址并输入。
- 端口:输入
8084
。 - 用户名和密码:在部署的访问控制 -> 认证中配置认证信息,并在此输入对应的认证信息。
其余设置保持默认,点击连接。
- 名称:输入
创建一个新的订阅,并将主题设置为
alert
。发布一条温度低于 40°C 的消息。可以看到消息没有达到条件,规则 SQL 没有执行结果。
发布一条温度高于 40°C 的消息。可以看到
alert
主题接收到了告警信息。回到测试页面,同样可以查看到测试结果成功。
如果测试失败,将显示相应的错误信息。