Skip to content

跨协议互通

FlowMQ 原生支持跨协议消息互通。不同协议的客户端连接到同一 FlowMQ 集群后,可以透明地进行消息收发,无需额外的桥接组件或中间件。

工作原理

跨协议互通基于 FlowMQ 的消息路由机制实现。各协议的消息经协议适配器映射为统一的 FlowMQ Topic 后,由路由引擎分发到对应的 Destination(Stream、Subscription、Queue),全程与来源协议无关。

各协议使用各自原生的标识格式即可,协议适配器会自动将其映射为 FlowMQ 内部的 Topic。例如 Kafka 客户端使用 sensors.device-001.telemetry,适配器会将 . 转为 /,映射为与 MQTT 侧 sensors/device-001/telemetry 相同的 FlowMQ Topic。各协议的具体映射规则参见协议适配与 Topic 映射

MQTT → Kafka

场景:IoT 设备通过 MQTT 上报遥测数据,后端大数据平台通过 Kafka 消费进行实时分析。

消息流转路径

  1. 设备通过 MQTT 发布消息到 sensors/device-001/telemetry
  2. 协议适配器直接映射为 FlowMQ Topic sensors/device-001/telemetry
  3. 路由引擎将消息投递到 topic filter 匹配的 Stream,消息被持久化
  4. Kafka 消费者从该 Stream 消费数据(Topic 名称自动映射为 sensors.device-001.telemetry

TIP

如果需要将多个 MQTT 主题的数据汇聚到同一个 Stream 中统一消费(如用 sensors/+/telemetry 捕获所有设备的遥测数据),可以为 Stream 配置额外的主题过滤器绑定

MQTT 端:设备上报数据

python
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 1883

client = mqtt.Client(client_id="device-001", protocol=mqtt.MQTTv5)
client.connect(HOST, PORT, 60)

# 设备上报温度数据
client.publish(
    "sensors/device-001/telemetry",
    payload=b'{"temp": 23.5, "humidity": 65, "ts": 1700000000}',
    qos=1,
)

client.disconnect()

Kafka 端:后端消费分析

python
from kafka import KafkaConsumer

# Kafka 侧 Topic 使用 "." 分隔,适配器自动与 MQTT 的 "/" 互转
consumer = KafkaConsumer(
    "sensors.device-001.telemetry",
    bootstrap_servers=["your-flowmq-host:9092"],
    auto_offset_reset="latest",
    enable_auto_commit=True,
    group_id="telemetry-analytics",
)

for msg in consumer:
    print(f"收到设备数据: partition={msg.partition}, offset={msg.offset}")
    print(f"  payload={msg.value.decode()}")
    # 后续可写入数据仓库、触发实时计算等

典型应用

  • IoT 数据采集与分析:海量设备使用轻量 MQTT 协议上报,后端 Kafka 消费组进行实时流处理与离线分析
  • 边缘到云数据管道:边缘网关汇聚 MQTT 数据,云端 Kafka 消费者对接 Flink / Spark 等大数据框架

Kafka → MQTT

场景:云端控制平台通过 Kafka 向所有设备广播指令,各设备通过 MQTT 订阅实时接收并执行。

消息流转路径

  1. 后端通过 Kafka Producer 写入指令到 commands.broadcast
  2. 协议适配器将 . 转为 /,映射为 FlowMQ Topic commands/broadcast
  3. 路由引擎将消息投递到 topic filter 匹配的 Subscription
  4. 所有订阅了 commands/# 的 MQTT 设备实时收到推送

Kafka 端:广播控制指令

python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["your-flowmq-host:9092"],
    acks="all",
)

# 向所有设备广播固件升级指令
producer.send(
    "commands.broadcast",
    value=b'{"action": "upgrade_firmware", "version": "2.1.0"}',
)
producer.flush()
producer.close()

MQTT 端:设备接收指令

python
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 1883

def on_connect(client, userdata, flags, reason_code, properties=None):
    print("connected")
    # 使用通配符订阅所有指令
    client.subscribe("commands/#", qos=1)

def on_message(client, userdata, msg):
    print(f"收到指令: [{msg.topic}] {msg.payload.decode()}")
    # 设备执行控制逻辑(固件升级、参数调整等)

client = mqtt.Client(client_id="device-001", protocol=mqtt.MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
client.connect(HOST, PORT, 60)
client.loop_forever()

典型应用

  • 远程指令下发:云端控制平台通过 Kafka 发布指令,终端设备通过 MQTT 实时接收并执行
  • 告警分发:规则引擎产生的告警写入 Kafka,边缘端通过 MQTT 订阅获取实时通知

通配符订阅

MQTT 客户端可以使用通配符批量接收消息,在跨协议场景下同样有效:

python
# 订阅所有设备的告警(匹配 alerts/device-001、alerts/device-002 等)
client.subscribe("alerts/#", qos=1)

# 订阅特定类型的传感器数据(匹配 sensors/任意设备/telemetry)
client.subscribe("sensors/+/telemetry", qos=1)

通配符规则详见消息路由 - Topic 命名与通配符

相关文档