# Portable 插件 Go SDK

用户可利用 GO SDK 来开发 portable 插件,这个 SDK 提供了类似原生插件的 API,另外它提供了启动函数,用户只需填充插件信息即可。

# 插件开发

# Symbols

由于 portable 插件 GO SDK 提供了类似原生插件的API,用户做简单的修改即可复用以前编写的原生插件

用户只需依赖 github.com/lf-edge/ekuiper/sdk/go 而不是 eKuiper 主项目即可编写 portable 插件,用户需要实现 github.com/lf-edge/ekuiper/sdk/go/api 中的相应接口即可

对于源,实现跟原生源插件中一样的接口即可

type Source interface {
	// Open Should be sync function for normal case. The container will run it in go func
	Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
	// Configure Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties read from the yaml
	Configure(datasource string, props map[string]interface{}) error
	Closable
}
1
2
3
4
5
6
7

对于目标,实现跟原生目标插件中一样的接口即可

type Sink interface {
	//Should be sync function for normal case. The container will run it in go func
	Open(ctx StreamContext) error
	//Called during initialization. Configure the sink with the properties from rule action definition
	Configure(props map[string]interface{}) error
	//Called when each row of data has transferred to this sink
	Collect(ctx StreamContext, data interface{}) error
	Closable
}
1
2
3
4
5
6
7
8
9

对于函数,实现跟原生函数插件中一样的接口即可

type Function interface {
	//The argument is a list of xsql.Expr
	Validate(args []interface{}) error
	//Execute the function, return the result and if execution is successful.
	//If execution fails, return the error and false.
	Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
	//If this function is an aggregate function. Each parameter of an aggregate function will be a slice
	IsAggregate() bool
}
1
2
3
4
5
6
7
8
9

# 插件主程序

由于 portable 插件是一个独立的程序,需要编写成一个可执行程序。在 GO SDK 中, 提供了启动函数,用户只需填充插件信息即可。启动函数如下:

package main

import (
	"github.com/lf-edge/ekuiper/sdk/go/api"
	sdk "github.com/lf-edge/ekuiper/sdk/go/runtime"
	"os"
)

func main() {
	sdk.Start(os.Args, &sdk.PluginConfig{
		Name: "mirror",
		Sources: map[string]sdk.NewSourceFunc{
			"random": func() api.Source {
				return &randomSource{}
			},
		},
		Functions: map[string]sdk.NewFunctionFunc{
			"echo": func() api.Function {
				return &echo{}
			},
		},
		Sinks: map[string]sdk.NewSinkFunc{
			"file": func() api.Sink {
				return &fileSink{}
			},
		},
	})
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

在主函数中调用了 sdk.Start来启动插件进程。在参数中,PluginConfig 定义了插件名字,源,目标,函数构造函数。注意这些信息必须跟插件安装包中的 json 描述文件一致

完整例子请参考这个例子 (opens new window)

# 打包发布

我们需要将可执行文件和 json 描述文件一起打包,使用 GO SDK,仅仅需要 go build编译出可执行文件即可。由于在不同操作系统下编译出到的可执行文件名字有所不同,需要确保 json 描述文件中可执行文件名字的准确性。详细信息,请参考