Skip to content

将 MQTT 数据写入到 Apache IoTDB

Apache IoTDB 是一款高性能、可扩展的时序数据库,专为处理由各类物联网(IoT)设备和系统产生的大规模时序数据而设计。

EMQX Cloud 支持与 Apache IoTDB 无缝数据集成,可通过 REST API V2 将 EMQX Cloud 接收到的实时 MQTT 消息转发并写入 IoTDB。该集成采用单向数据流模式,将 MQTT 数据写入 IoTDB,用于高效的时序数据存储与分析。

本文将介绍如何将 EMQX Cloud 与 Apache IoTDB 集成,并提供创建和验证该集成的实用指南。

工作原理

Apache IoTDB 数据集成是 EMQX Cloud 内置的一项功能,可在无需额外编写代码的情况下,将基于 MQTT 的时序数据写入 Apache IoTDB。通过利用 EMQX Cloud 内置的规则引擎,该集成能够简化数据的过滤、转换和转发过程,从而实现数据在 IoTDB 中的高效存储与查询。

下图展示了 EMQX Cloud 与 IoTDB 之间典型的数据集成架构。

EMQX Cloud Apache IoTDB 数据集成

数据集成的整体流程如下:

  1. 消息发布与接收:设备通过 MQTT 协议连接至 EMQX Cloud,并发布包含遥测数据、状态更新或事件信息的消息。规则引擎会对接收到的消息进行匹配。
  2. 基于规则的处理:匹配到已定义规则的消息将被选中进行后续处理。可选地对消息进行转换,例如字段过滤、数据格式转换或负载内容增强。
  3. 数据缓冲:为提升系统可靠性,当 IoTDB 暂时不可用时,EMQX Cloud 会将消息缓存在内存中;必要时,可将缓冲数据转移至磁盘以避免内存压力。当集成或 EMQX 节点重启时,缓冲数据不会被保留。
  4. 数据写入 IoTDB:对于匹配规则的消息,EMQX Cloud 会触发将消息转发到 IoTDB 的动作,将处理后的数据转发并以时序数据的形式写入 IoTDB。
  5. 数据存储与使用:数据写入 IoTDB 后,可用于下游应用的查询与分析,例如设备监控、资产跟踪、预测性维护以及运维优化等场景。

特性与优势

与 IoTDB 的数据集成为高效的数据处理与存储提供了多项关键能力和优势:

  • 无需编写代码的 IoT 数据管道

    借助内置的规则和 Sink,无需自定义代码或引入外部服务,即可在 EMQX 与 Apache IoTDB 之间构建完整的 MQTT 到时序数据的数据管道。

  • 灵活的 MQTT 到 IoTDB 数据模型映射

    同时支持 Tree 模型和 Table 模型,可根据设备建模方式和查询需求,将 MQTT 数据写入适配的 IoTDB 数据结构中。

  • 写入与存储解耦

    EMQX 可承载突发的高频 MQTT 流量,而 IoTDB 专注于可靠的时序数据存储,从而提升整体系统的稳定性和容错能力。

  • 面向生产环境的可扩展性

    该集成可随设备数量和数据规模进行水平扩展,适用于大规模 IoT、工业物联网(IIoT)以及能源等应用场景。

  • 可直接用于分析的时序数据

    写入 IoTDB 的数据可直接进行查询、聚合和分析,或与大数据计算引擎集成,用于高级分析和长期数据洞察。

准备工作

本节介绍了在 EMQX Cloud 中创建 Apache IoTDB 数据集成之前需要做的准备工作。

前置准备

网络设置

开始之前,您需要在 EMQX Cloud 上创建一个部署(EMQX 群集)并配置网络。

  • 对于专有版部署用户: 请先创建 VPC 对等连接,创建完对等连接之后,可以通过内部网络 IP 登录 Platform Console 访问目标连接器。或者开通 NAT 网关,通过公网 IP 访问目标连接器。
  • 对于 BYOC 部署用户: 请在部署 BYOC 的 VPC 和目标连接器所在的 VPC 之间建立对等连接,创建完对等连接之后,可以通过内部网络 IP 访问目标连接器。如果您需要通过公共 IP 地址访问资源,请在公共云控制台中为部署 BYOC 的 VPC 配置 NAT 网关。

启动 Apache IoTDB 服务器

本节将介绍如何通过 Docker 启动 Apache IoTDB 服务器。 确保在您的 IoTDB 配置中具备该字段: enable_rest_service=true

在 REST 接口开启的情况下运行下面的命令启动 Apache IoTDB 服务器:

bash
docker run -d --name iotdb-service \
              --hostname iotdb-service \
              -p 6667:6667 \
              -p 18080:18080 \
              -e enable_rest_service=true \
              -e cn_internal_address=iotdb-service \
              -e cn_target_config_node_list=iotdb-service:10710 \
              -e cn_internal_port=10710 \
              -e cn_consensus_port=10720 \
              -e dn_rpc_address=iotdb-service \
              -e dn_internal_address=iotdb-service \
              -e dn_target_config_node_list=iotdb-service:10710 \
              -e dn_mpp_data_exchange_port=10740 \
              -e dn_schema_region_consensus_port=10750 \
              -e dn_data_region_consensus_port=10760 \
              -e dn_rpc_port=6667 \
              apache/iotdb:2.0.5-standalone

有关如何通过 Docker 运行 IoTDB 的更多信息,请参阅: IoTDB in Docker on Docker Hub

创建数据库

IoTDB 支持两种数据建模方式:树模型和表模型。 在创建数据库前,请先确认后续在连接器和 Sink 中使用的 SQL 方言(树模型或表模型),并根据所选模型创建对应的数据库。

  • 对于树模型,只需创建数据库即可。
  • 对于表模型,在创建数据库后,还需要创建表用于数据写入。

具体步骤请参考 IoTDB 用户手册:

创建 Apache IoTDB 连接器

在创建数据集成的规则之前,您需要先创建一个 Apache IoTDB 连接器用于访问 Apache IoTDB 服务器。

  1. 在部署菜单中选择数据集成,在数据持久化服务分类下选择 Apache IoTDB 服务。如果您已经创建了其他的连接器,点击新建连接器,然后在数据持久化服务分类下选择 Apache IoTDB 服务。

  2. 配置连接器:

    • 连接器名称:输入连接器的唯一名称。可由大小写字母或数字组成,例如 my_iotdb
    • 描述:(可选)填写连接器的简要说明。
    • 驱动:选择用于连接 IoTDB 的协议。
      • REST API:在 IoTDB REST 服务基础 URL 中输入 IoTDB REST 服务地址,例如 http://localhost:18080
      • Thrift 协议:在服务器地址中输入 IoTDB Thrift 服务器地址,例如:localhost:6667
    • SQL 方言:选择 EMQX 向 IoTDB 写入设备数据时使用的数据模型。
      • 树模型:将数据写入为分层的时序路径,适用于基于路径的设备和测点管理。
      • 表模型:将数据写入关系表中,适用于按设备类型或设备类别管理数据。
    • 数据库名字:当 SQL 方言选择表模型时,需要配置所连接到的数据库名称。
    • 用户名密码:输入 EMQX 用于认证 Apache IoTDB 服务器的凭据。
    • IoTDB 版本:选择 Apache IoTDB 的部署版本。此处,选择 v2.0.x
    • 启用 TLS:启用该选项以建立与 Apache IoTDB 服务器之间的加密连接。
    • 如需进行可选调优配置,请参见高级设置,详见高级设置
  3. (可选)点击测试连接,验证连接器是否能够成功连接到 Apache IoTDB 服务器。

  4. 点击创建完成连接器创建。

    在弹出的对话框中,可以选择返回连接器列表,或点击创建规则继续配置规则和 Apache IoTDB Sink。详细步骤请参见创建 Apache IoTDB Sink 规则

创建 Apache IoTDB Sink 规则

本节演示如何在 EMQX Cloud 中创建一条规则,用于处理来自 MQTT 源主题 temp_hum/emqx 的消息,并通过已配置的 Apache IoTDB Sink 将处理后的结果写入 Apache IoTDB,以存储时序数据。

  1. 点击连接器列表操作列下的新建规则图标或在规则列表中点击新建规则进入新建规则步骤页。

  2. SQL 编辑器中输入规则匹配的 SQL 语句。在以下示例规则中,我们通过主题 temp_hum/emqx 读取消息中的数据,包括上报时间 up_timestamp、客户端 ID 以及消息负载内容。同时,还可以从该主题中读取温度和湿度数据。

    sql
     SELECT
       payload.temp as temp,
       payload.hum as hum,
       payload.device_id as device_id
     FROM
       "temp_hum/emqx"

    TIP

    如果您初次使用 SQL,可以点击 SQL 示例启用调试来学习和测试规则 SQL 的结果。

  3. 为规则添加一个 Apache IoTDB Sink,用于将处理后的结果写入 IoTDB。具体操作步骤请参见添加 Apache IoTDB Sink

  4. 点击确认按钮,完成规则创建。

  5. 成功创建规则弹窗中,点击返回规则列表,至此完成整个数据集成配置流程。

添加 Apache IoTDB Sink

  1. 新建规则 页面中,点击下一步以添加当规则匹配时触发的动作。该动作会将处理后的数据转发至 IoTDB。

  2. 使用连接器下拉框中选择刚创建的 my_iotdb 连接器。

  3. 配置 Sink 的以下信息:

    • SQL 方言:选择 Apache IoTDB Sink 向 IoTDB 写入数据的方式。该选项必须与连接器中配置的 SQL 方言保持一致,否则数据将无法写入。

      • 树模型:以时序路径的形式向 IoTDB 写入数据。每条 Sink 记录会写入一个设备路径,测点作为该设备下的独立时间序列。当选择此模型,您可以填写设备 ID 字段。
      • 表模型:将数据写入 IoTDB 的关系表中。每条 Sink 记录对应表中的一行数据,字段映射为表中的列。当选择此模型,您必须填写字段。
    • 设备 ID (可选):输入特定的设备 ID,用作将时序数据转发和插入到 IoTDB 实例的设备名称。

      TIP

      如果留空,设备 ID 仍然可以在发布的消息中指定或在规则中配置。例如,如果您发布一个包含 device_id 字段的 JSON 编码消息,该字段的值将确定输出设备 ID。要使用规则引擎提取这些信息,您可以使用类似以下的 SQL:

      sql
      SELECT
        payload,
        `my_device` as payload.device_id

      但是,此字段中配置的固定设备 ID 优先于前面提到的任何方法。

    • :指定写入数据的 IoTDB 表名。

    • 对齐时间序列:默认禁用。启用后,一组对齐的时序数据的时间戳列将在 IoTDB 中仅存储一次,而不是在该组内的每个单独时序数据中重复存储。

  4. 为 Sink 配置写入数据以指定从 MQTT 消息生成 IoTDB 数据的方式。

    您可以在写入数据中定义一个模板,包括所需的每行的上下文信息。当提供此模板时,系统将通过应用它到MQTT 消息来生成 IoTDB 数据。写入数据的模版支持通过 CSV 文件批量设置,详细说明请参考批量设置

    例如,使用以下模板:

    注意

    列类别列仅适用于 SQL 方言为表模型的情况。

    列类别时间戳字段数据类型
    fieldindexINT32${index}
    temperatureFLOAT${temp}

    每列支持占位符语法以用变量填充。如果省略时间戳,它将自动填充为当前系统时间(毫秒)。然后,您的 MQTT 消息将如下所示:

    json
    {
    "index": "42",
    "temp": "32.67"
    }
  5. 高级设置(可选):详细请参考高级设置

  6. (可选)点击测试连接,验证 Sink 是否能够成功连接到 Apache IoTDB 服务器。

批量设置

在 Apache IoTDB 中,可能需要同时写入数百条数据,在 EMQX Cloud 上进行配置是具有挑战性的工作。为了解决这个问题,EMQX Cloud 提供了批量设置数据写入的功能。

当配置写入数据时,您可以使用批量设置功能,从 CSV 文件中导入要进行插入操作的字段。

  1. 点击写入数据表格的批量设置按钮,打开导入批量设置弹窗。

  2. 根据指引,先下载批量设置模板文件,然后在模板文件中填入数据写入配置,默认的模板文件内容如下:

    注意

    以下为 SQL 方言设置为表模型时的默认模板。 当 SQL 方言设置为树模型时,不包含 Column Category 列。

    Column CategoryTimestampMeasurementData TypeValueRemarks (Optional)
    tagnowclientidtext${clientid}
    fieldnowtempfloat${payload.temp}字段、值、数据类型是必填选项,数据类型可选的值为 boolean、 int32、 int64、 float、 double、 text
    attributenowhumtext${payload.hum}
    attributenowstatustext${payload.status}
    • Column Category: 列的数据模型,可选值:tag、field、attribute。tag 必须是字符串,推荐选择 field 或者 attribute。
    • Timestamp: 支持使用 ${var} 格式的占位符,要求是时间戳格式。也可以使用以下特殊字符插入系统时间:
      • now: 当前毫秒级时间戳
      • now_ms: 当前毫秒级时间戳
      • now_us: 当前微秒级时间戳
      • now_ns: 当前纳秒级时间戳
    • Measurement: 字段名。
    • Data Type: 数据类型,可选值包括 boolean、 int32、 int64、 float、 double、 text。
    • Value: 写入的数据值,支持常量或 ${var} 格式的占位符,需要与数据类型匹配。
    • Remarks: 仅用于 CSV 文件内字段的备注,无法导入到 EMQX 中。

    注意,仅支持 1M 以内的 CSV 格式文件,文件中数据不能超过 2000 行。

  3. 将填好的模板文件保存并上传到导入批量设置弹窗中,点击导入完成批量设置。

  4. 导入完成后,您可以在写入数据表格中对数据进行进一步的调整。

测试规则

推荐使用 MQTTX 模拟温湿度数据上报,同时您也可以使用其他任意客户端完成。

  1. 使用 MQTTX 连接到部署,并向以下 Topic 发送消息。

    • topic: temp_hum/emqx

    • payload:

      json
      {
        "temp": "27.5",
        "hum": "41.8",
        "device_id": "root.sg27" // 设备 ID
      }
  2. 查看消息是否转发到了 Apache IoTDB。

    • 您可以使用 IoTDB 的命令行查看。如果服务器在 docker 中运行,可以使用下面的命令连接服务器:

      bash
      $ docker exec -ti iotdb-service /iotdb/sbin/start-cli.sh -h iotdb-service
    • 查询数据:

      bash
      IoTDB> select * from root.sg27
      +------------------------+-------------+--------------+
      |                    Time|root.sg27.hum|root.sg27.temp|
      +------------------------+-------------+--------------+
      |2024-03-25T08:45:19.541Z|         41.8|          27.5|
      +------------------------+-------------+--------------+
      Total line number = 1
      It costs 0.166s
  3. 在控制台查看运行数据。在规则列表点击规则 ID,在运行统计页面可以查看到规则的统计以及此规则下所有动作的统计。