Skip to content

スキーマバリデーション

EMQXには、指定したトピックからサブスクライブされるメッセージがあらかじめ定義されたデータ形式に準拠していることを保証するための組み込みのスキーマバリデーション機能があります。スキーマバリデーションはJSON Schema、Protobuf、Avroなど複数のスキーマ形式および組み込みのSQL文による検証をサポートしています。本ページではスキーマバリデーションの機能概要と利用方法について説明します。

データをバリデートする理由

クライアントがブローカーに非標準のメッセージをパブリッシュすると、サブスクライバーやデータシステムで例外が発生したり、セキュリティリスクを引き起こしたりする可能性があります。EMQXはデータ形式を早期に検証してこれらの非準拠メッセージを検出・ブロックすることで、システムの安定性と信頼性を確保します。スキーマバリデーションは以下のような利点をもたらします。

  • データ整合性:MQTTメッセージの構造と形式を検証し、データの一貫性と正確性を保証します。
  • データ品質:欠損や無効なフィールド、データ型、形式をチェックし、データの品質を維持します。
  • 統一データモデル:チーム全体やプロジェクトで統一されたデータモデルを使用し、不整合やエラーを減らします。
  • 再利用と共有:スキーマをチーム内で再利用・共有できるため、協力効率が向上し、作業の重複やミスを減らせます。
  • セキュリティ:悪意のあるメッセージや不正な形式のメッセージの処理を防ぎ、セキュリティリスクを低減します。
  • 相互運用性:標準化された形式に準拠したメッセージを保証し、異なるデバイスやシステム間の通信を円滑にします。
  • デバッグ:無効または誤った形式のメッセージを容易に特定・デバッグできます。

ワークフロー

メッセージがパブリッシュされると、あらかじめ定義されたルールに基づいて検証が行われます。検証に成功すれば処理が継続され、失敗した場合はユーザー設定のアクションが実行されます(メッセージ破棄や切断など)。

  1. メッセージのパブリッシュ時に、まずEMQXの認可機構でパブリッシュ権限が確認されます。権限チェックを通過した後、ユーザー設定の検証リストからパブリッシュされたトピックに基づいて検証ルールがマッチングされます。1つのバリデーターは複数のトピックやトピックフィルターに設定可能です。

  2. 検証ルールがマッチすると、メッセージはあらかじめ設定されたスキーマまたはSQLに対して検証されます。

    • JSON Schema、Protobuf、Avroなど複数のスキーマタイプをサポート。
    • EMQXルールエンジンの構文に準拠したSQL文もサポート。
    • 1つのポリシーに複数のスキーマやSQLを追加し、以下の関係性を指定可能:
      • すべて合格:すべての検証が合格した場合のみ成功とみなす。
      • いずれか合格:いずれかの検証が合格した時点で検証を停止し成功とみなす。
  3. 検証に成功すると、ルールエンジンのトリガーやサブスクライバーへの配信など次の処理に進みます。

  4. 検証に失敗した場合、以下のユーザー設定アクションが実行されます。

    • メッセージ破棄:パブリッシュを終了しメッセージを破棄。QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コード(131 - Implementation Specific Error)を返します。
    • 切断してメッセージ破棄:メッセージを破棄し、パブリッシュしたクライアントを切断。
    • 無視:追加のアクションは行いません。

    設定されたアクションにかかわらず、検証失敗時にログを出力可能で、ログの出力レベルはユーザーが設定でき、デフォルトはwarningです。検証失敗はルールエンジンイベント$events/schema_validation/failedをトリガーでき、ユーザーはこのイベントをキャッチして、誤ったメッセージを別トピックにパブリッシュしたりKafkaに送信して解析したりするカスタム処理が可能です。

ユーザーガイド

このセクションでは、スキーマバリデーション機能の設定方法と動作確認方法を説明します。

ダッシュボードでのスキーマバリデーション設定

ダッシュボードでスキーマバリデーターを作成・設定する手順は以下の通りです。

  1. ダッシュボード左ナビゲーションの Smart Data Hub -> Schema Validation をクリックします。

  2. Schema Validation ページ右上の Create をクリックします。

  3. 「Create Schema Validation」ページで以下の設定を行います。

    • Name:バリデーターの名前を入力します。
    • Message Source Topic:検証対象とするメッセージのトピックを設定します。複数のトピックやトピックフィルターを設定可能です。
    • Note(任意):メモを入力します。
    • Validation Method
      • Validation Strategy:複数の検証方式の関係性を指定します。
        • All Pass(デフォルト):すべての検証方式が合格した場合のみ合格とみなします。
        • Any Pass:いずれかの検証方式が合格した時点で検証を停止し合格とみなします。
      • Validation ListType ドロップダウンからスキーマを選択し、スキーマまたはSQLを追加します。スキーマの作成方法はCreate Validation Schemaを参照してください。
    • Validation Failure Operation
      • Action After Failure:検証失敗時に実行するアクションを選択します。
        • Drop Message:パブリッシュを終了しメッセージを破棄。QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コードを返します。
        • Disconnect and Drop Message:メッセージを破棄し、パブリッシュしたクライアントを切断します。
        • Ignore:追加のアクションは行いません。
    • Output Logs:検証失敗時にログを出力するか選択します。デフォルトは有効です。
    • Logs Level:ログの出力レベルを設定します。デフォルトはwarningです。
  4. Create をクリックして設定を完了します。

設定完了後、スキーマバリデーションページのリストに有効な新しいバリデーターが表示されます。必要に応じて無効化可能です。Actions列のSettingsをクリックすると設定の更新ができ、Moreからはバリデーターの削除や順序変更も可能です。

設定ファイルでのスキーマバリデーション設定

設定の詳細はConfiguration Manualを参照してください。

バリデーションスキーマの作成

ここではJSON Schemaを例にバリデーションスキーマの作成方法を示します。JSON Schemaは以下の条件を満たす必要があります。

  • JSONオブジェクトにtempという名前のプロパティが含まれること。
  • tempプロパティは整数型であること。
  • tempプロパティの値は101以上であること。
json
{
  "$schema": "http://json-schema.org/draft-06/schema#",
  "type": "object",
  "properties": {
    "temp": {
      "type": "integer",
      "minimum": 101
    }
  },
  "required": ["temp"]
}

スキーマバリデーション設定のテスト

Create Validation Schemaで作成した例のスキーマを使って、スキーマバリデーションの動作をテストできます。

mqttxを使い、MQTTメッセージルールに準拠したペイロードでメッセージをパブリッシュします。

bash
mqttx pub -t t/1 -m '{"temp": 102}'

次に、ルールに準拠しないペイロードでメッセージをパブリッシュします。

bash
mqttx pub -t t/1 -m '{"temp": 100}'

ログ出力は以下のようになります。

bash
2024-05-16T06:24:10.733827+00:00 [warning] tag: SCHEMA_VALIDATION, clientid: mqttx_1db4547e, msg: validation_failed, peername: 127.0.0.1:40850, action: drop, validation: <<"check-json">>

REST API

REST APIを使ったスキーマバリデーションの詳細な利用方法はEMQX Enterprise APIを参照してください。

統計と指標

有効化すると、スキーマバリデーションの統計情報と指標がダッシュボードに表示されます。スキーマバリデーションページでバリデーター名をクリックすると以下を確認できます。

統計情報

  • Total:システム起動以降のトリガー総数。
  • Success:成功したデータ検証数。
  • Failed:失敗したデータ検証数。

レート指標

  • 現在の検証速度
  • 過去5分間の速度
  • 過去の最大速度

統計情報はリセット可能で、Prometheusにも追加されており、/prometheus/schema_validation パスからアクセス可能です。