スキーマバリデーション
スキーマバリデーション機能は、MQTTメッセージをさまざまな方法で検証し、あらかじめ定義されたデータ形式に合致するメッセージのみがサブスクライバーにパブリッシュされるようにすることで、非準拠データが下流システムに流入するのを防ぎます。スキーマバリデーションは、JSON Schema、Protobuf、Avroなどのスキーマ形式を使用したり、指定したトピックのメッセージ形式を検証するために組み込みのSQL文を利用したりできます。本ページでは、スキーマバリデーション機能の概要と使い方、設定方法を紹介します。
データ検証の必要性
クライアントは非標準のメッセージをブローカーにパブリッシュすることがあり、これがサブスクライバーやデータシステムで例外を引き起こしたり、セキュリティリスクとなったりする可能性があります。EMQXはデータ形式を早期に検証することで、これらの非準拠メッセージを特定しブロックし、システムの安定性と信頼性を確保します。スキーマバリデーションは以下の点でメリットをもたらします。
- データ整合性:MQTTメッセージの構造や形式を検証し、データの一貫性と正確性を保証します。
- データ品質:欠損や無効なフィールド、データ型や形式のチェックを行い、データの品質を担保します。
- 統一データモデル:チームやプロジェクト全体で統一されたデータモデルを使用し、不整合やエラーを減らします。
- 再利用と共有:スキーマをチーム内で再利用・共有でき、協力効率を高め、作業の重複やミスを減らします。
- セキュリティ:悪意のあるメッセージや誤った形式のメッセージの処理を防ぎ、セキュリティリスクを低減します。
- 相互運用性:標準化された形式に準拠したメッセージを保証し、異なるデバイスやシステム間の通信を円滑にします。
- デバッグ:無効または誤った形式のメッセージを容易に特定し、デバッグが可能です。
ワークフロー
メッセージがパブリッシュされると、システムはあらかじめ定義されたルールに従って検証を行います。検証が成功すればメッセージは処理を継続し、失敗した場合はユーザーが定義したアクション(メッセージ破棄やクライアント切断など)が実行されます。
メッセージがパブリッシュされると、EMQXプラットフォームの認可機能がまずクライアントのパブリッシュ権限をチェックします。権限が確認されると、公開されたトピックに基づいてユーザー設定の検証リスト内の検証ルールがチェックされます。検証ルールは複数のトピックやトピックフィルターを含むことができます。
検証ルールにマッチすると、メッセージはあらかじめ設定されたスキーマまたはSQLに対して検証されます。
- JSON Schema、Protobuf、Avroなど複数のスキーマタイプをサポートします。
- EMQXルールエンジンの構文に準拠したSQL文もサポートします。
- 1つのポリシーに複数のスキーマやSQLを追加し、その関係性を指定できます:
- All Pass:すべての検証が成功した場合のみ検証成功とみなします。
- Any Pass:いずれかの検証が成功した時点で検証成功とみなします。
検証が成功すると、メッセージはルールエンジンのトリガーやサブスクライバーへの配信など次の処理へ進みます。
検証が失敗した場合、以下のユーザー設定アクションが実行されます:
- メッセージ破棄:パブリッシュを終了しメッセージを破棄します。QoS 1およびQoS 2メッセージの場合は、PUBACKで特定の理由コード(131 - Implementation Specific Error)を返します。
- 切断してメッセージ破棄:メッセージを破棄し、パブリッシュクライアントを切断します。
- 無視:追加の処理は行いません。
検証失敗後のアクションにかかわらず、検証失敗メッセージはデプロイメントログに記録されます。スキーマバリデーションルールではログ出力レベルを設定でき、デフォルトは
warning
です。また、検証失敗はデータ統合ルールのSQL内で「validation failed」イベントをトリガーできます。このイベントのイベントトピックは
$events/schema_validation_failed
です。ユーザーはこのイベントをキャプチャして、誤ったメッセージを別トピックにパブリッシュしたり、Kafkaへ送信して解析したりするカスタム処理が可能です。
利用例
Smart Data Hubにサブスクライブしている場合は、Smart Data Hub -> Schema Validation ページにアクセスできます。このページでスキーマバリデーションルールを作成し、その後メッセージパブリッシュをシミュレーションしてルールの有効性をテストできます。
JSON Schemaによる検証の利用
新しいJSON Schemaを作成します。スキーマレジストリのドキュメントに従い、新しいJSON Schemaを作成します。名前は
JSON_schema
とし、以下のスキーマを定義します。json{ "$schema": "http://json-schema.org/draft-06/schema#", "type": "object", "properties": { "temp": { "type": "integer" }, "id": { "type": "string" } }, "required": [ "temp", "id" ] }
新しいJSON Schemaバリデーションルールを作成します。スキーマバリデーションページの右上にある**+ New**ボタンをクリックし、新規スキーマバリデーションページに入り、以下のように設定します。
- Name:スキーマバリデーションルールの識別用の名前を入力します。
- Remarks:任意でルールの備考を追加します。
- Message Source Topic:検証対象とするトピックを設定します。複数のトピックやトピックフィルターを設定可能です。この例では
t/#
に設定します。 - Validation Method:
- Validation Strategy:すべての検証が通る必要があるか、いずれか1つが通ればよいかを指定します。
- All Pass:すべての検証が成功した場合のみメッセージを有効とみなします。
- Any Pass:いずれかの検証が成功した時点で検証を終了し、有効とみなします。
- Validation List:検証に使用するスキーマまたはSQL文のタイプを指定します。スキーマレジストリで作成したスキーマを使うか、SQLタイプを選択してSQL文を入力します。この例ではステップ1で作成した
JSON_schema
を選択します。
- Validation Strategy:すべての検証が通る必要があるか、いずれか1つが通ればよいかを指定します。
- Validation Failure Operation:
- Action After Failure:検証失敗時のMQTTメッセージやパブリッシュクライアントの動作を指定します。
- Drop Message:パブリッシュを終了しメッセージを破棄します。QoS 1およびQoS 2メッセージの場合はPUBACKで理由コード(131 - Implementation Specific Error)を返します。
- Disconnect and Drop Message:メッセージを破棄し、パブリッシュクライアントを切断します。
- Ignore:何もしません。
- Logs Level:検証失敗時のデプロイメントログのログレベルを設定します。
warning
またはerror
を選択可能です。
- Action After Failure:検証失敗時のMQTTメッセージやパブリッシュクライアントの動作を指定します。
クライアントツールMQTTXを使って検証ルールをテストします。MQTTXをシミュレーションクライアントとしてデプロイメントに接続し、トピック
t/#
をサブスクライブします。まず、
JSON_schema
に準拠したMQTTメッセージをトピックt/1
にパブリッシュすると、メッセージは正常に受信されます。次に、
JSON_schema
に準拠しないMQTTメッセージをトピックt/2
にパブリッシュすると、検証ルールに合致しないためメッセージは正常に受信されません。
スキーマバリデーションルールの実行状況を確認します。
デプロイメントログで検証失敗ログを確認できます。例:
bash2025-01-20 17:09:44 (UTC+08:00)[emqx-node-2] Warning clientid: mqttx_31338552, peername: 101.224.70.14:39452, username: user, pid: <0.92295.0>, line: 288, action: drop, tag: SCHEMA_VALIDATION, validation: use_json_schema_validator, msg: validation_failed
スキーマバリデーション一覧の該当ルール名をクリックすると詳細ページに入り、関連する統計情報を閲覧できます。
Avro Schemaによる検証の利用
Avro Schemaによる検証はJSON Schemaとほぼ同様です。検証ルール作成時にバリデーターリストのType欄でAvro
を選択し、Schema/SQL欄で事前に作成したAvro Schemaの名前を選択します。
Protobuf Schemaによる検証の利用
Protobuf Schemaによる検証もJSON Schemaとほぼ同様です。検証ルール作成時にバリデーターリストのType欄でProtobuf
を選択し、Schema/SQL欄で事前に作成したProtobuf Schemaの名前を選択します。
ただし、JSON SchemaやAvro Schemaと異なり、Protobufタイプを選択した場合は、Protobuf Schemaで定義されたMessage Typeも指定する必要があります。例えば、以下のProtobuf SchemaではメッセージタイプはDevice
です。
message Device {
required string id = 1;
required uint32 temp = 2;
}
スキーマバリデーションの統計情報確認
スキーマバリデーション一覧の名前をクリックすると詳細ページに入り、スキーマ実行に関する統計指標を確認できます。
- 統計情報:
- Matched:有効化以降、検証ルールがトリガーされた総回数。
- Success:検証成功したデータの件数。
- Failure:検証失敗したデータの件数。
- レート指標:
- Current Rate:直近1分間の検証回数の推移。
- Rate in Last 5 Min:過去5分間の検証レート。
- Maximum Rate:記録された最大検証レート。
スキーマバリデーションの管理
スキーマバリデーション一覧ページでは以下の管理操作が可能です。
- スキーマバリデーションの有効化/無効化:Enabled列のトグルボタンをクリックします。
- スキーマバリデーションの編集:Actions列の「編集」アイコンをクリックし編集ページに入り、変更後に保存します。
- スキーマバリデーションの削除:Actions列の「その他」アイコンをクリックし、ドロップダウンメニューから「削除」を選択し、削除を確定します。
- スキーマバリデーションの順序調整:一覧の行をマウスでドラッグして並び替えるか、Actions列の「その他」メニュー内のクイック移動ボタンを使用します。