消息队列
EMQX 6.0 引入的消息队列功能扩展了 MQTT 的发布/订阅模式,加入了持久化队列语义,从而实现可靠的异步消息传递。它在不依赖外部中间件(如 RabbitMQ)的情况下,提供了类似企业级消息队列的能力,增强了 MQTT 的原生特性。
本文全面介绍了 EMQX 消息队列功能,包括设计初衷、核心概念、内部架构、消息流转机制以及典型应用场景。
什么是消息队列?
在 EMQX 中,消息队列是一个持久化的服务器端缓冲区,用于在无订阅者在线的情况下存储 MQTT 消息。每个队列都绑定一个主题过滤器,并自动接收与之匹配的所有消息。
与传统 MQTT 不同,消息队列即使在无客户端在线的情况下也会保留消息。客户端可通过订阅特殊格式 $q/{topic}
来消费这些消息。
为什么需要消息队列?
MQTT 是一种轻量级的、被广泛使用的发布/订阅协议。但其默认行为依赖于订阅者是否在线,不适用于异步或延迟消费场景。
MQTT 的局限性
虽然 MQTT 通过共享订阅(如 $share/{group}/topic
)提供了一些类队列功能,但仍存在以下限制:
- 无订阅者在线时,消息不会保留
- 不支持 TTL(过期时间)、队列长度限制或溢出控制
- 无法去重,例如按 key 保留最新值
- 无显式队列生命周期管理
因此,MQTT 难以实现以下场景:
- 设备上线前先下发命令
- 离线时接收任务
- 保留最新的状态或配置
使用消息队列扩展 MQTT
EMQX 消息队列扩展了 MQTT 协议,允许消息在无订阅者在线的情况下也能持久化,并提供:
- 持久化消息存储(即使客户端离线):队列虽非严格有序,但支持可靠、异步消息分发,弥补 MQTT 的轻量通信与企业级消息中间件之间的差距。
- 显式队列声明与属性配置:支持 TTL、容量限制、派发策略等,提供灵活的消息存储与分发控制。
- 可选的“最后值语义”:支持通过
队列键
属性,将同一队列中拥有相同队列键的新消息覆盖旧消息,适用于配置或状态更新等场景。
消息队列关键概念
队列名称 一个 MQTT 主题或主题过滤器,用于唯一标识该队列。匹配该过滤器的消息将被自动入队。
队列声明 通过配置属性创建一个持久化队列的过程。
队列删除 删除队列及其所有已存储消息。
最后值语义 创建队列时设置启用最后值语义功能,并设置
队列键表达式
。当启用后,消息进入队列时,EMQX 会对每条消息执行队列键
属性提取,使用相同队列键的新消息将会覆盖旧的未消费消息。此行为非常适用于有状态的消息传递或配置更新的场景,在这些场景中,只有最新的值是重要的,较旧的消息可以安全地丢弃。主题前缀 队列订阅使用特殊格式
$q/{topic}
,区别于常规 MQTT 订阅。队列属性 控制队列行为的配置项,如保留时间和派发策略等。
服务质量(QoS)
无论发布或订阅时使用何种 QoS 等级,消息队列中的所有消息都会以 QoS 1(至少一次)的方式投递。此行为确保了消息的可靠传递,并统一了队列的投递行为。
消息持久化 即使无订阅者在线,消息也会被保留。默认启用“最后值语义”。若未设置队列键表达式,则以消息接收顺序存储。
消息队列工作原理
EMQX 中的消息队列作为一个松耦合的扩展,通过内部钩子(Hook) 拦截发布和订阅操作,并与注册表、存储层协作,实现消息的持久化与可靠分发。
核心组件
- 队列注册表:管理队列生命周期:创建、删除和查找队列。
- 消息存储 DB:存储所有入队消息,构建于 EMQX 的持久化存储之上。
- 状态存储:保存消费进度与队列元数据(如 TTL、策略等)。
- 队列消费者:根据派发策略从队列中提取消息并推送给客户端。
- 订阅注册表:追踪每个客户端(channel)订阅了哪些队列,并在上下文中维护状态。
- Hook 机制:拦截发布和订阅事件,将消息路由到队列或消费者。
消息队列数据流图示
下图展示了消息队列各主要组件之间的数据流动关系:
发布流程
- 客户端向常规主题(如
some/topic
)发布一条消息。 - 内部的消息队列钩子(MQ hook)会被触发以处理该消息。
- 钩子会检查消息队列注册表,查找是否有与该主题匹配的队列。
- 如果找到匹配的队列,该消息将被写入该队列的消息数据库。
订阅与消费流程
- 客户端订阅某个主题。
- 消息队列钩子(MQ hook)被触发以处理该订阅请求。
- 如果订阅的是消息队列主题(如
$q/some/topic
),钩子会在客户端连接上下文中初始化订阅状态,并启动与消息队列消费者的连接。 - 如果该队列尚未存在消费者,则会启动一个新的消息队列消费者。
- 消费者会恢复之前的消息消费进度,并开始从消息数据库中拉取数据。
- 消费者根据配置的派发策略将接收到的消息分发到各个客户端连接(订阅通道)。
- 客户端连接(订阅通道)通过标准 MQTT 机制将消息传递给客户端。
消息队列核心特性
EMQX 的消息队列功能提供了一套关键能力,用于实现可靠、解耦且可配置的消息传递机制。
消息入队 发布到与已声明队列匹配的主题的消息会自动进入队列。
如果队列配置了队列键键表达式(用于启用“最后值语义”),EMQX 会对每条消息执行键提取:
- 如果成功提取出键,则会替换队列中尚未消费的同键消息;
- 如果在最后值队列中无法提取出键,该消息将被丢弃。
消息出队 已订阅的客户端将根据配置的分发策略从队列中接收消息。消息队列中的所有消息都会以 QoS 1(至少一次)的方式投递,客户端确认接收后将从队列中删除该消息。
分发策略 可以定义消息如何分配给多个订阅者:
random
:随机分发;round_robin
:轮询分发;least_inflight
:优先分发给未确认消息较少的客户端。
队列管理 提供完整的队列生命周期管理能力(创建、更新、删除、查询),支持通过 REST API 调用实现。
典型应用场景
消息队列可实现可靠的异步消息通信模式,适用于多种物联网(IoT)和事件驱动型应用场景,特别适用于终端设备或消费者不常在线的情况。
- 设备指令排队:云端应用可将指令预先写入队列,确保 IoT 设备即使离线,也不会错过指令。
- 批处理任务分发:将大批量数据或工作负载拆分为小任务,分发给多个工作客户端并行或延迟处理。
- 传感器数据缓冲:对高频采集的传感器数据进行临时排队,便于后续批量处理、聚合或分析。
- 最新配置下发:确保设备始终获取并处理最新的配置命令;对于相同配置项的旧命令,队列中会自动替换为最新值或标记为过时。
相关功能参考
消息队列构建于 MQTT 协议之上,并与 EMQX 中的其他消息功能互为补充:
- 共享订阅:将消息分发给多个订阅者,但在没有客户端在线时不会保留消息。
- 保留消息:保存某个主题的最新消息,但仅在新客户端订阅该主题时发送一次。
- MQTT 持久会话:在客户端断线重连后,保留其会话状态(订阅信息及 QoS 1/2 消息)。
- 规则引擎:通过类 SQL 规则对队列中的消息进行过滤、处理,并可实现转发等操作。
后续步骤
现在您已经了解了消息队列的基本概念,接下来可以进一步实践: