Skip to content

消息转换

提示

消息转换是 EMQX 企业版的功能。

概述

消息转换允许您根据用户定义的规则修改和格式化消息,然后再进一步处理或发送给订阅者。该功能高度可定制,支持多种编码格式和高级转换。

工作流程

当一条消息发布时,它会经历以下工作流程:

  1. Schema 验证:当消息发布并通过授权后,首先会进行 Schema 验证。如果消息通过验证,则进入下一步。

  2. 消息转换管道

    • 转换匹配:根据消息的主题,将其与用户定义的转换规则列表进行匹配。可以为不同的主题或主题过滤器设置多个转换。
    • 转换执行:匹配到的转换将按照用户配置的顺序依次执行。管道支持多种编码器和解码器,如 JSON、Protobuf 和 Avro,并允许使用 Variform 表达式来丰富或修改消息。
    • 转换后处理:消息成功通过转换管道后,将继续进行下一步处理,如触发规则引擎或将消息分发给订阅者。
  3. 故障处理:如果转换失败,将执行用户配置的操作:

    • 丢弃消息:终止发布并丢弃消息,对于 QoS 1 和 QoS 2 消息,通过 PUBACK 返回特定的原因代码(131 - 实现特定错误)。
    • 断开连接并丢弃消息:丢弃消息并断开发布客户端的连接。
    • 忽略:不执行任何额外操作。

    无论配置了何种操作,转换失败时都可以生成日志记录。用户可以配置日志的输出级别,默认级别为 warning。此外,转换失败还可以触发规则引擎事件($events/message_transformation_failed),允许用户进行自定义处理,例如将错误的消息重新发布到其他主题或发送到 Kafka 进行进一步分析。

配置和使用消息转换

本节演示如何配置消息转换功能以及如何测试您的设置。

在 Dashboard 中配置消息转换

本节演示如何在 Dashboard 中创建和配置消息转换。

  1. 进入 Dashboard,点击左侧导航菜单中的集成 -> 消息转换
  2. 消息转换页面右上角点击创建
  3. 在创建消息转换页面,配置以下信息:
    • 名称:输入转换的名称。
    • 消息来源主题:设置需要转换消息的主题。可以设置多个主题或主题过滤器。
    • 备注(可选):输入任何备注信息。
    • 消息格式转换
      • 源格式:指定进入转换管道时应用的有效 payload 解码格式。可选项包括 None(不解码)、JSONAvroProtobuf。这些解码格式会将二进制输入 payload 转换为结构化映射。如果选择 AvroProtobuf,则必须首先在 Schema Registry 中定义其模式。在具有多个转换的管道中,无需在每一步都进行解码。例如,如果转换 T1 已经解码了 payload,则后续的转换 T2 可以跳过解码,直接使用已正确格式化的 payload。
      • 目标格式:指定当转换管道结束时,将最终的消息 payload 编码为二进制值的编码格式。编码格式选项与源格式相同:NoneJSONAvroProtobuf。只有管道中的最后一个转换需要确保 payload 被编码为二进制值,中间的转换不需要处理二进制编码。
    • 消息属性转换
      • 属性:指定转换后的值(由表达式生成)的写入目标位置。有效目标包括 payloadtopicqosretain(设置相应的标志)以及 user_property(用于 User-Property MQTT 属性)。使用 user_property 时,必须指定一个具体的键(例如:user_property.my_custom_prop)。payload 可以按原样使用,覆盖整个消息 payload,或者指定一个嵌套键路径,将 payload 视为嵌套的 JSON 对象(例如:payload.x.y)。
      • 目标值:定义将写入配置的属性的值。此值可以从其他字段复制,如 qosretaintopicpayloadpayload.x.y,也可以通过 Variform表达式生成。
    • 转换失败后的操作
      • 失败操作:选择在转换失败时执行的操作:
        • 丢弃消息:终止发布过程并丢弃消息,通过 PUBACK 返回 QoS 1 和 QoS 2 消息的特定原因代码。
        • 断开连接并丢弃消息:丢弃消息并断开发布客户端的连接。
        • 忽略:不执行任何额外操作。
    • 输出日志:选择是否在转换失败时生成日志条目;日志记录默认启用。
    • 日志级别:设置日志的输出级别,默认级别为 warning
  4. 点击创建完成设置。

您可以在创建转换前通过点击预览来测试您的转换。这将打开一个新窗口,您可以在其中输入传入消息的上下文信息,例如 QoS、payload、是否设置了保留标志、发布者的用户名和客户端 ID 等。提供必要的详细信息后,点击运行转换以指定的上下文运行转换,并查看结果输出。

创建转换后,它将默认启用并显示在消息转换页面的列表中。您可以根据需要禁用它,或通过点击操作列中的设置来更新转换设置。要删除转换或更改其位置,请点击更多

在配置文件中配置消息转换

假设您收到一条 Avro 格式编码的消息,您希望将其解码为 JSON 格式。解码后,您想在将消息发送到规则引擎处理之前,将一个 tenant 属性(从发布客户端的客户端属性中检索)添加到主题前面。您可以使用以下配置来实现此转换:

message_transformation {
  transformations = [
    {
      name = mytransformation
      topics = ["t"]
      failure_action = drop
      payload_decoder = {type = avro, schema = myschema}
      payload_encoder = {type = json}
      operations = [
        {key = "topic", value = "concat([client_attrs.tenant, '/', topic])"}
      ]
    }
  ]
}

此配置指定了一个名为 mytransformation 的转换,其中:

  • 使用指定的 schema 将消息 payload 从 Avro 格式解码
  • 将 payload 编码为 JSON 格式。
  • 将客户端属性中的 tenant 属性与原始主题连接,从而在进一步处理之前修改主题。

更多关于消息转换的配置详情,请参见配置手册

REST API

有关通过 REST API 使用消息转换的详细信息,请参见 EMQX 企业版 API

创建解码/编码模式

有关如何创建解码和编码模式的更多信息,请参阅 Schema Registry部分。

统计与指标

启用后,消息转换功能将在 Dashboard 上显示统计数据和指标。您可以点击消息转换页面中的转换名称,查看以下内容:

统计数据

  • 总计:系统启动以来的触发总次数。
  • 成功:成功的数据转换次数。
  • 失败:失败的数据转换次数。

速率指标

  • 当前转换速度
  • 最近 5 分钟的速度
  • 历史最高速度

统计数据可以重置,并通过 /prometheus/message_transformation 路径在 Prometheus 中获取。