当前位置: 首页 > news >正文

Python使用消息队列rabbitmq

项目文件:base.py、publish.py、consumer.py

base.py

import json
import datetime
import time
import random
import pika
from pika.exceptions import ChannelClosed, ConnectionClosed# rabbitmq 配置信息
MQ_CONFIG = {"hostname": "127.0.0.1","port": 5672,"vhost": "my_vhost","username": "admin","password": "adminxxx","exchange": "my_exchange","queue": "my_queue","routing_key": "my_key"
}class RabbitMQServer(object):def __init__(self):self.config = MQ_CONFIG  # 配置文件加载self.host = self.config.get("hostname")self.port = self.config.get("port")self.username = self.config.get("username")self.password = self.config.get("password")self.vhost = self.config.get("vhost")self.exchange = self.config.get("exchange")self.queue = self.config.get("queue")self.routing_key = self.config.get("routing_key")self.connection = Noneself.channel = None# 关于队列的声明,如果使用同一套参数进行声明了,就不能再使用其他参数来声明self.arguments = {'x-message-ttl': 82800000,   # 设置队列中的所有消息的生存周期'x-expires': 82800000,       # 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除'x-max-length': 100000,      # 限定队列的消息的最大条数,超过指定条数将会把最早的几条删除掉'x-max-priority': 10         # 声明队列时先定义最大优先级值,在发布消息的时候指定该消息的优先级}def reconnect(self):try:if self.connection and not self.connection.is_closed:self.connection.close()credentials = pika.PlainCredentials(self.username, self.password)parameters = pika.ConnectionParameters(self.host, self.port, self.vhost, credentials)self.connection = pika.BlockingConnection(parameters)self.channel = self.connection.channel()self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct", durable=True)self.channel.queue_declare(queue=self.queue, exclusive=False, durable=True, arguments=self.arguments) self.channel.queue_bind(exchange=self.exchange, queue=self.queue, routing_key=self.routing_key)if isinstance(self, RabbitComsumer):self.channel.basic_qos(prefetch_count=1)  # prefetch 表明最大阻塞未ack的消息数量self.channel.basic_consume(on_message_callback=self.consumer_callback, queue=self.queue, auto_ack=False)except Exception as e:print("RECONNECT: ", e)class RabbitPublisher(RabbitMQServer):def __init__(self):super(RabbitPublisher, self).__init__()def start_publish(self):self.reconnect()i = 1while True:message = {"value": i}try:self.channel.basic_publish(exchange=self.exchange, routing_key=self.routing_key, body=json.dumps(message))print("Publish value: ", i)i += 1time.sleep(3)except ConnectionClosed as e:print("ConnectionClosed: ", e)self.reconnect()time.sleep(2)except ChannelClosed as e:print("ChannelClosed: ", e)self.reconnect()time.sleep(2)except Exception as e:print("basic_publish: ", e)self.reconnect()time.sleep(2)class RabbitComsumer(RabbitMQServer):def __init__(self):super(RabbitComsumer, self).__init__()def execute(self, body):body = body.decode('utf8')body = json.loads(body)print(body["value"])return Truedef consumer_callback(self, channel, method, properties, body):result = self.execute(body)if channel.is_open:if result:channel.basic_ack(delivery_tag=method.delivery_tag)   # 发送ackelse:channel.basic_nack(delivery_tag=method.delivery_tag, multiple=False, requeue=True)if not channel.is_open:print("Callback 接收频道关闭,无法ack")def start_consumer(self):self.reconnect()while True:try:self.channel.start_consuming()  #启动消息接受 进入死循环except ConnectionClosed as e:print("ConnectionClosed: ", e)self.reconnect()time.sleep(2)except ChannelClosed as e:print("ChannelClosed: ", e)self.reconnect()time.sleep(2)except Exception as e:print("consuming: ", e)self.reconnect()time.sleep(2)

publish.py

from base import RabbitPublisherif __name__ == '__main__':publisher = RabbitPublisher()publisher.start_publish()

consumer.py

from base import RabbitComsumerif __name__ == '__main__':consumer = RabbitComsumer()consumer.start_consumer()

http://www.dtcms.com/a/596861.html

相关文章:

  • GBD调试KingSCADA详细步骤
  • 做美妆的网站南昌优化网站分析
  • 上海个人医疗网站备案尖扎县公司网站建设
  • 多端统一的教育系统源码开发详解:Web、小程序与APP的无缝融合
  • uniapp小程序 订阅消息推送
  • 微信小程序管理系统,代运营3600+医院小程序
  • 重庆论坛网站建设在网站开发中应该避免哪些漏洞
  • Spring Boot整合Redis注解,实战Redis注解使用
  • 数学分析简明教程——3.5
  • php网站500错误电子商务网站建设的作用
  • S21 布隆过滤器
  • 刷题日常 4 二叉树层序遍历
  • field ——AUTOCAD设置字段
  • SVN 启动模式
  • 论文阅读《Curse of Rarity for Autonomous Vehicles》(稀疏度灾难CoR)
  • dw怎么做网站地图室内设计效果图高清
  • 专业建设信息化网站资源中英文外贸网站模版
  • 冲压和模具制造案例丨通过Prolink实现车间数据收集和分析自动化
  • Java:高效删除Excel中的空白行和列
  • Claude Code 重大更新:支持一键原生安装,彻底别了 Node.js
  • 技术漏洞被钻营!Agent 感知伪装借 ChatGPT Atlas 批量输出虚假数据,AI 安全防线面临新挑战
  • HarmonyOS布局优化实战:扁平化布局与高性能组件应用
  • 万能的开源制图利器 —— draw.io(diagrams.net)自托管与应用分享
  • 如何做海外淘宝网站个人网页设计作业
  • 花灯彩灯制作公司四川seo推广公司
  • CANN核心特性深度解析:简化AI开发的技术优势
  • YOLOv2算法详解(上篇):从经典到进化的目标检测之路
  • Detect Anything via Next Point Prediction论文解读
  • eclipse maven 项目 提示 http://maven.apache.org/xsd/maven-4.0.0.xsd‘
  • Spring AI Alibaba 静态RAG实战:让AI成为你的专属知识管家