Extension
NeuronEX can be extended with custom functionality. Users can write extension plugins with the portable plugin system, or call existing external REST services.
Portable plugin extension
You can further expand the functionality of NeuronEX by installing portable plugins. Custom data sources, custom functions and rule actions (sinks) can all be implemented as plugins.
Click Data Processing -> Extensions and, on the Portable Plugins tab, click Create Portable Plugins. In the pop-up window, configure the following settings:
- Name: Select or enter the plugin name
- File: Upload the plugin file, or paste its content into the text box.
After completing the above settings, click Submit to complete the creation of the plugin. The new plugin will appear in the plugin list on this page, and you can view or delete the plugin here.
Portable plugin development
Developing a plugin involves writing the sub-modules (sources, sinks and functions) and a main program. The Python SDK provides the source, sink and function APIs for Python.
Source interface:
class Source(object):
    """abstract class for eKuiper source plugin"""

    @abstractmethod
    def configure(self, datasource: str, conf: dict):
        """configure with the string datasource and conf map and raise error if any"""
        pass

    @abstractmethod
    def open(self, ctx: Context):
        """run continuously and send out the data or error with ctx"""
        pass

    @abstractmethod
    def close(self, ctx: Context):
        """stop running and clean up"""
        pass
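As an illustration of the three source callbacks, here is a minimal sketch of a counter source. It does not import the SDK so that it can run on its own: `CounterSource`, `RecordingContext`, the `emit(message, meta)` method and the `interval` and `limit` configuration keys are all assumptions made for this example. A real plugin would subclass the SDK's `Source` and receive the SDK's `Context`.

```python
import time
from typing import List, Optional


class RecordingContext:
    """Hypothetical stand-in for the SDK Context; it only records emitted rows."""

    def __init__(self) -> None:
        self.rows: List[dict] = []

    def emit(self, message: dict, meta: Optional[dict] = None) -> None:
        # Assumed method name and signature, used only by this sketch.
        self.rows.append(message)


class CounterSource:
    """Emits {"count": n} rows; a real plugin would subclass the SDK's Source."""

    def __init__(self) -> None:
        self.interval = 1.0
        self.limit: Optional[int] = None
        self.running = False

    def configure(self, datasource: str, conf: dict):
        # Read optional settings and raise on invalid values, as the interface asks.
        self.interval = float(conf.get("interval", 1.0))
        if self.interval < 0:
            raise ValueError("interval must not be negative")
        # Hypothetical key that lets the demo loop end without calling close().
        self.limit = conf.get("limit")

    def open(self, ctx):
        # Run until close() is called or the optional limit is reached.
        self.running = True
        count = 0
        while self.running and (self.limit is None or count < self.limit):
            count += 1
            ctx.emit({"count": count}, None)
            time.sleep(self.interval)

    def close(self, ctx):
        # Signal the loop in open() to stop.
        self.running = False
```

Configuring the source with `{"interval": 0, "limit": 3}` and calling `open` with a `RecordingContext` leaves three rows in `ctx.rows`.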
Sink interface:
class Sink(object):
    """abstract class for eKuiper sink plugin"""

    @abstractmethod
    def configure(self, conf: dict):
        """configure with conf map and raise error if any"""
        pass

    @abstractmethod
    def open(self, ctx: Context):
        """open connection and wait to receive data"""
        pass

    @abstractmethod
    def collect(self, ctx: Context, data: Any):
        """callback to deal with received data"""
        pass

    @abstractmethod
    def close(self, ctx: Context):
        """stop running and clean up"""
        pass
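The sink callbacks can be sketched in the same way. The class below keeps received data in memory instead of writing to a real target. `MemorySink` and the `prefix` configuration key are hypothetical names chosen for this example, and the class deliberately does not import the SDK; a real plugin would subclass the SDK's `Sink`.

```python
from typing import Any, List


class MemorySink:
    """Stores received data in a list; a real plugin would subclass the SDK's Sink."""

    def __init__(self) -> None:
        self.prefix = ""
        self.is_open = False
        self.received: List[str] = []

    def configure(self, conf: dict):
        # "prefix" is a hypothetical option; reject values that are not strings.
        prefix = conf.get("prefix", "")
        if not isinstance(prefix, str):
            raise TypeError("prefix must be a string")
        self.prefix = prefix

    def open(self, ctx: Any):
        # A real sink would open its connection here.
        self.is_open = True

    def collect(self, ctx: Any, data: Any):
        # Called once per result delivered to the sink.
        self.received.append(f"{self.prefix}{data}")

    def close(self, ctx: Any):
        # A real sink would release its connection here.
        self.is_open = False
```

The context argument is unused in this sketch, so `None` can be passed when trying the class outside NeuronEX.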
Function interface:
class Function(object):
    """abstract class for eKuiper function plugin"""

    @abstractmethod
    def validate(self, args: List[Any]):
        """callback to validate against ast args, return a string error or empty string"""
        pass

    @abstractmethod
    def exec(self, args: List[Any], ctx: Context) -> Any:
        """callback to do execution, return result"""
        pass

    @abstractmethod
    def is_aggregate(self):
        """callback to check if function is for aggregation, return bool"""
        pass
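A function plugin could look roughly like the sketch below, which reverses its single argument. The class name `RevertFunc` and the reversing behavior are assumptions made for illustration, and the class does not import the SDK so that it runs on its own. A real plugin would subclass the SDK's `Function`.

```python
from typing import Any, List


class RevertFunc:
    """Reverses its argument as text; a real plugin would subclass the SDK's Function."""

    def validate(self, args: List[Any]) -> str:
        # Return an error string on bad arguments, or an empty string when valid.
        if len(args) != 1:
            return "revert expects exactly one argument"
        return ""

    def exec(self, args: List[Any], ctx: Any) -> Any:
        # ctx is unused here; the value is converted to text and reversed.
        return str(args[0])[::-1]

    def is_aggregate(self) -> bool:
        # Works on one row at a time, so it is not an aggregate function.
        return False


# One shared instance that a main program can hand out, e.g. `lambda: revertIns`.
revertIns = RevertFunc()
```

Returning a shared instance is reasonable here because the function keeps no per-rule state; a stateful function would more safely create a new object on each call.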
Users create their own sources, sinks and functions by implementing these abstract interfaces, and then declare how these custom plugins are instantiated in the main function:
if __name__ == '__main__':
    c = PluginConfig("pysam", {"pyjson": lambda: PyJson()}, {"print": lambda: PrintSink()},
                     {"revert": lambda: revertIns})
    plugin.start(c)
Portable plugin development example
Portable plugins can currently be written in Python or Go. For details on writing plugins, please refer to:
External algorithm function
The external algorithm function, also called an external service, maps an existing external HTTP service to a SQL function in NeuronEX. When an external service is used in a rule, data from the source is passed to the external service, and the result it computes is returned to NeuronEX for output.
To use an external service, just follow these 2 steps:
Create an external service
Click Data Processing -> Extensions and, on the External Service tab, click Add External Service. Then configure the following settings:
- Name: Enter the external service name, which will be used in SQL. Use only letters and digits.
- Address: The service address of the external service. It contains only http://[ip]:[port]; the service path suffix is given in SQL.
- Headers: Optional. HTTP headers to send to the external service.
Using external service in rules
Enter the following in the rule SQL:
ex_service("post","/api/test1", *)
Assuming the external service ex_service was created with the address http://127.0.0.1:9876, this means that all data from neuronStream will be sent to http://127.0.0.1:9876/api/test1 through the HTTP POST method, and the processing result of the external service will be sent to the Sink.
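For local testing, the receiving side of such a call can be sketched with the Python standard library. This is only an illustration under stated assumptions: it assumes the row arrives as a JSON object in the request body and that a JSON reply is acceptable as the result. The handler name `Test1Handler`, the helper `make_server` and the `field_count` reply field are hypothetical.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


class Test1Handler(BaseHTTPRequestHandler):
    """Answers POST /api/test1 with the number of fields in the posted JSON object."""

    def do_POST(self):
        if self.path != "/api/test1":
            self.send_error(404, "unknown path")
            return
        length = int(self.headers.get("Content-Length", 0))
        try:
            row = json.loads(self.rfile.read(length) or b"{}")
        except json.JSONDecodeError:
            self.send_error(400, "body is not valid JSON")
            return
        if not isinstance(row, dict):
            self.send_error(400, "expected a JSON object")
            return
        # Placeholder "algorithm": count the fields of the received row.
        body = json.dumps({"field_count": len(row)}).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        # Keep the console quiet; remove this override to see request logs.
        pass


def make_server(host: str = "127.0.0.1", port: int = 9876) -> HTTPServer:
    """Builds the server; call serve_forever() on the result to start it."""
    return HTTPServer((host, port), Test1Handler)
```

Calling `make_server().serve_forever()` starts the service on port 9876, which matches the address used in the example above.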
Example of external algorithm function
Please refer to External Algorithm Function Example.