Skip to content

AMQP 协议

本文介绍如何使用 AMQP 0-9-1 协议接入 FlowMQ Enterprise(本地部署版)。

FlowMQ 原生支持 AMQP 0-9-1 协议,允许使用标准 AMQP 客户端和库进行连接、发布和消费消息。AMQP 非常适合企业级消息传递、任务队列和可靠消息投递场景。

注意:AMQP 协议支持目前处于开发阶段(WIP),部分功能可能尚未完全可用。

1. 连接

使用标准 AMQP 0-9-1 客户端连接 FlowMQ:

python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='your-flowmq-host',
        port=5672,
        credentials=pika.PlainCredentials('username', 'password')
    )
)
channel = connection.channel()

2. 发布消息

2.1 Direct Exchange

python
# 声明 direct exchange
channel.exchange_declare(exchange='orders', exchange_type='direct', durable=True)

# 发布消息
channel.basic_publish(
    exchange='orders',
    routing_key='eu.new',
    body='{"order_id": "12345", "amount": 99.99}',
    properties=pika.BasicProperties(
        delivery_mode=2,  # 持久化消息
        content_type='application/json'
    )
)

2.2 Topic Exchange

python
# 声明 topic exchange
channel.exchange_declare(exchange='sensors', exchange_type='topic', durable=True)

# 使用 routing key 发布
channel.basic_publish(
    exchange='sensors',
    routing_key='temperature.factory.line-a',
    body='{"value": 22.5, "unit": "celsius"}'
)

2.3 Fanout Exchange

python
# 声明 fanout exchange(广播到所有绑定的队列)
channel.exchange_declare(exchange='notifications', exchange_type='fanout', durable=True)

channel.basic_publish(
    exchange='notifications',
    routing_key='',  # fanout 模式忽略 routing_key
    body='{"message": "系统维护通知"}'
)

3. 消费消息

3.1 声明和绑定队列

python
# 声明队列
channel.queue_declare(queue='order-processing', durable=True)

# 绑定到 exchange
channel.queue_bind(
    exchange='orders',
    queue='order-processing',
    routing_key='eu.new'
)

3.2 消费并确认

python
def callback(ch, method, properties, body):
    print(f"收到消息: {body}")
    # 处理消息
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    queue='order-processing',
    on_message_callback=callback,
    auto_ack=False  # 手动确认
)

print("等待消息...")
channel.start_consuming()

4. 默认 Exchange

FlowMQ 提供标准 AMQP 默认 exchange:

  • amq.direct — 按 routing key 直接路由
  • amq.topic — 基于通配符的主题路由
  • amq.fanout — 广播到所有绑定队列

5. 消息属性

python
properties = pika.BasicProperties(
    delivery_mode=2,          # 持久化
    content_type='application/json',
    headers={'user_id': '12345', 'priority': 'high'},
    priority=5,
    correlation_id='corr-123',
    reply_to='reply-queue',
    expiration='60000',       # TTL(毫秒)
    message_id='msg-456',
    app_id='order-service'
)

6. 最佳实践

  • 使用持久化队列和 Exchange 确保消息可靠性
  • 启用 Publisher Confirms 进行可靠发布
  • 使用手动 Ack 保证投递语义
  • 设置合理的 TTL 控制消息过期
  • 使用 Correlation ID 实现请求/响应模式
  • 配置死信队列 (DLQ) 处理失败消息

7. 客户端库

FlowMQ 兼容任何标准 AMQP 0-9-1 客户端库:

语言
Pythonpika, aio-pika
Javaamqp-client, spring-amqp
Node.jsamqplib
Gostreadway/amqp
C#RabbitMQ.Client
Rubybunny

8. 连接参数

参数默认值说明
Hostyour-flowmq-hostFlowMQ 服务地址
Port5672AMQP 端口
Virtual Host/虚拟主机
Heartbeat60s连接心跳
Frame Max131072最大帧大小