算法集成
NeuronEX 允许用户自定义算法集成,以支持更多功能。 用户可以通过便携插件系统编写扩展插件,也可以调用外部已有的 REST 服务。
便携插件
您可通过安装便携插件进一步拓展 NeuronEX 的功能。通过便捷插件可以实现自定义的数据源(Source)、规则中的自定义函数以及动作(Sink)。
点击数据处理 -> 算法集成,在便携插件页签,点击创建便携插件。在弹出的窗口,进行如下设置:
- 名称:选择或输入插件名称
- 文件:通过上传或者文本框的形式贴入插件文件。
完成上述设置后,点击提交完成插件的创建。新建插件将出现在该页面的插件列表中,您可以在此查看或者删除插件。
便携插件开发
开发插件包括子模块和主程序两部分, Python SDK 提供了 python 语言的源,目标和函数 API。
源接口:
class Source(object):
"""abstract class for eKuiper source plugin"""
@abstractmethod
def configure(self, datasource: str, conf: dict):
"""configure with the string datasource and conf map and raise error if any"""
pass
@abstractmethod
def open(self, ctx: Context):
"""run continuously and send out the data or error with ctx"""
pass
@abstractmethod
def close(self, ctx: Context):
"""stop running and clean up"""
pass
目标接口:
class Sink(object):
"""abstract class for eKuiper sink plugin"""
@abstractmethod
def configure(self, conf: dict):
"""configure with conf map and raise error if any"""
pass
@abstractmethod
def open(self, ctx: Context):
"""open connection and wait to receive data"""
pass
@abstractmethod
def collect(self, ctx: Context, data: Any):
"""callback to deal with received data"""
pass
@abstractmethod
def close(self, ctx: Context):
"""stop running and clean up"""
pass
函数接口:
class Function(object):
"""abstract class for eKuiper function plugin"""
@abstractmethod
def validate(self, args: List[Any]):
"""callback to validate against ast args, return a string error or empty string"""
pass
@abstractmethod
def exec(self, args: List[Any], ctx: Context) -> Any:
"""callback to do execution, return result"""
pass
@abstractmethod
def is_aggregate(self):
"""callback to check if function is for aggregation, return bool"""
pass
用户通过实现这些抽象接口来创建自己的源,目标和函数,然后在主函数中声明这些自定义插件的实例化方法
if __name__ == '__main__':
c = PluginConfig("pysam", {"pyjson": lambda: PyJson()}, {"print": lambda: PrintSink()},
{"revert": lambda: revertIns})
plugin.start(c)
便携插件开发示例
便携插件目前支持通过 Python 和 Golang 编程语言进行插件开发。具体的插件编写方法,请参考:
外部算法函数
外部算法函数,即外部服务,是指将外部已有的一个 HTTP 服务,映射为 NeuronEX 的一个 SQL 函数。在规则中使用外部函数时,会将数据源中的数据传递给外部服务,并将外部服务的计算结果返回到 NeuronEX 并做输出。
使用外部服务,只需以下 2 个 步骤:
创建一个外部服务
点击数据处理 -> 算法集成,在外部服务页签,点击添加外部服务。
- 名称:输入外部服务名称,该名称会在 SQL 中使用,请输入英文字符及数字;
- 服务地址:外部服务的服务地址,只包括
http://[ip]:[port]
即可,服务后缀在 SQL 中填写; - Headers:可选,外部服务的 Headers 信息。
在规则中使用外部服务
在规则中输入如下内容:
ex_service("post","/api/test1", *)
表示,会通过 HTTP POST 方法发送数据流 neuronStream 中的所有数据到 HTTP Server地址 http://127.0.0.1:9876/api/test1
,并将收到的外部服务的处理结果发送到 Sink 中。
外部算法函数示例
请查阅外部算法函数示例。