# MQTT 消息流

EMQX 6.1 新增了 MQTT 消息流，它是一项面向流式处理与消息重放的功能，通过持久化、可重放的消息流，扩展了 MQTT 的实时发布/订阅模型，并在保持 MQTT 语义的同时提供类似 Kafka 的流式能力。

本页将完整介绍 EMQX 中 MQTT 消息流功能的设计动机、关键概念、内部架构、消息流转过程，以及典型应用场景。

## 什么是 MQTT 消息流？

MQTT 消息流是一个通过名称进行标识的逻辑资源，用于持续收集与其配置的主题过滤器匹配的 MQTT 消息。消息会按照配置的保留策略进行持久化存储，客户端可以通过订阅该消息流进行历史数据回放。

每个消息流都由一个唯一的流名称（Name）进行标识。主题过滤器是消息流的配置属性，但不再作为消息流的标识符。

每个消息流包含：

- 唯一的流名称
- 一个配置的主题过滤器
- 保留策略（基于时间或大小）
- 可选的最后值语义
- 明确的生命周期（创建、更新、删除）

## 为什么使用 MQTT 消息流？

MQTT 擅长实时消息传递，但也存在一些固有局限：

- 消息通常只会投递给在线订阅者。
- MQTT 原生不支持历史消息回放。
- 对历史数据进行重放或再处理通常需要外部系统配合。
- 构建有序、可重放的消息历史较为困难。

MQTT 消息流为 MQTT 增加了持久化存储与回放能力。它允许消费者读取历史消息，并获取设备的最新状态，而无需改变 MQTT 客户端的发布或订阅方式。

## MQTT 消息流关键概念

- **MQTT 消息流**

  MQTT 消息流是一个由流名称标识的逻辑资源，并具有明确的生命周期。处于启用状态时，在启用状态下，它会持续存储与其配置的主题过滤器匹配的消息，并受配置的时间或大小限制约束。存储的消息可被订阅者回放；发布端无需感知消息流的存在。

  消息流名称必须满足以下规则：

  - 仅允许字母、数字
  - 下划线（`_`）
  - 连字符（`-`）
  - 点号（`.`）

  支持两种类型：

  - **常规流**：存储所有匹配的消息，不覆盖历史数据。消费者可通过设置订阅属性 `stream-offset` 指定回放起点（例如时间戳或逻辑偏移量）。
  - **最后值流**：最后值消息流启用了[最后值语义](#最后值语义)。对于具有相同流键（key）的消息，新的消息会覆盖旧的消息，消息流中仅保留该 key 对应的最新一条消息。

- **过滤主题**

  一个 MQTT 主题过滤器（例如 `sensors/+/data`），用于决定哪些已发布消息会被捕获并写入消息流。只有匹配主题过滤器的消息会被摄取；同一条消息也可能同时属于多个消息流。

  > 主题过滤器是消息流的配置属性，而不是消息流的标识符。

- **消息流订阅**

  消息流订阅是一种用于消费 MQTT 消息流数据的特殊 MQTT 订阅。客户端通过以下格式订阅：

  ```
  SUBSCRIBE $stream/<name>
  SUBSCRIBE $stream/<name>/<topic_filter>
  ```

  其中：

  - `<name>` 为消息流名称（必填）
  - `<topic_filter>` 可选，用于与已存在的消息流绑定
  - 当启用自动创建功能时，使用 `$queue/<name>/<topic_filter>` 可以在该消息流不存在的情况下，使用提供的主题过滤器自动创建流。

  消息流订阅与普通 MQTT 订阅相互独立，并通过外部订阅（External Subscription，ExtSub）机制进行投递。

- **回放起点（Stream Offset）**

  消息流的回放起点通过 MQTT 5 的订阅属性 `stream-offset` 指定，而不是通过主题路径传递。
  
  `stream-offset` 可以为：
  
  - Unix 微秒时间戳
  - 特殊值 `earliest`（从最早开始）
  - 特殊值 `latest`（从最新开始）
  
  该设计避免在主题路径中编码偏移量，使回放控制更加规范化。
  
- **流键表达式**

  流键表达式是一个用户定义的表达式，会在每条入流消息上执行以提取一个键。该表达式可以引用消息内容或元数据。提取出的键用于以保证存储分区内消息的顺序性。当消息流启用最后值语义时，键还用于确定覆盖范围：具有相同键的新消息会替换旧消息。

## MQTT 消息流架构

MQTT 消息流以独立的 EMQX 应用形式实现，与 Broker 核心保持松耦合，并复用现有基础设施。它通过内部钩子（Hook）以及外部订阅框架与 EMQX 集成。

::: tip

外部订阅是 EMQX 的一种机制，用于将外部消息源（即不来自实时 MQTT 发布路径的消息）连接到 MQTT 客户端会话，使这些消息能够通过标准的 MQTT 订阅投递给客户端，而无需改变客户端行为。

:::

### 主要组件

- **消息流注册表（Streams Registry）**：管理消息流的生命周期，并维护消息流的元数据与索引。它使用 Mnesia 表按主题过滤器建立索引，以便高效查找匹配的消息流。
- **消息流消息数据库（Streams Message Database）**：为消息流消息提供持久化存储，基于 EMQX 的[持久存储](../design/durable-storage.md)构建。它负责消息落盘、执行保留限制、在启用时应用最后值语义，并在消息按保留策略过期前支持高效读取。
- **消息流 ExtSub 处理器（Streams ExtSub Handler）**：将消息流与 MQTT 客户端会话集成。它从持久化存储中读取消息，并通过外部订阅框架向订阅客户端投递。

### MQTT 消息流的数据流示意图

下图展示了 MQTT 消息流组件之间的数据流转关系：

![streams_data_flow](./assets/streams_data_flow.png)

### 发布流程

1. 客户端向某个 MQTT 主题发布消息。
2. 消息发布时触发消息流钩子，用于处理本次发布。
3. 钩子查询消息流注册表，识别过滤主题与该消息主题匹配的消息流。
4. 对每个匹配的消息流，将消息写入消息流并持久化到持久化存储中。

### 订阅与消费流程

1. 客户端订阅消息流主题（`$stream/<name>` 或 `$stream/<name>/<topic_filter>`），并可通过订阅属性指定 `stream-offset`。
2. 外部订阅框架处理该订阅，并为该消息流主题初始化一个消息流 ExtSub 处理器。
3. 处理器根据 `stream-offset` 与保留规则从持久化存储读取消息。
4. 读取到的消息会被传递给外部订阅框架。
5. ExtSub 应用通过标准 MQTT 投递流程向客户端下发消息。

## MQTT 消息流核心特性

MQTT 消息流提供了一组核心能力，用于定义消息如何被存储、排序、保留，以及如何面向回放式消费进行投递。

- **基于 Offset 的回放**

  消费者通过订阅属性 `stream-offset` 指定回放起点。早于该时间戳发布的消息会被跳过。

- **保留策略**

  MQTT 消息流的保留策略会限制可回放的范围。消息会按时间或大小限制进行保留；过期消息会自动清理，无论它是否已被消费。

- **按 key 有序投递**

  MQTT 消息流不保证全局单一的投递顺序。具有相同 key 的消息一定会按发布顺序投递；不同 key 的消息之间可能以任意顺序交错投递。

- **最后值语义**

  MQTT 消息流可启用最后值语义。key 相同的新消息会覆盖更早的消息，仅保留最新值。无法解析出 key 的消息会按普通消息方式存储。

- **MQTT 原生投递**

  消息流消息通过标准 MQTT 机制投递。发布端无需改变行为；面向订阅者的消息投递通过外部订阅机制集成实现。

## 兼容性说明

本节说明 MQTT 消息流在当前版本中的兼容性行为，以及对历史部署的影响。

### 命名消息流

所有消息流均为具名资源。每个消息流必须具有唯一的名称，并通过该名称进行管理与订阅。

流名称仅允许包含：

- 字母与数字
- 下划线（`_`）
- 连字符（`-`）
- 点号（`.`）

### 旧版本未命名消息流

在旧版本中创建的未命名消息流会在升级后自动分配名称。

生成规则为：`/<topic_filter>`，即使用原主题过滤器作为名称，并在前面添加 `/`。

例如：

- 原主题过滤器：`t/1`
- 自动生成名称：`/t/1`

这些消息流仍可继续使用，但建议逐步迁移为显式命名的消息流。

### 已废弃的订阅前缀

旧版本使用的订阅格式：

```
$s/<offset>/<topic_filter>
```

仍可用于向后兼容，但已被废弃。

新的推荐订阅方式为：

```
$stream/<name>
$stream/<name>/<topic_filter>
```

回放起点需通过 MQTT 5 订阅属性 `stream-offset` 指定，而不再通过主题路径传递。

## 典型使用场景

- **历史数据回放**：重放过去的 MQTT 事件，用于排障或上线新的业务逻辑。
- **时序分析**：存储并回放传感器数据，用于分析与预测性维护。
- **事件溯源（Event Sourcing）**：将所有状态变化以不可变事件日志形式持久化。
- **IoT 数字孪生**：以数字形态维护物理设备的最新状态。
- **配置同步**：确保设备始终接收最新配置。

## 下一步

了解消息流基础后，你可以继续探索如何在实践中使用它：

- [创建与配置 MQTT 消息流](./mqtt-stream-task.md)：学习如何通过 Dashboard 或 REST API 声明消息流，并设置最后值语义与保留策略。
- [快速开始教程](./mqtt-stream-quick-start.md)：跟随基于 MQTTX 的分步指南，模拟真实的发布端与订阅端场景。

