流管理
流是 NeuronEX 中数据源接入的主要运行方式,用户可通过选择数据源类型及配置参数来定义如何连接到外部资源。数据流中有数据流入时,都会触发规则中的计算。
流数据源
目前内置支持以下流类型:
名称 | 描述 |
---|---|
Neuron | 从 NeuronEX 数采模块读取数据 |
MQTT | 从 MQTT 主题读取数据 |
HTTP pull | 从 HTTP 服务器中拉取数据 |
HTTP push | 通过 HTTP 推送数据到 NeuronEX |
内存 | 从 NeuronEX 内存主题读取数据以形成规则流水线 |
SQL | 从数据库中查询数据 |
文件 | 从文件中读取数据 |
Video | 从视频流中查询数据 |
Simulator | 生成模拟数据供测试 |
创建流
在 NeuronEX 页面,点击数据处理 -> 源管理,在流管理页面,点击创建流按钮即可进行流的创建。
流参数配置
是否为带结构的流
NeuronEX 支持带结构/无结构的流,默认为无结构。即在 源管理 -> 创建流 时,
是否为带结构的流
选项不打勾。详细说明请参考数据结构。- 无结构的流
Schemaless,用户无需定义任何形式的 schema,主要用于弱结构化数据流,或数据结构经常变化的情况。
- 带结构的流
用户在 数据源(Source) 层定义数据schema。适用于用户的数据有固定或大致固定的格式。
提示
部分数据格式本身带有数据结构,例如
protobuf
格式。用户在创建源时可以定义流格式
来指向模式注册表 ( Schema Registry ) 中的数据结构定义。此时,数据源的数据结构将会被模式注册表中的定义覆盖。有关模式的详细介绍,查阅 模式 章节。流类型
NeuronEX 支持多种流类型,具体可参考 流数据源。
数据源
取决于不同的数据源类型;如果是 MQTT 源,则为 MQTT 数据源主题名;其它源请参考相关文档。
配置组
定义各类型数据源的相关配置项,具体可参考相关文档,每个数据源均提供了
default
配置组,可供参考。流格式
用于定义传入的数据类型,支持
json
、protobuf
、binary
、delimited
和custom
,默认为json
。以下为其他流格式的介绍:delimited
对于 CSV 文件数据源,需选择
delimited
格式,还应指定分隔符来区分数据字段,如 ",
"protobuf
Protobuf 是一种序列化结构数据的方式,当流格式设置为
protobuf
时,还应配置解码时使用的模式。模式可在 数据处理 -> 配置 -> 模式中定义。有关模式的详细介绍,查阅 模式 章节。Binary
对于二进制数据流,例如图像或者视频流,需要指定数据格式为
binary
。custom
custom
是由用户自定义的数据格式 。
- 共享 (SHARED)
默认情况下,SHARED
属性设置为 false
。如果用户希望多个规则共享同一个数据源实例,可以将 SHARED
属性设置为 true
。
- 共享数据源为
false
时,每个规则会启动一个独立的数据源运行时,不同规则中的同名数据源完全隔离。 - 共享数据源为
true
时,多个规则中共享同一个数据源实例。
提示
需要使用完全相同的输入数据或者提高性能,数据源的共享
字段可定义true
。
提示
在某些场景中,用户需要不同的规则处理完全相同的数据流。例如,在处理传感器的温度数据时,用户可能需要一个规则,当一段时间的平均温度大于30度时触发警告;而另一个规则则是当一段时间的平均温度小于 0 度时触发警告。使用默认配置时,两个规则各自独立实例化源实例。由于网络延迟等原因,规则可能得到不同顺序,甚至各有缺失数据的数据流,从而在不同的数据维度中计算平均值。通过配置共享源实例,用户可以确保两个规则处理完全相同的数据。
数据结构
在创建数据源时,可以定义数据源的数据结构。NeuronEX 在运行时,会根据定义的结构进行数据验证和类型转换。数据结构验证默认不开启。如需开启,可在创建数据源时,勾选是否为带结构的流
字段,并填入字段信息。
目前 NeuronEX 支持以下字段类型:
# | 数据类型 | 说明 |
---|---|---|
1 | bigint | 整数型。 |
2 | float | 浮点型。 |
3 | string | 文本值,由 Unicode 字符组成。 |
4 | datetime | 日期时间类型。 |
5 | boolean | 布尔类型,值可以是true 或者 false 。 |
6 | bytea | 用于存储二进制数据的字节数组。如果在格式为 "JSON" 的流中使用此类型,则传入的数据需要为 base64 编码的字符串。 |
7 | array | 数组类型可以是任何简单类型,数组类型或结构类型。 |
8 | struct | 复杂类型。 |
提示
数据源接入的数据如果与定义的数据schema不一致,将会丢弃消息,不会进行规则处理。
示例
示例1
该 testStream 流将订阅 MQTT 主题topic1
,MQTT Broker地址为tcp://broker.emqx.io:1883
。
示例2
该 testStream 流将订阅 MQTT 主题topic/a
,MQTT Broker地址为tcp://broker.emqx.io:1883
。 该流预先定义了数据结构,包含 USERID
、FIRST_NAME
、LAST_NAME
、NICKNAMES
、Gender
和 ADDRESS
字段。
示例3
该 testStream 流将订阅 MQTT 主题 test/
,MQTT Broker地址为tcp://broker.emqx.io:1883
。 该流接收到protobuf格式的数据,并根据在 NeuronEX 中预先添加的模式文件protobuf1
,对流入的数据进行解码。模式的管理详见 模式。