Skip to content

スキーマレジストリ

スキーマレジストリは、トピックメッセージデータの管理と検証のための集中管理されたスキーマを提供し、ネットワーク上でのデータのシリアライズおよびデシリアライズ機能を備えています。MQTTトピックのパブリッシャーおよびサブスクライバーは、スキーマを利用してデータの整合性と互換性を確保できます。スキーマレジストリはルールエンジンの重要なコンポーネントであり、デバイスアクセスやルール設計の多様なシナリオに適応可能で、データ品質、コンプライアンス、アプリケーション開発効率、システムパフォーマンスの向上に寄与します。

スキーマレジストリの理解

スキーマは、許容されるデータ型、フォーマット、関係性を含むデータ構造を定義します。これはデータレコードの構造、個々のフィールドのデータ型、フィールド間の関係、およびデータに適用される制約やルールを記述する設計図です。

スキーマはデータベース、メッセージングサービス、分散イベント・データ処理フレームワークなど、さまざまなデータ処理システムで使用されます。これによりデータの一貫性と正確性が保証され、異なるシステムやアプリケーション間で効率的に処理・分析が可能となり、データ共有と相互運用性を促進します。

ユーザーはスキーマレジストリでスキーマを定義し、そのスキーマをルールで使用してクライアントデータをデータ統合を通じて異なるデータサービスに転送できます。同時に、アプリケーションやデータサービスからクライアントへスキーマを介してデータを送信し、双方向のデータフローを実現可能です。

schema_pic

スキーマレジストリには、データ検証、互換性チェック、バージョン管理、反復的進化など多くの利点があります。また、データパイプラインの開発・保守を簡素化し、データ互換性問題、データ破損、データ損失のリスクを低減します。

スキーマの作成

Smart Data Hubをご契約の場合、デプロイメント内のSmart Data Hub -> Schema Registryページにアクセスできます。このページでは、EMQXプラットフォームがネイティブにサポートするAvro、Protobuf、JSON Schema形式のスキーマを作成できます。外部HTTPサービスを利用してカスタムのエンコード・デコードロジックを適用することも可能です。

  1. Internal Schemaタブ右上の**+ New**ボタンをクリックし、New Schemaページに移動します。以下の項目を設定してください:

    • Name:スキーマを識別するための名前です。

      • スキーマ検証やデータ変換ルールで使用可能です。
      • データ統合ルールのSQLエンコード・デコード関数でも使用できます(例:SELECT schema_encode("<name>", payload))。
    • Note:任意で説明情報を追加できます。

    • Type:ドロップダウンメニューからスキーマタイプを選択します。選択肢はAvroProtobufJSON SchemaExternal HTTPです。

    • SchemaAvroProtobufJSON Schemaを選択した場合は、対応するスキーマを入力します。例:

      • Avro:
      json
      {
        "type": "record",
        "name": "Device",
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "temp", "type": "int" }
        ]
      }
      • Protobuf:
      proto
      message Device {
        required string id = 1;
        required uint32 temp = 2;
      }
      • JSON Schema; JSONデータからJSON Schemaを生成可能です:

      generate_JSON_schema

      json
      {
          "$schema": "http://json-schema.org/draft-06/schema#",
          "type": "object",
          "properties": {
            "temp": {
              "type": "integer"
            },
            "id": {
              "type": "string"
            }
          },
         "required": [
            "temp",
            "id"
         ]
      }
    • URLExternal HTTPを選択した場合はURLを指定します。外部HTTPスキーマの作成方法はスキーマレジストリの例 - External HTTPサーバーを参照してください。

  2. Confirmをクリックするとスキーマが作成され、スキーマレジストリページに戻ります。

外部スキーマレジストリ

EMQXプラットフォームは外部のConfluentスキーマレジストリ(CSR)の設定をサポートしています。この機能により、ルール処理時に外部レジストリからスキーマを動的に取得し、効率的なメッセージのエンコード・デコードが可能になります。

外部スキーマレジストリの作成

Smart Data Hubをご契約の場合、EMQXプラットフォームコンソールから直接外部スキーマレジストリを設定でき、スキーマ統合を簡単に管理できます。

前提条件

開始前にVPCピアリング接続を作成してください。ピアリング接続確立後、内部ネットワークIP経由でプラットフォームコンソールにログインし外部スキーマレジストリにアクセス可能です。あるいは、NATゲートウェイを設定しパブリックIP経由でアクセスする方法もあります。

  1. デプロイメントの左メニューからSchema Registryをクリックし、スキーマレジストリページのExternal Schemaタブを選択します。

  2. 右上のCreateボタンをクリックし、以下の項目を設定します:

    • Name:エンコード・デコード関数で使用する外部スキーマレジストリ名を入力します。

    • Type:外部スキーマレジストリの種類を選択します。現在はConfluentのみ対応しています。

    • URL:Confluentスキーマレジストリのエンドポイントを入力します。

    • AuthenticationBasic authを選択した場合は、外部レジストリアクセス用の認証情報(ユーザー名とパスワード)を入力します。

  3. 設定完了後、Createをクリックします。

ルールエンジンでの外部スキーマレジストリの利用

外部レジストリを設定すると、EMQXプラットフォームのルールエンジンで以下の関数を使用し、外部レジストリに格納されたスキーマを用いてペイロードのエンコード・デコードが可能です。

TIP

外部スキーマレジストリはメッセージ変換やスキーマ検証には使用できません。

設定済みの外部CSRを利用する関数例:

sql
avro_encode('my_external_registry', payload, my_schema_id)
avro_decode('my_external_registry', payload, my_schema_id)
schema_encode_and_tag('my_local_avro_schema', 'my_external_registry', payload, 'my_subject')
schema_decode_tagged('my_external_registry', payload)

関数使用例

以下の例では、次の値・変数名を使用しています:

  • my_external_registry:EMQXプラットフォームで設定した外部レジストリ名
  • my_schema_id:CSRに登録されたスキーマID(CSRでは常に整数)
  • my_local_avro_schema:EMQXにローカル登録されたAvroスキーマ名
  • my_subject:CSRで定義されたサブジェクト名
avro_encode

avro_encodeは外部レジストリのスキーマIDを用いてペイロードをエンコードします。スキーマは実行時に動的に取得され、以降の処理でキャッシュされます。ConfluentスキーマレジストリではスキーマIDは整数です。

TIP

エンコード時、ペイロードはルールエンジンの内部データ形式(デコード済みのマップ)である必要があるため、例ではjson_decodeを使用しています。

例:

sql
select
  -- 123はCSRに登録されたスキーマID
  avro_encode('my_external_registry', json_decode(payload), 123) as encoded
from 't'
avro_decode

指定したスキーマIDの外部レジストリスキーマに基づき、Avroペイロードをデコードします。スキーマは実行時に動的取得され、以降の処理でキャッシュされます。

例:

sql
select
  -- 123はCSRに登録されたスキーマID
  avro_decode('my_external_registry', payload, 123) as decoded
from 't'
schema_encode_and_tag

ローカルに登録されたAvroスキーマ、外部CSRスキーマ名、サブジェクトを使い、ペイロード(内部マップ形式)をエンコードし、スキーマIDでタグ付けします。スキーマIDはローカルスキーマをCSRに登録した際のものです。

例:

sql
select
  schema_encode_and_tag(
    'my_local_avro_schema',
    'my_external_registry',
    json_decode(payload),
    'my_subject'
  ) as encoded
from 't'
schema_decode_tagged

CSR名を使い、スキーマIDでタグ付けされたペイロードをデコードします。

sql
select
  schema_decode_tagged(
    'my_external_registry',
    payload
  ) as decoded
from 't'

スキーマの管理

スキーマレジストリページには作成済みのスキーマが一覧表示され、管理が可能です。

スキーマの編集

スキーマ一覧のActions欄にある編集アイコンをクリックすると、説明、スキーマタイプ、スキーマ内容を変更できます。

スキーマの削除

スキーマ一覧のActions欄にある削除アイコンをクリックし、削除を確認するとスキーマが削除されます。