消息队列用户指南
本页面将引导你从创建消息队列到配置其行为,并通过 Dashboard、REST API 或配置文件对其进行管理,从而实际使用 EMQX 的消息队列功能。
通过 Dashboard 创建消息队列
消息队列在可以存储或分发消息之前,必须显式声明/创建。
通过 EMQX Dashboard 创建新消息队列的步骤如下:
在左侧菜单中进入消息队列页面。
点击页面中的创建按钮。
在创建消息队列对话框中:
过滤主题:输入主题或主题过滤器(例如
t/1
)。该字段用于根据主题匹配决定哪些发布的消息将被入队。一个队列将收集所有匹配该过滤器的消息。若要从队列中消费消息,客户端必须订阅使用
$q/{Topic Filter}
格式的主题。派发策略:选择消息应如何在订阅者之间分发。可选策略包括:
最少未确认消息订阅者
:优先选择未确认(正在传输)消息最少的订阅者。随机
:(默认)随机选择一个订阅者。轮询
:均匀轮询分发给所有订阅者。
数据保留期:指定消息在队列中保留的时间。你可以设置时间单位(例如:天)。
最后值语义:若希望同一队列中使用相同队列键的新消息覆盖旧消息,请开启此开关。启用此选项后,每条消息应该具有“队列键表达式”所配置的属性,以保存到队列中。
- 队列键表达式:启用“最新值语义”后,该字段用于定义如何从每条消息中提取队列键,默认值为
message.from
,即为消息的发送者客户端 ID。该字段支持使用 Variform 表达式设置。
- 队列键表达式:启用“最新值语义”后,该字段用于定义如何从每条消息中提取队列键,默认值为
点击创建保存队列。
新队列将出现在消息队列列表中,并显示其主题过滤器、分发策略、是否启用了最新值语义以及数据保留时间。你可以通过操作栏中的按钮编辑或删除队列。
队列键表达式
队列键表达式用于指定在启用“最新值语义”模式下,如何从消息元数据中提取用于消息去重的键。该表达式会针对每条消息的元数据进行求值,并遵循Variform 表达式的语法。
该表达式会在包含 from
、topic
、payload
、headers.properties
等字段的消息上下文中进行求值。例如,如果希望使用用户属性(User Property)作为键,可以将表达式设置为:
message.headers.properties.'User-Property'.user-prop
如果无法根据表达式提取出队列键(例如字段不存在),该消息将被丢弃,不会被加入队列。
消息上下文示例
队列键表达式会在如下结构的消息上下文中进行求值:
JSON 示例
{
"message": {
"qos": 0,
"topic": "some/topic",
"payload": "some-payload",
"headers": {
"client_attrs": {},
"proto_ver": 5,
"properties": {
"User-Property": {
"user-prop": "some-value"
}
},
"peerhost": "127.0.0.1",
"username": "undefined",
"protocol": "mqtt",
"peername": "127.0.0.1:49352"
},
"from": "clientid",
"timestamp": 1759238376252,
"id": "..non utf8 bytes...",
"flags": {
"retain": false,
"dup": false
},
"extra": {}
}
}
Erlang Term 示例
#{
message =>
#{
extra => #{},
flags => #{dup => false, retain => false},
id => <<0,6,64,4,154,125,229,77,244,69,0,0,28,21,0,2>>,
timestamp => 1759238376252,
from => <<"clientid">>,
headers =>
#{
peername => <<"127.0.0.1:49352">>,
protocol => mqtt,
username => undefined,
peerhost => <<"127.0.0.1">>,
properties =>
#{
'User-Property' => #{
<<"user-prop">> => <<"some-value">>
}
},
proto_ver => 5,
client_attrs => #{}
},
payload => <<"some-payload">>,
topic => <<"some/topic">>,
qos => 0
}
}
配置消息队列设置
本节介绍如何配置适用于所有消息队列的全局设置。这些设置控制消息保留时间、清理周期以及内部队列行为。你可以通过 Dashboard、REST API 或配置文件进行配置。
Dashboard
你可以在不重启 EMQX 的情况下,直接通过 Dashboard 修改消息队列的设置。这在运行时调整系统级行为时非常实用。
通过 Dashboard 配置全局设置的步骤如下:
- 在左侧菜单进入消息队列页面。
- 点击页面右上角的设置按钮。
- 页面将跳转至 MQTT 配置 -> 消息队列选项卡。在此面板中,你可以配置以下参数:
- 垃圾回收间隔:队列中过期消息的清理周期,默认为
1
小时。 - 常规队列保留周期:普通队列中消息的最长保留时间,默认为
7
天。 - 查找队列重试间隔:当客户端订阅
$q/
前缀的队列主题时,如果对应的队列尚未创建,该参数控制客户端多久重试一次查找队列。默认为10
秒。
- 垃圾回收间隔:队列中过期消息的清理周期,默认为
- 修改完成后点击保存修改应用设置。
REST API
你也可以通过 REST API 配置全局消息队列设置。这些设置为系统级配置,会影响所有队列的内部管理行为。
curl -v -u key:secret -X PUT -H "Content-Type: application/json" http://localhost:18083/api/v5/message_queues/config -d '{"find_queue_retry_interval": "10s", "gc_interval": "1h", "regular_queue_retention_period": "7d"}'
配置文件
为实现持久化和版本控制,你可以在 EMQX 配置文件(emqx.conf
)中定义消息队列设置。以下是包含主要设置的示例:
mq {
gc_interval = 1h
regular_queue_retention_period = 1d
find_queue_retry_interval = 10s
}
配置项说明
gc_interval
:定义 EMQX 扫描消息队列并清理过期消息的时间间隔。regular_queue_retention_period
:设置队列中消息的最长保留时间。超时的消息将被自动清除。find_queue_retry_interval
:当订阅者订阅$q/
主题时未找到队列,订阅者重新尝试查找队列的周期。
通过 REST API 管理消息队列
EMQX 提供一组 REST API 用于管理消息队列的生命周期,包括创建、查询、更新和删除。
创建消息队列
通过指定主题过滤器和队列属性(如是否启用最新值语义)来创建新队列:
curl -s -u key:secret -X POST -H "Content-Type: application/json" \
http://localhost:18083/api/v5/message_queues \
-d '{"topic_filter": "t1/#", "is_lastvalue": false}' | jq
列出所有消息队列
获取所有已存在的消息队列列表:
curl -s -u key:secret -X GET -H "Content-Type: application/json" \
http://localhost:18083/api/v5/message_queues | jq
更新消息队列
更新现有队列的属性,例如分发策略:
curl -s -u key:secret -X PUT -H "Content-Type: application/json" \
http://localhost:18083/api/v5/message_queues/t1%2F%23 \
-d '{"dispatch_strategy": "least_inflight"}' | jq
删除消息队列
删除指定队列及其所有已保留的消息:
curl -s -u key:secret -X DELETE \
http://localhost:18083/api/v5/message_queues/t1%2F%23
注意:
- URL 中的主题过滤器必须进行 URL 编码(例如
t1/#
编码为t1%2F%23
)。- 请求需要身份认证(
key:secret
)。
常见问题与故障排查
为什么消息没有被入队?
- 请确认声明的消息队列的主题过滤器是否匹配发布的消息主题。
- 检查队列是否存在且配置正确。
- 查看 EMQX 日志中是否有相关错误或警告,特别是带有
mq_
前缀的日志项,以帮助诊断队列问题。
队列容量超过后会发生什么?
- 当前消息队列不限制消息数量或总字节数,但会通过配置的保留时长进行时间限制。
- 一旦消息过期(即超出保留时长),将不再被投递,并会在 EMQX 定期执行垃圾回收时被自动清除。