Skip to content

IoTDB 数据集成

本文介绍如何将 FlowMQ Stream 中的设备遥测数据实时写入 Apache IoTDB。

适用场景

  • 工业设备监控
  • 能源与环境采集
  • 海量时序数据持久化

前提条件

  • 已完成基础数据流准备,参考 数据集成总览
  • IoTDB 服务可访问
  • 已具备写入目标路径的权限

配置步骤

  1. 在“数据管道”模块点击“创建管道”,填写名称(如 iotdb管道),方向选择下行,任务数按需设置。

  2. 选择数据流(如 flowmq/mqtt/kafka),点击“继续”。

  3. 选择“iotdb”卡片,填写连接参数:

参数
urls192.168.55.31:6667
usernameroot
passwordroot
device_idroot.ln.${! metadata("kafka_topic").re_replace_all("/", ".") }
  1. 配置 columns 字段映射:
timestampmeasurementdata_typevalue
${! json("data.timestamp") }${! json("source") }FLOAT${! json("data.value") }
  1. 点击“测试”验证目标连接。 iotdb第三步

  2. 点击“继续”,确认配置并创建 Pipeline。

  3. 在管道列表点击“开始”运行。运行后可在 IoTDB 中查看写入结果(例如路径 root.ln.flowmq.mqtt.kafka)。

参数与表达式说明

  • device_idroot.ln.${! metadata("kafka_topic").re_replace_all("/", ".") } 将主题中的 / 替换为 .,拼接成 IoTDB 设备路径。

  • timestampjson("data.timestamp") 提取时序时间戳。

  • measurementjson("source") 提取测点名称(如传感器 ID)。

  • valuejson("data.value") 提取测点数值。

配置建议

  • 统一规划 device_id 命名层级,避免路径混乱。
  • 关注 timestamp 精度与 IoTDB 配置一致性。
  • 对异常值可在上游或转换阶段做清洗后再写入。