---
title: 数据处理
---

# 数据处理

数据处理引擎用于对流式数据进行实时接入、转换与分发，实现**规则驱动的计算流水线**：从数据源接入、经 SQL 等逻辑处理、再输出到多种目标系统。

## 特点及优势

### 低延迟

提供低延迟的数据处理分析，能够更快速地将数据在多系统间传递，结合 AI/ML 算法，可以实现智能决策与控制。

### 完整的数据分析

- 数据抽取、转换和过滤
- 数据排序、分组、聚合、连接
- 丰富的内置函数，覆盖数学运算、字符串处理、聚合运算和哈希运算等
- 多类时间窗口，以及计数窗口

### AI/ML 集成

可通过扩展接入自定义函数、AI/ML 算法等，实现在源（Source）、函数（Function）、动作（Sink）三个方面的扩展：

- **源（Source）**：接入更多数据源用于数据分析
- **目标（Sink）**：将分析结果发送到不同的扩展系统
- **函数（Function）**：增加自定义函数用于数据分析（如 AI/ML 函数调用）

## 核心概念

### 源（Source）

源（Source）定义了与外部系统的连接方式，以便将数据加载进来。在规则中，根据数据使用逻辑，数据源可作为**流（Stream）**或**表（Table）**使用。

- **流（Stream）**：数据源接入的主要运行方式，用户通过选择数据源类型及配置参数定义如何连接外部资源。有数据流入时，会触发规则中的计算。
- **表（Table）**：用于表示流的当前状态，可视为流的快照，用于批处理。组合使用流与表可实现更多数据处理功能。
  - **扫描表**：在内存中保存状态数据，按事件驱动逐个加载数据。
  - **查询表**：绑定外部数据（如 SQL 数据库）并在需要时查询引用。

**源的解码**：创建源时可指定 `format` 定义解码方式，支持 `json`、`binary`、`protobuf`、`delimited` 等格式，也可使用自定义编码（`custom`）。

**源的定义与运行**：创建流的逻辑定义后，并未立即启动物理数据输入；该定义可在多条规则的 SQL `from` 子句中复用。仅当使用该定义的规则启动后，数据流才会真正运行。

常见数据源类型包括：MQTT、Kafka 等。

### 规则（Rule）

规则代表一条数据处理流程，定义从数据源输入、经处理逻辑、到输出至动作（Sink）的完整链路。

- **规则生命周期**：启动后持续运行，直至用户停止或异常退出。
- **规则隔离**：各规则独立运行，单条规则错误不影响其他规则；共享硬件资源，可通过算子缓冲区限制处理速度。
- **规则流水线**：多条规则可通过「内存 Source/Sink」或「MQTT Source/Sink」串联成处理管道。例如，第一条规则将结果写入内存 sink，后续规则从其内存源订阅并继续处理。
- **SQL 语句**：提供类 SQL 的查询语言，对数据流执行转换和计算，支持 DDL、DML 及查询，为 ANSI SQL 子集并带定制扩展。
- **规则调试**：开启调试后，可实时查看数据源接入、经 SQL 处理后的输出结果，便于验证 SQL、函数及数据模板是否符合预期。

### 动作（Sink）

- **动作 Sink**：向外部系统写入数据，一条规则可有多个动作，动作类型可重复。
- **数据模板**：对规则输出进行二次处理后再送入各 Sink，为可选项；不配置时，处理结果直接输出。
- **动作类型**：MQTT、REST、内存、Log、SQL、InfluxDB V1/V2、文件、Nop、Kafka、Image 等。

### 流式处理

流数据是不断增长的无界数据集，流处理对其做低延迟、近实时处理，适用于实时分析、实时计算、实时预测等场景。

- **有状态流处理**：计算保持上下文状态，例如聚合（总和、计数、平均值）、检测事件变化、在事件序列中搜索模式等。
- **窗口**：将无界数据分割为有界区间进行计算，支持：
  - 时间窗口：按时间分割
  - 计数窗口：按元素数量分割  
  时间窗口同时支持处理时间和事件时间。
- **多源连接**：将多个数据源合并计算，支持 LEFT、RIGHT、FULL、CROSS 等连接类型。
- **时间概念**：
  - **事件时间**：事件实际发生时间，通常由时间戳字段表示。
  - **处理时间**：系统观察到事件的时间。
- **事件时间与水印**：支持事件时间的引擎通过水印衡量事件时间进度。水印携带时间戳 t，表示该流中不应再有 t' ≤ t 的事件；用于触发窗口输出等逻辑。

### 扩展

支持用户自定义扩展以接入 AI/ML 等能力，可通过插件系统编写扩展，或调用外部 REST 服务作为函数及算法扩展。

### 配置

支持资源配置、模式配置等，用于调整数据处理模块的运行参数。

## 操作示例

本示例演示如何对 MQTT 主题上的 JSON 数据进行转换后发送到另一主题。假设设备向 `flowmq/data/raw` 上报如下格式的数据，目标是将 `data.value` 转为数值并加 1 后写入 `flowmq/data/adjusted`。

### 示例数据

```json
{
    "id": "e6eb1d31-5ab7-4ace-8bf7-e7c3e7a30a73",
    "source": "1SDA526VD_POP_P1",
    "time": "2023-05-30T15:23:23.123+08:00",
    "type": "1",
    "data": {
        "value": "280.44",
        "quality": "0",
        "timestamp": 1710400220000
    }
}
```

### 1. 创建开发任务

1.1 在数据处理模块中依次点击“开发任务” -> “创建开发任务”, 在弹出框中输入开发任务名称以及可选的描述，点击“创建”按钮。 
![创建开发任务](.assets/data-processing1_cn.png)

1.2 此时我们会在开发任务列表中看到刚刚创建的开发任务项。
![开发任务列表](.assets/data-processing2_cn.png)

1.3 在开发任务项右侧操作菜单中选择“配置”进入配置页面， 选择配置页面的“连接器”Tab，然后点击“添加连接器”按钮,并在接下来的页面中添加连接器名称——mqttconnector1，选择“MQTT”卡片，点击“下一步”按钮，填写连接器配置表单，测试并提交。
![连接器表单](.assets/data-processing3_cn.png)

### 2. 创建数据源

2.1 回到开发任务列表，在操作菜单中选择“源管理”，并且点击“创建流”按钮，选择“MQTT”卡片后点击“下一步”按钮进入流配置表单页面，填写流名称为“mqttstream1”，数据源填写为“flowmq/data/raw”， 

2.2 点击“添加配置组”按钮，在弹出的表单中填写配置组名称为“mqttconf1”，连接器下拉列表中选择之前创建的“mqttconnector1”，然后点击“提交”按钮后回到创建流表单中
![创建流表单](.assets/data-processing4_cn.png)


2.3 点击“提交”按钮，生成数据源。

### 3. 新建规则

3.1 回到开发任务列表，在操作菜单中选择“规则”，点击“创建规则”按钮，编写 SQL 对 `data.value` 做类型转换与 +1 运算并输出：

```sql
SELECT
  id,
  source,
  time,
  type,
  cast(data.value,"float") + 1 AS adjusted_value,
  data.quality,
  data.timestamp
FROM mqttstream1
```

3.2 点击右侧“添加动作”按钮，选择“MQTT”卡片并点击“下一步”按钮进入动作配置表单，选择连接器“mqttconnector1”，主题填写为`flowmq/data/adjusted`，点击“提交”按钮保存配置。
![动作配置](.assets/data-processing5_cn.png)

3.3 点击“保存规则”按钮保存规则。
![规则配置](.assets/data-processing6_cn.png)

### 4. 部署任务
4.1 回到开发任务列表，在操作菜单中选择“停止”，然后继续在该菜单中选择“部署”选项，在弹出对话框中填写部署名称和运行的实例数量，点击“部署”按钮，完成规则的部署。
![部署任务](.assets/data-processing7_cn.png)

### 5. 启动并验证

5.1 进入“数据处理\部署管理”页面，在部署管理列表的操作菜单中选择“启动”。

5.2 启动部署后，向 `flowmq/data/raw` 发布上述示例数据，订阅 `flowmq/data/adjusted` 即可收到处理后的结果：

```json
[{
    "id": "e6eb1d31-5ab7-4ace-8bf7-e7c3e7a30a73",
    "source": "1SDA526VD_POP_P1",
    "time": "2023-05-30T15:23:23.123+08:00",
    "type": "1",
    "adjusted_value": 281.44,
    "quality": "0",
    "timestamp": 1710400220000
}]
```

5.3 在部署管理列表的操作菜单中选择“状态”可查看部署实例的指标信息。


## 相关文档

- [数据流](./streaming) — 追加写日志持久化，支持回放消费与 Kafka 协议接入
- [数据集成](./data-integration) — 数据管道与外部系统集成
