Skip to content

将 MQTT 数据写入到 Amazon S3 存储桶中

Amazon S3 是一种面向互联网的存储服务,具有高度的可靠性、稳定性和安全性,能够快速部署和使用。EMQX 能够将 MQTT 消息高效地存储至 Amazon S3 存储桶中,实现灵活的物联网数据存储功能。

本页详细介绍了 EMQX 与 Amazon S3 的数据集成并提供了实用的规则和 Sink 创建指导。

TIP

EMQX 也兼容其他支持 S3 协议的存储服务,例如:

  • MinIO: MinIO 是一款高性能、分布式的对象存储系统。它是兼容 Amazon S3 API 的开源对象存储服务器,可以用于构建私有云。
  • Google Cloud Storage: Google Cloud Storage 是 Google Cloud 的统一对象存储,用于开发者和企业存储大量数据。它提供了与 Amazon S3 兼容的接口。

您可以根据自己的业务需求和场景选择合适的存储服务。

工作原理

Amazon S3 数据集成是 EMQX 中开箱即用的功能,通过简单的配置即可实现复杂的业务开发。在一个典型的物联网应用中,EMQX 作为物联网平台,负责接入设备进行消息传输,Amazon S3 作为数据存储平台,负责消息数据的存储。

EMQX Amazon S3 数据集成

EMQX 通过规则引擎与 Sink 将设备事件和数据转发至 Amazon S3,应用读取 Amazon S3 中数据即可进行数据的应用。其具体的工作流程如下:

  1. 设备连接到 EMQX:物联网设备通过 MQTT 协议连接成功后将触发上线事件,事件包含设备 ID、来源 IP 地址以及其他属性等信息。
  2. 设备消息发布和接收:设备通过特定的主题发布遥测和状态数据,EMQX 接收到消息后将在规则引擎中进行比对。
  3. 规则引擎处理消息:通过内置的规则引擎,可以根据主题匹配处理特定来源的消息和事件。规则引擎会匹配对应的规则,并对消息和事件进行处理,例如转换数据格式、过滤掉特定信息或使用上下文信息丰富消息。
  4. 写入到 Amazon S3:规则触发后,消息会被写入到 Amazon S3。通过使用 Amazon S3 Sink,用户可以从处理结果中提取数据并发送到 S3。消息可以以文本、二进制格式存储,或者将多行结构化数据聚合成一个 CSV 文件或 JSON Lines 文件,具体取决于消息内容和 Sink 的配置。

事件和消息数据写入到 Amazon S3 后,您可以连接到 Amazon S3 读取数据,进行灵活的应用开发,例如:

  • 数据归档:将设备消息作为对象存储,长期保存在 Amazon S3 中,以满足数据保留和合规性要求。
  • 数据分析:将 S3 中的数据导入到分析服务例如 Snowflake 中,实现预测性维护、设备效能评估等数据分析业务。

特性与优势

在 EMQX 中使用 Amazon S3 数据集成能够为您的业务带来以下特性与优势:

  • 消息转换:消息可以在写入 Amazon S3 之前,通过 EMQX 规则中进行丰富的处理和转换,方便后续的存储和使用。
  • 灵活数据操作:通过 S3 Sink,可以方便地将特定字段的数据写入到 Amazon S3 存储桶中,支持动态设置存储桶与对象键,实现数据的灵活存储。
  • 整合业务流程:通过 S3 Sink 可以将设备数据与 Amazon S3 丰富的生态应用结合,以实现更多的业务场景,例如数据分析、数据归档等。
  • 低成本长期存储:相较于数据库,Amazon S3 提供了高可用、高可靠、低成本的对象存储服务,可以满足长期存储的需求。

通过以上特性,您可以构建高效、可靠和可扩展的物联网应用,并在业务决策和优化方面受益。

准备工作

本节介绍了在 EMQX 中创建 Amazon S3 Sink 之前需要做的准备工作。

前置准备

在开始之前,请确保您已了解以下内容:

EMQX 相关概念:

  • 规则引擎:了解规则如何定义从 MQTT 消息中提取和转换数据的逻辑。
  • 数据集成:了解 EMQX 数据集成中连接器和 Sink 的概念。

AWS 相关概念:

如果您是第一次使用 AWS S3 存储服务,请了解以下概念:

  • EC2:AWS 的虚拟机服务(计算实例)。
  • IAM:AWS 身份与权限管理,实例角色可给在该实例上运行的程序签发临时凭证。
  • IMDSv2:EC2 的实例元数据/临时凭证获取接口,采用令牌机制,更安全。

准备 S3 存储桶

EMQX 支持 Amazon S3 以及兼容 S3 的存储服务,您可以使用 AWS 云服务或者 Docker 部署一个 MinIO 实例。

创建连接器

在添加 S3 Sink 前,您需要创建对应的连接器,创建步骤如下:

  1. 转到 Dashboard 集成 -> 连接器页面。

  2. 点击页面右上角的创建

  3. 在连接器类型中选择 Amazon S3,点击下一步

  4. 输入连接器名称。名称必须以字母或数字开头,可以包含字母、数字、连字符或下划线。例如: my-s3

  5. 输入连接信息:

    • 如果使用 AWS S3 存储桶,请输入以下信息:

      • 主机:根据区域不同,格式为 s3.{region}.amazonaws.com

      • 端口:填写 443

      • 访问密钥 ID私有访问密钥

        • 填写在 AWS 中创建的访问密钥,或者
        • 如果 EMQX 运行在已绑定 IAM 角色的 EC2 上,可留空。

        详细说明请参见准备 S3 存储桶中的 Amazon S3 标签页。

    • 如果使用 MinIO:

      • 主机:填写 127.0.0.1 (如果 MinIO 在远程运行,填写实际地址)。
      • 端口:填写 9000
      • 访问密钥 ID私有访问密钥:填写 MinIO 中创建的访问密钥。
  6. 点击创建之前,您可以先点击测试连接来测试连接器是否可以连接到 S3 服务。

  7. 点击最下方创建按钮完成连接器创建。

至此您已经完成连接器创建,接下来将继续创建一条规则和 Sink 来指定需要写入的数据。

创建 Amazon S3 Sink 规则

本节演示了如何在 EMQX 中创建一条规则,用于处理来自源 MQTT 主题 t/# 的消息,并通过配置的 Sink 将处理后的结果写入到 S3 的 iot-data 存储桶中。

  1. 转到 Dashboard 集成 -> 规则页面。

  2. 点击页面右上角的创建

  3. 输入规则 ID my_rule,在 SQL 编辑器中输入规则 SQL 如下:

    sql
    SELECT
      *
    FROM
        "t/#"

    TIP

    如果您初次使用 SQL,可以点击 SQL 示例启用调试来学习和测试规则 SQL 的结果。

  4. 添加动作,从动作类型下拉列表中选择 Amazon S3,保持动作下拉框为默认的“创建动作”选项,您也可以从动作下拉框中选择一个之前已经创建好的 Amazon S3 动作。此处我们创建一个全新的 Sink 并添加到规则中。

  5. 在下方的表单中输入 Sink 的名称与描述。

  6. 在连接器下拉框中选择刚刚创建的 my-s3 连接器。您也可以点击下拉框旁边的创建按钮,在弹出框中快捷创建新的连接器,所需的配置参数可以参考创建连接器

  7. 设置存储桶,此处输入 iot-data,此处也支持 ${var} 格式的占位符,但要注意需要在 S3 中预先创建好对应名称的存储桶。

  8. 根据情况选择 ACL,指定上传对象的访问权限。

  9. 选择上传方式,两种方式区别如下:

    • 直接上传:每次规则触发时,按照预设的对象键和内容直接上传到 S3,适合存储二进制或体积较大的文本数据。这种方法可能会生成大量的文件。
    • 聚合上传:将多次规则触发的结果打包为一个文件(如 CSV 文件)并上传到 S3,适合存储结构化数据。这种方法可以减少文件数量,提高写入效率。

    两种方式配置的参数不同,请根据所选方式进行配置:

  10. 备选动作(可选):如果您希望在消息投递失败时提升系统的可靠性,可以为 Sink 配置一个或多个备选动作。当 Sink 无法成功处理消息时,这些备选动作将被触发。更多信息请参见:备选动作

  11. 展开高级设置,根据需要配置高级设置选项(可选),详细请参考高级设置

  12. 其余参数使用默认值即可。点击创建按钮完成 Sink 的创建,创建成功后页面将回到创建规则,新的 Sink 将添加到规则动作中。

  13. 回到规则创建页面,点击创建按钮完成整个规则创建。

现在您已成功创建了规则,你可以在规则页面上看到新建的规则,同时在动作(Sink) 标签页看到新建的 S3 Sink。

您也可以点击 集成 -> Flow 设计器查看拓扑,通过拓扑可以直观的看到,主题 t/# 下的消息在经过规则 my_rule 解析后被写入到 S3 中。

测试规则

此处以直接上传为例进行测试。使用 MQTTX 向 t/1 主题发布消息:

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello S3" }'

发送几条消息后,访问 Amazon S3 控制台或 MinIO 控制台查看结果。

高级设置

本节将深入介绍可用于 S3 Sink 的高级配置选项。在 Dashboard 中配置 Sink 时,您可以根据您的特定需求展开高级设置,调整以下参数。

字段名称描述默认值
缓存池大小指定缓冲区工作进程数量,这些工作进程将被分配用于管理 EMQX 与 S3 的数据流,它们负责在将数据发送到目标服务之前临时存储和处理数据。此设置对于优化性能并确保 Sink 数据传输顺利进行尤为重要。16
请求超期“请求 TTL”(生存时间)设置指定了请求在进入缓冲区后被视为有效的最长持续时间(以秒为单位)。此计时器从请求进入缓冲区时开始计时。如果请求在缓冲区内停留的时间超过了此 TTL 设置或者如果请求已发送但未能在 S3 中及时收到响应或确认,则将视为请求已过期。45
健康检查间隔指定 Sink 对与 S3 的连接执行自动健康检查的时间间隔(以秒为单位)。15
健康检查间隔抖动在健康检查间隔中添加一个均匀的随机延迟(抖动),用于避免多个节点在相同时间触发健康检查请求。当多个 Sink 或 Source 使用同一连接器时,启用抖动可确保它们在不同时间启动健康检查,提升系统稳定性。0 毫秒
健康检查超时指定对与 S3 Tables 服务的连接执行自动健康检查的超时时间。60
缓存队列最大长度指定可以由 S3 Sink 中的每个缓冲器工作进程缓冲的最大字节数。缓冲器工作进程在将数据发送到 S3 之前会临时存储数据,充当处理数据流的中介以更高效地处理数据流。根据系统性能和数据传输要求调整该值。256 MB
请求模式允许您选择同步异步请求模式,以根据不同要求优化消息传输。在异步模式下,写入到 S3 不会阻塞 MQTT 消息发布过程。但是,这可能导致客户在它们到达 S3 之前就收到了消息。异步
请求飞行队列窗口“飞行队列请求”是指已启动但尚未收到响应或确认的请求。此设置控制 Sink 与 S3 通信时可以同时存在的最大飞行队列请求数。
请求模式 设置为 异步 时,“请求飞行队列窗口”参数变得特别重要。如果对于来自同一 MQTT 客户端的消息严格按顺序处理很重要,则应将此值设置为 1
100