Skip to content

スキーマバリデーション

EMQXには、指定したトピックからサブスクライバーにパブリッシュされるメッセージが、あらかじめ定義されたデータフォーマットに準拠していることを保証するための組み込みスキーマバリデーション機能があります。スキーマバリデーションは、JSON Schema、Protobuf、Avroなど複数のスキーマ形式および組み込みのSQLステートメントバリデーションをサポートしています。本ページでは、スキーマバリデーション機能の概要と利用方法について説明します。

なぜデータをバリデートするのか

クライアントはブローカーに対して非標準のメッセージをパブリッシュする可能性があり、これがサブスクライバーやデータシステムで例外を引き起こしたり、セキュリティリスクとなる場合があります。EMQXはデータフォーマットを早期にバリデートすることで、これらの非準拠メッセージを検知・ブロックし、システムの安定性と信頼性を確保します。スキーマバリデーションには以下のような利点があります。

  • データ整合性:MQTTメッセージの構造とフォーマットを検証し、データの一貫性と正確性を保証します。
  • データ品質:欠損や無効なフィールド、データ型、フォーマットをチェックし、データの品質を維持します。
  • 統一データモデル:チームやプロジェクト全体で統一されたデータモデルを使用することを保証し、不整合やエラーを減らします。
  • 再利用と共有:チームメンバー間でスキーマを再利用・共有でき、協力効率を高め、重複作業やエラーを減らします。
  • セキュリティ:悪意のあるまたは不正なフォーマットのメッセージの処理を防ぎ、セキュリティリスクを低減します。
  • 相互運用性:標準化されたフォーマットに準拠したメッセージを保証し、異なるデバイスやシステム間の通信を円滑にします。
  • デバッグ:無効または不正なフォーマットのメッセージを容易に特定・デバッグできます。

ワークフロー

メッセージがパブリッシュされると、あらかじめ定義されたルールに基づいてバリデーションが行われます。バリデーションが成功すれば処理は継続され、失敗した場合はユーザー設定のアクションが実行されます(例:メッセージ破棄や切断など)。

  1. メッセージがパブリッシュされると、まずEMQXの認可機構でパブリッシュ権限がチェックされます。権限チェックを通過した後、ユーザーが設定したバリデータのリストから、パブリッシュされたトピックに基づいてバリデーションルールがマッチングされます。1つのバリデータは複数のトピックやトピックフィルターに設定可能です。

  2. バリデーションルールがマッチすると、メッセージはあらかじめ設定されたスキーマまたはSQLに対してバリデートされます。

    • JSON Schema、Protobuf、Avroなど複数のスキーマタイプをサポート。
    • EMQXルールエンジンの構文に準拠したSQLステートメントをサポート。
    • 1つのポリシーに複数のスキーマやSQLを追加し、それらの関係を指定可能:
      • All Pass:すべてのバリデーションが成功した場合のみ成功とみなす。
      • Any Pass:いずれかのバリデーションが成功した時点で成功とみなす。
  3. バリデーションに成功すると、メッセージはルールエンジンのトリガーやサブスクライバーへの配信など次の処理に進みます。

  4. バリデーションに失敗した場合、以下のユーザー設定アクションが実行されます。

    • メッセージ破棄:パブリッシュを終了しメッセージを破棄。QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コード(131 - Implementation Specific Error)を返します。
    • 切断してメッセージ破棄:メッセージを破棄し、パブリッシュしたクライアントを切断。
    • 無視:追加のアクションは行わない。

    設定されたアクションにかかわらず、バリデーション失敗時にログ出力が可能で、ログの出力レベルはユーザーが設定でき、デフォルトはwarningです。バリデーション失敗はルールエンジンイベント$events/schema_validation_failedをトリガー可能で、ユーザーはこのイベントをキャッチして、誤ったメッセージを別トピックにパブリッシュしたりKafkaに送信して解析するなどのカスタム処理が行えます。

ユーザーガイド

このセクションでは、スキーマバリデーション機能の設定方法とテスト方法を紹介します。

ダッシュボードでのスキーマバリデーション設定

ダッシュボードでスキーマバリデータを作成・設定する手順を説明します。

  1. ダッシュボードの左ナビゲーションで Smart Data Hub -> Schema Validation をクリックします。

  2. Schema Validation ページ右上の Create をクリックします。

  3. スキーマバリデーション作成ページで以下を設定します:

    • Name:バリデータの名前を入力。
    • Message Source Topic:バリデーション対象のメッセージトピックを設定。複数のトピックやトピックフィルターを設定可能。
    • Note(任意):メモを入力。
    • Validation Method
      • Validation Strategy:複数のバリデーション方式の関係を指定。
        • All Pass(デフォルト):すべてのバリデーション方式が成功した場合に合格とみなす。
        • Any Pass:いずれかのバリデーション方式が成功した時点で合格とみなす。
      • Validation ListType ドロップダウンからスキーマを選択し、スキーマまたはSQLを追加。スキーマの作成方法はCreate Validation Schemaを参照。
    • Validation Failure Operation
      • Action After Failure:バリデーション失敗時に実行するアクションを選択:
        • Drop Message:パブリッシュを終了しメッセージを破棄。QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コードを返す。
        • Disconnect and Drop Message:メッセージを破棄し、パブリッシュクライアントを切断。
        • Ignore:追加のアクションは行わない。
    • Output Logs:バリデーション失敗時にログを出力するか選択。デフォルトは有効。
    • Logs Level:ログの出力レベルを設定。デフォルトはwarning
  4. Create をクリックして設定を完了します。

これで有効な新しいバリデータがSchema Validationページのリストに表示されます。必要に応じて無効化可能です。Actions列のSettingsをクリックするとバリデータ設定を更新できます。Moreからはバリデータの削除や順序変更も可能です。

設定ファイルでのスキーマバリデーション設定

設定の詳細はConfiguration Manualを参照してください。

バリデーションスキーマの作成

ここではJSON Schemaを例にバリデーションスキーマの作成方法を示します。JSON Schemaは以下の要件を満たす必要があります。

  • JSONオブジェクトにtempというプロパティが含まれていること。
  • tempプロパティは整数型であること。
  • tempプロパティの値は101以上であること。
json
{
  "$schema": "http://json-schema.org/draft-06/schema#",
  "type": "object",
  "properties": {
    "temp": {
      "type": "integer",
      "minimum": 101
    }
  },
  "required": ["temp"]
}

スキーマバリデーション設定のテスト

Create Validation Schemaで作成した例のスキーマを使って、スキーマバリデーション設定をテストできます。

mqttxを使って、MQTTメッセージルールに準拠したペイロードのメッセージをパブリッシュします。

bash
mqttx pub -t t/1 -m '{"temp": 102}'

mqttxを使って、MQTTメッセージルールに準拠しないペイロードのメッセージをパブリッシュします。

bash
mqttx pub -t t/1 -m '{"temp": 100}'

ログ出力は以下のようになります。

bash
2024-05-16T06:24:10.733827+00:00 [warning] tag: SCHEMA_VALIDATION, clientid: mqttx_1db4547e, msg: validation_failed, peername: 127.0.0.1:40850, action: drop, validation: <<"check-json">>

REST API

REST API経由でのスキーマバリデーションの詳細な利用方法はEMQX Enterprise APIを参照してください。

統計と指標

スキーマバリデーションを有効にすると、ダッシュボード上で統計と指標が表示されます。Schema Validationページでバリデータ名をクリックすると以下を確認できます。

統計情報

  • Total:システム起動以降のトリガー総数。
  • Success:成功したデータバリデーション数。
  • Failed:失敗したデータバリデーション数。

レート指標

  • 現在の検証速度
  • 過去5分間の速度
  • 過去の最大速度

統計はリセット可能で、Prometheusにも追加され、/prometheus/schema_validationパスからアクセスできます。