当前位置: 首页 > news >正文

【星海随笔】RabbitMQ开发篇

生产消息

import pika
import json
import time
import logging
from typing import Dict, Any
import argparselogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)class RobustRabbitMQProducer:def __init__(self, host='localhost', port=5672, vhost='vhost_test' ,username='rabbitmq', password='rabbitmq@123',connection_attempts=3,aliyun_object_name="None"):self.host = hostself.port = portself.username = usernameself.password = passwordself.vhost = vhostself.connection_attempts = connection_attemptsself.connection = Noneself.channel = Noneself.aliyun_obj = aliyun_object_nameself.connect()def connect(self):"""建立连接"""try:self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host,port=self.port,credentials=pika.PlainCredentials(self.username, self.password),virtual_host=self.vhost,connection_attempts=self.connection_attempts,heartbeat=600,blocked_connection_timeout=300))self.channel = self.connection.channel()self.channel.exchange_declare(exchange='unlock_rclone_topic_exchange',  # 交换机名称exchange_type='topic',  # 交换机类型(与路由键匹配)durable=True,  # 持久化(重启后不丢失)auto_delete=False  # 不自动删除)logger.info("✅ 成功连接到RabbitMQ")except Exception as e:logger.error(f"❌ 连接RabbitMQ失败: {e}")msg=f"❌ 连接RabbitMQ失败: {e}"self.warning_send_to_wechat(record_msg=msg,obj_name=f"{self.aliyun_obj}")raisedef warning_send_to_wechat(self,record_msg=None,obj_name=None):import requestsimport jsonurl = "《机器人URL》"# 构建正确的JSON payloadpayload = {"msgtype": "text","text": {"content": f"阿里云OSS解锁记录失败: {record_msg}\n 对象: {obj_name}","mentioned_list": ["@ user_ID"]}}headers = {'Content-Type': 'application/json'  # 修正Content-Type}response = requests.post(url, headers=headers, data=json.dumps(payload))# 检查响应if response.status_code == 200:msg="企业微信通知发送成功"print(msg)#return True,msgelse:msg=f"企业微信通知发送失败: {record_msg,obj_name} "print(msg)def ensure_connection(self):"""确保连接有效"""if self.connection is None or self.connection.is_closed:logger.warning("连接已关闭,尝试重新连接...")self.connect()def send_message(self, exchange: str, routing_key: str, message: Dict[str, Any],persistent: bool = True, retry_count: int = 3):"""发送消息(带重试机制)"""for attempt in range(retry_count):try:self.ensure_connection()properties = pika.BasicProperties(delivery_mode=2 if persistent else 1,  # 2=持久化,1=非持久化content_type='application/json',timestamp=int(time.time()))self.channel.basic_publish(exchange=exchange,routing_key=routing_key,body=json.dumps(message),properties=properties,mandatory=True  # 确保消息被路由到队列)logger.info(f"📤 消息发送成功: {exchange} -> {routing_key}")return Trueexcept pika.exceptions.UnroutableError:self.warning_send_to_wechat(record_msg=f"⚠️ 消息无法路由: {routing_key}",obj_name=f"{message}")logger.warning(f"⚠️ 消息无法路由: {routing_key}")return Falseexcept Exception as e:self.warning_send_to_wechat(record_msg=f"❌ 发送失败  尝试 {attempt + 1}/{retry_count}: {e}",obj_name=f"{message}")logger.error(f"❌ 发送失败 (尝试 {attempt + 1}/{retry_count}): {e}")if attempt < retry_count - 1:time.sleep(2)  # 等待后重试else:raisedef close(self):"""安全关闭连接"""if self.connection and not self.connection.is_closed:self.connection.close()logger.info("🔌 连接已关闭")def parse_args():"""解析命令行入口参数,定义用户可传递的参数规则"""parser = argparse.ArgumentParser(description="RabbitMQ消息发送脚本:支持用户传递rclone相关的对象名及消息内容",formatter_class=argparse.RawTextHelpFormatter  # 支持换行显示帮助信息)# 1. 必传核心参数:rclone用到的对象名parser.add_argument("--aliyun-obj",  # 参数名(长选项)"-a",  # 短选项required=True,  # 必传type=str,help="阿里云对象存储的对象名(rclone操作目标,例如:oss://my-bucket/path)")parser.add_argument("--minio-obj","-m",required=True,type=str,help="MinIO对象存储的对象名(rclone操作目标,例如:minio://my-bucket/path)")# 2. 消息内容参数:支持用户自定义(提供默认值,可选传)parser.add_argument("--message-ali","-ma",type=str,default='{"service": "aliyun", "action": "copy"}',help='阿里云相关消息体(JSON格式字符串)\n''示例:\'{"service":"aliyun","action":"copy","files":["data.csv"]}\'\n''默认值:{"service": "aliyun", "action": "copy"}')parser.add_argument("--message-minio","-mm",type=str,default='{"tool": "rclone", "action": "copy", "source": "/tmp"}',help='MinIO相关消息体(JSON格式字符串)\n''示例:\'{"tool":"rclone","action":"move","source":"/data"}\'\n''默认值:{"tool": "rclone", "action": "copy", "source": "/tmp"}')# 3. 可选参数:RabbitMQ交换器和路由键(如需灵活配置可开放,这里用默认值)parser.add_argument("--exchange","-e",type=str,default="unlock_rclone_topic_exchange",help="RabbitMQ主题交换器名称(默认:unlock_rclone_topic_exchange)")parser.add_argument("--routing-key-ali","-rka",type=str,default="aliV4.sync",help="阿里云消息的路由键(默认:aliV4.sync)")parser.add_argument("--routing-key-minio","-rkm",type=str,default="rclone.copy",help="MinIO消息的路由键(默认:rclone.copy)")return parser.parse_args()def validate_and_parse_json(json_str: str, param_name: str) -> dict:"""校验并解析JSON格式的字符串为字典:param json_str: 待解析的JSON字符串:param param_name: 参数名(用于报错提示):return: 解析后的字典"""import json  # 局部导入(仅用到时加载)try:return json.loads(json_str)except json.JSONDecodeError as e:raise ValueError(f"参数【{param_name}】格式错误,必须是合法JSON字符串:{e}")def build_messages(args) -> list:"""根据用户传入的参数,构建要发送的RabbitMQ消息列表:param args: 解析后的命令行参数:return: 消息列表"""# 解析JSON格式的消息体msg_ali = validate_and_parse_json(args.message_ali, "--message-ali")msg_minio = validate_and_parse_json(args.message_minio, "--message-minio")# 构建消息(可根据需求将 aliyun-obj/minio-obj 嵌入消息体)# 注意:这里默认将对象名加入消息体的"target"字段,如需调整位置可修改messages = [{"exchange": args.exchange,"routing_key": args.routing_key_ali,"message": {**msg_ali, "target": args.aliyun_obj}  # 合并用户消息与对象名},{"exchange": args.exchange,"routing_key": args.routing_key_minio,"message": {**msg_minio, "target": args.minio_obj}  # 合并用户消息与对象名}]return messagesdef main():# 1. 解析命令行参数args = parse_args()# 2. 初始化RabbitMQ生产者producer = RobustRabbitMQProducer(aliyun_object_name=args.aliyun_obj)try:# 3. 构建消息列表messages_to_send = build_messages(args)print(f"📋 待发送消息列表:{messages_to_send}")# 4. 发送消息for idx, msg in enumerate(messages_to_send, start=1):producer.send_message(exchange=msg["exchange"],routing_key=msg["routing_key"],message=msg["message"])print(f"✅ 第{idx}条消息发送成功")time.sleep(0.5)  # 避免消息发送过快print("\n🎉 所有消息发送完成!")except ValueError as ve:# 捕获参数校验错误(如JSON格式错误)print(f"❌ 参数错误: {ve}")except Exception as e:# 捕获其他运行时错误(如RabbitMQ连接失败)print(f"💥 发送过程中出错: {str(e)}")finally:# 确保生产者连接关闭if "producer" in locals():  # 避免未初始化时调用close()producer.close()print("🔌 RabbitMQ生产者连接已关闭")if __name__ == "__main__":main()

执行

python send_message.py -a <ali-bucket>://<object_all_path> -m  <other-App>://<object_all_path> -mm '{"tool": "rclone", "action": "copy", "source": "<object_name_all_path>"}'
customer.py 消费者
#!/usr/bin/env python3
import pika
import json
import argparse
from typing import Optional, Dict, Anyclass RabbitMQMessageReader:def __init__(self, host: str = 'localhost', port: int = 5672,username: str = 'Account', password: str = 'Password',vhost: str = 'vhost_prod'):self.connection_params = pika.ConnectionParameters(host=host,port=port,virtual_host=vhost,credentials=pika.PlainCredentials(username, password),heartbeat=600,blocked_connection_timeout=300)def read_single_message(self, queue_name: str) -> Optional[Dict[str, Any]]:"""读取单条消息但不删除"""connection = Nonetry:connection = pika.BlockingConnection(self.connection_params)channel = connection.channel()# 声明队列(确保队列存在)channel.queue_declare(queue=queue_name, passive=True)# 获取消息method_frame, properties, body = channel.basic_get(queue=queue_name,auto_ack=False)if method_frame:message_info = {'delivery_tag': method_frame.delivery_tag,'routing_key': method_frame.routing_key,'exchange': method_frame.exchange,'redelivered': method_frame.redelivered,'message_count': method_frame.message_count,'body': body}# 拒绝消息并重新入队channel.basic_reject(method_frame.delivery_tag, requeue=True)return message_infoelse:print("队列为空")return Noneexcept pika.exceptions.ChannelClosedByBroker as e:print(f"队列不存在或无法访问: {e}")except Exception as e:print(f"错误: {e}")finally:if connection and not connection.is_closed:connection.close()return Nonedef decode_message_body(self, body: bytes) -> Any:"""解码消息体"""try:text = body.decode('utf-8')# 尝试解析为 JSONif text.strip().startswith('{') or text.strip().startswith('['):return json.loads(text)return textexcept UnicodeDecodeError:return body  # 返回原始字节except json.JSONDecodeError:return text  # 返回文本def print_message(self, message_info: Dict[str, Any]):"""美化打印消息"""print("=" * 60)print("RabbitMQ 消息详情")print("=" * 60)print(f"消息ID: {message_info['delivery_tag']}")print(f"路由键: {message_info['routing_key']}")print(f"交换器: {message_info['exchange']}")print(f"重投递: {message_info['redelivered']}")print(f"队列剩余消息: {message_info['message_count']}")print("\n消息内容:")decoded_body = self.decode_message_body(message_info['body'])if isinstance(decoded_body, dict):print(json.dumps(decoded_body, ensure_ascii=False, indent=2))else:print(decoded_body)print("\n消息已重新放回队列")def main():parser = argparse.ArgumentParser(description='读取 RabbitMQ 消息(不删除)')parser.add_argument('--queue', required=True, help='队列名称')parser.add_argument('--host', default='localhost', help='RabbitMQ 主机')parser.add_argument('--port', type=int, default=5672, help='RabbitMQ 端口')parser.add_argument('-u', '--username', default='Account', help='用户名')parser.add_argument('-p', '--password', required=True, help='密码')parser.add_argument('-V', '--vhost', default='vhost_prod', help='虚拟主机')args = parser.parse_args()reader = RabbitMQMessageReader(host=args.host,port=args.port,username=args.username,password=args.password,vhost=args.vhost)message = reader.read_single_message(args.queue)if message:reader.print_message(message)if __name__ == '__main__':# 使用示例# python3 script.py --queue ali_queue -p Passwordmain()    

消费者确认

手动确认的三种方式:
basic_ack(delivery_tag) - 确认并删除消息

basic_reject(delivery_tag, requeue=True) - 拒绝并重新入队

basic_reject(delivery_tag, requeue=False) - 拒绝并丢弃

例如:

def manual_ack_example():credentials = pika.PlainCredentials('Account', 'Password')parameters = pika.ConnectionParameters(host='localhost',virtual_host='vhost_prod',credentials=credentials)connection = pika.BlockingConnection(parameters)channel = connection.channel()# 获取消息,不自动确认method_frame, header_frame, body = channel.basic_get(queue='ali_queue',auto_ack=False  # 重要:手动确认模式)if method_frame:print(f"收到消息: {body.decode()}")# 手动确认消息(从队列中删除)channel.basic_ack(delivery_tag=method_frame.delivery_tag)print("消息已确认并删除")# 或者拒绝消息并重新入队# channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=True)# print("消息已拒绝并重新入队")else:print("没有消息")connection.close()
http://www.dtcms.com/a/398171.html

相关文章:

  • 深入理解 RabbitMQ:消息处理全流程与核心能力解析
  • docker安装canal-server(v.1.1.8)【mysql->rabbitMQ】
  • 学习嵌入式的第四十天——ARM
  • 佛山营销网站建设公司益阳市城乡和住房建设部网站
  • Linux磁盘数据挂载以及迁移
  • 【图像算法 - 28】基于YOLO与PyQt5的多路智能目标检测系统设计与实现
  • Android音视频编解码全流程之Muxer
  • 一家做土产网站呼和浩特网站建设信息
  • Android Studio - Android Studio 检查特定资源被引用的情况
  • 借助Aspose.HTML控件,使用 Python 编程创建 HTML 页面
  • 营销型网站建设运营网站建设yuanmus
  • Day67 基本情报技术者 单词表02 编程基础
  • 《Java操作Redis教程:以及序列化概念和实现》
  • 欧拉公式与拉普拉斯变换的关系探讨与深入理解
  • 新的EclipesNeon,新的开始,第003章
  • 计算机专业课《数据库系统》核心解析
  • 光流 | 2025年光流及改进算法综述:原理、公式与MATLAB实现
  • 做外贸网站的价格嘉兴网站建设培训
  • 西宁制作网站需要多少钱做网站数据库多少钱
  • [第二章] web入门—N1book靶场详细思路讲解(一)
  • ES 的 shards 是什么
  • LVS:Linux 内核级负载均衡的架构设计、三种工作模式与十大调度算法详解
  • 【触想智能】工业一体机在金融领域的应用优势和具体注意事项
  • 制作大模型获取天气数据工具(和风API)
  • Nginx服务部署与配置(Day.2)
  • 计算机课程《网络安全》课程导览:开启数字世界的守护之旅
  • 网站系统开发精品网站开发
  • 国外ps网站产品推广方案ppt
  • 【MuJoCo学习笔记】#2 接触动力学 腱系统 执行器 传感器
  • 北京 旅游攻略