消息队列用户指南
本页面将引导你从创建消息队列到配置其行为,并通过 Dashboard、REST API 或配置文件对其进行管理,从而实际使用 EMQX 的消息队列功能。
通过 Dashboard 创建消息队列
消息队列在可以存储或分发消息之前,必须显式声明/创建。
通过 EMQX Dashboard 创建新消息队列的步骤如下:
在左侧菜单中进入消息队列页面。
点击页面中的创建按钮。
在创建消息队列对话框中:
过滤主题:输入主题或主题过滤器(例如
t/1
)。该字段用于根据主题匹配决定哪些发布的消息将被入队。一个队列将收集所有匹配该过滤器的消息。若要从队列中消费消息,客户端必须订阅使用
$q/{Topic Filter}
格式的主题。派发策略:选择消息应如何在订阅者之间分发。可选策略包括:
最少未确认消息订阅者
:优先选择未确认(正在传输)消息最少的订阅者。随机
:(默认)随机选择一个订阅者。轮询
:均匀轮询分发给所有订阅者。
数据保留期:指定消息在队列中保留的时间。你可以设置时间单位(例如:天)。
最后值语义:若希望同一队列中使用相同队列键的新消息覆盖旧消息,请开启此开关。启用此选项后,每条消息应该具有“队列键表达式”所配置的属性,以保存到队列中。
- 队列键表达式:启用“最新值语义”后,该字段用于定义如何从每条消息中提取队列键,默认值为
message.from
,即为消息的发送者客户端 ID。该字段支持使用 Variform 表达式设置。
- 队列键表达式:启用“最新值语义”后,该字段用于定义如何从每条消息中提取队列键,默认值为
点击创建保存队列。
新队列将出现在消息队列列表中,并显示其主题过滤器、分发策略、是否启用了最新值语义以及数据保留时间。你可以通过操作栏中的按钮编辑或删除队列。
配置消息队列设置
本节介绍如何配置适用于所有消息队列的全局设置。这些设置控制消息保留时间、清理周期以及内部队列行为。你可以通过 Dashboard、REST API 或配置文件进行配置。
Dashboard
你可以在不重启 EMQX 的情况下,直接通过 Dashboard 修改消息队列的设置。这在运行时调整系统级行为时非常实用。
通过 Dashboard 配置全局设置的步骤如下:
- 在左侧菜单进入消息队列页面。
- 点击页面右上角的设置按钮。
- 页面将跳转至 MQTT 配置 -> 消息队列选项卡。在此面板中,你可以配置以下参数:
- 垃圾回收间隔:队列中过期消息的清理周期,默认为
1
小时。 - 常规队列保留周期:普通队列中消息的最长保留时间,默认为
7
天。 - 查找队列重试间隔:当客户端订阅
$q/
前缀的队列主题时,如果对应的队列尚未创建,该参数控制客户端多久重试一次查找队列。默认为10
秒。
- 垃圾回收间隔:队列中过期消息的清理周期,默认为
- 修改完成后点击保存修改应用设置。
REST API
你也可以通过 REST API 配置全局消息队列设置。这些设置为系统级配置,会影响所有队列的内部管理行为。
curl -v -u key:secret -X PUT -H "Content-Type: application/json" http://localhost:18083/api/v5/message_queues/config -d '{"find_queue_retry_interval": "10s", "gc_interval": "1h", "regular_queue_retention_period": "7d"}'
配置文件
为实现持久化和版本控制,你可以在 EMQX 配置文件(emqx.conf
)中定义消息队列设置。以下是包含主要设置的示例:
mq {
gc_interval = 1h
regular_queue_retention_period = 1d
find_queue_retry_interval = 10s
}
配置项说明
gc_interval
:定义 EMQX 扫描消息队列并清理过期消息的时间间隔。regular_queue_retention_period
:设置队列中消息的最长保留时间。超时的消息将被自动清除。find_queue_retry_interval
:当订阅者订阅$q/
主题时未找到队列,订阅者重新尝试查找队列的周期。
通过 REST API 管理消息队列
EMQX 提供一组 REST API 用于管理消息队列的生命周期,包括创建、查询、更新和删除。
创建消息队列
通过指定主题过滤器和队列属性(如是否启用最新值语义)来创建新队列:
curl -s -u key:secret -X POST -H "Content-Type: application/json" \
http://localhost:18083/api/v5/message_queues \
-d '{"topic_filter": "t1/#", "is_lastvalue": false}' | jq
列出所有消息队列
获取所有已存在的消息队列列表:
curl -s -u key:secret -X GET -H "Content-Type: application/json" \
http://localhost:18083/api/v5/message_queues | jq
更新消息队列
更新现有队列的属性,例如分发策略:
curl -s -u key:secret -X PUT -H "Content-Type: application/json" \
http://localhost:18083/api/v5/message_queues/t1%2F%23 \
-d '{"dispatch_strategy": "least_inflight"}' | jq
删除消息队列
删除指定队列及其所有已保留的消息:
curl -s -u key:secret -X DELETE \
http://localhost:18083/api/v5/message_queues/t1%2F%23
注意:
- URL 中的主题过滤器必须进行 URL 编码(例如
t1/#
编码为t1%2F%23
)。- 请求需要身份认证(
key:secret
)。
常见问题与故障排查
为什么消息没有被入队?
- 请确认声明的消息队列的主题过滤器是否匹配发布的消息主题。
- 检查队列是否存在且配置正确。
- 查看 EMQX 日志中是否有相关错误或警告,特别是带有
mq_
前缀的日志项,以帮助诊断队列问题。
队列容量超过后会发生什么?
- 当前消息队列不限制消息数量或总字节数,但会通过配置的保留时长进行时间限制。
- 一旦消息过期(即超出保留时长),将不再被投递,并会在 EMQX 定期执行垃圾回收时被自动清除。