# Flow 设计器

::: tip 注意

Flow 设计器不适用于 EMQX Serverless 版部署。

:::

Flow 设计器是一个强大的可视化工具，您可以用它来创建和编辑数据处理流程 (Flow) ，使数据处理和集成的配置变得更加简单和高效。您还可以对创建的数据处理流程进行测试。

在 Flow 设计器中创建的规则和在数据集成中创建的表单规则是互通的。这意味着，您可以在 Flow 设计器中创建规则，并在数据集成页面查看对应的规则 SQL 及相关配置；或者，您也可以在数据集成的 SQL 编辑器中创建规则，然后在 Flow 设计器中查看规则的数据流处理配置。

![flow_designer_overview](./_assets/flow_designer_overview.png)

## 主要功能

想要访问 Flow 设计器，您可以在 EMQX Cloud 控制台中进入您的部署，在左侧菜单中点击**数据集成** -> **Flow 设计器**。如果您已经创建了规则或数据集成，您可以看到由多个节点组成的有向无环图，每个节点代表一个数据处理步骤，例如从某个主题、事件或数据源（[Source](../data_integration/introduction.md#输入动作)）中读取数据、通过规则转换数据，再使用 [Sink](../data_integration/introduction.md#输出动作) 转发数据。

该页面展示了通过规则引擎和 Flow 设计器创建的所有数据处理流程。您可以直观地查看数据流向，即数据如何从设备或客户端经过规则处理后流向外部数据系统，或者从外部数据系统经过规则处理后流向设备。当有规则和数据集成发生变化时，刷新该页面就能看到最新的状态。

点击**新建 Flow** 按钮，可进入 Flow 创建页面进行可视化配置。通过拖拽的方式选择每个步骤所需的节点，并将节点连线即可构建数据处理流程。

![drag_node](./_assets/drag_node.png)

### Source

输入动作支持从消息、事件或外部数据系统中获取数据。每个 Flow 至少需要一个 Source 节点，且支持多个数据输入节点。当前支持的 Source 节点包括：

- **消息**：通过**主题**和**主题通配符**指定客户端发布的消息。
- **事件**：支持 EMQX 内的所有客户端事件，详见[客户端事件](../rule_engine/rule_engine_events.md#客户端事件)。
- **外部数据系统**：
  - [Kafka 消费者](../data_integration/kafka.md)
  - [MQTT 服务（源数据）](../data_integration/mqtt.md)

### Processing

规则处理使用函数和过滤器节点对数据进行处理和筛选。这一步是可选的，每个 Flow 最多可包含一个函数和一个过滤节点：

- **数据处理**：支持规则引擎的 SQL 内置函数，详见[内置 SQL 函数](../data_integration/rule-sql-builtin-functions.md)。
- **过滤器**：支持对数据字段进行比较筛选，支持的运算符包括 `>, <, <=, >=, <>, !=, =, =~`。

除了可视化表单编辑外，数据处理节点还支持切换到表达式模式，使用规则 SQL 语法进行编辑。过滤节点必须连接在数据处理节点之后，即数据必须先经过处理，然后才能进行过滤。

### Sink

输出动作将数据源以及处理结果输出到特定节点或外部数据系统中。同一个 Flow 中至少包含一个 Sink 节点，支持的 Sink 节点包括：

- **消息重发布**：将消息发布到本地指定 MQTT 主题。
- **外部数据系统**：支持 40 余种数据系统，如 MySQL 和 Kafka，具体参考[数据集成](../data_integration/introduction.md)。

### Flow 编辑与测试

Flow 创建时系统将随机产生一个编号，点击编号旁边的编辑图标可以修改 Flow 的名称和描述。

将鼠标移动到 Flow 中的节点上，点击节点右上角的删除图标便可以删除该节点。点击节点可以进入编辑模式，您可以修改此节点中配置的信息并保存修改，最后点击**保存**以保存整个 Flow。点击**开始测试**按钮可以输入模拟数据或通过真实客户端测试 Flow 是否能正确执行。

## 特性与优势

Flow 设计器是一个功能丰富且易于使用的工具，有助于用户更加高效地处理和集成数据，推动业务创新，并提高数据管理的可视性和控制性。其主要特点和优势包括：

- **直观的可视化界面：** 用户可以利用直观的拖拽和连接功能，轻松创建、调整和定制数据处理流程，即使没有编程经验的用户也可以快速上手，设计复杂的数据集成逻辑。
- **快速实时处理：** 用户能够在几分钟内建立消息和事件的实时处理流程，有助于企业更迅速地响应不断涌现的数据和事件，支持实时业务需求。
- **广泛的集成能力：** 支持与超过 40 种数据系统无缝集成，为用户提供了灵活的数据连接和交换选项。
- **一体化管理和监控：** 用户可以通过统一的视图清晰地管理整个数据集成流程，了解每个处理节点的状态和性能，有助于实时监控和追踪数据流，以确保数据的高可靠性和完整性。
- **EMQX 数据处理能力：** Flow 设计器底层利用规则 SQL 和 Sink/Source，继承了 EMQX 强大的数据处理和性能优势。用户可以根据需要在 UI 和 SQL 编辑器之间自由切换，从而在保留 SQL 编辑的灵活性的同时，提供了更简单、更快速的上手体验。这意味着用户无需深入学习 EMQX 规则 SQL 语法，也能够利用其强大的数据处理功能，实现业务创新和数据驱动决策。

## 快速体验

本节通过一个使用场景为您演示如何在 Flow 设计器中快速创建一个 Flow，并对创建的 Flow 进行测试。

该演示将创建一个数据处理流程用于接收设备温度过高的告警，通过 MQTT 主题接收设备温湿度传感器数据，设置数据过滤和转换规则，并在温度超过 40°C 时重发布告警信息到新主题 `alert`。最后，通过测试验证规则的有效性以及数据处理的结果。

### 场景描述

假设有一个设备里面包含温湿度传感器，每隔 5 秒发送一次数据到 MQTT 主题 `sensor/temperature`。EMQX Cloud 规则引擎将处理这些数据，步骤包括：

1. 数据过滤：只处理温度大于 40°C 的数据。
2. 数据转换：
   - 提取设备 ID。
   - 提取温度信息。
   - 使用内置函数将 payload 中的时间戳信息转换为日期。
3. 消息重发布：将处理后的数据整理为一条告警信息发布到新的主题 `alert`。

重发布的数据样例：

```json
{
  "device_id": "device123",
  "temperature": 22.5,
  "humidity": 60
}
```

### 创建 Flow

1. 点击 Flow 设计器页面上的**创建 Flow** 按钮。

2. 从 **Source** 中拖动一个**消息**节点到画布上，并配置一个消息源主题，例如 `sensor/temperature`，然后点击**保存**。这一步通过主题指定客户端发布的消息来源。

   ![message_node](./_assets/message_node.png)

3. 从 **Processing** 中拖动一个**数据处理**节点到画布上，并配置数据处理规则，从消息中提取以下字段：

   - `payload.device_id`：设置别名为 `device_id`。
   - `payload.temperature`：设置别名为 `temperature`。
   - `timestamp`：使用`时间与日期函数`中的 `format_date` 函数将发布过来的消息时间戳数值转换为可读的日期时间格式。设置别名为 `date`。
     - `时间单位`：选择 `milisecond`。
     - `时区偏移`：输入 `+08:00`。
     - `日期格式`：输入 `%Y-%m-%d %H:%M:%S.%6N%z`。参考[日期与时间转换函数](../data_integration/rule-sql-builtin-functions.md#format-date-unit-string-offset-string-integer-formatstring-string-time-integer-string)。
     - `时间戳`：输入 `timestamp`。

   完成后点击**保存**。

   ![data_processing_node](./_assets/data_processing_node.png)

4. 从 **Processing** 中再拖动一个**过滤器**节点，并配置一个过滤条件以实现一个数据过滤规则。添加一个过滤项并输入 `payload.temperature`，选择操作符 `>=`，输入 `40`，然后点击**保存**。

   ![filter_rule](./_assets/filter_rule.png)

5. 从 **Sink** 中选择一个**消息重新发布**节点，并配置转发消息的主题，这里设置为 `alert`。通过设置以下 payload 将处理和转换后的数据整理为一条告警信息：

   ```bash
   ${device_id} device reported a high temperature of ${temperature}°C at ${date}.
   ```

   点击**保存**。

   ![republish_node](./_assets/republish_node.png)

6. 您可以在页面上看到新建的 Flow。击页面右上角的**保存**以保存 Flow。

   ![flow_created](./_assets/flow_created.png)

   Flow 和表单规则是互通的，您也可以在数据集成页面的规则区域看到刚才创建的规则。点击规则 ID 可以查看 规则 SQL 和相关配置。

   ![rule_in_sql_editor](./_assets/rule_in_sql_editor.png)

### 测试 Flow

1. 在 Flow 设计器中，点击任意节点会打开编辑面板，点击面板底部的**编辑 Flow** 按钮。

2. 点击**保存**按钮旁的**开始测试**，将出现一个底部测试面板。您可以点击**输入模拟数据**，在弹出的页面中输入模拟数据，也可以直接使用一个真实的客户端发布消息来查看结果。该演示将使用 [MQTTX](https://mqttx.app/zh)，发布一条真实数据。

   ![start_test](./_assets/start_test.png)

3. 打开 [MQTTX Web](https://mqttx.app/web-client#/recent_connections)，点击**新建连接**创建一个客户端连接作为发布者，并配置以下字段：

   - **名称**：输入 `device1`。
   - **服务器地址**：在部署的**概览** -> **MQTT 连接信息**区域找到连接地址并输入。
   - **端口**：输入 `8084`。
   - **用户名**和**密码**：在部署的**访问控制** -> **认证**中配置认证信息，并在此输入对应的认证信息。

   其余设置保持默认，点击**连接**。

4. 创建一个新的订阅，并将主题设置为 `alert`。

5. 发布一条温度低于 40°C 的消息。可以看到消息没有达到条件，规则 SQL 没有执行结果。

   ![message_publish_1](./_assets/message_publish_1.png)

6. 发布一条温度高于 40°C 的消息。可以看到 `alert` 主题接收到了告警信息。

   ![message_publish_2](./_assets/message_publish_2.png)

7. 回到测试页面，同样可以查看到测试结果成功。

   ![test_success](./_assets/test_success.png)

   如果测试失败，将显示相应的错误信息。

   ![test_fail](./_assets/test_fail.png)
