スキーマレジストリ
スキーマレジストリは、トピックメッセージデータの管理と検証のための集中管理されたスキーマを提供し、ネットワーク上でのデータのシリアライズおよびデシリアライズ機能を備えています。MQTTトピックのパブリッシャーおよびサブスクライバーは、スキーマを利用してデータの整合性と互換性を確保できます。スキーマレジストリはルールエンジンの重要なコンポーネントであり、デバイスアクセスやルール設計の多様なシナリオに適応可能で、データ品質、コンプライアンス、アプリケーション開発効率、システムパフォーマンスの向上に寄与します。
スキーマレジストリの理解
スキーマは、許容されるデータ型、フォーマット、関係性を含むデータ構造を定義します。これはデータレコードの構造、個々のフィールドのデータ型、フィールド間の関係、およびデータに適用される制約やルールを記述する設計図です。
スキーマはデータベース、メッセージングサービス、分散イベント・データ処理フレームワークなど、さまざまなデータ処理システムで使用されます。これによりデータの一貫性と正確性が保証され、異なるシステムやアプリケーション間で効率的に処理・分析が可能となり、データ共有と相互運用性を促進します。
ユーザーはスキーマレジストリでスキーマを定義し、そのスキーマをルールで使用してクライアントデータをデータ統合を通じて異なるデータサービスに転送できます。同時に、アプリケーションやデータサービスからクライアントへスキーマを介してデータを送信し、双方向のデータフローを実現可能です。
スキーマレジストリには、データ検証、互換性チェック、バージョン管理、反復的進化など多くの利点があります。また、データパイプラインの開発・保守を簡素化し、データ互換性問題、データ破損、データ損失のリスクを低減します。
スキーマの作成
Smart Data Hubをご契約の場合、デプロイメント内のSmart Data Hub -> Schema Registryページにアクセスできます。このページでは、EMQXプラットフォームがネイティブにサポートするAvro、Protobuf、JSON Schema形式のスキーマを作成できます。外部HTTPサービスを利用してカスタムのエンコード・デコードロジックを適用することも可能です。
Internal Schemaタブ右上の**+ New**ボタンをクリックし、New Schemaページに移動します。以下の項目を設定してください:
Name:スキーマを識別するための名前です。
- スキーマ検証やデータ変換ルールで使用可能です。
- データ統合ルールのSQLエンコード・デコード関数でも使用できます(例:
SELECT schema_encode("<name>", payload)
)。
Note:任意で説明情報を追加できます。
Type:ドロップダウンメニューからスキーマタイプを選択します。選択肢は
Avro
、Protobuf
、JSON Schema
、External HTTP
です。Schema:
Avro
、Protobuf
、JSON Schema
を選択した場合は、対応するスキーマを入力します。例:- Avro:
json{ "type": "record", "name": "Device", "fields": [ { "name": "id", "type": "string" }, { "name": "temp", "type": "int" } ] }
- Protobuf:
protomessage Device { required string id = 1; required uint32 temp = 2; }
- JSON Schema; JSONデータからJSON Schemaを生成可能です:
json{ "$schema": "http://json-schema.org/draft-06/schema#", "type": "object", "properties": { "temp": { "type": "integer" }, "id": { "type": "string" } }, "required": [ "temp", "id" ] }
URL:
External HTTP
を選択した場合はURLを指定します。外部HTTPスキーマの作成方法はスキーマレジストリの例 - External HTTPサーバーを参照してください。
Confirmをクリックするとスキーマが作成され、スキーマレジストリページに戻ります。
外部スキーマレジストリ
EMQXプラットフォームは外部のConfluentスキーマレジストリ(CSR)の設定をサポートしています。この機能により、ルール処理時に外部レジストリからスキーマを動的に取得し、効率的なメッセージのエンコード・デコードが可能になります。
外部スキーマレジストリの作成
Smart Data Hubをご契約の場合、EMQXプラットフォームコンソールから直接外部スキーマレジストリを設定でき、スキーマ統合を簡単に管理できます。
前提条件
開始前にVPCピアリング接続を作成してください。ピアリング接続確立後、内部ネットワークIP経由でプラットフォームコンソールにログインし外部スキーマレジストリにアクセス可能です。あるいは、NATゲートウェイを設定しパブリックIP経由でアクセスする方法もあります。
デプロイメントの左メニューからSchema Registryをクリックし、スキーマレジストリページのExternal Schemaタブを選択します。
右上のCreateボタンをクリックし、以下の項目を設定します:
Name:エンコード・デコード関数で使用する外部スキーマレジストリ名を入力します。
Type:外部スキーマレジストリの種類を選択します。現在は
Confluent
のみ対応しています。URL:Confluentスキーマレジストリのエンドポイントを入力します。
Authentication:
Basic auth
を選択した場合は、外部レジストリアクセス用の認証情報(ユーザー名とパスワード)を入力します。
設定完了後、Createをクリックします。
ルールエンジンでの外部スキーマレジストリの利用
外部レジストリを設定すると、EMQXプラットフォームのルールエンジンで以下の関数を使用し、外部レジストリに格納されたスキーマを用いてペイロードのエンコード・デコードが可能です。
TIP
外部スキーマレジストリはメッセージ変換やスキーマ検証には使用できません。
設定済みの外部CSRを利用する関数例:
avro_encode('my_external_registry', payload, my_schema_id)
avro_decode('my_external_registry', payload, my_schema_id)
schema_encode_and_tag('my_local_avro_schema', 'my_external_registry', payload, 'my_subject')
schema_decode_tagged('my_external_registry', payload)
関数使用例
以下の例では、次の値・変数名を使用しています:
my_external_registry
:EMQXプラットフォームで設定した外部レジストリ名my_schema_id
:CSRに登録されたスキーマID(CSRでは常に整数)my_local_avro_schema
:EMQXにローカル登録されたAvroスキーマ名my_subject
:CSRで定義されたサブジェクト名
avro_encode
avro_encode
は外部レジストリのスキーマIDを用いてペイロードをエンコードします。スキーマは実行時に動的に取得され、以降の処理でキャッシュされます。ConfluentスキーマレジストリではスキーマIDは整数です。
TIP
エンコード時、ペイロードはルールエンジンの内部データ形式(デコード済みのマップ)である必要があるため、例ではjson_decode
を使用しています。
例:
select
-- 123はCSRに登録されたスキーマID
avro_encode('my_external_registry', json_decode(payload), 123) as encoded
from 't'
avro_decode
指定したスキーマIDの外部レジストリスキーマに基づき、Avroペイロードをデコードします。スキーマは実行時に動的取得され、以降の処理でキャッシュされます。
例:
select
-- 123はCSRに登録されたスキーマID
avro_decode('my_external_registry', payload, 123) as decoded
from 't'
schema_encode_and_tag
ローカルに登録されたAvroスキーマ、外部CSRスキーマ名、サブジェクトを使い、ペイロード(内部マップ形式)をエンコードし、スキーマIDでタグ付けします。スキーマIDはローカルスキーマをCSRに登録した際のものです。
例:
select
schema_encode_and_tag(
'my_local_avro_schema',
'my_external_registry',
json_decode(payload),
'my_subject'
) as encoded
from 't'
schema_decode_tagged
CSR名を使い、スキーマIDでタグ付けされたペイロードをデコードします。
select
schema_decode_tagged(
'my_external_registry',
payload
) as decoded
from 't'
スキーマの管理
スキーマレジストリページには作成済みのスキーマが一覧表示され、管理が可能です。
スキーマの編集
スキーマ一覧のActions欄にある編集アイコンをクリックすると、説明、スキーマタイプ、スキーマ内容を変更できます。
スキーマの削除
スキーマ一覧のActions欄にある削除アイコンをクリックし、削除を確認するとスキーマが削除されます。