Skip to content

Rule pipeline

We can form a rule pipeline by importing the results of previous rules into subsequent rules. By using memory as data source and action (Sink), we can create a rule pipeline.

Usage example

The rules pipeline will be implicit. Each rule can use a memory target/source. This means each step will be created individually using the existing api (example shown below).

shell
#1 Create a source stream
{"sql" : "create stream demo () WITH (DATASOURCE=\"demo\", FORMAT=\"JSON\")"}

#2 Create rules and memory targets
{
   "id": "rule1",
   "sql": "SELECT * FROM demo WHERE isNull(temperature)=false",
   "actions": [{
     "log": {
     },
     "memory": {
       "topic": "home/ch1/sensor1"
     }
   }]
}

#3 Create a stream from the memory topic
{"sql" : "create stream sensor1 () WITH (DATASOURCE=\"home/+/sensor1\", FORMAT=\"JSON\", TYPE=\"memory\")"}

#4 Create another rule to use from the memory topic
{
   "id": "rule2-1",
   "sql": "SELECT avg(temperature) FROM sensor1 GROUP BY CountWindow(10)",
   "actions": [{
     "log": {
     },
     "memory": {
       "topic": "analytic/sensors"
     }
   }]
}

{
   "id": "rule2-2",
   "sql": "SELECT temperature + 273.15 as k FROM sensor1",
   "actions": [{
     "log": {
     }
   }]
}

By using the memory topic as a bridge, we now create a rule pipeline: rule1->{rule2-1, rule2-2}. Pipelines can be many-to-many and are very flexible.

Note that memory targets can be used with other targets to create multiple rule actions for a rule. And memory source topics can use wildcards to subscribe to a filtered list of topics.