将 MQTT 数据导入 CockroachDB Cloud
CockroachDB 是一个分布式 SQL 数据库,专为高可用性、可扩展性以及跨多个节点和区域的一致性而设计。它简化了部署和维护数据库基础设施的复杂性,同时提供传统 SQL 数据库的优点。CockroachDB 特别适合需要强一致性和 ACID 事务的分布式应用场景。通过将 CockroachDB 与 EMQX 平台集成,可以高效地将 MQTT 消息和事件数据导入 CockroachDB,实现可靠存储和高级分析。
工作原理
将 CockroachDB 集成到 EMQX 平台中,结合了 MQTT 的实时数据收集能力与 CockroachDB 分布式 SQL 架构的强大功能。EMQX 平台内置的规则引擎简化了数据导入流程,无需复杂的编码即可确保 MQTT 消息高效路由、处理并存储到 CockroachDB 中。
下图展示了 EMQX 平台与 CockroachDB 的典型数据集成架构。
导入 MQTT 数据到 CockroachDB 的流程如下:
- 消息发布与接收: 工业物联网设备通过 MQTT 协议连接到 EMQX 平台,实时发送基于其操作状态、传感器读数或触发事件的数据。EMQX 平台接收这些 MQTT 消息,这些消息包含了需要进一步处理和存储的有价值数据。
- 消息数据处理: 消息接收后,EMQX 平台通过 SQL 规则引擎对其进行路由。规则引擎负责匹配传入消息与预定义规则,以确定哪些消息需要发送到 CockroachDB。如果需要,规则引擎还可以执行数据转换,例如过滤、聚合或数据增强,将数据转换为适合存储到 CockroachDB 的格式。
- 数据导入 CockroachDB: 规则引擎识别需要存储到 CockroachDB 的消息后,会触发操作,将处理过的数据插入数据库。CockroachDB 的强一致性和 ACID 属性确保所有事务可靠,且即使在分布式环境中数据仍然保持一致性。
- 数据存储与利用: 数据安全存储在 CockroachDB 后,企业可以利用其强大的 SQL 查询能力开展多种应用。例如,将温度传感器、GPS 追踪器或工业设备的 IoT 数据存储到 CockroachDB 中,从而实现预测性维护、运营监控和决策支持的实时分析。
特性与优势
将 CockroachDB 与 EMQX 平台集成,提供了多种特性和优势,优化数据传输、存储和利用:
- 实时数据流: EMQX 平台优化了实时数据流的处理,确保 IoT 设备的数据能快速可靠地传输到 CockroachDB。该功能适用于需要实时洞察的应用,例如制造业或能源行业的设备监控与控制。
- 高可用性与可扩展性: CockroachDB 的分布式架构确保数据被复制到多个节点,提供高可用性和容错能力。随着数据量的增长,CockroachDB 可横向扩展,在大数据集下仍能保持一致的性能。
- 灵活的数据转换: EMQX 平台的 SQL 规则引擎支持多种数据转换功能。企业可以在将数据导入 CockroachDB 前对其进行预处理,包括过滤不必要的数据、聚合传感器读数或为有效负载添加上下文。
- 强一致性与 ACID 事务: CockroachDB 提供强一致性保证,确保所有事务符合 ACID 规范,即使在分布式环境中,这使其适合金融系统或管理关键基础设施的应用。
- 简易部署与管理: EMQX 平台提供直观的界面,配置数据源、设置预处理规则以及管理 CockroachDB 存储设置,从而简化数据集成管道的设置和维护过程,降低系统管理的复杂性。
- 高级分析: 通过 CockroachDB 的强大 SQL 查询功能,用户可以对 IoT 数据执行高级数据分析。能够对大数据集运行复杂查询,帮助企业挖掘价值,例如异常检测、趋势分析和预测性建模。
- 与现有系统集成: CockroachDB 支持丰富的 SQL 特性,与现有数据工具和系统兼容。企业可以轻松将 IoT 数据与其他企业应用(包括商业智能工具、仪表板和机器学习平台)集成,以挖掘更深入的洞察。
准备工作
本节描述在开始在 EMQX 平台控制台中创建 CockroachDB Cloud 数据集成之前需要完成的准备工作。
前置准备
启动 CockroachDB Cloud 服务
在 Cockroach Labs 注册一个 Cockroach Labs 帐户,并创建一个新项目和集群。
按照 Cockroach Labs 快速入门指南创建集群。
重要说明
VPC Peering 和 AWS PrivateLink 仅支持标准部署及更高版本。
创建 CockroachDB Cloud 集群后,在 CockroachDB Cloud 控制台左侧菜单中点击 SQL Shell,打开一个可以执行 SQL 查询的界面。
运行以下 SQL 查询创建一个新表:
sqlCREATE TABLE sensor_data ( id SERIAL PRIMARY KEY, sensor_id STRING NOT NULL, temperature FLOAT, humidity FLOAT, timestamp TIMESTAMPTZ DEFAULT now(), created_at TIMESTAMPTZ DEFAULT now() );
查询执行后,返回 Databases 部分的 Tables 标签页并刷新页面,即可看到新创建的表。
配置私网连接
在连接到 CockroachDB Cloud 之前,需要在 EMQX Platform 中创建一个私网连接,并在 CockroachDB Cloud 中配置一个 PrivateLink 端点服务,以在 EMQX 部署与 CockroachDB Cloud 服务器之间建立私有网络连接。
登录 EMQX Platform 控制台,进入目标部署的概览页面。
导航至网络管理,在 私网连接 (PrivateLink) 区域点击 + 私网连接 按钮。
确认提示并点击 了解并进入下一步。
登录 CockroachDB Cloud 平台,打开集群。在左侧导航菜单中点击 Networking。
找到 Private endpoint,点击 Add a private endpoint。 保存 Service name。
将 Service Name 填入 EMQX Platform 的端点服务名字段,点击创建私网连接,等待终端节点 ID 显示。

返回 CockroachDB Cloud 平台,将 EMQX 平台中的终端节点 ID 添加到 Private Endpoints,提供描述(可选),点击 Create Endpoint。
等待连接状态在部署概览页面中变为运行中。
在 CockroachDB Cloud 的 Private endpoints 区域确认新配置的端点 ID 是否显示,表示 EMQX Platform 与 CockroachDB Cloud 的 PrivateLink 已成功建立。
创建连接器
在创建数据集成规则之前,需要先创建一个 PostgreSQL 连接器来访问 CockroachDB Cloud 服务器。
进入您的部署。在左侧导航栏点击数据集成。首次创建连接器时,选择 PostgreSQL。
在新建连接器页面,配置以下连接信息:
连接器名称:系统会自动生成一个连接器名称。
服务器地址:输入 CockroachDB 集群服务器主机名。
在 CockroachDB Cloud 平台上点击右上角 Connect,如果已经建立 PrivateLink,选择 Private connection,复制高亮部分的服务器主机名。

数据库名称:输入数据库名称,例如
defaultdb
(创建 CockroachDB Cloud 集群时的默认数据库) 。用户名和密码:输入创建 CockroachDB 集群时设置的用户名和密码。
启用 TLS:点击切换开关以启用 TLS 连接。在 SNI 输入框中输入服务器主机名。
点击测试连接按钮,如果 CockroachDB Cloud 服务能够正常访问,则会返回成功提示。
点击新建按钮完成连接器的创建。
创建规则
本节演示如何通过 EMQX Platform 控制台创建 CockroachDB 规则并向规则添加动作。
在规则区域点击新建规则,或在连接器的操作列点击创建规则图标。
在 SQL 编辑器中设置规则:
sqlSELECT payload.sensor_id AS sensor_id, payload.temperature AS temperature, payload.humidity AS humidity, payload.timestamp AS timestamp FROM "sensor/#"
点击下一步添加动作。
从连接器下拉框中选择之前创建的连接器。
在 SQL 模板中输入以下命令:
sqlINSERT INTO public.sensor_data( id, sensor_id, temperature, humidity, "timestamp", created_at ) VALUES ( DEFAULT, ${sensor_id}, ${temperature}, ${humidity}, TO_TIMESTAMP(${timestamp} / 1000), DEFAULT );
高级设置(可选)。
点击确认按钮完成规则创建。
在成功创建规则弹出窗口中点击返回规则列表,完成整个数据集成配置链。
测试规则
建议使用 MQTTX 模拟温度和湿度数据的上报,您也可以选择使用其他客户端。
使用 MQTTX 连接到部署,并向以下主题发送消息:
主题:
sensor/#
客户端 ID:
test_client
Payload:
json{ "sensor_id": "sensor_001", "temperature": 25.5, "humidity": 60.0, "timestamp": 1674392048000 }
点击发送按钮发送消息。消息数据应已插入到 CockroachDB Cloud 服务器上的
defaultdb
数据库中的sensor_data
表中。