将 MQTT 数据传输到 Apache Kafka
Apache Kafka 是一个广泛使用的开源分布式事件流处理平台,能够处理应用程序和系统之间数据流的实时传输。然而,Kafka 并不是为边缘物联网通信构建的,Kafka 客户端需要稳定的网络连接和更多的硬件资源。在物联网领域,设备和应用程序生成的数据使用轻量级 MQTT 协议传输。EMQX Platform 与 Kafka/Confluent 的集成使用户能够无缝地将 MQTT 数据流入或流出 Kafka。MQTT 数据流被引入 Kafka 主题,确保实时处理、存储和分析。EMQX Platform 支持将客户端数据转发到 Kafka 进行处理、存储和分析。同时,Kafka 主题的数据可以被 MQTT 设备消费,实现即时处理。
本页详细介绍了 Kafka 数据集成的功能特性,同时提供了实用的 Kafka 数据集成创建指导。
工作原理
Apache Kafka 数据集成是 EMQX Platform 的开箱即用功能,能够在基于 MQTT 的物联网数据和 Kafka 强大的数据处理能力之间架起桥梁。通过内置的规则引擎组件,集成简化了两个平台之间的数据流和处理过程,无需复杂编码。
将消息数据转发到 Kafka 的基本工作流程如下:
- 消息发布:设备通过 MQTT 协议成功连接到 EMQX Platform 部署,并通过 MQTT 定期发布包含状态数据的消息。当 EMQX Platform 收到这些消息时,它启动其规则引擎内的匹配过程。
- 消息数据处理:通过内置的规则引擎,这些 MQTT 消息可以根据主题匹配规则进行处理。当消息到达并通过规则引擎时,规则引擎将评估针对该消息事先定义好的处理规则。如果任何规则指定消息载荷转换,则应用这些转换,例如转换数据格式、过滤特定信息或使用额外上下文丰富载荷。
- 发送到 Kafka:规则引擎中定义的规则触发将消息转发到 Kafka 的动作。使用 Kafka 数据集成,MQTT 主题被映射到预定义的 Kafka 主题,所有处理过的消息和数据被写入 Kafka 主题。
从 Kafka 消费消息的基本工作流程如下:
- 订阅 Kafka 主题:通过规则引擎中的输入源定义 Kafka 消费者的动作,设置要消费的 Kafka 主题名称。当 EMQX 部署收到这些消息时,它启动其规则引擎内的匹配过程。
- 消息数据处理:通过内置的规则引擎,当消息到达并通过规则引擎时,规则引擎将评估针对该消息事先定义好的处理规则。如果任何规则指定消息载荷转换,则应用这些转换,例如转换数据格式、过滤特定信息或使用额外上下文丰富载荷。
- 转发到 MQTT 客户端:规则引擎中定义的规则触发将消息转发到 MQTT 主题的动作。使用 Kafka 数据集成,Kafka 主题被映射到预定义的 MQTT 主题,所有处理过的消息和数据可以被 MQTT 客户端消费。
特性与优势
与 Apache Kafka 的数据集成为您的业务带来以下特性和优势:
- 载荷转换:在传输过程中,消息载荷可以通过定义的 SQL 规则进行处理。例如,包含一些实时指标(如总消息计数、成功/失败传递计数和消息速率)的载荷可以在消息被输入到 Kafka 之前进行数据提取、过滤、丰富和转换。
- 有效的主题映射:通过配置的 kafka 数据集成,可以将众多物联网业务主题映射到 Kakfa 主题。EMQX 支持将 MQTT 用户属性映射到 Kafka 标头,并采用各种灵活的主题映射方法,包括一对一、一对多、多对多,以及支持 MQTT 主题过滤器(通配符)。
- 灵活的分区选择策略:支持根据 MQTT 主题或客户端将消息转发到同一 Kafka 分区。
- 高吞吐量情况下的处理能力:EMQX Kafka 生产者支持同步和异步写入模式,允许您根据不同场景在实时优先和性能优先的数据写入策略之间灵活平衡。
- 运行时指标:支持查看每个数据桥的运行时指标,如总消息数、成功/失败计数、当前速率等。
这些特性增强了集成能力和灵活性,有助于您建立有效和稳健的物联网平台架构。您日益增长的物联网数据可以在稳定的网络连接下传输,并且可以进一步有效地存储和管理。
连接准备
本节介绍了在 EMQX Platform 中创建 Kafka 数据集成之前需要做的准备工作。
前置准备
网络设置
开始之前,您需要在 EMQX Platform 上创建一个部署(EMQX 群集)并配置网络。
- 对于专有版和旗舰版部署用户: 请先创建 VPC 对等连接,创建完对等连接之后,可以通过内部网络 IP 登录 Platform Console 访问目标连接器。或者开通 NAT 网关,通过公网 IP 访问目标连接器。
- 对于 BYOC 部署用户: 请在部署 BYOC 的 VPC 和目标连接器所在的 VPC 之间建立对等连接,创建完对等连接之后,可以通过内部网络 IP 访问目标连接器。如果您需要通过公共 IP 地址访问资源,请在公共云控制台中为部署 BYOC 的 VPC 配置 NAT 网关。
安装 Kafka 并创建主题
安装 Kafka。
bash# 安装 zookeeper docker run -d --restart=always \ --name zookeeper \ -p 2181:2181 \ zookeeper # 安装 Kafka,开放 9092 端口 docker run -d --restart=always --name mykafka \ -p 9092:9092 \ -e HOST_IP=localhost \ -e KAFKA_ADVERTISED_PORT=9092 \ -e KAFKA_ADVERTISED_HOST_NAME=<服务器 IP> \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_LOG_RETENTION_HOURS=12 \ -e KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=100000 \ -e KAFKA_ZOOKEEPER_CONNECT=<服务器 IP>:2181 \ -e ZK=<服务器 IP> \ wurstmeister/kafka
创建主题。
bash# 进入 Kafka 实例,并创建 emqx 主题 $ docker exec -it mykafka /opt/kafka/bin/kafka-topics.sh --zookeeper <broker IP>:2181 --replication-factor 1 --partitions 1 --topic emqx --create
返回
Created topic emqx.
表示创建成功。
将消息数据转发到 Kafka
本节演示了如何将模拟温湿度数据通过 MQTT 协议上报到 EMQX 部署 并通过配置的数据集成将数据转存到 Kafka,内容包括创建 Kafka 生产者连接器、创建规则和测试规则。
创建 Kafka 生产者连接器
在创建数据集成的规则之前,您需要先创建一个 Kafka 生产者连接器用于访问 Kafka 服务器。
- 在部署菜单中选择数据集成,在数据转发服务分类下选择 Kafka 生产者服务。如果您已经创建了其他的连接器,点击新建连接器,然后在数据转发服务分类下选择 Kafka 生产者服务。
- 在创建连接器页面中配置以下信息:
- 连接器名称:系统将自动生成一个连接器的名称,以
connector-
开头。 - 主机列表:填写主机列表,请确保您的 kafka 服务可以正常通过网络访问。
- 认证: 填写认证方式,根据需要填写认证方式,这里我们使用基础认证。
- 认证方法: 可以选择
plain
,SHA256
,SHA512
,根据 Kafka 服务认证要求选择。 - 用户名: kafka 服务的用户名,通常为 API Key。
- 密码: kafka 服务的认证密码,通常为 API Secret。
- 认证方法: 可以选择
- 其他使用默认值,或根据您的业务需求进行配置。
- 连接器名称:系统将自动生成一个连接器的名称,以
- 点击测试连接按钮,如果 Kafka 服务能够正常访问,则会返回成功提示。
- 点击新建按钮完成配置。
创建规则
接下来您需要创建一条规则来指定需要写入的数据,并在规则中添加响应动作以将经规则处理的数据转发到 Kafka。
点击连接器列表操作列下的新建规则图标或在规则列表中点击新建规则进入新建规则步骤页。
在 SQL 编辑器中输入规则匹配 SQL 语句。以下的 SQL 示例表示从发送到
temp_hum/emqx
主题的消息中读取消息上报时间up_timestamp
、客户端 ID、消息体 (Payload),并从消息体中分别读取温度和湿度。sqlSELECT timestamp, clientid, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"
我们可以使用启用调试来模拟数据的输入并测试查看结果。
点击下一步开始创建输出动作。
从使用连接器下拉框中选择您之前创建的 Kafka 生产者连接器。
配置以下信息:
动作名称:系统将自动生成一个动作的名称,也可以自己命名动作的名称。
Kafka 主题名称:这里填写之前创建的主题
emqx
。根据您业务的需要定义 Kafka header。
消息体的设置中,消息的键默认是使用规则中获取的 client ID,您也可以根据业务需要修改。在消息的值中,可以填入需要转发的温度和湿度的数值。
# kafka 消息的键 ${client_id} # kafka 消息的值 {"temp": ${temp}, "hum": ${hum}}
点击确认按钮完成动作的配置。
在弹出的成功创建规则提示框中点击返回规则列表,从而完成了整个数据集成的配置链路。
测试规则
推荐使用 MQTTX 模拟温湿度数据上报,同时您也可以使用其他任意客户端完成。
使用 MQTTX 连接到部署,并向以下 Topic 发送消息。
topic:
temp_hum/emqx
payload:
json{ "temp": "27.5", "hum": "41.8" }
查看消息是否转发到了 Kafka。
bash# 进入 Kafka 实例,并查看 emqx 主题 $ docker exec -it mykafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server <broker IP>:9092 --topic emqx --from-beginning
在控制台查看运行数据。在规则列表点击规则 ID,在运行统计页面可以查看到规则的统计以及此规则下所有动作的统计。
从 Kafka 消费消息
本节演示了 EMQX 部署如何从 Kafka 消费消息并通过配置的数据集成将消息数据重新发布到 MQTT 主题,内容包括创建 Kafka 消费者连接器、创建规则和测试规则。
创建 Kafka 消费者连接器
在添加 Kafka 消费者前,您需要创建 Kafka 消费者连接器,以便 EMQX 部署与 Kafka 建立连接。
- 在部署菜单中选择数据集成,在数据导入服务分类下选择 Kafka 消费者。如果您已经创建了其他的连接器,点击新建连接器,然后在数据导入服务分类下选择 Kafka 消费者。
- 在创建连接器页面中配置以下信息:
- 连接器名称:系统将自动生成一个连接器的名称,以
connector-
开头。 - 主机列表:填写主机列表,请确保您的 kafka 服务可以正常通过网络访问。
- 认证: 填写认证方式,根据需要填写认证方式,这里我们使用基础认证。
- 认证方法: 可以选择
plain
,SHA256
,SHA512
,根据 Kafka 服务认证要求选择。 - 用户名: kafka 服务的用户名,通常为 API Key。
- 密码: kafka 服务的认证密码,通常为 API Secret。
- 认证方法: 可以选择
- 其他使用默认值,或根据您的业务需求进行配置。
- 连接器名称:系统将自动生成一个连接器的名称,以
- 点击测试连接按钮,如果 Kafka 服务能够正常访问,则会返回成功提示。
- 点击新建按钮完成配置。
创建规则
接下来您需要创建一条规则来指定输入的数据以及规则引擎处理的规则,并在规则中添加输出动作以将经规则处理的数据转发到相应的主题中。
点击连接器列表中操作列下的新建规则图标,或在规则列表上方点击新建规则进入新建规则步骤页。
对于消费者规则,您需要先配置输入的动作。规则编辑页面会自动弹出输入动作配置,或者选择 SQL 编辑面板右侧 输入动作 - 添加输入动作 - 选择 Kafka 消费者 进入配置。
- 选择 Kafka 消费者连接器
- Kafka 主题名称:输入需要从 Kafka 消费的主题,如
temp_hum/emqx
- Key 编码模式 和 Value 编码模式:选择 Kafka 消息键和消息值的编码模式。
- 消费组 ID:指定此 Source 的消费组标识符。如果未提供,系统将基于 Source 名称自动生成一个组 ID。该字段只能包含字母、数字、下划线、点和连字符。
- 偏移重置策略:选择当没有消费者偏移量或偏移量变得无效时,Kafka 消费者开始从 Kafka 主题分区读取的偏移量重置策略。
- 如果您希望消费者从最新偏移量开始读取消息,跳过消费者启动前产生的消息,请选择
latest
。 - 如果您希望消费者从分区的开始读取消息,包括消费者启动前产生的消息,即读取主题中的所有历史数据,请选择
earliest
。
- 如果您希望消费者从最新偏移量开始读取消息,跳过消费者启动前产生的消息,请选择
- 点击确认按钮完成配置。
SQL 编辑器中将更新数据源字段,同时可以在
SELECT
中选取字段用于数据转发。sqlSELECT key as key, value as value, topic as topic FROM "$bridges/kafka_consumer:soruce-812985f2"
点击下一步,创建输出动作。
在新建输出动作中,选择消息重新发布。
配置输出动作信息:
- 主题:转发的 MQTT 主题,支持
${var}
格式的占位符。此处输入sub/${topic}
,表示在原主题的基础上添加sub/
前缀进行转发。例如,当原始消息主题为t/1
时,转发的主题为sub/t/1
。 - QoS:消息发布 QoS,下拉选择
0
、1
、2
或${qos}
,也可以输入占位符从其他字段中设置 QoS,此处选择${qos}
表示跟随原始消息的 QoS。 - Retain:选择
true
、false
或${falgs.retain}
,确认是否以保留消息方式发布消息,也可以输入占位符从其他字段中设置保留消息标志位。此处选择${falgs.retain}
表示跟随原始消息的保留消息标志位。 - 消息模板:用于生成转发消息 Payload 的模板,默认留空表示转发规则输出结果。此处可以输入
${.}
表示转发所有规则引擎中的字段。
- 主题:转发的 MQTT 主题,支持
其他配置使用默认值,点击创建按钮完成输出动作的创建。
创建成功后将回到创建规则页面,在规则列表中可以查看到新创建的规则,在动作列表中,输入动作列表可以查看到数据导入的动作列表。消息重新发布动作目前不显示在输出动作列表,如需查看,点击规则编辑按钮,在规则设置的下方,可以查看到消息重新发布的输出动作。
测试规则
您可以使用工具向 Kafka 主题 temp_hum/emqx
发送消息,并且订阅部署中的转发主题来测试规则。 当向主题 temp_hum/emqx
发布消息时,消息将被转发到部署中的 sub/temp_hum/emqx
主题中。
以下步骤演示了如何使用 MQTTX 订阅转发的 MQTT 主题来获取消息。
使用 MQTTX 订阅当前部署的主题
sub/#
。使用 Kafka 生产者或其他工具向主题
temp_hum/emqx
发布消息:json{ "temp": 55, "hum": 32 }
MQTTX 收到当前部署
sub/temp_hum/emqx
主题内消息:json{ "key": "", "value": { "temp": 55, "hum": 32 } }