Skip to content

MQTT 消息流用户指南

本页面将引导您了解 EMQX 中 MQTT 消息流功能的实际使用方式,包括如何创建消息流、配置其行为,以及通过 Dashboard、REST API 或配置文件对消息流进行管理。

启用 MQTT 消息流功能

MQTT 消息流功能默认是关闭的。在创建或使用任何消息流之前,必须先在 Dashboard 中启用该功能。

  1. 在左侧菜单中进入消息流
  2. 如果消息流尚未启用,页面会显示提示信息,说明该功能当前处于关闭状态。
  3. 点击设置,进入消息流设置页面。
  4. 启用消息流切换为开启
  5. 点击保存修改

启用后,消息流功能将立即生效,您可以开始创建和管理消息流。

通过 Dashboard 手动创建消息流

MQTT 消息流在存储或回放消息之前,必须被显式创建。您可以通过手动或自动方式创建和管理消息流。关于自动创建的详细说明,请参见通过 Dashboard 自动创建消息流

  1. 在左侧菜单中进入消息流

  2. 点击创建消息流,打开创建消息流对话框。

  3. 配置以下选项:

    • 主题过滤器:输入主题或主题过滤器(例如 t/1sensors/+/data),用于定义哪些已发布的消息会被捕获到该消息流中。所有发布到与该过滤器匹配的 MQTT 主题的消息,都会被存储到消息流中。

      客户端通过订阅 $s/<timestamp>/<topic_filter> 格式的主题来消费消息流中的消息。

    • 数据保留期:指定消息在消息流中保留的时间长度。超过该保留期的消息将被自动删除,从而限制消息可被回放的时间范围。

    • 最后值语义:启用后,消息流将只保留每个键对应的最新一条消息。当具有相同键的新消息写入时,旧消息会被覆盖。这非常适合设备状态、配置等状态型数据场景。

    • 流键表达式(必填):用于从每条进入消息流的消息中提取键值的表达式。默认值为 message.from,表示使用消息发布者的客户端 ID。该字段支持使用 Variform 表达式进行配置。

      提取出的键在不同类型的消息流中承担不同角色:

      • 对于最后值消息流,该键作为主键使用。具有相同键的消息会相互覆盖,消息流中始终只保留该键对应的最新一条消息。

      • 对于常规消息流,该键作为分片键使用,用于决定消息会被写入哪个存储分片。具有相同键的消息会被路由到同一个分片,从而在实现多分片并行存储的同时,保证按键的消息顺序性。

        TIP

        对于常规消息流,应避免使用常量或低基数的表达式作为流键表达式,否则可能导致所有消息写入同一个分片,形成写入热点并影响性能。

      TIP

      流键表达式与消息队列中的队列键表达式用法类似。关于如何从消息中提取键值的更多示例,请参见队列键表达式

    • 消息限制:用于限制消息流中每个分片的存储使用情况:

      • 最大分片消息数量:限制每个分片中可保留的最大消息条数。您可以启用该选项并设置具体数值,或保持关闭以允许无限数量(infinity)。
      • 最大分片消息字节数:限制每个分片中消息的最大总字节数。您可以启用该选项并设置具体大小(例如 200MB),或保持关闭以允许无限存储(infinity)。

      这些限制会持久化到持久化存储中,并与数据保留期共同生效。

    1. 点击创建保存消息流。

    创建完成后,消息流将立即生效。发布到与配置的主题过滤器匹配的主题上的消息,会按照保留策略和限制规则进行存储,并可被客户端通过订阅进行回放。

通过 Dashboard 自动创建 MQTT 消息流

当客户端订阅 $s/ 前缀的主题时,EMQX 可以自动创建对应的消息流,从而实现无需手动配置的动态消息流创建。

注意

只有在全局启用了消息流功能后,自动创建消息流功能才可用。

自动创建的消息流可以是常规消息流最后值消息流

注意

为了确保消息流行为清晰可控,自动创建时只能启用常规消息流最后值消息流其中之一,不能同时启用。

自动创建最后值 MQTT 消息流

该选项在 MQTT 配置 -> 消息流页面中默认开启。启用后,当客户端订阅不存在的消息流时,EMQX 会自动创建支持最后值语义的消息流。

  1. 进入管理 -> MQTT 配置 -> 消息流
  2. 默认情况下,启用自动创建消息流已开启,且已选中最后值消息流类型。
  3. 配置以下选项:
    • 流键表达式(必填):定义如何从每条消息中提取唯一键(默认:message.from)。在最后值消息流中,该键作为主键使用,具有相同键的消息会覆盖旧消息。
    • 数据保留期:指定消息在消息流中保留的时间。
  4. 点击保存修改

当客户端订阅 $s/<timestamp>/test 这样的主题时,EMQX 会自动创建一个最后值消息流,并在消息流 列表中显示。

自动创建常规 MQTT 消息流

如果您希望消息流保留所有消息、不进行覆盖,可以选择自动创建常规消息流。

  1. 进入管理 -> MQTT 配置 -> 消息流
  2. 保持启用自动创建消息流为开启状态,并选择常规消息流类型。
  3. 配置以下选项:
    • 流键表达式(必填):定义如何从消息中提取键值(默认:message.from)。在常规消息流中,该键用于决定消息写入的存储分片,有助于在保证按键顺序的同时实现负载分布。
    • 数据保留期:指定消息在消息流中的保留时间。
  4. 点击保存修改

配置 MQTT 消息流全局设置

本节介绍如何配置作用于所有 MQTT 消息流的全局设置。这些设置用于控制消息保留、清理周期、内部行为以及自动创建策略。您可以通过 Dashboard、REST API 或配置文件进行配置。

Dashboard

您可以直接在 EMQX Dashboard 中修改消息流的全局设置,无需重启 EMQX。

  1. 进入管理 -> MQTT 配置 -> 消息流

  2. 配置以下选项:

    • 启用消息流:全局启用或禁用消息流功能。禁用后,无法创建或使用任何消息流。

    • 最大消息流数:限制集群中允许存在的消息流数量,用于防止过度创建导致资源耗尽。

    • 垃圾回收间隔:指定清理过期消息的周期,默认值为 1 小时

    • 常规消息流保留期:常规(非最后值)消息流的默认消息保留时间,默认值为 7 天

    • 启用自动创建消息流:当客户端订阅消息流主题且对应消息流不存在时,是否自动创建。

    • 自动创建消息流类型

      • 最后值消息流(默认)
      • 常规消息流
    • 流键表达式:为自动创建的消息流指定流键表达式(默认:message.from)。

    • 数据保留期:自动创建消息流的消息保留时间。

    • 最大分片消息字节数:限制每个分片可存储的最大消息数据量。

    • 最大分片消息数量:限制每个分片可存储的最大消息条数。

      TIP

      分片数量由持久化存储的全局配置决定,并适用于所有消息流。上述限制是按分片生效的,不考虑副本因子。在规划磁盘容量时,需要同时考虑分片数量和副本因子。

  3. 点击保存修改

修改后的配置会立即生效,并作用于现有和新创建的消息流(适用的情况下)。

REST API

您可以通过 EMQX 的 REST API 以编程方式配置消息流的全局设置。

要更新 MQTT 消息流的全局配置,请向以下接口发送一个 PUT 请求:

bash
PUT /api/v5/message_streams/config

请求示例:

curl -s -u key:secret \
  -X PUT \
  -H "Content-Type: application/json" \
  http://localhost:18083/api/v5/message_streams/config \
  -d '{
    "gc_interval": "1h",
    "regular_stream_retention_period": "1d",
    "check_stream_status_interval": "10s"
  }'

配置文件

您可以通过编辑 EMQX 的配置文件来配置消息流的全局设置。这种方式适用于在系统启动时定义默认行为,或在以配置文件作为主要管理手段的环境中统一管理相关设置。

配置示例

消息流相关的配置项定义在 EMQX 配置文件(emqx.conf)的 streams 配置段中。

hocon
streams {
    gc_interval = 1h
    regular_stream_retention_period = 1d
    check_stream_status_interval = 10s
}

配置项说明

  • gc_interval:控制消息流中过期消息被清理的频率。该配置项影响消息流存储的垃圾回收(GC)周期。
  • regular_stream_retention_period:指定常规消息流的默认最大消息保留时间。超过该时长的消息会被自动删除。
  • check_stream_status_interval:当客户端订阅 $s/ 前缀的消息流主题且对应消息流尚不存在时,用于控制订阅端重试查找消息流的时间间隔。

所有时间相关的配置值均支持标准时间单位,例如 s(秒)、m(分钟)、h(小时)和 d(天)。

持久存储配置

MQTT 消息流中的消息数据通过 EMQX 的持久存储进行保存。与消息流相关的存储配置位于 durable_storage.streams_messages 配置段下。

hocon
durable_storage {
    ## 用于存储消息流消息的数据存储配置
    ## 更多信息请参见持久存储相关配置说明
    streams_messages {
        transaction {
            flush_interval = 100
            idle_flush_interval = 20
            conflict_window = 5000
        }
    }
}

这些配置项用于控制消息流数据写入持久存储时的行为,包括事务批处理和刷新机制。在大多数情况下,默认配置已经能够满足需求,除非您需要针对存储性能进行专项调优,否则无需修改。

通过 REST API 管理 MQTT 消息流

EMQX 提供了一组 REST API 用于管理 MQTT 消息流。您可以使用这些 API 来创建、更新、列出、查询和删除消息流,以及配置消息流的全局设置。这对于自动化运维、与外部系统集成以及大规模管理消息流非常有用。

注意

所有 REST API 操作都需要具备相应的认证信息和访问权限。有关请求和响应参数的完整说明,请参阅 API 文档的”消息流“部分。

以下示例均假设使用 API Key 和 Secret 进行基本认证。

创建消息流

要创建一个新的消息流,请向消息流接口发送一个 POST 请求,并在请求体中指定消息流的配置信息。

bash
curl -s -u key:secret \
  -X POST \
  -H "Content-Type: application/json" \
  http://localhost:18083/api/v5/message_streams/streams \
  -d '{
    "topic_filter": "t1/#",
    "is_lastvalue": false
  }' | jq

响应结果中包含新创建的消息流的详细信息,包括其 topic_filter

列出消息流

要获取当前已存在的消息流列表,请向消息流接口发送一个 GET 请求。

bash
curl -s -u key:secret \
  -X GET \
  -H "Content-Type: application/json" \
  http://localhost:18083/api/v5/message_streams/streams | jq

响应中返回消息流列表以及分页相关的元数据信息。

bash
{
  "data": [
    {
      "topic_filter": "t1/#"
    }
  ],
  "meta": {
    "hasnext": false
  }
}

更新消息流

要更新一个已有的消息流,请向由其主题过滤器标识的消息流资源发送一个 PUT 请求。主题过滤器需要进行 URL 编码。

bash
curl -s -u key:secret \
  -X PUT \
  -H "Content-Type: application/json" \
  http://localhost:18083/api/v5/message_streams/streams/t1%2F%23 \
  -d '{
    "key_expression": "message.from",
    "is_lastvalue": false
  }' | jq

响应结果会返回更新后的消息流配置信息。

删除消息流

要删除一个消息流,请向由 URL 编码后的主题过滤器标识的消息流资源发送一个 DELETE 请求。

bash
curl -s -u key:secret \
  -X DELETE \
  http://localhost:18083/api/v5/message_streams/streams/t1%2F%23

消息流被删除后,将不再接收新的消息,其已存储的数据也会按照内部清理规则逐步移除。

配置 MQTT 消息流全局设置

请参见配置消息流全局设置 - REST API