文件 Sink
文件 Sink 将分析结果保存到指定文件中。指定的文件已存在,结果将会覆盖写入。数据源 - 文件是反向的连接器,可以读取文件 sink 的输出。
文件类型
文件 Sink 可以将数据写入不同类型的文件,例如:
- lines:这是默认类型。它写入由流定义中的格式参数解码的行分隔文件。例如,要写入行分隔的 JSON 字符串,请将文件类型设置为 lines,格式设置为 json。
- json:此类型写入标准 JSON 数组格式文件。
- csv:此类型写入逗号分隔的 csv 文件。您也可以使用自定义分隔符。要使用此文件类型,请将格式设置为 delimited。
如希望使用 File Sink 连接器,点击 数据处理 -> 规则 -> 新建规则,在 动作 区域,点击添加,Sink 选择 file。
Sink 配置
在规则页面,点击添加动作,进行如下设置:
- 文件路径:保存结果的文件路径,例如
/tmp/result.txt
。 - 文件类型:文件类型,支持 json, csv 或者 lines,其中默认值为 lines。
- 是否包含头文件:指定是否生成文件头。当前仅在文件类型为 csv 时生效。文件头由收到的第一条数据推断得来,推断的 key 采用字母排序。
- Rolling间隔:定义 rolling 策略的属性之一。滚动到新文件的最小时间间隔(以毫秒为单位)。检查频率由checkInterval 控制。
- 检查间隔:定义 rolling 策略的属性之一。检查基于时间的滚动策略的间隔(以毫秒为单位),用于控制检查文件是否应该翻转的频率。
- Rolling计数:定义 rolling 策略的属性之一。文件翻转前的最大消息计数。
- Rolling文件名模式:定义 rolling 策略的属性之一。指定滚动文件创建时如何放置时间戳。时间戳可为“前缀”,“后缀”或“无”。
- 是否忽略输出:默认为 False。
- 将结果数据按条发送:默认为 True。
- 流格式:默认为
json
。 - 数据模版:Golang 模板,用于指定输出数据格式。如不指定数据模板,则将数据作为原始输入。关于数据模版的详细介绍,见 数据模版
Rolling 策略
文件 Sink 支持配置滚动(Rolling)策略,以控制文件的大小和文件的数量。滚动策略由以下属性控制:rollingInterval、checkInterval、rollingCount 和 rollingNamePattern。
文件滚动可以基于时间或基于消息数或两者。
基于时间的滚动: rollingInterval 和 checkInterval 属性用来控制基于时间的滚动。rollingInterval 是滚动到一个新文件的最小时间间隔。checkInterval 是检查基于时间的滚动策略的时间间隔。这控制了检查一个文件是否应该滚动的频率。
例如,如果 checkInterval 是 1 小时,rollingInterval 是 1 天,那么文件 Sink 将在每小时检查每个打开的文件,如果文件打开超过 1 小时,文件将被滚动。所以实际的滚动间隔可能比rollingInterval 属性大。要使用基于时间的滚动,请将 rollingInterval 属性设置为正值,并将 rollingCount 设置为 0。
组合示例:rollingInterval=1天,checkInterval=1小时,rollingCount=0。
基于消息计数的滚动: rollingCount 属性用于控制基于消息数的滚动。文件 sink 将检查每个打开的文件的消息数,如果消息数大于 rollingCount,文件将滚动。要使用基于消息数的滚动,请将 rollingCount 属性设置为正值,并将 rollingInterval 设置为 0。
示例组合:rollingInterval=0, rollingCount=1000。
同时基于时间和消息数的滚动: 文件 sink 将同时检查每个打开的文件的时间和消息数,如果其中一个被满足,文件将被滚存。要同时使用基于时间和消息数的滚动,请将 rollingInterval 和 rollingCount 属性设置为正值。
组合示例:rollingInterval=1天,checkInterval=1小时,rollingCount=1000。
使用示例
下面是一个选择温度大于50度的示例,每5秒将结果保存到文件 /tmp/result.txt
中。
{
"sql": "SELECT * from demo where temperature>50",
"actions": [
{
"file": {
"path": "/tmp/result.txt",
"interval": 5000,
"fileType": "lines",
"format": "json"
}
}
]
}
下面是另一个例子,根据 paylaod 中的 device
字段,将结果写入多个文件。每个文件将每1小时或有超过 10000条信息时滚动一次。滚动文件名将有一个创建时间戳的前缀,如16998888_deviceName.csv
。
{
"sql": "SELECT * from demo where temperature>50",
"actions": [
{
"file": {
"path": "{{.device}}.csv",
"fileType": "csv",
"format": "delimited",
"hasHeader": true,
"delimiter": ",",
"rollingInterval": 3600000,
"checkInterval": 600000,
"rollingCount": 10000,
"rollingNamePattern": "prefix"
}
}
]
}