【星海出品】rabbitMQ队列处理深入研究
【RabbitMQ 商业网站】
https://www.rabbitmq.com/tutorials
2007年 RabbitMQ 最初由 LShift 公司开发
为了解决金融行业和其他企业级应用中的消息传递需求
基于 AMQP 协议标准
Pivotal 被 VMware 收购,RabbitMQ 现在属于 VMware Tanzu 产品套件的一部分
安装篇之前已经讲过了,可以看专栏的其他页面
AMQP(Advanced Message Queuing Protocol) 是一个开放标准的应用层协议
单个 TCP 连接支持多个信道(Channels)
# AMQP 消息结构
Message = {"headers": {}, # 属性头"properties": {}, # 消息属性"body": "" # 消息体
}
Exchange(交换机)类型必须支持以下交换机类型:
Direct - 直接路由(精确匹配 routing key)
Fanout - 广播路由(发送到所有绑定队列)
Topic - 主题路由(模式匹配 routing key)
Headers - 头路由(基于消息属性匹配)Queue(队列)要求
持久化:支持持久化队列
独占性:支持独占队列
自动删除:消费者断开后自动删除
消息TTL:支持消息过期时间# 三种确认模式
1. 自动确认 (auto-ack)
2. 显式确认 (manual ack)
3. 拒绝消息 (reject/nack)channel.tx_select() # 开启事务
channel.tx_commit() # 提交事务
channel.tx_rollback() # 回滚事务
额外的确认
发布者确认 (Publisher Confirms)
channel.confirm_delivery() # 开启确认模式
基础客户
# 1. 建立连接
connection = pika.BlockingConnection(parameters)# 2. 创建信道
channel = connection.channel()# 3. 声明交换机
channel.exchange_declare(exchange='logs', type='fanout')# 4. 声明队列
channel.queue_declare(queue='task_queue', durable=True)# 5. 绑定队列
channel.queue_bind(exchange='logs'