DolphinDB 数据集成
本文介绍如何将 FlowMQ Stream 中的数据实时写入 DolphinDB。
适用场景
- 金融行情实时写入与分析
- IoT 设备时序数据落库
- 高吞吐指标数据聚合
前提条件
- 已完成基础数据流准备,参考 数据集成总览
- DolphinDB 服务可访问
- 已具备目标库与表的写入权限
配置步骤
在“数据管道”模块点击“创建管道”,填写名称(如
dolphindb管道),方向选择下行,任务数按需设置。
选择数据流(如
flowmq/mqtt/kafka),点击“继续”。
选择“dolphindb”卡片,填写连接参数:
| 参数 | 值 |
|---|---|
| url | 192.168.55.31:8848 |
| username | admin |
| password | 123456 |
| directory | dfs://cydb |
| table | ${! metadata("kafka_topic").re_replace_all("[^a-zA-Z0-9_]", "_") } |
- 配置
columns字段映射:
| name | data_type | is_partition | is_sort | value |
|---|---|---|---|---|
| mpCode | STRING | false | true | ${! json("source") } |
| valueTime | TIMESTAMP | true | false | ${! json("data.timestamp") } |
| value | FLOAT | false | false | ${! json("data.value") } |
| unitCode | STRING | false | false | ${! json("type") } |
| createTime | TIMESTAMP | false | false | ${! json("time") } |
点击“测试”验证目标连接。

点击“继续”,确认配置并创建 Pipeline。
在管道列表点击“开始”运行。运行后可在 DolphinDB 中查看写入结果(例如表
flowmq_mqtt_kafka)。
参数与表达式说明
table:metadata("kafka_topic").re_replace_all("[^a-zA-Z0-9_]", "_")将消息所属主题中的非法字符(如/、-)替换为下划线,生成合法表名。mpCode:json("source")提取设备标识。valueTime:json("data.timestamp")提取时序时间戳。value:json("data.value")提取测点数值。unitCode:json("type")提取类型或单位代码。createTime:json("time")提取创建时间。
配置建议
- 预先在 DolphinDB 侧规划分区字段和排序字段。
- 表结构应与映射字段保持一致,避免类型转换错误。
- 时序场景建议开启批量写入以提升吞吐。