消息转换
提示
消息转换是 EMQX 企业版的功能。
概述
消息转换允许您根据用户定义的规则修改和格式化消息,然后再进一步处理或发送给订阅者。该功能高度可定制,支持多种编码格式和高级转换。
工作流程
当一条消息发布时,它会经历以下工作流程:
Schema 验证:当消息发布并通过授权后,首先会进行 Schema 验证。如果消息通过验证,则进入下一步。
消息转换管道:
- 转换匹配:根据消息的主题,将其与用户定义的转换规则列表进行匹配。可以为不同的主题或主题过滤器设置多个转换。
- 转换执行:匹配到的转换将按照用户配置的顺序依次执行。管道支持多种编码器和解码器,如 JSON、Protobuf 和 Avro,并允许使用 Variform 表达式来丰富或修改消息。
- 转换后处理:消息成功通过转换管道后,将继续进行下一步处理,如触发规则引擎或将消息分发给订阅者。
故障处理:如果转换失败,将执行用户配置的操作:
- 丢弃消息:终止发布并丢弃消息,对于 QoS 1 和 QoS 2 消息,通过 PUBACK 返回特定的原因代码(131 - 实现特定错误)。
- 断开连接并丢弃消息:丢弃消息并断开发布客户端的连接。
- 忽略:不执行任何额外操作。
无论配置了何种操作,转换失败时都可以生成日志记录。用户可以配置日志的输出级别,默认级别为
warning
。此外,转换失败还可以触发规则引擎事件($events/message_transformation_failed
),允许用户进行自定义处理,例如将错误的消息重新发布到其他主题或发送到 Kafka 进行进一步分析。
配置和使用消息转换
本节演示如何配置消息转换功能以及如何测试您的设置。
在 Dashboard 中配置消息转换
本节演示如何在 Dashboard 中创建和配置消息转换。
- 进入 Dashboard,点击左侧导航菜单中的集成 -> 消息转换。
- 在消息转换页面右上角点击创建。
- 在创建消息转换页面,配置以下信息:
- 名称:输入转换的名称。
- 消息来源主题:设置需要转换消息的主题。可以设置多个主题或主题过滤器。
- 备注(可选):输入任何备注信息。
- 消息格式转换:
- 源格式:指定进入转换管道时应用的有效 payload 解码格式。可选项包括
None
(不解码)、JSON
、Avro
或Protobuf
。这些解码格式会将二进制输入 payload 转换为结构化映射。如果选择Avro
或Protobuf
,则必须首先在 Schema Registry 中定义其模式。在具有多个转换的管道中,无需在每一步都进行解码。例如,如果转换T1
已经解码了 payload,则后续的转换T2
可以跳过解码,直接使用已正确格式化的 payload。 - 目标格式:指定当转换管道结束时,将最终的消息 payload 编码为二进制值的编码格式。编码格式选项与源格式相同:
None
、JSON
、Avro
或Protobuf
。只有管道中的最后一个转换需要确保 payload 被编码为二进制值,中间的转换不需要处理二进制编码。
- 源格式:指定进入转换管道时应用的有效 payload 解码格式。可选项包括
- 消息属性转换:
- 属性:指定转换后的值(由表达式生成)的写入目标位置。有效目标包括
payload
、topic
、qos
、retain
(设置相应的标志)以及user_property
(用于User-Property
MQTT 属性)。使用user_property
时,必须指定一个具体的键(例如:user_property.my_custom_prop
)。payload
可以按原样使用,覆盖整个消息 payload,或者指定一个嵌套键路径,将 payload 视为嵌套的 JSON 对象(例如:payload.x.y
)。 - 目标值:定义将写入配置的属性的值。此值可以从其他字段复制,如
qos
、retain
、topic
、payload
和payload.x.y
,也可以通过 Variform表达式生成。
- 属性:指定转换后的值(由表达式生成)的写入目标位置。有效目标包括
- 转换失败后的操作:
- 失败操作:选择在转换失败时执行的操作:
- 丢弃消息:终止发布过程并丢弃消息,通过 PUBACK 返回 QoS 1 和 QoS 2 消息的特定原因代码。
- 断开连接并丢弃消息:丢弃消息并断开发布客户端的连接。
- 忽略:不执行任何额外操作。
- 失败操作:选择在转换失败时执行的操作:
- 输出日志:选择是否在转换失败时生成日志条目;日志记录默认启用。
- 日志级别:设置日志的输出级别,默认级别为
warning
。
- 点击创建完成设置。
您可以在创建转换前通过点击预览来测试您的转换。这将打开一个新窗口,您可以在其中输入传入消息的上下文信息,例如 QoS、payload、是否设置了保留标志、发布者的用户名和客户端 ID 等。提供必要的详细信息后,点击运行转换以指定的上下文运行转换,并查看结果输出。
创建转换后,它将默认启用并显示在消息转换页面的列表中。您可以根据需要禁用它,或通过点击操作列中的设置来更新转换设置。要删除转换或更改其位置,请点击更多。
在配置文件中配置消息转换
假设您收到一条 Avro 格式编码的消息,您希望将其解码为 JSON 格式。解码后,您想在将消息发送到规则引擎处理之前,将一个 tenant
属性(从发布客户端的客户端属性中检索)添加到主题前面。您可以使用以下配置来实现此转换:
message_transformation {
transformations = [
{
name = mytransformation
topics = ["t"]
failure_action = drop
payload_decoder = {type = avro, schema = myschema}
payload_encoder = {type = json}
operations = [
{key = "topic", value = "concat([client_attrs.tenant, '/', topic])"}
]
}
]
}
此配置指定了一个名为 mytransformation
的转换,其中:
- 使用指定的 schema 将消息 payload 从 Avro 格式解码。
- 将 payload 编码为 JSON 格式。
- 将客户端属性中的
tenant
属性与原始主题连接,从而在进一步处理之前修改主题。
更多关于消息转换的配置详情,请参见配置手册。
REST API
有关通过 REST API 使用消息转换的详细信息,请参见 EMQX 企业版 API。
创建解码/编码模式
有关如何创建解码和编码模式的更多信息,请参阅 Schema Registry部分。
统计与指标
启用后,消息转换功能将在 Dashboard 上显示统计数据和指标。您可以点击消息转换页面中的转换名称,查看以下内容:
统计数据:
- 总计:系统启动以来的触发总次数。
- 成功:成功的数据转换次数。
- 失败:失败的数据转换次数。
速率指标:
- 当前转换速度
- 最近 5 分钟的速度
- 历史最高速度
统计数据可以重置,并通过 /prometheus/message_transformation
路径在 Prometheus 中获取。