Skip to content

流管理

流是 NeuronEX 中数据源接入的主要运行方式,用户可通过选择数据源类型及配置参数来定义如何连接到外部资源。数据流中有数据流入时,都会触发规则中的计算。

流数据源

目前内置支持以下流类型:

名称描述
Neuron从 NeuronEX 数采模块读取数据
MQTT从 MQTT 主题读取数据
HTTP pull从 HTTP 服务器中拉取数据
HTTP push通过 HTTP 推送数据到 NeuronEX
内存从 NeuronEX 内存主题读取数据以形成规则流水线
SQL从数据库中查询数据
文件从文件中读取数据
Video从视频流中查询数据
Simulator生成模拟数据供测试

创建流

在 NeuronEX 页面,点击数据处理 -> 源管理,在流管理页面,点击创建流按钮即可进行流的创建。

stream

流参数配置

  • 是否为带结构的流

    NeuronEX 支持带结构/无结构的流,默认为无结构。即在 源管理 -> 创建流 时,是否为带结构的流选项不打勾。详细说明请参考数据结构

    • 无结构的流

    Schemaless,用户无需定义任何形式的 schema,主要用于弱结构化数据流,或数据结构经常变化的情况。

    • 带结构的流

    用户在 数据源(Source) 层定义数据schema。适用于用户的数据有固定或大致固定的格式。

    提示

    部分数据格式本身带有数据结构,例如 protobuf 格式。用户在创建源时可以定义 流格式 来指向模式注册表 ( Schema Registry ) 中的数据结构定义。此时,数据源的数据结构将会被模式注册表中的定义覆盖。有关模式的详细介绍,查阅 模式 章节。

  • 流类型

    NeuronEX 支持多种流类型,具体可参考 流数据源

  • 数据源

    取决于不同的数据源类型;如果是 MQTT 源,则为 MQTT 数据源主题名;其它源请参考相关文档。

  • 配置组

    定义各类型数据源的相关配置项,具体可参考相关文档,每个数据源均提供了 default 配置组,可供参考。

  • 流格式

    用于定义传入的数据类型,支持 jsonprotobufbinarydelimitedcustom,默认为 json 。以下为其他流格式的介绍:

    • delimited

      对于 CSV 文件数据源,需选择 delimited 格式,还应指定分隔符来区分数据字段,如 ","

    • protobuf

      Protobuf 是一种序列化结构数据的方式,当流格式设置为 protobuf 时,还应配置解码时使用的模式。模式可在 数据处理 -> 配置 -> 模式中定义。有关模式的详细介绍,查阅 模式 章节。

    • Binary

      对于二进制数据流,例如图像或者视频流,需要指定数据格式为 binary

    • custom

      custom 是由用户自定义的数据格式 。

  • 共享 (SHARED)

默认情况下,SHARED 属性设置为 false 。如果用户希望多个规则共享同一个数据源实例,可以将 SHARED 属性设置为 true

  • 共享数据源为false时,每个规则会启动一个独立的数据源运行时,不同规则中的同名数据源完全隔离。
  • 共享数据源为true时,多个规则中共享同一个数据源实例。

提示

需要使用完全相同的输入数据或者提高性能,数据源的共享字段可定义true

提示

在某些场景中,用户需要不同的规则处理完全相同的数据流。例如,在处理传感器的温度数据时,用户可能需要一个规则,当一段时间的平均温度大于30度时触发警告;而另一个规则则是当一段时间的平均温度小于 0 度时触发警告。使用默认配置时,两个规则各自独立实例化源实例。由于网络延迟等原因,规则可能得到不同顺序,甚至各有缺失数据的数据流,从而在不同的数据维度中计算平均值。通过配置共享源实例,用户可以确保两个规则处理完全相同的数据。

数据结构

在创建数据源时,可以定义数据源的数据结构。NeuronEX 在运行时,会根据定义的结构进行数据验证和类型转换。数据结构验证默认不开启。如需开启,可在创建数据源时,勾选是否为带结构的流字段,并填入字段信息。

pysam

目前 NeuronEX 支持以下字段类型:

#数据类型说明
1bigint整数型。
2float浮点型。
3string文本值,由 Unicode 字符组成。
4datetime日期时间类型。
5boolean布尔类型,值可以是true 或者 false
6bytea用于存储二进制数据的字节数组。如果在格式为 "JSON" 的流中使用此类型,则传入的数据需要为 base64 编码的字符串。
7array数组类型可以是任何简单类型,数组类型或结构类型。
8struct复杂类型。

提示

数据源接入的数据如果与定义的数据schema不一致,将会丢弃消息,不会进行规则处理。

示例

示例1

该 testStream 流将订阅 MQTT 主题topic1,MQTT Broker地址为tcp://broker.emqx.io:1883

stream

示例2

该 testStream 流将订阅 MQTT 主题topic/a,MQTT Broker地址为tcp://broker.emqx.io:1883。 该流预先定义了数据结构,包含 USERIDFIRST_NAMELAST_NAMENICKNAMESGenderADDRESS 字段。

stream

示例3

该 testStream 流将订阅 MQTT 主题 test/,MQTT Broker地址为tcp://broker.emqx.io:1883。 该流接收到protobuf格式的数据,并根据在 NeuronEX 中预先添加的模式文件protobuf1,对流入的数据进行解码。模式的管理详见 模式

stream