スキーマバリデーション
EMQXには、指定したトピックからサブスクライバーにパブリッシュされるメッセージが、あらかじめ定義されたデータフォーマットに準拠していることを保証するための組み込みスキーマバリデーション機能があります。スキーマバリデーションは、JSON Schema、Protobuf、Avroなど複数のスキーマ形式および組み込みのSQLステートメントバリデーションをサポートしています。本ページでは、スキーマバリデーション機能の概要と利用方法について説明します。
なぜデータをバリデートするのか
クライアントはブローカーに対して非標準のメッセージをパブリッシュする可能性があり、これがサブスクライバーやデータシステムで例外を引き起こしたり、セキュリティリスクとなる場合があります。EMQXはデータフォーマットを早期にバリデートすることで、これらの非準拠メッセージを検知・ブロックし、システムの安定性と信頼性を確保します。スキーマバリデーションには以下のような利点があります。
- データ整合性:MQTTメッセージの構造とフォーマットを検証し、データの一貫性と正確性を保証します。
- データ品質:欠損や無効なフィールド、データ型、フォーマットをチェックし、データの品質を維持します。
- 統一データモデル:チームやプロジェクト全体で統一されたデータモデルを使用することを保証し、不整合やエラーを減らします。
- 再利用と共有:チームメンバー間でスキーマを再利用・共有でき、協力効率を高め、重複作業やエラーを減らします。
- セキュリティ:悪意のあるまたは不正なフォーマットのメッセージの処理を防ぎ、セキュリティリスクを低減します。
- 相互運用性:標準化されたフォーマットに準拠したメッセージを保証し、異なるデバイスやシステム間の通信を円滑にします。
- デバッグ:無効または不正なフォーマットのメッセージを容易に特定・デバッグできます。
ワークフロー
メッセージがパブリッシュされると、あらかじめ定義されたルールに基づいてバリデーションが行われます。バリデーションが成功すれば処理は継続され、失敗した場合はユーザー設定のアクションが実行されます(例:メッセージ破棄や切断など)。
メッセージがパブリッシュされると、まずEMQXの認可機構でパブリッシュ権限がチェックされます。権限チェックを通過した後、ユーザーが設定したバリデータのリストから、パブリッシュされたトピックに基づいてバリデーションルールがマッチングされます。1つのバリデータは複数のトピックやトピックフィルターに設定可能です。
バリデーションルールがマッチすると、メッセージはあらかじめ設定されたスキーマまたはSQLに対してバリデートされます。
- JSON Schema、Protobuf、Avroなど複数のスキーマタイプをサポート。
- EMQXルールエンジンの構文に準拠したSQLステートメントをサポート。
- 1つのポリシーに複数のスキーマやSQLを追加し、それらの関係を指定可能:
- All Pass:すべてのバリデーションが成功した場合のみ成功とみなす。
- Any Pass:いずれかのバリデーションが成功した時点で成功とみなす。
バリデーションに成功すると、メッセージはルールエンジンのトリガーやサブスクライバーへの配信など次の処理に進みます。
バリデーションに失敗した場合、以下のユーザー設定アクションが実行されます。
- メッセージ破棄:パブリッシュを終了しメッセージを破棄。QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コード(131 - Implementation Specific Error)を返します。
- 切断してメッセージ破棄:メッセージを破棄し、パブリッシュしたクライアントを切断。
- 無視:追加のアクションは行わない。
設定されたアクションにかかわらず、バリデーション失敗時にログ出力が可能で、ログの出力レベルはユーザーが設定でき、デフォルトは
warning
です。バリデーション失敗はルールエンジンイベント$events/schema_validation_failed
をトリガー可能で、ユーザーはこのイベントをキャッチして、誤ったメッセージを別トピックにパブリッシュしたりKafkaに送信して解析するなどのカスタム処理が行えます。
ユーザーガイド
このセクションでは、スキーマバリデーション機能の設定方法とテスト方法を紹介します。
ダッシュボードでのスキーマバリデーション設定
ダッシュボードでスキーマバリデータを作成・設定する手順を説明します。
ダッシュボードの左ナビゲーションで Smart Data Hub -> Schema Validation をクリックします。
Schema Validation ページ右上の Create をクリックします。
スキーマバリデーション作成ページで以下を設定します:
- Name:バリデータの名前を入力。
- Message Source Topic:バリデーション対象のメッセージトピックを設定。複数のトピックやトピックフィルターを設定可能。
- Note(任意):メモを入力。
- Validation Method:
- Validation Strategy:複数のバリデーション方式の関係を指定。
- All Pass(デフォルト):すべてのバリデーション方式が成功した場合に合格とみなす。
- Any Pass:いずれかのバリデーション方式が成功した時点で合格とみなす。
- Validation List:Type ドロップダウンからスキーマを選択し、スキーマまたはSQLを追加。スキーマの作成方法はCreate Validation Schemaを参照。
- Validation Strategy:複数のバリデーション方式の関係を指定。
- Validation Failure Operation:
- Action After Failure:バリデーション失敗時に実行するアクションを選択:
- Drop Message:パブリッシュを終了しメッセージを破棄。QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コードを返す。
- Disconnect and Drop Message:メッセージを破棄し、パブリッシュクライアントを切断。
- Ignore:追加のアクションは行わない。
- Action After Failure:バリデーション失敗時に実行するアクションを選択:
- Output Logs:バリデーション失敗時にログを出力するか選択。デフォルトは有効。
- Logs Level:ログの出力レベルを設定。デフォルトは
warning
。
Create をクリックして設定を完了します。
これで有効な新しいバリデータがSchema Validationページのリストに表示されます。必要に応じて無効化可能です。Actions列のSettingsをクリックするとバリデータ設定を更新できます。Moreからはバリデータの削除や順序変更も可能です。
設定ファイルでのスキーマバリデーション設定
設定の詳細はConfiguration Manualを参照してください。
バリデーションスキーマの作成
ここではJSON Schemaを例にバリデーションスキーマの作成方法を示します。JSON Schemaは以下の要件を満たす必要があります。
- JSONオブジェクトに
temp
というプロパティが含まれていること。 temp
プロパティは整数型であること。temp
プロパティの値は101以上であること。
{
"$schema": "http://json-schema.org/draft-06/schema#",
"type": "object",
"properties": {
"temp": {
"type": "integer",
"minimum": 101
}
},
"required": ["temp"]
}
スキーマバリデーション設定のテスト
Create Validation Schemaで作成した例のスキーマを使って、スキーマバリデーション設定をテストできます。
mqttxを使って、MQTTメッセージルールに準拠したペイロードのメッセージをパブリッシュします。
mqttx pub -t t/1 -m '{"temp": 102}'
mqttxを使って、MQTTメッセージルールに準拠しないペイロードのメッセージをパブリッシュします。
mqttx pub -t t/1 -m '{"temp": 100}'
ログ出力は以下のようになります。
2024-05-16T06:24:10.733827+00:00 [warning] tag: SCHEMA_VALIDATION, clientid: mqttx_1db4547e, msg: validation_failed, peername: 127.0.0.1:40850, action: drop, validation: <<"check-json">>
REST API
REST API経由でのスキーマバリデーションの詳細な利用方法はEMQX Enterprise APIを参照してください。
統計と指標
スキーマバリデーションを有効にすると、ダッシュボード上で統計と指標が表示されます。Schema Validationページでバリデータ名をクリックすると以下を確認できます。
統計情報:
- Total:システム起動以降のトリガー総数。
- Success:成功したデータバリデーション数。
- Failed:失敗したデータバリデーション数。
レート指標:
- 現在の検証速度
- 過去5分間の速度
- 過去の最大速度
統計はリセット可能で、Prometheusにも追加され、/prometheus/schema_validation
パスからアクセスできます。