将 MQTT 数据传输到 Amazon Kinesis
TIP
EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用。
AWS Kinesis 是 AWS 上完全托管的实时流数据处理服务,可以轻松地进行流数据的收集、处理和分析。它可以经济高效地处理任意规模的实时流数据,并具有高度的灵活性,能够低时延的处理来自数十万个来源的任意数量的流数据。EMQX 支持与 Amazon Kinesis Data Streams 无缝集成,从而能够实现海量 IoT 设备连接,进行实时消息进行采集、传输,并通过数据桥接连接到 Amazon Kinesis Data Streams,进行实时数据分析与复杂的流处理。
本页详细介绍了 EMQX 与 Amazon Kinesis 的数据集成并提供了实用的规则和数据桥接创建指导。
工作原理
Amazon Kinesis 数据桥是 EMQX 的一个开箱即用功能,旨在帮助用户无缝集成 MQTT 数据流与 Amazon Kinesis,并利用其丰富的服务和能力进行物联网应用开发。
EMQX 通过规则引擎和数据桥接将 MQTT 数据转发到 Amazon Kinesis。完整的过程如下:
- 物联网设备发布消息:设备通过特定主题发布遥测和状态数据,触发规则引擎。
- 规则引擎处理消息:使用内置规则引擎,基于主题匹配处理来自特定来源的 MQTT 消息。规则引擎匹配相应的规则并处理消息,例如转换数据格式、过滤特定信息或用上下文信息丰富消息。
- 桥接到 Amazon Kinesis:规则触发将消息转发到 Amazon Kinesis 的动作,可以自定义配置分区键、要写入的数据流以及消息格式,实现灵活的数据集成。
在 MQTT 消息数据写入 Amazon Kinesis 之后,您可以进行灵活的应用开发,例如:
- 实时数据处理和分析:利用 Amazon Kinesis 强大的数据处理和分析工具及其自身的流处理能力,对消息数据进行实时处理和分析,获取有价值的洞察和决策支持。
- 事件驱动功能:触发 Amazon 事件处理,实现动态灵活的功能触发和处理。
- 数据存储和共享:将消息数据传输到 Amazon Kinesis 存储服务,安全存储和管理大量数据。这使您能够与其他 Amazon 服务共享和分析这些数据,以满足各种业务需求。
特性与优势
EMQX 与 AWS Kinesis Data Streams 的数据集成可以为您的业务带来以下功能和优势:
- 可靠的数据传输和顺序保证:EMQX 和 AWS Kinesis Data Streams 都提供了可靠的数据传输机制,EMQX 通过 MQTT 协议确保消息的可靠传输,而 AWS Kinesis Data Streams 使用分区和顺序号来保证消息的顺序性。两者结合可以确保设备发送的消息准确无误地到达目的地,并按照正确的顺序进行处理。
- 实时数据处理:设备的高频数据能够经过 EMQX 规则 SQL 进行初步的实时处理,可以毫不费力地过滤、提取、丰富和转换 MQTT 消息。将数据发送到 AWS Kinesis Data Streams 后,可以进一步结合 AWS Lambda、AWS 托管的 Apache Flink 实现运行实时分析。
- 弹性伸缩支持:EMQX 能够轻松连接数百万台物联网设备,并提供了弹性伸缩能力,AWS Kinesis Data Streams 则采用按需模式的自动资源调配和扩展,两者构建的应用能够随连接和数据规模进行扩展,持续满足适配业务的增长需求。
- 持久化数据存储:AWS Kinesis Data Streams 提供持久化的数据存储能力,能够可靠地保存每秒数百万流入的设备数据流,并在需要时随时回溯历史数据,并进行离线分析和处理。
利用 AWS Kinesis Data Streams 构建的流数据管道,可以大幅降低 EMQX 与 AWS 平台之间的接入难度,为用户提供更丰富、灵活的数据处理方案,助力 EMQX 用户在 AWS 上构建功能完备、性能卓越的数据驱动型应用。
桥接准备
本节介绍了在 EMQX 中创建 Amazon Kinesis 数据桥接之前需要做的准备工作,包括如何创建 Kinesis 数据流并在本地模拟数据流服务。
前置准备
在 Amazon Kinesis Data Streams 中创建数据流
按照以下步骤通过 AWS 管理控制台创建数据流(详细信息请参阅本教程)。
- 登录 AWS 管理控制台并打开 Kinesis 控制台。
- 在导航栏中,展开区域选择器并选择一个区域。
- 选择创建数据流。
- 在创建 Kinesis 流页面,为您的数据流输入名称,然后选择按需容量模式。
在本地模拟 Amazon Kinesis Data Streams
为了便于开发和测试,您可以通过 LocalStack 在本地模拟 Amazon Kinesis Data Streams 服务。有了 LocalStack,您可以在本地机器上运行 AWS 应用,无需连接到远程云提供商。
安装 LocalStack 并使用 Docker Image 运行:
bash# To start the LocalStack docker image locally docker run --name localstack -p '4566:4566' -e 'KINESIS_LATENCY=0' -d localstack/localstack:2.1 # Access the container docker exec -it localstack bash
创建一个只有一个分片的流,名称设为 my_stream:
bashawslocal kinesis create-stream --stream-name "my_stream" --shard-count 1
创建 Amazon Kinesis 数据桥接
转到 EMQX Dashboard,点击集成->数据桥接。
点击页面右上角的创建。
在创建数据桥接页面,点击选择 Amazon Kinesis,然后点击下一步。
为数据桥接输入一个名称。名称应为大写/小写字母和数字的组合。
输入 Amazon Kinesis Data Streams 服务的连接信息:
- AWS 访问密钥 ID:输入访问密钥 ID。如果使用 LocalStack,可输入任何值。
- AWS 秘密访问密钥:输入密钥。如果使用 LocalStack,可输入任何值。
- Amazon Kinesis 端点:输入 Kinesis 服务的终端节点。如果使用 LocalStack,输入
http://localhost:4566
。 - AWS Kinesis 流:输入您在 Amazon Kinesis Data Streams 中创建数据流中创建的数据流名称。
- 分区键:输入将与发送到此数据流的记录关联的分区键。允许使用
${variable_name}
形式的占位符(查看下一步以了解占位符示例)。
在 Payload Template 字段中,将其留空或定义模板。
- 如果留空,它将使用 JSON 格式编码 MQTT 消息中的所有可见输入,例如 clientid、topic、payload 等。
- 如果使用定义的模板,
${variable_name}
形式的占位符将使用 MQTT 上下文中的相应值进行填充。例如,如果 MQTT 消息主题是my/topic
,${topic}
将被替换为my/topic
。
高级配置(可选),根据情况配置队列与批量等参数,详细请参考数据桥接简介中的配置参数。
在点击创建之前,您可以点击测试连接性以测试桥接。
点击创建按钮完成数据桥接创建。
在弹出的创建成功对话框中您可以点击创建规则,继续创建规则以指定需要写入 Amazon Kinesis 的数据。您也可以按照创建 Amazon Kinesis 数据桥接规则中的步骤来创建规则。
创建 Amazon Kinesis 数据桥接规则
接下来您可以创建一条规则以指定需要写入 Amazon Kinesis 的数据。
在 EMQX Dashboard 左侧导航栏中点击集成 -> **规则 **.
点击页面右上角的创建。
Input
my_rule
as the rule ID.输入规则 ID
my_rule
,在 SQL 编辑器中输入规则。例如将t/#
主题的 MQTT 消息存储至 Amazon Kinesis Data Streams,需输入以下 SQL 语法:注意:如果您希望制定自己的 SQL 语法,需要确保规则选出的字段(
SELECT
部分)包含所有 SQL 模板中用到的变量。sqlSELECT * FROM "t/#"
点击添加动作,在动作下拉框中选择使用数据桥接转发选项,选择先前创建好的 Amazon Kinesis 数据桥接。点击添加。
点击最下方创建按钮完成规则创建。
至此您已经完成整个 Amazon Kinesis 数据桥接创建过程,可以前往 集成 -> Flows 页面查看拓扑图,此时应当看到 t/#
主题的消息经过名为 my_rule
的规则处理,处理结果交由 Amazon Kinesis 存储。
测试桥接和规则
使用 MQTTX 向
t/my_topic
主题发布一条消息:bashmqttx pub -i emqx_c -t t/my_topic -m '{ "msg": "hello Amazon Kinesis" }'
查看 Amazon Kinesis 的数据桥接中的运行统计,命中、发送成功次数均 +1。
转到 Amazon Kinesis 数据查看器。您应该可以看到数据流指定分片内的数据记录。
使用 LocalStack 查看数据
如果您使用 LocalStack,通过以下步骤查看接收到的数据。
在发送数据到桥接之前,使用以下命令获取 ShardIterator:
bashawslocal kinesis get-shard-iterator --stream-name my_stream --shard-id shardId-000000000000 --shard-iterator-type LATEST { "ShardIterator": "AAAAAAAAAAG3YjBK9sp0uSIFGTPIYBI17bJ1RsqX4uJmRllBAZmFRnjq1kPLrgcyn7RVigmH+WsGciWpImxjXYLJhmqI2QO/DrlLfp6d1IyJFixg1s+MhtKoM6IOH0Tb2CPW9NwPYoT809x03n1zL8HbkXg7hpZjWXPmsEvkXjn4UCBf5dBerq7NLKS3RtAmOiXVN6skPpk=" }
使用 MQTTX 向
t/my_topic
主题发布一条消息:mqttx pub -i emqx_c -t t/my_topic -m '{ "msg": "hello Amazon Kinesis" }'
查看数据记录并解码接收到的数据:
bashawslocal kinesis get-records --shard-iterator="AAAAAAAAAAG3YjBK9sp0uSIFGTPIYBI17bJ1RsqX4uJmRllBAZmFRnjq1kPLrgcyn7RVigmH+WsGciWpImxjXYLJhmqI2QO/DrlLfp6d1IyJFixg1s+MhtKoM6IOH0Tb2CPW9NwPYoT809x03n1zL8HbkXg7hpZjWXPmsEvkXjn4UCBf5dBerq7NLKS3RtAmOiXVN6skPpk=" { "Records": [ { "SequenceNumber": "49642650476690467334495639799144299020426020544120356866", "ApproximateArrivalTimestamp": 1689389148.261, "Data": "eyAibXNnIjogImhlbGxvIEFtYXpvbiBLaW5lc2lzIiB9", "PartitionKey": "key", "EncryptionType": "NONE" } ], "NextShardIterator": "AAAAAAAAAAFj5M3+6XUECflJAlkoSNHV/LBciTYY9If2z1iP+egC/PtdVI2t1HCf3L0S6efAxb01UtvI+3ZSh6BO02+L0BxP5ssB6ONBPfFgqvUIjbfu0GOmzUaPiHTqS8nNjoBtqk0fkYFDOiATdCCnMSqZDVqvARng5oiObgigmxq8InciH+xry2vce1dF9+RRFkKLBc0=", "MillisBehindLatest": 0 } echo 'eyAibXNnIjogImhlbGxvIEFtYXpvbiBLaW5lc2lzIiB9' | base64 -d { "msg": "hello Amazon Kinesis" }