将 MQTT 数据写入到 TDengine Cloud
TDengine Cloud 是一款全托管的时序数据处理云服务平台,基于开源时序数据库 TDengine 构建,专为物联网(IoT)、工业互联网等场景设计和优化。EMQX Platform 现已支持与 TDengine Cloud 的无缝集成,能够高效地将来自海量设备和数据采集器的 MQTT 数据传输至 TDengine Cloud 进行存储、分析和分发,实现对业务运行状态的实时监测、预警,并提供实时的商业洞察。
本页详细介绍了 EMQX Platform 与 TDengine Cloud 的数据集成并提供了实用的规则和动作创建指导。
工作原理
TDengine Cloud 数据集成是 EMQX Platform 的开箱即用功能,通过内置的规则引擎组件和动作将设备数据转发到 TDengine。通过 TDengine 动作,MQTT 消息和客户端事件可以存储在 TDengine 中。此外,数据更新或在 TDengine Cloud 中的删除操作可以由事件触发,从而实现对设备在线状态和历史上下线事件的记录。该集成简化了从 EMQX Platform 到 TDengine 的数据摄取过程,无需复杂编码。
下图展示了 EMQX Platform 和 TDengine 数据集成在工业物联网中的典型架构:
以工业能耗管理场景为例,工作流程如下:
- 消息发布和接收:工业设备通过 MQTT 协议成功连接到 EMQX Platform,并定期使用 MQTT 协议发布能耗数据。这些数据包括生产线标识符和能耗值。当 EMQX Platform 接收到这些消息时,它将在其规则引擎中启动匹配过程。
- 规则引擎处理消息:内置的规则引擎根据主题匹配处理来自特定来源的消息。当消息到达时,它通过规则引擎进行匹配,规则引擎将处理消息数据。这可能包括转换数据格式、过滤特定信息或用上下文信息丰富消息。
- 数据写入到 TDengine Cloud:规则引擎中定义的规则触发动作将消息写入 TDengine Cloud。TDengine Cloud 数据集成规则提供 SQL 模板,允许灵活定义数据格式,将特定消息字段写入 TDengine Cloud 中相应的表和列。
将能耗数据写入 TDengine Cloud后,您可以使用标准 SQL 和强大的时间序列扩展实时分析您的数据,无缝集成众多第三方批分析、实时分析、报表工具、AI/ML 工具、可视化工具。例如:
- 连接到如 Grafana 等可视化工具以生成图表并显示能耗数据。
- 连接到 ERP 或 Power BI 等应用系统进行生产分析和生产计划调整。
- 连接到业务系统以进行实时能源使用分析,促进以数据驱动的能源管理。
特性与优势
TDengine Cloud 数据集成为您的业务带来了以下功能和优势:
- 高性能海量物联网数据:EMQX Platform 可以高效处理大量物联网设备连接和消息吞吐量,TDengine Cloud 充分利用了时序数据特点,在数据写入、存储、查询方面表现优异,满足物联网场景下的数据处理需求,不会对系统造成过大压力。
- 消息转换:消息可以在 EMQX Platform 规则中进行丰富的处理和转换,然后写入 TDengine Cloud。
- 弹性扩与高可用性:TDengine Cloud 基于云原生架构,支持资源的弹性伸缩和高可用性,能够根据业务需求动态调整资源配置,确保服务的稳定运行。
- 高级查询与分析能力:TDengine Cloud 提供优化的查询引擎和丰富的时间序列函数,支持高效的时序数据查询和分析,帮助用户快速获取业务洞察。
准备工作
本节介绍了在 EMQX Platform 中创建 TDengine Cloud 数据集成之前需要做的准备工作,包括网络设置和部署 TDengine Cloud 服务。
前置准备
网络设置
对于 TDengine Cloud,目前只支持开通 NAT 网关,通过公网访问 TDengine Cloud 连接器。
部署 TDengine Cloud 服务
登录网址 https://cloud.taosdata.com/ 注册 TDengine Cloud 并创建服务。
完成实例部署后,进入实例,在左侧菜单栏中点击数据浏览器,执行以下语句创建数据库
test
。bash# 创建并选择数据库 CREATE DATABASE mqtt; use mqtt;
执行以下 SQL 语句创建数据表
t_mqtt_msg
,用于存储每条消息的发布者客户端 ID、主题、Payload 以及发布时间。sqlCREATE TABLE test.t_mqtt_msg ( ts timestamp, msgid NCHAR(64), mqtt_topic NCHAR(255), qos TINYINT, payload BINARY(1024), arrived timestamp );
创建连接器
在创建 TDengine Cloud 动作之前,您需要创建一个 TDengine Cloud 连接器,以便 EMQX Platform 与 TDengine Cloud 服务建立连接。
- 在 TDengine Cloud 的控制台页面中选中您的实例。
- 进入左侧的编程选项页,选中 REST API 连接方式,可得到对应的连接地址和 Token。
- 在部署菜单中选择数据集成,在数据持久化分类下选择 TDengine。如果您已经创建了其他的连接器,点击新建连接器,然后在数据持久化分类下选择 TDengine。
- 填写连接相关配置:
- 连接器名称:系统将自动生成一个连接器的名称。
- 服务器地址:填写 TDengine Cloud 给出的
TDENGINE_CLOUD_URL
的值,即:https://gw.***.cloud.tdengine.com
。 - 数据库:填写
mqtt
。 - 用户名:可以保留
root
,也可以保持为空。 - 密码:保持为空。
- Token:填写 TDengine Cloud 给出的
TDENGINE_CLOUD_TOKEN
的值。如:a2ba69cc6****f0c18cd
。 - 根据业务需求配置高级设置(可选)。
- 点击测试连接按钮,如果 TDengine Cloud 能够正常访问,则会返回连接器可用提示。
- 点击新建按钮完成连接器的创建。
接下来,您可以基于此连接器创建数据桥接规则。
创建规则
本节演示了如何创建 TDengine Cloud 数据集成的规则来指定需要持久化至 TDengine Cloud 的数据并为规则添加触发的动作。
点击连接器列表操作列下的新建规则图标或在规则列表中点击新建规则进入新建规则步骤页。
在 SQL 编辑器中输入规则,客户端将温湿度消息发送到
temp_hum/emqx
主题时,就会触发引擎。这里需要对 SQL 进行一定的处理:sqlSELECT *, now_timestamp('millisecond') as ts FROM "temp_hum/emqx"
TIP
如果您初次使用 SQL,可以点击 SQL 示例和启用调试来学习和测试规则 SQL 的结果。
点击下一步开始创建动作。
从使用连接器下拉框中选择您之前创建的连接器。
数据库名字:填写
mqtt
。配置 SQL 模板,可使用如下 SQL 完成数据插入,并支持通过 CSV 文件批量设置,详细请参考批量设置。
sqlINSERT INTO t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived) VALUES (${ts}, '${id}', '${topic}', ${qos}, '${payload}', ${timestamp})
如果在模板中使用未定义的占位符变量,您可以切换未定义变量作为 NULL 开关(位于 SQL 模板 上方)来定义规则引擎的行为:
关闭(默认):规则引擎可以将字符串
undefined
插入数据库。启用:允许规则引擎在变量未定义时将
NULL
插入数据库。
TIP
如果可能,应始终启用此选项;关闭该选项仅用于确保向后兼容性。
高级配置(可选),根据情况配置同步/异步模式,队列与批量等参数。
点击确认按钮完成动作的配置。
在弹出的成功创建规则提示框中点击返回规则列表,从而完成了整个数据集成的配置链路。
批量设置
在 TDengine Cloud 中,一条数据可能包含数百个数据点,这使得编写 SQL 语句变得具有挑战性。为了解决这个问题,EMQX Platform 提供了批量设置 SQL 的功能。
当编辑 SQL 模板时,您可以使用批量设置功能,从 CSV 文件中导入要进行插入操作的字段。
点击 SQL 模板下方的批量设置按钮,打开导入批量设置弹窗。
根据指引,先下载批量设置模板文件,然后在模板文件中填入 Fields 键值对,默认的模板文件内容如下:
Field Value Char Value Remarks (Optional) ts now FALSE Example Remark msgid ${id} TRUE mqtt_topic ${topic} TRUE qos ${qos} FALSE temp ${payload.temp} FALSE hum ${payload.hum} FALSE status ${payload.status} FALSE - Field: 字段键,支持常量或 ${var} 格式的占位符。
- Value: 字段值,支持常量或 ${var} 格式的占位符。虽然 SQL 中要求字符类型需要通过引号包裹,但在模板文件中无需包裹引号,而是通过
Char Value
列来指定字段是否为字符类型。 - Char Value: 用于指定字段是否为字符类型,以便在导入生成 SQL 时为字段添加引号。如果字段是字符类型,则填写
TRUE
或1
,否则填写FALSE
或0
。 - Remarks: 仅用于 CSV 文件内字段的备注,无法导入到 EMQX Platform 中。
注意,批量设置 CSV 文件中数据不能超过 2048 行。
将填好的模板文件保存并上传到导入批量设置弹窗中,点击导入完成批量设置。
导入完成后,您可以在 SQL 模板 中对 SQL 进行进一步的调整,例如设置表名称,美化 SQL 等。
测试规则
推荐使用 MQTTX 模拟温湿度数据上报,同时您也可以使用其他任意客户端完成。
使用 MQTTX 连接到 EMQX Platform 部署,并向以下 Topic 发送消息。
topic:
temp_hum/emqx
client id:
test_client
payload:
json{ "temp": "27.5", "hum": "41.8" }
查看 TDengine Cloud 规则的动作统计,命中、动作成功次数均 +1。
在 TDengine Cloud 控制台左侧菜单中点击数据浏览器。
执行以下命令,您可以看到数据已经插入到 TDengine Cloud。
sqlselect * from test.t_mqtt_msg;