RabbitMQ (一)简单模式
简单模式
第一步:安装必要的库
pip install pika
import pika
第二步:编写生产者(Producer)代码
1.创建连接
credentials = pika.PlainCredentials(username, password)
connection_params = pika.ConnectionParameters(host='localhost',# 或者是服务器的IP '*.*.*.*'# port 默认:5672 (AMQP协议)virtual_host = '/', # 类似于命名空间,用于隔离不同应用credentials=self.credentials,heartbeat=300, # 发现无效的“僵尸连接” 并清理掉 ,默认580秒blocked_connection_timeout=300 # 服务器处于阻塞状态,客户端等待服务器的时间,超时客户端主动触发一个异常并关闭连接)# 建立到RabbitMQ服务器的连接self.connection = pika.BlockingConnection(self.connection_params)
2.创建通道
channel = connection.channel()
3.创建队列
# 声明一个持久化的队列,确保MQ重启后队列不丢失 durable=Truechannel.queue_declare(queue='hello_queue', durable=True)
- 队列一旦声明,是否持久化就确定了,不能更改了。
- 如果想把非持久化的队列改为持久化队列,会报错。
- 若声明过,则换一个队列名字。
4.传入消息
import json
message = {"date" :'2025-9-28' ,"MQ ":'rabbit' ,"user": 'me' "print": 'hello rabbit' }# 将结果(字典)序列化为JSON字符串,作为消息体
message_body = json.dumps(message)# 发布消息到指定的队列
channel.basic_publish(exchange='', # 使用默认的直连交换器 简单模式routing_key='hello_queue', # 指定队列名称body=message_body, # 消息内容properties=pika.BasicProperties(delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE # 设置消息为持久化,重启不丢失)
)
properties=pika.BasicProperties(delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE ) properties = pika.BasicProperties(delivery_mode=2)
- 两者是等价写法
5.关闭连接
connection.close()
- 如果传一次消息,关闭一次连接,每次都要消耗时间重新建立连接
- 所以一般传完最后一次消息,再关闭连接。那么要注意处理连接断开等异常问题
第三步:编写消费者(Consumer)代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(..........)) # 同上
channel = connection.channel()channel.queue_declare(..........) # 同上def callback(ch, method, properties, body):........... # 自己要处理的逻辑# 确定监听队列参数
channel.basic_consume(queue='hello_queue',auto_ack=True, # 应答参数on_message_callback=callback)print(' Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 真正开始从队列里取消息
- 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都创建队列。这是因为生产者进程可能晚于消费者进程被执行。
- 有消息取出来开始执行 callback,没消息就等待
channel.basic_consume(queue='hello_queue',
auto_ack=True, # 应答参数
on_message_callback=callback)
auto_ack=True 默认应答 (效率高)
队列有数据,取走,取走就出队,队列里就没有了。
如果消费者进程出bug了,callback没有成功执行,但是已经出队了,没办法重新处理