Skip to content

スキーマレジストリの例 - Protobuf

このページでは、スキーマレジストリとルールエンジンがProtobuf形式のメッセージのエンコードおよびデコードをどのようにサポートするかを示します。

デコードシナリオ

デバイスがProtobufでエンコードされたバイナリメッセージをパブリッシュし、そのメッセージをルールエンジンでマッチさせて、nameフィールドに対応するトピックに再パブリッシュする必要があります。トピックの形式は person/${name} です。

例えば、nameフィールドが「Shawn」のメッセージをトピック person/Shawn に再パブリッシュします。

スキーマの作成

  1. ダッシュボードの左側ナビゲーションメニューから Integration -> Schema を選択します。

  2. 以下のパラメータでProtobufスキーマを作成します:

    • Name: protobuf_person

    • Type: Protobuf

    • Schema:

      protobuf
      message Person {
        required string name = 1;
        required int32 id = 2;
        optional string email = 3;
      }
  3. Create をクリックします。

ルールの作成

  1. ダッシュボードのナビゲーションメニューから Integration -> Rules を選択します。

  2. Rules ページの右上にある Create をクリックします。

  3. 作成したスキーマを使って以下のルールSQL文を記述します:

    sql
    SELECT
      schema_decode('protobuf_person', payload, 'Person') as person, payload
    FROM
      "t/#"
    WHERE
      person.name = 'Shawn'

    ここでのポイントは schema_decode('protobuf_person', payload, 'Person') です:

    • schema_decode 関数は、protobuf_person スキーマに従ってペイロードの内容をデコードします;
    • as person はデコードした値を変数 person に格納します;
    • 最後の引数 Person は、ペイロード内のメッセージタイプがProtobufスキーマで定義された Person 型であることを指定します。
  4. Add Action をクリックし、Action フィールドのドロップダウンリストから Republish を選択します。

  5. Topic フィールドに宛先トピックとして person/${person.name} と入力します。

  6. Payload フィールドにメッセージコンテンツテンプレートとして ${person} と入力します。

このアクションは、デコードされた "person" メッセージをJSON形式でトピック person/${person.name} に送信します。${person.name} は変数プレースホルダーで、実行時にデコードされたメッセージの name フィールドの値に置き換えられます。

デバイス側コードの準備

ルールを作成したら、テスト用のデータをシミュレートできます。

以下のコードはPython言語を使い、ユーザーメッセージを作成してバイナリデータにエンコードし、t/1 トピックに送信します。詳細はフルコードを参照してください。

python
def publish_msg(client):
    p = person_pb2.Person()
    p.id = 1
    p.name = "Shawn"
    p.email = "shawn@example.com"
    message = p.SerializeToString()
    topic = "t/1"
    print("publish to topic: t/1, payload:", message)
    client.publish(topic, payload=message, qos=0, retain=False)

ルール実行結果の確認

  1. ダッシュボードで Diagnose -> WebSocket Client を選択します。

  2. 現在のEMQXインスタンスの接続情報を入力します。

    • ローカルでEMQXを実行している場合はデフォルト値を使用できます。
    • 認証設定などEMQXのデフォルト設定を変更している場合は、ユーザー名やパスワードの入力が必要になることがあります。
  1. Connect をクリックしてEMQXインスタンスにMQTTクライアントとして接続します。

  2. Subscription エリアの Topic フィールドに person/# と入力し、Subscribe をクリックします。

  3. Pythonの依存パッケージをインストールし、デバイス側コードを実行します:

    shell
    $ pip3 install protobuf paho-mqtt
    $ protoc --python_out=. person.proto
    
    $ python3 protobuf_mqtt.py
    Connected with result code 0
    publish to topic: t/1, payload: b'\n\x05Shawn\x10\x01\x1a\x11shawn@example.com'
  4. WebSocket側でトピック person/Shawn のメッセージを受信していることを確認します:

    json
    {"name":"Shawn","id":1,"email":"shawn@example.com"}

エンコードシナリオ

デバイスが protobuf_out トピックをサブスクライブし、Protobufでエンコードされたバイナリメッセージを受信することを期待しています。ルールエンジンを使ってそのようなメッセージをエンコードし、関連トピックにパブリッシュします。

スキーマの作成

デコードシナリオで説明したスキーマと同じものを使用します。

ルールの作成

  1. ダッシュボードのナビゲーションメニューから Integration -> Rules を選択します。

  2. Rules ページの右上にある Create をクリックします。

  3. 作成したスキーマを使って以下のルールSQL文を記述します:

    sql
    SELECT
      schema_encode('protobuf_person', json_decode(payload), 'Person') as protobuf_person
    FROM
      "protobuf_in"

    ここでのポイントは schema_encode('protobuf_person', json_decode(payload), 'Person') です:

    • schema_encode 関数は、protobuf_person スキーマに従ってペイロードの内容をエンコードします;
    • as protobuf_person はエンコードした値を変数 protobuf_person に格納します;
    • 最後の引数 Person は、ペイロード内のメッセージタイプがProtobufスキーマで定義された Person 型であることを指定します;
    • json_decode(payload) は、payload が一般的にJSONエンコードされたバイナリであるため、schema_encode の入力にMap型を渡すために必要です。
  4. Add Action をクリックし、Action フィールドのドロップダウンリストから Republish を選択します。

  5. Topic フィールドに宛先トピックとして protobuf_out と入力します。

  6. Payload フィールドにメッセージコンテンツテンプレートとして ${protobuf_person} と入力します。

このアクションは、Protobufでエンコードされたユーザーメッセージをトピック protobuf_out に送信します。${protobuf_person} は変数プレースホルダーで、実行時に schema_encode の結果(バイナリ値)に置き換えられます。

デバイス側コードの準備

ルールを作成したら、テスト用のデータをシミュレートできます。

以下のコードはPython言語を使い、ユーザーメッセージを作成してバイナリデータにエンコードし、protobuf_in トピックに送信します。詳細はフルコードを参照してください。

python
def on_message(client, userdata, msg):
    print("msg payload", msg.payload)
    p = person_pb2.Person()
    p.ParseFromString(msg.payload)
    print(msg.topic+" "+str(p))

ルール実行結果の確認

  1. ダッシュボードで Diagnose -> WebSocket Client を選択します。

  2. 現在のEMQXインスタンスの接続情報を入力します。

    • ローカルでEMQXを実行している場合はデフォルト値を使用できます。
    • 認証設定などEMQXのデフォルト設定を変更している場合は、ユーザー名やパスワードの入力が必要になることがあります。
  1. Connect をクリックしてEMQXインスタンスにMQTTクライアントとして接続します。

  2. Publish エリアの Topic フィールドに protobuf_in と入力し、Payload フィールドに以下のメッセージを入力します:

    json
    {"name":"Shawn","id":1,"email":"shawn@example.com"}
  3. Publish をクリックします。

  4. Pythonの依存パッケージをインストールし、デバイス側コードを実行します:

    shell
    $ pip3 install protobuf paho-mqtt
    
    $ python3 protobuf_mqtt_sub.py
    Connected with result code 0
    msg payload b'\n\x05Shawn\x10\x01\x1a\x11shawn@example.com'
    protobuf_out name: "Shawn"
    id: 1
    email: "shawn@example.com"