数据集成
FlowMQ 提供数据集成能力,支持将 Stream 中的数据实时分发到外部系统(数据库、数据湖、云服务等),也支持将外部系统的数据实时导入到 Stream 中。每一个数据集成任务称为一个 Data Pipeline。
什么是 Data Pipeline
Data Pipeline 是 FlowMQ 中数据集成的基本单元。每个 Pipeline 定义了一条数据流转链路,包含:
- 数据源(Source):数据的来源,可以是 FlowMQ Stream 或外部系统
- 数据目标(Sink):数据的去向,可以是外部系统或 FlowMQ Stream
- 数据转换(Transform):可选的中间处理步骤,用于对数据进行过滤、格式转换或字段映射
用户可以创建多个 Pipeline,各 Pipeline 独立运行、互不影响。
数据流向
Stream → 外部系统
将 Stream 中的数据实时同步到外部存储或服务,常见目标包括:
- 数据库:PostgreSQL、MySQL、MongoDB、ClickHouse、InfluxDB 等
- 数据湖 / 数据仓库:Snowflake、Databricks、BigQuery、S3 (Parquet/ORC) 等
- 搜索与分析:Elasticsearch、OpenSearch 等
- 应用与服务:Webhook、HTTP API、Slack、邮件通知等
外部系统 → Stream
将外部数据源的变更实时捕获并写入 Stream,常见来源包括:
- 数据库 CDC:捕获 PostgreSQL、MySQL 等数据库的行级变更
- API 轮询:定期拉取外部 REST API 的数据
- 文件监听:监控对象存储或文件系统中的新文件
典型用例
- IoT 数据入湖:设备遥测数据经 Stream 实时写入数据湖,供离线分析与 BI 报表使用
- 实时数据同步:将业务事件从 Stream 同步到 Elasticsearch,支撑全文检索与实时仪表盘
- 数据库变更分发:通过 CDC 捕获数据库变更写入 Stream,下游多个系统按需消费
- 跨系统数据管道:将外部系统的数据导入 Stream,经转换后再分发到其他目标系统
注意事项
- Pipeline 独立性:各 Pipeline 独立运行,单个 Pipeline 的故障或暂停不影响其他 Pipeline
- 数据顺序:Pipeline 保持 Stream 分区内的消息顺序,跨分区不保证全局有序
- 幂等写入:目标系统支持幂等时建议开启,避免重试导致数据重复
Bloblang 插值表达式
FlowMQ 数据管道配置中支持使用 Bloblang 插值表达式 动态引用消息内容、元数据,并做字符串变换。语法为 ${! <bloblang 表达式> },在运行时按每条消息求值,将结果填入对应配置项。
常用函数
| 函数 | 说明 | 示例 |
|---|---|---|
json("path") | 从消息体的 JSON 中提取字段,支持点号路径 | ${! json("data.value") } 取 data.value |
metadata("key") | 从消息元数据中提取,如 kafka_topic、partition 等 | ${! metadata("kafka_topic") } |
方法链
Bloblang 支持方法链,可对返回值进一步处理,例如:
re_replace_all(pattern, replacement):替换匹配字符。metadata("kafka_topic").re_replace_all("[^a-zA-Z0-9_]", "_")将 topic 中非法字符替换为下划线re_replace_all("/", "."):将路径分隔符/替换为.,常用于构建 IoTDB 等时序库的路径
字面量转义
若配置中需要输出字面量 ${!foo} 而不是插值结果,使用双花括号转义:$true。
连接实例
准备工作
在“数据流”模块中点击“创建数据流”按钮,数据流名称为
flowmq.mqtt.kafka,主题过滤器设置为flowmq/mqtt/kafka,点击“创建数据流”按钮。
使用 MQTT 客户端软件,publish topic设置为
flowmq/mqtt/kafka,需要发送的数据如下:
{
"id": "e6eb1d31-5ab7-4ace-8bf7-e7c3e7a30a73",
"source": "1SDA526VD_POP_P1",
"time": "2023-05-30T15:23:23.123+08:00",
"type": "1",
"data": {
"value": "280.44",
"quality": "0",
"timestamp": 1710400220000
}
}DolphinDB 实例
将 Stream 中的时序数据实时写入 DolphinDB,适用于金融行情、IoT 传感器等场景的实时分析。
在“数据管道”模块中点击“创建管道”按钮,在第一步的页面中填写管道名称
dolphindb管道,方向选择下行,任务数为1,点击“继续”按钮进入下一步。
选择数据流
flowmq/mqtt/kafka,点击“继续”按钮进入下一步。
选择“dolphindb”卡片,填写连接配置,连接参数如下:
| 参数 | 值 |
|---|---|
| url | 192.168.55.31:8848 |
| username | admin |
| password | 123456 |
| directory | dfs://cydb |
| table | ${! metadata("kafka_topic").re_replace_all("[^a-zA-Z0-9_]", "_") } |
columns 设置如下:
| name | data_type | is_partition | is_sort | value |
|---|---|---|---|---|
| mpCode | STRING | false | true | ${! json("source") } |
| valueTime | TIMESTAMP | true | false | ${! json("data.timestamp") } |
| value | FLOAT | false | false | ${! json("data.value") } |
| unitCode | STRING | false | false | ${! json("type") } |
| createTime | TIMESTAMP | false | false | ${! json("time") } |
插值表达式说明:
- table:
metadata("kafka_topic").re_replace_all("[^a-zA-Z0-9_]", "_")— 取消息所属 Kafka 主题名,将非法字符(如/、-)替换为下划线,得到合法表名。例如flowmq/mqtt/kafka→flowmq_mqtt_kafka - mpCode:
json("source")— 从消息体中取source字段(设备标识) - valueTime:
json("data.timestamp")— 取data.timestamp作为时序时间戳 - value:
json("data.value")— 取data.value作为测点数值 - unitCode:
json("type")— 取type作为单位/类型代码 - createTime:
json("time")— 取time作为创建时间
点击“测试”测试目标数据连接是否正常,点击“继续”进入下一步。 
- 确认设置信息,点击“创建”可创建pipeline。
配置要点:
- 在数据集成模块中创建 Pipeline,选择 FlowMQ 作为 Source、DolphinDB 作为 Sink
- 确保 DolphinDB 中已创建目标表,表结构与
id、source、time、data.value等字段对应 - 时序场景建议配置批量写入以提升吞吐
- 在数据管道列表中点击操作栏“开始”按钮,即可运行该数据处理管道,在dolphindb数据库中可以查看到
flowmq_mqtt_kafka表中有数据存入。
IoTDB 实例
将 Stream 中的设备遥测数据实时写入 Apache IoTDB,适用于工业物联网、设备监控等场景。 IoTDB 的 stream 选择与 DolphinDB 类似,可以参考 DolphinDB 的配置过程。
在“数据管道”模块中点击“创建管道”按钮,在第一步的页面中填写管道名称
iotdb管道,方向选择下行,任务数为1,点击“继续”按钮进入下一步。选择数据流
flowmq/mqtt/kafka,点击“继续”按钮进入下一步。选择“iotdb”卡片,填写连接配置,连接参数如下:
| 参数 | 值 |
|---|---|
| urls | 192.168.55.31:6667 |
| username | root |
| password | root |
| device_id | root.ln.${! metadata("kafka_topic").re_replace_all("/", ".") } |
columns 设置如下:
| timestamp | measurement | data_type | value |
|---|---|---|---|
${! json("data.timestamp") } | ${! json("source") } | FLOAT | ${! json("data.value") } |
插值表达式说明:
- device_id:
root.ln.${! metadata("kafka_topic").re_replace_all("/", ".") }— 将主题中的/替换为.,构建 IoTDB 设备路径。例如flowmq/mqtt/kafka→root.ln.flowmq.mqtt.kafka - timestamp:
json("data.timestamp")— 取data.timestamp作为时序时间戳 - measurement:
json("source")— 取source作为测点名称(如传感器标识) - value:
json("data.value")— 取data.value作为测点数值
点击“测试”测试目标数据连接是否正常,点击“继续”进入下一步。 
确认设置信息,点击“创建”可创建pipeline。
在数据管道列表中点击操作栏“开始”按钮,即可运行该数据处理管道,在iotdb数据库中可以查看到
flowmq.mqtt.kafka表中有数据存入。