# スキーマレジストリ

スキーマレジストリは、トピックメッセージデータの管理と検証のための集中管理されたスキーマを提供し、ネットワーク上でのデータのシリアライズおよびデシリアライズ機能を備えています。MQTTトピックのパブリッシャーおよびサブスクライバーは、スキーマを利用してデータの整合性と互換性を確保できます。スキーマレジストリはルールエンジンの重要なコンポーネントであり、デバイスアクセスやルール設計の多様なシナリオに適応可能で、データ品質、コンプライアンス、アプリケーション開発効率、システムパフォーマンスの向上に寄与します。

## スキーマレジストリの理解

スキーマは、許容されるデータ型、フォーマット、関係性を含むデータ構造を定義します。これはデータレコードの構造、個々のフィールドのデータ型、フィールド間の関係、およびデータに適用される制約やルールを記述する設計図です。

スキーマはデータベース、メッセージングサービス、分散イベント・データ処理フレームワークなど、さまざまなデータ処理システムで使用されます。これによりデータの一貫性と正確性が保証され、異なるシステムやアプリケーション間で効率的に処理・分析が可能となり、データ共有と相互運用性を促進します。

ユーザーはスキーマレジストリでスキーマを定義し、そのスキーマをルールで使用してクライアントデータをデータ統合を通じて異なるデータサービスに転送できます。同時に、アプリケーションやデータサービスからクライアントへスキーマを介してデータを送信し、双方向のデータフローを実現可能です。

![schema_pic](./_assets/schema_pic.png)

スキーマレジストリには、データ検証、互換性チェック、バージョン管理、反復的進化など多くの利点があります。また、データパイプラインの開発・保守を簡素化し、データ互換性問題、データ破損、データ損失のリスクを低減します。

## スキーマの作成

Smart Data Hubをご契約の場合、デプロイメント内の**Smart Data Hub** -> **Schema Registry**ページにアクセスできます。このページでは、EMQXプラットフォームがネイティブにサポートするAvro、Protobuf、JSON Schema形式のスキーマを作成できます。外部HTTPサービスを利用してカスタムのエンコード・デコードロジックを適用することも可能です。

1. **Internal Schema**タブ右上の**+ New**ボタンをクリックし、**New Schema**ページに移動します。以下の項目を設定してください：

   - **Name**：スキーマを識別するための名前です。
     
     - スキーマ検証やデータ変換ルールで使用可能です。
     - データ統合ルールのSQLエンコード・デコード関数でも使用できます（例：`SELECT schema_encode("<name>", payload)`）。
   - **Note**：任意で説明情報を追加できます。
   - **Type**：ドロップダウンメニューからスキーマタイプを選択します。選択肢は`Avro`、`Protobuf`、`JSON Schema`、`External HTTP`です。
   - **Schema**：`Avro`、`Protobuf`、`JSON Schema`を選択した場合は、対応するスキーマを入力します。例：

     - Avro:

      ```json
      {
        "type": "record",
        "name": "Device",
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "temp", "type": "int" }
        ]
      }
      ```

     - Protobuf:

      ```proto
      message Device {
        required string id = 1;
        required uint32 temp = 2;
      }
      ```

     - JSON Schema; JSONデータからJSON Schemaを生成可能です：

     ![generate_JSON_schema](./_assets/generate_JSON_schema.png)

      ```json
      {
          "$schema": "http://json-schema.org/draft-06/schema#",
          "type": "object",
          "properties": {
            "temp": {
              "type": "integer"
            },
            "id": {
              "type": "string"
            }
          },
         "required": [
            "temp",
            "id"
         ]
      }
      ```
   
   - **URL**：`External HTTP`を選択した場合はURLを指定します。外部HTTPスキーマの作成方法は[スキーマレジストリの例 - External HTTPサーバー](./schema-registry-example-external-http.md)を参照してください。

2. **Confirm**をクリックするとスキーマが作成され、スキーマレジストリページに戻ります。

## 外部スキーマレジストリ

EMQXプラットフォームは外部のConfluentスキーマレジストリ（CSR）の設定をサポートしています。この機能により、ルール処理時に外部レジストリからスキーマを動的に取得し、効率的なメッセージのエンコード・デコードが可能になります。

### 外部スキーマレジストリの作成

Smart Data Hubをご契約の場合、EMQXプラットフォームコンソールから直接外部スキーマレジストリを設定でき、スキーマ統合を簡単に管理できます。

::: tip 前提条件

開始前に[VPCピアリング接続](../deployments/vpc_peering)を作成してください。ピアリング接続確立後、内部ネットワークIP経由でプラットフォームコンソールにログインし外部スキーマレジストリにアクセス可能です。あるいは、[NATゲートウェイ](../vas/nat-gateway)を設定しパブリックIP経由でアクセスする方法もあります。

:::

1. デプロイメントの左メニューから**Schema Registry**をクリックし、スキーマレジストリページの**External Schema**タブを選択します。

2. 右上の**Create**ボタンをクリックし、以下の項目を設定します：

   - **Name**：エンコード・デコード関数で使用する外部スキーマレジストリ名を入力します。

   - **Type**：外部スキーマレジストリの種類を選択します。現在は`Confluent`のみ対応しています。

   - **URL**：Confluentスキーマレジストリのエンドポイントを入力します。

   - **Authentication**：`Basic auth`を選択した場合は、外部レジストリアクセス用の認証情報（ユーザー名とパスワード）を入力します。

3. 設定完了後、**Create**をクリックします。

### ルールエンジンでの外部スキーマレジストリの利用

外部レジストリを設定すると、EMQXプラットフォームのルールエンジンで以下の関数を使用し、外部レジストリに格納されたスキーマを用いてペイロードのエンコード・デコードが可能です。

::: tip

外部スキーマレジストリはメッセージ変換やスキーマ検証には使用できません。

:::

設定済みの外部CSRを利用する関数例：

```sql
avro_encode('my_external_registry', payload, my_schema_id)
avro_decode('my_external_registry', payload, my_schema_id)
schema_encode_and_tag('my_local_avro_schema', 'my_external_registry', payload, 'my_subject')
schema_decode_tagged('my_external_registry', payload)
```

#### 関数使用例

以下の例では、次の値・変数名を使用しています：

- `my_external_registry`：EMQXプラットフォームで設定した外部レジストリ名
- `my_schema_id`：CSRに登録されたスキーマID（CSRでは常に整数）
- `my_local_avro_schema`：EMQXにローカル登録されたAvroスキーマ名
- `my_subject`：CSRで定義されたサブジェクト名

##### `avro_encode`

`avro_encode`は外部レジストリのスキーマIDを用いてペイロードをエンコードします。スキーマは実行時に動的に取得され、以降の処理でキャッシュされます。ConfluentスキーマレジストリではスキーマIDは整数です。

::: tip

エンコード時、ペイロードはルールエンジンの内部データ形式（デコード済みのマップ）である必要があるため、例では`json_decode`を使用しています。

:::

例：

```sql
select
  -- 123はCSRに登録されたスキーマID
  avro_encode('my_external_registry', json_decode(payload), 123) as encoded
from 't'
```

##### `avro_decode`

指定したスキーマIDの外部レジストリスキーマに基づき、Avroペイロードをデコードします。スキーマは実行時に動的取得され、以降の処理でキャッシュされます。

例：

```sql
select
  -- 123はCSRに登録されたスキーマID
  avro_decode('my_external_registry', payload, 123) as decoded
from 't'
```

##### `schema_encode_and_tag`

ローカルに登録されたAvroスキーマ、外部CSRスキーマ名、サブジェクトを使い、ペイロード（内部マップ形式）をエンコードし、スキーマIDでタグ付けします。スキーマIDはローカルスキーマをCSRに登録した際のものです。

例：

```sql
select
  schema_encode_and_tag(
    'my_local_avro_schema',
    'my_external_registry',
    json_decode(payload),
    'my_subject'
  ) as encoded
from 't'
```

##### `schema_decode_tagged`

CSR名を使い、スキーマIDでタグ付けされたペイロードをデコードします。

```sql
select
  schema_decode_tagged(
    'my_external_registry',
    payload
  ) as decoded
from 't'
```

## スキーマの管理

スキーマレジストリページには作成済みのスキーマが一覧表示され、管理が可能です。

### スキーマの編集

スキーマ一覧の**Actions**欄にある編集アイコンをクリックすると、説明、スキーマタイプ、スキーマ内容を変更できます。

### スキーマの削除

スキーマ一覧の**Actions**欄にある削除アイコンをクリックし、削除を確認するとスキーマが削除されます。
