Skip to content

スキーマバリデーション

EMQXには、指定されたトピックからサブスクライバーにパブリッシュされるメッセージがあらかじめ定義されたデータフォーマットに準拠していることを保証するための組み込みスキーマバリデーション機能があります。スキーマバリデーションはJSON Schema、Protobuf、Avroなど複数のスキーマ形式および組み込みのSQL文バリデーションをサポートしています。本ページではスキーマバリデーションの機能と利用方法について説明します。

なぜデータをバリデートするのか

クライアントはブローカーに非標準のメッセージをパブリッシュする可能性があり、それがサブスクライバーやデータシステムで例外を引き起こしたり、セキュリティリスクとなることがあります。EMQXはデータフォーマットを早期にバリデートすることでこれらの非準拠メッセージを検出・ブロックし、システムの安定性と信頼性を確保します。スキーマバリデーションの利点は以下の通りです。

  • データ整合性:MQTTメッセージの構造とフォーマットを検証し、データの一貫性と正確性を保証します。
  • データ品質:欠損や無効なフィールド、データ型、フォーマットをチェックし、データの品質と一貫性を維持します。
  • 統一データモデル:チームやプロジェクト全体で統一されたデータモデルを使用し、不整合やエラーを減らします。
  • 再利用と共有:チームメンバー間でスキーマを再利用・共有でき、コラボレーション効率を高め、重複作業やエラーを削減します。
  • セキュリティ:悪意のあるまたは誤ったフォーマットのメッセージの処理を防ぎ、セキュリティ脆弱性のリスクを低減します。
  • 相互運用性:メッセージが標準化されたフォーマットに準拠することで、異なるデバイスやシステム間の通信を円滑にします。
  • デバッグ:無効または誤ったフォーマットのメッセージを容易に特定・デバッグできます。

ワークフロー

メッセージがパブリッシュされると、あらかじめ定義されたルールに基づいてバリデーションが行われます。バリデーションに成功すれば処理が継続され、失敗した場合はユーザー設定のアクションが実行されます(例:メッセージ破棄や切断など)。

  1. メッセージがパブリッシュされると、まずEMQXの認可機構を通じてパブリッシュ権限がチェックされます。権限チェックを通過した後、ユーザー設定のバリデータ一覧からパブリッシュされたトピックに基づいてバリデーションルールがマッチングされます。1つのバリデータは複数のトピックやトピックフィルターに設定可能です。

  2. バリデーションルールがマッチすると、メッセージはあらかじめ設定されたスキーマまたはSQLに対してバリデーションされます。

    • JSON Schema、Protobuf、Avroなど複数のスキーマタイプをサポート。
    • EMQXルールエンジンの構文に準拠したSQL文もサポート。
    • 1つのポリシーに複数のスキーマやSQLを追加し、それらの関係を指定可能:
      • All Pass:すべてのバリデーションが成功した場合のみ成功とみなす。
      • Any Pass:いずれかのバリデーションが成功した時点で成功とみなす。
  3. バリデーションが成功すると、メッセージはルールエンジンのトリガーやサブスクライバーへの配信など次の処理に進みます。

  4. バリデーションが失敗した場合、以下のユーザー設定のアクションを実行できます。

    • メッセージ破棄:パブリッシュを終了しメッセージを破棄、QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コード(131 - Implementation Specific Error)を返します。
    • 切断してメッセージ破棄:メッセージを破棄し、パブリッシュしたクライアントを切断します。
    • 無視:追加のアクションは行いません。

    設定されたアクションにかかわらず、バリデーション失敗時にログを生成可能で、ログの出力レベルはユーザーが設定でき、デフォルトはwarningです。バリデーション失敗はルールエンジンイベント$events/schema_validation/failedをトリガーでき、ユーザーはこのイベントをキャッチして、誤ったメッセージを別トピックにパブリッシュしたりKafkaに送信して解析するなどのカスタム処理が可能です。

ユーザーガイド

このセクションではスキーマバリデーション機能の設定方法とテスト方法を説明します。

ダッシュボードでのスキーマバリデーション設定

ダッシュボードでスキーマバリデータを作成・設定する手順を示します。

  1. ダッシュボードの左ナビゲーションで Smart Data Hub -> Schema Validation をクリックします。

  2. Schema Validation ページ右上の Create をクリックします。

  3. 「Create Schema Validation」ページで以下を設定します:

    • Name:バリデータの名前を入力します。
    • Message Source Topic:バリデーション対象とするメッセージのトピックを設定します。複数のトピックやトピックフィルターを設定可能です。
    • Note(任意):メモを入力します。
    • Validation Method
      • Validation Strategy:複数のバリデーション戦略の関係を指定します。
        • All Pass(デフォルト):すべてのバリデーションが成功した場合に成功とみなします。
        • Any Pass:いずれかのバリデーションが成功した時点で成功とみなします。
      • Validation ListType ドロップダウンからスキーマを選択し、スキーマまたはSQLを追加します。スキーマの作成方法はCreate Validation Schemaを参照してください。
    • Validation Failure Operation
      • Action After Failure:バリデーション失敗時に実行するアクションを選択します。
        • Drop Message:パブリッシュを終了しメッセージを破棄、QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コードを返します。
        • Disconnect and Drop Message:メッセージを破棄し、パブリッシュクライアントを切断します。
        • Ignore:追加のアクションは行いません。
    • Output Logs:バリデーション失敗時にログを出力するか選択します。デフォルトは有効です。
    • Logs Level:ログの出力レベルを設定します。デフォルトはwarningです。
  4. Create をクリックして設定を完了します。

これで新しい有効なバリデータがSchema Validationページの一覧に表示されます。必要に応じて無効化できます。Actions列のSettingsをクリックするとバリデータの設定を更新可能です。また、Moreをクリックしてバリデータの削除や順序変更も行えます。

設定ファイルでのスキーマバリデーション設定

設定の詳細はConfiguration Manualを参照してください。

バリデーションスキーマの作成

ここではJSON Schemaを例にバリデーションスキーマの作成方法を示します。JSON Schemaは以下の条件を満たす必要があります。

  • JSONオブジェクトにtempというプロパティを含むこと。
  • tempプロパティは整数であること。
  • tempプロパティは101以上であること。
json
{
  "$schema": "http://json-schema.org/draft-06/schema#",
  "type": "object",
  "properties": {
    "temp": {
      "type": "integer",
      "minimum": 101
    }
  },
  "required": ["temp"]
}

スキーマバリデーション設定のテスト

Create Validation Schemaで作成した例のスキーマを使ってスキーマバリデーションの設定をテストできます。

mqttxを使い、MQTTメッセージルールに準拠したペイロードでメッセージをパブリッシュします。

bash
mqttx pub -t t/1 -m '{"temp": 102}'

MQTTメッセージルールに準拠しないペイロードでメッセージをパブリッシュします。

bash
mqttx pub -t t/1 -m '{"temp": 100}'

ログ出力は以下のようになります。

bash
2024-05-16T06:24:10.733827+00:00 [warning] tag: SCHEMA_VALIDATION, clientid: mqttx_1db4547e, msg: validation_failed, peername: 127.0.0.1:40850, action: drop, validation: <<"check-json">>

REST API

REST APIを使ったスキーマバリデーションの詳細な利用方法はEMQX Enterprise APIを参照してください。

統計と指標

有効化すると、スキーマバリデーションの統計と指標がダッシュボードに表示されます。Schema Validationページでバリデータの名前をクリックすると以下を確認できます。

統計情報

  • Total:システム起動以来のトリガー総数。
  • Success:成功したデータバリデーション数。
  • Failed:失敗したデータバリデーション数。

レート指標

  • 現在の検証速度
  • 過去5分間の速度
  • 過去の最大速度

統計はリセット可能で、Prometheusにも追加されます。Prometheusは/prometheus/schema_validationパスからアクセス可能です。