Skip to content

将 MQTT 数据导入 Azure Blob Storage

Azure Blob Storage 是微软基于云的对象存储解决方案,专为处理大量非结构化数据而设计。非结构化数据指的是不遵循特定数据模型或格式的数据类型,例如文本文件或二进制数据。EMQX Platform 可以高效地将 MQTT 消息存储在 Blob Storage 容器中,为物联网 (IoT) 数据存储提供了一个多功能的解决方案。

本页提供了 EMQX Platform 与 Azure Blob Storage 数据集成的全面介绍,并提供了创建和验证数据集成的实用指导。

工作原理

Azure Blob Storage 数据集成是 EMQX Platform 中一个开箱即用的功能,可以轻松配置以应对复杂的业务发展。在典型的 IoT 应用中,EMQX Platform 作为负责设备连接和消息传输的物联网平台,而 Azure Blob Storage 则作为数据存储平台,处理消息数据的存储。

EMQX Platform Azure Blob Storage  数据集成

EMQX Platform 利用规则引擎和数据接收器将设备事件和数据转发到 Azure Blob Storage。应用程序可以从 Azure Blob Storage 中读取数据,用于进一步的数据应用。具体工作流程如下:

  1. 设备连接到 EMQX Platform:IoT 设备通过 MQTT 协议成功连接后触发上线事件。该事件包括设备 ID、来源 IP 地址以及其他属性信息。
  2. 设备消息发布和接收:设备通过特定主题发布遥测和状态数据。EMQX Platform 接收这些消息,并在规则引擎中进行匹配。
  3. 规则引擎处理消息:内置规则引擎根据主题匹配处理来自特定来源的消息和事件。它匹配相应的规则,并处理消息和事件,如数据格式转换、过滤特定信息或用上下文信息丰富消息。
  4. 写入 Azure Blob Storage:规则触发一个动作,将消息写入存储容器。使用 Azure Blob Storage Sink,用户可以从处理结果中提取数据并发送到 Blob Storage。根据消息内容和 Sink 中的配置,消息可以以文本或二进制格式存储,或将多行结构化数据汇总到单个 CSV 文件中。

事件和消息数据写入存储容器后,可以连接到 Azure Blob Storage 读取数据,以实现灵活的应用开发,例如:

  • 数据存档:将设备消息作为对象存储在 Azure Blob Storage 中,以实现长期保存,满足合规要求或业务需求。
  • 数据分析:将存储容器中的数据导入分析服务,如 Snowflake,用于预测性维护、设备效率评估等数据分析服务。

特性与优势

在 EMQX Platform 中使用 Azure Blob Storage 数据集成可以为您的业务带来以下功能和优势:

  • 消息转换:消息在写入 Azure Blob Storage 之前,可以在 EMQX Platform 规则中进行广泛的处理和转换,以便后续存储和使用。
  • 灵活的数据操作:通过 Azure Blob Storage Sink,可以方便地将特定字段的数据写入 Azure Blob Storage 容器中,支持动态设置容器和对象键,实现灵活的数据存储。
  • 集成的业务流程:Azure Blob Storage Sink 允许设备数据与 Azure Blob Storage 丰富的生态系统应用结合,实现更多业务场景,如数据分析和存档。
  • 低成本的长期存储:与数据库相比,Azure Blob Storage 提供了一种高可用性、可靠且成本效益高的对象存储服务,适合长期存储需求。

这些功能使您能够构建高效、可靠且可扩展的 IoT 应用,并从业务决策和优化中受益。

准备工作

本节介绍了在 EMQX Platform 中创建 Azure Blob Storage 数据集成之前需要做的准备工作。

前置准备

在 Azure Storage 中创建容器

  1. 要访问 Azure Storage,您需要一个 Azure 订阅。如果您还没有订阅,请在开始之前创建一个免费账户

  2. 所有对 Azure Storage 的访问都通过存储账户进行。对于本指南,请使用 Azure 门户、Azure PowerShell 或 Azure CLI 创建一个存储账户。有关创建存储账户的帮助,请参阅创建存储账户

  3. 要在 Azure 门户中创建容器,请导航到您的新存储账户。在存储账户的左侧菜单中,滚动到数据存储部分,然后选择 Container。选择 + Container 按钮,使用 iot-data 作为新容器的名称,然后点击 Create 以创建容器。

    azure-storage-container-create

  4. 导航到存储账户中的 Security + networking -> Access keys,并复制 Key。您将需要此密钥来配置 EMQX Platform 中的 Sink。

    azure-storage-access-keys

创建 Azure Blob Storage 连接器

在创建数据集成的规则之前,您需要先创建一个 Azure Blob Storage 连接器用于访问 Azure Blob Storage。

  1. 在部署菜单中选择 数据集成,在数据持久化服务分类下选择 Azure Blob Storage 服务。如果您已经创建了其他的连接器,点击新建连接器,然后在数据持久化服务分类下选择 Azure Blob Storage 服务。

  2. 连接器名称:系统将自动生成一个连接器的名称。

  3. 输入连接信息:

    • 账户名称:您的存储账户名称
    • 访问密钥:之前创建的存储账户的 Key
    • 高级设置(可选):请参阅高级配置
  4. 点击测试连接按钮,如果 Azure Blob Storage 服务能够正常访问,则会返回成功提示。

  5. 点击新建按钮完成连接器的创建。

创建规则

接下来您需要创建一条规则来指定需要写入的数据,并在规则中添加响应动作以将经规则处理的数据转发到 Azure Blob Storage。

  1. 点击连接器列表操作列下的新建规则图标或在规则列表中点击新建规则进入新建规则步骤页。

  2. 在 SQL 编辑器中输入规则:

    sql
    SELECT
      *
    FROM
        "t/#"

    TIP

    如果您初次使用 SQL,可以点击 SQL 示例启用调试来学习和测试规则 SQL 的结果。

  3. 点击下一步开始创建动作。

  4. 使用连接器下拉框中选择您之前创建的连接器。

  5. 设置容器,输入 iot-data

  6. 选择上传方式。两种方式的区别如下:

    • 直接上传:每次触发规则时,数据会根据预设的对象键和值直接上传到 Azure Storage。这种方式适合存储二进制或大型文本数据,但可能会生成大量文件。
    • 聚合上传:此方式将多个规则触发结果打包到一个文件(如 CSV 文件)中,并上传到 Azure Storage,适合存储结构化数据。它可以减少文件数量并提高写入效率。

    每种方式的配置参数不同。请根据选择的方式进行配置:

  7. 展开高级设置,根据需要配置高级设置选项(可选)。有关详细信息,请参阅高级配置

  8. 点击确认按钮完成动作的配置。

  9. 在弹出的成功创建规则提示框中点击返回规则列表,从而完成整个数据集成的配置。

测试规则

本节展示如何测试配置了直接上传方式的规则。

  1. 使用 MQTTX 将消息发布到主题 t/1
bash
mqttx pub -i EMQX_c -t t/1 -m '{ "msg": "Hello Azure" }'
  1. 发送几条消息后,访问 Azure 门户查看 iot-data 容器中上传的对象。

    登录 Azure 门户,导航到存储账户,并打开 iot-data 容器。您应该在容器中看到上传的对象。

  2. 在控制台查看运行数据。在规则列表点击规则 ID,在运行统计页面可以查看到规则的统计以及此规则下所有动作的统计。