# MQTT 消息流用户指南

本页面将引导您了解 EMQX 中 MQTT 消息流功能的实际使用方式，包括如何创建消息流、配置其行为，以及通过 Dashboard、REST API 或配置文件对消息流进行管理。

## 启用 MQTT 消息流功能

MQTT 消息流功能默认是关闭的。在创建或使用任何消息流之前，必须先在 Dashboard 中启用该功能。

1. 在左侧菜单中进入**流**。
2. 如果消息流尚未启用，页面会显示提示信息，说明该功能当前处于关闭状态。
3. 点击**设置**，进入**流**设置页面。
4. 将**启用流**切换为**开启**。
5. 点击**保存修改**。

启用后，消息流功能将立即生效，您可以开始创建和管理消息流。

## 通过 Dashboard 手动创建消息流

MQTT 消息流在存储或回放消息之前，必须被显式创建。您可以通过手动或自动方式创建和管理消息流。关于自动创建的详细说明，请参见[通过 Dashboard 自动创建消息流](#通过-dashboard-自动创建消息流)。

1. 在左侧菜单中进入**流**。

2. 点击**创建流**，打开**创建流**对话框。

3. 配置以下选项：

   - **名称**（必填）：用于标识该消息流的唯一名称。仅允许包含：

     - 字母与数字
     - 下划线 `_`
     - 连字符 `-`
     - 点号 `.`
   
     消息流通过名称进行管理与订阅。
   
   - **过滤主题**：输入主题或主题过滤器（例如 `t/1` 或 `sensors/+/data`），用于定义哪些已发布的消息会被捕获到该消息流中。所有发布到与该过滤器匹配的 MQTT 主题的消息，都会被存储到消息流中。
   
     > 主题过滤器是消息流的配置属性，而不是标识符。客户端可以通过以下订阅格式消费消息流中的消息：
     >
     > - `$queue/<name>`：当消息流已存在时使用。
     > - `$queue/<name>/<topic_filter>`：在订阅已存在消息流时可选使用；当启用自动创建功能时，如果该消息流尚不存在，EMQX 将使用提供的 `<topic_filter>` 自动创建消息流。
     > 
     > `<topic_filter>` 必须与消息流已配置的过滤主题一致。
     >
     > 若需要回放历史数据，需要在 MQTT 5 订阅时设置订阅属性：
     >
     > ```
     > stream-offset
     > ```
     >
     > `stream-offset` 可取值：
     >
     > - Unix 微秒时间戳
     > - `earliest`
     > - `latest`
   
   - **数据保留期**：指定消息在消息流中保留的时间长度。超过该保留期的消息将被自动删除，从而限制消息可被回放的时间范围。
   
   - **最后值语义**：启用后，消息流将只保留每个键对应的最新一条消息。当具有相同键的新消息写入时，旧消息会被覆盖。这非常适合设备状态、配置等状态型数据场景。
   
   - **流键表达式**（必填）：用于从每条进入消息流的消息中提取键值的表达式。默认值为 `message.from`，表示使用消息发布者的客户端 ID。该字段支持使用 [Variform 表达式](../configuration/configuration.md#variform-表达式)进行配置。
   
     提取出的键在不同类型的消息流中承担不同角色：
   
     - 对于**最后值流**，该键作为主键使用。具有相同键的消息会相互覆盖，消息流中始终只保留该键对应的最新一条消息。
   
     - 对于**常规流**，该键作为**分片键**使用，用于决定消息会被写入哪个存储分片。具有相同键的消息会被路由到同一个分片，从而在实现多分片并行存储的同时，保证按键的消息顺序性。
   
       ::: tip
   
       对于常规消息流，应避免使用常量或低基数的表达式作为流键表达式，否则可能导致所有消息写入同一个分片，形成写入热点并影响性能。
   
       :::
   
     ::: tip
   
     流键表达式与消息队列中的队列键表达式用法类似。关于如何从消息中提取键值的更多示例，请参见[队列键表达式](../message-queue/message-queue-task.md/#队列键表达式)。
   
     :::
   
   - **消息限制**：用于限制消息流中每个分片的存储使用情况：
   
     - **最大分片消息数量**：限制每个分片中可保留的最大消息条数。您可以启用该选项并设置具体数值，或保持关闭以允许无限数量（`infinity`）。
     - **最大分片消息字节数**：限制每个分片中消息的最大总字节数。您可以启用该选项并设置具体大小（例如 `200MB`），或保持关闭以允许无限存储（`infinity`）。
   
     这些限制会持久化到持久化存储中，并与数据保留期共同生效。
   
   4. 点击**创建**保存消息流。
   
   创建完成后，消息流将立即生效。发布到与配置的主题过滤器匹配的主题上的消息，会按照保留策略和限制规则进行存储，并可被客户端通过订阅进行回放。

## 通过 Dashboard 自动创建消息流

当客户端订阅 `$stream/<name>` 前缀的主题时，如果该名称对应的消息流不存在，EMQX 可自动创建。`<name>` 将作为消息流名称。

::: tip 注意

只有在全局启用了消息流功能后，自动创建消息流功能才可用。

:::

自动创建的消息流可以是**常规流**或**最后值流**。

::: tip 注意

为了确保消息流行为清晰可控，自动创建时只能启用**常规流**或**最后值流**其中之一，不能同时启用。

:::

### 自动创建最后值消息流

该选项在 **MQTT 配置** -> **流**页面中默认开启。启用后，当客户端订阅不存在的消息流时，EMQX 会自动创建支持最后值语义的消息流。

1. 进入**管理** -> **MQTT 配置** -> **流**。
2. 默认情况下，**启用自动创建流**已开启，且已选中**最后值流**类型。
3. 配置以下选项：
   - **流键表达式**（必填）：定义如何从每条消息中提取唯一键（默认：`message.from`）。在最后值消息流中，该键作为主键使用，具有相同键的消息会覆盖旧消息。
   - **数据保留期**：指定消息在消息流中保留的时间。
4. 点击**保存修改**。

当客户端订阅 `$stream/my_stream/test` 这样的主题时，EMQX 会自动创建一个名为 `my_stream` 的最后值消息流，并在**流**列表中显示。

### 自动创建常规消息流

如果您希望消息流保留所有消息、不进行覆盖，可以选择自动创建常规消息流。

1. 进入**管理** -> **MQTT 配置** -> **流**。
2. 保持**启用自动创建流**为开启状态，并选择**常规流**类型。
3. 配置以下选项：
   - **流键表达式**（必填）：定义如何从消息中提取键值（默认：`message.from`）。在常规消息流中，该键用于决定消息写入的存储分片，有助于在保证按键顺序的同时实现负载分布。
   - **数据保留期**：指定消息在消息流中的保留时间。
4. 点击**保存修改**。

## 配置消息流全局设置

本节介绍如何配置作用于所有 MQTT 消息流的全局设置。这些设置用于控制消息保留、清理周期、内部行为以及自动创建策略。您可以通过 Dashboard、REST API 或配置文件进行配置。

### Dashboard

您可以直接在 EMQX Dashboard 中修改消息流的全局设置，无需重启 EMQX。

1. 进入**管理** -> **MQTT 配置** -> **流**。

2. 配置以下选项：

   - **启用流**：全局启用或禁用消息流功能。禁用后，无法创建或使用任何消息流。

   - **最大流数**：限制集群中允许存在的消息流数量，用于防止过度创建导致资源耗尽。

   - **垃圾回收间隔**：指定清理过期消息的周期，默认值为 `1 小时`。

   - **常规消息流保留期**：常规（非最后值）消息流的默认消息保留时间，默认值为 `7 天`。

   - **启用自动创建流**：当客户端订阅消息流主题且对应消息流不存在时，是否自动创建。

   - **自动创建消息流类型**：

     - **最后值流**（默认）
     - **常规流**

   - **流键表达式**：为自动创建的消息流指定流键表达式（默认：`message.from`）。

   - **数据保留期**：自动创建消息流的消息保留时间。

   - **最大分片消息字节数**：限制每个分片可存储的最大消息数据量。

   - **最大分片消息数量**：限制每个分片可存储的最大消息条数。

     ::: tip

     分片数量由持久化存储的全局配置决定，并适用于所有消息流。上述限制是按[分片](../design/durable-storage.md#分片-shard)生效的，不考虑副本因子。在规划磁盘容量时，需要同时考虑分片数量和副本因子。

     :::

3. 点击**保存修改**。

修改后的配置会立即生效，并作用于现有和新创建的消息流（适用的情况下）。

### REST API

您可以通过 EMQX 的 REST API 以编程方式配置消息流的全局设置。

要更新 MQTT 消息流的全局配置，请向以下接口发送一个 `PUT` 请求：

```bash
PUT /api/v5/message_streams/config
```

**请求示例**:

```
curl -s -u key:secret \
  -X PUT \
  -H "Content-Type: application/json" \
  http://localhost:18083/api/v5/message_streams/config \
  -d '{
    "gc_interval": "1h",
    "regular_stream_retention_period": "1d",
    "check_stream_status_interval": "10s"
  }'
```

### 配置文件

您可以通过编辑 EMQX 的配置文件来配置消息流的全局设置。这种方式适用于在系统启动时定义默认行为，或在以配置文件作为主要管理手段的环境中统一管理相关设置。

**配置示例**：

消息流相关的配置项定义在 EMQX 配置文件（`emqx.conf`）的 `streams` 配置段中。

```hocon
streams {
    gc_interval = 1h
    regular_stream_retention_period = 1d
    check_stream_status_interval = 10s
}
```

#### 配置项说明

- **gc_interval**：控制消息流中过期消息被清理的频率。该配置项影响消息流存储的垃圾回收（GC）周期。
- **regular_stream_retention_period**：指定常规消息流的默认最大消息保留时间。超过该时长的消息会被自动删除。
- **check_stream_status_interval**：当客户端订阅 `$stream/<name>` 前缀的消息流主题且对应消息流尚不存在时，用于控制订阅端重试查找消息流的时间间隔。

所有时间相关的配置值均支持标准时间单位，例如 `s`（秒）、`m`（分钟）、`h`（小时）和 `d`（天）。

#### 持久存储配置

MQTT 消息流中的消息数据通过 EMQX 的持久存储进行保存。与消息流相关的存储配置位于 `durable_storage.streams_messages` 配置段下。

```hocon
durable_storage {
    ## 用于存储消息流消息的数据存储配置
    ## 更多信息请参见持久存储相关配置说明
    streams_messages {
        transaction {
            flush_interval = 100
            idle_flush_interval = 20
            conflict_window = 5000
        }
    }
}
```

这些配置项用于控制消息流数据写入持久存储时的行为，包括事务批处理和刷新机制。在大多数情况下，默认配置已经能够满足需求，除非您需要针对存储性能进行专项调优，否则无需修改。

## 通过 REST API 管理消息流

EMQX 提供了一组 REST API 用于管理 MQTT 消息流。您可以使用这些 API 来创建、更新、列出、查询和删除消息流，以及配置消息流的全局设置。这对于自动化运维、与外部系统集成以及大规模管理消息流非常有用。

::: tip 注意

所有 REST API 操作都需要具备相应的认证信息和访问权限。有关请求和响应参数的完整说明，请参阅 [API 文档](../admin/api.md)的”消息流“部分。

:::

以下示例均假设使用 API Key 和 Secret 进行基本认证。

### 创建消息流

要创建一个新的消息流，请向消息流接口发送一个 `POST` 请求，并在请求体中指定消息流的配置信息。

```bash
curl -s -u key:secret \
  -X POST \
  -H "Content-Type: application/json" \
  http://localhost:18083/api/v5/message_streams/streams \
  -d '{
    "name": "my_stream",
    "topic_filter": "t1/#",
    "is_lastvalue": false
  }' | jq
```

响应结果中包含新创建的消息流的详细信息，包括其 `topic_filter`。

### 列出消息流

要获取当前已存在的消息流列表，请向消息流接口发送一个 `GET` 请求。

```bash
curl -s -u key:secret \
  -X GET \
  -H "Content-Type: application/json" \
  http://localhost:18083/api/v5/message_streams/streams | jq
```

响应中返回消息流列表以及分页相关的元数据信息。

```bash
{
  "data": [
    {
      "name": "my_stream",
      "topic_filter": "t1/#"
    }
  ],
  "meta": {
    "hasnext": false
  }
}
```

### 更新消息流

要更新一个已有的消息流，请向由其主题过滤器标识的消息流资源发送一个 `PUT` 请求。主题过滤器需要进行 URL 编码。

```bash
curl -s -u key:secret \
  -X PUT \
  -H "Content-Type: application/json" \
  http://localhost:18083/api/v5/message_streams/streams/my_stream \
  -d '{
    "key_expression": "message.from",
    "is_lastvalue": false
  }' | jq
```

响应结果会返回更新后的消息流配置信息。

### 删除消息流

要删除一个消息流，请向由 URL 编码后的主题过滤器标识的消息流资源发送一个 `DELETE` 请求。

```bash
curl -s -u key:secret \
  -X DELETE \
  http://localhost:18083/api/v5/message_streams/streams/my_stream
```

消息流被删除后，将不再接收新的消息，其已存储的数据也会按照内部清理规则逐步移除。

### 配置 MQTT 消息流全局设置

请参见[配置消息流全局设置 - REST API](#rest-api)。