AMQP 协议
本文介绍如何使用 AMQP 0-9-1 协议接入 FlowMQ Enterprise(本地部署版)。
FlowMQ 原生支持 AMQP 0-9-1 协议,允许使用标准 AMQP 客户端和库进行连接、发布和消费消息。AMQP 非常适合企业级消息传递、任务队列和可靠消息投递场景。
注意:AMQP 协议支持目前处于开发阶段(WIP),部分功能可能尚未完全可用。
1. 连接
使用标准 AMQP 0-9-1 客户端连接 FlowMQ:
python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='your-flowmq-host',
port=5672,
credentials=pika.PlainCredentials('username', 'password')
)
)
channel = connection.channel()2. 发布消息
2.1 Direct Exchange
python
# 声明 direct exchange
channel.exchange_declare(exchange='orders', exchange_type='direct', durable=True)
# 发布消息
channel.basic_publish(
exchange='orders',
routing_key='eu.new',
body='{"order_id": "12345", "amount": 99.99}',
properties=pika.BasicProperties(
delivery_mode=2, # 持久化消息
content_type='application/json'
)
)2.2 Topic Exchange
python
# 声明 topic exchange
channel.exchange_declare(exchange='sensors', exchange_type='topic', durable=True)
# 使用 routing key 发布
channel.basic_publish(
exchange='sensors',
routing_key='temperature.factory.line-a',
body='{"value": 22.5, "unit": "celsius"}'
)2.3 Fanout Exchange
python
# 声明 fanout exchange(广播到所有绑定的队列)
channel.exchange_declare(exchange='notifications', exchange_type='fanout', durable=True)
channel.basic_publish(
exchange='notifications',
routing_key='', # fanout 模式忽略 routing_key
body='{"message": "系统维护通知"}'
)3. 消费消息
3.1 声明和绑定队列
python
# 声明队列
channel.queue_declare(queue='order-processing', durable=True)
# 绑定到 exchange
channel.queue_bind(
exchange='orders',
queue='order-processing',
routing_key='eu.new'
)3.2 消费并确认
python
def callback(ch, method, properties, body):
print(f"收到消息: {body}")
# 处理消息
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue='order-processing',
on_message_callback=callback,
auto_ack=False # 手动确认
)
print("等待消息...")
channel.start_consuming()4. 默认 Exchange
FlowMQ 提供标准 AMQP 默认 exchange:
amq.direct— 按 routing key 直接路由amq.topic— 基于通配符的主题路由amq.fanout— 广播到所有绑定队列
5. 消息属性
python
properties = pika.BasicProperties(
delivery_mode=2, # 持久化
content_type='application/json',
headers={'user_id': '12345', 'priority': 'high'},
priority=5,
correlation_id='corr-123',
reply_to='reply-queue',
expiration='60000', # TTL(毫秒)
message_id='msg-456',
app_id='order-service'
)6. 最佳实践
- 使用持久化队列和 Exchange 确保消息可靠性
- 启用 Publisher Confirms 进行可靠发布
- 使用手动 Ack 保证投递语义
- 设置合理的 TTL 控制消息过期
- 使用 Correlation ID 实现请求/响应模式
- 配置死信队列 (DLQ) 处理失败消息
7. 客户端库
FlowMQ 兼容任何标准 AMQP 0-9-1 客户端库:
| 语言 | 库 |
|---|---|
| Python | pika, aio-pika |
| Java | amqp-client, spring-amqp |
| Node.js | amqplib |
| Go | streadway/amqp |
| C# | RabbitMQ.Client |
| Ruby | bunny |
8. 连接参数
| 参数 | 默认值 | 说明 |
|---|---|---|
| Host | your-flowmq-host | FlowMQ 服务地址 |
| Port | 5672 | AMQP 端口 |
| Virtual Host | / | 虚拟主机 |
| Heartbeat | 60s | 连接心跳 |
| Frame Max | 131072 | 最大帧大小 |