将 MQTT 数据传输到 RocketMQ
通过 RocketMQ 数据集成可以将 MQTT 消息和客户端事件转发到 RocketMQ 中。例如,可以通过事件触发转发消息到 RocketMQ 中,从而实现对诸如设备在线状态、上下线历史等的记录。
本页详细介绍了 EMQX Platform 与 RocketMQ 的数据集成并提供了实用的规则和动作创建指导。
工作原理
RocketMQ 数据集成是 EMQX Platform 中的一个开箱即用功能,它结合了 EMQX Platform 的设备接入以及实时数据捕获和传输能力与 RocketMQ 强大的消息队列处理能力。通过内置的规则引擎组件,该集成简化了将数据从 EMQX Platform 引入到 RocketMQ 进行存储和管理的过程,无需复杂编码。
下图展示了 EMQX Platform 与 RocketMQ 之间数据集成的典型架构:
将 MQTT 数据引入 RocketMQ 的过程如下:
- 消息发布和接收:工业物联网设备通过 MQTT 协议成功连接到 EMQX Platform,并向 EMQX Platform 发布实时 MQTT 数据。EMQX Platform 收到这些消息后,将启动其规则引擎中的匹配过程。
- 消息数据处理:当消息到达时,它会经过规则引擎,然后由 EMQX Platform 中定义的规则处理。这些规则基于预定义的标准,确定哪些消息需要路由到 RocketMQ。如果任何规则指定了有效载荷转换,那么将应用这些转换,例如转换数据格式、过滤特定信息或用额外的上下文丰富有效载荷。
- 数据传入到 RocketMQ:一旦规则处理了消息,它就会触发一个动作,将消息转发到 RocketMQ。处理后的数据将无缝写入 RocketMQ。
- 数据存储和利用:现在数据存储在 RocketMQ 中,企业可以利用其查询能力应用于各种用例。例如,在金融行业,RocketMQ 可以用作可靠的高性能消息队列来存储和管理来自支付终端、交易系统的数据,并将消息连接到数据分析和监管平台,实现风险管理、欺诈检测和预防、监管合规等要求。
特性与优势
RocketMQ 数据集成为您的业务带来了以下功能和优势:
- 可靠的物联网数据消息传递:EMQX Platform 能够可靠地批处理并发送 MQTT 消息到 RocketMQ,实现物联网设备与 RocketMQ 及应用系统的集成。
- MQTT 消息转换:使用规则引擎,EMQX Platform 可以过滤和转换 MQTT 消息。消息在发送到 RocketMQ 之前,可以进行数据提取、过滤、丰富和转换。
- 云原生弹性扩展:EMQX Platform 与 RocketMQ 都是基于云原生构建的应用,提供了友好的 K8s 支持以及云原生生态集成,能够无限弹性扩缩以适应业务的快速发展。
- 灵活的主题映射:RocketMQ 数据集成支持将 MQTT 主题灵活映射到 RocketMQ 主题,允许轻松配置 RocketMQ 消息中的键(Key)和值(Value)。
- 高吞吐量场景下的处理能力:RocketMQ 数据集成支持同步和异步写入模式,允许根据不同场景灵活平衡延迟和吞吐量。
准备工作
本节介绍了在 EMQX Platform 中创建 RocketMQ 数据集成之前需要做的准备工作,包括如何设置 RocketMQ 服务器。
前置准备
网络设置
开始之前,您需要在 EMQX Platform 上创建一个部署(EMQX 群集)并配置网络。
- 对于专有版和旗舰版部署用户: 请先创建 VPC 对等连接,创建完对等连接之后,可以通过内部网络 IP 登录 Platform Console 访问目标连接器。或者开通 NAT 网关,通过公网 IP 访问目标连接器。
- 对于 BYOC 部署用户: 请在部署 BYOC 的 VPC 和目标连接器所在的 VPC 之间建立对等连接,创建完对等连接之后,可以通过内部网络 IP 访问目标连接器。如果您需要通过公共 IP 地址访问资源,请在公共云控制台中为部署 BYOC 的 VPC 配置 NAT 网关。
安装 RocketMQ
- 准备一份 docker-compose 文件
rocketmq.yaml
来部署 RocketMQ。
version: '3.3'
services:
mqnamesrv:
image: apache/rocketmq:4.9.4
container_name: rocketmq_namesrv
ports:
- 9876:9876
volumes:
- ./rocketmq/logs:/opt/logs
- ./rocketmq/store:/opt/store
command: ./mqnamesrv
mqbroker:
image: apache/rocketmq:4.9.4
container_name: rocketmq_broker
ports:
- 10909:10909
- 10911:10911
volumes:
- ./rocketmq/logs:/opt/logs
- ./rocketmq/store:/opt/store
- ./rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf
environment:
NAMESRV_ADDR: 'rocketmq_namesrv:9876'
JAVA_OPTS: ' -Duser.home=/opt'
JAVA_OPT_EXT: '-server -Xms1024m -Xmx1024m -Xmn1024m'
command: ./mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- mqnamesrv
- 准备运行 RocketMQ 所需的文件夹和配置文件。
mkdir rocketmq
mkdir rocketmq/logs
mkdir rocketmq/store
mkdir rocketmq/conf
- 将下面的内容存入到
rocketmq/conf/broker.conf
文件中。
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
brokerIP1=这里需要填写你的真实 IP 地址
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=100
maxMessageSize=65536
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
- 启动 RocketMQ。
docker-compose -f rocketmq.yaml up
- 启动一个 RocketMQ 的消费者。
docker run --rm -e NAMESRV_ADDR=host.docker.internal:9876 apache/rocketmq:4.9.4 ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
注意
如果是在 Linux 中,需要将 host.docker.internal
替换成您的真实 IP 地址。
创建连接器
在创建数据集成的规则之前,您需要先创建一个 RocketMQ 连接器用于访问 RocketMQ 服务。
在部署菜单中选择 数据集成,在数据转发分类下选择 RocketMQ。如果您已经创建了其他的连接器,点击新建连接器,然后在数据转发服务分类下选择 RocketMQ。
连接器名称:系统将自动生成一个连接器的名称。
填写连接相关配置:
- 服务器列表:输入 你想要连接的 RocketMQ 服务器地址(例如,localhost),如果 RocketMQ 服务器在远程运行,则填写实际服务器地址。主机名具有以下形式:Host[:Port]。如果未指定 [:Port],则使用 RocketMQ 默认端口 9876。
- AccessKey: RocketMQ 服务器的 accessKey。
- SecretKey: RocketMQ 服务器的 secretKey。
- 安全令牌: RocketMQ 服务器安全令牌
- 根据业务需求配置高级设置(可选)。
点击测试连接按钮,如果 RocketMQ 能够正常访问,则会返回连接器可用提示。
点击新建按钮完成连接器的创建。
接下来,您可以基于此连接器创建数据桥接规则。
创建规则
本节演示了如何创建 RocketMQ 数据集成的规则来指定需要转发至 RocketMQ 的数据并为规则添加触发的动作。
点击连接器列表操作列下的新建规则图标或在规则列表中点击新建规则进入新建规则步骤页。
在 SQL 编辑器中输入规则,客户端将温湿度消息发送到
temp_hum/emqx
主题时,就会触发引擎。这里需要对 SQL 进行一定的处理:
SELECT
timestamp as up_timestamp,
clientid as client_id,
payload.temp as temp,
payload.hum as hum
FROM
"temp_hum/emqx"
TIP
如果您初次使用 SQL,可以点击 SQL 示例和启用调试来学习和测试规则 SQL 的结果。
点击下一步开始创建动作。
从使用连接器下拉框中选择您之前创建的连接器。
完成消息从 EMQX Platform 到发布到 RocketMQ 的配置:
- RocketMQ 主题: 输入
emqx
- 消息模板: 模板, 默认为空,为空时将会将整个消息转发给 RocketMQ。 模板可以是任意带有占位符的合法字符串,列如本次使用以下内容作为消息模板
{"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
- RocketMQ 主题: 输入
展开高级设置,根据情况配置同步/异步模式,队列与批量等参数高级设置选项(可选)
点击确认按钮完成动作的配置。
在弹出的成功创建规则提示框中点击返回规则列表,从而完成了整个数据集成的配置链路。
测试规则
推荐使用 MQTTX 模拟温湿度数据上报,同时您也可以使用其他任意客户端完成。
使用 MQTTX 连接到 EMQX Platform 部署,并向以下 Topic 发送消息。
topic:
temp_hum/emqx
client id:
test_client
payload:
json{ "temp": "27.5", "hum": "41.8" }
在 RocketMQ 的消费者窗口,我们将看到下面的输出:
bashConsumeMessageThread_please_rename_unique_group_name_4_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=208, queueOffset=0, sysFlag=0, bornTimestamp=1711354009076, bornHost=/121.43.165.169:48850, storeTimestamp=1711354009085, storeHost=/118.178.124.161:10911, msgId=76B27CA100002A9F000000000000058D, commitLogOffset=1421, bodyCRC=1137462344, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1711354066863, CLUSTER=DefaultCluster}, body=[123, 34, 117, 112, 95, 116, 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 49, 55, 49, 49, 51, 53, 52, 48, 48, 57, 48, 53, 54, 44, 34, 116, 101, 109, 112, 34, 58, 34, 50, 55, 46, 53, 34, 44, 34, 104, 117, 109, 34, 58, 34, 52, 49, 46, 56, 34, 44, 34, 99, 108, 105, 101, 110, 116, 95, 105, 100, 34, 58, 34, 109, 113, 116, 116, 120, 95, 97, 50, 97, 99, 102, 100, 49, 57, 34, 125], transactionId='null'}]]