スキーマバリデーション
スキーマバリデーション機能は、MQTTメッセージをさまざまな方法で検証し、あらかじめ定義されたデータフォーマットに一致するメッセージのみをサブスクライバーにパブリッシュすることで、非準拠データが下流システムに流入するのを防ぎます。スキーマバリデーションは、JSON Schema、Protobuf、Avroなどのスキーマ形式を使用したり、組み込みのSQL文を利用して指定トピックのメッセージフォーマットを検証したりできます。本ページでは、スキーマバリデーション機能の概要と使用方法、設定方法を紹介します。
なぜデータを検証するのか
クライアントは非標準のメッセージをブローカーにパブリッシュする可能性があり、それがサブスクライバーやデータシステムで例外を引き起こしたり、セキュリティリスクをもたらしたりすることがあります。EMQXはデータフォーマットを早期に検証することで、これらの非準拠メッセージを識別・ブロックし、システムの安定性と信頼性を確保します。スキーマバリデーションの利点は以下の通りです。
- データ整合性:MQTTメッセージの構造とフォーマットを検証し、データの一貫性と正確性を保証します。
- データ品質:欠損や無効なフィールド、データ型、フォーマットをチェックし、データの一貫性と品質を強制します。
- 統一データモデル:チーム全体やプロジェクトで統一されたデータモデルを使用し、不整合やエラーを減らします。
- 再利用と共有:スキーマをチーム内で再利用・共有でき、協力効率が向上し、重複作業やミスを減らします。
- セキュリティ:悪意のあるメッセージや誤ったフォーマットのメッセージの処理を防ぎ、セキュリティ脆弱性のリスクを低減します。
- 相互運用性:メッセージが標準化されたフォーマットに準拠することを保証し、異なるデバイスやシステム間の通信を容易にします。
- デバッグ:無効または誤ったフォーマットのメッセージを容易に特定・デバッグできます。
ワークフロー
メッセージがパブリッシュされると、事前に定義されたルールに基づいて検証が行われます。検証が成功すればメッセージは処理を継続し、失敗した場合はユーザーが定義したアクション(メッセージ破棄やクライアント切断など)が実行されます。
メッセージがパブリッシュされると、EMQX Cloudの認可機能がまずクライアントのパブリッシュ権限を確認します。権限が確認されると、公開されたトピックに基づいてユーザー設定の検証リストの検証ルールをチェックします。検証ルールは複数のトピックまたはトピックフィルターを含めることができます。
検証ルールにマッチすると、メッセージは設定済みのスキーマまたはSQLで検証されます。
- JSON Schema、Protobuf、Avroなど複数のスキーマタイプをサポートします。
- EMQXルールエンジンの構文に準拠したSQL文もサポートします。
- 1つのポリシーで複数のスキーマやSQLを追加し、それらの関係を指定できます:
- All Pass:すべての検証が成功した場合のみ検証成功とみなします。
- Any Pass:いずれかの検証が成功した時点で検証を終了し、成功とみなします。
検証が成功すると、メッセージはルールエンジンのトリガーやサブスクライバーへの配信など次の処理に進みます。
検証が失敗した場合、以下のユーザー設定アクションが実行されます。
- メッセージ破棄:パブリッシュを終了しメッセージを破棄します。QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コード(131 - Implementation Specific Error)を返します。
- 切断してメッセージ破棄:メッセージを破棄し、パブリッシュクライアントを切断します。
- 無視:追加のアクションは行いません。
検証失敗後のアクションにかかわらず、検証失敗メッセージはデプロイメントログに記録されます。スキーマバリデーションルールではログ出力レベルを設定でき、デフォルトは
warningです。さらに、検証失敗はデータ統合ルールのSQL内で「validation failed」イベントをトリガーできます。イベントトピックは
$events/schema_validation_failedです。ユーザーはこのイベントをキャプチャして、誤ったメッセージを別トピックにパブリッシュしたり、Kafkaに送信して分析したりするカスタム処理が可能です。
使用例
Smart Data Hubにサブスクライブしている場合は、Smart Data Hub -> Schema Validationページにアクセスできます。このページでスキーマバリデーションルールを作成し、メッセージパブリッシュをシミュレーションして検証ルールの効果をテストできます。
JSON Schemaによる検証の使用例
新しいJSON Schemaを作成します。スキーマレジストリのドキュメントに従い、新しいJSON Schemaを作成します。名前は
JSON_schemaとし、以下のスキーマを定義します。json{ "$schema": "http://json-schema.org/draft-06/schema#", "type": "object", "properties": { "temp": { "type": "integer" }, "id": { "type": "string" } }, "required": [ "temp", "id" ] }新しいJSON Schema検証ルールを作成します。スキーマバリデーションページで右上の**+ New**ボタンをクリックし、新規スキーマバリデーションページに入り、以下のように検証ルールを設定します。
- 名前:検証ルールを識別するための名前を入力します。
- 備考:任意で検証ルールの備考を追加します。
- メッセージ送信元トピック:検証対象とするトピックを設定します。複数のトピックやトピックフィルターを設定可能です。本例では
t/#を設定します。 - 検証方法:
- 検証戦略:すべての検証が成功する必要があるか、いずれか1つの検証が成功すればよいかを指定します。
- All Pass:すべての検証方法が成功した場合のみメッセージを有効とみなします。
- Any Pass:いずれかの検証方法が成功した時点で検証を終了し、メッセージを有効とみなします。
- 検証リスト:検証に使用するスキーマまたはSQL文のタイプを指定します。スキーマレジストリで作成したスキーマを使用するか、SQLタイプを選択してSQL文を入力できます。本例ではステップ1で作成した
JSON_schemaを選択します。
- 検証戦略:すべての検証が成功する必要があるか、いずれか1つの検証が成功すればよいかを指定します。
- 検証失敗時の操作:
- 失敗時のアクション:検証失敗後のMQTTメッセージまたはパブリッシュクライアントの動作を指定します。
- メッセージ破棄:パブリッシュを終了しメッセージを破棄します。QoS 1およびQoS 2メッセージにはPUBACKで理由コード(131 - Implementation Specific Error)を返します。
- 切断してメッセージ破棄:メッセージを破棄し、パブリッシュクライアントを切断します。
- 無視:何もしません。
- ログレベル:検証失敗時のログ出力レベルをデプロイメントログに設定します。
warningまたはerrorを選択可能です。
- 失敗時のアクション:検証失敗後のMQTTメッセージまたはパブリッシュクライアントの動作を指定します。

クライアントツールMQTTXを使って検証ルールをテストします。MQTTXをシミュレーションクライアントとしてデプロイメントに接続し、トピック
t/#をサブスクライブします。まず、
JSON_schemaに準拠したMQTTメッセージをトピックt/1にパブリッシュすると、メッセージは正常に受信されます。次に、
JSON_schemaに準拠しないMQTTメッセージをトピックt/2にパブリッシュすると、検証ルールに合致しないため正常に受信されません。

スキーマバリデーションルールの実行状況を確認します。
デプロイメントログで検証失敗ログを確認できます。例:
bash2025-01-20 17:09:44 (UTC+08:00)[emqx-node-2] Warning clientid: mqttx_31338552, peername: 101.224.70.14:39452, username: user, pid: <0.92295.0>, line: 288, action: drop, tag: SCHEMA_VALIDATION, validation: use_json_schema_validator, msg: validation_failedスキーマバリデーション一覧の該当ルール名をクリックすると詳細ページに入り、関連する統計情報を閲覧できます。
Avro Schemaによる検証の使用例
Avro Schemaの検証はJSON Schemaの検証と同様です。検証ルール作成時に、検証リストのTypeフィールドでAvroを選択し、Schema/SQLフィールドで事前に作成したAvro Schemaの名前を指定します。
Protobuf Schemaによる検証の使用例
Protobuf Schemaの検証もJSON Schemaの検証と似ています。検証ルール作成時に、検証リストのTypeフィールドでProtobufを選択し、Schema/SQLフィールドで事前に作成したProtobuf Schemaの名前を指定します。
さらに、JSON SchemaやAvro Schemaと異なり、Protobufタイプを選択した場合は、Protobuf Schemaで定義されたMessage Typeも指定する必要があります。例えば、以下のProtobuf SchemaではメッセージタイプはDeviceです。
message Device {
required string id = 1;
required uint32 temp = 2;
}スキーマバリデーション統計の閲覧
スキーマバリデーション一覧の名前をクリックすると詳細ページに入り、スキーマ実行に関する統計情報を確認できます。
- 統計情報:
- マッチ数:有効化以降に検証ルールがトリガーされた総回数。
- 成功数:データ検証が成功した回数。
- 失敗数:データ検証が失敗した回数。
- レート指標:
- 現在のレート:過去1分間の検証回数の推移。
- 過去5分のレート:直近5分間の検証レート。
- 最大レート:記録された最大検証レート。
スキーマバリデーションの管理
スキーマバリデーション一覧ページでは、以下の管理操作が可能です。
- スキーマバリデーションの有効化/無効化:Enabled列のトグルボタンをクリック。
- スキーマバリデーションの編集:Actions列の「編集」アイコンをクリックし編集ページに入り、変更後に保存。
- スキーマバリデーションの削除:Actions列の「その他」アイコンをクリックし、ドロップダウンメニューから「削除」を選択して確定。
- スキーマバリデーションの順序調整:一覧の行をドラッグして並び替えるか、Actions列の「その他」メニュー内のクイック移動ボタンを使用。