当前位置: 首页 > news >正文

RabbitMQ (一)简单模式

简单模式

第一步:安装必要的库

pip install pika
import pika

第二步:编写生产者(Producer)代码

1.创建连接

      
credentials = pika.PlainCredentials(username, password)
connection_params = pika.ConnectionParameters(host='localhost',# 或者是服务器的IP  '*.*.*.*'# port 默认:5672 (AMQP协议)virtual_host = '/', # 类似于命名空间,用于隔离不同应用credentials=self.credentials,heartbeat=300, # 发现无效的“僵尸连接” 并清理掉 ,默认580秒blocked_connection_timeout=300 # 服务器处于阻塞状态,客户端等待服务器的时间,超时客户端主动触发一个异常并关闭连接)#  建立到RabbitMQ服务器的连接self.connection = pika.BlockingConnection(self.connection_params)

2.创建通道

channel = connection.channel()

3.创建队列

# 声明一个持久化的队列,确保MQ重启后队列不丢失 durable=Truechannel.queue_declare(queue='hello_queue', durable=True)
  • 队列一旦声明,是否持久化就确定了,不能更改了。
  • 如果想把非持久化的队列改为持久化队列,会报错。
  • 若声明过,则换一个队列名字。

4.传入消息

import json
message = {"date" :'2025-9-28' ,"MQ ":'rabbit' ,"user": 'me' "print": 'hello rabbit' }# 将结果(字典)序列化为JSON字符串,作为消息体
message_body = json.dumps(message)# 发布消息到指定的队列
channel.basic_publish(exchange='',  # 使用默认的直连交换器 简单模式routing_key='hello_queue',  # 指定队列名称body=message_body, # 消息内容properties=pika.BasicProperties(delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE  # 设置消息为持久化,重启不丢失)
)
  • properties=pika.BasicProperties(delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE  
    )
    properties = pika.BasicProperties(delivery_mode=2)
    
  • 两者是等价写法

5.关闭连接

connection.close()
  • 如果传一次消息,关闭一次连接,每次都要消耗时间重新建立连接
  • 所以一般传完最后一次消息,再关闭连接。那么要注意处理连接断开等异常问题

第三步:编写消费者(Consumer)代码

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(..........)) # 同上
channel = connection.channel()channel.queue_declare(..........) # 同上def callback(ch, method, properties, body):........... # 自己要处理的逻辑# 确定监听队列参数
channel.basic_consume(queue='hello_queue',auto_ack=True, # 应答参数on_message_callback=callback)print(' Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 真正开始从队列里取消息
  • 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都创建队列。这是因为生产者进程可能晚于消费者进程被执行。
  • 有消息取出来开始执行 callback,没消息就等待

channel.basic_consume(queue='hello_queue',
auto_ack=True, # 应答参数
on_message_callback=callback)

 auto_ack=True 默认应答      (效率高)

队列有数据,取走,取走就出队,队列里就没有了。

如果消费者进程出bug了,callback没有成功执行,但是已经出队了,没办法重新处理

http://www.dtcms.com/a/423111.html

相关文章:

  • 阿里巴巴 Java 开发手册 v1.2.0
  • Leetcode+Java+单调栈
  • Word和WPS文字如何从特定的页开始编号(页码)?
  • EDSR模型
  • thinkphp做中英文网站网站跟app区别
  • 6. 数据库设计基础知识
  • 【nginx平滑升级演示】
  • 桥梁缺陷检测数据集:腐蚀、剥落、渗透等5类,3k+图像,yolo标注
  • 上交提出单图生成3D场景方法SceneGen:单图输入,多资源输出,3D 合成性能飙升的“秘密武器”!
  • 百度验证网站济南网络科技公司
  • NO5.硼:火箭专家
  • 细化处理refinement process
  • 第四部分:VTK常用类详解(第120章 vtkWarpTo变形到类)
  • Day01_Linux移植基础
  • 工控网做网站维护吗免费网站建站申请
  • kcwebplus可视化框架
  • JVM如何管理直接内存?
  • 【完整源码+数据集+部署教程】医疗设备显示器图像分割系统: yolov8-seg-C2f-SCConv
  • PyCharm项目依赖库的备份与还原方法
  • OpenSSL 3.0对某些加密算法增加了限制
  • git fatal:Server aborted the SSL handshake
  • 深入理解 Python `ssl` 库:安全通信的基石
  • 江门网页建站模板临沂网站建设费用
  • 网站手机模板和pc模板要分开做网站首页需求
  • 国内 huggingfaces 仓下载
  • 基因组学发展史
  • 论文阅读(第4章,page55)
  • java设计模式:适配器模式
  • 做微商网站制作迪虎科技网站建设
  • Cobalt Strike 学习笔记(1)