スキーマレジストリの例 - Protobuf
このページでは、スキーマレジストリとルールエンジンがProtobuf形式のメッセージのエンコードおよびデコードをどのようにサポートするかを示します。
デコードシナリオ
デバイスがProtobufでエンコードされたバイナリメッセージをパブリッシュし、そのメッセージをルールエンジンでマッチさせて、name
フィールドに対応するトピックに再パブリッシュする必要があります。トピックの形式は person/${name}
です。
例えば、name
フィールドが「Shawn」のメッセージをトピック person/Shawn
に再パブリッシュします。
スキーマの作成
ダッシュボードの左側ナビゲーションメニューから Integration -> Schema を選択します。
以下のパラメータでProtobufスキーマを作成します:
Name:
protobuf_person
Type:
Protobuf
Schema:
protobufmessage Person { required string name = 1; required int32 id = 2; optional string email = 3; }
Create をクリックします。
ルールの作成
ダッシュボードのナビゲーションメニューから Integration -> Rules を選択します。
Rules ページの右上にある Create をクリックします。
作成したスキーマを使って以下のルールSQL文を記述します:
sqlSELECT schema_decode('protobuf_person', payload, 'Person') as person, payload FROM "t/#" WHERE person.name = 'Shawn'
ここでのポイントは
schema_decode('protobuf_person', payload, 'Person')
です:schema_decode
関数は、protobuf_person
スキーマに従ってペイロードの内容をデコードします;as person
はデコードした値を変数person
に格納します;- 最後の引数
Person
は、ペイロード内のメッセージタイプがProtobufスキーマで定義されたPerson
型であることを指定します。
Add Action をクリックし、Action フィールドのドロップダウンリストから
Republish
を選択します。Topic フィールドに宛先トピックとして
person/${person.name}
と入力します。Payload フィールドにメッセージコンテンツテンプレートとして
${person}
と入力します。
このアクションは、デコードされた "person" メッセージをJSON形式でトピック person/${person.name}
に送信します。${person.name}
は変数プレースホルダーで、実行時にデコードされたメッセージの name
フィールドの値に置き換えられます。
デバイス側コードの準備
ルールを作成したら、テスト用のデータをシミュレートできます。
以下のコードはPython言語を使い、ユーザーメッセージを作成してバイナリデータにエンコードし、t/1
トピックに送信します。詳細はフルコードを参照してください。
def publish_msg(client):
p = person_pb2.Person()
p.id = 1
p.name = "Shawn"
p.email = "shawn@example.com"
message = p.SerializeToString()
topic = "t/1"
print("publish to topic: t/1, payload:", message)
client.publish(topic, payload=message, qos=0, retain=False)
ルール実行結果の確認
ダッシュボードで Diagnose -> WebSocket Client を選択します。
現在のEMQXインスタンスの接続情報を入力します。
- ローカルでEMQXを実行している場合はデフォルト値を使用できます。
- 認証設定などEMQXのデフォルト設定を変更している場合は、ユーザー名やパスワードの入力が必要になることがあります。
Connect をクリックしてEMQXインスタンスにMQTTクライアントとして接続します。
Subscription エリアの Topic フィールドに
person/#
と入力し、Subscribe をクリックします。Pythonの依存パッケージをインストールし、デバイス側コードを実行します:
shell$ pip3 install protobuf paho-mqtt $ protoc --python_out=. person.proto $ python3 protobuf_mqtt.py Connected with result code 0 publish to topic: t/1, payload: b'\n\x05Shawn\x10\x01\x1a\x11shawn@example.com'
WebSocket側でトピック
person/Shawn
のメッセージを受信していることを確認します:json{"name":"Shawn","id":1,"email":"shawn@example.com"}
エンコードシナリオ
デバイスが protobuf_out
トピックをサブスクライブし、Protobufでエンコードされたバイナリメッセージを受信することを期待しています。ルールエンジンを使ってそのようなメッセージをエンコードし、関連トピックにパブリッシュします。
スキーマの作成
デコードシナリオで説明したスキーマと同じものを使用します。
ルールの作成
ダッシュボードのナビゲーションメニューから Integration -> Rules を選択します。
Rules ページの右上にある Create をクリックします。
作成したスキーマを使って以下のルールSQL文を記述します:
sqlSELECT schema_encode('protobuf_person', json_decode(payload), 'Person') as protobuf_person FROM "protobuf_in"
ここでのポイントは
schema_encode('protobuf_person', json_decode(payload), 'Person')
です:schema_encode
関数は、protobuf_person
スキーマに従ってペイロードの内容をエンコードします;as protobuf_person
はエンコードした値を変数protobuf_person
に格納します;- 最後の引数
Person
は、ペイロード内のメッセージタイプがProtobufスキーマで定義されたPerson
型であることを指定します; json_decode(payload)
は、payload
が一般的にJSONエンコードされたバイナリであるため、schema_encode
の入力にMap型を渡すために必要です。
Add Action をクリックし、Action フィールドのドロップダウンリストから
Republish
を選択します。Topic フィールドに宛先トピックとして
protobuf_out
と入力します。Payload フィールドにメッセージコンテンツテンプレートとして
${protobuf_person}
と入力します。
このアクションは、Protobufでエンコードされたユーザーメッセージをトピック protobuf_out
に送信します。${protobuf_person}
は変数プレースホルダーで、実行時に schema_encode
の結果(バイナリ値)に置き換えられます。
デバイス側コードの準備
ルールを作成したら、テスト用のデータをシミュレートできます。
以下のコードはPython言語を使い、ユーザーメッセージを作成してバイナリデータにエンコードし、protobuf_in
トピックに送信します。詳細はフルコードを参照してください。
def on_message(client, userdata, msg):
print("msg payload", msg.payload)
p = person_pb2.Person()
p.ParseFromString(msg.payload)
print(msg.topic+" "+str(p))
ルール実行結果の確認
ダッシュボードで Diagnose -> WebSocket Client を選択します。
現在のEMQXインスタンスの接続情報を入力します。
- ローカルでEMQXを実行している場合はデフォルト値を使用できます。
- 認証設定などEMQXのデフォルト設定を変更している場合は、ユーザー名やパスワードの入力が必要になることがあります。
Connect をクリックしてEMQXインスタンスにMQTTクライアントとして接続します。
Publish エリアの Topic フィールドに
protobuf_in
と入力し、Payload フィールドに以下のメッセージを入力します:json{"name":"Shawn","id":1,"email":"shawn@example.com"}
Publish をクリックします。
Pythonの依存パッケージをインストールし、デバイス側コードを実行します:
shell$ pip3 install protobuf paho-mqtt $ python3 protobuf_mqtt_sub.py Connected with result code 0 msg payload b'\n\x05Shawn\x10\x01\x1a\x11shawn@example.com' protobuf_out name: "Shawn" id: 1 email: "shawn@example.com"