Skip to content

Schema 验证

Schema 验证功能支持通过多种方式对 MQTT 消息进行验证,以确保只有符合预定数据格式的消息才会发布到订阅端,从而防止不合规的数据流入下游系统。Schema 验证可使用 JSON Schema、Protobuf 和 Avro 等多种格式的模式,或使用内置的 SQL 语句验证来自指定主题的消息格式。本页介绍了 Schema 验证功能以及如何使用和配置该功能。

为什么要对数据做 Schema 验证

客户端可能会将多种不规范的消息发布到 MQTT Broker,这些不合规消息可能导致订阅者和数据系统出现处理异常或安全问题。通过数据格式校验,可以及早识别并阻止这些不合规消息,确保系统稳定可靠。Schema 验证在以下方面能为您提供多项优势:

  • 数据完整性:验证 MQTT 消息的结构和格式,以确保数据的一致性和正确性。
  • 数据质量:强制执行数据质量检查,包括检测缺失或无效的字段、数据类型和格式,以确保数据的高质量和一致性。
  • 统一的数据模型:在团队和项目中使用统一的数据模型,降低数据不一致和错误的风险。
  • 重用和共享:允许团队成员重用和共享 Schema,提高协作效率,减少重复工作和错误。
  • 安全性:阻止恶意或格式不正确的消息被处理,降低安全漏洞风险。
  • 互操作性:确保消息符合标准化格式,增强不同设备和系统之间的通信。
  • 调试:方便识别和调试无效或格式不正确的消息。

工作流程

当消息发布时,系统会依据预设的验证规则进行校验。如果验证成功,消息继续后续流程;若验证失败,则根据用户配置执行相应操作,例如丢弃消息或断开连接。

  1. 当消息发布时,EMQX Platform 的授权功能首先检查客户端的发布权限。权限检查通过后,在用户设置的验证列表中根据发布主题匹配验证规则。一个验证规则中可以设置多个主题或者主题过滤器。

  2. 匹配到验证规则后,将消息与预设的 Schema 或 SQL 进行校验。

    • 支持多种类型的 Schema: JSON Schema、Protobuf 和 Avro。
    • 支持符合 EMQX 规则引擎语法的 SQL 语句。
    • 同一个策略可以添加多个 Schema 或 SQL,并指定它们之间的关系。
      • 全部通过(all_pass):全部验证方式都通过验证时,才视为通过验证。
      • 任意通过(any_pass):任意一个验证方式通过时就终止后续的验证,视为通过验证。
  3. 验证成功后,消息将继续后续流程,如触发数据集成规则或派发给订阅者。

  4. 如果验证失败,则根据用户配置执行以下操作:

    • 丢弃消息:终止发布并丢弃消息。对于 QoS 1 或 QoS 2 消息,通过 PUBACK 向客户端返回相应的原因代码 (131 - Implementation Specific Error)。
    • 断开连接并丢弃消息:丢弃消息并断开发布客户端的连接。
    • 忽略:不做额外的操作。

    无论验证失败后是什么操作,部署日志中都将打印一条验证失败的信息。您可以在 Schema 验证规则中设置日志的输出级别,默认是 warning 级别。

    另外,验证失败还可以触发数据集成中规则 SQL 的验证失败事件,事件主题是 $events/schema_validation_failed,用户可以捕获这个事件进行自定义处理,例如将错误的消息发布到另一个主题、或发送到 Kafka 中进行分析。

使用示例

如果您已经开通数据智能中心,您可以通过点击部署左侧菜单中的数据智能中心 -> Schema 验证进入 Schema 验证页面。在该页面中,您可以创建 Schema 验证规则。之后,您可以通过模拟消息发布来测试验证规则是否有效。

使用 JSON Schema 验证

  1. 新建一个 JSON Schema。按照 Schema 管理文档新建一个 JSON Schema,名称输入 JSON_schema,Schema 定义为:

    json
    {
      "$schema": "http://json-schema.org/draft-06/schema#",
      "type": "object",
      "properties": {
        "temp": {
          "type": "integer"
        },
        "id": {
          "type": "string"
        }
      },
      "required": [
        "temp",
        "id"
      ]
    }
  2. 新建 JSON Schema 验证规则。在 Schema 验证页面中点击右上角的新建按钮,进入新建 Schema 验证页面,并根据以下指导进行配置验证规则:

    • 名称:填写 Schema 验证规则的名称,用于标识一条 Schema 验证规则。
    • 备注:可选,为验证规则添加备注信息。
    • 消息来源主题:设置发布到哪些主题的消息需要被验证验。可设置多个主题或主题过滤器。此示例中设置为 t/#
    • 验证方式
      • 验证策略:指定需要所有验证器全部通过或者任意一个通过。
        • 全部通过:全部验证方式都通过验证时,才视为通过验证。
        • 任意通过:任意一个验证方式通过时就终止后续的验证,视为通过验证。
      • 验证器列表:指定使用什么类型的 Schema 或 SQL 语句进行验证。可以使用在 Schema 管理中创建好的 Schema 或选择 SQL 类型并输入 SQL 语句。在此示例中,选择第一步中创建的 JSON_schema
    • 验证失败操作
      • 失败操作:指定验证失败之后,MQTT 消息或发布客户端的行为。
        • 丢弃消息:终止发布并将消息丢弃,对于 QoS 1, QoS 2 消息,通过 PUBACK 返回给发布客户端相应的 reason code (131 - Implementation Specific Error)。
        • 断开连接并丢弃消息:丢弃消息并断开发布客户端的连接。
        • 忽略:不做额外的操作。
      • 日志级别:定义输出验证失败部署日志的级别,可以选择 warningerror

    json_schema_validator

  3. 使用客户端工具 MQTTX 验证规则。将 MQTTX 作为一个模拟客户端连接到部署,并订阅主题 t/#

    • 先往主题 t/1 发布一条符合 JSON_schema 的 MQTT 消息,可以成功接收到消息。

    • 再往主题 t/2 发布一条不符合 JSON_schema 的 MQTT 消息,由于消息不符合验证规则,所以无法成功接收到消息。

    validate_json_schema

  4. 查看 Schema 验证规则执行情况。

    • 您可以在部署日志中查看到验证失败的日志,示例:

      bash
      2024-12-30 14:05:14 (UTC+08:00)[emqx-node-1] warning
      clientid: mqttx_3fcf2e42, peername: 121.41.174.170:57707, username: admin, pid: <0.31215.0>, line: 288, action: drop, tag: SCHEMA_VALIDATION, validation: use_json_schema_validator, msg: validation_failed
    • 在 Schema 验证列表点击对应的名称,进入详情页面即可查看到相关的统计指标

使用 Avro Schema 验证

Avro Schema 验证与 JSON Schema 验证类似。在创建验证规则时,验证器列表中类型选择 Avro ,并且在 Schema/SQL 中选择已创建好的 Avro Schema 的名称。

使用 Protobuf Schema 验证

Protobuf Schema 验证与 JSON Schema 验证类似。在创建验证规则时,验证器列表中类型选择 Protobuf ,并且在 Schema/SQL 中选择已创建好的 Protobuf Schema 名称。

此外,与 JSON Schema 和 Avro Schema 不同的是,当选择使用 Protobuf 类型时,还需要额外输入需要使用的 Protobuf Schema 中定义好的消息类型。例如:下述 Protobuf Schema 中的消息类型是 Device

proto
message Device {
  required string id = 1;
  required uint32 temp = 2;
}

查看统计指标

点击 Schema 验证列表中的名称进入详情页面,您可以查看到 Schema 执行相关的统计指标:

  • 统计指标
    • 执行次数:验证规则启用以来触发的总次数。
    • 成功:数据验证成功的次数。
    • 失败:数据验证失败的次数。
  • 速率统计
    • 当前速率:最近 1 分钟内的验证速度。
    • 最近 5 分钟速率:最近 5 分钟内的验证速度。
    • 最大速率:历史最大验证速度。

管理 Schema 验证

您可以在 Schema 验证列表页面进行以下管理操作:

  • 启用/停用 Schema 验证:在是否启用列点击开关按钮。
  • 编辑 Schema 验证:在操作列中点击“编辑”图标,进入编辑页面进行修改并保存。
  • 删除 Schema 验证:在操作列中点击“更多”图标,在下拉选项中点击删除,二次确认后即可删除。
  • 调整 Schema 验证顺序:使用鼠标拖动列表中的一行表格进行上下排序,或者使用操作列中“更多选项”中的快捷移动按钮。