Skip to content

算法集成

NeuronEX 允许用户自定义算法集成,以支持更多功能。 用户可以通过便携插件系统编写扩展插件,也可以调用外部已有的 REST 服务。

便携插件

您可通过安装便携插件进一步拓展 NeuronEX 的功能。通过便捷插件可以实现自定义的数据源(Source)、规则中的自定义函数以及动作(Sink)

点击数据处理 -> 算法集成,在便携插件页签,点击创建便携插件。在弹出的窗口,进行如下设置:

  • 名称:选择或输入插件名称
  • 文件:通过上传或者文本框的形式贴入插件文件。

完成上述设置后,点击提交完成插件的创建。新建插件将出现在该页面的插件列表中,您可以在此查看或者删除插件。

便携插件开发

开发插件包括子模块和主程序两部分, Python SDK 提供了 python 语言的源,目标和函数 API。

源接口:

python
  class Source(object):
    """abstract class for eKuiper source plugin"""

    @abstractmethod
    def configure(self, datasource: str, conf: dict):
        """configure with the string datasource and conf map and raise error if any"""
        pass

    @abstractmethod
    def open(self, ctx: Context):
        """run continuously and send out the data or error with ctx"""
        pass

    @abstractmethod
    def close(self, ctx: Context):
        """stop running and clean up"""
        pass

目标接口:

python
class Sink(object):
    """abstract class for eKuiper sink plugin"""

    @abstractmethod
    def configure(self, conf: dict):
        """configure with conf map and raise error if any"""
        pass

    @abstractmethod
    def open(self, ctx: Context):
        """open connection and wait to receive data"""
        pass

    @abstractmethod
    def collect(self, ctx: Context, data: Any):
        """callback to deal with received data"""
        pass

    @abstractmethod
    def close(self, ctx: Context):
        """stop running and clean up"""
        pass

函数接口:

python
class Function(object):
    """abstract class for eKuiper function plugin"""

    @abstractmethod
    def validate(self, args: List[Any]):
        """callback to validate against ast args, return a string error or empty string"""
        pass

    @abstractmethod
    def exec(self, args: List[Any], ctx: Context) -> Any:
        """callback to do execution, return result"""
        pass

    @abstractmethod
    def is_aggregate(self):
        """callback to check if function is for aggregation, return bool"""
        pass

用户通过实现这些抽象接口来创建自己的源,目标和函数,然后在主函数中声明这些自定义插件的实例化方法

python
if __name__ == '__main__':
    c = PluginConfig("pysam", {"pyjson": lambda: PyJson()}, {"print": lambda: PrintSink()},
                     {"revert": lambda: revertIns})
    plugin.start(c)

便携插件开发示例

便携插件目前支持通过 PythonGolang 编程语言进行插件开发。具体的插件编写方法,请参考:

外部算法函数

外部算法函数,即外部服务,是指将外部已有的一个 HTTP 服务,映射为 NeuronEX 的一个 SQL 函数。在规则中使用外部函数时,会将数据源中的数据传递给外部服务,并将外部服务的计算结果返回到 NeuronEX 并做输出。

使用外部服务,只需以下 2 个 步骤:

创建一个外部服务

点击数据处理 -> 算法集成,在外部服务页签,点击添加外部服务

  • 名称:输入外部服务名称,该名称会在 SQL 中使用,请输入英文字符及数字;
  • 服务地址:外部服务的服务地址,只包括http://[ip]:[port]即可,服务后缀在 SQL 中填写;
  • Headers:可选,外部服务的 Headers 信息。

alt text

在规则中使用外部服务

在规则中输入如下内容:

alt text

ex_service("post","/api/test1", *) 表示,会通过 HTTP POST 方法发送数据流 neuronStream 中的所有数据到 HTTP Server地址 http://127.0.0.1:9876/api/test1 ,并将收到的外部服务的处理结果发送到 Sink 中。

外部算法函数示例

请查阅外部算法函数示例