# **将 MQTT 数据导入 CockroachDB Cloud**

[CockroachDB](https://www.cockroachlabs.com/product/cloud/) 是一个分布式 SQL 数据库，专为高可用性、可扩展性以及跨多个节点和区域的一致性而设计。它简化了部署和维护数据库基础设施的复杂性，同时提供传统 SQL 数据库的优点。CockroachDB 特别适合需要强一致性和 ACID 事务的分布式应用场景。通过将 CockroachDB 与 EMQX 平台集成，可以高效地将 MQTT 消息和事件数据导入 CockroachDB，实现可靠存储和高级分析。

## 工作原理

将 CockroachDB 集成到 EMQX 平台中，结合了 MQTT 的实时数据收集能力与 CockroachDB 分布式 SQL 架构的强大功能。EMQX 平台内置的规则引擎简化了数据导入流程，无需复杂的编码即可确保 MQTT 消息高效路由、处理并存储到 CockroachDB 中。

下图展示了 EMQX 平台与 CockroachDB 的典型数据集成架构。

![data_integration_cockroachdb](./_assets/data_integration_cockroachdb.png)

导入 MQTT 数据到 CockroachDB 的流程如下：

1. **消息发布与接收：** 工业物联网设备通过 MQTT 协议连接到 EMQX 平台，实时发送基于其操作状态、传感器读数或触发事件的数据。EMQX 平台接收这些 MQTT 消息，这些消息包含了需要进一步处理和存储的有价值数据。
2. **消息数据处理：** 消息接收后，EMQX 平台通过 SQL 规则引擎对其进行路由。规则引擎负责匹配传入消息与预定义规则，以确定哪些消息需要发送到 CockroachDB。如果需要，规则引擎还可以执行数据转换，例如过滤、聚合或数据增强，将数据转换为适合存储到 CockroachDB 的格式。
3. **数据导入 CockroachDB：** 规则引擎识别需要存储到 CockroachDB 的消息后，会触发操作，将处理过的数据插入数据库。CockroachDB 的强一致性和 ACID 属性确保所有事务可靠，且即使在分布式环境中数据仍然保持一致性。
4. **数据存储与利用：** 数据安全存储在 CockroachDB 后，企业可以利用其强大的 SQL 查询能力开展多种应用。例如，将温度传感器、GPS 追踪器或工业设备的 IoT 数据存储到 CockroachDB 中，从而实现预测性维护、运营监控和决策支持的实时分析。

## 特性与优势

将 CockroachDB 与 EMQX 平台集成，提供了多种特性和优势，优化数据传输、存储和利用：

- **实时数据流：** EMQX 平台优化了实时数据流的处理，确保 IoT 设备的数据能快速可靠地传输到 CockroachDB。该功能适用于需要实时洞察的应用，例如制造业或能源行业的设备监控与控制。
- **高可用性与可扩展性：** CockroachDB 的分布式架构确保数据被复制到多个节点，提供高可用性和容错能力。随着数据量的增长，CockroachDB 可横向扩展，在大数据集下仍能保持一致的性能。
- **灵活的数据转换：** EMQX 平台的 SQL 规则引擎支持多种数据转换功能。企业可以在将数据导入 CockroachDB 前对其进行预处理，包括过滤不必要的数据、聚合传感器读数或为有效负载添加上下文。
- **强一致性与 ACID 事务：** CockroachDB 提供强一致性保证，确保所有事务符合 ACID 规范，即使在分布式环境中，这使其适合金融系统或管理关键基础设施的应用。
- **简易部署与管理：** EMQX 平台提供直观的界面，配置数据源、设置预处理规则以及管理 CockroachDB 存储设置，从而简化数据集成管道的设置和维护过程，降低系统管理的复杂性。
- **高级分析：** 通过 CockroachDB 的强大 SQL 查询功能，用户可以对 IoT 数据执行高级数据分析。能够对大数据集运行复杂查询，帮助企业挖掘价值，例如异常检测、趋势分析和预测性建模。
- **与现有系统集成：** CockroachDB 支持丰富的 SQL 特性，与现有数据工具和系统兼容。企业可以轻松将 IoT 数据与其他企业应用（包括商业智能工具、仪表板和机器学习平台）集成，以挖掘更深入的洞察。

## 准备工作

本节描述在开始在 EMQX 平台控制台中创建 CockroachDB Cloud 数据集成之前需要完成的准备工作。

### 前置准备

- 了解 [数据集成](https://github.com/emqx/cloud-docs/blob/master/en_US/data_integration/introduction.md)
- 了解数据集成 [规则](https://github.com/emqx/cloud-docs/blob/master/en_US/data_integration/rules.md)

### 启动 CockroachDB Cloud 服务

1. 在 [Cockroach Labs](https://www.cockroachlabs.com/) 注册一个 Cockroach Labs 帐户，并创建一个新项目和集群。

2. 按照 Cockroach Labs [快速入门指南](https://www.cockroachlabs.com/docs/cockroachcloud/create-your-cluster)创建集群。

    ::: warning 重要说明

    [VPC Peering](https://www.cockroachlabs.com/docs/cockroachcloud/network-authorization#vpc-peering) 和 [AWS PrivateLink](https://www.cockroachlabs.com/docs/cockroachcloud/network-authorization#aws-privatelink) 仅支持标准部署及更高版本。
    
    :::
    
3. 创建 CockroachDB Cloud 集群后，在 CockroachDB Cloud 控制台左侧菜单中点击 **SQL Shell**，打开一个可以执行 SQL 查询的界面。

4. 运行以下 SQL 查询创建一个新表：

    ```sql
    CREATE TABLE sensor_data (
        id SERIAL PRIMARY KEY,
        sensor_id STRING NOT NULL,
        temperature FLOAT,
        humidity FLOAT,
        timestamp TIMESTAMPTZ DEFAULT now(),
        created_at TIMESTAMPTZ DEFAULT now()
    );
    ```

5. 查询执行后，返回 **Databases** 部分的 **Tables** 标签页并刷新页面，即可看到新创建的表。

### 配置私网连接

在连接到 CockroachDB Cloud 之前，需要在 EMQX Cloud 中创建一个私网连接，并在 CockroachDB Cloud 中配置一个 PrivateLink 端点服务，以在 EMQX 部署与 CockroachDB Cloud 服务器之间建立私有网络连接。

1. 登录 [EMQX Cloud 控制台](https://cloud-intl.emqx.com/console/)，进入目标部署的概览页面。
2. 导航至**网络管理**，在 **私网连接 (PrivateLink)** 区域点击 **+ 私网连接** 按钮。
3. 确认提示并点击 **了解并进入下一步**。
4. 登录 CockroachDB Cloud 平台，打开集群。在左侧导航菜单中点击 **Networking**。

5. 找到 **Private endpoint**，点击 **Add a private endpoint**。 保存 **Service name**。

    ![cockroach_image](./_assets/cockroach_image.png)

6. 将 **Service Name** 填入 EMQX Cloud 的端点服务名字段，点击**创建私网连接**，等待**终端节点 ID** 显示。

    ![cockroach_image 1](./_assets/cockroach_image_1.png)

7. 返回 CockroachDB Cloud 平台，将 EMQX 平台中的**终端节点 ID** 添加到 **Private Endpoints**，提供描述（可选），点击 **Create Endpoint**。
8. 等待连接状态在部署概览页面中变为**运行中**。
9. 在 CockroachDB Cloud 的 **Private endpoints** 区域确认新配置的端点 ID 是否显示，表示 EMQX Cloud 与 CockroachDB Cloud 的 PrivateLink 已成功建立。

## **创建连接器**

在创建数据集成规则之前，需要先创建一个 PostgreSQL 连接器来访问 CockroachDB Cloud 服务器。

1. 进入您的部署。在左侧导航栏点击**数据集成**。首次创建连接器时，选择 **PostgreSQL**。

2. 在**新建连接器**页面，配置以下连接信息：
    - **连接器名称**：系统会自动生成一个连接器名称。
      
    - **服务器地址**：输入 CockroachDB 集群服务器主机名。
      
        在 CockroachDB Cloud 平台上点击右上角 **Connect**，如果已经建立 PrivateLink，选择 **Private connection**，复制高亮部分的服务器主机名。
        
        ![cockroach_image 2](./_assets/cockroach_image_2.png)
        
    - **数据库名称**：输入数据库名称，例如 `defaultdb` （创建 CockroachDB Cloud 集群时的默认数据库） 。

    - **用户名和密码**：输入创建 CockroachDB 集群时设置的用户名和密码。

    - **启用 TLS**：点击切换开关以启用 TLS 连接。在 **SNI** 输入框中输入服务器主机名。

3. 点击**测试连接**按钮，如果 CockroachDB Cloud 服务能够正常访问，则会返回成功提示。

4. 点击**新建**按钮完成连接器的创建。

## 创建规则

本节演示如何通过 EMQX Cloud 控制台创建 CockroachDB 规则并向规则添加动作。

1. 在规则区域点击**新建规则**，或在连接器的**操作**列点击创建规则图标。
2. 在 **SQL 编辑器**中设置规则：
   
    ```sql
    SELECT
        payload.sensor_id AS sensor_id,
        payload.temperature AS temperature,
        payload.humidity AS humidity,
        payload.timestamp AS timestamp
    FROM "sensor/#"
    ```
    
3. 点击**下一步**添加动作。
4. 从**连接器**下拉框中选择之前创建的连接器。
5. 在 SQL 模板中输入以下命令：
   
    ```sql
    INSERT INTO public.sensor_data(
        id,
        sensor_id,
        temperature,
        humidity,
        "timestamp",
        created_at
    )
    VALUES (
        DEFAULT,
        ${sensor_id},
        ${temperature},
        ${humidity},
        TO_TIMESTAMP(${timestamp} / 1000),
        DEFAULT
    );
    ```
    
6. 高级设置（可选）。
7. 点击**确认**按钮完成规则创建。
8. 在**成功创建规则**弹出窗口中点击**返回规则列表**，完成整个数据集成配置链。

## 测试规则

建议使用 [MQTTX](https://mqttx.app/zh) 模拟温度和湿度数据的上报，您也可以选择使用其他客户端。

1. 使用 MQTTX 连接到部署，并向以下主题发送消息：
    - 主题：`sensor/#`
    - 客户端 ID：`test_client`
    - Payload：
      
        ```json
        {
          "sensor_id": "sensor_001",
          "temperature": 25.5,
          "humidity": 60.0,
          "timestamp": 1674392048000
        }
        ```
    
2. 点击发送按钮发送消息。消息数据应已插入到 CockroachDB Cloud 服务器上的 `defaultdb` 数据库中的 `sensor_data` 表中。
