将 MQTT 数据传输到 Amazon Kinesis
AWS Kinesis 是 AWS 上完全托管的实时流数据处理服务,可以轻松地进行流数据的收集、处理和分析。它可以经济高效地处理任意规模的实时流数据,并具有高度的灵活性,能够低时延的处理来自数十万个来源的任意数量的流数据。EMQX 支持与 Amazon Kinesis Data Streams 无缝集成,从而能够实现海量 IoT 设备连接,进行实时消息进行采集、传输,并通过 Sink 连接到 Amazon Kinesis Data Streams,进行实时数据分析与复杂的流处理。
本页详细介绍了 EMQX 与 Amazon Kinesis 的数据集成并提供了实用的规则和 Sink 创建指导。
工作原理
Amazon Kinesis 数据集成是 EMQX 的一个开箱即用功能,旨在帮助用户无缝集成 MQTT 数据流与 Amazon Kinesis,并利用其丰富的服务和能力进行物联网应用开发。
EMQX 通过规则引擎和 Sink 将 MQTT 数据转发到 Amazon Kinesis。完整的过程如下:
- 物联网设备发布消息:设备通过特定主题发布遥测和状态数据,触发规则引擎。
- 规则引擎处理消息:使用内置规则引擎,基于主题匹配处理来自特定来源的 MQTT 消息。规则引擎匹配相应的规则并处理消息,例如转换数据格式、过滤特定信息或用上下文信息丰富消息。
- 桥接到 Amazon Kinesis:规则触发将消息转发到 Amazon Kinesis 的动作,可以自定义配置分区键、要写入的数据流以及消息格式,实现灵活的数据集成。
在 MQTT 消息数据写入 Amazon Kinesis 之后,您可以进行灵活的应用开发,例如:
- 实时数据处理和分析:利用 Amazon Kinesis 强大的数据处理和分析工具及其自身的流处理能力,对消息数据进行实时处理和分析,获取有价值的洞察和决策支持。
- 事件驱动功能:触发 Amazon 事件处理,实现动态灵活的功能触发和处理。
- 数据存储和共享:将消息数据传输到 Amazon Kinesis 存储服务,安全存储和管理大量数据。这使您能够与其他 Amazon 服务共享和分析这些数据,以满足各种业务需求。
特性与优势
EMQX 与 AWS Kinesis Data Streams 的数据集成可以为您的业务带来以下功能和优势:
- 可靠的数据传输和顺序保证:EMQX 和 AWS Kinesis Data Streams 都提供了可靠的数据传输机制,EMQX 通过 MQTT 协议确保消息的可靠传输,而 AWS Kinesis Data Streams 使用分区和顺序号来保证消息的顺序性。两者结合可以确保设备发送的消息准确无误地到达目的地,并按照正确的顺序进行处理。
- 实时数据处理:设备的高频数据能够经过 EMQX 规则 SQL 进行初步的实时处理,可以毫不费力地过滤、提取、丰富和转换 MQTT 消息。将数据发送到 AWS Kinesis Data Streams 后,可以进一步结合 AWS Lambda、AWS 托管的 Apache Flink 实现运行实时分析。
- 弹性伸缩支持:EMQX 能够轻松连接数百万台物联网设备,并提供了弹性伸缩能力,AWS Kinesis Data Streams 则采用按需模式的自动资源调配和扩展,两者构建的应用能够随连接和数据规模进行扩展,持续满足适配业务的增长需求。
- 持久化数据存储:AWS Kinesis Data Streams 提供持久化的数据存储能力,能够可靠地保存每秒数百万流入的设备数据流,并在需要时随时回溯历史数据,并进行离线分析和处理。
利用 AWS Kinesis Data Streams 构建的流数据管道,可以大幅降低 EMQX 与 AWS 平台之间的接入难度,为用户提供更丰富、灵活的数据处理方案,助力 EMQX 用户在 AWS 上构建功能完备、性能卓越的数据驱动型应用。
准备工作
本节介绍了在 EMQX 中创建 Amazon Kinesis Sink 之前需要做的准备工作,包括如何创建 Kinesis 数据流并在本地模拟数据流服务。
前置准备
在 Amazon Kinesis Data Streams 中创建数据流
按照以下步骤通过 AWS 管理控制台创建数据流(详细信息请参阅本教程)。
- 登录 AWS 管理控制台并打开 Kinesis 控制台。
- 在导航栏中,展开区域选择器并选择一个区域。
- 选择创建数据流。
- 在创建 Kinesis 流页面,为您的数据流输入名称,然后选择按需容量模式。
在本地模拟 Amazon Kinesis Data Streams
为了便于开发和测试,您可以通过 LocalStack 在本地模拟 Amazon Kinesis Data Streams 服务。有了 LocalStack,您可以在本地机器上运行 AWS 应用,无需连接到远程云提供商。
安装 LocalStack 并使用 Docker Image 运行:
bash# To start the LocalStack docker image locally docker run --name localstack -p '4566:4566' -e 'KINESIS_LATENCY=0' -d localstack/localstack:2.1 # Access the container docker exec -it localstack bash
创建一个只有一个分片的流,名称设为 my_stream:
bashawslocal kinesis create-stream --stream-name "my_stream" --shard-count 1
创建连接器
在添加 Amazon Kinesis Sink 前,您需要创建连接器用于将 Sink 连接到 Amazone Kinesis Data Streams 服务。
进入 EMQX Dashboard,点击集成 -> 连接器。
点击页面右上角的创建。
在创建连接器页面,点击选择 Amazon Kinesis,然后点击下一步。
为连接器输入一个名称,名称应为大小写字母和数字的组合,此处我们输入
my-kinesis
。配置连接信息。
- AWS 访问密钥 ID:输入访问密钥 ID。如果使用 LocalStack,可输入任何值。
- AWS 秘密访问密钥:输入密钥。如果使用 LocalStack,可输入任何值。
- Amazon Kinesis 端点:输入 Kinesis 服务的终端节点。如果使用 LocalStack,输入
http://localhost:4566
。
在点击创建之前,您可以点击测试连接**以测试连接器是否能连接到 Amazon Kinesis Data Streams 服务。
点击底部的创建按钮完成连接器的创建。在弹出对话框中,您可以点击返回连接器列表或点击创建规则**继续创建带有 Amazone Kinesis Sink 的规则,以指定要转发到 Amazon Kinesis 的数据。详细步骤请参见 创建 Amazon Kinesis Sink 规则。
创建 Amazon Kinesis Sink 规则
本节演示了如何在 Dashboard 中创建一条规则以指定需要写入 Amazon Kinesis 的数据并为规则添加触发的动作。
在 EMQX Dashboard 左侧导航栏中点击集成 -> 规则。
点击页面右上角的创建。
输入规则 ID
my_rule
,在 SQL 编辑器中输入规则。例如将t/#
主题的 MQTT 消息存储至 Amazon Kinesis Data Streams,需输入以下 SQL 语法:注意:如果您希望制定自己的 SQL 语法,需要确保规则选出的字段(
SELECT
部分)包含所有 SQL 模板中用到的变量。sqlSELECT * FROM "t/#"
TIP
如果您初次使用 SQL,可以点击 SQL 示例 和启用调试来学习和测试规则 SQL 的结果。
点击右侧的添加动作按钮,为规则在被触发的情况下指定一个动作。在动作类型下拉框中选择
Amazon Kinesis
,保持动作下拉框为默认的创建动作
选项,您也可以选择一个之前已经创建好的 Amazon Kinesis Sink。此处我们创建一个全新的 Sink 并添加到规则中。输入 Sink 名称,名称应为大/小写字母和数字的组合。
在连接器下拉框中选择刚刚创建的
my-kinesis
连接器。您也可以点击下拉框旁边的创建按钮,在弹出框中快捷创建新的连接器,所需的配置参数按照参照创建连接器。为 Sink 配置以下信息:
- AWS Kinesis 流:输入您在 Amazon Kinesis Data Streams 中创建数据流中创建的数据流名称。
- 分区键:输入将与发送到此数据流的记录关联的分区键。允许使用
${variable_name}
形式的占位符(查看下一步以了解占位符示例)。
在 Payload Template 字段中,将其留空或定义模板。
- 如果留空,它将使用 JSON 格式编码 MQTT 消息中的所有可见输入,例如 clientid、topic、payload 等。
- 如果使用定义的模板,
${variable_name}
形式的占位符将使用 MQTT 上下文中的相应值进行填充。例如,如果 MQTT 消息主题是my/topic
,${topic}
将被替换为my/topic
。
备选动作(可选):如果您希望在消息投递失败时提升系统的可靠性,可以为 Sink 配置一个或多个备选动作。当 Sink 无法成功处理消息时,这些备选动作将被触发。更多信息请参见:备选动作。
高级设置(可选):根据需要配置高级设置选项(可选)。更多详细信息请参考高级设置。
在点击创建之前,您可以点击测试连接以测试 Sink 是否能连接到 Amazon Kinesis Data Stream 服务。
点击创建按钮完成 Sink 创建,新建的 Sink 将被添加到动作输出列表中。
回到创建规则页面,对配置的信息进行确认,点击创建。一条规则应该出现在规则列表中。
现在您已成功创建了通过 Amazon Kinesis Sink 将数据转发到 Amazon Kinesis Data Streams 的规则,同时在规则页面的动作(Sink) 标签页看到新建的 Amazon Kinesis Sink。
您还可以点击 集成 -> Flow 设计器查看拓扑,通过拓扑可以直观的看到,主题 t/#
下的消息在经过名为 my_rule
的规则处理,处理结果交由 Amazon Kinesis Data Streams 存储。
测试桥接和规则
使用 MQTTX 向
t/my_topic
主题发布一条消息:bashmqttx pub -i emqx_c -t t/my_topic -m '{ "msg": "hello Amazon Kinesis" }'
查看 Amazon Kinesis 的 Sink 中的运行统计,命中、发送成功次数均 +1。
转到 Amazon Kinesis 数据查看器。您应该可以看到数据流指定分片内的数据记录。
使用 LocalStack 查看数据
如果您使用 LocalStack,通过以下步骤查看接收到的数据。
在发送数据到 EMQX 之前,使用以下命令获取 ShardIterator:
bashawslocal kinesis get-shard-iterator --stream-name my_stream --shard-id shardId-000000000000 --shard-iterator-type LATEST { "ShardIterator": "AAAAAAAAAAG3YjBK9sp0uSIFGTPIYBI17bJ1RsqX4uJmRllBAZmFRnjq1kPLrgcyn7RVigmH+WsGciWpImxjXYLJhmqI2QO/DrlLfp6d1IyJFixg1s+MhtKoM6IOH0Tb2CPW9NwPYoT809x03n1zL8HbkXg7hpZjWXPmsEvkXjn4UCBf5dBerq7NLKS3RtAmOiXVN6skPpk=" }
使用 MQTTX 向
t/my_topic
主题发布一条消息:mqttx pub -i emqx_c -t t/my_topic -m '{ "msg": "hello Amazon Kinesis" }'
查看数据记录并解码接收到的数据:
bashawslocal kinesis get-records --shard-iterator="AAAAAAAAAAG3YjBK9sp0uSIFGTPIYBI17bJ1RsqX4uJmRllBAZmFRnjq1kPLrgcyn7RVigmH+WsGciWpImxjXYLJhmqI2QO/DrlLfp6d1IyJFixg1s+MhtKoM6IOH0Tb2CPW9NwPYoT809x03n1zL8HbkXg7hpZjWXPmsEvkXjn4UCBf5dBerq7NLKS3RtAmOiXVN6skPpk=" { "Records": [ { "SequenceNumber": "49642650476690467334495639799144299020426020544120356866", "ApproximateArrivalTimestamp": 1689389148.261, "Data": "eyAibXNnIjogImhlbGxvIEFtYXpvbiBLaW5lc2lzIiB9", "PartitionKey": "key", "EncryptionType": "NONE" } ], "NextShardIterator": "AAAAAAAAAAFj5M3+6XUECflJAlkoSNHV/LBciTYY9If2z1iP+egC/PtdVI2t1HCf3L0S6efAxb01UtvI+3ZSh6BO02+L0BxP5ssB6ONBPfFgqvUIjbfu0GOmzUaPiHTqS8nNjoBtqk0fkYFDOiATdCCnMSqZDVqvARng5oiObgigmxq8InciH+xry2vce1dF9+RRFkKLBc0=", "MillisBehindLatest": 0 } echo 'eyAibXNnIjogImhlbGxvIEFtYXpvbiBLaW5lc2lzIiB9' | base64 -d { "msg": "hello Amazon Kinesis" }
高级设置
本节深入介绍 Kinesis Sink 的高级配置选项。在 Dashboard 中配置 Sink 时,可以展开高级设置,根据您的具体需求调整以下参数。
字段名称 | 描述 | 默认值 |
---|---|---|
缓存池大小 | 指定缓冲工作进程的数量,这些进程用于管理 EMQX 与 Kinesis 之间的数据流。缓冲进程在数据发送到目标服务之前临时存储和处理数据,对于优化性能和确保数据传输的顺畅至关重要。 | 16 |
请求超期 | “请求超期”(生存时间)配置指定请求在进入缓冲区后被视为有效的最长持续时间(以秒为单位)。计时器从请求进入缓冲区时开始,如果请求在超过 TTL 时间后仍未发送或未收到来自 Kinesis 的响应或确认,该请求将被视为已过期。 | 45 秒 |
健康检查间隔 | 指定 Sink 与 Kinesis 之间自动进行健康检查的时间间隔(以秒为单位)。 | 15 秒 |
健康检查间隔抖动 | 在健康检查间隔中添加一个均匀的随机延迟(抖动),用于避免多个节点在相同时间触发健康检查请求。当多个 Sink 或 Source 使用同一连接器时,启用抖动可确保它们在不同时间启动健康检查,提升系统稳定性。 | 15 秒 |
健康检查超时 | 指定对与 Kinesis 服务的连接执行自动健康检查的超时时间。 | 60 秒 |
缓存队列最大长度 | 指定每个缓冲工作进程在 Kinesis Sink 中可以缓冲的最大字节数。缓冲工作进程临时存储数据,以便更有效地处理数据流。在系统性能和数据传输要求下,可以调整此值。 | 256 MB |
请求模式 | 允许您在 同步 和 异步 请求模式之间进行选择,以根据不同需求优化消息传输。在异步模式下,写入 Kinesis 不会阻塞 MQTT 消息的发布过程,但这可能会导致客户端在消息到达 Kinesis 之前就收到消息。 | 异步 |
最大批量请求大小 | 指定从 EMQX 向 Kinesis 传输数据时的单次传输最大数据批大小。通过调整批处理大小,您可以微调 EMQX 和 Kinesis 之间数据传输的效率和性能。 如果将“批处理大小”设置为 "1",则数据记录将单独发送,而不会被分组为批处理。 | 1 |
请求飞行队列窗口 | “在途请求队列”指已启动但尚未收到响应或确认的请求。此设置控制 Sink 与 Kinesis 通信期间同时存在的最大在途请求数量。 当请求模式设置为 异步 时,“在途请求队列窗口”参数尤其重要。如果需要严格按顺序处理来自同一 MQTT 客户端的消息,则应将此值设置为 1 。 | 100 |