Skip to content

流管理

流是 NeuronEX 中数据源接入的主要运行方式,用户可通过选择数据源类型及配置参数来定义如何连接到外部资源。数据流中有数据流入时,都会触发规则中的计算。

流数据源

目前内置支持以下流类型:

名称描述
Neuron从 NeuronEX 数采模块读取数据
MQTT从 MQTT 主题读取数据
HTTP pull从 HTTP 服务器中拉取数据
HTTP push通过 HTTP 推送数据到 NeuronEX
内存从 NeuronEX 内存主题读取数据以形成规则流水线
SQL从数据库中查询数据
文件从文件中读取数据
Video从视频流中查询数据
Simulator生成模拟数据供测试
CAN从 CAN 总线读取数据
Kafka从 Kafka 读取数据
WebSocket从 WebSocket 读取数据

创建流

在 NeuronEX 页面,点击数据处理 -> 源管理,在流管理页面,点击创建流按钮即可进行流的创建。

stream

流参数配置

  • 是否为带结构的流

    NeuronEX 支持带结构/无结构的流,默认为无结构。即在 源管理 -> 创建流 时,是否为带结构的流选项不打勾。详细说明请参考数据结构

    • 无结构的流

    Schemaless,用户无需定义任何形式的 schema,主要用于弱结构化数据流,或数据结构经常变化的情况。

    • 带结构的流

    用户在 数据源(Source) 层定义数据schema。适用于用户的数据有固定或大致固定的格式。

    提示

    部分数据格式本身带有数据结构,例如 protobuf 格式。用户在创建源时可以定义 流格式 来指向模式注册表 ( Schema Registry ) 中的数据结构定义。此时,数据源的数据结构将会被模式注册表中的定义覆盖。有关模式的详细介绍,查阅 模式 章节。

  • 流类型

    NeuronEX 支持多种流类型,具体可参考 流数据源

  • 数据源

    取决于不同的数据源类型;如果是 MQTT 源,则为 MQTT 数据源主题名;其它源请参考相关文档。

  • 配置组

    定义各类型数据源的相关配置项,具体可参考相关文档,每个数据源均提供了 default 配置组,可供参考。

  • 流格式

    用于定义传入的数据类型,支持 jsonprotobufbinarydelimitedcustom,默认为 json 。以下为其他流格式的介绍:

    • delimited

      对于 CSV 文件数据源,需选择 delimited 格式,还应指定分隔符来区分数据字段,如 ","

    • protobuf

      Protobuf 是一种序列化结构数据的方式,当流格式设置为 protobuf 时,还应配置解码时使用的模式。模式可在 数据处理 -> 配置 -> 模式中定义。有关模式的详细介绍,查阅 模式 章节。

    • Binary

      对于二进制数据流,例如图像或者视频流,需要指定数据格式为 binary

    • custom

      custom 是由用户自定义的数据格式 。

  • 共享 (SHARED)

默认情况下,SHARED 属性设置为 false 。如果用户希望多个规则共享同一个数据源实例,可以将 SHARED 属性设置为 true

  • 共享数据源为false时,每个规则会启动一个独立的数据源运行时,不同规则中的同名数据源完全隔离。
  • 共享数据源为true时,多个规则中共享同一个数据源实例。

提示

需要使用完全相同的输入数据或者提高性能,数据源的共享字段可定义true

提示

在某些场景中,用户需要不同的规则处理完全相同的数据流。例如,在处理传感器的温度数据时,用户可能需要一个规则,当一段时间的平均温度大于30度时触发警告;而另一个规则则是当一段时间的平均温度小于 0 度时触发警告。使用默认配置时,两个规则各自独立实例化源实例。由于网络延迟等原因,规则可能得到不同顺序,甚至各有缺失数据的数据流,从而在不同的数据维度中计算平均值。通过配置共享源实例,用户可以确保两个规则处理完全相同的数据。

数据结构

在创建数据源时,可以定义数据源的数据结构。NeuronEX 在运行时,会根据定义的结构进行数据验证和类型转换。数据结构验证默认不开启。如需开启,可在创建数据源时,勾选是否为带结构的流字段,并填入字段信息。

pysam

目前 NeuronEX 支持以下字段类型:

#数据类型说明
1bigint整数型。
2float浮点型。
3string文本值,由 Unicode 字符组成。
4datetime日期时间类型。
5boolean布尔类型,值可以是true 或者 false
6bytea用于存储二进制数据的字节数组。如果在格式为 "JSON" 的流中使用此类型,则传入的数据需要为 base64 编码的字符串。
7array数组类型可以是任何简单类型,数组类型或结构类型。
8struct复杂类型。

提示

数据源接入的数据如果与定义的数据schema不一致,将会丢弃消息,不会进行规则处理。

示例

示例1

该 testStream 流将订阅 MQTT 主题topic1,MQTT Broker地址为tcp://broker.emqx.io:1883

stream

示例2

该 testStream 流将订阅 MQTT 主题topic/a,MQTT Broker地址为tcp://broker.emqx.io:1883。 该流预先定义了数据结构,包含 USERIDFIRST_NAMELAST_NAMENICKNAMESGenderADDRESS 字段。

stream

示例3

该 testStream 流将订阅 MQTT 主题 test/,MQTT Broker地址为tcp://broker.emqx.io:1883。 该流接收到protobuf格式的数据,并根据在 NeuronEX 中预先添加的模式文件protobuf1,对流入的数据进行解码。模式的管理详见 模式

stream