---
title: AMQP 协议
---

# AMQP 协议

本文介绍如何使用 AMQP 0-9-1 协议接入 FlowMQ Enterprise（本地部署版）。

FlowMQ 原生支持 AMQP 0-9-1 协议，允许使用标准 AMQP 客户端和库进行连接、发布和消费消息。AMQP 非常适合企业级消息传递、任务队列和可靠消息投递场景。

> 注意：AMQP 协议支持目前处于开发阶段（WIP），部分功能可能尚未完全可用。

## 1. 连接

使用标准 AMQP 0-9-1 客户端连接 FlowMQ：

```python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='your-flowmq-host',
        port=5672,
        credentials=pika.PlainCredentials('username', 'password')
    )
)
channel = connection.channel()
```

## 2. 发布消息

### 2.1 Direct Exchange

```python
# 声明 direct exchange
channel.exchange_declare(exchange='orders', exchange_type='direct', durable=True)

# 发布消息
channel.basic_publish(
    exchange='orders',
    routing_key='eu.new',
    body='{"order_id": "12345", "amount": 99.99}',
    properties=pika.BasicProperties(
        delivery_mode=2,  # 持久化消息
        content_type='application/json'
    )
)
```

### 2.2 Topic Exchange

```python
# 声明 topic exchange
channel.exchange_declare(exchange='sensors', exchange_type='topic', durable=True)

# 使用 routing key 发布
channel.basic_publish(
    exchange='sensors',
    routing_key='temperature.factory.line-a',
    body='{"value": 22.5, "unit": "celsius"}'
)
```

### 2.3 Fanout Exchange

```python
# 声明 fanout exchange（广播到所有绑定的队列）
channel.exchange_declare(exchange='notifications', exchange_type='fanout', durable=True)

channel.basic_publish(
    exchange='notifications',
    routing_key='',  # fanout 模式忽略 routing_key
    body='{"message": "系统维护通知"}'
)
```

## 3. 消费消息

### 3.1 声明和绑定队列

```python
# 声明队列
channel.queue_declare(queue='order-processing', durable=True)

# 绑定到 exchange
channel.queue_bind(
    exchange='orders',
    queue='order-processing',
    routing_key='eu.new'
)
```

### 3.2 消费并确认

```python
def callback(ch, method, properties, body):
    print(f"收到消息: {body}")
    # 处理消息
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    queue='order-processing',
    on_message_callback=callback,
    auto_ack=False  # 手动确认
)

print("等待消息...")
channel.start_consuming()
```

## 4. 默认 Exchange

FlowMQ 提供标准 AMQP 默认 exchange：

- `amq.direct` — 按 routing key 直接路由
- `amq.topic` — 基于通配符的主题路由
- `amq.fanout` — 广播到所有绑定队列

## 5. 消息属性

```python
properties = pika.BasicProperties(
    delivery_mode=2,          # 持久化
    content_type='application/json',
    headers={'user_id': '12345', 'priority': 'high'},
    priority=5,
    correlation_id='corr-123',
    reply_to='reply-queue',
    expiration='60000',       # TTL（毫秒）
    message_id='msg-456',
    app_id='order-service'
)
```

## 6. 最佳实践

- 使用**持久化队列和 Exchange** 确保消息可靠性
- 启用 **Publisher Confirms** 进行可靠发布
- 使用**手动 Ack** 保证投递语义
- 设置合理的 **TTL** 控制消息过期
- 使用 **Correlation ID** 实现请求/响应模式
- 配置**死信队列 (DLQ)** 处理失败消息

## 7. 客户端库

FlowMQ 兼容任何标准 AMQP 0-9-1 客户端库：

| 语言 | 库 |
|---|---|
| Python | pika, aio-pika |
| Java | amqp-client, spring-amqp |
| Node.js | amqplib |
| Go | streadway/amqp |
| C# | RabbitMQ.Client |
| Ruby | bunny |

## 8. 连接参数

| 参数 | 默认值 | 说明 |
|---|---|---|
| Host | `your-flowmq-host` | FlowMQ 服务地址 |
| Port | 5672 | AMQP 端口 |
| Virtual Host | `/` | 虚拟主机 |
| Heartbeat | 60s | 连接心跳 |
| Frame Max | 131072 | 最大帧大小 |
