スキーマレジストリ
スキーマレジストリは、トピックメッセージデータの管理と検証のための集中管理されたスキーマを提供し、ネットワーク上でのデータのシリアライズおよびデシリアライズを可能にします。MQTTトピックのパブリッシャーとサブスクライバーはスキーマを利用してデータの一貫性と互換性を確保できます。スキーマレジストリはルールエンジンの重要なコンポーネントであり、デバイスアクセスやルール設計のさまざまなシナリオに適応可能で、データ品質、コンプライアンス、アプリケーション開発効率、システムパフォーマンスの向上に寄与します。
スキーマレジストリの理解
スキーマはデータの構造を定義し、許容されるデータ型、フォーマット、関係性を含みます。これはデータレコードの構造、各フィールドのデータ型、フィールド間の関係、およびデータに適用される制約やルールを記述する設計図です。
スキーマはデータベース、メッセージングサービス、分散イベントおよびデータ処理フレームワークなど、さまざまなデータ処理システムで使用されます。これにより、データの一貫性と正確性が保証され、異なるシステムやアプリケーションによる効率的な処理と分析が可能になります。したがって、異なるシステムや組織間のデータ共有と相互運用性を促進します。
ユーザーはスキーマレジストリでスキーマを定義し、その定義済みスキーマをルールで使用してクライアントデータをデータ統合経由で異なるデータサービスに転送できます。同時に、アプリケーションやデータサービス側のデータをスキーマを通じてクライアントに送信し、双方向のデータフローを実現できます。

スキーマレジストリは、データ検証、互換性チェック、バージョン管理、反復的進化など多くの利点があります。また、データパイプラインの開発と保守を簡素化し、データ互換性の問題、データ破損、データ損失のリスクを低減します。
スキーマの作成
Smart Data Hubにご契約いただいている場合、デプロイメント内の Smart Data Hub -> Schema Registry ページにアクセスできます。このページでは、EMQX CloudがネイティブにサポートするAvro、Protobuf、JSON Schema形式のスキーマを作成できます。また、外部HTTPサービスを利用してカスタムのエンコード・デコードロジックを適用することも可能です。
Internal Schema タブの右上にある + New ボタンをクリックし、New Schema ページに移動します。以下の項目を設定してください:
Name:スキーマを識別するための名前です。
- スキーマ検証やデータ変換ルールで名前を使用できます。
- データ統合ルールのSQLエンコード・デコード関数でも使用可能です。例:
SELECT schema_encode("<name>", payload)
Note:任意で説明情報を追加できます。
Type:ドロップダウンメニューからスキーマタイプを選択します。選択肢は
Avro、Protobuf、JSON Schema、External HTTPです。Schema:
Avro、Protobuf、JSON Schemaを選択した場合は、対応するスキーマを入力します。例:- Avro:
json{ "type": "record", "name": "Device", "fields": [ { "name": "id", "type": "string" }, { "name": "temp", "type": "int" } ] }- Protobuf:
protomessage Device { required string id = 1; required uint32 temp = 2; }- JSON Schema; JSONデータからJSON Schemaを生成できます:
json{ "$schema": "http://json-schema.org/draft-06/schema#", "type": "object", "properties": { "temp": { "type": "integer" }, "id": { "type": "string" } }, "required": [ "temp", "id" ] }URL:
External HTTPを選択した場合はURLを指定します。外部HTTPスキーマの作成方法は スキーマレジストリの例 - External HTTPサーバー を参照してください。
Confirm をクリックするとスキーマが作成され、スキーマレジストリページに戻ります。
外部スキーマレジストリ
EMQX Cloudは外部のConfluentスキーマレジストリ(CSR)を設定可能です。この機能により、ルール処理中に外部レジストリからスキーマを動的に取得し、効率的なメッセージのエンコード・デコードが可能になります。
外部スキーマレジストリの作成
Smart Data Hubにご契約いただいている場合、EMQX Cloudコンソールから直接外部スキーマレジストリを設定でき、スキーマ統合の管理が容易になります。
前提条件
開始前にVPCピアリング接続を作成してください。ピアリング接続確立後、プラットフォームコンソールに内部ネットワークIP経由でログインし外部スキーマレジストリにアクセスできます。あるいは、NATゲートウェイを設定しパブリックIP経由で外部スキーマレジストリにアクセスすることも可能です。
デプロイメントの左メニューから Schema Registry をクリックし、スキーマレジストリページの External Schema タブを選択します。
右上の Create ボタンをクリックし、以下の項目を設定します:
Name:エンコード・デコード関数で使用する外部スキーマレジストリ名を入力します。
Type:外部スキーマレジストリのタイプを選択します。現在は
Confluentのみ対応しています。URL:Confluentスキーマレジストリのエンドポイントを入力します。
Authentication:
Basic authを選択した場合は、外部レジストリアクセス用の認証情報(ユーザー名とパスワード)を入力します。
設定完了後、Create をクリックします。
ルールエンジンでの外部スキーマレジストリの利用
外部レジストリを設定すると、EMQX Cloudルールエンジン内で外部レジストリに保存されたスキーマを使ってペイロードのエンコード・デコードを行う関数が利用可能になります。
TIP
外部スキーマレジストリはメッセージ変換やスキーマ検証には使用できません。
以下の関数は設定済みの外部CSRを利用します:
avro_encode('my_external_registry', payload, my_schema_id)
avro_decode('my_external_registry', payload, my_schema_id)
schema_encode_and_tag('my_local_avro_schema', 'my_external_registry', payload, 'my_subject')
schema_decode_tagged('my_external_registry', payload)関数使用例
以下の例では、次の値と変数名を使用しています:
my_external_registry:EMQX Cloudで外部レジストリに付けた名前my_schema_id:CSRに登録されたスキーマID(CSRでは常に整数)my_local_avro_schema:EMQXにローカル登録されたAvroスキーマ名my_subject:CSRで定義されたサブジェクト名
avro_encode
avro_encode は外部レジストリのスキーマIDを使ってペイロードをエンコードします。スキーマは実行時に動的に取得され、以降はキャッシュされます。ConfluentスキーマレジストリではスキーマIDは整数です。
TIP
エンコード時、ペイロードはルールエンジンの内部データ形式(デコード済みのマップ)である必要があります。例で json_decode を使う理由はこれです。
例:
select
-- 123はCSRに登録されたスキーマID
avro_encode('my_external_registry', json_decode(payload), 123) as encoded
from 't'avro_decode
この関数は外部レジストリの指定スキーマIDに基づいてAvroペイロードをデコードします。スキーマは実行時に動的に取得され、以降はキャッシュされます。
例:
select
-- 123はCSRに登録されたスキーマID
avro_decode('my_external_registry', payload, 123) as decoded
from 't'schema_encode_and_tag
この関数はローカル登録されたAvroスキーマ、外部CSRスキーマ名、サブジェクトを使い、内部マップ形式のペイロードをエンコードし、結果のペイロードにCSR登録時のスキーマIDタグを付与します。
例:
select
schema_encode_and_tag(
'my_local_avro_schema',
'my_external_registry',
json_decode(payload),
'my_subject'
) as encoded
from 't'schema_decode_tagged
この関数はCSR名を使って、CSRから取得したスキーマIDタグ付きのペイロードをデコードします。
select
schema_decode_tagged(
'my_external_registry',
payload
) as decoded
from 't'スキーマの管理
スキーマレジストリページには作成済みのスキーマがすべて表示され、ここで管理できます。
スキーマの編集
スキーマ一覧の Actions 欄にある編集アイコンをクリックすると、説明、スキーマタイプ、スキーマ内容を変更できます。
スキーマの削除
スキーマ一覧の Actions 欄にある削除アイコンをクリックし、削除を確認するとスキーマが削除されます。