# 消息转换

消息转换允许您自定义规则修改和格式化消息，然后再进一步处理或发送给订阅者。该功能基于 Schema 的动态消息转换，高度可定制，支持多种编码格式和高级转换，满足复杂的下游数据处理需求。

## 工作流程

当一条消息发布时，它会经历以下工作流程：

1. **Schema 验证**：当消息发布并通过授权后，首先会进行 [Schema 验证](./schema_validation.md)。如果消息通过验证，则进入下一步。
2. **消息转换管道**：
   - **转换匹配**：根据消息的主题，将其与用户定义的转换规则列表进行匹配。可以为不同的主题或主题过滤器设置多个转换。
   - **转换执行**：匹配到的转换将按照用户配置的顺序依次执行。管道支持多种编码器和解码器，如 JSON、Protobuf 和 Avro，并允许使用 [Variform 表达式](https://docs.emqx.com/zh/emqx/latest/configuration/configuration.html#variform-表达式)来丰富或修改消息。
   - **转换后处理**：消息成功通过转换管道后，将继续进行下一步处理，如触发规则引擎或将消息分发给订阅者。
3. **故障处理**：如果转换失败，将执行用户配置的操作：
   - **丢弃消息**：终止发布并丢弃消息，对于 QoS 1 和 QoS 2 消息，通过 PUBACK 返回特定的原因代码（131 - 实现特定错误）。
   - **断开连接并丢弃消息**：丢弃消息并断开发布客户端的连接。
   - **忽略**：不做额外的操作。

   无论转换失败后是什么操作，部署日志中都将打印一条转换失败的信息。您可以在消息转换规则中设置日志的输出级别，默认是 `warning` 级别。

   另外，转换失败还可以触发数据集成中规则 SQL 的验证失败事件，事件主题是 `$events/message_transformation_failed`，用户可以捕获这个事件进行自定义处理，例如将错误的消息发布到另一个主题、或发送到 Kafka 中进行分析。

## 创建消息转换规则

如果您已经开通数据智能中心，您可以通过点击部署左侧菜单中的**数据智能中心** -> **消息转换**进入消息转换页面。在该页面中，您可以创建消息转换规则。

1. 在消息转换页面中点击右上角的**新建**按钮，进入新建消息转换页面，并进行以下配置：

   - **名称**：输入转换的名称，用于标识一条消息转换规则。

   - **备注**：可选，为消息转换规则添加备注信息。

   - **消息来源主题**：设置需要被转换消息的主题。可以设置多个主题或主题过滤器。

   - **消息格式转换**：

     - **源格式**：指定进入转换管道时应用的有效 payload 解码格式。可选项包括 `None`（不解码）、`JSON`、`Avro` 或 `Protobuf`。这些解码格式会将二进制输入 payload 转换为结构化映射。如果选择 `Avro` 或 `Protobuf`，则必须首先在 [Schema 管理](./schema_registry.md)中定义其 Schema。在设置了多个转换规则的转换管道中，无需在每一步都进行解码。例如，如果转换 `T1` 已经解码了 payload，则后续的转换 `T2` 可以跳过解码，直接使用已正确格式化的 payload。
     - **目标格式**：指定当转换管道结束时，将最终的消息 payload 编码为二进制值的编码格式。编码格式选项与**源格式**相同：`None`（不编码）、`JSON`、`Avro` 或 `Protobuf`。只有管道中的最后一个转换需要确保 payload 被编码为二进制值，中间的转换不需要处理二进制编码。

     ::: tip

     如果源格式和目标格式相同，则不会执行消息格式转换。Avro/Protobuf 格式和无（None）格式不能相互转换。详细的格式转换设置规则，参考[消息格式转换设置规则](#消息格式转换设置规则)。

     :::

   - **消息属性转换**：

     - **属性**：指定转换后的值（由表达式生成）的写入目标位置。有效目标包括 `payload`、`topic`、`qos`、`retain`（设置相应的标志）以及 `user_property`（用于 `User-Property` MQTT 属性）。

       使用 `user_property` 时，必须指定一个具体的键（如：`user_property.my_custom_prop`）。

       `payload` 可以按原样使用，覆盖整个消息 payload，或者指定一个嵌套键路径，将 payload 视为嵌套的 JSON 对象（例如：`payload.x.y`）。

     - **目标值**：定义将写入配置的属性的值。此值可以从其他字段复制，如 `qos`、`retain`、`topic`、`payload` 和 `payload.x.y`，也可以通过 [Variform 表达式](https://docs.emqx.com/zh/emqx/latest/configuration/configuration.html#variform-表达式)生成。

     ::: tip

     参考[消息属性转换设置规则](#消息属性转换设置规则)进行设置。

     :::

   - **转换失败后的操作**：
     - **失败操作**：选择在转换失败时执行的操作：
       - **丢弃消息**：终止发布过程并丢弃消息，通过 PUBACK 返回 QoS 1 和 QoS 2 消息的特定原因代码。
       - **断开连接并丢弃消息**：丢弃消息并断开发布客户端的连接。
       - **忽略**：不做额外的操作。
     - **日志级别**：定义输出验证失败部署日志的级别，可以选择 `warning` 或 `error`。

2. (可选）您可以在创建转换前通过点击**预览**来测试您的转换。这将打开一个新弹框，您可以在其中输入传入消息的上下文信息，例如 QoS、payload、是否设置了保留标志、发布者的用户名和客户端 ID 等。提供必要的详细信息后，点击**运行转换**以指定的上下文运行转换，并查看结果输出。
3. 点击**确认**完成设置。

创建转换规则后，它将默认启用并显示在消息转换页面的列表中。您可以对已创建的消息转换进行管理，详见[管理消息转换](#管理消息转换)。

### 消息格式转换设置规则

**备注**： <code>✓</code> （允许设置）；<code>✗</code> （不允许设置）

| **源格式 \ 目标格式** | **JSON** | **Avro** | **Protobuf** | **None** |
| :-------------------- | :------- | :------- | :----------- | :------- |
| **JSON**              | ✓        | ✓        | ✓            | ✓        |
| **Avro**              | ✓        | ✓        | ✓            | ✗        |
| **Protobuf**          | ✓        | ✓        | ✓            | ✗        |
| **None**              | ✓        | ✗        | ✗            | ✓        |

### 消息属性转换设置规则

**备注**： <code>✓</code>（支持设置所有属性）； <code>✗</code>（不支持设置）

> `user_property` 不能被整个设置，但可以操作 `user_property` 的子属性。

| **源格式 \ 目标格式** | **JSON**                                          | **Avro**                                          | **Protobuf**                                      | **None**                                          |
| :-------------------- | :------------------------------------------------ | :------------------------------------------------ | :------------------------------------------------ | :------------------------------------------------ |
| **JSON**              | ✓                                                 | ✓                                                 | ✓                                                 | ✓                                                 |
| **Avro**              | 无法直接设置 `payload`，可以为`payload`添加子属性 | 无法直接设置 `payload`，可以为`payload`添加子属性 | 无法直接设置 `payload`，可以为`payload`添加子属性 | ✗                                                 |
| **Protobuf**          | 无法直接设置 `payload`，可以为`payload`添加子属性 | 无法直接设置 `payload`，可以为`payload`添加子属性 | 无法直接设置 `payload`，可以为`payload`添加子属性 | ✗                                                 |
| **None**              | 可以直接设置 `payload`，无法为`payload`添加子属性 | ✗                                                 | ✗                                                 | 可以直接设置 `payload`，无法为`payload`添加子属性 |

## 管理消息转换规则

您可以在消息转换规则列表页面进行以下管理操作：

- 启用/停用消息转换：在**是否启用**列点击开关按钮。
- 编辑消息转换规则：在**操作**列中点击“编辑”图标，进入编辑页面进行修改并保存。
- 删除消息转换规则：在**操作**列中点击“更多”图标，在下拉选项中点击**删除**，二次确认后即可删除。
- 调整消息转换规则顺序：可以使用鼠标拖动列表中的一行表格进行上下排序，或者使用**操作**列中**更多选项**中的快捷移动按钮。

## 查看统计指标

点击消息转换列表中的名称进入详情页面，您可以查看到消息转换执行相关的统计指标：

- **统计指标**：
  - **执行次数**：消息转换规则启用以来触发的总次数。
  - **成功**：消息转换成功的次数。
  - **失败**：消息转换失败的次数。
- **速率统计**：
  - **当前速率**：最近 1 分钟内的转换速度。
  - **最近 5 分钟速率**：最近 5 分钟内的转换速度。
  - **最大速率**：历史最大转换速度。
