Skip to content

スキーマレジストリの例 - Avro

このページでは、スキーマレジストリとルールエンジンがAvro形式のメッセージのエンコードおよびデコードをどのようにサポートするかを示します。

デコードシナリオ

デバイスがAvroでエンコードされたバイナリメッセージをパブリッシュし、ルールエンジンでマッチングした後、nameフィールドに対応するトピックに再パブリッシュする必要があります。トピックの形式はavro_user/${name}です。

例えば、nameフィールドがShawnのメッセージをトピックavro_user/Shawnに再パブリッシュする必要があります。

スキーマの作成

ルールエンジンがAvroメッセージを正しくデコードまたはエンコードできるように、まずスキーマレジストリを使ってAvroメッセージの構造を定義するスキーマを登録する必要があります。

  1. ダッシュボードの左ナビゲーションメニューから Smart Data Hub -> Schema Registry を選択します。

  2. Internal Schema タブの下で、Create をクリックします。

  3. 次のパラメータでAvroスキーマを作成します:

    • Name: avro_user。この名前はエンコード・デコード関数で使用されます。

    • Type: Avro

    • Schema:

      json
      {
        "type":"record",
        "name": "myrecord1",
        "fields":[
            {"name":"name", "type":"string"},
            {"name":"favorite_number", "type":["int", "null"]},
            {"name":"favorite_color", "type":["string", "null"]}
        ]
      }
  4. Create をクリックします。

ルールの作成

  1. ダッシュボードのナビゲーションメニューから Integration -> Rules を選択します。

  2. Rules ページで、右上の Create をクリックします。

  3. 先ほど作成したスキーマを使って、以下のルールSQL文を書きます:

    sql
    SELECT
      schema_decode('avro_user', payload) as avro_user, payload
    FROM
      "t/#"
    WHERE
      avro_user.name = 'Shawn'

    ここでのポイントは schema_decode('avro_user', payload) です:

    • schema_decode 関数は、avro_userスキーマに従ってペイロードの内容をデコードします。
    • as avro_user はデコードした値を変数 avro_user に格納します。
  4. Add Action をクリックし、Action フィールドのドロップダウンリストから Republish を選択します。

  5. Topic フィールドに、宛先トピックとして avro_user/${avro_user.name} を入力します。

  6. Payload フィールドに、メッセージ内容のテンプレートとして ${avro_user} を入力します。

このアクションにより、デコードされたメッセージがJSON形式でトピック avro_user/${avro_user.name} に送信されます。${avro_user.name} は変数プレースホルダーで、実行時にデコードされたメッセージの name フィールドの値に置き換えられます。

デバイス側コードの準備

ルールが作成されたら、テスト用にデータをシミュレートできます。

以下のコードはPython言語を使用し、ユーザーメッセージを作成してバイナリデータとしてエンコードし、t/1 トピックに送信します。詳細はフルコードを参照してください。

python
def publish_msg(client):
    datum_w = avro.io.DatumWriter(SCHEMA)
    buf = io.BytesIO()
    encoder = avro.io.BinaryEncoder(buf)
    datum_w.write({"name": "Shawn", "favorite_number": 666, "favorite_color": "red"}, encoder)
    message = buf.getvalue()
    topic = "t/1"
    print("publish to topic: t/1, payload:", message)
    client.publish(topic, payload=message, qos=0, retain=False)

ルール実行結果の確認

  1. ダッシュボードで Diagnose -> WebSocket Client を選択します。

  2. 現在のEMQXインスタンスの接続情報を入力します。

    • ローカルでEMQXを実行している場合は、デフォルト値を使用できます。
    • 認証設定などEMQXのデフォルト設定を変更している場合は、ユーザー名やパスワードの入力が必要です。
  1. Connect をクリックして、EMQXインスタンスにMQTTクライアントとして接続します。

  2. Subscription エリアの Topic フィールドに avro_user/# と入力し、Subscribe をクリックします。

  3. Pythonの依存パッケージをインストールし、デバイス側コードを実行します:

    shell
    $ pip3 install avro paho-mqtt
    
    $ python3 avro_mqtt.py
    Connected with result code 0
    publish to topic: t/1, payload: b'\nShawn\x00\xb4\n\x00\x06red'
  4. WebSocket側でトピック avro_user/Shawn のメッセージが受信されていることを確認します:

    json
    {"favorite_color":"red","favorite_number":666,"name":"Shawn"}

エンコードシナリオ

デバイスがトピック avro_out をサブスクライブし、Avroでエンコードされたバイナリメッセージを受信することを想定しています。ルールエンジンはこのようなメッセージをエンコードし、対応するトピックにパブリッシュします。

スキーマの作成

デコードシナリオで説明したのと同じスキーマを使用します。

ルールの作成

  1. ダッシュボードのナビゲーションメニューから Integration -> Rules を選択します。

  2. Rules ページで、右上の Create をクリックします。

  3. 先ほど作成したスキーマを使って、以下のルールSQL文を書きます:

    sql
    SELECT
      schema_encode('avro_user', json_decode(payload)) as avro_user
    FROM
      "avro_in"

    ここでのポイントは schema_encode('avro_user', json_decode(payload)) です:

    • schema_encode 関数は、avro_userスキーマに従ってペイロードの内容をエンコードします。
    • as avro_user はエンコードした値を変数 avro_user に格納します。
    • json_decode(payload) は、payload が一般的にJSONエンコードされたバイナリであり、schema_encode の入力にはMap型が必要なため使用します。
  4. Add Action をクリックし、Action フィールドのドロップダウンリストから Republish を選択します。

  5. Topic フィールドに、宛先トピックとして avro_out を入力します。

  6. Payload フィールドに、メッセージ内容のテンプレートとして ${avro_user} を入力します。

このアクションにより、Avroでエンコードされたメッセージがトピック avro_out に送信されます。${avro_user} は、schema_encode の結果(バイナリ値)に実行時に置き換えられる変数プレースホルダーです。

デバイス側コードの準備

ルールが作成されたら、テスト用にデータをシミュレートできます。

以下のコードはPython言語を使用し、Userメッセージを受信してデコードし、内容を表示します。詳細はフルコードを参照してください。

python
def on_message(client, userdata, msg):
    datum_r = avro.io.DatumReader(SCHEMA)
    buf = io.BytesIO(msg.payload)
    decoder = avro.io.BinaryDecoder(buf)
    decoded_payload = datum_r.read(decoder)
    print(msg.topic+" "+str(decoded_payload))

ルール実行結果の確認

  1. ダッシュボードで Diagnose -> WebSocket Client を選択します。

  2. 現在のEMQXインスタンスの接続情報を入力します。

    • ローカルでEMQXを実行している場合は、デフォルト値を使用できます。
    • 認証設定などEMQXのデフォルト設定を変更している場合は、ユーザー名やパスワードの入力が必要です。
  1. Connect をクリックして、EMQXインスタンスにMQTTクライアントとして接続します。

  2. Publish エリアの Topic フィールドに avro_in と入力し、Payload フィールドに以下のメッセージを入力します:

    json
    {"favorite_color":"red","favorite_number":666,"name":"Shawn"}
  3. Publish をクリックします。

  4. Pythonの依存パッケージをインストールし、デバイス側コードを実行します:

    shell
    $ pip3 install avro paho-mqtt
    
    $ python3 avro_mqtt_sub.py
    Connected with result code 0
    msg payload b'\nShawn\x00\xb4\n\x00\x06red'
    avro_out {'name': 'Shawn', 'favorite_number': 666, 'favorite_color': 'red'}