Skip to content

将 MQTT 数据导入 CockroachDB Cloud

CockroachDB 是一个分布式 SQL 数据库,专为高可用性、可扩展性以及跨多个节点和区域的一致性而设计。它简化了部署和维护数据库基础设施的复杂性,同时提供传统 SQL 数据库的优点。CockroachDB 特别适合需要强一致性和 ACID 事务的分布式应用场景。通过将 CockroachDB 与 EMQX 平台集成,可以高效地将 MQTT 消息和事件数据导入 CockroachDB,实现可靠存储和高级分析。

工作原理

将 CockroachDB 集成到 EMQX 平台中,结合了 MQTT 的实时数据收集能力与 CockroachDB 分布式 SQL 架构的强大功能。EMQX 平台内置的规则引擎简化了数据导入流程,无需复杂的编码即可确保 MQTT 消息高效路由、处理并存储到 CockroachDB 中。

下图展示了 EMQX 平台与 CockroachDB 的典型数据集成架构。

data_integration_cockroachdb

导入 MQTT 数据到 CockroachDB 的流程如下:

  1. 消息发布与接收: 工业物联网设备通过 MQTT 协议连接到 EMQX 平台,实时发送基于其操作状态、传感器读数或触发事件的数据。EMQX 平台接收这些 MQTT 消息,这些消息包含了需要进一步处理和存储的有价值数据。
  2. 消息数据处理: 消息接收后,EMQX 平台通过 SQL 规则引擎对其进行路由。规则引擎负责匹配传入消息与预定义规则,以确定哪些消息需要发送到 CockroachDB。如果需要,规则引擎还可以执行数据转换,例如过滤、聚合或数据增强,将数据转换为适合存储到 CockroachDB 的格式。
  3. 数据导入 CockroachDB: 规则引擎识别需要存储到 CockroachDB 的消息后,会触发操作,将处理过的数据插入数据库。CockroachDB 的强一致性和 ACID 属性确保所有事务可靠,且即使在分布式环境中数据仍然保持一致性。
  4. 数据存储与利用: 数据安全存储在 CockroachDB 后,企业可以利用其强大的 SQL 查询能力开展多种应用。例如,将温度传感器、GPS 追踪器或工业设备的 IoT 数据存储到 CockroachDB 中,从而实现预测性维护、运营监控和决策支持的实时分析。

特性与优势

将 CockroachDB 与 EMQX 平台集成,提供了多种特性和优势,优化数据传输、存储和利用:

  • 实时数据流: EMQX 平台优化了实时数据流的处理,确保 IoT 设备的数据能快速可靠地传输到 CockroachDB。该功能适用于需要实时洞察的应用,例如制造业或能源行业的设备监控与控制。
  • 高可用性与可扩展性: CockroachDB 的分布式架构确保数据被复制到多个节点,提供高可用性和容错能力。随着数据量的增长,CockroachDB 可横向扩展,在大数据集下仍能保持一致的性能。
  • 灵活的数据转换: EMQX 平台的 SQL 规则引擎支持多种数据转换功能。企业可以在将数据导入 CockroachDB 前对其进行预处理,包括过滤不必要的数据、聚合传感器读数或为有效负载添加上下文。
  • 强一致性与 ACID 事务: CockroachDB 提供强一致性保证,确保所有事务符合 ACID 规范,即使在分布式环境中,这使其适合金融系统或管理关键基础设施的应用。
  • 简易部署与管理: EMQX 平台提供直观的界面,配置数据源、设置预处理规则以及管理 CockroachDB 存储设置,从而简化数据集成管道的设置和维护过程,降低系统管理的复杂性。
  • 高级分析: 通过 CockroachDB 的强大 SQL 查询功能,用户可以对 IoT 数据执行高级数据分析。能够对大数据集运行复杂查询,帮助企业挖掘价值,例如异常检测、趋势分析和预测性建模。
  • 与现有系统集成: CockroachDB 支持丰富的 SQL 特性,与现有数据工具和系统兼容。企业可以轻松将 IoT 数据与其他企业应用(包括商业智能工具、仪表板和机器学习平台)集成,以挖掘更深入的洞察。

准备工作

本节描述在开始在 EMQX 平台控制台中创建 CockroachDB Cloud 数据集成之前需要完成的准备工作。

前置准备

启动 CockroachDB Cloud 服务

  1. Cockroach Labs 注册一个 Cockroach Labs 帐户,并创建一个新项目和集群。

  2. 按照 Cockroach Labs 快速入门指南创建集群。

    重要说明

    VPC PeeringAWS PrivateLink 仅支持标准部署及更高版本。

  3. 创建 CockroachDB Cloud 集群后,在 CockroachDB Cloud 控制台左侧菜单中点击 SQL Shell,打开一个可以执行 SQL 查询的界面。

  4. 运行以下 SQL 查询创建一个新表:

    sql
    CREATE TABLE sensor_data (
        id SERIAL PRIMARY KEY,
        sensor_id STRING NOT NULL,
        temperature FLOAT,
        humidity FLOAT,
        timestamp TIMESTAMPTZ DEFAULT now(),
        created_at TIMESTAMPTZ DEFAULT now()
    );
  5. 查询执行后,返回 Databases 部分的 Tables 标签页并刷新页面,即可看到新创建的表。

配置私网连接

在连接到 CockroachDB Cloud 之前,需要在 EMQX Platform 中创建一个私网连接,并在 CockroachDB Cloud 中配置一个 PrivateLink 端点服务,以在 EMQX 部署与 CockroachDB Cloud 服务器之间建立私有网络连接。

  1. 登录 EMQX Platform 控制台,进入目标部署的概览页面。

  2. 导航至网络管理,在 私网连接 (PrivateLink) 区域点击 + 私网连接 按钮。

  3. 确认提示并点击 了解并进入下一步

  4. 登录 CockroachDB Cloud 平台,打开集群。在左侧导航菜单中点击 Networking

  5. 找到 Private endpoint,点击 Add a private endpoint。 保存 Service name

    cockroach_image

  6. Service Name 填入 EMQX Platform 的端点服务名字段,点击创建私网连接,等待终端节点 ID 显示。

    ![cockroach_image 1](./_assets/cockroach_image 1.png)

  7. 返回 CockroachDB Cloud 平台,将 EMQX 平台中的终端节点 ID 添加到 Private Endpoints,提供描述(可选),点击 Create Endpoint

  8. 等待连接状态在部署概览页面中变为运行中

  9. 在 CockroachDB Cloud 的 Private endpoints 区域确认新配置的端点 ID 是否显示,表示 EMQX Platform 与 CockroachDB Cloud 的 PrivateLink 已成功建立。

创建连接器

在创建数据集成规则之前,需要先创建一个 PostgreSQL 连接器来访问 CockroachDB Cloud 服务器。

  1. 进入您的部署。在左侧导航栏点击数据集成。首次创建连接器时,选择 PostgreSQL

  2. 新建连接器页面,配置以下连接信息:

    • 连接器名称:系统会自动生成一个连接器名称。

    • 服务器地址:输入 CockroachDB 集群服务器主机名。

      在 CockroachDB Cloud 平台上点击右上角 Connect,如果已经建立 PrivateLink,选择 Private connection,复制高亮部分的服务器主机名。

      ![cockroach_image 2](./_assets/cockroach_image 2.png)

    • 数据库名称:输入数据库名称,例如 defaultdb (创建 CockroachDB Cloud 集群时的默认数据库) 。

    • 用户名和密码:输入创建 CockroachDB 集群时设置的用户名和密码。

    • 启用 TLS:点击切换开关以启用 TLS 连接。在 SNI 输入框中输入服务器主机名。

  3. 点击测试连接按钮,如果 CockroachDB Cloud 服务能够正常访问,则会返回成功提示。

  4. 点击新建按钮完成连接器的创建。

创建规则

本节演示如何通过 EMQX Platform 控制台创建 CockroachDB 规则并向规则添加动作。

  1. 在规则区域点击新建规则,或在连接器的操作列点击创建规则图标。

  2. SQL 编辑器中设置规则:

    sql
    SELECT
        payload.sensor_id AS sensor_id,
        payload.temperature AS temperature,
        payload.humidity AS humidity,
        payload.timestamp AS timestamp
    FROM "sensor/#"
  3. 点击下一步添加动作。

  4. 连接器下拉框中选择之前创建的连接器。

  5. 在 SQL 模板中输入以下命令:

    sql
    INSERT INTO public.sensor_data(
        id,
        sensor_id,
        temperature,
        humidity,
        "timestamp",
        created_at
    )
    VALUES (
        DEFAULT,
        ${sensor_id},
        ${temperature},
        ${humidity},
        TO_TIMESTAMP(${timestamp} / 1000),
        DEFAULT
    );
  6. 高级设置(可选)。

  7. 点击确认按钮完成规则创建。

  8. 成功创建规则弹出窗口中点击返回规则列表,完成整个数据集成配置链。

测试规则

建议使用 MQTTX 模拟温度和湿度数据的上报,您也可以选择使用其他客户端。

  1. 使用 MQTTX 连接到部署,并向以下主题发送消息:

    • 主题:sensor/#

    • 客户端 ID:test_client

    • Payload:

      json
      {
        "sensor_id": "sensor_001",
        "temperature": 25.5,
        "humidity": 60.0,
        "timestamp": 1674392048000
      }
  2. 点击发送按钮发送消息。消息数据应已插入到 CockroachDB Cloud 服务器上的 defaultdb 数据库中的 sensor_data 表中。