# MQTT 会话持久化

EMQX 内置了 MQTT 会话持久化（Durable Session）功能，可以将会话和消息持久化存储到磁盘，并提供高可用副本以保证数据的冗余和一致性。通过会话持久化功能，可以实现有效的故障转移和恢复机制，确保服务的连续性和可用性，从而提高系统的可靠性。

本页面介绍了 EMQX 中会话持久化的概念、原理和使用方法。

## 基本概念

在学习 EMQX 的持久会话功能之前，了解一些 EMQX 的基本概念是非常重要的。

### 会话和持久存储

**会话**：会话是 EMQX 为每个客户端连接创建的轻量级进程。会话负责执行 MQTT 标准中对消息服务器（broker）所要求的行为。这些行为包括初始连接、订阅和取消订阅主题以及消息分发。

**持久存储（Durable Storage, DS）**：持久存储是 EMQX 内部使用的数据库。会话可以利用持久存储保存其状态和发送到主题的 MQTT 消息。支持持久存储的数据库引擎使用 [RocksDB](https://rocksdb.org/) 将数据保存到磁盘，并通过 [Raft](https://raft.github.io/) 在集群中一致地复制数据。需要注意的是，持久存储与 **持久会话** 是不同的概念。

### 会话过期间隔

根据 MQTT 标准，客户端会话有助于在 MQTT 服务器中管理客户端连接和状态。**过期间隔**是会话的一个属性，用于控制客户端连接终止后服务器保留会话状态的时间长度。此属性在本文中具有重要作用。

过期间隔设为 0 的会话仅在客户端连接到 EMQX 时存在。当客户端断开连接时，所有会话信息（包括订阅和未送达的消息）会立即被丢弃。而过期间隔非 0 的会话，即使客户端连接终止，EMQX 也会保留它们。如果客户端在会话过期间隔内重新连接到 EMQX，会话可以恢复。客户端离线期间发送到主题的消息将会被传递。

- 使用 MQTT 5 协议的客户端可以通过 `CONNECT` 或 `DISCONNECT` 数据包中的[会话过期间隔](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048)属性明确指定会话过期间隔。
- 对于使用 MQTT 3.* 协议的客户端，EMQX 根据以下规则确定会话过期间隔：如果[清除会话](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030)标志设置为 `true`，则会话过期间隔为 0。否则，使用 `mqtt.session_expiry_interval` 配置参数的值。

## EMQX 中的会话实现

EMQX 提供了两种不同的客户端会话实现，每种实现都针对特定的使用场景进行了优化：

- **临时会话（regular session）**：会话状态保存在正在运行的 EMQX 节点的内存中。当 EMQX 节点重启时，其状态会丢失。
- **持久会话（durable session）**：会话状态和接收的消息备份在持久存储中。EMQX 节点重启后可以恢复这些会话。

会话实现的选择取决于会话过期间隔和 `durable_sessions.enable` 配置参数，该参数可以全局设置或按 [zone](../configuration/configuration.md#zone-override) 设置。基于以下标准可以选择实现方式：

| `durable_sessions.enable` | 会话过期间隔 = 0 | 会话过期间隔 > 0 |
| ------------------------- | ---------------- | ---------------- |
| `false`                   | 临时             | 临时             |
| `true`                    | 临时             | 持久             |

EMQX 采用独特的方法来管理消息的持久性，使临时会话和持久会话共存，同时最大限度地降低存储成本。

### 两种会话实现的比较

客户端会话的管理策略是确保服务稳定性和可靠性的关键因素。本节对两种会话实现的特性进行比较分析，旨在帮助开发人员更好地理解它们各自的特性和适用场景，从而做出更精确的部署决策。

#### 临时会话

这种会话实现是默认的，并在 EMQX 5.7 之前的所有版本中使用。临时会话的状态完全保存在运行中的 EMQX 节点的内存中。

临时会话的优点包括：

- 非常高的吞吐量和低延迟。
- 即时向客户端分发消息。

然而，也存在一些缺点：

- 当承载会话的 EMQX 节点停止或重启时，会话数据将丢失。
- 未送达的消息存储在会话的内存队列中，增加了 EMQX 的内存占用。
- EMQX 对内存队列的大小施加限制，以防止内存耗尽。当达到此限制时，新消息将被丢弃，导致未送达消息的潜在丢失。

#### 持久会话

持久会话在 EMQX v5.7.0 中引入，其实现将会话状态和路由到持久会话的消息存储在磁盘上。此功能默认禁用，可以通过将 `durable_sessions.enable` 配置参数设置为 `true` 来启用。

当持久会话订阅一个主题过滤器时，EMQX 会将匹配该过滤器的主题标记为“持久”。这确保了除了将这些主题的 MQTT PUBLISH 消息路由到临时会话外，服务器还将这些消息保存到名为 `messages` 的持久存储中。

需要注意的是，消息分发协议取决于订阅者会话的持久性，而不是发布者的。

每条持久 MQTT 消息在每个副本上只存储一次，无论有多少订阅持久会话或它们的连接状态如何。这确保了高效的消息分发并最小化磁盘写入。

持久存储通过在 EMQX 集群内的多个节点间一致地复制会话元数据和 MQTT 消息，提供了强大的持久性和高可用性。可配置的[复制因子 (replication factor) ](./managing-replication.md#replication-factor)确定每条消息或会话的副本数量，使用户能够根据具体需求定制持久性和性能之间的平衡。

持久会话的优点包括：

- EMQX 节点重启或停止后会话可以恢复。
- MQTT 消息存储在共享的、复制的持久存储中，而不是内存队列中，减少了在线和离线会话的 RAM 使用量。
- 未送达消息的数量没有上限，并且未送达消息不会因内存队列溢出而被丢弃。

然而，也存在一些缺点：

- 将消息存储在磁盘上会导致整体系统吞吐量降低。
- 持久会话的延迟比临时会话高，因为写入和读取 MQTT 消息都是批量执行的。尽管批量处理提高了吞吐量，但也增加了端到端的延迟（客户端看到发布消息之前的延迟）。

## 快速体验会话持久化功能

本章节将帮助您快速了解如何在 EMQX 与 MQTT 客户端上使用会话持久化功能，并介绍简单的会话持久化工作流程。

::: tip 注意

即使没有启用持久会话，通过步骤 2-4 的操作会话仍然会被保留、消息也将会保存在客户端队列中。不同之处在于会话是否持久存储，以及步骤 5 中会话是否能在节点重启后恢复。

:::

### 1. 在 EMQX 上启用会话持久化功能

默认情况下，EMQX 不启用会话持久化功能。您需要修改 `etc/emqx.conf` 文件并添加以下配置以启用该功能：

```bash
durable_sessions {
  enable = true
}
```

重启 EMQX 以应用配置。

### 2. 调整 MQTT 客户端连接参数以启用持久会话

以 [MQTTX CLI](https://mqttx.app/zh/cli) 为例，它默认使用了 MQTT 5.0 协议，添加 `--no-clean` 选项以设置 `Clean Start = false`，同时指定客户端 ID 为 `emqx_c`，连接到 EMQX 并订阅 `t/1` 主题：

```bash
mqttx sub -t t/1 -i emqx_c --no-clean -q 1
```

### 3. 断开客户端连接，会话将被保留

断开步骤 2 中的客户端连接。打开 EMQX Dashboard，在 **监控** -> **客户端** 页面中，您仍然可以看到客户端状态变为**未连接**，这表明会话已经保留。

![MQTT 保留会话](./assets/session-persistence-list.png)

### 4. 向客户端发送消息，消息将被发送到客户端队列

仍以 MQTTX CLI 为例，使用 `bench` 命令，通过 1 个客户端重复向 `t/1` 主题发布消息：

```bash
mqttx bench pub -t t/1 -c 1 -q 1
```

根据 MQTT 协议要求，即使 `emqx_c` 客户端不在线，它订阅的 `t/1` 主题消息也会被保存在客户端队列中，以便在重新连接后继续派发。

### 5. 重启 EMQX 节点，会话与消息将从持久存储中恢复

重启 EMQX 节点，在没有进行任何客户端连接操作的情况下，打开 EMQX Dashboard，在 **监控** -> **客户端** 页面中可以看到状态为**未连接**的客户端，这表明会话已恢复。

尝试使用相同的客户端 ID `emqx_c`，并使用 `--no-clean` 选项设置 `Clean Start = false`）连接到 EMQX：

```bash
mqttx sub -t t/1 -i emqx_c --no-clean -q 1
```

离线期间接收到的消息将在此时将派发到当前客户端：

```bash
...
[2024-5-22] [16:14:14] › …  Connecting...
[2024-5-22] [16:14:14] › ✔  Connected
[2024-5-22] [16:14:14] › …  Subscribing to t/1...
[2024-5-22] [16:14:14] › ✔  Subscribed to t/1
[2024-5-22] [16:14:14] › payload: Hello From MQTTX CLI
...
```

::: tip **注意**

- 必须使用相同的客户端 ID `emqx_c`，并指定 `--no-clean` 选项以将 `Clean Start` 设置为 `false`，确保满足这两项要求才能恢复持久的会话。
- 由于会话中已经保存了之前的订阅信息，即使重连时不重新订阅 `t/1` 主题，消息也会派发到客户端。

:::

## 持久存储的架构

持久会话依赖持久存储来保存会话状态和消息。要了解该存储层的结构及其工作原理，请参阅[持久存储设计](../design/durable-storage.md)中的 *架构：后端与存储层次结构* 章节。

## 持久存储对会话的支持机制

持久存储是 EMQX 实现持久会话和共享订阅会话的核心基础。

### 持久会话

持久会话构建于 DS 数据库引擎之上。当客户端以**非零会话过期间隔**连接时，EMQX 会将该会话的状态保存到 DS 中。

- **消息持久化：**

  当持久会话以 QoS > 0 订阅某个主题过滤器时，EMQX 会在其路由表中将该主题过滤器标记为“可持久化”（durable）。任何发布到被标记为可持久化的主题上的消息，都会在正常转发给临时会话客户端的同时，被写入 DS 进行持久化存储。

- **消费进度跟踪：**

  持久会话通过 *迭代器（iterator）* 从 DS 的持久存储流中读取消息。迭代器是轻量的指针，用于记录会话在每个持久存储流中的进度，使得在断线或节点重启后能够从准确位置继续消费，实现可靠的消息重放。

- **高效存储：**

  每条消息在每个 DS 副本上只存储一次，无论有多少持久会话订阅了同一主题，从而大幅降低存储开销。

### 共享订阅会话

从 EMQX v6.0 开始，DS 还支持共享订阅会话的持久化。共享订阅依赖 DS 在订阅组内维持一致的消息分配和消费进度。

- **迭代器管理：**

  一个“共享订阅主节点（Shared Sub Leader）”负责管理整个订阅组的迭代器集合，并将迭代器分配给订阅组成员，以确保协同、无重复的消息消费。

- **消息重放与重平衡：**

  共享订阅主题的会话与主节点通信，由其“借出”相应的迭代器以保证正确的消息重放。迭代器的消费进度会实时上报。如果客户端断线或订阅组发生重平衡，领导者会收回迭代器并重新分配给其他成员，确保消费过程不中断并保持负载均衡。

这些机制使共享订阅组在消息负载均衡、消息顺序保证以及故障恢复方面更加可靠和一致。

## 集群中的持久存储

EMQX 集群中的每个节点都分配有唯一的 *站点 ID*，该 ID 作为稳定标识符，与 Erlang 节点名称 (`emqx@...`) 无关。站点 ID 是持久的，并且在节点第一次启动时随机生成。这种稳定性维护了数据的完整性，特别是在节点可能经历名称修改或重新配置的情况下。

管理员可以使用 `emqx_ctl ds info` CLI 命令查看不同站点的状态，以管理和监控集群中的持久存储状态。

## 会话持久化的硬件要求

当会话持久化启用时，EMQX 会将持久会话的元数据以及发送到持久会话的 MQTT 消息保存到磁盘上。因此，EMQX 必须部署在具有足够大存储容量的服务器上。为了获得最佳吞吐量，建议使用固态硬盘（SSD）存储。

磁盘容量可以根据以下指南进行估算：

- **消息存储**：每个副本上存储消息所需的空间与传入消息的速率乘以 `durable_sessions.message_retention_period` 参数指定的持续时间成正比。此参数决定了消息的保留时间，从而影响所需的总存储量。
- **会话元数据存储**：会话元数据的存储量与会话数量乘以它们订阅的流数量成正比。
- **流计算**：流的数量与分片的数量成正比。它还（以非线性方式）取决于主题的数量。EMQX 会自动将结构相似的主题组合到同一个流中，确保流的数量不会随着主题数量的增加而过快增长，从而最小化每个会话存储的元数据量。

## 下一步

想要了解如何对会话持久化功能进行配置和管理，以及如何对 EMQX 集群中的会话持久化进行初始设置和更改设置，请参阅以下页面：

- [管理数据副本](./managing-replication.md)
- [配置和管理会话持久化](./management.md)

## 更多信息

如需深入了解 MQTT 持久会话背后的设计原理，请参阅[持久存储设计](../design/durable-storage.md)。
