---
title: 消息路由
---

# 消息路由

FlowMQ 中，每条消息都携带一个 **Topic** 作为路由地址。路由引擎根据各 **Destination** 的 topic filters 对消息进行匹配与分发——Destination 决定消息如何存储与消费。这套路由机制是完全**协议无关**的。

## Topic：消息的路由地址

Topic 是 FlowMQ 内部的路由地址，与具体协议中的"topic"并非同一概念。消息进入系统时，协议适配器将各协议中的相应标识——如 MQTT 的 topic、Kafka 的 topic、AMQP 的 message key——映射为 FlowMQ 的 Topic。映射完成后，路由引擎仅根据 Topic 进行匹配与分发，不再感知消息的来源协议。

## Destination：消息的投递目标

消息离开 Topic 后的去向，由 **Destination** 决定。

每个 Destination 关联一组 **topic filters**（主题过滤规则）。消息发布到 Topic 后，路由引擎将其与所有 Destination 的 topic filters 逐一匹配，并投递到所有命中的 Destination。

概言之：**Topic 是消息的路由地址，Destination 决定消息如何存储与消费。**

### Subscription

Subscription 是一种 Destination。它将匹配消息**实时推送给在线的订阅者**，消息到达后立即投递。

### Stream

Stream 是一种 Destination。它将匹配消息**以追加日志（append-only log）的形式持久化**，支持按偏移量推进、回放历史和重新消费。

### Queue

Queue 是一种 Destination。它提供**消息队列语义**：匹配的消息会先缓存到队列中，再投递给消费者，消费确认（ACK）后删除。

### 持续扩展

Destination 的设计天然支持扩展。除 Stream、Queue 和 Subscription 外，未来还将引入更多类型（如 **Table**），为消息提供新的处理形态——而这不需要改变 Topic 的地址体系，也无需调整发布端或消费端的接入方式。

## 消息流转：接入 → 路由 → 投递

一条消息在系统内的完整路径如下：

1. **接入**：协议适配器（MQTT / Kafka 等）接收消息，将其转换为内部统一格式，并从中提取 FlowMQ Topic（提取规则参见[协议适配与 Topic 映射](#协议适配与-topic-映射)）。
2. **路由**：路由引擎将消息所属的 Topic 与所有 Destination 的 topic filters 进行匹配，确定投递目标。
3. **投递**：消息到达匹配的 Destination——Stream 以日志形式持久化，Queue 进入队列等待消费，Subscription 则实时推送给在线订阅者。

路由逻辑与协议完全解耦：新增协议只需新增适配器，核心路由引擎无需变动。

## 多协议互通

FlowMQ 实现跨协议互通的关键在于两层解耦：

1. **消息与协议解耦**：消息经适配器转换为内部统一格式后，路由引擎仅根据 Topic 进行分发，不区分消息的来源协议。
2. **协议与能力解耦**：消息的存储与消费能力由 Destination 承载，与接入协议无关。

这种设计也赋予系统良好的扩展性：新增能力只需新增 Destination 类型，新增协议只需新增协议适配器，核心路由模型保持稳定。

## 跨协议互通示例

### MQTT 发布 → Kafka 消费

1. IoT 设备通过 MQTT 向 `factory/line-1/telemetry` 发布遥测数据
2. 消息进入 Topic `factory/line-1/telemetry`
3. 某个 Stream 的 topic filter `factory/#` 命中该消息，消息被路由至该 Stream 并持久化
4. 数据平台的 Kafka consumer 直接消费该 Stream

设备侧使用 MQTT，数据侧使用 Kafka，全程无需桥接，各取所长。

### Kafka 生产 → MQTT 订阅

1. 数据平台通过 Kafka Producer 向 `factory/line-1/alerts` 写入告警消息
2. 适配器将其映射为 Topic `factory/line-1/alerts`
3. 现场大屏通过 MQTT 订阅 `factory/line-1/#`，实时接收告警推送

## Topic 命名与通配符

FlowMQ 采用层级化 Topic 命名结构，以 `/` 作为层级分隔符：

```
factory/line-1/robot-7/telemetry/temperature
```

### 通配符

Destination 的 topic filter 和 MQTT 订阅均支持通配符：

| 通配符 | 说明 | 示例 |
|-------|------|------|
| `+` | 匹配单个层级 | `factory/+/robot-7/#` 匹配 `factory/line-1/robot-7/...`、`factory/line-2/robot-7/...` |
| `#` | 匹配剩余所有层级（须位于末尾） | `factory/line-1/#` 匹配该产线下的所有主题 |


## 协议适配与 Topic 映射

协议适配器负责将各协议的标识映射为 FlowMQ Topic。不同协议的映射来源和规则各不相同：

| 协议 | 映射来源 | 规则 |
|------|---------|------|
| **MQTT** | MQTT topic | 直接映射——MQTT topic 字符串即为 FlowMQ Topic |
| **Kafka** | Kafka topic | 自动转换分隔符——`.` 转为 `/`（如 `factory.line1.robot7.telemetry` → `factory/line1/robot7/telemetry`），反向投递时自动映射回 Kafka 格式 |
| **AMQP** | message key | 将 message key 映射为 FlowMQ Topic |
