Reliable MQ message delivery
MQ message delivery
Enabling publisher confirm mode
import json

import pika

def publish(self, message):
    """Publish a message, reconnecting automatically on failure."""
    for i in range(3):
        try:
            message_ = json.dumps(message, ensure_ascii=False)
            self.ensure_connection()
            # Enable confirm mode (Publisher Confirms)
            self.channel.confirm_delivery()
            self.channel.basic_publish(
                exchange=self.queue_exchange,
                routing_key=self.queue_routing_key,
                body=message_,
                properties=pika.BasicProperties(delivery_mode=2),  # persistent message
                mandatory=True,  # have the broker return unroutable messages
            )
            # NOTE: resp_dic is not defined in this method
            logger.info(resp_dic["infoId"] + ":" + str(message))
            # print("confirm---",str(confirm))
            return
        except Exception as e:
            logger.error(f"Publish failed: {e}. Reconnecting...{message}")
            self.is_connected = False
    else:
        # Reached only when all 3 attempts failed (success returns above)
        logger.error("Failed to publish message after 3 attempts.")
After the queue was deleted, publishing failed with:
1 unroutable message(s) returned. Reconnecting...
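self.channel.confirm_delivery() enables publisher confirms, which guards against silent message loss: the broker must acknowledge each publish, and with mandatory=True a message that cannot be routed to any queue is returned and raised as an error instead of being dropped.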
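The log line above suggests one possible refinement: an unroutable message is retried on a fresh connection, although reconnecting cannot restore a deleted queue. The sketch below is one way to separate errors worth retrying from errors that should fail immediately. It is a generic, standard-library-only illustration, not the original class: publish_with_retry, PublishFailed, and the publish_once, reconnect, and fatal parameters are all hypothetical names introduced here.

```python
import logging
from typing import Callable, Optional, Tuple, Type

logger = logging.getLogger(__name__)


class PublishFailed(Exception):
    """Hypothetical error raised once every attempt has failed."""


def publish_with_retry(
    publish_once: Callable[[], None],
    reconnect: Callable[[], None],
    attempts: int = 3,
    fatal: Tuple[Type[BaseException], ...] = (),
) -> int:
    """Call publish_once until it succeeds; return the number of attempts used.

    Exceptions listed in `fatal` are re-raised at once, because a new
    connection would not change the outcome.
    """
    last_error: Optional[BaseException] = None
    for attempt in range(1, attempts + 1):
        try:
            publish_once()
            return attempt
        except fatal:
            # e.g. an unroutable message: reconnecting cannot help
            logger.error("Publish rejected, not retrying")
            raise
        except Exception as exc:
            last_error = exc
            logger.warning(
                "Publish failed (attempt %d/%d): %s", attempt, attempts, exc
            )
            if attempt < attempts:
                reconnect()  # only reconnect if another attempt will follow
    raise PublishFailed(f"gave up after {attempts} attempts") from last_error
```

With pika, publish_once would wrap the basic_publish call and fatal could be (pika.exceptions.UnroutableError,), the exception behind the "unroutable message(s) returned" text. Raising PublishFailed at the end, rather than only logging, lets the caller decide what to do with a message that was never confirmed.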