将 MQTT 数据写入到 Datalayers
Datalayers 是一款面向工业物联网、车联网、能源等行业的多模、超融合数据库。其强大的数据吞吐能力以及稳定的性能表现使其非常适合物联网领域。EMQX Platform 目前已支持通过 Sink 将消息和数据存储到 Datalayers 中,以便进行数据分析和可视化。
本页提供了 EMQX Platform 与 Datalayers 数据集成的全面介绍,并提供了创建和验证数据集成的实用指导。
工作原理
Datalayers 数据集成是 EMQX 中开箱即用的功能,它结合了 EMQX Platform 的设备接入、消息传输能力与 Datalayers 的数据存储和分析能力,通过简单的配置即可实现 MQTT 数据的无缝集成。EMQX 通过规则引擎与 Sink 将设备数据转发至 Datalayers 进行存储和分析,在对数据进行分析之后,Datalayers 会生成报表、图表等数据分析结果,通过 Datalayers 的可视化工具展示给用户。
下图展示了储能场景中 EMQX Platform 和 Datalayers 数据集成的典型架构。
EMQX Platform 和 Datalayers 提供了一个可扩展的物联网平台,用于高效地实时收集和分析能耗数据。在此架构中,EMQX Platform 作为物联网平台,负责设备接入、消息传输、数据路由等功能,Datalayers 作为数据存储和分析平台,负责数据存储、数据分析等功能。具体的工作流程如下:
- 消息发布与接收:储能设备通过 MQTT 协议连接成功后定期发布能耗数据,这些数据包括电量、输入输出功率信息。EMQX 接收到消息后将在规则引擎中进行比对。
- 规则引擎处理消息:通过内置的规则引擎,可以根据主题匹配处理特定来源的消息。当消息到达时,它会通过规则引擎,规则引擎会匹配对应的规则,并对消息数据进行处理,例如转换数据格式、过滤掉特定信息或使用上下文信息丰富消息。
- 写入到 Datalayers:规则引擎中定义的规则触发将消息写入到 Datalayers 的操作。Datalayers Sink 提供了 SQL 模板,能够灵活地定义写入的数据格式,将消息中的特定字段写入到 Datalayers 的对应的表和列中。
储能数据写入到 Datalayers 后,您可以灵活的使用行协议对数据进行分析,例如:
- 连接到可视化工具,例如 Grafana,根据数据生成图表,展示储能数据。
- 连接业务系统,进行储能设备状态监控与告警。
特性与优势
Datalayers 数据集成具有以下特性与优势:
- 高效的数据处理能力:EMQX Platform 能够处理海量物联网设备连接与消息吞吐,Datalayers 在数据写入、存储和查询方面具有出色的性能表现,能够满足物联网场景下的数据处理需求,不会导致系统不堪重负。
- 消息转换:消息可以写入 Datalayers 之前,通过 EMQX Platform 规则中进行丰富的处理和转换。
- 可扩展性:EMQX Platform 与 Datalayers 都具备集群扩展能力,能够随着业务的发展,利用灵活地进行集群水平扩展,满足业务的发展需求。
- 丰富的查询能力:Datalayers 提供包括优化的函数、运算符和索引技术,可实现对时间戳数据的高效查询和分析,准确地从 IoT 时间序列数据中提取有价值的见解。
- 高效存储:Datalayers 使用高压缩比的编码方式,可以大幅降低存储成本。也可以自定义不同数据的存储时间,避免不必要的数据占用存储空间。
准备工作
本节介绍了在 EMQX Platform 中创建 Datalayers 数据集成之前需要做的准备工作。
前置准备
- 了解 Datalayers 行协议。
- 了解数据集成。
- 了解规则。
网络设置
开始之前,您需要在 EMQX Platform 上创建一个部署(EMQX 群集)并配置网络。
- 对于专有版和旗舰版部署用户: 请先创建 VPC 对等连接,创建完对等连接之后,可以通过内部网络 IP 登录 Platform Console 访问目标连接器。或者开通 NAT 网关,通过公网 IP 访问目标连接器。
- 对于 BYOC 部署用户: 请在部署 BYOC 的 VPC 和目标连接器所在的 VPC 之间建立对等连接,创建完对等连接之后,可以通过内部网络 IP 访问目标连接器。如果您需要通过公共 IP 地址访问资源,请在公共云控制台中为部署 BYOC 的 VPC 配置 NAT 网关。
通过 Docker 安装 Datalayers
- 通过 Docker 安装并启动 Datalayers,详细步骤请参考 Install Datalayers。
# 启动一个 Datalayers 容器
docker run --name datalayers -p 8361:8361 datalayers/datalayers:v2.1.7
Datalayers 服务启动后,您可以通过以下步骤进入 Datalayers CLI 中创建数据库,默认的用户和密码为 admin/public:
进入 Datalayers 容器:
bashdocker exec -it datalayers bash
进入 Datalayers CLI:
bashdlsql -u admin -p public
创建数据库:
sqlcreate database emqx
创建 Datalayers 连接器
在创建数据集成的规则之前,您需要先创建一个 Datalayers 连接器用于访问 Datalayers 服务器。
在部署菜单中选择 数据集成,在数据持久化服务分类下选择 Datalayers 服务。如果您已经创建了其他的连接器,点击新建连接器,然后在数据持久化服务分类下选择 Datalayers 服务。
连接器名称:系统将自动生成一个连接器的名称。
输入连接信息:
- 服务器地址:填写服务器的 IP 地址以及端口:
{url}:8361
。 - 按照安装 Datalayers 中的设定完成数据库名字、用户名及密码设置。
- 如果您想建立加密连接,请点击 启用 TLS 切换开关。
- 服务器地址:填写服务器的 IP 地址以及端口:
点击测试连接按钮,如果 Datalayers 服务能够正常访问,则会返回成功提示。
点击新建按钮完成连接器的创建。
创建规则
接下来您需要创建一条规则来指定需要写入的数据,并在规则中添加响应动作以将经规则处理的数据转发到 Datalayers。
点击连接器列表操作列下的新建规则图标或在规则列表中点击新建规则进入新建规则步骤页。
在 SQL 编辑器中输入规则,客户端将温湿度消息发送到
temp_hum/emqx
主题时,就会触发引擎。这里需要对 SQL 进行一定的处理:sqlSELECT timestamp, payload.location as location, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"
TIP
如果您初次使用 SQL,可以点击 SQL 示例和启用调试来学习和测试规则 SQL 的结果。
点击下一步开始创建动作。
从使用连接器下拉框中选择您之前创建的连接器。
设定时间精度,默认为毫秒。
写语句:使用 Datalayers API Line Protocol 写入 Datalayers 的数据。
TIP
由于 Datalayers 的写入完全兼容 InfluxDB v1 行协议,因此您可以参考 InfluxDB 行协议 来设置数据格式。
例如,输入带符号的整型值,请在占位符后添加
i
作为类型标识,例如${payload.int}i
。参见 InfluxDB 1.8 写入整型值。此处,我们可以使用 Line Protocol 格式,将其设置为:
bashtemp_hum,location=${location} temp=${temp},hum=${hum} ${timestamp}
根据需要配置高级设置选项(可选),详情请参考高级设置。
点击确认按钮完成动作的配置。
在弹出的成功创建规则提示框中点击返回规则列表,从而完成了整个数据集成的配置链路。
测试规则
推荐使用 MQTTX 模拟温湿度数据上报,同时您也可以使用其他任意客户端完成。
使用 MQTTX 连接到部署,并向以下 Topic 发送消息。
topic:
temp_hum/emqx
payload:
json{ "temp": 27.5, "hum": 41.8, "location": "Prague" }
前往 Datalayers CLI,查看数据是否成功写入到数据库,执行以下命令:
- 进入 Datalayers 控制台:
bash$ docker exec -it datalayers bash $ dlsql -u admin -p public $ use emqx
- 执行 SQL 查询数据:
bash> select * from "temp_hum" +-------------------------------+----------+------+------+ | time | location | hum | temp | +-------------------------------+----------+------+------+ | 2024-09-24T11:36:58.831+08:00 | Prague | 41.8 | 27.5 | +-------------------------------+----------+------+------+ 2 rows in set (0.002 sec)
在控制台查看运行数据。在规则列表点击规则 ID,在运行统计页面可以查看到规则的统计以及此规则下所有动作的统计。