Skip to content

MQTT 消息流

EMQX 6.1 新增了 MQTT 消息流,它是一项面向流式处理与消息重放的功能,通过持久化、可重放的消息流,扩展了 MQTT 的实时发布/订阅模型,并在保持 MQTT 语义的同时提供类似 Kafka 的流式能力。

本页将完整介绍 EMQX 中 MQTT 消息流功能的设计动机、关键概念、内部架构、消息流转过程,以及典型应用场景。

什么是 MQTT 消息流?

MQTT 消息流是一个通过名称进行标识的逻辑资源,用于持续收集与其配置的主题过滤器匹配的 MQTT 消息。消息会按照配置的保留策略进行持久化存储,客户端可以通过订阅该消息流进行历史数据回放。

每个消息流都由一个唯一的流名称(Name)进行标识。主题过滤器是消息流的配置属性,但不再作为消息流的标识符。

每个消息流包含:

  • 唯一的流名称
  • 一个配置的主题过滤器
  • 保留策略(基于时间或大小)
  • 可选的最后值语义
  • 明确的生命周期(创建、更新、删除)

为什么使用 MQTT 消息流?

MQTT 擅长实时消息传递,但也存在一些固有局限:

  • 消息通常只会投递给在线订阅者。
  • MQTT 原生不支持历史消息回放。
  • 对历史数据进行重放或再处理通常需要外部系统配合。
  • 构建有序、可重放的消息历史较为困难。

MQTT 消息流为 MQTT 增加了持久化存储与回放能力。它允许消费者读取历史消息,并获取设备的最新状态,而无需改变 MQTT 客户端的发布或订阅方式。

MQTT 消息流关键概念

  • MQTT 消息流

    MQTT 消息流是一个由流名称标识的逻辑资源,并具有明确的生命周期。处于启用状态时,在启用状态下,它会持续存储与其配置的主题过滤器匹配的消息,并受配置的时间或大小限制约束。存储的消息可被订阅者回放;发布端无需感知消息流的存在。

    消息流名称必须满足以下规则:

    • 仅允许字母、数字
    • 下划线(_
    • 连字符(-
    • 点号(.

    支持两种类型:

    • 常规流:存储所有匹配的消息,不覆盖历史数据。消费者可通过设置订阅属性 stream-offset 指定回放起点(例如时间戳或逻辑偏移量)。
    • 最后值流:最后值消息流启用了最后值语义。对于具有相同流键(key)的消息,新的消息会覆盖旧的消息,消息流中仅保留该 key 对应的最新一条消息。
  • 过滤主题

    一个 MQTT 主题过滤器(例如 sensors/+/data),用于决定哪些已发布消息会被捕获并写入消息流。只有匹配主题过滤器的消息会被摄取;同一条消息也可能同时属于多个消息流。

    主题过滤器是消息流的配置属性,而不是消息流的标识符。

  • 消息流订阅

    消息流订阅是一种用于消费 MQTT 消息流数据的特殊 MQTT 订阅。客户端通过以下格式订阅:

    SUBSCRIBE $stream/<name>
    SUBSCRIBE $stream/<name>/<topic_filter>

    其中:

    • <name> 为消息流名称(必填)
    • <topic_filter> 可选,用于与已存在的消息流绑定
    • 当启用自动创建功能时,使用 $queue/<name>/<topic_filter> 可以在该消息流不存在的情况下,使用提供的主题过滤器自动创建流。

    消息流订阅与普通 MQTT 订阅相互独立,并通过外部订阅(External Subscription,ExtSub)机制进行投递。

  • 回放起点(Stream Offset)

    消息流的回放起点通过 MQTT 5 的订阅属性 stream-offset 指定,而不是通过主题路径传递。

    stream-offset 可以为:

    • Unix 微秒时间戳
    • 特殊值 earliest(从最早开始)
    • 特殊值 latest(从最新开始)

    该设计避免在主题路径中编码偏移量,使回放控制更加规范化。

  • 流键表达式

    流键表达式是一个用户定义的表达式,会在每条入流消息上执行以提取一个键。该表达式可以引用消息内容或元数据。提取出的键用于以保证存储分区内消息的顺序性。当消息流启用最后值语义时,键还用于确定覆盖范围:具有相同键的新消息会替换旧消息。

MQTT 消息流架构

MQTT 消息流以独立的 EMQX 应用形式实现,与 Broker 核心保持松耦合,并复用现有基础设施。它通过内部钩子(Hook)以及外部订阅框架与 EMQX 集成。

TIP

外部订阅是 EMQX 的一种机制,用于将外部消息源(即不来自实时 MQTT 发布路径的消息)连接到 MQTT 客户端会话,使这些消息能够通过标准的 MQTT 订阅投递给客户端,而无需改变客户端行为。

主要组件

  • 消息流注册表(Streams Registry):管理消息流的生命周期,并维护消息流的元数据与索引。它使用 Mnesia 表按主题过滤器建立索引,以便高效查找匹配的消息流。
  • 消息流消息数据库(Streams Message Database):为消息流消息提供持久化存储,基于 EMQX 的持久存储构建。它负责消息落盘、执行保留限制、在启用时应用最后值语义,并在消息按保留策略过期前支持高效读取。
  • 消息流 ExtSub 处理器(Streams ExtSub Handler):将消息流与 MQTT 客户端会话集成。它从持久化存储中读取消息,并通过外部订阅框架向订阅客户端投递。

MQTT 消息流的数据流示意图

下图展示了 MQTT 消息流组件之间的数据流转关系:

streams_data_flow

发布流程

  1. 客户端向某个 MQTT 主题发布消息。
  2. 消息发布时触发消息流钩子,用于处理本次发布。
  3. 钩子查询消息流注册表,识别过滤主题与该消息主题匹配的消息流。
  4. 对每个匹配的消息流,将消息写入消息流并持久化到持久化存储中。

订阅与消费流程

  1. 客户端订阅消息流主题($stream/<name>$stream/<name>/<topic_filter>),并可通过订阅属性指定 stream-offset
  2. 外部订阅框架处理该订阅,并为该消息流主题初始化一个消息流 ExtSub 处理器。
  3. 处理器根据 stream-offset 与保留规则从持久化存储读取消息。
  4. 读取到的消息会被传递给外部订阅框架。
  5. ExtSub 应用通过标准 MQTT 投递流程向客户端下发消息。

MQTT 消息流核心特性

MQTT 消息流提供了一组核心能力,用于定义消息如何被存储、排序、保留,以及如何面向回放式消费进行投递。

  • 基于 Offset 的回放

    消费者通过订阅属性 stream-offset 指定回放起点。早于该时间戳发布的消息会被跳过。

  • 保留策略

    MQTT 消息流的保留策略会限制可回放的范围。消息会按时间或大小限制进行保留;过期消息会自动清理,无论它是否已被消费。

  • 按 key 有序投递

    MQTT 消息流不保证全局单一的投递顺序。具有相同 key 的消息一定会按发布顺序投递;不同 key 的消息之间可能以任意顺序交错投递。

  • 最后值语义

    MQTT 消息流可启用最后值语义。key 相同的新消息会覆盖更早的消息,仅保留最新值。无法解析出 key 的消息会按普通消息方式存储。

  • MQTT 原生投递

    消息流消息通过标准 MQTT 机制投递。发布端无需改变行为;面向订阅者的消息投递通过外部订阅机制集成实现。

兼容性说明

本节说明 MQTT 消息流在当前版本中的兼容性行为,以及对历史部署的影响。

命名消息流

所有消息流均为具名资源。每个消息流必须具有唯一的名称,并通过该名称进行管理与订阅。

流名称仅允许包含:

  • 字母与数字
  • 下划线(_
  • 连字符(-
  • 点号(.

旧版本未命名消息流

在旧版本中创建的未命名消息流会在升级后自动分配名称。

生成规则为:/<topic_filter>,即使用原主题过滤器作为名称,并在前面添加 /

例如:

  • 原主题过滤器:t/1
  • 自动生成名称:/t/1

这些消息流仍可继续使用,但建议逐步迁移为显式命名的消息流。

已废弃的订阅前缀

旧版本使用的订阅格式:

$s/<offset>/<topic_filter>

仍可用于向后兼容,但已被废弃。

新的推荐订阅方式为:

$stream/<name>
$stream/<name>/<topic_filter>

回放起点需通过 MQTT 5 订阅属性 stream-offset 指定,而不再通过主题路径传递。

典型使用场景

  • 历史数据回放:重放过去的 MQTT 事件,用于排障或上线新的业务逻辑。
  • 时序分析:存储并回放传感器数据,用于分析与预测性维护。
  • 事件溯源(Event Sourcing):将所有状态变化以不可变事件日志形式持久化。
  • IoT 数字孪生:以数字形态维护物理设备的最新状态。
  • 配置同步:确保设备始终接收最新配置。

下一步

了解消息流基础后,你可以继续探索如何在实践中使用它: