Skip to content

消息转换

消息转换允许您自定义规则修改和格式化消息,然后再进一步处理或发送给订阅者。该功能基于 Schema 的动态消息转换,高度可定制,支持多种编码格式和高级转换,满足复杂的下游数据处理需求。

工作流程

当一条消息发布时,它会经历以下工作流程:

  1. Schema 验证:当消息发布并通过授权后,首先会进行 Schema 验证。如果消息通过验证,则进入下一步。

  2. 消息转换管道

    • 转换匹配:根据消息的主题,将其与用户定义的转换规则列表进行匹配。可以为不同的主题或主题过滤器设置多个转换。
    • 转换执行:匹配到的转换将按照用户配置的顺序依次执行。管道支持多种编码器和解码器,如 JSON、Protobuf 和 Avro,并允许使用 Variform 表达式来丰富或修改消息。
    • 转换后处理:消息成功通过转换管道后,将继续进行下一步处理,如触发规则引擎或将消息分发给订阅者。
  3. 故障处理:如果转换失败,将执行用户配置的操作:

    • 丢弃消息:终止发布并丢弃消息,对于 QoS 1 和 QoS 2 消息,通过 PUBACK 返回特定的原因代码(131 - 实现特定错误)。
    • 断开连接并丢弃消息:丢弃消息并断开发布客户端的连接。
    • 忽略:不做额外的操作。

    无论转换失败后是什么操作,部署日志中都将打印一条转换失败的信息。您可以在消息转换规则中设置日志的输出级别,默认是 warning 级别。

    另外,转换失败还可以触发数据集成中规则 SQL 的验证失败事件,事件主题是 $events/message_transformation_failed,用户可以捕获这个事件进行自定义处理,例如将错误的消息发布到另一个主题、或发送到 Kafka 中进行分析。

创建消息转换规则

如果您已经开通数据智能中心,您可以通过点击部署左侧菜单中的数据智能中心 -> 消息转换进入消息转换页面。在该页面中,您可以创建消息转换规则。

  1. 在消息转换页面中点击右上角的新建按钮,进入新建消息转换页面,并进行以下配置:

    • 名称:输入转换的名称,用于标识一条消息转换规则。

    • 备注:可选,为消息转换规则添加备注信息。

    • 消息来源主题:设置需要被转换消息的主题。可以设置多个主题或主题过滤器。

    • 消息格式转换

      • 源格式:指定进入转换管道时应用的有效 payload 解码格式。可选项包括 None(不解码)、JSONAvroProtobuf。这些解码格式会将二进制输入 payload 转换为结构化映射。如果选择 AvroProtobuf,则必须首先在 Schema 管理中定义其 Schema。在设置了多个转换规则的转换管道中,无需在每一步都进行解码。例如,如果转换 T1 已经解码了 payload,则后续的转换 T2 可以跳过解码,直接使用已正确格式化的 payload。
      • 目标格式:指定当转换管道结束时,将最终的消息 payload 编码为二进制值的编码格式。编码格式选项与源格式相同:None(不编码)、JSONAvroProtobuf。只有管道中的最后一个转换需要确保 payload 被编码为二进制值,中间的转换不需要处理二进制编码。

      TIP

      如果源格式和目标格式相同,则不会执行消息格式转换。Avro/Protobuf 格式和无(None)格式不能相互转换。详细的格式转换设置规则,参考消息格式转换设置规则

    • 消息属性转换

      • 属性:指定转换后的值(由表达式生成)的写入目标位置。有效目标包括 payloadtopicqosretain(设置相应的标志)以及 user_property(用于 User-Property MQTT 属性)。

        使用 user_property 时,必须指定一个具体的键(如:user_property.my_custom_prop)。

        payload 可以按原样使用,覆盖整个消息 payload,或者指定一个嵌套键路径,将 payload 视为嵌套的 JSON 对象(例如:payload.x.y)。

      • 目标值:定义将写入配置的属性的值。此值可以从其他字段复制,如 qosretaintopicpayloadpayload.x.y,也可以通过 Variform 表达式生成。

      TIP

      参考消息属性转换设置规则进行设置。

    • 转换失败后的操作

      • 失败操作:选择在转换失败时执行的操作:
        • 丢弃消息:终止发布过程并丢弃消息,通过 PUBACK 返回 QoS 1 和 QoS 2 消息的特定原因代码。
        • 断开连接并丢弃消息:丢弃消息并断开发布客户端的连接。
        • 忽略:不做额外的操作。
      • 日志级别:定义输出验证失败部署日志的级别,可以选择 warningerror
  2. (可选)您可以在创建转换前通过点击预览来测试您的转换。这将打开一个新弹框,您可以在其中输入传入消息的上下文信息,例如 QoS、payload、是否设置了保留标志、发布者的用户名和客户端 ID 等。提供必要的详细信息后,点击运行转换以指定的上下文运行转换,并查看结果输出。

  3. 点击确认完成设置。

创建转换规则后,它将默认启用并显示在消息转换页面的列表中。您可以对已创建的消息转换进行管理,详见管理消息转换

消息格式转换设置规则

备注 (允许设置); (不允许设置)

源格式 \ 目标格式JSONAvroProtobufNone
JSON
Avro
Protobuf
None

消息属性转换设置规则

备注(支持设置所有属性); (不支持设置)

user_property 不能被整个设置,但可以操作 user_property 的子属性。

源格式 \ 目标格式JSONAvroProtobufNone
JSON
Avro无法直接设置 payload,可以为payload添加子属性无法直接设置 payload,可以为payload添加子属性无法直接设置 payload,可以为payload添加子属性
Protobuf无法直接设置 payload,可以为payload添加子属性无法直接设置 payload,可以为payload添加子属性无法直接设置 payload,可以为payload添加子属性
None可以直接设置 payload,无法为payload添加子属性可以直接设置 payload,无法为payload添加子属性

管理消息转换规则

您可以在消息转换规则列表页面进行以下管理操作:

  • 启用/停用消息转换:在是否启用列点击开关按钮。
  • 编辑消息转换规则:在操作列中点击“编辑”图标,进入编辑页面进行修改并保存。
  • 删除消息转换规则:在操作列中点击“更多”图标,在下拉选项中点击删除,二次确认后即可删除。
  • 调整消息转换规则顺序:可以使用鼠标拖动列表中的一行表格进行上下排序,或者使用操作列中更多选项中的快捷移动按钮。

查看统计指标

点击消息转换列表中的名称进入详情页面,您可以查看到消息转换执行相关的统计指标:

  • 统计指标
    • 执行次数:消息转换规则启用以来触发的总次数。
    • 成功:消息转换成功的次数。
    • 失败:消息转换失败的次数。
  • 速率统计
    • 当前速率:最近 1 分钟内的转换速度。
    • 最近 5 分钟速率:最近 5 分钟内的转换速度。
    • 最大速率:历史最大转换速度。