消息重新发布
通过 EMQX Platform 数据集成,可以在不编写代码的情况下,将满足某一特征的消息重新发布到其他主题上。您可以在 EMQX Platform 中创建规则、定义规则 SQL 筛选并处理来自源消息的数据,并为规则添加“消息重新发布”动作,实现将处理结果通过消息发布进行转发。
本页演示了如何通过创建数据集成以实现:当任合一条消息的 msg
包含 hello
字符串时,将此消息重新发布到 greet
主题。主要步骤包括:
- 创建规则为数据集成设置筛选条件。
- 为规则添加一个动作以进行消息重新发布。
- 完成数据集成创建,并进行测试。
通过数据集成配置消息重新发布功能无需添加任何连接器。以下章节介绍了具体的配置步骤。
创建规则
在数据集成页面中的数据转发服务分类下点击消息重新发布。如果您已经创建过其他的连接器,则点击新建连接器,然后在数据转发服务分类下选择消息重新发布。
在 SQL 编辑器中定义规则 SQL 以满足任何消息中只要
msg
中包含hello
字符串,就会触发引擎:- 在 FROM 子句中指定消息数据的来源。本演示针对所有的主题的消息,即
#
。 - 在 WHERE 子句中对消息 payload 中的
msg
进行正则匹配,含有hello
字符串再执行数据集成。
根据上面的原则定义的 SQL 示例如下:
sqlSELECT payload.msg as msg FROM "#" WHERE regex_match(msg, 'hello')
- 在 FROM 子句中指定消息数据的来源。本演示针对所有的主题的消息,即
可以点击 SQL 输入框下的 SQL 测试 ,填写数据:
- topic:
t/a
- payload:
json{ "msg":"hello test" }
点击测试,查看得到的数据结果,如果设置无误,测试输出框应该得到完整的 JSON 数据,如下:
json{ "msg":"hello test" }
测试输出与预期相符则可以进行后续步骤。
注意:如果无法通过测试,请检查 SQL 是否合规。
- topic:
添加动作
- 在创建规则页面上点击下一步,添加动作。
- 在创建动作步骤页中,配置以下信息:
- 使用连接器:使用默认选项
消息重新发布
。 - 主题:设置目标主题为
greet
。 - QoS 和 Retain: 使用默认值。
- Payload:在消息内容模板里填写
${msg} -- forward from EMQX Cloud
。 - 关于 MQTT 5.0 消息属性的选项,详见添加重新发布动作。
- 使用连接器:使用默认选项
- 点击确定完成配置。
测试消息重发布
推荐使用 MQTTX 模拟消息上报,同时您也可以使用其他任意客户端完成。
使用 MQTTX 连接到部署,并向
test
主题发送消息。json{ "msg": "hello" }
在规则列表中找到消息重新发布的规则,点击规则 ID 进入规则统计页面,在页面中可以看到相关的统计指标。
客户端订阅
greet
主题,可以看到如果msg
包含hello
,消息将被转发,如果不包含,则不会转发。