将 MQTT 数据写入 Disk Log
Disk Log 数据集成功能允许 EMQX 以 JSON Lines 格式将事件数据持久化写入本地磁盘,类似于传统的轮转日志(rotating log)机制。这种方式支持长期事件留存,便于故障排查和历史数据分析。
本文将详细介绍 EMQX 与 Disk Log 的数据集成方式,并提供规则与 Sink 的配置指导。
工作原理
EMQX 提供内置的系统日志功能,用于记录错误、警告等运行时事件,而 Disk Log 集成则面向不同的目标:它用于将 MQTT 消息数据和客户端事件实际写入磁盘,以实现存储和离线处理。
Disk Log 集成基于 EMQX 的规则引擎和 Sink 机制实现,用户可以灵活定义捕获哪些数据,以及如何进行存储:
- 使用规则对 MQTT 消息或客户端事件进行过滤、转换和提取。
- 将 Disk Log Sink 添加到规则,指定数据的存储路径与格式,Sink 会将格式化后的 JSON 数据传递给连接器。
- Disk Log 连接器负责将数据实际写入文件系统,并控制日志路径、文件轮转策略等配置。
- 当规则被触发并产生匹配数据后,Sink 会调用所配置的连接器,将数据以 JSON Lines 格式写入本地指定目录。每条消息以一行 JSON 对象存储,便于标准工具或下游系统处理。
日志轮转机制
为控制磁盘空间占用,Disk Log 支持基于两种阈值的文件轮转策略:
- 文件大小限制:当当前日志文件达到配置的最大大小后,EMQX 会创建新文件继续写入。
- 文件数量限制:当日志文件数量超过设置上限时,最旧的文件将被截断并重用。
- 数据完整性保证:每个日志文件至少包含一条完整的消息记录,即使该条记录超过了文件大小限制。
这种机制可确保数据持续、安全地写入本地磁盘,便于后续分析与集成。
特性与优势
Disk Log 提供一种轻量、本地优先的 MQTT 消息持久化方式,适用于多种场景。其主要优势包括:
- 精细化数据控制:通过 SQL 规则捕获所需的消息或事件;支持在写入前对数据进行过滤、转换与丰富。
- 结构化输出格式:采用 JSON Lines 格式存储,每行一条 JSON 记录,便于使用命令行工具或导入分析系统。
- 轻量独立部署:无需依赖外部数据库或云存储。
- 增强可观测性与调试能力:支持消息级别追踪,适用于审计、测试与故障排查。它记录的是业务数据,作为系统日志的有力补充。
准备工作
在配置 Disk Log Sink 之前,请完成以下准备工作:
前置条件
创建日志目录
在部署 EMQX 的主机上创建一个用于存储日志文件的目录。确保运行 EMQX 的系统用户对该目录具有读写权限。
创建连接器
在创建 Disk Log Sink 之前,需先创建对应的连接器。
- 登录 Dashboard,点击顶部菜单的集成 -> 连接器。
- 点击右上角的创建。
- 选择 Disk Log 作为连接器类型,点击 下一步。
- 输入连接器名称(如
my-disk-log
),支持字母和数字组合。 - 填写连接器参数:
- 日志文件路径:日志写入的目录路径。
- 最大文件大小:单个日志文件的最大大小,超过后触发轮转。注意:每个日志文件至少会写入一条记录,因此如果单条日志记录超过此值,最终文件大小可能会超过此最大值。
- 最大文件数量:最多保留的日志文件数,超过后从最旧的文件开始覆盖。
- 可点击测试连接验证 EMQX 是否具备向该目录写入的权限。
- 点击创建完成连接器配置。
创建 Disk Log Sink 规则
以下步骤展示如何创建一条规则,将来自主题 t/#
的 MQTT 消息写入本地日志文件。
进入 Dashboard 的集成 -> 规则页面。
点击右上角的创建。
设置规则 ID 为
my_rule
,并在 SQL 编辑器中输入以下规则语句:sqlSELECT * FROM "t/#"
TIP
如不熟悉 SQL,可点击 SQL 示例或启用调试测试输出结果。
添加动作,选择 Disk Log 作为动作类型,可选择创建新 Sink 或使用已有 Sink。此处以创建新 Sink 为例。
输入 Sink 名称和描述。
从下拉列表中选择之前创建的连接器
my-disk-log
,也可点击旁边的按钮快速新建。选择写入模式(
异步
或同步
)。配置消息模板,内容必须能渲染为合法 JSON 对象。
备选动作(可选):如果您希望在消息投递失败时提升系统的可靠性,可以为 Sink 配置一个或多个备选动作。当 Sink 无法成功处理消息时,这些备选动作将被触发。更多信息请参见:备选动作。
如需高级配置,可展开高级设置选项。
保持其他配置默认,点击创建完成 Sink 配置。页面将返回规则配置,新的 Sink 会自动添加为动作。
最后点击创建,完成整条规则的创建。
规则创建完成后,您可以在规则页面看到新建规则,并在 动作(Sink) 标签页中查看对应的 Sink。
此外,还可前往集成 -> Flow 设计器,可视化查看从主题 t/#
到磁盘写入的完整数据流路径。
测试规则
以下示例演示如何使用 MQTTX 发布测试消息,并验证是否成功写入日志文件。
执行以下命令向主题 t/1
发送测试消息:
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Disk Log" }'
多次发送消息后,检查配置的日志目录中最新修改的文件,确认其中包含所发布的消息内容。
高级设置
在创建或编辑 Disk Log Sink 时,您可以展开高级设置,配置如下可调参数以满足更高性能或可靠性要求:
参数名称 | 描述 |
---|---|
缓存池大小 | 设置用于处理写入请求的工作进程数量,提升高并发场景下的吞吐性能。 默认值:16 |
请求超期 | 缓存中的请求在多长时间内有效(单位:秒)。若超时未被写入则视为失败。 |
健康检查间隔 | Sink 自动对磁盘写入状态进行健康检查的周期(单位:秒)。 默认值:15 |
缓冲队列最大长度 | 每个工作进程可缓冲的最大数据量(字节)。根据消息大小和性能调优。 默认值:256 |
请求模式 | 写入方式,可选同步 或异步 。异步模式下不会阻塞 MQTT 发布,但存在消息未落盘即确认的风险。 默认值:异步 |
最大批量请求大小 | 每次批量写入的最大消息数量。值越大吞吐性能越好。设置为 1 则逐条写入。 默认值:1000 |
请求飞行窗口 | 异步请求最大并发数,控制 Sink 的并发写入能力。如需保障消息顺序,应设为 1 。 默认值:100 |