跨协议互通
FlowMQ 原生支持跨协议消息互通。不同协议的客户端连接到同一 FlowMQ 集群后,可以透明地进行消息收发,无需额外的桥接组件或中间件。
工作原理
跨协议互通基于 FlowMQ 的消息路由机制实现。各协议的消息经协议适配器映射为统一的 FlowMQ Topic 后,由路由引擎分发到对应的 Destination(Stream、Subscription、Queue),全程与来源协议无关。
各协议使用各自原生的标识格式即可,协议适配器会自动将其映射为 FlowMQ 内部的 Topic。例如 Kafka 客户端使用 sensors.device-001.telemetry,适配器会将 . 转为 /,映射为与 MQTT 侧 sensors/device-001/telemetry 相同的 FlowMQ Topic。各协议的具体映射规则参见协议适配与 Topic 映射。
MQTT → Kafka
场景:IoT 设备通过 MQTT 上报遥测数据,后端大数据平台通过 Kafka 消费进行实时分析。
消息流转路径:
- 设备通过 MQTT 发布消息到
sensors/device-001/telemetry - 协议适配器直接映射为 FlowMQ Topic
sensors/device-001/telemetry - 路由引擎将消息投递到 topic filter 匹配的 Stream,消息被持久化
- Kafka 消费者从该 Stream 消费数据(Topic 名称自动映射为
sensors.device-001.telemetry)
TIP
如果需要将多个 MQTT 主题的数据汇聚到同一个 Stream 中统一消费(如用 sensors/+/telemetry 捕获所有设备的遥测数据),可以为 Stream 配置额外的主题过滤器绑定。
MQTT 端:设备上报数据
python
from paho.mqtt import client as mqtt
HOST = "your-flowmq-host"
PORT = 1883
client = mqtt.Client(client_id="device-001", protocol=mqtt.MQTTv5)
client.connect(HOST, PORT, 60)
# 设备上报温度数据
client.publish(
"sensors/device-001/telemetry",
payload=b'{"temp": 23.5, "humidity": 65, "ts": 1700000000}',
qos=1,
)
client.disconnect()Kafka 端:后端消费分析
python
from kafka import KafkaConsumer
# Kafka 侧 Topic 使用 "." 分隔,适配器自动与 MQTT 的 "/" 互转
consumer = KafkaConsumer(
"sensors.device-001.telemetry",
bootstrap_servers=["your-flowmq-host:9092"],
auto_offset_reset="latest",
enable_auto_commit=True,
group_id="telemetry-analytics",
)
for msg in consumer:
print(f"收到设备数据: partition={msg.partition}, offset={msg.offset}")
print(f" payload={msg.value.decode()}")
# 后续可写入数据仓库、触发实时计算等典型应用
- IoT 数据采集与分析:海量设备使用轻量 MQTT 协议上报,后端 Kafka 消费组进行实时流处理与离线分析
- 边缘到云数据管道:边缘网关汇聚 MQTT 数据,云端 Kafka 消费者对接 Flink / Spark 等大数据框架
Kafka → MQTT
场景:云端控制平台通过 Kafka 向所有设备广播指令,各设备通过 MQTT 订阅实时接收并执行。
消息流转路径:
- 后端通过 Kafka Producer 写入指令到
commands.broadcast - 协议适配器将
.转为/,映射为 FlowMQ Topiccommands/broadcast - 路由引擎将消息投递到 topic filter 匹配的 Subscription
- 所有订阅了
commands/#的 MQTT 设备实时收到推送
Kafka 端:广播控制指令
python
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=["your-flowmq-host:9092"],
acks="all",
)
# 向所有设备广播固件升级指令
producer.send(
"commands.broadcast",
value=b'{"action": "upgrade_firmware", "version": "2.1.0"}',
)
producer.flush()
producer.close()MQTT 端:设备接收指令
python
from paho.mqtt import client as mqtt
HOST = "your-flowmq-host"
PORT = 1883
def on_connect(client, userdata, flags, reason_code, properties=None):
print("connected")
# 使用通配符订阅所有指令
client.subscribe("commands/#", qos=1)
def on_message(client, userdata, msg):
print(f"收到指令: [{msg.topic}] {msg.payload.decode()}")
# 设备执行控制逻辑(固件升级、参数调整等)
client = mqtt.Client(client_id="device-001", protocol=mqtt.MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
client.connect(HOST, PORT, 60)
client.loop_forever()典型应用
- 远程指令下发:云端控制平台通过 Kafka 发布指令,终端设备通过 MQTT 实时接收并执行
- 告警分发:规则引擎产生的告警写入 Kafka,边缘端通过 MQTT 订阅获取实时通知
通配符订阅
MQTT 客户端可以使用通配符批量接收消息,在跨协议场景下同样有效:
python
# 订阅所有设备的告警(匹配 alerts/device-001、alerts/device-002 等)
client.subscribe("alerts/#", qos=1)
# 订阅特定类型的传感器数据(匹配 sensors/任意设备/telemetry)
client.subscribe("sensors/+/telemetry", qos=1)通配符规则详见消息路由 - Topic 命名与通配符。
相关文档
- 消息路由 — Topic、Destination 与路由机制详解
- 数据流 — Stream 持久化与 Kafka 兼容消费
- Kafka 协议接入 — Kafka 客户端详细使用指南
- MQTT 协议接入 — MQTT 客户端详细使用指南