# スキーマバリデーション

スキーマバリデーション機能は、MQTTメッセージをさまざまな方法で検証し、あらかじめ定義されたデータ形式に合致するメッセージのみがサブスクライバーにパブリッシュされるようにすることで、非準拠データが下流システムに流入するのを防ぎます。スキーマバリデーションは、JSON Schema、Protobuf、Avroなどのスキーマ形式を使用したり、指定したトピックのメッセージ形式を検証するために組み込みのSQL文を利用したりできます。本ページでは、スキーマバリデーション機能の概要と使い方、設定方法を紹介します。

## データ検証の必要性

クライアントは非標準のメッセージをブローカーにパブリッシュすることがあり、これがサブスクライバーやデータシステムで例外を引き起こしたり、セキュリティリスクとなったりする可能性があります。EMQXはデータ形式を早期に検証することで、これらの非準拠メッセージを特定しブロックし、システムの安定性と信頼性を確保します。スキーマバリデーションは以下の点でメリットをもたらします。

- **データ整合性**：MQTTメッセージの構造や形式を検証し、データの一貫性と正確性を保証します。
- **データ品質**：欠損や無効なフィールド、データ型や形式のチェックを行い、データの品質を担保します。
- **統一データモデル**：チームやプロジェクト全体で統一されたデータモデルを使用し、不整合やエラーを減らします。
- **再利用と共有**：スキーマをチーム内で再利用・共有でき、協力効率を高め、作業の重複やミスを減らします。
- **セキュリティ**：悪意のあるメッセージや誤った形式のメッセージの処理を防ぎ、セキュリティリスクを低減します。
- **相互運用性**：標準化された形式に準拠したメッセージを保証し、異なるデバイスやシステム間の通信を円滑にします。
- **デバッグ**：無効または誤った形式のメッセージを容易に特定し、デバッグが可能です。

## ワークフロー

メッセージがパブリッシュされると、システムはあらかじめ定義されたルールに従って検証を行います。検証が成功すればメッセージは処理を継続し、失敗した場合はユーザーが定義したアクション（メッセージ破棄やクライアント切断など）が実行されます。

1. メッセージがパブリッシュされると、EMQXプラットフォームの[認可](../deployments/authz_overview.md)機能がまずクライアントのパブリッシュ権限をチェックします。権限が確認されると、公開されたトピックに基づいてユーザー設定の検証リスト内の検証ルールがチェックされます。検証ルールは複数のトピックやトピックフィルターを含むことができます。

2. 検証ルールにマッチすると、メッセージはあらかじめ設定されたスキーマまたはSQLに対して検証されます。

   - JSON Schema、Protobuf、Avroなど複数のスキーマタイプをサポートします。
   - EMQXルールエンジンの構文に準拠したSQL文もサポートします。
   - 1つのポリシーに複数のスキーマやSQLを追加し、その関係性を指定できます：
     - **All Pass**：すべての検証が成功した場合のみ検証成功とみなします。
     - **Any Pass**：いずれかの検証が成功した時点で検証成功とみなします。

3. 検証が成功すると、メッセージはルールエンジンのトリガーやサブスクライバーへの配信など次の処理へ進みます。

4. 検証が失敗した場合、以下のユーザー設定アクションが実行されます：

   - **メッセージ破棄**：パブリッシュを終了しメッセージを破棄します。QoS 1およびQoS 2メッセージの場合は、PUBACKで特定の理由コード（131 - Implementation Specific Error）を返します。
   - **切断してメッセージ破棄**：メッセージを破棄し、パブリッシュクライアントを切断します。
   - **無視**：追加の処理は行いません。

   検証失敗後のアクションにかかわらず、検証失敗メッセージはデプロイメントログに記録されます。スキーマバリデーションルールではログ出力レベルを設定でき、デフォルトは`warning`です。

   また、検証失敗はデータ統合ルールのSQL内で「validation failed」イベントをトリガーできます。このイベントの[イベントトピック](../rule_engine/rule_engine_events.md#mqtt-events)は`$events/schema_validation_failed`です。ユーザーはこのイベントをキャプチャして、誤ったメッセージを別トピックにパブリッシュしたり、Kafkaへ送信して解析したりするカスタム処理が可能です。

## 利用例

Smart Data Hubにサブスクライブしている場合は、**Smart Data Hub** -> **Schema Validation** ページにアクセスできます。このページでスキーマバリデーションルールを作成し、その後メッセージパブリッシュをシミュレーションしてルールの有効性をテストできます。

### JSON Schemaによる検証の利用

1. 新しいJSON Schemaを作成します。[スキーマレジストリ](./schema_registry.md)のドキュメントに従い、新しいJSON Schemaを作成します。名前は`JSON_schema`とし、以下のスキーマを定義します。

   ```json
   {
     "$schema": "http://json-schema.org/draft-06/schema#",
     "type": "object",
     "properties": {
       "temp": {
         "type": "integer"
       },
       "id": {
         "type": "string"
       }
     },
     "required": [
       "temp",
       "id"
     ]
   }
   ```

2. 新しいJSON Schemaバリデーションルールを作成します。スキーマバリデーションページの右上にある**+ New**ボタンをクリックし、新規スキーマバリデーションページに入り、以下のように設定します。

   - **Name**：スキーマバリデーションルールの識別用の名前を入力します。
   - **Remarks**：任意でルールの備考を追加します。
   - **Message Source Topic**：検証対象とするトピックを設定します。複数のトピックやトピックフィルターを設定可能です。この例では`t/#`に設定します。
   - **Validation Method**：
     - **Validation Strategy**：すべての検証が通る必要があるか、いずれか1つが通ればよいかを指定します。
       - **All Pass**：すべての検証が成功した場合のみメッセージを有効とみなします。
       - **Any Pass**：いずれかの検証が成功した時点で検証を終了し、有効とみなします。
     - **Validation List**：検証に使用するスキーマまたはSQL文のタイプを指定します。スキーマレジストリで作成したスキーマを使うか、SQLタイプを選択してSQL文を入力します。この例ではステップ1で作成した`JSON_schema`を選択します。
   - **Validation Failure Operation**：
     - **Action After Failure**：検証失敗時のMQTTメッセージやパブリッシュクライアントの動作を指定します。
       - **Drop Message**：パブリッシュを終了しメッセージを破棄します。QoS 1およびQoS 2メッセージの場合はPUBACKで理由コード（131 - Implementation Specific Error）を返します。
       - **Disconnect and Drop Message**：メッセージを破棄し、パブリッシュクライアントを切断します。
       - **Ignore**：何もしません。
     - **Logs Level**：検証失敗時のデプロイメントログのログレベルを設定します。`warning`または`error`を選択可能です。

   ![json_schema_validator](./_assets/json_schema_validator.png)

3. クライアントツール[MQTTX](https://mqttx.app)を使って検証ルールをテストします。MQTTXをシミュレーションクライアントとしてデプロイメントに接続し、トピック`t/#`をサブスクライブします。

   - まず、`JSON_schema`に準拠したMQTTメッセージをトピック`t/1`にパブリッシュすると、メッセージは正常に受信されます。

   - 次に、`JSON_schema`に準拠しないMQTTメッセージをトピック`t/2`にパブリッシュすると、検証ルールに合致しないためメッセージは正常に受信されません。

   ![validate_json_schema](./_assets/validate_json_schema.png)

4. スキーマバリデーションルールの実行状況を確認します。

   - デプロイメントログで検証失敗ログを確認できます。例：

     ```bash
     2025-01-20 17:09:44 (UTC+08:00)[emqx-node-2] Warning
     clientid: mqttx_31338552, peername: 101.224.70.14:39452, username: user, pid: <0.92295.0>, line: 288, action: drop, tag: SCHEMA_VALIDATION, validation: use_json_schema_validator, msg: validation_failed
     ```

   - スキーマバリデーション一覧の該当ルール名をクリックすると詳細ページに入り、関連する[統計情報](#スキーマバリデーションの統計情報確認)を閲覧できます。

### Avro Schemaによる検証の利用

Avro Schemaによる検証はJSON Schemaとほぼ同様です。検証ルール作成時にバリデーターリストの**Type**欄で`Avro`を選択し、**Schema/SQL**欄で事前に作成したAvro Schemaの名前を選択します。

### Protobuf Schemaによる検証の利用

Protobuf Schemaによる検証もJSON Schemaとほぼ同様です。検証ルール作成時にバリデーターリストの**Type**欄で`Protobuf`を選択し、**Schema/SQL**欄で事前に作成したProtobuf Schemaの名前を選択します。

ただし、JSON SchemaやAvro Schemaと異なり、Protobufタイプを選択した場合は、Protobuf Schemaで定義された**Message Type**も指定する必要があります。例えば、以下のProtobuf Schemaではメッセージタイプは`Device`です。

```proto
message Device {
  required string id = 1;
  required uint32 temp = 2;
}
```

## スキーマバリデーションの統計情報確認

スキーマバリデーション一覧の名前をクリックすると詳細ページに入り、スキーマ実行に関する統計指標を確認できます。

- **統計情報**：
  - **Matched**：有効化以降、検証ルールがトリガーされた総回数。
  - **Success**：検証成功したデータの件数。
  - **Failure**：検証失敗したデータの件数。
- **レート指標**：
  - **Current Rate**：直近1分間の検証回数の推移。
  - **Rate in Last 5 Min**：過去5分間の検証レート。
  - **Maximum Rate**：記録された最大検証レート。

## スキーマバリデーションの管理

スキーマバリデーション一覧ページでは以下の管理操作が可能です。

- スキーマバリデーションの有効化／無効化：**Enabled**列のトグルボタンをクリックします。
- スキーマバリデーションの編集：**Actions**列の「編集」アイコンをクリックし編集ページに入り、変更後に保存します。
- スキーマバリデーションの削除：**Actions**列の「その他」アイコンをクリックし、ドロップダウンメニューから「削除」を選択し、削除を確定します。
- スキーマバリデーションの順序調整：一覧の行をマウスでドラッグして並び替えるか、**Actions**列の「その他」メニュー内のクイック移動ボタンを使用します。
