规则流水线
我们可以通过将先前规则的结果导入后续规则来形成规则流水线。通过使用内存作为 数据源 和 动作(Sink),我们可以创建规则流水线。
使用示例
规则流水线将是隐式的。 每个规则都可以使用一个内存目标/源。 这意味着每个步骤将使用现有的 api 单独创建(示例如下所示)。
shell
#1 创建源流
{"sql" : "create stream demo () WITH (DATASOURCE=\"demo\", FORMAT=\"JSON\")"}
#2 创建规则和内存目标
{
"id": "rule1",
"sql": "SELECT * FROM demo WHERE isNull(temperature)=false",
"actions": [{
"log": {
},
"memory": {
"topic": "home/ch1/sensor1"
}
}]
}
#3 从内存主题创建一个流
{"sql" : "create stream sensor1 () WITH (DATASOURCE=\"home/+/sensor1\", FORMAT=\"JSON\", TYPE=\"memory\")"}
#4 从内存主题创建另一个要使用的规则
{
"id": "rule2-1",
"sql": "SELECT avg(temperature) FROM sensor1 GROUP BY CountWindow(10)",
"actions": [{
"log": {
},
"memory": {
"topic": "analytic/sensors"
}
}]
}
{
"id": "rule2-2",
"sql": "SELECT temperature + 273.15 as k FROM sensor1",
"actions": [{
"log": {
}
}]
}
通过使用内存主题作为桥梁,我们现在创建一个规则流水线:rule1->{rule2-1, rule2-2}
。 流水线可以是多对多的,而且非常灵活。
请注意,内存目标可以与其他目标一起使用,为一个规则创建多个规则动作。 并且内存源主题可以使用通配符订阅过滤后的主题列表。