スキーマレジストリの例 - Protobuf
このページでは、スキーマレジストリとルールエンジンがProtobuf形式のメッセージのエンコードおよびデコードをどのようにサポートするかを示します。
デコードシナリオ
デバイスがProtobufでエンコードされたバイナリメッセージをパブリッシュし、そのメッセージをルールエンジンでマッチさせて、nameフィールドに対応するトピックに再パブリッシュする必要があります。トピックの形式は person/${name} です。
例えば、nameフィールドが "Shawn" のメッセージをトピック person/Shawn に再パブリッシュします。
スキーマの作成
ルールエンジンがProtobufメッセージを正しくデコードまたはエンコードできるように、まずスキーマレジストリを使ってProtobufメッセージの構造を定義するスキーマを登録する必要があります。
ダッシュボードの左ナビゲーションメニューから Smart Data Hub -> Schema Registry を選択します。
Internal Schema タブの下で、Create をクリックします。
スキーマの Name を入力します。例:
protobuf_person。この名前はエンコード・デコード関数で使用されます。スキーマの Type を選択します。
Protobufを選択してください。Creation Method を選択します。選択肢は2つあります:
TIP
このページの例では Input メソッドを使用しています。
Input(シンプルなスキーマ向け):
作成方法に Input を選択します。
Protobuf定義を直接 Schema フィールドに貼り付けます。例:
protobufmessage Person { required string name = 1; required int32 id = 2; optional string email = 3; }
Upload Protobuf Bundle(複雑または複数ファイルのスキーマ向け):
作成方法に Upload Protobuf Bundle を選択します。
.protoファイルを含む.tar.gzバンドルをアップロードするために Select file をクリックします。Root Proto File にエントリポイントファイル名(例:
person.proto)を指定します。このファイルはバンドルのルートに存在する必要があります。
Create をクリックしてスキーマを登録します。
ルールの作成
ダッシュボードのナビゲーションメニューから Integration -> Rules を選択します。
Rules ページの右上の Create をクリックします。
先ほど作成したスキーマを使って、以下のようなルールSQL文を記述します:
sqlSELECT schema_decode('protobuf_person', payload, 'Person') as person, payload FROM "t/#" WHERE person.name = 'Shawn'ポイントは
schema_decode('protobuf_person', payload, 'Person')の部分です:schema_decode関数は、protobuf_personスキーマに従ってペイロードの内容をデコードします。as personはデコードした値を変数personに格納します。最後の引数
Personは、ペイロード内のメッセージタイプがProtobufスキーマで定義されたPerson型であることを指定します。
Add Action をクリックし、Action フィールドのドロップダウンリストから
Republishを選択します。Topic フィールドに、送信先トピックとして
person/${person.name}と入力します。Payload フィールドに、メッセージ内容のテンプレートとして
${person}と入力します。
このアクションにより、デコードされた "person" メッセージがJSON形式でトピック person/${person.name} に送信されます。${person.name} は変数プレースホルダーで、実行時にデコードされたメッセージの name フィールドの値に置き換えられます。
デバイス側コードの準備
ルールが作成されたら、テスト用にデータをシミュレートできます。
以下のコードはPython言語を使い、ユーザーメッセージを作成してバイナリデータとしてエンコードし、t/1 トピックに送信します。詳細はフルコードを参照してください。
def publish_msg(client):
p = person_pb2.Person()
p.id = 1
p.name = "Shawn"
p.email = "shawn@example.com"
message = p.SerializeToString()
topic = "t/1"
print("publish to topic: t/1, payload:", message)
client.publish(topic, payload=message, qos=0, retain=False)ルール実行結果の確認
ダッシュボードの Diagnose -> WebSocket Client を選択します。
現在のEMQXインスタンスの接続情報を入力します。
EMQXをローカルで実行している場合は、デフォルト値を使用できます。
認証設定などEMQXのデフォルト設定を変更している場合は、ユーザー名やパスワードを入力する必要があります。
Connect をクリックしてEMQXインスタンスにMQTTクライアントとして接続します。
Subscription エリアの Topic フィールドに
person/#と入力し、Subscribe をクリックします。Pythonの依存パッケージをインストールし、デバイス側コードを実行します:
shell$ pip3 install protobuf paho-mqtt $ protoc --python_out=. person.proto $ python3 protobuf_mqtt.py Connected with result code 0 publish to topic: t/1, payload: b'\n\x05Shawn\x10\x01\x1a\x11shawn@example.com'WebSocket側でトピック
person/Shawnのメッセージを受信していることを確認します:json{"name":"Shawn","id":1,"email":"shawn@example.com"}
エンコードシナリオ
デバイスがトピック protobuf_out をサブスクライブし、Protobufでエンコードされたバイナリメッセージを受信することを期待しています。ルールエンジンを使ってそのようなメッセージをエンコードし、関連トピックにパブリッシュします。
スキーマの作成
デコードシナリオで説明したのと同じスキーマを使用します。
ルールの作成
ダッシュボードのナビゲーションメニューから Integration -> Rules を選択します。
Rules ページの右上の Create をクリックします。
先ほど作成したスキーマを使って、以下のルールSQL文を記述します:
sqlSELECT schema_encode('protobuf_person', json_decode(payload), 'Person') as protobuf_person FROM "protobuf_in"ポイントは
schema_encode('protobuf_person', json_decode(payload), 'Person')の部分です:schema_encode関数は、protobuf_personスキーマに従ってペイロードの内容をエンコードします。as protobuf_personはエンコードした値を変数protobuf_personに格納します。最後の引数
Personは、ペイロード内のメッセージタイプがProtobufスキーマで定義されたPerson型であることを指定します。json_decode(payload)は、payloadが一般的にJSONエンコードされたバイナリであるため、schema_encodeの入力としてMap型が必要なため使用します。
Add Action をクリックし、Action フィールドのドロップダウンリストから
Republishを選択します。Topic フィールドに送信先トピックとして
protobuf_outと入力します。Payload フィールドにメッセージ内容のテンプレートとして
${protobuf_person}と入力します。
このアクションにより、Protobufでエンコードされたユーザーメッセージがトピック protobuf_out に送信されます。${protobuf_person} は変数プレースホルダーで、実行時に schema_encode の結果(バイナリ値)に置き換えられます。
デバイス側コードの準備
ルールが作成されたら、テスト用にデータをシミュレートできます。
以下のコードはPython言語を使い、ユーザーメッセージを作成してバイナリデータとしてエンコードし、protobuf_in トピックに送信します。詳細はフルコードを参照してください。
def on_message(client, userdata, msg):
print("msg payload", msg.payload)
p = person_pb2.Person()
p.ParseFromString(msg.payload)
print(msg.topic+" "+str(p))ルール実行結果の確認
ダッシュボードの Diagnose -> WebSocket Client を選択します。
現在のEMQXインスタンスの接続情報を入力します。
EMQXをローカルで実行している場合は、デフォルト値を使用できます。
認証設定などEMQXのデフォルト設定を変更している場合は、ユーザー名やパスワードを入力する必要があります。
Connect をクリックしてEMQXインスタンスにMQTTクライアントとして接続します。
Publish エリアの Topic フィールドに
protobuf_inと入力し、Payload フィールドに以下のメッセージを入力します:json{"name":"Shawn","id":1,"email":"shawn@example.com"}Publish をクリックします。
Pythonの依存パッケージをインストールし、デバイス側コードを実行します:
shell$ pip3 install protobuf paho-mqtt $ python3 protobuf_mqtt_sub.py Connected with result code 0 msg payload b'\n\x05Shawn\x10\x01\x1a\x11shawn@example.com' protobuf_out name: "Shawn" id: 1 email: "shawn@example.com"