Skip to content

与 Azure Event Grid MQTT 桥接

Azure Event Grid 是 Azure 上的全托管事件路由服务。其 MQTT 代理功能支持物联网设备与云端应用之间大规模、标准化的双向 MQTT 通信。EMQX 提供了 Azure Event Grid 的内置连接器,使您能够在 EMQX 与 Azure Event Grid 之间桥接 MQTT 数据,无缝融入 Azure 云服务生态系统。

本页详细介绍了 EMQX 与 Azure Event Grid MQTT 的集成,并提供了创建和验证 Sink 与 Source 的实用操作指导。

工作原理

Azure Event Grid 数据集成是 EMQX 的开箱即用功能,将 EMQX 的设备接入与消息传输能力同 Azure Event Grid 云原生 MQTT 代理相结合。EMQX 以 MQTT 客户端身份连接到 Azure Event Grid MQTT 代理,实现双向消息传输:

  • 消息流出(Sink):EMQX 将本地 MQTT 主题的消息发布到 Azure Event Grid 上的指定主题。
  • 消息流入(Source):EMQX 订阅 Azure Event Grid 上的主题,并将收到的消息转发到 EMQX 本地主题。

下图展示了该集成的典型架构:

EMQX Integration Azure Event Grid

特性与优势

与 Azure Event Grid 的数据集成具有以下特性和优势:

  • 基于标准协议的 MQTT 桥接:Azure Event Grid 支持 MQTT 3.1.1 和 MQTT 5.0,EMQX 可使用标准 MQTT 协议与之桥接,并与任何支持 MQTT 的客户端或服务互通。
  • 双向数据流:支持从 EMQX 向 Azure Event Grid 发布消息(Sink),以及订阅 Azure Event Grid 主题并将消息转发到 EMQX(Source),实现灵活的物联网数据路由。
  • 安全连接:连接器支持 TLS,并支持客户端证书认证。
  • 灵活的主题映射:通过 EMQX 规则引擎,可以对消息进行过滤、转换,并通过动态主题映射将其路由到特定的 Azure Event Grid 主题空间。
  • 丰富的 Azure 生态集成:数据到达 Azure Event Grid 后,可路由到 Azure Functions、Azure Event Hubs、Azure Storage 等其他 Azure 服务,进行进一步的处理和分析。

准备工作

前置准备

设置 Azure Event Grid

在 EMQX 中创建数据集成之前,需要先在 Azure 中设置启用了 MQTT 代理支持的 Event Grid 命名空间。以下 Microsoft 官方文档提供了分步操作指导:

完成设置后,请记录以下连接信息,创建连接器时需要用到:

  • 主机名:Event Grid 命名空间的 MQTT 代理主机名,格式为 <namespace>.ts.<region>.eventgrid.azure.net,端口为 8883
  • 客户端证书和私钥:Azure Event Grid 需要客户端证书认证。请从 Azure Event Grid 命名空间导出证书和私钥,以便在连接器中配置 TLS 时使用。
  • 主题空间:您在 Azure Event Grid 中配置的主题空间和权限绑定。

TIP

有关支持的认证方式和 TLS 要求,请参阅 Azure Event Grid 官方文档

创建连接器

本节介绍如何创建连接器,将 EMQX 连接到 Azure Event Grid。

  1. 在 EMQX Dashboard 中,点击集成 -> 连接器

  2. 点击页面右上角的创建

  3. 创建连接器页面,选择 Azure Event Grid,然后点击下一步

  4. 输入连接器名称,要求为大小写字母和数字的组合,例如 my_azure_event_grid

  5. 填写连接相关配置:

    • 服务器地址:输入 Event Grid 命名空间的 MQTT 代理地址,例如 myns.northeurope-1.ts.eventgrid.azure.net:8883。默认端口为 8883

    • 客户端 ID 前缀:(可选)为 EMQX 自动生成的客户端 ID 指定前缀。EMQX 会按照 [前缀]:{连接器名称}{随机字符串}:{连接池序号} 的格式生成唯一客户端 ID。详情请参阅连接池与客户端 ID 生成规则

    • 用户名密码:留空。Azure Event Grid MQTT 不使用用户名/密码认证。

    • Keepalive:指定心跳间隔,单位为秒,默认值为 160

    • MQTT 协议版本:选择 MQTT 协议版本。Azure Event Grid 支持 MQTT 3.1.1(v4)和 MQTT 5.0(v5)。

    • 静态客户端 ID 映射表:(可选)为特定 EMQX 节点配置静态客户端 ID,适用于 Azure Event Grid 要求预先注册客户端 ID 的场景。详情请参阅配置静态客户端 ID

      TIP

      配置静态客户端 ID 映射后,只有在映射中明确指定的 EMQX 节点才会创建 MQTT 连接。

    • 清除会话:默认启用。启用后,EMQX 每次连接到 Azure Event Grid 时都会创建新的会话。

    • 启用 TLS:开启此选项。Azure Event Grid 要求使用 TLS。如使用客户端证书认证,请在此处配置证书和私钥。详细的 TLS 配置选项,请参阅启用 TLS 加密访问外部资源

  6. 高级设置(可选):详情请参阅连接器高级设置

  7. 点击创建之前,可先点击测试连接,验证 EMQX 是否能成功连接到 Azure Event Grid。

  8. 点击创建按钮完成连接器创建。弹出创建成功对话框,询问是否立即创建规则。点击创建规则可直接进入规则创建页面并自动选中该连接器,或点击返回连接器列表稍后再创建规则。

创建 Azure Event Grid Sink 规则

本节介绍如何创建规则,将 EMQX 本地主题 t/# 的 MQTT 消息转发到 Azure Event Grid。

  1. 如果在上一步点击了创建规则添加动作面板会自动打开,且动作类型已设置为 Azure Event Grid,连接器已预先选中,可直接跳到第 5 步。

    否则,在 EMQX Dashboard 中点击集成 -> 规则,点击右上角的创建,然后点击 + 添加动作

  2. 在左侧 SQL 编辑器中输入规则 ID 和以下 SQL,匹配来自主题 t/# 的消息:

    注意:如需自定义 SQL 语法,请确保 SELECT 部分包含 Sink 所需的所有字段。

    sql
    SELECT
      *
    FROM
      "t/#"

    TIP

    如果您是初学者,可点击 SQL 示例启用调试来学习和测试 SQL 规则。

  3. 在右侧添加动作面板中,从动作类型下拉列表中选择 Azure Event Grid动作下拉框保持默认的创建动作

  4. 连接器下拉框中选择刚创建的 my_azure_event_grid 连接器。也可以点击下拉框旁的按钮创建新连接器,配置参数请参考创建连接器

  5. 输入 Sink 的名称和可选描述。

  6. 配置向 Azure Event Grid 发布消息的 Sink 参数:

    • 主题:发布到 Azure Event Grid 的主题,支持 ${var} 占位符。例如,输入 devices/${clientid}/messages 可根据客户端 ID 动态设置主题。
    • QoS:发布消息的 QoS 等级。可选 012,或使用 ${qos} 等占位符跟随原始消息的 QoS。
    • Retain:选择 truefalse 或使用 ${flags.retain} 等占位符设置保留标志。
    • 消息模版:消息 payload 模板。留空则转发完整的规则输出,或输入 ${payload} 仅转发 payload。
  7. 备选动作(可选):如需在消息投递失败时提高可靠性,可定义一个或多个备选动作,当主 Sink 处理消息失败时触发。详情请参阅备选动作

  8. 高级设置(可选):详情请参阅 Sink 高级设置

  9. 点击创建之前,可先点击测试连接,验证 Sink 能否连接到 Azure Event Grid。

  10. 点击创建按钮完成 Sink 配置,新 Sink 将添加到动作输出中。

  11. 返回创建规则页面,确认配置信息无误后,点击保存按钮生成规则。

至此,您已成功创建规则。可以在集成 -> 规则页面查看新创建的规则,点击**动作(Sink)**标签可查看新的 Azure Event Grid Sink。

您还可以点击集成 -> 流程设计器查看拓扑图,确认主题 t/# 下的消息经规则 my_rule 处理后被转发到 Azure Event Grid。

创建 Azure Event Grid Source 规则

本节介绍如何创建规则,订阅 Azure Event Grid 上的消息并将其转发到 EMQX 本地主题。

创建 Azure Event Grid Source 并添加到规则

  1. 在 EMQX Dashboard 中,点击集成 -> 规则,然后点击右上角的创建

  2. 输入 my_rule_source 作为规则 ID。

  3. 配置规则的触发来源。在页面右侧数据输入标签下,删除默认的消息类型输入,然后点击添加输入以创建 Azure Event Grid Source。

  4. 添加输入对话框中,从输入类型下拉框中选择 Azure Event GridSource 下拉框保持默认的创建 Source

  5. 输入 Source 的名称和描述。

  6. 在下拉框中选择 my_azure_event_grid 连接器。

  7. 配置订阅 Azure Event Grid 的 Source 参数:

    • 主题:订阅 Azure Event Grid 的主题,支持 +# 通配符。

      TIP

      当 EMQX 在集群模式下运行,或连接器配置了连接池时,请使用共享订阅以避免重复消息,例如 $share/group/devices/#

    • QoS:订阅的 QoS,从下拉框中选择 01

  8. 点击创建按钮完成 Source 创建,规则 SQL 将自动更新为:

    sql
    SELECT
      *
    FROM
      "$bridges/azure_event_grid:<source_name>"

创建重新发布动作

从 Azure Event Grid 订阅的消息不会自动转发到 EMQX 本地主题,需要创建重新发布(Republish)动作来完成消息路由。

  1. 切换到创建规则页面右侧的动作输出标签,点击添加动作

  2. 动作类型下拉框中选择重新发布

  3. 配置重新发布参数:

    • 主题:输入目标本地主题,例如 azure/${topic},为原始主题添加 azure/ 前缀。
    • QoS:选择 ${qos} 跟随原始消息的 QoS,或设置固定值。
    • Retain:选择 false 或使用占位符。
    • 消息模版:输入 ${payload} 仅转发 payload,或留空以转发完整的规则输出。
  4. 点击添加完成动作创建,然后点击保存生成规则。

测试规则

测试 Sink

使用 MQTTX 向 EMQX 的主题 t/1 发布一条消息:

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Azure Event Grid" }'

查看 Azure Event Grid Sink 的运行统计,应有 1 条新的匹配消息和 1 条新的发出消息。在 Azure 门户或使用 Azure Event Grid MQTT 客户端验证消息是否已被正确接收。

测试 Source

  1. 订阅 EMQX 本地主题 azure/#

    bash
    mqttx sub -t azure/# -q 1 -v
  2. 使用配置了 Azure Event Grid 凭证的 MQTT 客户端向 Azure Event Grid 发布消息:

    bash
    mqttx pub -t devices/device1/messages -m "hello from azure" \
      -h myns.northeurope-1.ts.eventgrid.azure.net -p 8883 \
      --tls --cert /path/to/client.crt --key /path/to/client.key
  3. 您将在 EMQX 的 azure/devices/device1/messages 主题上收到该消息:

    bash
    topic: azure/devices/device1/messages
    payload: hello from azure

高级设置

本节介绍 Azure Event Grid 连接器和 Sink 的高级配置选项。在 Dashboard 中配置时,展开高级设置即可根据实际需求调整以下参数。

连接器高级设置

字段名称说明默认值
消息重发间隔消息投递失败时,两次重试之间的等待时间。15
桥接模式启用后,连接器将以 MQTT 桥接模式连接,向远端 Broker 声明本连接为桥接连接。禁用
飞行窗口每个连接中允许同时存在的未确认消息最大数量。32
连接池大小与 Azure Event Grid 保持的并发 MQTT 连接数。增大该值可提升吞吐量。8
连接超时等待与 Azure Event Grid 建立 TCP 连接的最长时间。10
启动超时时间连接器启动后,等待资源进入健康状态再开始接受请求的最长时间。5
健康检查间隔连接器对连接进行自动健康检查的时间间隔。15
健康检查超时每次健康检查允许完成的最长时间。60

Sink 高级设置

字段名称说明默认值
缓存池大小负责处理 EMQX 与 Azure Event Grid 之间数据流的缓存工作进程数量。高负载下可适当增大该值以提升吞吐量。16
请求超期请求在缓冲区中保持有效的最长时间。超过该时限的请求(无论仍在排队还是已发送但未收到确认)将被丢弃。45
健康检查间隔Sink 对连接进行自动健康检查的时间间隔。15
健康检查间隔抖动在健康检查间隔上随机增加的延迟,防止多个节点同时发起检查。当多个动作或数据源共用同一连接器时尤为有用。0 毫秒
健康检查超时每次 Sink 健康检查允许完成的最长时间。60
缓存队列最大长度每个缓存工作进程可缓存的最大字节数。如果工作负载存在突发流量超出默认容量的情况,可适当增大该值。256 MB
请求模式异步 模式下,EMQX 无需等待 Azure Event Grid 确认即可继续发布消息;同步 模式下则等待确认后再继续。异步模式吞吐量更高,但可能出现乱序投递。异步
请求飞行队列窗口同时允许存在的最大未确认请求数。当请求模式异步时,若需保证单个客户端的消息顺序,请将此值设为 1100

Source 高级设置

字段名称说明默认值
健康检查间隔Source 对连接进行自动健康检查的时间间隔。15
健康检查间隔抖动在健康检查间隔上随机增加的延迟,防止多个节点同时发起检查。当多个动作或数据源共用同一连接器时尤为有用。0 毫秒
健康检查超时每次 Source 健康检查允许完成的最长时间。60