# Schema 管理

Schema 管理提供了一个集中管理和验证主题消息数据的模式，以及通过网络对数据进行序列化和反序列化的能力。MQTT 主题的发布者和订阅者可以使用 Schema 来确保数据的一致性和兼容性。Schema 管理是规则引擎的关键组成部分，可适配多种场景的设备接入和规则设计，有助于确保数据质量、遵守规范、提高应用程序开发效率和系统性能。

## 理解 Schema

Schema 模式定义了数据的结构。它定义了允许的数据类型、格式和关系。模式是数据的蓝图，描述数据记录的结构、单个字段的数据类型、字段之间的关系以及适用于数据的任何约束或规则。

Schema 可用于各种数据处理系统，包括数据库、消息服务以及分布式事件和数据处理框架。它们有助于确保数据的一致性和准确性，并能被不同的系统和应用程序有效地处理和分析。促进了不同系统和组织之间的数据共享和互操作性。

用户可以在 Schema 管理中定义 Schema，用户在规则中使用定义好的 Schema，再通过数据集成将客户端数据转发到不同的数据服务中。同时也可以将应用或者数据服务中的数据通过 Schema 发送给客户端实现双向的数据流转。

![schema_pic](./_assets/schema_pic.png)

Schema 管理具有多种优势，包括数据验证、兼容性检查、版本控制和迭代演进。它还能简化数据管道的开发和维护，降低数据兼容性问题、数据损坏和数据丢失的风险。

## 创建 Schema

如果您已经开通数据智能中心，您可以通过点击部署左侧菜单中的**数据智能中心** -> **Schema 管理**进入 Schema 管理页面。在该页面中，您可以创建 Schema。EMQX Cloud 支持创建 Avro, Protobuf 和 JSON Schema 格式的 Schema。

1. 点击页面右上角的**新建**按钮，进入新建 Schema 页面，设置以下字段：

   - **名称**：用于标识 Schema。
     - 名称可用于 Schema 验证和数据转换规则中。
     - 名称可用于数据集成的规则 SQL 编解码函数中，示例：`SELECT schema_encode("<name>", payload)`。

   - **备注**：可选，添加备注信息。

   - **类型**：从下拉菜单中选择 Schema 的类型，可选项为 `Avro`、`Protobuf`、 `JSON Schema`，和 `外部 HTTP`。

   - **Schema**：如果您选择 `Avro`、`Protobuf` 或 `JSON Schema` 作为 Schema 类型，需输入对应类型的 Schema，示例：

     - Avro：

      ```json
      {
        "type": "record",
        "name": "Device",
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "temp", "type": "int" }
        ]
      }
      ```

     - Protobuf：

      ```proto
      message Device {
        required string id = 1;
        required uint32 temp = 2;
      }
      ```

     - JSON Schema，支持使用 JSON 数据生成 JSON Schema：

     ![generate_JSON_schema](./_assets/generate_JSON_schema.png)

      ```json
      {
          "$schema": "http://json-schema.org/draft-06/schema#",
          "type": "object",
          "properties": {
            "temp": {
              "type": "integer"
            },
            "id": {
              "type": "string"
            }
          },
         "required": [
            "temp",
            "id"
         ]
      }

   - **URL**：如果您选择 `外部 HTTP`，需填写外部 HTTP 服务编解码 URL。有关创建一个外部 HTTP Schema 的具体指导，参考[编解码举例 - 外部 HTTP](./schema-registry-example-external-http)。

2. 点击**确认**创建成功后返回 Schema 列表页面。

## 创建外部 Schema

当您开通了智能数据中心，您可以通过 EMQX Cloud 控制台直接配置外部 Schema，从而更加简便地管理您的 Schema 集成。

::: tip 前提条件

在开始之前，您需要创建一个 [VPC 对等连接](../deployments/vpc_peering)。建立对等连接后，您可以通过内部网络 IP 登录平台控制台来访问外部 Schema。或者，您也可以设置一个 [NAT 网关](../vas/nat-gateway)，通过公共 IP 访问外部 schema。

:::

1. 在部署的左侧菜单中点击 **Schema 管理**，然后在 Schema 管理页面选择**外部 Schema** 标签。
2. 点击右上角的 **+ 新建** 按钮，配置以下字段：
3. - **名称**：输入一个外部 schema 名称，该名称将在编码和解码功能中使用。
   - **类型**：选择外部 schema 的类型。目前仅支持 `Confluent`。
   - **URL**：输入您的 Confluent Schema Registry (CSR) 的端点。
   - **认证**：如果选择 `Basic auth`，请输入访问外部 schema 的认证凭证（用户名和密码）。
4. 完成设置后，点击 **确认**。

### 在规则引擎中使用外部 Schema

配置外部 Schema 后，您可以在 EMQX 规则引擎中使用多个函数，利用外部 Schema 中存储的 Schema 对 payload 进行编码和解码。

::: tip

外部 Schema 不能用于消息转换和模式验证。

:::

配置的外部 Schema 可以在以下函数中使用：

```sql
avro_encode('my_external_registry', payload, my_schema_id)
avro_decode('my_external_registry', payload, my_schema_id)
schema_encode_and_tag('my_local_avro_schema', 'my_external_registry', payload, 'my_subject')
schema_decode_tagged('my_external_registry', payload)
```

#### 函数使用示例

在下面所有函数使用示例中，使用了以下示例值和变量名：

- `my_external_registry` 是您在 EMQX 中为外部 Schema Registry 指定的名称。
- `my_schema_id` 是注册在 CSR 中的 Schema ID（在 CSR 中始终是整数）。
- `my_local_avro_schema` 是在 EMQX 中配置的本地 Avro Schema 名称。
- `my_subject` 是在 CSR 中定义的主题名称。

##### `avro_encode`

`avro_encode` 使用外部 Schema Registry 中的 Schema ID 对 payload 进行编码。Schema 会在运行时动态获取，并缓存以供后续使用。在 Confluent Schema Registry 中，Schema ID 是整数。

提示

编码时，payload 必须是规则引擎的内部数据格式，即已解码的 Map。因此在示例中使用了 `json_decode`。

示例：

```sql
select
  -- 123 是在 CSR 中注册的 Schema ID
  avro_encode('my_external_registry', json_decode(payload), 123) as encoded
from 't'
```

##### `avro_decode`

该函数根据外部 Schema Registry 中的 Schema ID 对 Avro payload 进行解码。Schema 会在运行时动态获取，并缓存以供后续操作。

示例：

```sql
select
  -- 123 是在 CSR 中注册的 Schema ID
  avro_decode('my_external_registry', payload, 123) as decoded
from 't'
```

##### `schema_encode_and_tag`

此函数使用本地注册的 Avro Schema、外部 CSR 的 Schema 名称和主题对 payload 进行编码，并将编码后的 payload（已为内部 Map 格式）标记为带有 Schema ID。Schema ID 是通过将本地 Schema 注册到 CSR 获得的。

示例：

```sql
select
  schema_encode_and_tag(
    'my_local_avro_schema',
    'my_external_registry',
    json_decode(payload),
    'my_subject'
  ) as encoded
from 't'
```

##### `schema_decode_tagged`

此函数使用 CSR 名称对 payload 进行解码，假设该 payload 带有从 CSR 获取的 Schema ID。

```sql
select
  schema_decode_tagged(
    'my_external_registry',
    payload
  ) as decoded
from 't'
```

## 管理 Schema

Schema 列表页面列出了所有已创建的 Schema。您可以对它们进行管理。

### 编辑 Schema

在 Schema 列表中，点击**操作**中的编辑图标，您可以修改备注、Schema 类型和 Schema。

### 删除 Schema

在 Schema 列表中，点击**操作**中的删除图标，二次确认后即可删除 Schema。
