# 消息转换

消息转换允许您根据用户定义的规则修改和格式化消息，然后再进一步处理或发送给订阅者。该功能高度可定制，支持多种编码格式和高级转换。

## 工作流程

当一条消息发布时，它会经历以下工作流程：

1. **Schema 验证**：当消息发布并通过授权后，首先会进行 [Schema 验证](./schema-validation.md)。如果消息通过验证，则进入下一步。

2. **消息转换管道**：

   - **转换匹配**：根据消息的主题，将其与用户定义的转换规则列表进行匹配。可以为不同的主题或主题过滤器设置多个转换。
   - **转换执行**：匹配到的转换将按照用户配置的顺序依次执行。管道支持多种编码器和解码器，如 JSON、Protobuf 和 Avro，并允许使用 [Variform 表达式](../configuration/configuration.md#variform-表达式)来丰富或修改消息。
   - **转换后处理**：消息成功通过转换管道后，将继续进行下一步处理，如触发规则引擎或将消息分发给订阅者。

3. **故障处理**：如果转换失败，将执行用户配置的操作：

   - **丢弃消息**：终止发布并丢弃消息，对于 QoS 1 和 QoS 2 消息，通过 PUBACK 返回特定的原因代码（131 - 实现特定错误）。
   - **断开连接并丢弃消息**：丢弃消息并断开发布客户端的连接。
   - **忽略**：不执行任何额外操作。

   无论配置了何种操作，转换失败时都可以生成日志记录。用户可以配置日志的输出级别，默认级别为 `warning`。此外，转换失败还可以触发规则引擎事件（`$events/message_transformation/failed`），允许用户进行自定义处理，例如将错误的消息重新发布到其他主题或发送到 Kafka 进行进一步分析。

## 配置和使用消息转换

本节演示如何配置消息转换功能以及如何测试您的设置。

### 在 Dashboard 中配置消息转换

本节演示如何在 Dashboard 中创建和配置消息转换。

1. 进入 Dashboard，点击左侧导航菜单中的**数据智能中心** -> **消息转换**。

2. 在**消息转换**页面右上角点击**创建**。

3. 在创建消息转换页面，配置以下信息：
   - **名称**：输入转换的名称。

   - **消息来源主题**：设置需要转换消息的主题。可以设置多个主题或主题过滤器。

   - **备注**（可选）：输入任何备注信息。

   - **消息格式转换**：
     - **源格式**：指定进入转换管道时应用的有效 payload 解码格式。可选项包括:

       - `None`（不解码）
       - `JSON`
       - `Avro`
       - `Protobuf`
       - `自定义（外部 HTTP）`

       这些解码格式会将二进制输入 payload 转换为结构化映射。如果选择 `Avro`，`Protobuf` 或 `自定义（外部 HTTP）`，须确保该解码格式已经在 [Schema Registry ](./schema-registry.md)中创建。

       在具有多个转换的管道中，无需在每一步都进行解码。例如，如果转换 `T1` 已经解码了 payload，则后续的转换 `T2` 可以跳过解码，直接使用已正确格式化的 payload。

     - **目标格式**：指定当转换管道结束时，将最终的消息 payload 编码为二进制值的编码格式。编码格式选项与**源格式**相同。

       只有管道中的最后一个转换需要确保 payload 被编码为二进制值，中间的转换不需要处理二进制编码。

   - **消息属性转换**：
     - **属性**：指定转换后的值（由表达式生成）的写入目标位置。有效目标包括 `payload`、`topic`、`qos`、`retain`（设置相应的标志）以及 `user_property`（用于 `User-Property` MQTT 属性）。使用 `user_property` 时，必须指定一个具体的键（例如：`user_property.my_custom_prop`）。`payload` 可以按原样使用，覆盖整个消息 payload，或者指定一个嵌套键路径，将 payload 视为嵌套的 JSON 对象（例如：`payload.x.y`）。
     - **目标值**：定义将写入配置的属性的值。此值可以从其他字段复制，如 `qos`、`retain`、`topic`、`payload` 和 `payload.x.y`，也可以通过 [Variform表达式](../configuration/configuration.md#variform-表达式)生成。

   - **转换失败后的操作**：
     - **失败操作**：选择在转换失败时执行的操作：
       - **丢弃消息**：终止发布过程并丢弃消息，通过 PUBACK 返回 QoS 1 和 QoS 2 消息的特定原因代码。
       - **断开连接并丢弃消息**：丢弃消息并断开发布客户端的连接。
       - **忽略**：不执行任何额外操作。

   - **输出日志**：选择是否在转换失败时生成日志条目；日志记录默认启用。

   - **日志级别**：设置日志的输出级别，默认级别为 `warning`。

4. 点击**创建**完成设置。

您可以在创建转换前通过点击**预览**来测试您的转换。这将打开一个新窗口，您可以在其中输入传入消息的上下文信息，例如 QoS、payload、是否设置了保留标志、发布者的用户名和客户端 ID 等。提供必要的详细信息后，点击**运行转换**以指定的上下文运行转换，并查看结果输出。

创建转换后，它将默认启用并显示在消息转换页面的列表中。您可以根据需要禁用它，或通过点击**操作**列中的**设置**来更新转换设置。要删除转换或更改其位置，请点击**更多**。

### 在配置文件中配置消息转换

假设您收到一条 Avro 格式编码的消息，您希望将其解码为 JSON 格式。解码后，您想在将消息发送到规则引擎处理之前，将一个 `tenant` 属性（从发布客户端的客户端属性中检索）添加到主题前面。您可以使用以下配置来实现此转换：

```
message_transformation {
  transformations = [
    {
      name = mytransformation
      topics = ["t"]
      failure_action = drop
      payload_decoder = {type = avro, schema = myschema}
      payload_encoder = {type = json}
      operations = [
        {key = "topic", value = "concat([client_attrs.tenant, '/', topic])"}
      ]
    }
  ]
}
```

此配置指定了一个名为 `mytransformation` 的转换，其中：

- 使用指定的 schema 将消息 payload 从 Avro 格式**解码**。
- 将 payload **编码**为 JSON 格式。
- 将客户端属性中的 `tenant` 属性与原始主题**连接**，从而在进一步处理之前修改主题。

更多关于消息转换的配置详情，请参见[配置手册](https://docs.emqx.com/zh/enterprise/v6.2.0/hocon/)。

### REST API

有关通过 REST API 使用消息转换的详细信息，请参见 [EMQX 企业版 API](https://docs.emqx.com/zh/enterprise/v6.2/admin/api-docs.html)。

### 创建解码/编码模式

有关如何创建解码和编码模式的更多信息，请参阅 [Schema Registry](./schema-registry.md) 部分。

## 统计与指标

启用后，消息转换功能将在 Dashboard 上显示统计数据和指标。您可以点击消息转换页面中的转换名称，查看以下内容：

**统计数据**：

- **总计**：系统启动以来的触发总次数。
- **成功**：成功的数据转换次数。
- **失败**：失败的数据转换次数。

**速率指标**：

- 当前转换速度
- 最近 5 分钟的速度
- 历史最高速度

统计数据可以重置，并通过 `/prometheus/message_transformation` 路径在 Prometheus 中获取。
