Skip to content

数据集成

FlowMQ 提供数据集成能力,支持将 Stream 中的数据实时分发到外部系统(数据库、数据湖、云服务等),也支持将外部系统的数据实时导入到 Stream 中。每一个数据集成任务称为一个 Data Pipeline

什么是 Data Pipeline

Data Pipeline 是 FlowMQ 中数据集成的基本单元。每个 Pipeline 定义了一条数据流转链路,包含:

  • 数据源(Source):数据的来源,可以是 FlowMQ Stream 或外部系统
  • 数据目标(Sink):数据的去向,可以是外部系统或 FlowMQ Stream
  • 数据转换(Transform):可选的中间处理步骤,用于对数据进行过滤、格式转换或字段映射

用户可以创建多个 Pipeline,各 Pipeline 独立运行、互不影响。

数据流向

Stream → 外部系统

将 Stream 中的数据实时同步到外部存储或服务,常见目标包括:

  • 数据库:PostgreSQL、MySQL、MongoDB、ClickHouse、InfluxDB 等
  • 数据湖 / 数据仓库:Snowflake、Databricks、BigQuery、S3 (Parquet/ORC) 等
  • 搜索与分析:Elasticsearch、OpenSearch 等
  • 应用与服务:Webhook、HTTP API、Slack、邮件通知等

外部系统 → Stream

将外部数据源的变更实时捕获并写入 Stream,常见来源包括:

  • 数据库 CDC:捕获 PostgreSQL、MySQL 等数据库的行级变更
  • API 轮询:定期拉取外部 REST API 的数据
  • 文件监听:监控对象存储或文件系统中的新文件

典型用例

  • IoT 数据入湖:设备遥测数据经 Stream 实时写入数据湖,供离线分析与 BI 报表使用
  • 实时数据同步:将业务事件从 Stream 同步到 Elasticsearch,支撑全文检索与实时仪表盘
  • 数据库变更分发:通过 CDC 捕获数据库变更写入 Stream,下游多个系统按需消费
  • 跨系统数据管道:将外部系统的数据导入 Stream,经转换后再分发到其他目标系统

注意事项

  • Pipeline 独立性:各 Pipeline 独立运行,单个 Pipeline 的故障或暂停不影响其他 Pipeline
  • 数据顺序:Pipeline 保持 Stream 分区内的消息顺序,跨分区不保证全局有序
  • 幂等写入:目标系统支持幂等时建议开启,避免重试导致数据重复

Bloblang 插值表达式

FlowMQ 数据管道配置中支持使用 Bloblang 插值表达式 动态引用消息内容、元数据,并做字符串变换。语法为 ${! <bloblang 表达式> },在运行时按每条消息求值,将结果填入对应配置项。

常用函数

函数说明示例
json("path")从消息体的 JSON 中提取字段,支持点号路径${! json("data.value") }data.value
metadata("key")从消息元数据中提取,如 kafka_topicpartition${! metadata("kafka_topic") }

方法链

Bloblang 支持方法链,可对返回值进一步处理,例如:

  • re_replace_all(pattern, replacement):替换匹配字符。metadata("kafka_topic").re_replace_all("[^a-zA-Z0-9_]", "_") 将 topic 中非法字符替换为下划线
  • re_replace_all("/", "."):将路径分隔符 / 替换为 .,常用于构建 IoTDB 等时序库的路径

字面量转义

若配置中需要输出字面量 ${!foo} 而不是插值结果,使用双花括号转义:$true

连接实例

准备工作

  1. 在“数据流”模块中点击“创建数据流”按钮,数据流名称为flowmq.mqtt.kafka,主题过滤器设置为flowmq/mqtt/kafka,点击“创建数据流”按钮。 创建流

  2. 使用 MQTT 客户端软件,publish topic设置为flowmq/mqtt/kafka,需要发送的数据如下:

json
{
    "id": "e6eb1d31-5ab7-4ace-8bf7-e7c3e7a30a73",
    "source": "1SDA526VD_POP_P1",
    "time": "2023-05-30T15:23:23.123+08:00",
    "type": "1",
    "data": {
        "value": "280.44",
        "quality": "0",
        "timestamp": 1710400220000
    }
}

DolphinDB 实例

将 Stream 中的时序数据实时写入 DolphinDB,适用于金融行情、IoT 传感器等场景的实时分析。

  1. 在“数据管道”模块中点击“创建管道”按钮,在第一步的页面中填写管道名称dolphindb管道,方向选择下行,任务数为1,点击“继续”按钮进入下一步。 dolphin第一步

  2. 选择数据流flowmq/mqtt/kafka,点击“继续”按钮进入下一步。 dolphin第二步

  3. 选择“dolphindb”卡片,填写连接配置,连接参数如下:

参数
url192.168.55.31:8848
usernameadmin
password123456
directorydfs://cydb
table${! metadata("kafka_topic").re_replace_all("[^a-zA-Z0-9_]", "_") }

columns 设置如下:

namedata_typeis_partitionis_sortvalue
mpCodeSTRINGfalsetrue${! json("source") }
valueTimeTIMESTAMPtruefalse${! json("data.timestamp") }
valueFLOATfalsefalse${! json("data.value") }
unitCodeSTRINGfalsefalse${! json("type") }
createTimeTIMESTAMPfalsefalse${! json("time") }

插值表达式说明

  • tablemetadata("kafka_topic").re_replace_all("[^a-zA-Z0-9_]", "_") — 取消息所属 Kafka 主题名,将非法字符(如 /-)替换为下划线,得到合法表名。例如 flowmq/mqtt/kafkaflowmq_mqtt_kafka
  • mpCodejson("source") — 从消息体中取 source 字段(设备标识)
  • valueTimejson("data.timestamp") — 取 data.timestamp 作为时序时间戳
  • valuejson("data.value") — 取 data.value 作为测点数值
  • unitCodejson("type") — 取 type 作为单位/类型代码
  • createTimejson("time") — 取 time 作为创建时间

点击“测试”测试目标数据连接是否正常,点击“继续”进入下一步。 dolphin第三步

  1. 确认设置信息,点击“创建”可创建pipeline。

配置要点

  • 在数据集成模块中创建 Pipeline,选择 FlowMQ 作为 Source、DolphinDB 作为 Sink
  • 确保 DolphinDB 中已创建目标表,表结构与 idsourcetimedata.value 等字段对应
  • 时序场景建议配置批量写入以提升吞吐
  1. 在数据管道列表中点击操作栏“开始”按钮,即可运行该数据处理管道,在dolphindb数据库中可以查看到flowmq_mqtt_kafka表中有数据存入。

IoTDB 实例

将 Stream 中的设备遥测数据实时写入 Apache IoTDB,适用于工业物联网、设备监控等场景。 IoTDB 的 stream 选择与 DolphinDB 类似,可以参考 DolphinDB 的配置过程。

  1. 在“数据管道”模块中点击“创建管道”按钮,在第一步的页面中填写管道名称iotdb管道,方向选择下行,任务数为1,点击“继续”按钮进入下一步。

  2. 选择数据流flowmq/mqtt/kafka,点击“继续”按钮进入下一步。

  3. 选择“iotdb”卡片,填写连接配置,连接参数如下:

参数
urls192.168.55.31:6667
usernameroot
passwordroot
device_idroot.ln.${! metadata("kafka_topic").re_replace_all("/", ".") }

columns 设置如下:

timestampmeasurementdata_typevalue
${! json("data.timestamp") }${! json("source") }FLOAT${! json("data.value") }

插值表达式说明

  • device_idroot.ln.${! metadata("kafka_topic").re_replace_all("/", ".") } — 将主题中的 / 替换为 .,构建 IoTDB 设备路径。例如 flowmq/mqtt/kafkaroot.ln.flowmq.mqtt.kafka
  • timestampjson("data.timestamp") — 取 data.timestamp 作为时序时间戳
  • measurementjson("source") — 取 source 作为测点名称(如传感器标识)
  • valuejson("data.value") — 取 data.value 作为测点数值

点击“测试”测试目标数据连接是否正常,点击“继续”进入下一步。 iotdb第三步

  1. 确认设置信息,点击“创建”可创建pipeline。

  2. 在数据管道列表中点击操作栏“开始”按钮,即可运行该数据处理管道,在iotdb数据库中可以查看到flowmq.mqtt.kafka表中有数据存入。