---
title: DolphinDB 数据集成
---

# DolphinDB 数据集成

本文介绍如何将 FlowMQ Stream 中的数据实时写入 DolphinDB。

## 适用场景

- 金融行情实时写入与分析
- IoT 设备时序数据落库
- 高吞吐指标数据聚合

## 前提条件

- 已完成基础数据流准备，参考 [数据集成总览](data-integration.md)
- DolphinDB 服务可访问
- 已具备目标库与表的写入权限

## 配置步骤

1. 在“数据管道”模块点击“创建管道”，填写名称（如 `dolphindb管道`），方向选择下行，任务数按需设置。
![dolphin第一步](.assets/data-integration2_cn.png)

2. 选择数据流（如 `flowmq/mqtt/kafka`），点击“继续”。
![dolphin第二步](.assets/data-integration3_cn.png)

3. 选择“dolphindb”卡片，填写连接参数：

| 参数      | 值 |
|-----------|----|
| url       | 192.168.55.31:8848 |
| username  | admin |
| password  | 123456 |
| directory | dfs://cydb |
| table     | `${! metadata("kafka_topic").re_replace_all("[^a-zA-Z0-9_]", "_") }` |

4. 配置 `columns` 字段映射：

| name       | data_type | is_partition | is_sort | value |
|------------|-----------|--------------|---------|-------|
| mpCode     | STRING    | false        | true    | `${! json("source") }` |
| valueTime  | TIMESTAMP | true         | false   | `${! json("data.timestamp") }` |
| value      | FLOAT     | false        | false   | `${! json("data.value") }` |
| unitCode   | STRING    | false        | false   | `${! json("type") }` |
| createTime | TIMESTAMP | false        | false   | `${! json("time") }` |

5. 点击“测试”验证目标连接。
![dolphin第三步](.assets/data-integration4_cn.png)

6. 点击“继续”，确认配置并创建 Pipeline。

7. 在管道列表点击“开始”运行。运行后可在 DolphinDB 中查看写入结果（例如表 `flowmq_mqtt_kafka`）。

## 参数与表达式说明

- `table`：`metadata("kafka_topic").re_replace_all("[^a-zA-Z0-9_]", "_")`
将消息所属主题中的非法字符（如 `/`、`-`）替换为下划线，生成合法表名。

- `mpCode`：`json("source")`
提取设备标识。

- `valueTime`：`json("data.timestamp")`
提取时序时间戳。

- `value`：`json("data.value")`
提取测点数值。

- `unitCode`：`json("type")`
提取类型或单位代码。

- `createTime`：`json("time")`
提取创建时间。

## 配置建议

- 预先在 DolphinDB 侧规划分区字段和排序字段。
- 表结构应与映射字段保持一致，避免类型转换错误。
- 时序场景建议开启批量写入以提升吞吐。
