Schema Registry Example - Protobuf
Note
Schema Registry is an EMQX Enterprise feature.
This page demonstrates how the schema registry and rule engine support message encoding and decoding in Protobuf format.
Decoding Scenario
A device publishes a binary message encoded using Protobuf, which needs to be matched by the rule engine and then republished to the topic associated with the name
field. The format of the topic is person/${name}
.
For example, you to republish a message with the name
field equal to "Shawn" to the topic person/Shawn
.
Create Schema
Go to the Dashboard, select Integration -> Schema from the left navigation menu.
Create a Protobuf schema using the following parameters:
Name:
protobuf_person
Type:
Protobuf
Schema:
protobufmessage Person { required string name = 1; required int32 id = 2; optional string email = 3; }
Click Create.
Create Rule
In the Dashboard, select Integration -> Rules from the navigation menu.
On the Rules page, click Create at the top right corner.
Use the schema you have just created to write the rule SQL statement:
sqlSELECT schema_decode('protobuf_person', payload, 'Person') as person, payload FROM "t/#" WHERE person.name = 'Shawn'
The key point here is
schema_decode('protobuf_person', payload, 'Person')
:- The
schema_decode
function decodes the contents of the payload field according to the schemaprotobuf_person
; as person
stores the decoded value in the variableperson
;- The last argument
Person
specifies that the message type in the payload is thePerson
type defined in the Protobuf schema.
- The
Click Add Action. Select
Republish
from the drop-down list of the Action field.In the Topic field, type
person/${person.name}
as the destination topic.In the Payload field, type message content template:
${person}
.
This action sends the decoded "person" message to the topic person/${person.name}
in JSON format. ${person.name}
is a variable placeholder that will be replaced at runtime with the value of the name
field from the decoded message.
Prepare Device-Side Code
Once the rule is created, you can simulate the data for testing.
The following code uses the Python language to fill a user message, encode it as binary data, then send it to the t/1
topic. See full code for details.
def publish_msg(client):
p = person_pb2.Person()
p.id = 1
p.name = "Shawn"
p.email = "shawn@example.com"
message = p.SerializeToString()
topic = "t/1"
print("publish to topic: t/1, payload:", message)
client.publish(topic, payload=message, qos=0, retain=False)
Check Rule Execution Results
- In the Dashboard, select Diagnose -> WebSocket Client.
- Fill in the connection information for the current EMQX instance.
- If you run EMQX locally, you can use the default value.
- If you have changed EMQX's default configuration. For example, the configuration change on authentication can require you to type in a username and password.
Click Connect to connect to the EMQX instance as an MQTT client.
In the Subscription area, type
person/#
in the Topic field and click Subscribe.Install the Python dependencies and execute the device-side code:
shell$ pip3 install protobuf paho-mqtt $ protoc --python_out=. person.proto $ python3 protobuf_mqtt.py Connected with result code 0 publish to topic: t/1, payload: b'\n\x05Shawn\x10\x01\x1a\x11shawn@example.com'
Check that a message with the topic
person/Shawn
is received on the Websocket side:json{"name":"Shawn","id":1,"email":"shawn@example.com"}
Encoding Scenario
A device subscribes to a topic protobuf_out
expecting a binary message encoded using Protobuf. The rule engine is used to encode such a message and publish it to the associated topic.
Create Schema
Use the same schema as described in the decoding scenario.
Create Rule
In the Dashboard, select Integration -> Rules from the navigation menu.
On the Rules page, click Create at the top right corner.
Use the schema you have just created to write the rule SQL statement:
sqlSELECT schema_encode('protobuf_person', json_decode(payload), 'Person') as protobuf_person FROM "protobuf_in"
The key point here is
schema_encode('protobuf_person', json_decode(payload), 'Person')
:- The
schema_encode
function encodes the contents of the payload field according to the schemaprotobuf_person
; as protobuf_person
stores the encoded value in the variableprotobuf_person
;- The last argument
Person
specifies that the message type in the payload is thePerson
type defined in the Protobuf schema. json_decode(payload)
is needed becausepayload
is generally a JSON-encoded binary, andschema_encode
requires a Map as its input.
- The
Click Add Action. Select
Republish
from the drop-down list of the Action field.In the Topic field, type
protobuf_out
as the destination topic.In the Payload field, type message content template:
${protobuf_person}
.
This action sends the Protobuf encoded user message to the topic protobuf_out
. ${protobuf_person}
is a variable placeholder that will be replaced at runtime with the value of the result of schema_encode
(a binary value).
Prepare Device-Side Code
Once the rules have been created, you can simulate the data for testing.
The following code uses the Python language to fill a user message, encode it as binary data, then send it to the protobuf_in
topic. See full code for details.
def on_message(client, userdata, msg):
print("msg payload", msg.payload)
p = person_pb2.Person()
p.ParseFromString(msg.payload)
print(msg.topic+" "+str(p))
Check Rule Execution Results
- In the Dashboard, select Diagnose -> WebSocket Client.
- Fill in the connection information for the current EMQX instance.
- If you run EMQX locally, you can use the default value.
- If you have changed EMQX's default configuration. For example, the configuration change on authentication can require you to type in a username and password.
Click Connect to connect to the EMQX instance as an MQTT client.
In the Publish area, type
protobuf_in
in the Topic field and type the following message in the Payload field:json{"name":"Shawn","id":1,"email":"shawn@example.com"}
Click Publish.
Install the Python dependencies and execute the device-side code:
shell$ pip3 install protobuf paho-mqtt $ python3 protobuf_mqtt_sub.py Connected with result code 0 msg payload b'\n\x05Shawn\x10\x01\x1a\x11shawn@example.com' protobuf_out name: "Shawn" id: 1 email: "shawn@example.com"