Schema Registry
The Schema Registry provides centralized management of the schemas used to validate topic message data, along with the ability to serialize and deserialize data sent over the network. Publishers and subscribers of MQTT topics can use these schemas to ensure data consistency and compatibility. The Schema Registry is a key component of the rule engine and can be adapted to many device-access and rule-design scenarios. It helps ensure data quality and compliance, improve application development efficiency, and maintain system performance.
Understand Schema Registry
The Schema defines the structure of the data, including the allowed data types, formats, and relationships. It is a blueprint for data that describes the structure of a data record, the data types of individual fields, the relationships between fields, and any constraints or rules that apply to the data.
Schemas can be used in various data processing systems, including databases, messaging services, and distributed event and data processing frameworks. They help ensure that data is consistent and accurate and can be efficiently processed and analyzed by different systems and applications. Thus, they facilitate data sharing and interoperability between different systems and organizations.
Users can define schemas in the Schema Registry and then reference them in rules that forward client data to different data services through data integration. Conversely, data from an application or data service can be encoded with a schema and sent to clients, enabling two-way data flow.
Schema Registry has multiple advantages, including data validation, compatibility checking, version control and iterative evolution. It also simplifies the development and maintenance of data pipelines and reduces the risk of data compatibility issues, data corruption and data loss.
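To make "data validation" concrete, here is a minimal Python sketch that checks a decoded JSON message against a tiny subset of JSON Schema. The subset covers the top-level object type, per-property types (only "integer", "string", and "object"), and required fields. It is an illustration under those assumptions, not the validator EMQX Platform uses. The helper names and the `DEVICE_SCHEMA` value are hypothetical.

```python
import json


def _matches_type(value, type_name):
    """Check a value against one JSON Schema type name (subset only)."""
    if type_name == "integer":
        # bool is a subclass of int in Python, so exclude it explicitly.
        return isinstance(value, int) and not isinstance(value, bool)
    if type_name == "string":
        return isinstance(value, str)
    if type_name == "object":
        return isinstance(value, dict)
    raise ValueError(f"type not supported by this sketch: {type_name}")


def validate(message, schema):
    """Return a list of error strings; an empty list means the message is valid."""
    top_type = schema.get("type", "object")
    if not _matches_type(message, top_type):
        return [f"message is not of type {top_type}"]
    errors = []
    for field in schema.get("required", []):
        if field not in message:
            errors.append(f"missing required field: {field}")
    for field, spec in schema.get("properties", {}).items():
        if field in message and not _matches_type(message[field], spec["type"]):
            errors.append(f"field {field} is not of type {spec['type']}")
    return errors


# Hypothetical schema with the same shape as the JSON Schema example in Create Schema.
DEVICE_SCHEMA = {
    "type": "object",
    "properties": {"temp": {"type": "integer"}, "id": {"type": "string"}},
    "required": ["temp", "id"],
}

if __name__ == "__main__":
    print(validate(json.loads('{"id": "dev1", "temp": 25}'), DEVICE_SCHEMA))  # []
    print(validate(json.loads('{"id": "dev1", "temp": "hot"}'), DEVICE_SCHEMA))
```

A complete validator would also handle nested schemas, formats, and keywords such as `additionalProperties`. That scope is why dedicated JSON Schema libraries exist.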
Create Schema
If you have subscribed to the Smart Data Hub, you can access the Schema Registry page by clicking Smart Data Hub -> Schema Registry in the left menu of the deployment. On this page, you can create a Schema. The EMQX Platform supports creating Schemas in Avro, Protobuf, and JSON Schema formats.
Click the + New button in the top right corner of the Internal Schema tab to go to the New Schema Registry page. Set the following fields:
Name: Used to identify the Schema.
- The name can be used in Schema validation and data transformation rules.
- The name can also be used in the SQL encoding and decoding functions of data integration rules, for example: `SELECT schema_encode("<name>", payload)`.
Description: Optional, add any description information.
Type: Select the Schema type from the dropdown menu. Available options are Avro, Protobuf, and JSON Schema.
Schema: Enter the Schema corresponding to the selected type. Examples:
- Avro:
```json
{
  "type": "record",
  "name": "Device",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "temp", "type": "int" }
  ]
}
```
- Protobuf:
```proto
message Device {
  required string id = 1;
  required uint32 temp = 2;
}
```
- JSON Schema (you can generate a JSON Schema from JSON data):
```json
{
  "$schema": "http://json-schema.org/draft-06/schema#",
  "type": "object",
  "properties": {
    "temp": { "type": "integer" },
    "id": { "type": "string" }
  },
  "required": [ "temp", "id" ]
}
```
After clicking Confirm, the schema will be successfully created, and you will be returned to the Schema Registry page.
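The Avro and Protobuf examples above describe how a record is laid out on the wire. The following Python sketch hand-encodes a hypothetical record `{"id": "dev1", "temp": 25}` with both schemas.

- The Avro encoding follows the Avro binary encoding: zigzag varints, length-prefixed strings, and fields in schema order.
- The Protobuf encoding follows the Protobuf wire format, where each field key is `field number << 3 | wire type`.

The function names are illustrative. EMQX Platform uses its own codec libraries rather than code like this.

```python
def zigzag(n: int) -> int:
    """Avro zigzag mapping for signed ints: 0->0, -1->1, 1->2, -2->3, ..."""
    # Assumes n fits in a signed 64-bit Avro long.
    return (n << 1) ^ (n >> 63)


def varint(n: int) -> bytes:
    """Unsigned base-128 varint, least significant 7-bit group first."""
    if n < 0:
        raise ValueError("varint input must be non-negative")
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more groups follow
        else:
            out.append(byte)
            return bytes(out)


def avro_encode_device(record: dict) -> bytes:
    """Avro binary encoding: fields in schema order, no field tags."""
    raw_id = record["id"].encode("utf-8")
    # string = zigzag-varint length + UTF-8 bytes; int = zigzag varint
    return varint(zigzag(len(raw_id))) + raw_id + varint(zigzag(record["temp"]))


def protobuf_encode_device(record: dict) -> bytes:
    """Protobuf wire format: each field is key (number << 3 | wire type) + value."""
    raw_id = record["id"].encode("utf-8")
    field1 = varint((1 << 3) | 2) + varint(len(raw_id)) + raw_id  # id: length-delimited
    field2 = varint((2 << 3) | 0) + varint(record["temp"])        # temp: varint (uint32)
    return field1 + field2


if __name__ == "__main__":
    sample = {"id": "dev1", "temp": 25}
    print(avro_encode_device(sample).hex(" "))      # 08 64 65 76 31 32
    print(protobuf_encode_device(sample).hex(" "))  # 0a 04 64 65 76 31 10 19
```

For the sample record, the Avro encoding is 6 bytes and the Protobuf encoding is 8 bytes. Avro carries no field tags, so the reader must know the exact schema the writer used. This is one reason a shared schema registry is useful.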
External Schema Registry
EMQX Platform supports configuring an external Confluent Schema Registry (CSR). This feature allows users to dynamically retrieve schemas from external registries during rule processing, enabling efficient message encoding and decoding.
Create External Schema Registry
Once you have subscribed to the Smart Data Hub, you can configure an external schema registry directly through the EMQX Platform Console, making it easy to manage your schema integration.
Prerequisite
Before you start, create a VPC Peering Connection. Once the peering connection is established, you can configure the external schema registry in the Platform Console using its internal network IP address. Alternatively, set up a NAT Gateway to access the external schema registry through a public IP.
Click Schema Registry in the left menu of the deployment. Select the External Schema tab on the Schema Registry page.
Click the Create button at the upper right corner. Configure the following fields:
Name: Enter an external schema registry name that will be used in the encoding and decoding functions.
Type: Select the type of external schema registry. Currently, only Confluent is supported.
URL: Enter the endpoint of your Confluent Schema Registry.
Authentication: If you select Basic auth, enter the authentication credentials (username and password) for accessing the external registry.
Click Create after you complete the settings.
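The URL and Basic auth fields map onto an ordinary HTTP request to the registry. As a sketch, the following Python code builds the request a client would send to look up a schema by ID through CSR's `GET /schemas/ids/{id}` endpoint, with an optional Basic auth header. The host, port, credentials, and function name are placeholders. This is not how EMQX Platform issues the request internally.

```python
import base64
import urllib.request
from typing import Optional


def build_schema_request(base_url: str, schema_id: int,
                         username: Optional[str] = None,
                         password: Optional[str] = None) -> urllib.request.Request:
    """Build a GET request for CSR's /schemas/ids/{id} endpoint (no network I/O)."""
    url = f"{base_url.rstrip('/')}/schemas/ids/{schema_id}"
    req = urllib.request.Request(url, method="GET")
    req.add_header("Accept", "application/vnd.schemaregistry.v1+json")
    if username is not None:
        # Basic auth: base64("username:password") in the Authorization header.
        creds = f"{username}:{password or ''}".encode("utf-8")
        req.add_header("Authorization", "Basic " + base64.b64encode(creds).decode("ascii"))
    return req


# Usage against a reachable registry (placeholder address and credentials):
# req = build_schema_request("http://10.0.0.5:8081", 123, "user", "secret")
# with urllib.request.urlopen(req, timeout=5) as resp:
#     print(resp.read().decode("utf-8"))  # JSON body with a "schema" field
```

If a test request like this fails from inside your network, check the VPC peering or NAT Gateway setup described in the prerequisite before revisiting the registry settings.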
Use External Schema Registry in Rule Engine
Once an external registry is configured, you can use several functions in the EMQX Platform rule engine to encode and decode payloads using the schemas stored in the external registry.
TIP
The external schema registry cannot be used for message transformation or schema validation.
The following functions use a configured external CSR:
```sql
avro_encode('my_external_registry', payload, my_schema_id)
avro_decode('my_external_registry', payload, my_schema_id)
schema_encode_and_tag('my_local_avro_schema', 'my_external_registry', payload, 'my_subject')
schema_decode_tagged('my_external_registry', payload)
```
Function Usage Examples
In all function usage examples below, the following example values and variable names are used:
- my_external_registry is the name you assigned to the external registry in EMQX Platform.
- my_schema_id is the schema ID registered in the CSR (always an integer in CSR).
- my_local_avro_schema is the name of a locally configured Avro schema in EMQX.
- my_subject is the subject name defined in the CSR.
avro_encode
avro_encode encodes a payload using the schema ID from the external registry. The schema is retrieved dynamically at runtime and cached for subsequent runs. In Confluent Schema Registry, schema IDs are integers.
TIP
When encoding, the payload must be in the internal data format of the rule engine, which is a decoded map. This is why json_decode is used in the example.
Example:
```sql
select
  -- 123 is the schema ID that is registered in CSR
  avro_encode('my_external_registry', json_decode(payload), 123) as encoded
from 't'
```
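The "retrieved dynamically at runtime and cached" behavior can be sketched as a lookup table keyed by registry name and schema ID. In this Python sketch, the `fetch` callable is a hypothetical stand-in for the HTTP call to the registry. The class is illustrative and is not EMQX Platform's implementation.

```python
from typing import Callable, Dict, Tuple


class SchemaCache:
    """Fetch a schema on first use, then serve it from memory (illustrative only)."""

    def __init__(self, fetch: Callable[[str, int], str]):
        # fetch(registry_name, schema_id) -> schema text; hypothetical stand-in
        # for an HTTP lookup such as GET /schemas/ids/{id} on the registry.
        self._fetch = fetch
        self._cache: Dict[Tuple[str, int], str] = {}

    def get(self, registry: str, schema_id: int) -> str:
        key = (registry, schema_id)
        if key not in self._cache:  # first use: ask the registry once
            self._cache[key] = self._fetch(registry, schema_id)
        return self._cache[key]     # later uses: no network round trip


if __name__ == "__main__":
    calls = []

    def fake_fetch(registry: str, schema_id: int) -> str:
        calls.append((registry, schema_id))
        return '{"type": "string"}'

    cache = SchemaCache(fake_fetch)
    cache.get("my_external_registry", 123)
    cache.get("my_external_registry", 123)
    print(len(calls))  # 1: the second lookup was served from the cache
```

Caching by ID without invalidation is reasonable here because CSR assigns each schema ID once and does not reuse it for a different schema.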
avro_decode
This function decodes an Avro payload based on the specified schema ID from the external registry. The schema is dynamically fetched during runtime and cached for subsequent operations.
Example:
```sql
select
  -- 123 is the schema ID that is registered in CSR
  avro_decode('my_external_registry', payload, 123) as decoded
from 't'
```
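Decoding reverses the Avro binary encoding, field by field, in the order the schema declares. The Python sketch below decodes a record written with the Device schema from the Avro example in Create Schema. Under that assumption the bytes `08 64 65 76 31 32` become `{"id": "dev1", "temp": 25}`. It is a hand-written illustration with hypothetical function names; real Avro decoders are driven by the schema rather than hard-coded.

```python
from typing import Tuple


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Read an unsigned base-128 varint; return (value, next position)."""
    result, shift = 0, 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def unzigzag(n: int) -> int:
    """Invert Avro's zigzag mapping: 0->0, 1->-1, 2->1, 3->-2, ..."""
    return (n >> 1) ^ -(n & 1)


def avro_decode_device(buf: bytes) -> dict:
    """Decode {"id": string, "temp": int} written with the Device schema."""
    raw_len, pos = read_varint(buf, 0)
    length = unzigzag(raw_len)
    if length < 0 or pos + length > len(buf):
        raise ValueError("invalid string length")
    device_id = buf[pos:pos + length].decode("utf-8")
    pos += length
    raw_temp, pos = read_varint(buf, pos)
    if pos != len(buf):
        raise ValueError("trailing bytes after record")
    return {"id": device_id, "temp": unzigzag(raw_temp)}


if __name__ == "__main__":
    print(avro_decode_device(bytes.fromhex("08 64 65 76 31 32")))
```

If the schema ID passed to the decoder does not match the schema the data was written with, decoding typically fails or yields wrong values. Avro bytes carry no field names or tags to detect the mismatch.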
schema_encode_and_tag
This function encodes a payload (already in the internal map format) with a locally registered Avro schema and tags the result with a schema ID. Its arguments are:
- the local schema name
- the name of the external CSR
- the payload
- a subject

The schema ID comes from registering the local schema in the CSR under that subject.
Example:
```sql
select
  schema_encode_and_tag(
    'my_local_avro_schema',
    'my_external_registry',
    json_decode(payload),
    'my_subject'
  ) as encoded
from 't'
```
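CSR-aware clients conventionally "tag" a payload using the Confluent wire format: one magic byte (0), the schema ID as a 4-byte big-endian integer, then the Avro-encoded body. The Python sketch below builds that framing, assuming schema_encode_and_tag follows the same convention. The ID 123 and the function name are hypothetical; in practice the ID is whatever CSR returns when the local schema is registered under the subject.

```python
import struct

MAGIC_BYTE = 0
HEADER = struct.Struct(">BI")  # big-endian: 1 unsigned byte + 4-byte unsigned int


def tag_payload(schema_id: int, avro_body: bytes) -> bytes:
    """Prefix an Avro-encoded body with the Confluent magic byte and schema ID."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + avro_body


if __name__ == "__main__":
    # 123 stands in for the ID CSR returns when the local schema is registered
    # under the subject; the body is the Avro encoding of {"id": "dev1", "temp": 25}.
    print(tag_payload(123, bytes.fromhex("08 64 65 76 31 32")).hex(" "))
    # 00 00 00 00 7b 08 64 65 76 31 32
```

The 5-byte header lets any consumer that shares the registry find the writer's schema without out-of-band agreement.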
schema_decode_tagged
This function uses the name of the external CSR to decode a payload, assuming the payload is tagged with a schema ID registered in that CSR.
Example:
```sql
select
  schema_decode_tagged(
    'my_external_registry',
    payload
  ) as decoded
from 't'
```
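Before decoding, a tagged payload has to be split into its schema ID and its encoded body. The Python sketch below does that, assuming the standard Confluent wire format: magic byte 0, then a 4-byte big-endian schema ID. The function name is hypothetical. The returned ID is what a decoder would look up in the registry before decoding the body.

```python
import struct
from typing import Tuple

HEADER = struct.Struct(">BI")  # magic byte + 4-byte big-endian schema ID


def untag_payload(data: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed payload into (schema_id, encoded body)."""
    if len(data) < HEADER.size:
        raise ValueError("payload too short to carry a schema ID")
    magic, schema_id = HEADER.unpack_from(data, 0)
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, data[HEADER.size:]


if __name__ == "__main__":
    schema_id, body = untag_payload(bytes.fromhex("00 00 00 00 7b 08 64 65 76 31 32"))
    print(schema_id, body.hex(" "))  # 123 08 64 65 76 31 32
```

Rejecting a non-zero magic byte early gives a clear error when an untagged payload is passed to a function that expects a tagged one.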
Manage Schema
The Schema Registry page shows all created Schemas. You can manage them here.
Edit Schema
In the Schema list, click the edit icon under Actions to modify the description, Schema type, and Schema.
Delete Schema
In the Schema list, click the delete icon under Actions, confirm the deletion, and the Schema will be deleted.