# MQTT源

eKuiper 为 MQTT 源流提供了内置支持,流可以订阅来自 MQTT 代理的消息并输入eKuiper 处理管道。 MQTT 源的配置文件位于 $ekuiper/etc/mqtt_source.yaml。 以下是文件格式。

#全局MQTT配置
default:
  qos: 1
  servers: [tcp://127.0.0.1:1883]
  #username: user1
  #password: password
  #certificationPath: /var/kuiper/xyz-certificate.pem
  #privateKeyPath: /var/kuiper/xyz-private.pem.key
  kubeedgeVersion: "1.0"
  kubeedgeModelFile: "mqtt_model.json"


#重载全局配置
demo: #Conf_key
  qos: 0
  servers: [tcp://10.211.55.6:1883]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 全局 MQTT 配置

用户可以在此处指定全局 MQTT 设置。default 部分中指定的配置项将用作所有MQTT 连接的默认设置。

# qos

默认订阅QoS级别。

# servers

MQTT 消息代理的服务器列表。 当前,只能指定一个服务器。

# username

MQTT 连接用户名。

# password

MQTT 连接密码。

# protocolVersion

MQTT 协议版本。3.1 (也被称为 MQTT 3) 或者 3.1.1 (也被称为 MQTT 4)。 如果未指定,缺省值为 3.1。

# clientid

MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid。

# certificationPath

证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 kuiperd 命令的路径。比如,如果你在 /var/kuiper 中运行 bin/kuiperd ,那么父目录为 /var/kuiper; 如果运行从/var/kuiper/bin中运行./kuiperd,那么父目录为 /var/kuiper/bin。 比如 d3807d9fa5-certificate.pem

# privateKeyPath

私钥路径。可以为绝对路径,也可以为相对路径。更详细的信息,请参考 certificationPath,比如 d3807d9fa5-private.pem.key

# rootCaPath

根证书路径。可以为绝对路径,也可以为相对路径。

# insecureSkipVerify

如果 InsecureSkipVerify 设置为 true, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为false。配置项只能用于TLS连接

# connectionSelector

重用 MQTT 源连接。连接配置信息位于 connections/connection.yaml.

mqtt:
  localConnection: #connection key
    servers: [tcp://127.0.0.1:1883]
    username: ekuiper
    password: password
    #certificationPath: /var/kuiper/xyz-certificate.pem
    #privateKeyPath: /var/kuiper/xyz-private.pem.ke
    #insecureSkipVerify: false
    #protocolVersion: 3
    clientid: ekuiper
  cloudConnection: #connection key
    servers: ["tcp://broker.emqx.io:1883"]
    username: user1
    password: password
    #certificationPath: /var/kuiper/xyz-certificate.pem
    #privateKeyPath: /var/kuiper/xyz-private.pem.ke
    #insecureSkipVerify: false
    #protocolVersion: 3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

对于 MQTT 连接,这里有两个配置组。用户应该使用 mqtt.localConnection 或者 mqtt.cloudConnection 来作为参数。举例如下:

#Global MQTT configurations
default:
  qos: 1
  servers: [tcp://127.0.0.1:1883]
  #username: user1
  #password: password
  #certificationPath: /var/kuiper/xyz-certificate.pem
  #privateKeyPath: /var/kuiper/xyz-private.pem.key
  connectionSelector: mqtt.localConnection
1
2
3
4
5
6
7
8
9

注意: 相应配置组一旦指定 connectionSelector 参数,所有关于连接的参数都会被忽略. 上面例子中,servers: [tcp://127.0.0.1:1883] 会被忽略。

# bufferLength

指定最大缓存消息数目。该参数主要用于防止内存溢出。实际内存用量会根据当前缓存消息数目动态变化。增大该参数不会增加初始内存分配量,因此设置较大的数值是安全的。该参数默认值为102400;如果每条消息为100字节,则默认情况下,缓存最大占用内存量为102400 * 100B ~= 10MB.

# kubeedgeVersion

kubeedge 版本号,不同的版本号对应的文件内容不同。

# kubeedgeModelFile

kubeedge 模版文件名,文件指定放在 etc/sources 文件夹中,样例格式如下:

{
	"deviceModels": [{
		"name": "device1",
		"properties": [{
			"name": "temperature",
			"dataType": "int"
		}, {
			"name": "temperature-enable",
			"dataType": "string"
		}]
	}]
}
1
2
3
4
5
6
7
8
9
10
11
12

# deviceModels.name

设备名称,与订阅主题中的字段匹配,位于第三和第四个“/”之间的内容。例如:$ke/events/device/device1/data/update

# properties.name

字段名称

# properties.dataType

期望的字段类型

# 重载默认设置

如果您有一个特定连接需要重载默认设置,则可以创建一个自定义模块。 在上一个示例中,我们创建一个名为 demo 的特定设置。 然后,您可以在创建流定义时使用选项 CONF_KEY 指定配置(有关更多信息,请参见 stream specs )。

示例

demo (
		...
	) WITH (DATASOURCE="test/", FORMAT="JSON", KEY="USERID", CONF_KEY="demo");
1
2
3

这些特定设置使用的配置键与 default 设置中的配置键相同,在特定设置中指定的任何值都将覆盖 default 部分中的值。

# 在多个配置块中引用同一个 connectionSelector

如下所示,用户创建了两个配置项

#Override the global configurations
demo_conf: #Conf_key
  qos: 0
  connectionSelector: mqtt.localConnection 
  servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]

#Override the global configurations
demo2_conf: #Conf_key
  qos: 0
  connentionSelector: mqtt.localConnection
  servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]
1
2
3
4
5
6
7
8
9
10
11

基于以上配置,创建了两个数据流

demo (
		...
	) WITH (DATASOURCE="test/", FORMAT="JSON", CONF_KEY="demo_conf");

demo2 (
		...
	) WITH (DATASOURCE="test2/", FORMAT="JSON", CONF_KEY="demo2_conf");

1
2
3
4
5
6
7
8

当相应的规则分别引用以上数据流时,规则之间的源部分将共享连接。 在这里 DATASOURCE 对应 mqtt 订阅的 topic, 配置项中的 qos 将用作订阅时的 Qos. 在这个例子中,demo 以 Qos 0 订阅 topic test/,demo2 以 Qos 0 订阅 topic test2/ 值得注意的是如果两个规则订阅的 topic 完全一样而 Qos 不同,那么只会订阅一次并以首先启动的规则订阅为准。