Skip to content

消息重新发布

通过 EMQX Cloud 数据集成,可以在不编写代码的情况下,将满足某一特征的消息重新发布到其他主题上。您可以在 EMQX Cloud 中创建规则、定义规则 SQL 筛选并处理来自源消息的数据,并为规则添加“消息重新发布”动作,实现将处理结果通过消息发布进行转发。

本页演示了如何通过创建数据集成以实现:当任何一条消息的 msg 包含 hello 字符串时,将此消息重新发布到 greet 主题。主要步骤包括:

  1. 创建规则为数据集成设置筛选条件。
  2. 为规则添加一个动作以进行消息重新发布。
  3. 完成数据集成创建,并进行测试。

通过数据集成配置消息重新发布功能无需添加任何连接器。以下章节介绍了具体的配置步骤。

创建规则

  1. 数据集成页面中的数据转发服务分类下点击消息重新发布。如果您已经创建过其他的连接器,则点击新建连接器,然后在数据转发服务分类下选择消息重新发布

  2. SQL 编辑器中定义规则 SQL 以满足任何消息中只要 msg 中包含 hello 字符串,就会触发引擎:

    • 在 FROM 子句中指定消息数据的来源。本演示针对所有的主题的消息,即 #
    • 在 WHERE 子句中对消息 payload 中的 msg 进行正则匹配,含有 hello 字符串再执行数据集成。

    根据上面的原则定义的 SQL 示例如下:

    sql
    SELECT
      payload.msg as msg
    FROM
      "#"
    WHERE  
      regex_match(msg, 'hello')
  3. 可以点击 SQL 输入框下的 SQL 测试 ,填写数据:

    • topic: t/a
    • payload:
    json
    {
      "msg":"hello test"
    }

    点击测试,查看得到的数据结果,如果设置无误,测试输出框应该得到完整的 JSON 数据,如下:

    json
    {
      "msg":"hello test"
    }

    测试输出与预期相符则可以进行后续步骤。

    注意:如果无法通过测试,请检查 SQL 是否合规。

添加动作

  1. 在创建规则页面上点击下一步,添加动作。

  2. 创建动作步骤页中,配置以下参数:

    • 使用连接器:使用默认选项消息重新发布
    • 主题:设置目标主题为 greet
    • Payload:在消息内容模板里填写 ${msg} -- forward from EMQX Cloud
    • QoSRetain: 使用默认值。
  3. 关于 MQTT 5.0 消息属性的选项,详见 MQTT 5.0 消息属性

  4. 如果您需要启用直接派发,请点击切换开关。启用后,消息将直接派发给订阅者。这有助于防止触发额外的规则或重复激活相同的规则。

  5. 点击确定完成配置。

  6. 成功创建规则对话框中点击返回规则列表,然后参考测试消息重发布中的步骤测试规则。或者,您也可以点击测试规则,在页面中数据测试数据进行规则测试,详见测试规则

测试消息重发布

推荐使用 MQTTX 模拟消息上报,同时您也可以使用其他任意客户端完成。

  1. 使用 MQTTX 连接到部署,并向 test 主题发送消息。

    json
    {
      "msg": "hello"
    }
  2. 在规则列表中找到消息重新发布的规则,点击规则 ID 进入规则统计页面,在页面中可以看到相关的统计指标。点击重置按钮可以重置指标数据。

    TIP

    Serverless 部署不支持重置指标。

    重新发布

  3. 客户端订阅 greet 主题,可以看到如果 msg 包含 hello,消息将被转发,如果不包含,则不会转发。 重新发布

动作参数说明

参数说明
主题设置重新发布消息的目标主题。支持使用 ${field} 语法动态拼接主题,在输入框中输入 $ 可从下拉列表中选择可用变量。
QoS设置重新发布消息的服务质量等级。
Retain设置是否将此消息作为保留消息转发。
Payload设置重新发布消息的消息体内容。支持使用 ${field} 语法引用规则 SQL 输出字段,在编辑器中输入 $ 可从下拉列表中选择可用变量。输入 ${payload} 表示重新发布的消息将与原始消息具有相同的 Payload,不进行任何修改。
MQTT 5.0 消息属性点击切换开关以根据需要配置消息属性,允许您为重新发布的消息添加丰富的消息元数据描述。详见下方说明。
直接派发启用后,消息将直接发送给订阅者,避免触发额外的规则或导致同一规则的递归激活等意外行为。

MQTT 5.0 消息属性

参数说明
有效载荷指示器指示消息格式。值设置为 false 时,消息是未确定的字节;设置为 true 时,意味着消息体中的有效载荷是 UTF-8 编码的字符数据。
消息过期时间输入一个值(以秒为单位)以指定消息应在经过一段时间后过期,如果未传递给预期的接收方,则被视为无效。
内容类型指定重新发布消息中的载荷内容的类型或格式(MIME 类型),例如 text/plain 表示文本文件,application/json 表示 JSON 格式的数据。
响应主题输入要将响应消息发布到的特定 MQTT 主题。
对比数据输入一个唯一标识符或数据,以将响应消息与原始请求消息相关联。