Skip to content

スキーマバリデーション

スキーマバリデーション機能は、MQTTメッセージをさまざまな方法で検証し、あらかじめ定義されたデータフォーマットに一致するメッセージのみをサブスクライバーにパブリッシュすることで、非準拠データが下流システムに流入するのを防ぎます。スキーマバリデーションは、JSON Schema、Protobuf、Avroなどのスキーマ形式を使用したり、組み込みのSQL文を利用して指定トピックのメッセージフォーマットを検証したりできます。本ページでは、スキーマバリデーション機能の概要と使用方法、設定方法を紹介します。

なぜデータを検証するのか

クライアントは非標準のメッセージをブローカーにパブリッシュする可能性があり、それがサブスクライバーやデータシステムで例外を引き起こしたり、セキュリティリスクをもたらしたりすることがあります。EMQXはデータフォーマットを早期に検証することで、これらの非準拠メッセージを識別・ブロックし、システムの安定性と信頼性を確保します。スキーマバリデーションの利点は以下の通りです。

  • データ整合性:MQTTメッセージの構造とフォーマットを検証し、データの一貫性と正確性を保証します。
  • データ品質:欠損や無効なフィールド、データ型、フォーマットをチェックし、データの一貫性と品質を強制します。
  • 統一データモデル:チーム全体やプロジェクトで統一されたデータモデルを使用し、不整合やエラーを減らします。
  • 再利用と共有:スキーマをチーム内で再利用・共有でき、協力効率が向上し、重複作業やミスを減らします。
  • セキュリティ:悪意のあるメッセージや誤ったフォーマットのメッセージの処理を防ぎ、セキュリティ脆弱性のリスクを低減します。
  • 相互運用性:メッセージが標準化されたフォーマットに準拠することを保証し、異なるデバイスやシステム間の通信を容易にします。
  • デバッグ:無効または誤ったフォーマットのメッセージを容易に特定・デバッグできます。

ワークフロー

メッセージがパブリッシュされると、事前に定義されたルールに基づいて検証が行われます。検証が成功すればメッセージは処理を継続し、失敗した場合はユーザーが定義したアクション(メッセージ破棄やクライアント切断など)が実行されます。

  1. メッセージがパブリッシュされると、EMQX Cloudの認可機能がまずクライアントのパブリッシュ権限を確認します。権限が確認されると、公開されたトピックに基づいてユーザー設定の検証リストの検証ルールをチェックします。検証ルールは複数のトピックまたはトピックフィルターを含めることができます。

  2. 検証ルールにマッチすると、メッセージは設定済みのスキーマまたはSQLで検証されます。

    • JSON Schema、Protobuf、Avroなど複数のスキーマタイプをサポートします。
    • EMQXルールエンジンの構文に準拠したSQL文もサポートします。
    • 1つのポリシーで複数のスキーマやSQLを追加し、それらの関係を指定できます:
      • All Pass:すべての検証が成功した場合のみ検証成功とみなします。
      • Any Pass:いずれかの検証が成功した時点で検証を終了し、成功とみなします。
  3. 検証が成功すると、メッセージはルールエンジンのトリガーやサブスクライバーへの配信など次の処理に進みます。

  4. 検証が失敗した場合、以下のユーザー設定アクションが実行されます。

    • メッセージ破棄:パブリッシュを終了しメッセージを破棄します。QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コード(131 - Implementation Specific Error)を返します。
    • 切断してメッセージ破棄:メッセージを破棄し、パブリッシュクライアントを切断します。
    • 無視:追加のアクションは行いません。

    検証失敗後のアクションにかかわらず、検証失敗メッセージはデプロイメントログに記録されます。スキーマバリデーションルールではログ出力レベルを設定でき、デフォルトはwarningです。

    さらに、検証失敗はデータ統合ルールのSQL内で「validation failed」イベントをトリガーできます。イベントトピックは$events/schema_validation_failedです。ユーザーはこのイベントをキャプチャして、誤ったメッセージを別トピックにパブリッシュしたり、Kafkaに送信して分析したりするカスタム処理が可能です。

使用例

Smart Data Hubにサブスクライブしている場合は、Smart Data Hub -> Schema Validationページにアクセスできます。このページでスキーマバリデーションルールを作成し、メッセージパブリッシュをシミュレーションして検証ルールの効果をテストできます。

JSON Schemaによる検証の使用例

  1. 新しいJSON Schemaを作成します。スキーマレジストリのドキュメントに従い、新しいJSON Schemaを作成します。名前はJSON_schemaとし、以下のスキーマを定義します。

    json
    {
      "$schema": "http://json-schema.org/draft-06/schema#",
      "type": "object",
      "properties": {
        "temp": {
          "type": "integer"
        },
        "id": {
          "type": "string"
        }
      },
      "required": [
        "temp",
        "id"
      ]
    }
  2. 新しいJSON Schema検証ルールを作成します。スキーマバリデーションページで右上の**+ New**ボタンをクリックし、新規スキーマバリデーションページに入り、以下のように検証ルールを設定します。

    • 名前:検証ルールを識別するための名前を入力します。
    • 備考:任意で検証ルールの備考を追加します。
    • メッセージ送信元トピック:検証対象とするトピックを設定します。複数のトピックやトピックフィルターを設定可能です。本例ではt/#を設定します。
    • 検証方法
      • 検証戦略:すべての検証が成功する必要があるか、いずれか1つの検証が成功すればよいかを指定します。
        • All Pass:すべての検証方法が成功した場合のみメッセージを有効とみなします。
        • Any Pass:いずれかの検証方法が成功した時点で検証を終了し、メッセージを有効とみなします。
      • 検証リスト:検証に使用するスキーマまたはSQL文のタイプを指定します。スキーマレジストリで作成したスキーマを使用するか、SQLタイプを選択してSQL文を入力できます。本例ではステップ1で作成したJSON_schemaを選択します。
    • 検証失敗時の操作
      • 失敗時のアクション:検証失敗後のMQTTメッセージまたはパブリッシュクライアントの動作を指定します。
        • メッセージ破棄:パブリッシュを終了しメッセージを破棄します。QoS 1およびQoS 2メッセージにはPUBACKで理由コード(131 - Implementation Specific Error)を返します。
        • 切断してメッセージ破棄:メッセージを破棄し、パブリッシュクライアントを切断します。
        • 無視:何もしません。
      • ログレベル:検証失敗時のログ出力レベルをデプロイメントログに設定します。warningまたはerrorを選択可能です。

    json_schema_validator

  3. クライアントツールMQTTXを使って検証ルールをテストします。MQTTXをシミュレーションクライアントとしてデプロイメントに接続し、トピックt/#をサブスクライブします。

    • まず、JSON_schemaに準拠したMQTTメッセージをトピックt/1にパブリッシュすると、メッセージは正常に受信されます。

    • 次に、JSON_schemaに準拠しないMQTTメッセージをトピックt/2にパブリッシュすると、検証ルールに合致しないため正常に受信されません。

    validate_json_schema

  4. スキーマバリデーションルールの実行状況を確認します。

    • デプロイメントログで検証失敗ログを確認できます。例:

      bash
      2025-01-20 17:09:44 (UTC+08:00)[emqx-node-2] Warning
      clientid: mqttx_31338552, peername: 101.224.70.14:39452, username: user, pid: <0.92295.0>, line: 288, action: drop, tag: SCHEMA_VALIDATION, validation: use_json_schema_validator, msg: validation_failed
    • スキーマバリデーション一覧の該当ルール名をクリックすると詳細ページに入り、関連する統計情報を閲覧できます。

Avro Schemaによる検証の使用例

Avro Schemaの検証はJSON Schemaの検証と同様です。検証ルール作成時に、検証リストのTypeフィールドでAvroを選択し、Schema/SQLフィールドで事前に作成したAvro Schemaの名前を指定します。

Protobuf Schemaによる検証の使用例

Protobuf Schemaの検証もJSON Schemaの検証と似ています。検証ルール作成時に、検証リストのTypeフィールドでProtobufを選択し、Schema/SQLフィールドで事前に作成したProtobuf Schemaの名前を指定します。

さらに、JSON SchemaやAvro Schemaと異なり、Protobufタイプを選択した場合は、Protobuf Schemaで定義されたMessage Typeも指定する必要があります。例えば、以下のProtobuf SchemaではメッセージタイプはDeviceです。

proto
message Device {
  required string id = 1;
  required uint32 temp = 2;
}

スキーマバリデーション統計の閲覧

スキーマバリデーション一覧の名前をクリックすると詳細ページに入り、スキーマ実行に関する統計情報を確認できます。

  • 統計情報
    • マッチ数:有効化以降に検証ルールがトリガーされた総回数。
    • 成功数:データ検証が成功した回数。
    • 失敗数:データ検証が失敗した回数。
  • レート指標
    • 現在のレート:過去1分間の検証回数の推移。
    • 過去5分のレート:直近5分間の検証レート。
    • 最大レート:記録された最大検証レート。

スキーマバリデーションの管理

スキーマバリデーション一覧ページでは、以下の管理操作が可能です。

  • スキーマバリデーションの有効化/無効化:Enabled列のトグルボタンをクリック。
  • スキーマバリデーションの編集:Actions列の「編集」アイコンをクリックし編集ページに入り、変更後に保存。
  • スキーマバリデーションの削除:Actions列の「その他」アイコンをクリックし、ドロップダウンメニューから「削除」を選択して確定。
  • スキーマバリデーションの順序調整:一覧の行をドラッグして並び替えるか、Actions列の「その他」メニュー内のクイック移動ボタンを使用。