Skip to content

Schema 管理

Schema 管理提供了一个集中管理和验证主题消息数据的模式,以及通过网络对数据进行序列化和反序列化的能力。MQTT 主题的发布者和订阅者可以使用 Schema 来确保数据的一致性和兼容性。Schema 管理是规则引擎的关键组成部分,可适配多种场景的设备接入和规则设计,有助于确保数据质量、遵守规范、提高应用程序开发效率和系统性能。

理解 Schema

Schema 模式定义了数据的结构。它定义了允许的数据类型、格式和关系。模式是数据的蓝图,描述数据记录的结构、单个字段的数据类型、字段之间的关系以及适用于数据的任何约束或规则。

Schema 可用于各种数据处理系统,包括数据库、消息服务以及分布式事件和数据处理框架。它们有助于确保数据的一致性和准确性,并能被不同的系统和应用程序有效地处理和分析。促进了不同系统和组织之间的数据共享和互操作性。

用户可以在 Schema 管理中定义 Schema,用户在规则中使用定义好的 Schema,再通过数据集成将客户端数据转发到不同的数据服务中。同时也可以将应用或者数据服务中的数据通过 Schema 发送给客户端实现双向的数据流转。

schema_pic

Schema 管理具有多种优势,包括数据验证、兼容性检查、版本控制和迭代演进。它还能简化数据管道的开发和维护,降低数据兼容性问题、数据损坏和数据丢失的风险。

创建 Schema

如果您已经开通数据智能中心,您可以通过点击部署左侧菜单中的数据智能中心 -> Schema 管理进入 Schema 管理页面。在该页面中,您可以创建 Schema。EMQX Platform 支持创建 Avro, Protobuf 和 JSON Schema 格式的 Schema。

  1. 点击页面右上角的新建按钮,进入新建 Schema 页面,设置以下字段:

    • 名称:用于标识 Schema。

      • 名称可用于 Schema 验证和数据转换规则中。
      • 名称可用于数据集成的规则 SQL 编解码函数中,示例:SELECT schema_encode("<name>", payload)
    • 备注:可选,添加备注信息。

    • 类型:从下拉菜单中选择 Schema 的类型,可选项为 AvroProtobufJSON Schema

    • Schema:输入对应类型的 Schema,示例:

      • Avro:
      json
      {
        "type": "record",
        "name": "Device",
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "temp", "type": "int" }
        ]
      }
      • Protobuf:
      proto
      message Device {
        required string id = 1;
        required uint32 temp = 2;
      }
      • JSON Schema,支持使用 JSON 数据生成 JSON Schema:

      generate_JSON_schema

      json
      {
          "$schema": "http://json-schema.org/draft-06/schema#",
          "type": "object",
          "properties": {
            "temp": {
              "type": "integer"
            },
            "id": {
              "type": "string"
            }
          },
         "required": [
            "temp",
            "id"
         ]
      }
  2. 点击确认创建成功后返回 Schema 列表页面。

创建外部 Schema

当您开通了智能数据中心,您可以通过 EMQX Platform 控制台直接配置外部 Schema,从而更加简便地管理您的 Schema 集成。

前提条件

在开始之前,您需要创建一个 VPC 对等连接。建立对等连接后,您可以通过内部网络 IP 登录平台控制台来访问外部 Schema。或者,您也可以设置一个 NAT 网关,通过公共 IP 访问外部 schema。

  1. 在部署的左侧菜单中点击 Schema 管理,然后在 Schema 管理页面选择外部 Schema 标签。
  2. 点击右上角的 + 新建 按钮,配置以下字段:
    • 名称:输入一个外部 schema 名称,该名称将在编码和解码功能中使用。
    • 类型:选择外部 schema 的类型。目前仅支持 Confluent
    • URL:输入您的 Confluent Schema Registry (CSR) 的端点。
    • 认证:如果选择 Basic auth,请输入访问外部 schema 的认证凭证(用户名和密码)。
  3. 完成设置后,点击 确认

在规则引擎中使用外部 Schema

配置外部 Schema 后,您可以在 EMQX 规则引擎中使用多个函数,利用外部 Schema 中存储的 Schema 对 payload 进行编码和解码。

TIP

外部 Schema 不能用于消息转换和模式验证。

配置的外部 Schema 可以在以下函数中使用:

sql
avro_encode('my_external_registry', payload, my_schema_id)
avro_decode('my_external_registry', payload, my_schema_id)
schema_encode_and_tag('my_local_avro_schema', 'my_external_registry', payload, 'my_subject')
schema_decode_tagged('my_external_registry', payload)

函数使用示例

在下面所有函数使用示例中,使用了以下示例值和变量名:

  • my_external_registry 是您在 EMQX 中为外部 Schema Registry 指定的名称。
  • my_schema_id 是注册在 CSR 中的 Schema ID(在 CSR 中始终是整数)。
  • my_local_avro_schema 是在 EMQX 中配置的本地 Avro Schema 名称。
  • my_subject 是在 CSR 中定义的主题名称。
avro_encode

avro_encode 使用外部 Schema Registry 中的 Schema ID 对 payload 进行编码。Schema 会在运行时动态获取,并缓存以供后续使用。在 Confluent Schema Registry 中,Schema ID 是整数。

提示

编码时,payload 必须是规则引擎的内部数据格式,即已解码的 Map。因此在示例中使用了 json_decode

示例:

sql
select
  -- 123 是在 CSR 中注册的 Schema ID
  avro_encode('my_external_registry', json_decode(payload), 123) as encoded
from 't'
avro_decode

该函数根据外部 Schema Registry 中的 Schema ID 对 Avro payload 进行解码。Schema 会在运行时动态获取,并缓存以供后续操作。

示例:

sql
select
  -- 123 是在 CSR 中注册的 Schema ID
  avro_decode('my_external_registry', payload, 123) as decoded
from 't'
schema_encode_and_tag

此函数使用本地注册的 Avro Schema、外部 CSR 的 Schema 名称和主题对 payload 进行编码,并将编码后的 payload(已为内部 Map 格式)标记为带有 Schema ID。Schema ID 是通过将本地 Schema 注册到 CSR 获得的。

示例:

sql
select
  schema_encode_and_tag(
    'my_local_avro_schema',
    'my_external_registry',
    json_decode(payload),
    'my_subject'
  ) as encoded
from 't'
schema_decode_tagged

此函数使用 CSR 名称对 payload 进行解码,假设该 payload 带有从 CSR 获取的 Schema ID。

sql
select
  schema_decode_tagged(
    'my_external_registry',
    payload
  ) as decoded
from 't'

管理 Schema

Schema 列表页面列出了所有已创建的 Schema。您可以对它们进行管理。

编辑 Schema

在 Schema 列表中,点击操作中的编辑图标,您可以修改备注、Schema 类型和 Schema。

删除 Schema

在 Schema 列表中,点击操作中的删除图标,二次确认后即可删除 Schema。