Amazon Kinesis
EMQX 支持与 Amazon Kinesis Data Streams 无缝集成,从而能进一步与其他 AWS 服务集成,实时提取、处理和分析 MQTT 数据。
TIP
EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用。
功能清单
快速开始教程
本节介绍如何配置 Amaxon Kinesis 数据桥接,包括如何设置 Kinesis 服务、创建数据桥接和转发数据到 Kinesis Data Stream 的规则以及测试数据桥接和规则等主题。
在 Amazon Kinesis Data Streams 中创建数据流
按照以下步骤通过 AWS 管理控制台创建数据流(详细信息请参阅本教程)。
- 登录 AWS 管理控制台并打开 Kinesis 控制台。
- 在导航栏中,展开区域选择器并选择一个区域。
- 选择创建数据流。
- 在创建 Kinesis 流页面,为您的数据流输入名称,然后选择按需容量模式。
在本地模拟 Amazon Kinesis Data Streams
为了便于开发和测试,您可以通过 LocalStack 在本地模拟 Amazon Kinesis Data Streams 服务。有了 LocalStack,您可以在本地机器上运行 AWS 应用,无需连接到远程云提供商。
安装 LocalStack 并使用 Docker Image 运行:
bash# To start the LocalStack docker image locally docker run --name localstack -p '4566:4566' -e 'KINESIS_LATENCY=0' -d localstack/localstack:2.1 # Access the container docker exec -it localstack bash
创建一个只有一个分片的流,名称设为 my_stream:
bashawslocal kinesis create-stream --stream-name "my_stream" --shard-count 1
创建 Amazon Kinesis 数据桥接
转到 EMQX Dashboard,点击集成->数据桥接。
点击页面右上角的创建。
在创建数据桥接页面,点击选择 Amazon Kinesis,然后点击下一步。
为数据桥接输入一个名称。名称应为大写/小写字母和数字的组合。
输入 Amazon Kinesis Data Streams 服务的连接信息:
- AWS 访问密钥 ID:输入访问密钥 ID。如果使用 LocalStack,可输入任何值。
- AWS 秘密访问密钥:输入密钥。如果使用 LocalStack,可输入任何值。
- Amazon Kinesis 端点:输入 Kinesis 服务的终端节点。如果使用 LocalStack,输入
http://localhost:4566
。 - AWS Kinesis 流:输入您在 Amazon Kinesis Data Streams 中创建数据流中创建的数据流名称。
- 分区键:输入将与发送到此数据流的记录关联的分区键。允许使用
${variable_name}
形式的占位符(查看下一步以了解占位符示例)。
在 Payload Template 字段中,将其留空或定义模板。
- 如果留空,它将使用 JSON 格式编码 MQTT 消息中的所有可见输入,例如 clientid、topic、payload 等。
- 如果使用定义的模板,
${variable_name}
形式的占位符将使用 MQTT 上下文中的相应值进行填充。例如,如果 MQTT 消息主题是my/topic
,${topic}
将被替换为my/topic
。
高级配置(可选),根据情况配置队列与批量等参数,详细请参考数据桥接简介中的配置参数。
在点击创建之前,您可以点击测试连接性以测试桥接。
点击创建按钮完成数据桥接创建。
在弹出的创建成功对话框中您可以点击创建规则,继续创建规则以指定需要写入 Amazon Kinesis 的数据。您也可以按照创建 Amazon Kinesis 数据桥接规则中的步骤来创建规则。
创建 Amazon Kinesis 数据桥接规则
接下来您可以创建一条规则以指定需要写入 Amazon Kinesis 的数据。
在 EMQX Dashboard 左侧导航栏中点击集成 -> **规则 **.
点击页面右上角的创建。
Input
my_rule
as the rule ID.输入规则 ID
my_rule
,在 SQL 编辑器中输入规则。例如将t/#
主题的 MQTT 消息存储至 Amazon Kinesis Data Streams,需输入以下 SQL 语法:注意:如果您希望制定自己的 SQL 语法,需要确保规则选出的字段(
SELECT
部分)包含所有 SQL 模板中用到的变量。sqlSELECT * FROM "t/#"
点击添加动作,在动作下拉框中选择使用数据桥接转发选项,选择先前创建好的 Amazon Kinesis 数据桥接。点击添加。
点击最下方创建按钮完成规则创建。
至此您已经完成整个 Amazon Kinesis 数据桥接创建过程,可以前往 集成 -> Flows 页面查看拓扑图,此时应当看到 t/#
主题的消息经过名为 my_rule
的规则处理,处理结果交由 Amazon Kinesis 存储。
测试桥接和规则
使用 MQTTX 向
t/my_topic
主题发布一条消息:bashmqttx pub -i emqx_c -t t/my_topic -m '{ "msg": "hello Amazon Kinesis" }'
查看 Amazon Kinesis 的数据桥接中的运行统计,命中、发送成功次数均 +1。
转到 Amazon Kinesis 数据查看器。您应该可以看到数据流指定分片内的数据记录。
使用 LocalStack 查看数据
如果您使用 LocalStack,通过以下步骤查看接收到的数据。
在发送数据到桥接之前,使用以下命令获取 ShardIterator:
bashawslocal kinesis get-shard-iterator --stream-name my_stream --shard-id shardId-000000000000 --shard-iterator-type LATEST { "ShardIterator": "AAAAAAAAAAG3YjBK9sp0uSIFGTPIYBI17bJ1RsqX4uJmRllBAZmFRnjq1kPLrgcyn7RVigmH+WsGciWpImxjXYLJhmqI2QO/DrlLfp6d1IyJFixg1s+MhtKoM6IOH0Tb2CPW9NwPYoT809x03n1zL8HbkXg7hpZjWXPmsEvkXjn4UCBf5dBerq7NLKS3RtAmOiXVN6skPpk=" }
使用 MQTTX 向
t/my_topic
主题发布一条消息:mqttx pub -i emqx_c -t t/my_topic -m '{ "msg": "hello Amazon Kinesis" }'
查看数据记录并解码接收到的数据:
bashawslocal kinesis get-records --shard-iterator="AAAAAAAAAAG3YjBK9sp0uSIFGTPIYBI17bJ1RsqX4uJmRllBAZmFRnjq1kPLrgcyn7RVigmH+WsGciWpImxjXYLJhmqI2QO/DrlLfp6d1IyJFixg1s+MhtKoM6IOH0Tb2CPW9NwPYoT809x03n1zL8HbkXg7hpZjWXPmsEvkXjn4UCBf5dBerq7NLKS3RtAmOiXVN6skPpk=" { "Records": [ { "SequenceNumber": "49642650476690467334495639799144299020426020544120356866", "ApproximateArrivalTimestamp": 1689389148.261, "Data": "eyAibXNnIjogImhlbGxvIEFtYXpvbiBLaW5lc2lzIiB9", "PartitionKey": "key", "EncryptionType": "NONE" } ], "NextShardIterator": "AAAAAAAAAAFj5M3+6XUECflJAlkoSNHV/LBciTYY9If2z1iP+egC/PtdVI2t1HCf3L0S6efAxb01UtvI+3ZSh6BO02+L0BxP5ssB6ONBPfFgqvUIjbfu0GOmzUaPiHTqS8nNjoBtqk0fkYFDOiATdCCnMSqZDVqvARng5oiObgigmxq8InciH+xry2vce1dF9+RRFkKLBc0=", "MillisBehindLatest": 0 } echo 'eyAibXNnIjogImhlbGxvIEFtYXpvbiBLaW5lc2lzIiB9' | base64 -d { "msg": "hello Amazon Kinesis" }