MQTT 消息流用户指南
本页面将引导您了解 EMQX 中 MQTT 消息流功能的实际使用方式,包括如何创建消息流、配置其行为,以及通过 Dashboard、REST API 或配置文件对消息流进行管理。
启用 MQTT 消息流功能
MQTT 消息流功能默认是关闭的。在创建或使用任何消息流之前,必须先在 Dashboard 中启用该功能。
- 在左侧菜单中进入消息流。
- 如果消息流尚未启用,页面会显示提示信息,说明该功能当前处于关闭状态。
- 点击设置,进入消息流设置页面。
- 将启用消息流切换为开启。
- 点击保存修改。
启用后,消息流功能将立即生效,您可以开始创建和管理消息流。
通过 Dashboard 手动创建消息流
MQTT 消息流在存储或回放消息之前,必须被显式创建。您可以通过手动或自动方式创建和管理消息流。关于自动创建的详细说明,请参见通过 Dashboard 自动创建消息流。
在左侧菜单中进入消息流。
点击创建消息流,打开创建消息流对话框。
配置以下选项:
主题过滤器:输入主题或主题过滤器(例如
t/1或sensors/+/data),用于定义哪些已发布的消息会被捕获到该消息流中。所有发布到与该过滤器匹配的 MQTT 主题的消息,都会被存储到消息流中。客户端通过订阅
$s/<timestamp>/<topic_filter>格式的主题来消费消息流中的消息。数据保留期:指定消息在消息流中保留的时间长度。超过该保留期的消息将被自动删除,从而限制消息可被回放的时间范围。
最后值语义:启用后,消息流将只保留每个键对应的最新一条消息。当具有相同键的新消息写入时,旧消息会被覆盖。这非常适合设备状态、配置等状态型数据场景。
流键表达式(必填):用于从每条进入消息流的消息中提取键值的表达式。默认值为
message.from,表示使用消息发布者的客户端 ID。该字段支持使用 Variform 表达式进行配置。提取出的键在不同类型的消息流中承担不同角色:
对于最后值消息流,该键作为主键使用。具有相同键的消息会相互覆盖,消息流中始终只保留该键对应的最新一条消息。
对于常规消息流,该键作为分片键使用,用于决定消息会被写入哪个存储分片。具有相同键的消息会被路由到同一个分片,从而在实现多分片并行存储的同时,保证按键的消息顺序性。
TIP
对于常规消息流,应避免使用常量或低基数的表达式作为流键表达式,否则可能导致所有消息写入同一个分片,形成写入热点并影响性能。
TIP
流键表达式与消息队列中的队列键表达式用法类似。关于如何从消息中提取键值的更多示例,请参见队列键表达式。
消息限制:用于限制消息流中每个分片的存储使用情况:
- 最大分片消息数量:限制每个分片中可保留的最大消息条数。您可以启用该选项并设置具体数值,或保持关闭以允许无限数量(
infinity)。 - 最大分片消息字节数:限制每个分片中消息的最大总字节数。您可以启用该选项并设置具体大小(例如
200MB),或保持关闭以允许无限存储(infinity)。
这些限制会持久化到持久化存储中,并与数据保留期共同生效。
- 最大分片消息数量:限制每个分片中可保留的最大消息条数。您可以启用该选项并设置具体数值,或保持关闭以允许无限数量(
- 点击创建保存消息流。
创建完成后,消息流将立即生效。发布到与配置的主题过滤器匹配的主题上的消息,会按照保留策略和限制规则进行存储,并可被客户端通过订阅进行回放。
通过 Dashboard 自动创建 MQTT 消息流
当客户端订阅 $s/ 前缀的主题时,EMQX 可以自动创建对应的消息流,从而实现无需手动配置的动态消息流创建。
注意
只有在全局启用了消息流功能后,自动创建消息流功能才可用。
自动创建的消息流可以是常规消息流或最后值消息流。
注意
为了确保消息流行为清晰可控,自动创建时只能启用常规消息流或最后值消息流其中之一,不能同时启用。
自动创建最后值 MQTT 消息流
该选项在 MQTT 配置 -> 消息流页面中默认开启。启用后,当客户端订阅不存在的消息流时,EMQX 会自动创建支持最后值语义的消息流。
- 进入管理 -> MQTT 配置 -> 消息流。
- 默认情况下,启用自动创建消息流已开启,且已选中最后值消息流类型。
- 配置以下选项:
- 流键表达式(必填):定义如何从每条消息中提取唯一键(默认:
message.from)。在最后值消息流中,该键作为主键使用,具有相同键的消息会覆盖旧消息。 - 数据保留期:指定消息在消息流中保留的时间。
- 流键表达式(必填):定义如何从每条消息中提取唯一键(默认:
- 点击保存修改。
当客户端订阅 $s/<timestamp>/test 这样的主题时,EMQX 会自动创建一个最后值消息流,并在消息流 列表中显示。
自动创建常规 MQTT 消息流
如果您希望消息流保留所有消息、不进行覆盖,可以选择自动创建常规消息流。
- 进入管理 -> MQTT 配置 -> 消息流。
- 保持启用自动创建消息流为开启状态,并选择常规消息流类型。
- 配置以下选项:
- 流键表达式(必填):定义如何从消息中提取键值(默认:
message.from)。在常规消息流中,该键用于决定消息写入的存储分片,有助于在保证按键顺序的同时实现负载分布。 - 数据保留期:指定消息在消息流中的保留时间。
- 流键表达式(必填):定义如何从消息中提取键值(默认:
- 点击保存修改。
配置 MQTT 消息流全局设置
本节介绍如何配置作用于所有 MQTT 消息流的全局设置。这些设置用于控制消息保留、清理周期、内部行为以及自动创建策略。您可以通过 Dashboard、REST API 或配置文件进行配置。
Dashboard
您可以直接在 EMQX Dashboard 中修改消息流的全局设置,无需重启 EMQX。
进入管理 -> MQTT 配置 -> 消息流。
配置以下选项:
启用消息流:全局启用或禁用消息流功能。禁用后,无法创建或使用任何消息流。
最大消息流数:限制集群中允许存在的消息流数量,用于防止过度创建导致资源耗尽。
垃圾回收间隔:指定清理过期消息的周期,默认值为
1 小时。常规消息流保留期:常规(非最后值)消息流的默认消息保留时间,默认值为
7 天。启用自动创建消息流:当客户端订阅消息流主题且对应消息流不存在时,是否自动创建。
自动创建消息流类型:
- 最后值消息流(默认)
- 常规消息流
流键表达式:为自动创建的消息流指定流键表达式(默认:
message.from)。数据保留期:自动创建消息流的消息保留时间。
最大分片消息字节数:限制每个分片可存储的最大消息数据量。
最大分片消息数量:限制每个分片可存储的最大消息条数。
TIP
分片数量由持久化存储的全局配置决定,并适用于所有消息流。上述限制是按分片生效的,不考虑副本因子。在规划磁盘容量时,需要同时考虑分片数量和副本因子。
点击保存修改。
修改后的配置会立即生效,并作用于现有和新创建的消息流(适用的情况下)。
REST API
您可以通过 EMQX 的 REST API 以编程方式配置消息流的全局设置。
要更新 MQTT 消息流的全局配置,请向以下接口发送一个 PUT 请求:
PUT /api/v5/message_streams/config请求示例:
curl -s -u key:secret \
-X PUT \
-H "Content-Type: application/json" \
http://localhost:18083/api/v5/message_streams/config \
-d '{
"gc_interval": "1h",
"regular_stream_retention_period": "1d",
"check_stream_status_interval": "10s"
}'配置文件
您可以通过编辑 EMQX 的配置文件来配置消息流的全局设置。这种方式适用于在系统启动时定义默认行为,或在以配置文件作为主要管理手段的环境中统一管理相关设置。
配置示例:
消息流相关的配置项定义在 EMQX 配置文件(emqx.conf)的 streams 配置段中。
streams {
gc_interval = 1h
regular_stream_retention_period = 1d
check_stream_status_interval = 10s
}配置项说明
- gc_interval:控制消息流中过期消息被清理的频率。该配置项影响消息流存储的垃圾回收(GC)周期。
- regular_stream_retention_period:指定常规消息流的默认最大消息保留时间。超过该时长的消息会被自动删除。
- check_stream_status_interval:当客户端订阅
$s/前缀的消息流主题且对应消息流尚不存在时,用于控制订阅端重试查找消息流的时间间隔。
所有时间相关的配置值均支持标准时间单位,例如 s(秒)、m(分钟)、h(小时)和 d(天)。
持久存储配置
MQTT 消息流中的消息数据通过 EMQX 的持久存储进行保存。与消息流相关的存储配置位于 durable_storage.streams_messages 配置段下。
durable_storage {
## 用于存储消息流消息的数据存储配置
## 更多信息请参见持久存储相关配置说明
streams_messages {
transaction {
flush_interval = 100
idle_flush_interval = 20
conflict_window = 5000
}
}
}这些配置项用于控制消息流数据写入持久存储时的行为,包括事务批处理和刷新机制。在大多数情况下,默认配置已经能够满足需求,除非您需要针对存储性能进行专项调优,否则无需修改。
通过 REST API 管理 MQTT 消息流
EMQX 提供了一组 REST API 用于管理 MQTT 消息流。您可以使用这些 API 来创建、更新、列出、查询和删除消息流,以及配置消息流的全局设置。这对于自动化运维、与外部系统集成以及大规模管理消息流非常有用。
注意
所有 REST API 操作都需要具备相应的认证信息和访问权限。有关请求和响应参数的完整说明,请参阅 API 文档的”消息流“部分。
以下示例均假设使用 API Key 和 Secret 进行基本认证。
创建消息流
要创建一个新的消息流,请向消息流接口发送一个 POST 请求,并在请求体中指定消息流的配置信息。
curl -s -u key:secret \
-X POST \
-H "Content-Type: application/json" \
http://localhost:18083/api/v5/message_streams/streams \
-d '{
"topic_filter": "t1/#",
"is_lastvalue": false
}' | jq响应结果中包含新创建的消息流的详细信息,包括其 topic_filter。
列出消息流
要获取当前已存在的消息流列表,请向消息流接口发送一个 GET 请求。
curl -s -u key:secret \
-X GET \
-H "Content-Type: application/json" \
http://localhost:18083/api/v5/message_streams/streams | jq响应中返回消息流列表以及分页相关的元数据信息。
{
"data": [
{
"topic_filter": "t1/#"
}
],
"meta": {
"hasnext": false
}
}更新消息流
要更新一个已有的消息流,请向由其主题过滤器标识的消息流资源发送一个 PUT 请求。主题过滤器需要进行 URL 编码。
curl -s -u key:secret \
-X PUT \
-H "Content-Type: application/json" \
http://localhost:18083/api/v5/message_streams/streams/t1%2F%23 \
-d '{
"key_expression": "message.from",
"is_lastvalue": false
}' | jq响应结果会返回更新后的消息流配置信息。
删除消息流
要删除一个消息流,请向由 URL 编码后的主题过滤器标识的消息流资源发送一个 DELETE 请求。
curl -s -u key:secret \
-X DELETE \
http://localhost:18083/api/v5/message_streams/streams/t1%2F%23消息流被删除后,将不再接收新的消息,其已存储的数据也会按照内部清理规则逐步移除。