Schema 管理
Schema 管理提供了一个集中管理和验证主题消息数据的模式,以及通过网络对数据进行序列化和反序列化的能力。MQTT 主题的发布者和订阅者可以使用 Schema 来确保数据的一致性和兼容性。Schema 管理是规则引擎的关键组成部分,可适配多种场景的设备接入和规则设计,有助于确保数据质量、遵守规范、提高应用程序开发效率和系统性能。
理解 Schema
Schema 模式定义了数据的结构。它定义了允许的数据类型、格式和关系。模式是数据的蓝图,描述数据记录的结构、单个字段的数据类型、字段之间的关系以及适用于数据的任何约束或规则。
Schema 可用于各种数据处理系统,包括数据库、消息服务以及分布式事件和数据处理框架。它们有助于确保数据的一致性和准确性,并能被不同的系统和应用程序有效地处理和分析。促进了不同系统和组织之间的数据共享和互操作性。
用户可以在 Schema 管理中定义 Schema,用户在规则中使用定义好的 Schema,再通过数据集成将客户端数据转发到不同的数据服务中。同时也可以将应用或者数据服务中的数据通过 Schema 发送给客户端实现双向的数据流转。
Schema 管理具有多种优势,包括数据验证、兼容性检查、版本控制和迭代演进。它还能简化数据管道的开发和维护,降低数据兼容性问题、数据损坏和数据丢失的风险。
创建 Schema
如果您已经开通数据智能中心,您可以通过点击部署左侧菜单中的数据智能中心 -> Schema 管理进入 Schema 管理页面。在该页面中,您可以创建 Schema。EMQX Platform 支持创建 Avro, Protobuf 和 JSON Schema 格式的 Schema。
点击页面右上角的新建按钮,进入新建 Schema 页面,设置以下字段:
名称:用于标识 Schema。
- 名称可用于 Schema 验证和数据转换规则中。
- 名称可用于数据集成的规则 SQL 编解码函数中,示例:
SELECT schema_encode("<name>", payload)
。
备注:可选,添加备注信息。
类型:从下拉菜单中选择 Schema 的类型,可选项为
Avro
、Protobuf
和JSON Schema
。Schema:输入对应类型的 Schema,示例:
- Avro:
json{ "type": "record", "name": "Device", "fields": [ { "name": "id", "type": "string" }, { "name": "temp", "type": "int" } ] }
- Protobuf:
protomessage Device { required string id = 1; required uint32 temp = 2; }
- JSON Schema,支持使用 JSON 数据生成 JSON Schema:
json{ "$schema": "http://json-schema.org/draft-06/schema#", "type": "object", "properties": { "temp": { "type": "integer" }, "id": { "type": "string" } }, "required": [ "temp", "id" ] }
点击确认创建成功后返回 Schema 列表页面。
创建外部 Schema
当您开通了智能数据中心,您可以通过 EMQX Platform 控制台直接配置外部 Schema,从而更加简便地管理您的 Schema 集成。
前提条件
在开始之前,您需要创建一个 VPC 对等连接。建立对等连接后,您可以通过内部网络 IP 登录平台控制台来访问外部 Schema。或者,您也可以设置一个 NAT 网关,通过公共 IP 访问外部 schema。
- 在部署的左侧菜单中点击 Schema 管理,然后在 Schema 管理页面选择外部 Schema 标签。
- 点击右上角的 + 新建 按钮,配置以下字段:
- 名称:输入一个外部 schema 名称,该名称将在编码和解码功能中使用。
- 类型:选择外部 schema 的类型。目前仅支持
Confluent
。 - URL:输入您的 Confluent Schema Registry (CSR) 的端点。
- 认证:如果选择
Basic auth
,请输入访问外部 schema 的认证凭证(用户名和密码)。
- 完成设置后,点击 确认。
在规则引擎中使用外部 Schema
配置外部 Schema 后,您可以在 EMQX 规则引擎中使用多个函数,利用外部 Schema 中存储的 Schema 对 payload 进行编码和解码。
TIP
外部 Schema 不能用于消息转换和模式验证。
配置的外部 Schema 可以在以下函数中使用:
avro_encode('my_external_registry', payload, my_schema_id)
avro_decode('my_external_registry', payload, my_schema_id)
schema_encode_and_tag('my_local_avro_schema', 'my_external_registry', payload, 'my_subject')
schema_decode_tagged('my_external_registry', payload)
函数使用示例
在下面所有函数使用示例中,使用了以下示例值和变量名:
my_external_registry
是您在 EMQX 中为外部 Schema Registry 指定的名称。my_schema_id
是注册在 CSR 中的 Schema ID(在 CSR 中始终是整数)。my_local_avro_schema
是在 EMQX 中配置的本地 Avro Schema 名称。my_subject
是在 CSR 中定义的主题名称。
avro_encode
avro_encode
使用外部 Schema Registry 中的 Schema ID 对 payload 进行编码。Schema 会在运行时动态获取,并缓存以供后续使用。在 Confluent Schema Registry 中,Schema ID 是整数。
提示
编码时,payload 必须是规则引擎的内部数据格式,即已解码的 Map。因此在示例中使用了 json_decode
。
示例:
select
-- 123 是在 CSR 中注册的 Schema ID
avro_encode('my_external_registry', json_decode(payload), 123) as encoded
from 't'
avro_decode
该函数根据外部 Schema Registry 中的 Schema ID 对 Avro payload 进行解码。Schema 会在运行时动态获取,并缓存以供后续操作。
示例:
select
-- 123 是在 CSR 中注册的 Schema ID
avro_decode('my_external_registry', payload, 123) as decoded
from 't'
schema_encode_and_tag
此函数使用本地注册的 Avro Schema、外部 CSR 的 Schema 名称和主题对 payload 进行编码,并将编码后的 payload(已为内部 Map 格式)标记为带有 Schema ID。Schema ID 是通过将本地 Schema 注册到 CSR 获得的。
示例:
select
schema_encode_and_tag(
'my_local_avro_schema',
'my_external_registry',
json_decode(payload),
'my_subject'
) as encoded
from 't'
schema_decode_tagged
此函数使用 CSR 名称对 payload 进行解码,假设该 payload 带有从 CSR 获取的 Schema ID。
select
schema_decode_tagged(
'my_external_registry',
payload
) as decoded
from 't'
管理 Schema
Schema 列表页面列出了所有已创建的 Schema。您可以对它们进行管理。
编辑 Schema
在 Schema 列表中,点击操作中的编辑图标,您可以修改备注、Schema 类型和 Schema。
删除 Schema
在 Schema 列表中,点击操作中的删除图标,二次确认后即可删除 Schema。