Skip to content

将 MQTT 数据写入 Disk Log

Disk Log 数据集成功能允许 EMQX 以 JSON Lines 格式将事件数据持久化写入本地磁盘,类似于传统的轮转日志(rotating log)机制。这种方式支持长期事件留存,便于故障排查和历史数据分析。

本文将详细介绍 EMQX 与 Disk Log 的数据集成方式,并提供规则与 Sink 的配置指导。

工作原理

EMQX 提供内置的系统日志功能,用于记录错误、警告等运行时事件,而 Disk Log 集成则面向不同的目标:它用于将 MQTT 消息数据和客户端事件实际写入磁盘,以实现存储和离线处理。

Disk Log 集成基于 EMQX 的规则引擎和 Sink 机制实现,用户可以灵活定义捕获哪些数据,以及如何进行存储:

  1. 使用规则对 MQTT 消息或客户端事件进行过滤、转换和提取。
  2. 将 Disk Log Sink 添加到规则,指定数据的存储路径与格式,Sink 会将格式化后的 JSON 数据传递给连接器。
  3. Disk Log 连接器负责将数据实际写入文件系统,并控制日志路径、文件轮转策略等配置。
  4. 当规则被触发并产生匹配数据后,Sink 会调用所配置的连接器,将数据以 JSON Lines 格式写入本地指定目录。每条消息以一行 JSON 对象存储,便于标准工具或下游系统处理。

日志轮转机制

为控制磁盘空间占用,Disk Log 支持基于两种阈值的文件轮转策略:

  • 文件大小限制:当当前日志文件达到配置的最大大小后,EMQX 会创建新文件继续写入。
  • 文件数量限制:当日志文件数量超过设置上限时,最旧的文件将被截断并重用。
  • 数据完整性保证:每个日志文件至少包含一条完整的消息记录,即使该条记录超过了文件大小限制。

这种机制可确保数据持续、安全地写入本地磁盘,便于后续分析与集成。

特性与优势

Disk Log 提供一种轻量、本地优先的 MQTT 消息持久化方式,适用于多种场景。其主要优势包括:

  • 精细化数据控制:通过 SQL 规则捕获所需的消息或事件;支持在写入前对数据进行过滤、转换与丰富。
  • 结构化输出格式:采用 JSON Lines 格式存储,每行一条 JSON 记录,便于使用命令行工具或导入分析系统。
  • 轻量独立部署:无需依赖外部数据库或云存储。
  • 增强可观测性与调试能力:支持消息级别追踪,适用于审计、测试与故障排查。它记录的是业务数据,作为系统日志的有力补充。

准备工作

在配置 Disk Log Sink 之前,请完成以下准备工作:

前置条件

创建日志目录

在部署 EMQX 的主机上创建一个用于存储日志文件的目录。确保运行 EMQX 的系统用户对该目录具有读写权限。

创建连接器

在创建 Disk Log Sink 之前,需先创建对应的连接器。

  1. 登录 Dashboard,点击顶部菜单的集成 -> 连接器
  2. 点击右上角的创建
  3. 选择 Disk Log 作为连接器类型,点击 下一步
  4. 输入连接器名称(如 my-disk-log),支持字母和数字组合。
  5. 填写连接器参数:
    • 日志文件路径:日志写入的目录路径。
    • 最大文件大小:单个日志文件的最大大小,超过后触发轮转。注意:每个日志文件至少会写入一条记录,因此如果单条日志记录超过此值,最终文件大小可能会超过此最大值。
    • 最大文件数量:最多保留的日志文件数,超过后从最旧的文件开始覆盖。
  6. 可点击测试连接验证 EMQX 是否具备向该目录写入的权限。
  7. 点击创建完成连接器配置。

创建 Disk Log Sink 规则

以下步骤展示如何创建一条规则,将来自主题 t/# 的 MQTT 消息写入本地日志文件。

  1. 进入 Dashboard 的集成 -> 规则页面。

  2. 点击右上角的创建

  3. 设置规则 ID 为 my_rule,并在 SQL 编辑器中输入以下规则语句:

    sql
    SELECT
      *
    FROM
      "t/#"

    TIP

    如不熟悉 SQL,可点击 SQL 示例启用调试测试输出结果。

  4. 添加动作,选择 Disk Log 作为动作类型,可选择创建新 Sink 或使用已有 Sink。此处以创建新 Sink 为例。

  5. 输入 Sink 名称和描述。

  6. 从下拉列表中选择之前创建的连接器 my-disk-log,也可点击旁边的按钮快速新建。

  7. 选择写入模式异步同步)。

  8. 配置消息模板,内容必须能渲染为合法 JSON 对象。

  9. 备选动作(可选):如果您希望在消息投递失败时提升系统的可靠性,可以为 Sink 配置一个或多个备选动作。当 Sink 无法成功处理消息时,这些备选动作将被触发。更多信息请参见:备选动作

  10. 如需高级配置,可展开高级设置选项。

  11. 保持其他配置默认,点击创建完成 Sink 配置。页面将返回规则配置,新的 Sink 会自动添加为动作。

  12. 最后点击创建,完成整条规则的创建。

规则创建完成后,您可以在规则页面看到新建规则,并在 动作(Sink) 标签页中查看对应的 Sink。

此外,还可前往集成 -> Flow 设计器,可视化查看从主题 t/# 到磁盘写入的完整数据流路径。

测试规则

以下示例演示如何使用 MQTTX 发布测试消息,并验证是否成功写入日志文件。

执行以下命令向主题 t/1 发送测试消息:

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Disk Log" }'

多次发送消息后,检查配置的日志目录中最新修改的文件,确认其中包含所发布的消息内容。

高级设置

在创建或编辑 Disk Log Sink 时,您可以展开高级设置,配置如下可调参数以满足更高性能或可靠性要求:

参数名称描述
缓存池大小设置用于处理写入请求的工作进程数量,提升高并发场景下的吞吐性能。 默认值:16
请求超期缓存中的请求在多长时间内有效(单位:秒)。若超时未被写入则视为失败。
健康检查间隔Sink 自动对磁盘写入状态进行健康检查的周期(单位:秒)。 默认值:15
缓冲队列最大长度每个工作进程可缓冲的最大数据量(字节)。根据消息大小和性能调优。 默认值:256
请求模式写入方式,可选同步异步。异步模式下不会阻塞 MQTT 发布,但存在消息未落盘即确认的风险。 默认值:异步
最大批量请求大小每次批量写入的最大消息数量。值越大吞吐性能越好。设置为 1 则逐条写入。 默认值:1000
请求飞行窗口异步请求最大并发数,控制 Sink 的并发写入能力。如需保障消息顺序,应设为 1。 默认值:100