将 MQTT 数据传输到 GCP PubSub
Google Cloud PubSub 是一种异步消息传递服务,旨在实现极高的可靠性和可扩缩性。EMQX 支持与 Google Cloud Pub/Sub 的无缝集成,用于实时提取、处理和分析 MQTT 数据。此外,它还支持将数据推送并订阅各种Google Cloud 服务,如 Cloud Functions、App Engine、Cloud Run、Kubernetes Engine 和 Compute Engine。
借助 EMQX GCP PubSub 集成,您可以将 MQTT 消息和客户端事件发送到 GCP PubSub 中,这能够帮助您更快的基于 GCP 构建物联网应用,助力你从 GCP IoT Core 迁移到 EMQX 中。
TIP
EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用。
Google Cloud Pub/Sub 是一种异步消息传递服务,旨在实现极高的可靠性和可扩缩性。EMQX 支持与 Google Cloud Pub/Sub 的无缝集成,能够实时提取、处理和分析 MQTT 数据,并将数据推送到各类 Google Cloud 服务,如 Cloud Functions、App Engine、Cloud Run、Kubernetes Engine 和 Compute Engine 中,或将 Google Cloud 中的数据通过 MQTT 下发,帮助用户更快的基于 GCP 构建物联网应用。
本页详细介绍了 EMQX 与 GCP Pub/Sub 的数据集成并提供了实用的规则和数据桥接创建指导。
工作原理
GCP Pub/Sub 数据桥接是 EMQX 的开箱即用功能,旨在帮助用户轻松地将 MQTT 数据流与 Google Cloud 集成,并利用其丰富的服务和功能实现物联网应用开发。
EMQX 通过规则引擎与数据桥接将 MQTT 数据转发至 GCP Pub/Sub,以 GCP Pub/Sub 生产者角色为例,其完整流程如下:
- 物联网设备发布消息:设备通过特定的主题发布遥测和状态数据,消息将触发规则引擎。
- 规则引擎处理消息:通过内置的规则引擎,可以根据主题匹配处理特定来源的 MQTT 消息。规则引擎会匹配对应的规则,并对消息进行处理,例如转换数据格式、过滤掉特定信息或使用上下文信息丰富消息。
- 桥接到 GCP Pub/Sub:规则触发将消息转发到 GCP Pub/Sub 的动作,允许轻松配置数据到 GCP Pub/Sub 属性,排序键,以及 MQTT 主题到 GCP Pub/Sub 主题的映射关系,可以为数据集成提供更丰富的上下文信息和顺序保证,实现灵活的物联网数据处理。
MQTT 消息数据写入到 GCP PusSub 后,您可以进行灵活的应用开发,例如:
实时数据处理和分析:利用 Google Cloud 的强大数据处理和分析工具,如 Dataflow、BigQuery 和 Pub/Sub 自身的流处理功能,对消息数据进行实时处理和分析,从而获得有价值的洞察和决策支持。
事件驱动的功能:触发 Google Cloud 的事件处理如 Cloud Functions 和 Cloud Run,以实现动态、灵活的功能触发和处理。
数据存储和共享:将消息数据传输到 Google Cloud 的存储服务中,如 Cloud Storage 和 Firestore,以便安全地存储和管理大量的数据,并与其他 Google Cloud 服务共享和分析这些数据,以满足不同的业务需求。
特性与优势
将 EMQX 与 GCP Pub/Sub 结合使用具有以下特性与优势:
强大的消息传递服务:EMQX 与 GCP Pub/Sub 都具备高可用、可扩展的特性,能够可靠地接收、传递和处理大规模的消息流,支持物联网数据顺序传递、消息质量保证以及持久化等特性,确保消息的可靠传递和处理。
灵活的规则引擎:通过内置的规则引擎,可以根据主题匹配处理特定来源的消息和事件。并对消息和事件进行处理,例如转换数据格式、过滤掉特定信息或使用上下文信息丰富消息,结合 GCP Pub/Sub 可以进行进一步处理和分析。
丰富的上下文信息:通过 GCP Pub/Sub 桥接,你可以在消息中添加更丰富的上下文信息,实现客户端属性与 Pub/Sub 属性、排序键等的映射,可以帮助在后续的应用开发和数据处理中进行更精确的分析和处理。
综上所述,将 EMQX 和 GCP Pub/Sub 结合使用可以实现高可靠性、可扩展性的消息传递,并通过丰富工具和服务进行数据分析与集成,这使得你能够构建强大的物联网应用,并基于事件驱动的功能实现灵活的业务逻辑。
桥接准备
本节介绍如何配置 GCP Pub/Sub,并创建主题与获取连接凭证。
前置准备
创建服务账户凭证
服务账户凭证是用于身份验证和授权的 JSON 文件,EMQX 需要通过它访问 Pub/Sub 资源。
- 进入 GCP 控制台,在搜索框中输入 IAM 并进入 IAM & Admin 页面。
- 在 IAM & Admin 页面中点击 Service Accounts -> Email 中对应的邮箱,选择 KEYS 标签页,点击 ADD KEY 添加以生成用于身份认证 JSON 格式的 key,请妥善保管该文件。
在 GCP Pub/Sub 中创建主题
- 打开 Pub/Sub 控制台,点击 **CREATE TOPIC,**输入自定义的 **Topic ID,**点击 CREATE 即可完成创建。
- 点击列表页对应的 Topic ID 即可进入 Topic 详情页面,您需要创建一个 subscription 来保存消息,有关 subscription 详细介绍请参考 CCP Pub/Sub subscription,此处选择 Pull 类型,保留 7 天历史消息。
- 点击 Subscription ID → MESSAGES → PULL 可以在线查看发送到主题中的消息。
创建 GCP Pub/Sub 数据桥接
本节介绍如何配置 GCP Pub/Sub 数据桥接,在配置 GCP Pub/Sub 之前,必须先在 GCP 上创建好对应的服务账户凭证以及 Pub/Sub 主题。
转到 Dashboard 数据集成 -> 数据桥接页面。
点击页面右上角的创建。
在数据桥接类型中选择 GCP Pub/Sub,点击下一步。
输入数据桥接名称,要求是大小写英文字母和数字的组合。
在桥接角色字段中,根据业务需求从下拉列表中选择
生产者
或消费者
,并完成相应的配置。- GCP PubSub 主题:输入在 GCP 中创建和管理主题中创建的主题 ID
my-iot-core
。 - GCP 服务账户凭证:上传您在 在 GCP 中创建服务帐号密钥 中导出的 JSON 格式的服务帐号凭据。
- HTTP 请求消息体模版:留空或定义一个模板。
- 如果留空,它将使用 JSON 格式对 MQTT 消息中的所有可见输入进行编码,例如 clientid、topic、payload 等。
- 如果使用定义的模板,占位符的形式为
${variable_name}
,将用 MQTT 上下文中的相应值填充。例如,如果 MQTT 消息主题是my/topic
,则${topic}
将被替换为my/topic
。
- 属性模板和排序键模板(可选):类似地,您可以定义用于格式化传出消息的属性和/或排序键的模板。
- 对于属性,键和值都可以使用
${variable_name}
形式的占位符。这样的值将从 MQTT 上下文中提取出来。如果键模板解析为空字符串,则该键将从传出到 GCP Pub/Sub 的消息中省略。 - 对于顺序键,可以使用
${variable_name}
形式的占位符。如果解析后的值为空字符串,则不会为 GCP Pub/Sub 传出消息设置orderingKey
字段。
- 对于属性,键和值都可以使用
- 高级配置(可选),根据情况配置同步/异步模式,队列等参数。
- GCP PubSub 主题:输入在 GCP 中创建和管理主题中创建的主题 ID
设置完成后,您可点击测试连接按钮进行验证。
点击创建按钮完成数据桥接创建。
至此您已经完成数据桥接创建流程,接下来将继续创建一条规则来指定需要写入的数据。
创建数据转发规则
- 转到 Dashboard 数据集成 -> 规则页面。
- 点击页面右上角的创建。
- 输入规则 ID,例如:
my_rule
。 - 在 SQL 编辑器中输入规则,请确保规则选择出来的字段(SELECT 部分)包含 HTTP 请求消息体模版中用到的变量。例如将
/devices/+/events
主题的 MQTT 消息集成到 GCP Pub/Sub,此处规则 SQL 如下:
SELECT
*
FROM
"/devices/+/events"
添加动作,在动作下拉框中选择 使用数据桥接转发 选项,选择先前创建好的 GCP Pub/Sub 数据桥接。
点击最下方创建按钮完成规则创建。
至此您已经完成整个创建过程,可以前往 数据集成 -> Flows 页面查看拓扑图,此时应当看到 /devices/+/events
主题的消息经过名为 my_rule
的规则处理,处理结果写入到 GCP Pub/Sub 中。
测试数据桥接与规则
- 使用 MQTTX 向
/devices/+/events
主题发布消息:
mqttx pub -i emqx_c -t /devices/+/events -m '{ "msg": "hello GCP PubSub" }'
查看数据桥接运行统计,命中、发送成功次数均 +1。
前往 GCP Pub/Sub 控制台查看数据是否已经发送成功。