与 Azure Event Grid MQTT 桥接
Azure Event Grid 是 Azure 上的全托管事件路由服务。其 MQTT 代理功能支持物联网设备与云端应用之间大规模、标准化的双向 MQTT 通信。EMQX 提供了 Azure Event Grid 的内置连接器,使您能够在 EMQX 与 Azure Event Grid 之间桥接 MQTT 数据,无缝融入 Azure 云服务生态系统。
本页详细介绍了 EMQX 与 Azure Event Grid MQTT 的集成,并提供了创建和验证 Sink 与 Source 的实用操作指导。
工作原理
Azure Event Grid 数据集成是 EMQX 的开箱即用功能,将 EMQX 的设备接入与消息传输能力同 Azure Event Grid 云原生 MQTT 代理相结合。EMQX 以 MQTT 客户端身份连接到 Azure Event Grid MQTT 代理,实现双向消息传输:
- 消息流出(Sink):EMQX 将本地 MQTT 主题的消息发布到 Azure Event Grid 上的指定主题。
- 消息流入(Source):EMQX 订阅 Azure Event Grid 上的主题,并将收到的消息转发到 EMQX 本地主题。
下图展示了该集成的典型架构:

特性与优势
与 Azure Event Grid 的数据集成具有以下特性和优势:
- 基于标准协议的 MQTT 桥接:Azure Event Grid 支持 MQTT 3.1.1 和 MQTT 5.0,EMQX 可使用标准 MQTT 协议与之桥接,并与任何支持 MQTT 的客户端或服务互通。
- 双向数据流:支持从 EMQX 向 Azure Event Grid 发布消息(Sink),以及订阅 Azure Event Grid 主题并将消息转发到 EMQX(Source),实现灵活的物联网数据路由。
- 安全连接:连接器支持 TLS,并支持客户端证书认证。
- 灵活的主题映射:通过 EMQX 规则引擎,可以对消息进行过滤、转换,并通过动态主题映射将其路由到特定的 Azure Event Grid 主题空间。
- 丰富的 Azure 生态集成:数据到达 Azure Event Grid 后,可路由到 Azure Functions、Azure Event Hubs、Azure Storage 等其他 Azure 服务,进行进一步的处理和分析。
准备工作
前置准备
设置 Azure Event Grid
在 EMQX 中创建数据集成之前,需要先在 Azure 中设置启用了 MQTT 代理支持的 Event Grid 命名空间。以下 Microsoft 官方文档提供了分步操作指导:
完成设置后,请记录以下连接信息,创建连接器时需要用到:
- 主机名:Event Grid 命名空间的 MQTT 代理主机名,格式为
<namespace>.ts.<region>.eventgrid.azure.net,端口为8883。 - 客户端证书和私钥:Azure Event Grid 需要客户端证书认证。请从 Azure Event Grid 命名空间导出证书和私钥,以便在连接器中配置 TLS 时使用。
- 主题空间:您在 Azure Event Grid 中配置的主题空间和权限绑定。
TIP
有关支持的认证方式和 TLS 要求,请参阅 Azure Event Grid 官方文档。
创建连接器
本节介绍如何创建连接器,将 EMQX 连接到 Azure Event Grid。
在 EMQX Dashboard 中,点击集成 -> 连接器。
点击页面右上角的创建。
在创建连接器页面,选择 Azure Event Grid,然后点击下一步。
输入连接器名称,要求为大小写字母和数字的组合,例如
my_azure_event_grid。填写连接相关配置:
服务器地址:输入 Event Grid 命名空间的 MQTT 代理地址,例如
myns.northeurope-1.ts.eventgrid.azure.net:8883。默认端口为8883。客户端 ID 前缀:(可选)为 EMQX 自动生成的客户端 ID 指定前缀。EMQX 会按照
[前缀]:{连接器名称}{随机字符串}:{连接池序号}的格式生成唯一客户端 ID。详情请参阅连接池与客户端 ID 生成规则。用户名和密码:留空。Azure Event Grid MQTT 不使用用户名/密码认证。
Keepalive:指定心跳间隔,单位为秒,默认值为
160。MQTT 协议版本:选择 MQTT 协议版本。Azure Event Grid 支持 MQTT 3.1.1(
v4)和 MQTT 5.0(v5)。静态客户端 ID 映射表:(可选)为特定 EMQX 节点配置静态客户端 ID,适用于 Azure Event Grid 要求预先注册客户端 ID 的场景。详情请参阅配置静态客户端 ID。
TIP
配置静态客户端 ID 映射后,只有在映射中明确指定的 EMQX 节点才会创建 MQTT 连接。
清除会话:默认启用。启用后,EMQX 每次连接到 Azure Event Grid 时都会创建新的会话。
启用 TLS:开启此选项。Azure Event Grid 要求使用 TLS。如使用客户端证书认证,请在此处配置证书和私钥。详细的 TLS 配置选项,请参阅启用 TLS 加密访问外部资源。
高级设置(可选):详情请参阅连接器高级设置。
点击创建之前,可先点击测试连接,验证 EMQX 是否能成功连接到 Azure Event Grid。
点击创建按钮完成连接器创建。弹出创建成功对话框,询问是否立即创建规则。点击创建规则可直接进入规则创建页面并自动选中该连接器,或点击返回连接器列表稍后再创建规则。
创建 Azure Event Grid Sink 规则
本节介绍如何创建规则,将 EMQX 本地主题 t/# 的 MQTT 消息转发到 Azure Event Grid。
如果在上一步点击了创建规则,添加动作面板会自动打开,且动作类型已设置为
Azure Event Grid,连接器已预先选中,可直接跳到第 5 步。否则,在 EMQX Dashboard 中点击集成 -> 规则,点击右上角的创建,然后点击 + 添加动作。
在左侧 SQL 编辑器中输入规则 ID 和以下 SQL,匹配来自主题
t/#的消息:注意:如需自定义 SQL 语法,请确保
SELECT部分包含 Sink 所需的所有字段。sqlSELECT * FROM "t/#"TIP
如果您是初学者,可点击 SQL 示例和启用调试来学习和测试 SQL 规则。
在右侧添加动作面板中,从动作类型下拉列表中选择
Azure Event Grid,动作下拉框保持默认的创建动作。在连接器下拉框中选择刚创建的
my_azure_event_grid连接器。也可以点击下拉框旁的按钮创建新连接器,配置参数请参考创建连接器。输入 Sink 的名称和可选描述。
配置向 Azure Event Grid 发布消息的 Sink 参数:
- 主题:发布到 Azure Event Grid 的主题,支持
${var}占位符。例如,输入devices/${clientid}/messages可根据客户端 ID 动态设置主题。 - QoS:发布消息的 QoS 等级。可选
0、1、2,或使用${qos}等占位符跟随原始消息的 QoS。 - Retain:选择
true、false或使用${flags.retain}等占位符设置保留标志。 - 消息模版:消息 payload 模板。留空则转发完整的规则输出,或输入
${payload}仅转发 payload。
- 主题:发布到 Azure Event Grid 的主题,支持
备选动作(可选):如需在消息投递失败时提高可靠性,可定义一个或多个备选动作,当主 Sink 处理消息失败时触发。详情请参阅备选动作。
高级设置(可选):详情请参阅 Sink 高级设置。
点击创建之前,可先点击测试连接,验证 Sink 能否连接到 Azure Event Grid。
点击创建按钮完成 Sink 配置,新 Sink 将添加到动作输出中。
返回创建规则页面,确认配置信息无误后,点击保存按钮生成规则。
至此,您已成功创建规则。可以在集成 -> 规则页面查看新创建的规则,点击**动作(Sink)**标签可查看新的 Azure Event Grid Sink。
您还可以点击集成 -> 流程设计器查看拓扑图,确认主题 t/# 下的消息经规则 my_rule 处理后被转发到 Azure Event Grid。
创建 Azure Event Grid Source 规则
本节介绍如何创建规则,订阅 Azure Event Grid 上的消息并将其转发到 EMQX 本地主题。
创建 Azure Event Grid Source 并添加到规则
在 EMQX Dashboard 中,点击集成 -> 规则,然后点击右上角的创建。
输入
my_rule_source作为规则 ID。配置规则的触发来源。在页面右侧数据输入标签下,删除默认的消息类型输入,然后点击添加输入以创建 Azure Event Grid Source。
在添加输入对话框中,从输入类型下拉框中选择
Azure Event Grid,Source 下拉框保持默认的创建 Source。输入 Source 的名称和描述。
在下拉框中选择
my_azure_event_grid连接器。配置订阅 Azure Event Grid 的 Source 参数:
主题:订阅 Azure Event Grid 的主题,支持
+和#通配符。TIP
当 EMQX 在集群模式下运行,或连接器配置了连接池时,请使用共享订阅以避免重复消息,例如
$share/group/devices/#。QoS:订阅的 QoS,从下拉框中选择
0或1。
点击创建按钮完成 Source 创建,规则 SQL 将自动更新为:
sqlSELECT * FROM "$bridges/azure_event_grid:<source_name>"
创建重新发布动作
从 Azure Event Grid 订阅的消息不会自动转发到 EMQX 本地主题,需要创建重新发布(Republish)动作来完成消息路由。
切换到创建规则页面右侧的动作输出标签,点击添加动作。
从动作类型下拉框中选择
重新发布。配置重新发布参数:
- 主题:输入目标本地主题,例如
azure/${topic},为原始主题添加azure/前缀。 - QoS:选择
${qos}跟随原始消息的 QoS,或设置固定值。 - Retain:选择
false或使用占位符。 - 消息模版:输入
${payload}仅转发 payload,或留空以转发完整的规则输出。
- 主题:输入目标本地主题,例如
点击添加完成动作创建,然后点击保存生成规则。
测试规则
测试 Sink
使用 MQTTX 向 EMQX 的主题 t/1 发布一条消息:
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Azure Event Grid" }'查看 Azure Event Grid Sink 的运行统计,应有 1 条新的匹配消息和 1 条新的发出消息。在 Azure 门户或使用 Azure Event Grid MQTT 客户端验证消息是否已被正确接收。
测试 Source
订阅 EMQX 本地主题
azure/#:bashmqttx sub -t azure/# -q 1 -v使用配置了 Azure Event Grid 凭证的 MQTT 客户端向 Azure Event Grid 发布消息:
bashmqttx pub -t devices/device1/messages -m "hello from azure" \ -h myns.northeurope-1.ts.eventgrid.azure.net -p 8883 \ --tls --cert /path/to/client.crt --key /path/to/client.key您将在 EMQX 的
azure/devices/device1/messages主题上收到该消息:bashtopic: azure/devices/device1/messages payload: hello from azure
高级设置
本节介绍 Azure Event Grid 连接器和 Sink 的高级配置选项。在 Dashboard 中配置时,展开高级设置即可根据实际需求调整以下参数。
连接器高级设置
| 字段名称 | 说明 | 默认值 |
|---|---|---|
| 消息重发间隔 | 消息投递失败时,两次重试之间的等待时间。 | 15 秒 |
| 桥接模式 | 启用后,连接器将以 MQTT 桥接模式连接,向远端 Broker 声明本连接为桥接连接。 | 禁用 |
| 飞行窗口 | 每个连接中允许同时存在的未确认消息最大数量。 | 32 |
| 连接池大小 | 与 Azure Event Grid 保持的并发 MQTT 连接数。增大该值可提升吞吐量。 | 8 |
| 连接超时 | 等待与 Azure Event Grid 建立 TCP 连接的最长时间。 | 10 秒 |
| 启动超时时间 | 连接器启动后,等待资源进入健康状态再开始接受请求的最长时间。 | 5 秒 |
| 健康检查间隔 | 连接器对连接进行自动健康检查的时间间隔。 | 15 秒 |
| 健康检查超时 | 每次健康检查允许完成的最长时间。 | 60 秒 |
Sink 高级设置
| 字段名称 | 说明 | 默认值 |
|---|---|---|
| 缓存池大小 | 负责处理 EMQX 与 Azure Event Grid 之间数据流的缓存工作进程数量。高负载下可适当增大该值以提升吞吐量。 | 16 |
| 请求超期 | 请求在缓冲区中保持有效的最长时间。超过该时限的请求(无论仍在排队还是已发送但未收到确认)将被丢弃。 | 45 秒 |
| 健康检查间隔 | Sink 对连接进行自动健康检查的时间间隔。 | 15 秒 |
| 健康检查间隔抖动 | 在健康检查间隔上随机增加的延迟,防止多个节点同时发起检查。当多个动作或数据源共用同一连接器时尤为有用。 | 0 毫秒 |
| 健康检查超时 | 每次 Sink 健康检查允许完成的最长时间。 | 60 秒 |
| 缓存队列最大长度 | 每个缓存工作进程可缓存的最大字节数。如果工作负载存在突发流量超出默认容量的情况,可适当增大该值。 | 256 MB |
| 请求模式 | 异步 模式下,EMQX 无需等待 Azure Event Grid 确认即可继续发布消息;同步 模式下则等待确认后再继续。异步模式吞吐量更高,但可能出现乱序投递。 | 异步 |
| 请求飞行队列窗口 | 同时允许存在的最大未确认请求数。当请求模式为异步时,若需保证单个客户端的消息顺序,请将此值设为 1。 | 100 |
Source 高级设置
| 字段名称 | 说明 | 默认值 |
|---|---|---|
| 健康检查间隔 | Source 对连接进行自动健康检查的时间间隔。 | 15 秒 |
| 健康检查间隔抖动 | 在健康检查间隔上随机增加的延迟,防止多个节点同时发起检查。当多个动作或数据源共用同一连接器时尤为有用。 | 0 毫秒 |
| 健康检查超时 | 每次 Source 健康检查允许完成的最长时间。 | 60 秒 |