スキーマレジストリの例 - Protobuf
このページでは、スキーマレジストリとルールエンジンがProtobuf形式のメッセージのエンコードおよびデコードをどのようにサポートするかを示します。
デコードシナリオ
デバイスがProtobufでエンコードされたバイナリメッセージをパブリッシュし、そのメッセージをルールエンジンでマッチさせて、nameフィールドに対応するトピックに再パブリッシュする必要があります。トピックの形式は person/${name} です。
例えば、nameフィールドが「Shawn」のメッセージをトピック person/Shawn に再パブリッシュします。
スキーマの作成
ダッシュボードの左側ナビゲーションメニューから Integration -> Schema を選択します。
以下のパラメータでProtobufスキーマを作成します:
Name:
protobuf_personType:
ProtobufSchema:
protobufmessage Person { required string name = 1; required int32 id = 2; optional string email = 3; }
Create をクリックします。

ルールの作成
ダッシュボードのナビゲーションメニューから Integration -> Rules を選択します。
Rules ページの右上にある Create をクリックします。
作成したスキーマを使って以下のルールSQL文を記述します:
sqlSELECT schema_decode('protobuf_person', payload, 'Person') as person, payload FROM "t/#" WHERE person.name = 'Shawn'ここでのポイントは
schema_decode('protobuf_person', payload, 'Person')です:schema_decode関数は、protobuf_personスキーマに従ってペイロードの内容をデコードします;as personはデコードした値を変数personに格納します;- 最後の引数
Personは、ペイロード内のメッセージタイプがProtobufスキーマで定義されたPerson型であることを指定します。
Add Action をクリックし、Action フィールドのドロップダウンリストから
Republishを選択します。Topic フィールドに宛先トピックとして
person/${person.name}と入力します。Payload フィールドにメッセージコンテンツテンプレートとして
${person}と入力します。
このアクションは、デコードされた "person" メッセージをJSON形式でトピック person/${person.name} に送信します。${person.name} は変数プレースホルダーで、実行時にデコードされたメッセージの name フィールドの値に置き換えられます。
デバイス側コードの準備
ルールを作成したら、テスト用のデータをシミュレートできます。
以下のコードはPython言語を使い、ユーザーメッセージを作成してバイナリデータにエンコードし、t/1 トピックに送信します。詳細はフルコードを参照してください。
def publish_msg(client):
p = person_pb2.Person()
p.id = 1
p.name = "Shawn"
p.email = "shawn@example.com"
message = p.SerializeToString()
topic = "t/1"
print("publish to topic: t/1, payload:", message)
client.publish(topic, payload=message, qos=0, retain=False)ルール実行結果の確認
ダッシュボードで Diagnose -> WebSocket Client を選択します。
現在のEMQXインスタンスの接続情報を入力します。
- ローカルでEMQXを実行している場合はデフォルト値を使用できます。
- 認証設定などEMQXのデフォルト設定を変更している場合は、ユーザー名やパスワードの入力が必要になることがあります。
Connect をクリックしてEMQXインスタンスにMQTTクライアントとして接続します。
Subscription エリアの Topic フィールドに
person/#と入力し、Subscribe をクリックします。Pythonの依存パッケージをインストールし、デバイス側コードを実行します:
shell$ pip3 install protobuf paho-mqtt $ protoc --python_out=. person.proto $ python3 protobuf_mqtt.py Connected with result code 0 publish to topic: t/1, payload: b'\n\x05Shawn\x10\x01\x1a\x11shawn@example.com'WebSocket側でトピック
person/Shawnのメッセージを受信していることを確認します:json{"name":"Shawn","id":1,"email":"shawn@example.com"}
エンコードシナリオ
デバイスが protobuf_out トピックをサブスクライブし、Protobufでエンコードされたバイナリメッセージを受信することを期待しています。ルールエンジンを使ってそのようなメッセージをエンコードし、関連トピックにパブリッシュします。
スキーマの作成
デコードシナリオで説明したスキーマと同じものを使用します。
ルールの作成
ダッシュボードのナビゲーションメニューから Integration -> Rules を選択します。
Rules ページの右上にある Create をクリックします。
作成したスキーマを使って以下のルールSQL文を記述します:
sqlSELECT schema_encode('protobuf_person', json_decode(payload), 'Person') as protobuf_person FROM "protobuf_in"ここでのポイントは
schema_encode('protobuf_person', json_decode(payload), 'Person')です:schema_encode関数は、protobuf_personスキーマに従ってペイロードの内容をエンコードします;as protobuf_personはエンコードした値を変数protobuf_personに格納します;- 最後の引数
Personは、ペイロード内のメッセージタイプがProtobufスキーマで定義されたPerson型であることを指定します; json_decode(payload)は、payloadが一般的にJSONエンコードされたバイナリであるため、schema_encodeの入力にMap型を渡すために必要です。
Add Action をクリックし、Action フィールドのドロップダウンリストから
Republishを選択します。Topic フィールドに宛先トピックとして
protobuf_outと入力します。Payload フィールドにメッセージコンテンツテンプレートとして
${protobuf_person}と入力します。
このアクションは、Protobufでエンコードされたユーザーメッセージをトピック protobuf_out に送信します。${protobuf_person} は変数プレースホルダーで、実行時に schema_encode の結果(バイナリ値)に置き換えられます。
デバイス側コードの準備
ルールを作成したら、テスト用のデータをシミュレートできます。
以下のコードはPython言語を使い、ユーザーメッセージを作成してバイナリデータにエンコードし、protobuf_in トピックに送信します。詳細はフルコードを参照してください。
def on_message(client, userdata, msg):
print("msg payload", msg.payload)
p = person_pb2.Person()
p.ParseFromString(msg.payload)
print(msg.topic+" "+str(p))ルール実行結果の確認
ダッシュボードで Diagnose -> WebSocket Client を選択します。
現在のEMQXインスタンスの接続情報を入力します。
- ローカルでEMQXを実行している場合はデフォルト値を使用できます。
- 認証設定などEMQXのデフォルト設定を変更している場合は、ユーザー名やパスワードの入力が必要になることがあります。
Connect をクリックしてEMQXインスタンスにMQTTクライアントとして接続します。
Publish エリアの Topic フィールドに
protobuf_inと入力し、Payload フィールドに以下のメッセージを入力します:json{"name":"Shawn","id":1,"email":"shawn@example.com"}Publish をクリックします。
Pythonの依存パッケージをインストールし、デバイス側コードを実行します:
shell$ pip3 install protobuf paho-mqtt $ python3 protobuf_mqtt_sub.py Connected with result code 0 msg payload b'\n\x05Shawn\x10\x01\x1a\x11shawn@example.com' protobuf_out name: "Shawn" id: 1 email: "shawn@example.com"