Skip to content

MQTT 会话持久化

EMQX 内置了 MQTT 会话持久化(Durable Session)功能,可以将会话和消息持久化存储到磁盘,并提供高可用副本以保证数据的冗余和一致性。通过会话持久化功能,可以实现有效的故障转移和恢复机制,确保服务的连续性和可用性,从而提高系统的可靠性。

本页面介绍了 EMQX 中会话持久化的概念、原理和使用方法。

基本概念

在学习 EMQX 的持久会话功能之前,了解一些 EMQX 的基本概念是非常重要的。

会话和持久存储

会话:会话是 EMQX 为每个客户端连接创建的轻量级进程。会话负责执行 MQTT 标准中对消息服务器(broker)所要求的行为。这些行为包括初始连接、订阅和取消订阅主题以及消息分发。

持久存储(Durable Storage, DS):持久存储是 EMQX 内部使用的数据库。会话可以利用持久存储保存其状态和发送到主题的 MQTT 消息。支持持久存储的数据库引擎使用 RocksDB 将数据保存到磁盘,并通过 Raft 在集群中一致地复制数据。需要注意的是,持久存储与 持久会话 是不同的概念。

会话过期间隔

根据 MQTT 标准,客户端会话有助于在 MQTT 服务器中管理客户端连接和状态。过期间隔是会话的一个属性,用于控制客户端连接终止后服务器保留会话状态的时间长度。此属性在本文中具有重要作用。

过期间隔设为 0 的会话仅在客户端连接到 EMQX 时存在。当客户端断开连接时,所有会话信息(包括订阅和未传递的消息)会立即被丢弃。而过期间隔非 0 的会话,即使客户端连接终止,EMQX 也会保留它们。如果客户端在会话过期间隔内重新连接到 EMQX,会话可以恢复。客户端离线期间发送到主题的消息将会被传递。

  • 使用 MQTT 5 协议的客户端可以通过 CONNECTDISCONNECT 数据包中的会话过期间隔属性明确指定会话过期间隔。
  • 对于使用 MQTT 3.* 协议的客户端,EMQX 根据以下规则确定会话过期间隔:如果清除会话标志设置为 true,则会话过期间隔为 0。否则,使用 mqtt.session_expiry_interval 配置参数的值。

EMQX 中的会话实现

EMQX 提供了两种不同的客户端会话实现,每种实现都针对特定的使用场景进行了优化:

  • 临时会话(regular session):会话状态保存在正在运行的 EMQX 节点的内存中。当 EMQX 节点重启时,其状态会丢失。
  • 持久会话(durable session):会话状态和接收的消息备份在持久存储中。EMQX 节点重启后可以恢复这些会话。

会话实现的选择取决于会话过期间隔和 durable_sessions.enable 配置参数,该参数可以全局设置或按 zone 设置。基于以下标准可以选择实现方式:

durable_sessions.enable会话过期间隔 = 0会话过期间隔 > 0
false临时临时
true临时持久

EMQX 采用独特的方法来管理消息的持久性,使临时会话和持久会话共存,同时最大限度地降低存储成本。

两种会话实现的比较

客户端会话的管理策略是确保服务稳定性和可靠性的关键因素。本节对两种会话实现的特性进行比较分析,旨在帮助开发人员更好地理解它们各自的特性和适用场景,从而做出更精确的部署决策。

临时会话

这种会话实现是默认的,并在 EMQX 5.7 之前的所有版本中使用。临时会话的状态完全保存在运行中的 EMQX 节点的内存中。

临时会话的优点包括:

  • 非常高的吞吐量和低延迟。
  • 即时向客户端分发消息。

然而,也存在一些缺点:

  • 当承载会话的 EMQX 节点停止或重启时,会话数据将丢失。
  • 未传递的消息存储在会话的内存队列中,增加了 EMQX 的内存占用。
  • EMQX 对内存队列的大小施加限制,以防止内存耗尽。当达到此限制时,新消息将被丢弃,导致未传递消息的潜在丢失。

持久会话

持久会话在 EMQX v5.7.0 中引入,其实现将会话状态和路由到持久会话的消息存储在磁盘上。此功能默认禁用,可以通过将 durable_sessions.enable 配置参数设置为 true 来启用。

当持久会话订阅一个主题过滤器时,EMQX 会将匹配该过滤器的主题标记为“持久”。这确保了除了将这些主题的 MQTT PUBLISH 消息路由到临时会话外,服务器还将这些消息保存到名为 messages 的持久存储中。

需要注意的是,消息分发协议取决于订阅者会话的持久性,而不是发布者的。

每条持久 MQTT 消息在每个副本上只存储一次,无论有多少订阅持久会话或它们的连接状态如何。这确保了高效的消息分发并最小化磁盘写入。

持久存储通过在 EMQX 集群内的多个节点间一致地复制会话元数据和 MQTT 消息,提供了强大的持久性和高可用性。可配置的复制因子 (replication factor) 确定每条消息或会话的副本数量,使用户能够根据具体需求定制持久性和性能之间的平衡。

持久会话的优点包括:

  • EMQX 节点重启或停止后会话可以恢复。
  • MQTT 消息存储在共享的、复制的持久存储中,而不是内存队列中,减少了在线和离线会话的 RAM 使用量。
  • 未传递消息的数量没有上限,并且由于内存队列溢出,未传递消息永远不会被丢弃。

然而,也存在一些缺点:

  • 将消息存储在磁盘上会导致整体系统吞吐量降低。
  • 持久会话的延迟比临时会话高,因为写入和读取 MQTT 消息都是批量执行的。尽管批量处理提高了吞吐量,但也增加了端到端的延迟(客户端看到发布消息之前的延迟)。

快速体验会话持久化功能

本章节将帮助您快速了解如何在 EMQX 与 MQTT 客户端上使用会话持久化功能,并介绍简单的会话持久化工作流程。

注意

即使没有启用持久会话,通过步骤 2-4 的操作会话仍然会被保留、消息也将会保存在客户端队列中。不同之处在于会话是否持久存储,以及步骤 5 中会话是否能在节点重启后恢复。

1. 在 EMQX 上启用会话持久化功能

默认情况下,EMQX 不启用会话持久化功能。您需要修改 etc/emqx.conf 文件并添加以下配置以启用该功能:

bash
durable_sessions {
  enable = true
}

重启 EMQX 以应用配置。

2. 调整 MQTT 客户端连接参数以启用持久会话

MQTTX CLI 为例,它默认使用了 MQTT 5.0 协议,添加 --no-clean 选项以设置 Clean Start = false,同时指定客户端 ID 为 emqx_c,连接到 EMQX 并订阅 t/1 主题:

bash
mqttx sub -t t/1 -i emqx_c --no-clean -q 1

3. 断开客户端连接,会话将被保留

断开步骤 2 中的客户端连接。打开 EMQX Dashboard,在 监控 -> 客户端 页面中,您仍然可以看到客户端状态变为未连接,这表明会话已经保留。

MQTT 保留会话

4. 向客户端发送消息,消息将被发送到客户端队列

仍以 MQTTX CLI 为例,使用 bench 命令,通过 1 个客户端重复向 t/1 主题发布消息:

bash
mqttx bench pub -t t/1 -c 1 -q 1

根据 MQTT 协议要求,即使 emqx_c 客户端不在线,它订阅的 t/1 主题消息也会被保存在客户端队列中,以便在重新连接后继续派发。

5. 重启 EMQX 节点,会话与消息将从持久存储中恢复

重启 EMQX 节点,在没有进行任何客户端连接操作的情况下,打开 EMQX Dashboard,在 监控 -> 客户端 页面中可以看到状态为未连接的客户端,这表明会话已恢复。

尝试使用相同的客户端 ID emqx_c,并使用 --no-clean 选项设置 Clean Start = false)连接到 EMQX:

bash
mqttx sub -t t/1 -i emqx_c --no-clean -q 1

离线期间接收到的消息将在此时将派发到当前客户端:

bash
...
[2024-5-22] [16:14:14] › …  Connecting...
[2024-5-22] [16:14:14] › ✔  Connected
[2024-5-22] [16:14:14] › …  Subscribing to t/1...
[2024-5-22] [16:14:14] › ✔  Subscribed to t/1
[2024-5-22] [16:14:14] › payload: Hello From MQTTX CLI
...

注意

  • 必须使用相同的客户端 ID emqx_c,并指定 --no-clean 选项以将 Clean Start 设置为 false,确保满足这两项要求才能恢复持久的会话。
  • 由于会话中已经保存了之前的订阅信息,即使重连时不重新订阅 t/1 主题,消息也会派发到客户端。

持久存储的架构

持久会话依赖持久存储来保存会话状态和消息。要了解该存储层的结构及其工作原理,请参阅持久存储设计中的 架构:后端与存储层次结构 章节。

持久存储对会话的支持机制

持久存储是 EMQX 实现持久会话和共享订阅会话的核心基础。

持久会话

持久会话构建于 DS 数据库引擎之上。当客户端以非零会话过期间隔连接时,EMQX 会将该会话的状态保存到 DS 中。

  • 消息持久化:

    当持久会话以 QoS > 0 订阅某个主题过滤器时,EMQX 会在其路由表中将该主题过滤器标记为“可持久化”(durable)。任何发布到被标记为可持久化的主题上的消息,都会在正常转发给临时会话客户端的同时,被写入 DS 进行持久化存储。

  • 消费进度跟踪:

    持久会话通过 迭代器(iterator) 从 DS 的持久存储流中读取消息。迭代器是轻量的指针,用于记录会话在每个持久存储流中的进度,使得在断线或节点重启后能够从准确位置继续消费,实现可靠的消息重放。

  • 高效存储:

    每条消息在每个 DS 副本上只存储一次,无论有多少持久会话订阅了同一主题,从而大幅降低存储开销。

共享订阅会话

从 EMQX v6.0 开始,DS 还支持共享订阅会话的持久化。共享订阅依赖 DS 在订阅组内维持一致的消息分配和消费进度。

  • 迭代器管理:

    一个“共享订阅主节点(Shared Sub Leader)”负责管理整个订阅组的迭代器集合,并将迭代器分配给订阅组成员,以确保协同、无重复的消息消费。

  • 消息重放与重平衡:

    共享订阅主题的会话与主节点通信,由其“借出”相应的迭代器以保证正确的消息重放。迭代器的消费进度会实时上报。如果客户端断线或订阅组发生重平衡,领导者会收回迭代器并重新分配给其他成员,确保消费过程不中断并保持负载均衡。

这些机制使共享订阅组在消息负载均衡、消息顺序保证以及故障恢复方面更加可靠和一致。

集群中的持久存储

EMQX 集群中的每个节点都分配有唯一的 站点 ID,该 ID 作为稳定标识符,与 Erlang 节点名称 (emqx@...) 无关。站点 ID 是持久的,并且在节点第一次启动时随机生成。这种稳定性维护了数据的完整性,特别是在节点可能经历名称修改或重新配置的情况下。

管理员可以使用 emqx_ctl ds info CLI 命令查看不同站点的状态,以管理和监控集群中的持久存储状态。

会话持久化的硬件要求

当会话持久化启用时,EMQX 会将持久会话的元数据以及发送到持久会话的 MQTT 消息保存到磁盘上。因此,EMQX 必须部署在具有足够大存储容量的服务器上。为了获得最佳吞吐量,建议使用固态硬盘(SSD)存储。

磁盘容量可以根据以下指南进行估算:

  • 消息存储:每个副本上存储消息所需的空间与传入消息的速率乘以 durable_sessions.message_retention_period 参数指定的持续时间成正比。此参数决定了消息的保留时间,从而影响所需的总存储量。
  • 会话元数据存储:会话元数据的存储量与会话数量乘以它们订阅的流数量成正比。
  • 流计算:流的数量与分片的数量成正比。它还(以非线性方式)取决于主题的数量。EMQX 会自动将结构相似的主题组合到同一个流中,确保流的数量不会随着主题数量的增加而过快增长,从而最小化每个会话存储的元数据量。

下一步

想要了解如何对会话持久化功能进行配置和管理,以及如何对 EMQX 集群中的会话持久化进行初始设置和更改设置,请参阅以下页面:

更多信息

如需深入了解 MQTT 持久会话背后的设计原理,请参阅持久存储设计