Skip to content

DolphinDB 数据集成

本文介绍如何将 FlowMQ Stream 中的数据实时写入 DolphinDB。

适用场景

  • 金融行情实时写入与分析
  • IoT 设备时序数据落库
  • 高吞吐指标数据聚合

前提条件

  • 已完成基础数据流准备,参考 数据集成总览
  • DolphinDB 服务可访问
  • 已具备目标库与表的写入权限

配置步骤

  1. 在“数据管道”模块点击“创建管道”,填写名称(如 dolphindb管道),方向选择下行,任务数按需设置。 dolphin第一步

  2. 选择数据流(如 flowmq/mqtt/kafka),点击“继续”。 dolphin第二步

  3. 选择“dolphindb”卡片,填写连接参数:

参数
url192.168.55.31:8848
usernameadmin
password123456
directorydfs://cydb
table${! metadata("kafka_topic").re_replace_all("[^a-zA-Z0-9_]", "_") }
  1. 配置 columns 字段映射:
namedata_typeis_partitionis_sortvalue
mpCodeSTRINGfalsetrue${! json("source") }
valueTimeTIMESTAMPtruefalse${! json("data.timestamp") }
valueFLOATfalsefalse${! json("data.value") }
unitCodeSTRINGfalsefalse${! json("type") }
createTimeTIMESTAMPfalsefalse${! json("time") }
  1. 点击“测试”验证目标连接。 dolphin第三步

  2. 点击“继续”,确认配置并创建 Pipeline。

  3. 在管道列表点击“开始”运行。运行后可在 DolphinDB 中查看写入结果(例如表 flowmq_mqtt_kafka)。

参数与表达式说明

  • tablemetadata("kafka_topic").re_replace_all("[^a-zA-Z0-9_]", "_") 将消息所属主题中的非法字符(如 /-)替换为下划线,生成合法表名。

  • mpCodejson("source") 提取设备标识。

  • valueTimejson("data.timestamp") 提取时序时间戳。

  • valuejson("data.value") 提取测点数值。

  • unitCodejson("type") 提取类型或单位代码。

  • createTimejson("time") 提取创建时间。

配置建议

  • 预先在 DolphinDB 侧规划分区字段和排序字段。
  • 表结构应与映射字段保持一致,避免类型转换错误。
  • 时序场景建议开启批量写入以提升吞吐。