スキーマ検証
EMQXには、指定されたトピックからサブスクライバーにパブリッシュされるメッセージがあらかじめ定義されたデータ形式に準拠していることを保証するための組み込みスキーマ検証機能があります。スキーマ検証はJSON Schema、Protobuf、Avroなどの複数のスキーマ形式および組み込みのSQL文検証をサポートしています。本ページではスキーマ検証機能の概要と利用方法について説明します。
なぜデータを検証するのか
クライアントは非標準のメッセージをブローカーにパブリッシュする可能性があり、それがサブスクライバーやデータシステムで例外を引き起こしたり、セキュリティリスクとなる場合があります。EMQXはデータ形式を早期に検証することでこれら非準拠メッセージを識別・ブロックし、システムの安定性と信頼性を確保します。スキーマ検証は以下のような利点をもたらします。
- データ整合性:MQTTメッセージの構造と形式を検証し、データの一貫性と正確性を保証します。
- データ品質:欠損や無効なフィールド、データ型や形式をチェックし、データの品質を維持します。
- 統一データモデル:チームやプロジェクト全体で統一されたデータモデルを使用し、不整合やエラーを減らします。
- 再利用と共有:スキーマをチーム内で再利用・共有でき、協力効率を高め、繰り返し作業やエラーを減らします。
- セキュリティ:悪意のあるメッセージや誤った形式のメッセージの処理を防ぎ、セキュリティリスクを低減します。
- 相互運用性:標準化された形式に準拠したメッセージを保証し、異なるデバイスやシステム間の通信を促進します。
- デバッグ:無効または誤った形式のメッセージを容易に特定し、デバッグできます。
ワークフロー
メッセージがパブリッシュされると、あらかじめ定義されたルールに基づいて検証が行われます。検証が成功すれば処理は継続され、失敗した場合はユーザー設定のアクションが実行されます(メッセージ破棄や切断など)。
メッセージがパブリッシュされると、まずEMQXの認可機構でパブリッシュ権限が確認されます。権限チェックを通過すると、ユーザー設定の検証リストからパブリッシュされたトピックに基づき検証ルールがマッチングされます。1つのバリデーターは複数のトピックやトピックフィルターに設定可能です。
検証ルールがマッチすると、メッセージはプリセットされたスキーマまたはSQLに対して検証されます。
- JSON Schema、Protobuf、Avroなど複数のスキーマタイプをサポート。
- EMQXルールエンジンの構文に準拠したSQL文をサポート。
- 1つのポリシーで複数のスキーマやSQLを追加し、それらの関係を指定可能:
- All Pass:すべての検証が成功した場合のみ検証成功とみなす。
- Any Pass:いずれかの検証が成功した時点で検証成功とみなす。
検証が成功すると、ルールエンジンのトリガーやサブスクライバーへの配信など次の処理に進みます。
検証に失敗した場合、以下のユーザー設定アクションが実行されます。
- メッセージ破棄:パブリッシュを終了しメッセージを破棄、QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コード(131 - Implementation Specific Error)を返します。
- 切断してメッセージ破棄:メッセージを破棄し、パブリッシュ元クライアントを切断します。
- 無視:追加のアクションは行いません。
設定されたアクションにかかわらず、検証失敗時にログを出力可能で、ログの出力レベルはユーザーが設定でき、デフォルトは
warningです。検証失敗はルールエンジンイベント$events/schema_validation/failedをトリガーし、ユーザーはこのイベントをキャッチして誤ったメッセージを別トピックにパブリッシュしたり、Kafkaへ送信して解析するなどのカスタム処理が可能です。
ユーザーガイド
以下ではスキーマ検証機能の設定方法とテスト方法を説明します。
ダッシュボードでのスキーマ検証設定
ダッシュボードでスキーマバリデーターを作成・設定する手順です。
ダッシュボード左ナビゲーションの Smart Data Hub -> Schema Validation をクリックします。
Schema Validation ページ右上の Create をクリックします。
Create Schema Validationページで以下の設定を行います:
- Name:バリデーターの名前を入力します。
- Message Source Topic:検証対象とするメッセージのトピックを設定します。複数のトピックやトピックフィルターを設定可能です。
- Note(任意):メモを入力します。
- Validation Method:
- Validation Strategy:複数の検証方法間の関係を指定します。
- All Pass(デフォルト):すべての検証方法が成功した場合のみ成功とみなします。
- Any Pass:いずれかの検証方法が成功した時点で検証を終了し成功とみなします。
- Validation List:Type ドロップダウンからスキーマを選択し、スキーマまたはSQLを追加します。スキーマの作成方法はCreate Validation Schemaを参照してください。
- Validation Strategy:複数の検証方法間の関係を指定します。
- Validation Failure Operation:
- Action After Failure:検証失敗時に実行するアクションを選択します。
- Drop Message:パブリッシュを終了しメッセージを破棄、QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コードを返します。
- Disconnect and Drop Message:メッセージを破棄し、パブリッシュ元クライアントを切断します。
- Ignore:追加のアクションは行いません。
- Action After Failure:検証失敗時に実行するアクションを選択します。
- Output Logs:検証失敗時にログを出力するか選択します。デフォルトは有効です。
- Logs Level:ログの出力レベルを設定します。デフォルトは
warningです。
Create をクリックして設定を完了します。
設定完了後、スキーマ検証ページのリストに有効な新しいバリデーターが表示されます。必要に応じて無効化可能です。Actions列のSettingsをクリックするとバリデーターの設定を更新できます。Moreをクリックするとバリデーターの削除や順序変更も可能です。
設定ファイルでのスキーマ検証設定
設定の詳細はConfiguration Manualを参照してください。
検証スキーマの作成
ここではJSON Schemaを例に検証スキーマの作成方法を示します。JSON Schemaは以下の要件を満たす必要があります。
- JSONオブジェクトに
tempというプロパティが含まれていること。 tempプロパティは整数型であること。tempプロパティは101以上であること。
{
"$schema": "http://json-schema.org/draft-06/schema#",
"type": "object",
"properties": {
"temp": {
"type": "integer",
"minimum": 101
}
},
"required": ["temp"]
}スキーマ検証設定のテスト
Create Validation Schemaで作成した例のスキーマを使ってスキーマ検証設定をテストできます。
mqttxを使い、MQTTメッセージルールに準拠したペイロードでメッセージをパブリッシュします。
mqttx pub -t t/1 -m '{"temp": 102}'MQTTメッセージルールに準拠しないペイロードでメッセージをパブリッシュします。
mqttx pub -t t/1 -m '{"temp": 100}'ログ出力は以下のようになります。
2024-05-16T06:24:10.733827+00:00 [warning] tag: SCHEMA_VALIDATION, clientid: mqttx_1db4547e, msg: validation_failed, peername: 127.0.0.1:40850, action: drop, validation: <<"check-json">>REST API
REST APIを通じたスキーマ検証の詳細な利用方法はEMQX Enterprise APIを参照してください。
統計と指標
スキーマ検証を有効にすると、ダッシュボード上で統計情報と指標が表示されます。スキーマ検証ページでバリデーター名をクリックすると以下を確認できます。
統計情報:
- Total:システム起動以降のトリガー総数。
- Success:成功したデータ検証の件数。
- Failed:失敗したデータ検証の件数。
レート指標:
- 現在の検証速度
- 過去5分間の速度
- 過去の最大速度
統計情報はリセット可能で、Prometheusにも追加されます。Prometheusは/prometheus/schema_validationパスからアクセス可能です。