Skip to content

将 MQTT 数据导入 Azure Blob Storage

TIP

Azure Blob Storage 数据集成是 EMQX 企业版功能。

Azure Blob Storage 是微软基于云的对象存储解决方案,专为处理大量非结构化数据而设计。非结构化数据指的是不遵循特定数据模型或格式的数据类型,例如文本文件或二进制数据。EMQX 可以高效地将 MQTT 消息存储在 Blob Storage 容器中,为物联网 (IoT) 数据存储提供了一个多功能的解决方案。

本页面详细介绍了 EMQX 与 Azure Blob Storage 之间的数据集成,并提供了规则和 Sink 的创建指南。

工作原理

Azure Blob Storage 数据集成是 EMQX 中一个开箱即用的功能,可以轻松配置以应对复杂的业务发展。在典型的 IoT 应用中,EMQX 作为负责设备连接和消息传输的物联网平台,而 Azure Blob Storage 则作为数据存储平台,处理消息数据的存储。

azure-blob-storage-architecture

EMQX 利用规则引擎和数据接收器将设备事件和数据转发到 Azure Blob Storage。应用程序可以从 Azure Blob Storage 中读取数据,用于进一步的数据应用。具体工作流程如下:

  1. 设备连接到 EMQX:IoT 设备通过 MQTT 协议成功连接后触发上线事件。该事件包括设备 ID、来源 IP 地址以及其他属性信息。
  2. 设备消息发布和接收:设备通过特定主题发布遥测和状态数据。EMQX 接收这些消息,并在规则引擎中进行匹配。
  3. 规则引擎处理消息:内置规则引擎根据主题匹配处理来自特定来源的消息和事件。它匹配相应的规则,并处理消息和事件,如数据格式转换、过滤特定信息或用上下文信息丰富消息。
  4. 写入 Azure Blob Storage:规则触发一个动作,将消息写入存储容器。使用 Azure Blob Storage Sink,用户可以从处理结果中提取数据并发送到 Blob Storage。根据消息内容和 Sink 中的配置,消息可以以文本或二进制格式存储,或将多行结构化数据汇总到单个 CSV 文件中。

事件和消息数据写入存储容器后,可以连接到 Azure Blob Storage 读取数据,以实现灵活的应用开发,例如:

  • 数据存档:将设备消息作为对象存储在 Azure Blob Storage 中,以实现长期保存,满足合规要求或业务需求。
  • 数据分析:将存储容器中的数据导入分析服务,如 Snowflake,用于预测性维护、设备效率评估等数据分析服务。

功能和优势

在 EMQX 中使用 Azure Blob Storage 数据集成可以为您的业务带来以下功能和优势:

  • 消息转换:消息在写入 Azure Blob Storage 之前,可以在 EMQX 规则中进行广泛的处理和转换,以便后续存储和使用。
  • 灵活的数据操作:通过 Azure Blob Storage Sink,可以方便地将特定字段的数据写入 Azure Blob Storage 容器中,支持动态设置容器和对象键,实现灵活的数据存储。
  • 集成的业务流程:Azure Blob Storage Sink 允许设备数据与 Azure Blob Storage 丰富的生态系统应用结合,实现更多业务场景,如数据分析和存档。
  • 低成本的长期存储:与数据库相比,Azure Blob Storage 提供了一种高可用性、可靠且成本效益高的对象存储服务,适合长期存储需求。

这些功能使您能够构建高效、可靠且可扩展的 IoT 应用,并从业务决策和优化中受益。

准备工作

本节介绍在 EMQX 中创建 Azure Blob Storage Sink 之前需要完成的准备工作。

前置准备

在 Azure Storage 中创建容器

  1. 要访问 Azure Storage,您需要一个 Azure 订阅。如果您还没有订阅,请在开始之前创建一个免费账户

  2. 所有对 Azure Storage 的访问都通过存储账户进行。对于本指南,请使用 Azure 门户、Azure PowerShell 或 Azure CLI 创建一个存储账户。有关创建存储账户的帮助,请参阅创建存储账户

  3. 要在 Azure 门户中创建容器,请导航到您的新存储账户。在存储账户的左侧菜单中,滚动到数据存储部分,然后选择 Container。选择 + Container 按钮,使用 iot-data 作为新容器的名称,然后点击 Create 以创建容器。

    azure-storage-container-create

  4. 导航到存储账户中的 Security + networking -> Access keys,并复制 Key。您将需要此密钥来配置 EMQX 中的 Sink。

    azure-storage-access-keys

创建连接器

在添加 Azure Blob Storage 数据 Sink 之前,您需要创建相应的连接器。

  1. 转到 Dashboard 集成 -> 连接器 页面。
  2. 点击右上角的创建按钮。
  3. 选择 Azure Blob Storage 作为连接器类型,然后点击下一步
  4. 输入连接器名称,名称应为大小写字母和数字的组合。在这里,输入 my-azure
  5. 输入连接信息。
    • 账户名称:您的存储账户名称
    • 访问密钥:之前创建的存储账户的 Key
  6. 在点击创建之前,您可以点击测试连接测试连接器是否能够连接到 Azure Storage。
  7. 点击底部的创建按钮,完成连接器的创建。

现在您已完成连接器的创建,将继续创建规则和 Sink,以指定要写入 Azure Storage 服务的数据。

创建 Azure Blob Storage Sink 规则

本节演示如何在 EMQX 中创建规则,以处理来自源 MQTT 主题 t/# 的消息,并通过配置的 Sink 将处理结果写入 Azure Storage 中的 iot-data 容器。

  1. 转到 Dashboard 集成 -> 规则页面。

  2. 点击右上角的创建按钮。

  3. 输入规则 ID my_rule,并在 SQL 编辑器中输入以下规则 SQL:

    sql
    SELECT
      *
    FROM
        "t/#"

    TIP

    如果您初次使用 SQL,可以点击 SQL 示例启用调试来学习和测试规则 SQL 的结果。

  4. 点击右侧的添加动作,从动作类型下拉列表中选择 Azure Blob Storage,将动作下拉菜单保持为默认的 create action 选项,或者从动作下拉菜单中选择先前创建的 Azure Blob Storage 动作。此处,创建一个新的 Sink 并将其添加到规则中。

  5. 输入 Sink 的名称和描述。

  6. 从连接器下拉列表中选择之前创建的 my-azure 连接器。您也可以点击下拉框旁的创建按钮,在弹出框中快速创建一个新连接器。所需的配置参数可以在创建连接器中找到。

  7. 设置容器,输入 iot-data

  8. 设置 Blob,输入 iot-data-blob

  9. 选择 上传方式。两种方式的区别如下:

    • 直接上传:每次触发规则时,数据会根据预设的对象键和值直接上传到 Azure Storage。这种方式适合存储二进制或大型文本数据,但可能会生成大量文件。
    • 聚合上传:此方式将多个规则触发结果打包到一个文件(如 CSV 文件)中,并上传到 Azure Storage,适合存储结构化数据。它可以减少文件数量并提高写入效率。

    每种方式的配置参数不同。请根据选择的方式进行配置:

  10. 展开高级设置,根据需要配置高级设置选项(可选)。有关详细信息,请参阅高级设置

  11. 其余设置使用默认值。点击创建按钮完成数据接收器的创建。创建成功后,页面将返回到规则创建,新的数据接收器将被添加到规则操作中。

  12. 返回到规则创建页面,点击创建按钮完成整个规则创建过程。

现在您已成功创建规则。您可以在规则页面上看到新创建的规则,以及在动作 (Sink) 选项卡上的新 Azure Blob Storage Sink。

您还可以点击集成 -> Flow 设计器查看拓扑。拓扑图形化地展示了主题 t/# 下的消息如何在被规则 my_rule 解析后写入 Azure Storage 容器中。

测试规则

本节展示如何测试配置了直接上传方式的规则。

使用 MQTTX 将消息发布到主题 t/1

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Azure" }'

发送几条消息后,访问 Azure 门户查看 iot-data 容器中上传的对象。

登录 Azure 门户,导航到存储账户,并打开 iot-data 容器。您应该在容器中看到上传的对象。

高级设置

本节深入介绍 Azure Blob Storage Sink 可用的高级配置选项。在 Dashboard 中配置数据接收器时,您可以展开高级设置以根据您的具体需求调整以下参数。

字段名称描述默认值
缓存池大小指定缓冲工作进程的数量,这些工作进程被分配来管理 EMQX 和 Azure Storage 之间的数据流。这些工作进程在将数据发送到目标服务之前暂时存储和处理数据,这对于优化性能和确保平稳的数据传输至关重要。16
请求超期 (TTL)"请求生存时间"(TTL)配置设置指定请求进入缓冲区后被认为有效的最长时间(以秒为单位)。此计时器从请求进入缓冲区的那一刻开始计时。如果请求在缓冲区中停留的时间超过此 TTL 设置,或者请求已发送但未及时收到来自 Azure Storage 的响应或确认,则该请求将被视为已过期。
健康检查间隔指定数据接收器与 Azure Storage 连接进行自动健康检查的时间间隔(以秒为单位)。15
缓冲队列最大长度指定 Azure Blob Storage 数据接收器中每个缓冲工作进程可以缓冲的最大字节数。缓冲工作进程在将数据发送到 Azure Storage 之前临时存储数据,充当中介角色以更高效地处理数据流。根据系统性能和数据传输需求调整此值。256
最大批量请求大小指定在单次传输操作中从 EMQX 传输到 Azure Storage 的数据批次的最大大小。通过调整批次大小,可以微调 EMQX 和 Azure Storage 之间的数据传输效率和性能。
如果将“批量大小”设置为“1”,则数据记录会单独发送,不会被分组到批次中。
1
请求模式允许您在 同步异步 请求模式之间进行选择,以根据不同需求优化消息传输。在异步模式下,写入 Azure Storage 不会阻塞 MQTT 消息发布过程。然而,这可能导致客户端在消息到达 Azure Storage 之前收到消息。异步
飞行窗口“飞行队列请求”是指已启动但尚未收到响应或确认的请求。此设置控制数据接收器与 Azure Storage 通信期间可以同时存在的飞行队列请求的最大数量。
请求模式设置为 异步 时,“请求飞行队列窗口”参数变得尤为重要。如果严格按顺序处理来自同一 MQTT 客户端的消息至关重要,那么此值应设置为 1
100