---
title: 跨协议互通
---

# 跨协议互通

FlowMQ 原生支持跨协议消息互通。不同协议的客户端连接到同一 FlowMQ 集群后，可以透明地进行消息收发，无需额外的桥接组件或中间件。

## 工作原理

跨协议互通基于 FlowMQ 的[消息路由](../concepts/message-routing)机制实现。各协议的消息经协议适配器映射为统一的 FlowMQ Topic 后，由路由引擎分发到对应的 Destination（Stream、Subscription、Queue），全程与来源协议无关。

各协议使用各自原生的标识格式即可，协议适配器会自动将其映射为 FlowMQ 内部的 Topic。例如 Kafka 客户端使用 `sensors.device-001.telemetry`，适配器会将 `.` 转为 `/`，映射为与 MQTT 侧 `sensors/device-001/telemetry` 相同的 FlowMQ Topic。各协议的具体映射规则参见[协议适配与 Topic 映射](../concepts/message-routing#协议适配与-topic-映射)。

## MQTT → Kafka

**场景**：IoT 设备通过 MQTT 上报遥测数据，后端大数据平台通过 Kafka 消费进行实时分析。

**消息流转路径**：

1. 设备通过 MQTT 发布消息到 `sensors/device-001/telemetry`
2. 协议适配器直接映射为 FlowMQ Topic `sensors/device-001/telemetry`
3. 路由引擎将消息投递到 topic filter 匹配的 **Stream**，消息被持久化
4. Kafka 消费者从该 Stream 消费数据（Topic 名称自动映射为 `sensors.device-001.telemetry`）

::: tip
如果需要将多个 MQTT 主题的数据汇聚到同一个 Stream 中统一消费（如用 `sensors/+/telemetry` 捕获所有设备的遥测数据），可以为 Stream 配置额外的[主题过滤器绑定](./streaming#主题过滤器绑定)。
:::

### MQTT 端：设备上报数据

```python
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 1883

client = mqtt.Client(client_id="device-001", protocol=mqtt.MQTTv5)
client.connect(HOST, PORT, 60)

# 设备上报温度数据
client.publish(
    "sensors/device-001/telemetry",
    payload=b'{"temp": 23.5, "humidity": 65, "ts": 1700000000}',
    qos=1,
)

client.disconnect()
```

### Kafka 端：后端消费分析

```python
from kafka import KafkaConsumer

# Kafka 侧 Topic 使用 "." 分隔，适配器自动与 MQTT 的 "/" 互转
consumer = KafkaConsumer(
    "sensors.device-001.telemetry",
    bootstrap_servers=["your-flowmq-host:9092"],
    auto_offset_reset="latest",
    enable_auto_commit=True,
    group_id="telemetry-analytics",
)

for msg in consumer:
    print(f"收到设备数据: partition={msg.partition}, offset={msg.offset}")
    print(f"  payload={msg.value.decode()}")
    # 后续可写入数据仓库、触发实时计算等
```

### 典型应用

- **IoT 数据采集与分析**：海量设备使用轻量 MQTT 协议上报，后端 Kafka 消费组进行实时流处理与离线分析
- **边缘到云数据管道**：边缘网关汇聚 MQTT 数据，云端 Kafka 消费者对接 Flink / Spark 等大数据框架

## Kafka → MQTT

**场景**：云端控制平台通过 Kafka 向所有设备广播指令，各设备通过 MQTT 订阅实时接收并执行。

**消息流转路径**：

1. 后端通过 Kafka Producer 写入指令到 `commands.broadcast`
2. 协议适配器将 `.` 转为 `/`，映射为 FlowMQ Topic `commands/broadcast`
3. 路由引擎将消息投递到 topic filter 匹配的 **Subscription**
4. 所有订阅了 `commands/#` 的 MQTT 设备实时收到推送

### Kafka 端：广播控制指令

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["your-flowmq-host:9092"],
    acks="all",
)

# 向所有设备广播固件升级指令
producer.send(
    "commands.broadcast",
    value=b'{"action": "upgrade_firmware", "version": "2.1.0"}',
)
producer.flush()
producer.close()
```

### MQTT 端：设备接收指令

```python
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 1883

def on_connect(client, userdata, flags, reason_code, properties=None):
    print("connected")
    # 使用通配符订阅所有指令
    client.subscribe("commands/#", qos=1)

def on_message(client, userdata, msg):
    print(f"收到指令: [{msg.topic}] {msg.payload.decode()}")
    # 设备执行控制逻辑（固件升级、参数调整等）

client = mqtt.Client(client_id="device-001", protocol=mqtt.MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
client.connect(HOST, PORT, 60)
client.loop_forever()
```

### 典型应用

- **远程指令下发**：云端控制平台通过 Kafka 发布指令，终端设备通过 MQTT 实时接收并执行
- **告警分发**：规则引擎产生的告警写入 Kafka，边缘端通过 MQTT 订阅获取实时通知

## 通配符订阅

MQTT 客户端可以使用通配符批量接收消息，在跨协议场景下同样有效：

```python
# 订阅所有设备的告警（匹配 alerts/device-001、alerts/device-002 等）
client.subscribe("alerts/#", qos=1)

# 订阅特定类型的传感器数据（匹配 sensors/任意设备/telemetry）
client.subscribe("sensors/+/telemetry", qos=1)
```

通配符规则详见[消息路由 - Topic 命名与通配符](../concepts/message-routing#topic-命名与通配符)。

## 相关文档

- [消息路由](../concepts/message-routing) — Topic、Destination 与路由机制详解
- [数据流](./streaming) — Stream 持久化与 Kafka 兼容消费
- [Kafka 协议接入](../protocols/kafka) — Kafka 客户端详细使用指南
- [MQTT 协议接入](../protocols/mqtt) — MQTT 客户端详细使用指南
