Skip to content

スキーマレジストリ

スキーマレジストリは、トピックメッセージデータの管理と検証のための集中管理されたスキーマを提供し、ネットワーク上でのデータのシリアライズおよびデシリアライズを可能にします。MQTTトピックのパブリッシャーとサブスクライバーはスキーマを利用してデータの一貫性と互換性を確保できます。スキーマレジストリはルールエンジンの重要なコンポーネントであり、デバイスアクセスやルール設計のさまざまなシナリオに適応可能で、データ品質、コンプライアンス、アプリケーション開発効率、システムパフォーマンスの向上に寄与します。

スキーマレジストリの理解

スキーマはデータの構造を定義し、許容されるデータ型、フォーマット、関係性を含みます。これはデータレコードの構造、各フィールドのデータ型、フィールド間の関係、およびデータに適用される制約やルールを記述する設計図です。

スキーマはデータベース、メッセージングサービス、分散イベントおよびデータ処理フレームワークなど、さまざまなデータ処理システムで使用されます。これにより、データの一貫性と正確性が保証され、異なるシステムやアプリケーションによる効率的な処理と分析が可能になります。したがって、異なるシステムや組織間のデータ共有と相互運用性を促進します。

ユーザーはスキーマレジストリでスキーマを定義し、その定義済みスキーマをルールで使用してクライアントデータをデータ統合経由で異なるデータサービスに転送できます。同時に、アプリケーションやデータサービス側のデータをスキーマを通じてクライアントに送信し、双方向のデータフローを実現できます。

schema_pic

スキーマレジストリは、データ検証、互換性チェック、バージョン管理、反復的進化など多くの利点があります。また、データパイプラインの開発と保守を簡素化し、データ互換性の問題、データ破損、データ損失のリスクを低減します。

スキーマの作成

Smart Data Hubにご契約いただいている場合、デプロイメント内の Smart Data Hub -> Schema Registry ページにアクセスできます。このページでは、EMQX CloudがネイティブにサポートするAvro、Protobuf、JSON Schema形式のスキーマを作成できます。また、外部HTTPサービスを利用してカスタムのエンコード・デコードロジックを適用することも可能です。

  1. Internal Schema タブの右上にある + New ボタンをクリックし、New Schema ページに移動します。以下の項目を設定してください:

    • Name:スキーマを識別するための名前です。

      • スキーマ検証やデータ変換ルールで名前を使用できます。
      • データ統合ルールのSQLエンコード・デコード関数でも使用可能です。例:SELECT schema_encode("<name>", payload)
    • Note:任意で説明情報を追加できます。

    • Type:ドロップダウンメニューからスキーマタイプを選択します。選択肢は AvroProtobufJSON SchemaExternal HTTP です。

    • SchemaAvroProtobufJSON Schema を選択した場合は、対応するスキーマを入力します。例:

      • Avro:
      json
      {
        "type": "record",
        "name": "Device",
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "temp", "type": "int" }
        ]
      }
      • Protobuf:
      proto
      message Device {
        required string id = 1;
        required uint32 temp = 2;
      }
      • JSON Schema; JSONデータからJSON Schemaを生成できます:

      generate_JSON_schema

      json
      {
          "$schema": "http://json-schema.org/draft-06/schema#",
          "type": "object",
          "properties": {
            "temp": {
              "type": "integer"
            },
            "id": {
              "type": "string"
            }
          },
         "required": [
            "temp",
            "id"
         ]
      }
    • URLExternal HTTP を選択した場合はURLを指定します。外部HTTPスキーマの作成方法は スキーマレジストリの例 - External HTTPサーバー を参照してください。

  2. Confirm をクリックするとスキーマが作成され、スキーマレジストリページに戻ります。

外部スキーマレジストリ

EMQX Cloudは外部のConfluentスキーマレジストリ(CSR)を設定可能です。この機能により、ルール処理中に外部レジストリからスキーマを動的に取得し、効率的なメッセージのエンコード・デコードが可能になります。

外部スキーマレジストリの作成

Smart Data Hubにご契約いただいている場合、EMQX Cloudコンソールから直接外部スキーマレジストリを設定でき、スキーマ統合の管理が容易になります。

前提条件

開始前にVPCピアリング接続を作成してください。ピアリング接続確立後、プラットフォームコンソールに内部ネットワークIP経由でログインし外部スキーマレジストリにアクセスできます。あるいは、NATゲートウェイを設定しパブリックIP経由で外部スキーマレジストリにアクセスすることも可能です。

  1. デプロイメントの左メニューから Schema Registry をクリックし、スキーマレジストリページの External Schema タブを選択します。

  2. 右上の Create ボタンをクリックし、以下の項目を設定します:

    • Name:エンコード・デコード関数で使用する外部スキーマレジストリ名を入力します。

    • Type:外部スキーマレジストリのタイプを選択します。現在は Confluent のみ対応しています。

    • URL:Confluentスキーマレジストリのエンドポイントを入力します。

    • AuthenticationBasic auth を選択した場合は、外部レジストリアクセス用の認証情報(ユーザー名とパスワード)を入力します。

  3. 設定完了後、Create をクリックします。

ルールエンジンでの外部スキーマレジストリの利用

外部レジストリを設定すると、EMQX Cloudルールエンジン内で外部レジストリに保存されたスキーマを使ってペイロードのエンコード・デコードを行う関数が利用可能になります。

TIP

外部スキーマレジストリはメッセージ変換やスキーマ検証には使用できません。

以下の関数は設定済みの外部CSRを利用します:

sql
avro_encode('my_external_registry', payload, my_schema_id)
avro_decode('my_external_registry', payload, my_schema_id)
schema_encode_and_tag('my_local_avro_schema', 'my_external_registry', payload, 'my_subject')
schema_decode_tagged('my_external_registry', payload)

関数使用例

以下の例では、次の値と変数名を使用しています:

  • my_external_registry:EMQX Cloudで外部レジストリに付けた名前
  • my_schema_id:CSRに登録されたスキーマID(CSRでは常に整数)
  • my_local_avro_schema:EMQXにローカル登録されたAvroスキーマ名
  • my_subject:CSRで定義されたサブジェクト名
avro_encode

avro_encode は外部レジストリのスキーマIDを使ってペイロードをエンコードします。スキーマは実行時に動的に取得され、以降はキャッシュされます。ConfluentスキーマレジストリではスキーマIDは整数です。

TIP

エンコード時、ペイロードはルールエンジンの内部データ形式(デコード済みのマップ)である必要があります。例で json_decode を使う理由はこれです。

例:

sql
select
  -- 123はCSRに登録されたスキーマID
  avro_encode('my_external_registry', json_decode(payload), 123) as encoded
from 't'
avro_decode

この関数は外部レジストリの指定スキーマIDに基づいてAvroペイロードをデコードします。スキーマは実行時に動的に取得され、以降はキャッシュされます。

例:

sql
select
  -- 123はCSRに登録されたスキーマID
  avro_decode('my_external_registry', payload, 123) as decoded
from 't'
schema_encode_and_tag

この関数はローカル登録されたAvroスキーマ、外部CSRスキーマ名、サブジェクトを使い、内部マップ形式のペイロードをエンコードし、結果のペイロードにCSR登録時のスキーマIDタグを付与します。

例:

sql
select
  schema_encode_and_tag(
    'my_local_avro_schema',
    'my_external_registry',
    json_decode(payload),
    'my_subject'
  ) as encoded
from 't'
schema_decode_tagged

この関数はCSR名を使って、CSRから取得したスキーマIDタグ付きのペイロードをデコードします。

sql
select
  schema_decode_tagged(
    'my_external_registry',
    payload
  ) as decoded
from 't'

スキーマの管理

スキーマレジストリページには作成済みのスキーマがすべて表示され、ここで管理できます。

スキーマの編集

スキーマ一覧の Actions 欄にある編集アイコンをクリックすると、説明、スキーマタイプ、スキーマ内容を変更できます。

スキーマの削除

スキーマ一覧の Actions 欄にある削除アイコンをクリックし、削除を確認するとスキーマが削除されます。