当前位置: 首页 > news >正文

使用python的pika链接rabbitMq断裂

比如我们执行一个很长的任务的时候,执行结束ack确认发现确认失败,mq都断了。

只要是使用pyhon的pika都会出现这个问题,因为pika本身是没有主动发送心跳机制的(你用java的话是没问题的)

解决方式:

在链接中heartbeat=0

credentials = pika.PlainCredentials('xxx','xxx')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host = "xxxx",port = 5672, credentials = credentials , heartbeat=0
))

解决方式2:

我亲自试过,确实有用

改写代码(引用:Python RabbitMQ/Pika 长连接断开报错Connection reset by peer和pop from an empty deque_pika.exceptions.streamlosterror: stream connection-CSDN博客)

"""
@author: Zhigang Jiang
@date: 2022/1/16
@description:
"""
import functools
import pika
import threading
import time


def ack_message(channel, delivery_tag):
    print(f'ack_message thread id: {threading.get_ident()}')
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't ACK this message;
        # log and/or do something that makes sense for your app in this case.
        pass


def do_work(channel, delivery_tag, body):
    print(f'do_work thread id: {threading.get_ident()}')
    print(body, "start")
    for i in range(10):
        print(i)
        time.sleep(20)
    print(body, "end")

    cb = functools.partial(ack_message, channel, delivery_tag)
    channel.connection.add_callback_threadsafe(cb)


def on_message(channel, method_frame, header_frame, body):
    print(f'on_message thread id: {threading.get_ident()}')
    delivery_tag = method_frame.delivery_tag
    t = threading.Thread(target=do_work, args=(channel, delivery_tag, body))
    t.start()


credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)

print(f'main thread id: {threading.get_ident()}')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
connection.close()

长时间的话,家里的网抖动可能出现,我们家有时候就会断网个10几秒,有时候打游戏就会掉线:

pika.exceptions.AMQPHeartbeatTimeout: No activity or too many missed heartbeats in the last xx seconds
这种情况,把他拉起就行了,加一个

while True:
    try:
        # 用户名密码,没有设置的可以省略这一步
        credentials = pika.PlainCredentials('xx', 'xx')
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host="xxxx", port=5672, credentials=credentials, heartbeat=10
        ))
        channel = connection.channel()
        channel.queue_declare(queue="xxx", durable=True)  # 如果是持久化队列就是True

        channel.basic_qos(prefetch_count=1)
        channel.basic_consume("xxx", on_message)
        print(f'main thread id: {threading.get_ident()}')
        print("开始消费")
        channel.start_consuming()
    except KeyboardInterrupt:
        # channel.stop_consuming()
        print("出现异常,可能是网络原因,重新启动"+e)
        time.sleep(30)

相关文章:

  • 【网络面试(5)】收发数据及断开服务器(四次挥手)
  • 自动化网络故障修复管理
  • 常用设计模式全面总结版(JavaKotlin)
  • 垃圾收集器与内存分配策略
  • 记录:开始学习网络安全
  • 牛客网SQL训练5—SQL大厂面试真题
  • 信号量机制(重要)-第二十八天
  • 系统学习Python——装饰器:函数装饰器-[对方法进行装饰:基础知识]
  • 【JVM】一文掌握JVM垃圾回收机制
  • 计算机基础面试题 |04.精选计算机基础面试题
  • 微软开源,全平台通用:Shell 自动补全工具 | 开源日报 No.132
  • uni-app tabbar组件
  • R_handbook_统计分析
  • ES应用_ES原理
  • 【c语言】飞机大战2
  • centos 安装 配置 zsh
  • Stable Diffusion WebUI制作光影文字效果
  • Android Studio 如何隐藏默认标题栏
  • 为即将到来的量子攻击做好准备的 4 个步骤
  • QT QPluginloader 加载失败,出现Unknown error 0x000000c1的问题
  • 肖钢:一季度证券业金融科技投资强度在金融各子行业中居首
  • 蒲慕明院士:未来数十年不是AI取代人,而是会用AI的人取代不会用的
  • 2025年“新时代网络文明公益广告”征集展示活动在沪启动
  • 圆桌丨新能源车超充技术元年,专家呼吁重视电网承载能力可能面临的结构性挑战
  • 陕西榆林:全力推进榆林学院升格榆林大学
  • 缅甸内观冥想的历史漂流:从“人民鸦片”到东方灵修