Skip to content

MQTT 会话持久化

TIP

会话持久化是 EMQX 企业版功能。

EMQX 内置了 MQTT 会话持久化(Durable Session)功能,可以将会话和消息持久化存储到磁盘,并提供高可用副本以保证数据的冗余和一致性。通过会话持久化功能,可以实现有效的故障转移和恢复机制,确保服务的连续性和可用性,从而提高系统的可靠性。

本页面介绍了 EMQX 中会话持久化的概念、原理和使用方法。

重要提示

该功能自 EMQX v5.7.0 版本起可用。然而,尚不支持共享订阅会话的持久化,计划在后续版本中实现。

基本概念

在学习 EMQX 的持久会话功能之前,了解一些 EMQX 的基本概念是非常重要的。

会话和持久存储

会话:会话是 EMQX 为每个客户端连接创建的轻量级进程。会话负责执行 MQTT 标准中对消息服务器(broker)所要求的行为。这些行为包括初始连接、订阅和取消订阅主题以及消息分发。

持久存储:持久存储是 EMQX 内部使用的数据库。会话可以利用持久存储保存其状态和发送到主题的 MQTT 消息。支持持久存储的数据库引擎使用 RocksDB 将数据保存到磁盘,并通过 Raft 在集群中一致地复制数据。需要注意的是,持久存储与 持久会话 是不同的概念。

会话过期间隔

根据 MQTT 标准,客户端会话有助于在 MQTT 服务器中管理客户端连接和状态。过期间隔是会话的一个属性,用于控制客户端连接终止后服务器保留会话状态的时间长度。此属性在本文中具有重要作用。

过期间隔设为 0 的会话仅在客户端连接到 EMQX 时存在。当客户端断开连接时,所有会话信息(包括订阅和未传递的消息)会立即被丢弃。而过期间隔非 0 的会话,即使客户端连接终止,EMQX 也会保留它们。如果客户端在会话过期间隔内重新连接到 EMQX,会话可以恢复。客户端离线期间发送到主题的消息将会被传递。

  • 使用 MQTT 5 协议的客户端可以通过 CONNECTDISCONNECT 数据包中的会话过期间隔属性明确指定会话过期间隔。
  • 对于使用 MQTT 3.* 协议的客户端,EMQX 根据以下规则确定会话过期间隔:如果清除会话标志设置为 true,则会话过期间隔为 0。否则,使用 mqtt.session_expiry_interval 配置参数的值。

EMQX 中的会话实现

EMQX 提供了两种不同的客户端会话实现,每种实现都针对特定的使用场景进行了优化:

  • 临时会话:会话状态保存在正在运行的 EMQX 节点的内存中。当 EMQX 节点重启时,其状态会丢失。
  • 持久会话:会话状态和接收的消息备份在持久存储中。EMQX 节点重启后可以恢复这些会话。

会话实现的选择取决于会话过期间隔和 durable_sessions.enable 配置参数,该参数可以全局设置或按 zone 设置。基于以下标准可以选择实现方式:

durable_sessions.enable会话过期间隔 = 0会话过期间隔 > 0
false临时临时
true临时持久

EMQX 采用独特的方法来管理消息的持久性,使临时会话和持久会话共存,同时最大限度地降低存储成本。

两种会话实现的比较

客户端会话的管理策略是确保服务稳定性和可靠性的关键因素。本节对两种会话实现的特性进行比较分析,旨在帮助开发人员更好地理解它们各自的特性和适用场景,从而做出更精确的部署决策。

临时会话

这种会话实现是默认的,并在 EMQX 5.7 之前的所有版本中使用。临时会话的状态完全保存在运行中的 EMQX 节点的内存中。

临时会话的优点包括:

  • 非常高的吞吐量和低延迟。
  • 即时向客户端分发消息。

然而,也存在一些缺点:

  • 当承载会话的 EMQX 节点停止或重启时,会话数据将丢失。
  • 未传递的消息存储在会话的内存队列中,增加了 EMQX 的内存占用。
  • EMQX 对内存队列的大小施加限制,以防止内存耗尽。当达到此限制时,新消息将被丢弃,导致未传递消息的潜在丢失。

持久会话

持久会话在 EMQX v5.7.0 中引入,其实现将会话状态和路由到持久会话的消息存储在磁盘上。此功能默认禁用,可以通过将 durable_sessions.enable 配置参数设置为 true 来启用。

当持久会话订阅一个主题过滤器时,EMQX 会将匹配该过滤器的主题标记为“持久”。这确保了除了将这些主题的 MQTT PUBLISH 消息路由到临时会话外,服务器还将这些消息保存到名为 messages 的持久存储中。

需要注意的是,消息分发协议取决于订阅者会话的持久性,而不是发布者的。

每条持久 MQTT 消息在每个副本上只存储一次,无论有多少订阅持久会话或它们的连接状态如何。这确保了高效的消息分发并最小化磁盘写入。

持久存储通过在 EMQX 集群内的多个节点间一致地复制会话元数据和 MQTT 消息,提供了强大的持久性和高可用性。可配置的复制因子 (replication factor) 确定每条消息或会话的副本数量,使用户能够根据具体需求定制持久性和性能之间的平衡。

提示

高可用性副本的特性为 EMQX 企业版功能。在开源版中,会话数据仅会保存在当前节点,这意味着如果节点发生宕机,数据可以在该节点恢复后进行恢复。

持久会话的优点包括:

  • EMQX 节点重启或停止后会话可以恢复。
  • MQTT 消息存储在共享的、复制的持久存储中,而不是内存队列中,减少了在线和离线会话的 RAM 使用量。
  • 未传递消息的数量没有上限,并且由于内存队列溢出,未传递消息永远不会被丢弃。

然而,也存在一些缺点:

  • 将消息存储在磁盘上会导致整体系统吞吐量降低。
  • 持久会话的延迟比临时会话高,因为写入和读取 MQTT 消息都是批量执行的。尽管批量处理提高了吞吐量,但也增加了端到端的延迟(客户端看到发布消息之前的延迟)。

快速体验会话持久化功能

本章节将帮助您快速了解如何在 EMQX 与 MQTT 客户端上使用会话持久化功能,并介绍简单的会话持久化工作流程。

注意

即使没有启用持久会话,通过步骤 2-4 的操作会话仍然会被保留、消息也将会保存在客户端队列中。不同之处在于会话是否持久存储,以及步骤 5 中会话是否能在节点重启后恢复。

1. 在 EMQX 上启用会话持久化功能

默认情况下,EMQX 不启用会话持久化功能。您需要修改 etc/emqx.conf 文件并添加以下配置以启用该功能:

bash
durable_sessions {
  enable = true
}

重启 EMQX 以应用配置。

2. 调整 MQTT 客户端连接参数以启用持久会话

MQTTX CLI 为例,它默认使用了 MQTT 5.0 协议,添加 --no-clean 选项以设置 Clean Start = false,同时指定客户端 ID 为 emqx_c,连接到 EMQX 并订阅 t/1 主题:

bash
mqttx sub -t t/1 -i emqx_c --no-clean

3. 断开客户端连接,会话将被保留

断开步骤 2 中的客户端连接。打开 EMQX Dashboard,在 监控 -> 客户端 页面中,您仍然可以看到客户端状态变为未连接,这表明会话已经保留。

MQTT 保留会话

4. 向客户端发送消息,消息将被发送到客户端队列

仍以 MQTTX CLI 为例,使用 bench 命令,通过 1 个客户端重复向 t/1 主题发布消息:

bash
mqttx bench pub -t t/1 -c 1

根据 MQTT 协议要求,即使 emqx_c 客户端不在线,它订阅的 t/1 主题消息也会被保存在客户端队列中,以便在重新连接后继续派发。

5. 重启 EMQX 节点,会话与消息将从持久存储中恢复

重启 EMQX 节点,在没有进行任何客户端连接操作的情况下,打开 EMQX Dashboard,在 监控 -> 客户端 页面中可以看到状态为未连接的客户端,这表明会话已恢复。

尝试使用相同的客户端 ID emqx_c,并使用 --no-clean 选项设置 Clean Start = false)连接到 EMQX:

bash
mqttx sub -t t/1 -i emqx_c --no-clean

离线期间接收到的消息将在此时将派发到当前客户端:

bash
...
[2024-5-22] [16:14:14] › …  Connecting...
[2024-5-22] [16:14:14] › ✔  Connected
[2024-5-22] [16:14:14] › …  Subscribing to t/1...
[2024-5-22] [16:14:14] › ✔  Subscribed to t/1
[2024-5-22] [16:14:14] › payload: Hello From MQTTX CLI
...

注意

  • 必须使用相同的客户端 ID emqx_c,并指定 --no-clean 选项以将 Clean Start 设置为 false,确保满足这两项要求才能恢复持久的会话。
  • 由于会话中已经保存了之前的订阅信息,即使重连时不重新订阅 t/1 主题,消息也会派发到客户端。

持久存储的架构

EMQX 内置持久性功能所使用的数据库引擎将数据按照一个分层结构来组织,包括存储 (Storage)、分片 (Shard)、代 (Generation) 和流 (Stream)。

Diagram of EMQX durable storage sharding

存储 (Storage)

存储封装了某种类型的所有数据,例如 MQTT 消息或 MQTT 会话。

分片 (Shard)

消息根据客户端进行隔离,并根据发布者的客户端 ID 存储在分片中。分片数量在 EMQX 初始启动时由 n_shards 配置参数决定。分片也是复制的单位。每个分片会根据 durable_storage.messages.replication_factor 在不同节点间进行一致性复制,确保每个副本中的消息集是相同的。

代 (Generation)

分片中的消息根据特定时间段划分为代。新消息写入当前代,而以前的代是只读的。EMQX 通过完全删除旧的代来清理旧的 MQTT 消息。旧 MQTT 消息的保留期限由 durable_sessions.message_retention_period 参数确定。

代可以根据存储布局规范以不同方式组织数据。目前,仅支持一种布局,优化了通配符和单一主题订阅的高吞吐量。未来更新将引入针对不同工作负载优化的布局。

新代的存储布局通过 durable_storage.messages.layout 参数配置,每个布局引擎定义其自己的配置参数。

流 (Stream)

每个分片和代中的消息被划分为多个流。流作为 EMQX 中消息序列化的单位。流可以包含多个主题的消息。不同的存储布局可以采用不同的策略将主题映射到流中。

持久会话以批量方式从流中获取消息,批量大小可以通过 durable_sessions.batch_size 参数调整。

集群中的持久存储

EMQX 集群中的每个节点都分配有唯一的 站点 ID,该 ID 作为稳定标识符,与 Erlang 节点名称 (emqx@...) 无关。站点 ID 是持久的,并且在节点第一次启动时随机生成。这种稳定性维护了数据的完整性,特别是在节点可能经历名称修改或重新配置的情况下。

管理员可以使用 emqx_ctl ds info CLI 命令查看不同站点的状态,以管理和监控集群中的持久存储状态。

会话持久化的硬件要求

当会话持久化启用时,EMQX 会将持久会话的元数据以及发送到持久会话的 MQTT 消息保存到磁盘上。因此,EMQX 必须部署在具有足够大存储容量的服务器上。为了获得最佳吞吐量,建议使用固态硬盘(SSD)存储。

磁盘容量可以根据以下指南进行估算:

  • 消息存储:每个副本上存储消息所需的空间与传入消息的速率乘以 durable_sessions.message_retention_period 参数指定的持续时间成正比。此参数决定了消息的保留时间,从而影响所需的总存储量。
  • 会话元数据存储:会话元数据的存储量与会话数量乘以它们订阅的流数量成正比。
  • 流计算:流的数量与分片的数量成正比。它还(以非线性方式)取决于主题的数量。EMQX 会自动将结构相似的主题组合到同一个流中,确保流的数量不会随着主题数量的增加而过快增长,从而最小化每个会话存储的元数据量。

下一步

想要了解如何对会话持久化功能进行配置和管理,以及如何对 EMQX 集群中的会话持久化进行初始设置和更改设置,请参阅以下页面: