# 将 MQTT 数据写入 Disk Log

Disk Log 数据集成功能允许 EMQX 以 [JSON Lines](https://jsonlines.org/) 格式将事件数据持久化写入本地磁盘，类似于传统的轮转日志（rotating log）机制。这种方式支持长期事件留存，便于故障排查和历史数据分析。

本文将详细介绍 EMQX 与 Disk Log 的数据集成方式，并提供规则与 Sink 的配置指导。

## 工作原理

EMQX 提供内置的系统日志功能，用于记录错误、警告等运行时事件，而 Disk Log 集成则面向不同的目标：它用于将 MQTT 消息数据和客户端事件实际写入磁盘，以实现存储和离线处理。

Disk Log 集成基于 EMQX 的[规则引擎](./rules.md)和 Sink 机制实现，用户可以灵活定义捕获哪些数据，以及如何进行存储：

1. 使用规则对 MQTT 消息或客户端事件进行过滤、转换和提取。
2. 将 Disk Log Sink 添加到规则，指定数据的存储路径与格式，Sink 会将格式化后的 JSON 数据传递给连接器。
3. Disk Log 连接器负责将数据实际写入文件系统，并控制日志路径、文件轮转策略等配置。
4. 当规则被触发并产生匹配数据后，Sink 会调用所配置的连接器，将数据以 JSON Lines 格式写入本地指定目录。每条消息以一行 JSON 对象存储，便于标准工具或下游系统处理。

### 日志轮转机制

为控制磁盘空间占用，Disk Log 支持基于两种阈值的文件轮转策略：

- **文件大小限制**：当当前日志文件达到配置的最大大小后，EMQX 会创建新文件继续写入。
- **文件数量限制**：当日志文件数量超过设置上限时，最旧的文件将被截断并重用。
- **数据完整性保证**：每个日志文件至少包含一条完整的消息记录，即使该条记录超过了文件大小限制。

这种机制可确保数据持续、安全地写入本地磁盘，便于后续分析与集成。

## 特性与优势

Disk Log 提供一种轻量、本地优先的 MQTT 消息持久化方式，适用于多种场景。其主要优势包括：

- **精细化数据控制**：通过 SQL 规则捕获所需的消息或事件；支持在写入前对数据进行过滤、转换与丰富。
- **结构化输出格式**：采用 JSON Lines 格式存储，每行一条 JSON 记录，便于使用命令行工具或导入分析系统。
- **轻量独立部署**：无需依赖外部数据库或云存储。
- **增强可观测性与调试能力**：支持消息级别追踪，适用于审计、测试与故障排查。它记录的是业务数据，作为系统日志的有力补充。

## 准备工作

在配置 Disk Log Sink 之前，请完成以下准备工作：

### 前置条件

- 了解[规则引擎](./rules.md)
- 熟悉[数据集成](./data-bridges.md) 概念

### 创建日志目录

在部署 EMQX 的主机上创建一个用于存储日志文件的目录。确保运行 EMQX 的系统用户对该目录具有读写权限。

## 创建连接器

在创建 Disk Log Sink 之前，需先创建对应的连接器。

1. 登录 Dashboard，点击顶部菜单的**集成** -> **连接器**。
2. 点击右上角的**创建**。
3. 选择 **Disk Log** 作为连接器类型，点击 **下一步**。
4. 输入连接器名称（如 `my-disk-log`），支持字母和数字组合。
5. 填写连接器参数：
   - **日志文件路径**：日志写入的目录路径。
   - **最大文件大小**：单个日志文件的最大大小，超过后触发轮转。注意：每个日志文件至少会写入一条记录，因此如果单条日志记录超过此值，最终文件大小可能会超过此最大值。
   - **最大文件数量**：最多保留的日志文件数，超过后从最旧的文件开始覆盖。
6. 可点击**测试连接**验证 EMQX 是否具备向该目录写入的权限。
7. 点击**创建**完成连接器配置。

## 创建 Disk Log Sink 规则

以下步骤展示如何创建一条规则，将来自主题 `t/#` 的 MQTT 消息写入本地日志文件。

1. 进入 Dashboard 的**集成** -> **规则**页面。

2. 点击右上角的**创建**。

3. 设置规则 ID 为 `my_rule`，并在 SQL 编辑器中输入以下规则语句：

   ```sql
   SELECT
     *
   FROM
     "t/#"
   ```

   ::: tip

   如不熟悉 SQL，可点击 **SQL 示例**或**启用调试**测试输出结果。

   :::

4. 添加动作，选择 **Disk Log** 作为动作类型，可选择创建新 Sink 或使用已有 Sink。此处以创建新 Sink 为例。

5. 输入 Sink 名称和描述。

6. 从下拉列表中选择之前创建的连接器 `my-disk-log`，也可点击旁边的按钮快速新建。

7. 选择**写入模式**（`异步`或`同步`）。

8. 配置**消息模板**，内容必须能渲染为合法 JSON 对象。

9. **备选动作（可选）**：如果您希望在消息投递失败时提升系统的可靠性，可以为 Sink 配置一个或多个备选动作。当 Sink 无法成功处理消息时，这些备选动作将被触发。更多信息请参见：[备选动作](./data-bridges.md#备选动作)。

10. 如需高级配置，可展开[**高级设置**](#高级设置)选项。

11. 保持其他配置默认，点击**创建**完成 Sink 配置。页面将返回规则配置，新的 Sink 会自动添加为动作。

12. 最后点击**创建**，完成整条规则的创建。

规则创建完成后，您可以在**规则**页面看到新建规则，并在 **动作（Sink）** 标签页中查看对应的 Sink。

此外，还可前往**集成** -> **Flow 设计器**，可视化查看从主题 `t/#` 到磁盘写入的完整数据流路径。

## 测试规则

以下示例演示如何使用 MQTTX 发布测试消息，并验证是否成功写入日志文件。

执行以下命令向主题 `t/1` 发送测试消息：

```bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Disk Log" }'
```

多次发送消息后，检查配置的日志目录中最新修改的文件，确认其中包含所发布的消息内容。

## 高级设置

在创建或编辑 Disk Log Sink 时，您可以展开**高级设置**，配置如下可调参数以满足更高性能或可靠性要求：

| 参数名称             | 描述                                                         |
| -------------------- | ------------------------------------------------------------ |
| **缓存池大小**       | 设置用于处理写入请求的工作进程数量，提升高并发场景下的吞吐性能。 默认值：`16` |
| **请求超期**         | 缓存中的请求在多长时间内有效（单位：秒）。若超时未被写入则视为失败。 |
| **健康检查间隔**     | Sink 自动对磁盘写入状态进行健康检查的周期（单位：秒）。 默认值：`15` |
| **缓冲队列最大长度** | 每个工作进程可缓冲的最大数据量（字节）。根据消息大小和性能调优。 默认值：`256` |
| **请求模式**         | 写入方式，可选`同步`或`异步`。异步模式下不会阻塞 MQTT 发布，但存在消息未落盘即确认的风险。 默认值：`异步` |
| **最大批量请求大小** | 每次批量写入的最大消息数量。值越大吞吐性能越好。设置为 `1` 则逐条写入。 默认值：`1000` |
| **请求飞行窗口**     | 异步请求最大并发数，控制 Sink 的并发写入能力。如需保障消息顺序，应设为 `1`。 默认值：`100` |
