Skip to content

Schema 验证

EMQX 内置了 Schema 验证功能,以确保只有符合预定数据格式的消息才会发布到订阅端。Schema 验证可使用 JSON Schema、Protobuf 和 Avro 等多种格式的模式,或使用内置的 SQL 语句验证来自指定主题的消息格式。本页介绍了 Schema 验证功能以及如何使用和配置该功能。

为什么要对数据做 Schema 验证

客户端可能会发布各种不规范的消息到 Broker,这些不合规消息可能会导致订阅者和数据系统处理异常或引发安全问题。进行数据格式校验,可以及早发现并屏蔽这些不合规消息,保证系统稳定可靠。Schema 验证能在以下几方面为您带来好处:

  • 数据完整性:验证 MQTT 消息的结构和格式,以确保数据的一致性和正确性。
  • 数据质量:强制执行数据质量,检查缺失或无效的字段、数据类型和格式,可以确保数据的质量和一致性。
  • 统一的数据模型:确保整个团队和项目中使用统一的数据模型,减少数据不一致和错误。
  • 重用和共享:允许团队成员重用和共享 schema,可以提高团队成员之间的协作效率,减少重复工作和错误。
  • 安全性:防止恶意或格式不正确的消息被处理,从而减少安全漏洞的风险。
  • 互操作性:确保消息符合标准化的格式,促进不同设备和系统之间的通信。
  • 调试:轻松地识别和调试无效或格式不正确的消息。

工作流程

当消息发布时,使用预设的验证规则对消息进行校验,如果验证成功则继续后续流程,否则根据用户配置执行相应的操作,如丢弃消息、断开连接。

  1. 当消息发布时,EMQX 的授权功能首先检查客户端的发布权限。权限检查通过后,在用户设置的验证列表中根据发布主题匹配验证规则。一个验证规则中可以设置多个主题或者主题过滤器。

  2. 匹配到验证规则后,将消息与预设的 Schema 或 SQL 进行校验。

    • 支持多种类型的 Schema: JSON Schema、Protobuf 和 Avro。
    • 支持符合 EMQX 规则引擎语法的 SQL 语句。
    • 同一个策略可以添加多个 Schema 或 SQL,并指定它们之间的关系。
      • 全部通过(all_pass):全部验证方式都通过验证时,才视为通过验证。
      • 任意通过(any_pass):任意一个验证方式通过时就终止后续的验证,视为通过验证。
  3. 一旦验证成功,消息将继续后面的流程,例如触发规则引擎、派发到订阅者。

  4. 如果验证失败,可以按照用户配置,执行以下操作:

    • 丢弃消息:终止发布并将消息丢弃,对于 QoS 1, QoS 2 消息,通过 PUBACK 返回给发布客户端相应的 reason code (131 - Implementation Specific Error)。
    • 断开连接:丢弃消息并断开发布客户端的连接。
    • 忽略:不做额外的操作。

    无论配置什么操作,验证失败都可以打印一条日志,用户可以配置日志的输出级别,默认是 warning 级别。当然,也可以选择不输出日志。

    另外,验证失败还会触发规则引擎的验证失败事件,事件主题是 $events/schema_validation_failed,用户可以捕获这个事件进行自定义处理,例如将错误的消息发布到另一个主题、或发送到 Kafka 中进行分析。

使用指导

本节演示了如何配置 Schema 验证功能以及如何测试您的设置。

在 Dashboard 中配置 Schema 验证

本节演示了如何在 Dashboard 中创建和配置验证规则。

  1. 点击 Dashboard 左侧导航中的集成 -> Schema 验证

  2. Schema 验证页面中点击右上方的创建

  3. 创建 Schema 验证页面中配置以下信息:

    • 名称:填写Schema 验证规则的名称。
    • 消息来源主题:设置发布到哪些主题的消息需要校验。可设置多个主题或主题过滤器。
    • 备注 (可选):填写备注信息。
    • 验证方式
      • 验证策略:指定验证多个验证策略之间的关系。默认设为任意通过
        • 全部通过:全部验证方式都通过验证时,才视为通过验证。
        • 任意通过:任意一个验证方式通过时就终止后续的验证,视为通过验证。
      • 验证器列表:在验证器列表中选择类型,添加验证 Schema 或 SQL。关于 Schema 的创建,详见创建验证 Schema
    • 验证失败操作
      • 失败操作: 选择如果验证失败需要执行的操作:
        • 丢弃消息:终止发布并将消息丢弃,对于 QoS 1, QoS 2 消息,通过 PUBACK 返回给发布客户端相应的 reason code (131 - Implementation Specific Error)。
        • 断开连接并丢弃消息:丢弃消息并断开发布客户端的连接。
        • 忽略:不做额外的操作。
    • 输出日志:选择验证失败是否打印一条日志,默认为启用。
    • 日志级别:设置日志的输出级别,默认为 warning
  4. 点击创建

现在您可以在 Schema 验证页面的列表中看到一个已启用的新验证规则。您可以根据需要禁用它。您可以通过点击操作列中的设置来更新验证规则设置。您还可以通过点击更多来删除验证规则或移动验证规则的顺序。

在配置文件中配置 Schema 验证

具体的配置信息,参见配置手册

创建验证 Schema

本节以 JSON Schema 为例创建一个示例验证 schema,要求:

  • 该 JSON 对象必须包含一个名为 temp 的属性。
  • temp 属性的值必须是一个整数(integer)。
  • temp 属性的值必须大于或等于 101。
json
{
  "$schema": "http://json-schema.org/draft-06/schema#",
  "type": "object",
  "properties": {
    "temp": {
      "type": "integer",
      "minimum": 101
    }
  },
  "required": [
    "temp"
  ]
}

测试 Schema 验证设置

基于创建验证 Schema 中的示例测试验证规则。

使用 mqttx 发布一条消息 payload 符合规则的 MQTT 消息:

bash
mqttx pub -t t/1 -m '{"temp": 102}'

使用 mqttx 发布一条消息 payload 不符合规则的 MQTT 消息:

bash
mqttx pub -t t/1 -m '{"temp": 100}'

此时日志将输出类似以下内容:

bash
2024-05-16T06:24:10.733827+00:00 [warning] tag: SCHEMA_VALIDATION, clientid: mqttx_1db4547e, msg: validation_failed, peername: 127.0.0.1:40850, action: drop, validation: <<"check-json">>

REST API

有关通过 REST API 使用 Schema 验证的详细信息,请参见 EMQX 企业版 API

统计与指标

启用后,Schema 验证会在 Dashboard 上显示统计和指标。您可以在Schema 验证页面点击验证器的名称,查看以下内容:

统计指标

  • 总数:系统启动以来触发的总次数。
  • 成功:数据验证成功的次数。
  • 失败:数据验证失败的次数。

速度指标

  • 当前速率:当前执行校验的速度
  • 最近5分钟速率:最近5分钟的校验速度
  • 最大速率:历史最大校验速度

统计数据可以重置,同时也被添加到 Prometheus 中,可通过 /prometheus/schema_validation 路径访问。