当前位置: 首页 > news >正文

RabbitMQ -- 保障消息可靠性

再说可靠性之前,我们先来想一想消息会在哪些地方丢失。因为我们想明白消息在哪丢失了之后,我们就可以针对那个丢失的地方去解决,来使得消息在那个地方不再会丢失。这样一来,不就保障了消息的可靠传输吗。

消息可能丢失的地方:

  1. 生产者发消息给rabbitmq失败:

    可能原因:出现网络问题。

  2. 消息在交换机中无法路由到指定队列:

    可能原因:代码或配置层面出错,导致消息路由失败。

  3. 消息队列自身数据丢失:

    可能原因:消息到达rabbitmq后,rabbitmq server 宕机导致消息丢失。

  4. 消费者异常,导致消息丢失:

    可能原因:消息到达消费者,还没来得及消费,消费者就宕机了。

当然rabbitmq也提供了一些机制来保证消息的可靠传输。

生产者

在使⽤ RabbitMQ的时候, 可以通过消息持久化来解决因为服务器的异常崩溃⽽导致的消息丢失, 但是还有⼀个问题, 当消息的⽣产者将消息发送出去之后, 消息到底有没有正确地到达服务器呢? 如果在消息到达服务器之前已经丢失(⽐如RabbitMQ重启, 那么RabbitMQ重启期间⽣产者消息投递失败), 持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化? RabbitMQ为我们提供了两种解决⽅案: a. 通过事务机制实现 b. 通过发送⽅确认(publisher confirm) 机制实现 事务机制⽐较消耗性能, 在实际⼯作中使⽤也不多, 咱们主要介绍confirm机制来实现发送⽅的确认.RabbitMQ为我们提供了两个⽅式来控制消息的可靠性投递:

  1. confirm确认模式
  2. return退回模式

confirm确认模式

Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听, ⽆论消息是否到达Exchange, 这个监听都会被执⾏, 如果Exchange成功收到, ACK( Acknowledge character , 确认字符)为true, 如果没收到消息, ACK就为false. 步骤如下:

  1. 配置RabbitMQ
  2. 设置确认回调逻辑并发送消息
  3. 测试

配置RabbitMQ

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual #消息接收确认publisher-confirm-type: correlated #消息发送确认

设置确认回调逻辑并发送消息

⽆论消息确认成功还是失败, 都会调⽤ConfirmCallback的confirm⽅法. 如果消息成功发送到Broker,ack为true.如果消息发送失败, ack为false, 并且cause提供失败的原因.

@Component
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回掉方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if(ack){System.out.printf("接收到消息, 消息ID:%s \\n",correlationData == null ? null : correlationData.getId());}else {System.out.printf("未接收到消息,消息ID:%s , cause: %s \\n",correlationData == null ? null : correlationData.getId(),cause);//相应地业务处理}}});return rabbitTemplate;}
}
@RequestMapping("/product")
@RestController
public class ProducerController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("confirm")public String confirm(){CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm test...",correlationData);return "消息发送成功";}
}

⽅法说明:

public interface ConfirmCallback {/*** 确认回调* @param correlationData: 发送消息时的附加信息, 通常⽤于在确认回调中识别特定的消息* @param ack: 交换机是否收到消息, 收到为true, 未收到为false* @param cause: 当消息确认失败时,这个字符串参数将提供失败的原因.这个原因可以⽤于调试和错误处理.* 成功时, cause为null */void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
}
  • RabbitTemplate.ConfirmCallback 和 ConfirmListener 区别

    在RabbitMQ中, ConfirmListener和ConfirmCallback都是⽤来处理消息确认的机制, 但它们属于不同的客⼾端库, 并且使⽤的场景和⽅式有所不同.

    1. ConfirmListener 是 RabbitMQ Java Client 库中的接⼝. 这个库是 RabbitMQ 官⽅提供的⼀个直接与RabbitMQ服务器交互的客⼾端库. ConfirmListener 接⼝提供了两个⽅法: handleAck 和handleNack, ⽤于处理消息确认和否定确认的事件.
    2. ConfirmCallback 是 Spring AMQP 框架中的⼀个接⼝. 专⻔为Spring环境设计. ⽤于简化与RabbitMQ交互的过程. 它只包含⼀个 confirm ⽅法,⽤于处理消息确认的回调.
    3. 在 Spring Boot 应⽤中, 通常会使⽤ ConfirmCallback, 因为它与 Spring 框架的其他部分更加整合, 可以利⽤ Spring 的配置和依赖注⼊功能. ⽽在使⽤ RabbitMQ Java Client 库时, 则可能会直接实现ConfirmListener 接⼝, 更直接的与RabbitMQ的Channel交互

测试

运⾏程序, 调⽤接⼝ http://127.0.0.1:8080/product/confirm

观察控制台, 消息确认成功.

执行了confirm方法
接收到消息, 消息ID:1 

接下来把交换机名称改下, 重新运⾏, 会触发另⼀个结果.

		@RequestMapping("confirm")public String confirm(){CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend("confirm1","confirm","confirm test...",correlationData);return "消息发送成功";}
2025-10-18T11:23:44.464+08:00 ERROR 52449 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm1' in vhost 'study', class-id=60, method-id=40)
执行了confirm方法
未接收到消息,消息ID:1 , cause: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm1' in vhost 'study', class-id=60, method-id=40) 

原因中, 明确显⽰"no exchange 'confirm_exchange1' in vhost 'bite'" 也就是说, bite这个虚拟机, 没有名字为confirm_exchange1的交换机.

return退回模式

消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果⼀条消息⽆法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回给发送者. 消息退回给发送者时, 我们可以设置⼀个返回回调⽅法, 对消息进⾏处理.

步骤如下:

  1. 配置RabbitMQ
  2. 设置返回回调逻辑并发送消息
  3. 测试

配置RabbitMQ

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual #消息接收确认publisher-confirm-type: correlated #消息发送确认

设置返回回调逻辑并发送消息

@Component
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//消息被退回时,回掉方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回:" + returned);}});return rabbitTemplate;}
}
		@RequestMapping("msgReturn")public String msgReturn(){CorrelationData correlationData = new CorrelationData("2");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","message return test..",correlationData);return "消息发送成功";}

使⽤RabbitTemplate的setMandatory⽅法设置消息的mandatory属性为true(默认为false). 这个属性的作⽤是告诉RabbitMQ, 如果⼀条消息⽆法被任何队列消费, RabbitMQ应该将消息返回给发送者, 此时 ReturnCallback 就会被触发.

测试

运⾏程序, 调⽤接⼝ http://127.0.0.1:8080/product/msgReturn

观察控制台, 消息被退回

消息退回:ReturnedMessage [message=(Body:'message return test..' MessageProperties [headers={spring_returned_message_correlation=2}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=confirm.exchange, routingKey=confirm11]

回调函数中有⼀个参数: ReturnedMessage, 包含以下属性:

public class ReturnedMessage {//返回的消息对象,包含了消息体和消息属性private final Message message;//由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同的含义. private final int replyCode;//⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.private final String replyText;//消息被发送到的交换机名称private final String exchange;//消息的路由键,即发送消息时指定的键private final String routingKey;.....
}

broker

持久化(durable)

有一个问题:如何保证当RabbitMQ服务停掉以后, ⽣产者发送的消息不丢失呢. 默认情况下, RabbitMQ 退出或者由于某种原因崩溃时, 会忽视队列和消息, 除⾮告知他不要这么做。这也是我们将要讲的“持久化”。

RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。

交换器的持久化

交换器的持久化是通过在声明交换机时是将durable参数置为true实现的.相当于将交换机的属性在服务器内部保存,当MQ的服务器发⽣意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机, 交换机会⾃动建⽴,相当于⼀直存在.

如果交换器不设置持久化, 那么在 RabbitMQ 服务重启之后, 相关的交换机元数据会丢失, 对⼀个⻓期使⽤的交换器来说,建议将其置为持久化的.

ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()

队列的持久化

队列的持久化是通过在声明队列时将 durable 参数置为 true实现的.如果队列不设置持久化, 那么在RabbitMQ服务重启之后,该队列就会被删掉, 此时数据也会丢失. (队列没 有了, 消息也⽆处可存了)

队列的持久化能保证该队列本⾝的元数据不会因异常情况⽽丢失, 但是并不能保证内部所存储的消息不会丢失. 要确保消息不会丢失, 需要将消息设置为持久化。

QueueBuilder.durable(Constant.ACK_QUEUE).build();

当然,也可以创建非持久化队列:

QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();

消息的持久化

消息实现持久化, 需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,也就是 MessageDeliveryMode.*PERSISTENT*

public enum MessageDeliveryMode {NON_PERSISTENT,//⾮持久化PERSISTENT;//持久化
}

设置了队列和消息的持久化, 当 RabbitMQ 服务重启之后, 消息依旧存在. 如果只设置队列持久化, 重启之后消息会丢失. 如果只设置消息的持久化, 重启之后队列消失, 继⽽消息也丢失. 所以单单设置消息持久化⽽不设置队列的持久化显得毫⽆意义.

不发
//⾮持久化信息
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
//持久化信息
channel.basicPublish("",QUEUE_NAME, 
MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());

MessageProperties.PERSISTENT_TEXT_PLAIN 实际就是封装了这个属性

public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain",null,null,2, //deliveryMode0, null, null, null,null, null, null, null,null, null);}

如果使⽤RabbitTemplate 发送持久化消息, 代码如下:

		//生产者@RequestMapping("/pres")public String pres(){Message message = new Message("Presistent test...".getBytes(),new MessageProperties());//消息非持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);//消息持久化//message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);System.out.println(message);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE,"pres",message);return "消息发送成功";}public class Constants {public static final String ACK_QUEUE = "ack.queue";public static final String ACK_EXCHANGE = "ack.exchange";public static final String PRES_QUEUE = "pres.queue";public static final String PRES_EXCHANGE = "pres.exchange";}//消费者@RabbitListener(queues = Constants.PRES_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();//消费者逻辑System.out.printf("接收到消息: %s, deliveryTag: %d \\n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//进行业务逻辑处理System.out.println("业务逻辑处理");//int num = 3/0;System.out.println("业务处理完成");}

RabbitMQ默认情况下会将消息视为持久化的,除⾮队列被声明为⾮持久化,或者消息在发送时被标记为⾮持久化我们也可以通过打印Message这个对象, 来观察消息是否持久化.

(Body:'[B@7a91c856(byte[18])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, deliveryMode=**NON_PERSISTENT**, priority=0, deliveryTag=0])

注意:

<aside> 💡

将所有的消息都设置为持久化, 会严重影响RabbitMQ的性能(随机). 写⼊磁盘的速度⽐写⼊内存的速度慢得不只⼀点点. 对于可靠性不是那么⾼的消息可以不采⽤持久化处理以提⾼整体的吞吐量. 在选择是否要将消息持久化时, 需要在可靠性和吐吞量之间做⼀个权衡.

</aside>

  • 将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗? 答案是否定的.

    1. 在持久化的消息正确存⼊RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存⼊磁盘中.RabbitMQ并不会为每条消息都进⾏同步存盘(调⽤内核的fsync⽅法)的处理, 可能仅仅保存到操作系统缓存之中⽽不是物理磁盘之中. 如果在这段时间内RabbitMQ服务节点发⽣了宕机、重启等异常情况, 消息保存还没来得及落盘, 那么这些消息将会丢失.
    2. 从消费者来说, 如果在订阅消费队列时将autoAck参数设置为true, 那么当消费者接收到相关消息之后, 还没来得及处理就宕机了, 这样也算数据居丢失. 这种情况很好解决, 将autoAck参数设置为false, 并进⾏⼿动确认,详细可以参考[消息确认]章节.

    这个问题怎么解决呢?

    1. 引⼊RabbitMQ的仲裁队列(后⾯再讲), 如果主节点(master)在此特殊时间内挂掉, 可以⾃动切换到从节点(slave),这样有效地保证了⾼可⽤性, 除⾮整个集群都挂掉(此⽅法也不能保证100%可靠, 但是配置了仲裁队列要⽐没有配置仲裁队列的可靠性要⾼很多, 实际⽣产环境中的关键业务队列⼀般都会设置仲裁队列).
    2. 还可以在发送端引⼊事务机制或者发送⽅确认机制来保证消息已经正确地发送并存储⾄RabbitMQ中, 详细参考下⼀个章节内容介绍--"发送⽅确认"

消费者

消息确认机制

为了保证消息从队列可靠地到达消费者, RabbitMQ提供了消息确认机制。

消费者在订阅队列时,可以指定 autoAck 参数, 根据这个参数设置, 消息确认机制分为以下两种:

  • ⾃动确认: 当autoAck 等于true时, RabbitMQ 会⾃动把发送出去的消息置为确认, 然后从内存(或者磁盘)中删除, ⽽不管消费者是否真正地消费到了这些消息. ⾃动确认模式适合对于消息可靠性要求不⾼的场景.
  • ⼿动确认: 当autoAck等于false时,RabbitMQ会等待消费者显式地调⽤Basic.Ack命令, 回复确认信号后才从内存(或者磁盘) 中移去消息. 这种模式适合对消息可靠性要求⽐较⾼的场景.
DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);

/*** String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;* 参数说明:*  queue:队列名称*  autoAck:是否自动确认*  callback:接收到消息后,执行的逻辑*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

当autoAck参数置为false, 对于RabbitMQ服务端⽽⾔, 队列中的消息分成了两个部分:

  1. 等待投递给消费者的消息.
  2. 已经投递给消费者, 但是还没有收到消费者确认信号的消息.

如果RabbitMQ⼀直没有收到消费者的确认信号, 并且消费此消息的消费者已经断开连接, 则RabbitMQ会安排该消息重新进⼊队列,等待投递给下⼀个消费者,当然也有可能还是原来的那个消费者.

从RabbitMQ的Web管理平台上, 也可以看到当前队列中Ready状态和Unacked状态的消息数.

Ready: 等待投递给消费者的消息数 Unacked: 已经投递给消费者, 但是未收到消费者确认信号的消息数

⼿动确认⽅法

消费者在收到消息之后, 可以选择确认, 也可以选择直接拒绝或者跳过, RabbitMQ也提供了不同的确认应答的⽅式, 消费者客⼾端可以调⽤与其对应的channel的相关⽅法, 共有以下三种

  1. 肯定确认: basicAck(long deliveryTag, boolean multiple)

    RabbitMQ 已知道该消息并且成功的处理消息. 可以将其丢弃了. 参数说明:

    1. deliveryTag: 消息的唯⼀标识,它是⼀个单调递增的64 位的⻓整型值. deliveryTag 是每个通道(Channel)独⽴维护的, 所以在每个通道上都是唯⼀的. 当消费者确认(ack)⼀条消息时, 必须使⽤对应的通道上进⾏确认。
    2. multiple: 是否批量确认. 在某些情况下, 为了减少⽹络流量, 可以对⼀系列连续的 deliveryTag 进⾏批量确认. 值为 true 则会⼀次性 ack所有⼩于或等于指定 deliveryTag 的消息. 值为false, 则只确认当前指定deliveryTag 的消息。

    deliveryTag 是RabbitMQ中消息确认机制的⼀个重要组成部分, 它确保了消息传递的可靠性和顺序性。

  2. 否定确认:basicReject(long deliveryTag, boolean requeue)

    RabbitMQ在2.0.0版本开始引⼊了 Basic.Reject 这个命令, 消费者客⼾端可以调⽤channel.basicReject⽅法来告诉RabbitMQ拒绝这个消息. 参数说明:

    1. deliveryTag: 消息的唯⼀标识,它是⼀个单调递增的64 位的⻓整型值. deliveryTag 是每个通道(Channel)独⽴维护的, 所以在每个通道上都是唯⼀的. 当消费者确认(ack)⼀条消息时, 必须使⽤对应的通道上进⾏确认。
    2. requeue: 表⽰拒绝后, 这条消息如何处理. 如果requeue 参数设置为true, 则RabbitMQ会重新将这条消息存⼊队列,以便可以发送给下⼀个订阅的消费者. 如果requeue参数设置为false, 则RabbitMQ会把消息从队列中移除, ⽽不会把它发送给新的消费者。
  3. 否定确认:basicNack(long deliveryTag, boolean multiple, boolean requeue)

    Basic.Reject命令⼀次只能拒绝⼀条消息,如果想要批量拒绝消息,则可以使⽤Basic.Nack这个命令. 消费者客⼾端可以调⽤ channel.basicNack⽅法来实现.

    1. deliveryTag: 消息的唯⼀标识,它是⼀个单调递增的64 位的⻓整型值. deliveryTag 是每个通道(Channel)独⽴维护的, 所以在每个通道上都是唯⼀的. 当消费者确认(ack)⼀条消息时, 必须使⽤对应的通道上进⾏确认。
    2. multiple参数设置为true则表⽰拒绝deliveryTag编号之前所有未被当前消费者确认的消息.
    3. requeue: 表⽰拒绝后, 这条消息如何处理. 如果requeue 参数设置为true, 则RabbitMQ会重新将这条消息存⼊队列,以便可以发送给下⼀个订阅的消费者. 如果requeue参数设置为false, 则RabbitMQ会把消息从队列中移除, ⽽不会把它发送给新的消费者。

Spring-AMQP 对消息确认机制提供了三种策略:

public enum AcknowledgeMode {/*** No acks - {@code autoAck=true} in {@code Channel.basicConsume()}.*/NONE,/*** Manual acks - user must ack/nack via a channel aware listener.*/MANUAL,/*** Auto - the container will issue the ack/nack based on whether* the listener returns normally, or throws an exception.* <p><em>Do not confuse with RabbitMQ {@code autoAck} which is* represented by {@link #NONE} here</em>.*/AUTO;....
}
  1. AcknowledgeMode.NONE
    • 这种模式下, 消息⼀旦投递给消费者, 不管消费者是否成功处理了消息, RabbitMQ 就会⾃动确认消息, 从RabbitMQ队列中移除消息. 如果消费者处理消息失败, 消息可能会丢失.
  2. AcknowledgeMode.AUTO(默认)
    • 这种模式下, 消费者在消息处理成功时会⾃动确认消息, 但如果处理过程中抛出了异常, 则不会确认消息.
  3. AcknowledgeMode.MANUAL
    • ⼿动确认模式下, 消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息. 如果消息未被确认, RabbitMQ 会认为消息尚未被成功处理, 并且会在消费者可⽤时重新投递该消息, 这种模式提⾼了消息处理的可靠性, 因为即使消费者处理消息后失败, 消息也不会丢失, ⽽是可以被重新处理.

代码示例:

主要流程:

  1. 配置确认机制(⾃动确认/⼿动机制)
  2. ⽣产者发送消息
  3. 消费端逻辑
  4. 测试

NONE模式

  1. 配置确认机制
spring:rabbitmq:addresses: amqp://admin:admin@localhost:5672/studylistener:simple:acknowledge-mode: none  
  1. ⽣产者发送消息
//队列,交换机配置
public class Constants {public static final String ACK_QUEUE = "ack.queue";public static final String ACK_EXCHANGE = "ack.exchange";}@Configuration
public class RabbitMQConfig {@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}@Bean("directExchange")public DirectExchange directExchange(){return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();}@Bean("ackBinding")public Binding ackBinding(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("ackQueue") Queue ackQueue) {return BindingBuilder.bind(ackQueue).to(directExchange).with("ack");}
}//通过接⼝发送消息
@RequestMapping("/product")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","consumer ack mode test...");return "发送消息成功";}
}
  1. 消费端逻辑
@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();//        System.out.printf("接收到消息:%s,deliveryTag: %d \\n",new String(message.getBody(),"UTF-8"),deliveryTag);
//        System.out.println("业务逻辑处理");
//        System.out.println("业务处理完成");System.out.printf("接收到消息:%s,deliveryTag: %d \\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//进行业务逻辑处理System.out.println("业务逻辑处理");int num = 3/0;System.out.println("业务处理完成");}}
  1. 测试

    调⽤接⼝, 发送消息 可以看到队列中有⼀条消息, unacked的为0(需要先把消费者注掉)

开启消费者, 控制台输出:

接收到消息:consumer ack mode test...,deliveryTag: 1 
业务逻辑处理
2025-10-16T23:52:52.854+08:00 ERROR 21371 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=120)
2025-10-16T23:52:53.854+08:00  WARN 21371 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it

可以看到, 消费者处理失败, 但是消息已经从RabbitMQ中移除。

AUTO模式

  1. 配置确认机制

    spring:rabbitmq:addresses: amqp://admin:admin@localhost:5672/studylistener:simple:acknowledge-mode: auto  #消息接收确认
    
  2. 重新运⾏程序

    调⽤接⼝, 发送消息可以看到队列中有⼀条消息, unacked的为0(需要先把消费者注掉)

  3. 开启消费者, 控制台不断输出错误信息

    接收到消息:consumer ack mode test...,deliveryTag: 8404 
    业务逻辑处理
    2025-10-17T00:49:20.209+08:00  WARN 22159 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed......接收到消息:consumer ack mode test...,deliveryTag: 8405 
    业务逻辑处理
    2025-10-17T00:49:20.209+08:00  WARN 22159 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
    .....接收到消息:consumer ack mode test...,deliveryTag: 8406 
    业务逻辑处理
    .....
    
  • 注意:

    可能生成的错误信息中 deliveryTag 一直都是 1。

    接收到消息:consumer ack mode test...,deliveryTag: 1 
    业务逻辑处理
    2025-10-17T00:42:45.009+08:00 ERROR 22107 --- [ 127.0.0.1:5672] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
    .....接收到消息:consumer ack mode test...,deliveryTag: 1 
    业务逻辑处理
    2025-10-17T00:42:45.009+08:00 ERROR 22107 --- [ 127.0.0.1:5672] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
    .....接收到消息:consumer ack mode test...,deliveryTag: 1 
    

    为什么会出现这种情况呢?

    关键概念回顾:

    1. deliveryTag:RabbitMQ 中,每个通道(Channel)会为发送给消费者的消息分配一个唯一的递增编号(deliveryTag),用于标识消息。同一个 Channel 内的 deliveryTag 是严格递增的(从 1 开始,每次 + 1),不同 Channel 的 deliveryTag 相互独立(各自从 1 开始)。
    2. AUTO 确认模式:在 Spring AMQP 中,ackMode=AUTO时,Spring 会自动处理消息确认:
      • 若消息处理成功(无异常),自动调用channel.basicAck(deliveryTag, false)确认消息;
      • 若处理失败(抛出异常),根据异常类型自动决定拒绝(basicReject)或重新入队。
      • 此时开发者无需手动调用确认方法,否则会导致重复确认

    问题原因分析:

    结合错误信息PRECONDITION_FAILED - unknown delivery tag 1和 “deliveryTag 始终为 1” 的现象,可能的原因有:

    1. 在 AUTO 模式下手动调用了消息确认方法(最可能)

    如果在消息处理逻辑中,手动调用了channel.basicAck()basicNack()basicReject(),会与 Spring AUTO 模式的自动确认冲突:

    • 例如:消息处理成功后,Spring 自动 ack 一次,而你又手动 ack 一次,此时 deliveryTag 已被消费,第二次 ack 会触发 “unknown delivery tag”。
    • 由于重复确认导致通道(Channel)被关闭,Spring 会重建新的 Channel,新 Channel 的 deliveryTag 从 1 重新开始,因此每次接收消息的 deliveryTag 都是 1。

    2. 通道(Channel)被频繁重建

    deliveryTag 的递增依赖于同一个 Channel。若 Channel 因异常(如上述重复确认)被关闭,Spring 会自动重建新的 Channel,新 Channel 的 deliveryTag 会从 1 重新计数,表现为 “deliveryTag 不增”。

    常见导致 Channel 重建的原因:

    • 手动确认与自动确认冲突(如上述);
    • 消息处理超时(超过channel.basicQos()设置的预取窗口);
    • 网络波动导致 Channel 断开重连。

    3. 消费者配置错误(确认模式实际非 AUTO)

    若配置中 ackMode 被错误设置为MANUAL(手动确认),但你误以为是 AUTO,且未手动确认消息,会导致 RabbitMQ 服务器累积未确认消息,最终触发 Channel 关闭,重建后 deliveryTag 重置。

    解决步骤:

    1. 检查并移除手动确认代码

    查看你的消息监听方法(如@RabbitListener标注的方法),是否有获取Channel并调用确认相关方法的代码。例如

    @Component
    public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消费者逻辑System.out.printf("接收到消息:%s,deliveryTag: %d \\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//进行业务逻辑处理System.out.println("业务逻辑处理");int num = 3/0;System.out.println("业务处理完成");//肯定确认channel.basicAck(deliveryTag,false);}catch (Exception e){//否定确认channel.basicNack(deliveryTag,false,true);}}}
    

    修复:删除channel.basicAck()basicNack()等手动确认代码,仅保留业务逻辑:

    @Component
    public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag: %d \\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//进行业务逻辑处理System.out.println("业务逻辑处理");int num = 3/0;System.out.println("业务处理完成");
    }
    

    2. 确认消费者的 ackMode 确实为 AUTO

    检查容器工厂配置,确保 ackMode 未被修改为 MANUAL。默认情况下,Spring Boot 的SimpleRabbitListenerContainerFactory的 ackMode 是AUTO,若有自定义配置,需显式确认:

    spring:rabbitmq:addresses: amqp://admin:admin@localhost:5672/studylistener:simple:acknowledge-mode: auto  #消息接收确认
    

MANUAL模式

  1. 配置确认机制

    spring:rabbitmq:addresses: amqp://admin:admin@localhost:5672/studylistener:simple:acknowledge-mode: manual
    
  2. 消费端⼿动确认逻辑

    @Component
    public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消费者逻辑System.out.printf("接收到消息:%s,deliveryTag: %d \\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//进行业务逻辑处理System.out.println("业务逻辑处理");//int num = 3/0;System.out.println("业务处理完成");//肯定确认channel.basicAck(deliveryTag,false);}catch (Exception e){//否定确认channel.basicNack(deliveryTag,false,true);}}
    }
    

    这个代码运⾏的结果是正常的, 运⾏后消息会被签收: Ready为0, unacked为0

    接收到消息:consumer ack mode test...,deliveryTag: 1 
    业务逻辑处理
    业务处理完成
    接收到消息:consumer ack mode test...,deliveryTag: 2 
    业务逻辑处理
    业务处理完成
    
  3. 异常时拒绝签收

    @Component
    public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消费者逻辑System.out.printf("接收到消息:%s,deliveryTag: %d \\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//进行业务逻辑处理System.out.println("业务逻辑处理");int num = 3/0;System.out.println("业务处理完成");//肯定确认channel.basicAck(deliveryTag,false);}catch (Exception e){//否定确认channel.basicNack(deliveryTag,false,true);}}
    }
    

    运⾏结果: 消费异常时不断重试, deliveryTag 从1递增

    接收到消息:consumer ack mode test...,deliveryTag: 1 
    业务逻辑处理
    接收到消息:consumer ack mode test...,deliveryTag: 2 
    业务逻辑处理
    接收到消息:consumer ack mode test...,deliveryTag: 3 
    业务逻辑处理
    接收到消息:consumer ack mode test...,deliveryTag: 4 
    业务逻辑处理
    接收到消息:consumer ack mode test...,deliveryTag: 5 
    业务逻辑处理
    接收到消息:consumer ack mode test...,deliveryTag: 6 
    业务逻辑处理
    接收到消息:consumer ack mode test...,deliveryTag: 7 
    

  • 失败为什么ready是1,unacked是0,不因该是unacked是1,ready是0嘛

    要理解这种现象,需先明确 RabbitMQ 中 Ready 和 Unacked 的定义,以及消息在 “消费失败” 时的流转逻辑:

    1. 核心状态定义

    • Ready:队列中待投递的消息数量(还没被消费者取走的消息)。
    • Unacked:已经被消费者成功取走,但消费者尚未确认(既没发 ack,也没发 nack/reject)的消息数量。

    2. “失败却 Ready=1,Unacked=0” 的可能原因

    出现这种情况,说明消息没有停留在 “已投递但未确认” 状态,而是重新回到了 “待投递” 队列。常见原因有以下几种:

    原因 1:消费者根本没收到消息

    如果消费者配置有误(如队列绑定错误、消费者未启动、权限不足等),消息会一直留在队列中,处于 Ready 状态,从未被投递到消费者 —— 自然 Unacked=0

    原因 2:消息被投递后,因 “自动确认 / 拒绝” 重新入队

    若消费者使用 自动确认(AcknowledgeMode.AUTO) 模式:

    • 当消息处理抛出异常时,Spring AMQP 会自动拒绝消息,并将其重新入队(回到 Ready 状态)。
    • 此时,消息短暂进入 Unacked(投递到消费者时),但因异常被自动拒绝 + 重新入队,最终 Unacked 归零,Ready 加 1。

    原因 3:手动确认时,显式将消息重新入队

    若消费者使用 手动确认(AcknowledgeMode.MANUAL) 模式,但代码中处理失败时调用了 basicNack 并设置 requeue=true

    channel.basicNack(deliveryTag, false, true); // 最后一个参数 requeue=true 表示“重新入队”
    
    • 消息被消费者取走后,进入 Unacked
    • 但因 requeue=true,拒绝后消息会重新回到队列,变为 Ready,因此 Unacked 归零,Ready 加 1。

    原因 4:投递失败,消息直接回队

    若消费者连接瞬间断开(如重启、网络闪断),RabbitMQ 投递消息失败,消息会直接回到 Ready 队列,不会进入 Unacked(因为投递本身没成功)。

    3. 如何让 Unacked=1,Ready=0

    要达到 Unacked=1,Ready=0,需要满足:消息被消费者成功投递(进入 Unacked),但消费者既不确认(ack),也不拒绝并重新入队(nack(requeue=false)),此时消息会停留在 “已投递但未确认” 状态,表现为 Unacked=1,Ready=0

    例如,手动确认模式下,代码中既不调用 basicAck,也不调用 basicNack/basicReject,且消费者正常运行(没断开),消息就会一直处于 Unacked 状态。

http://www.dtcms.com/a/532509.html

相关文章:

  • [sam2图像分割] mask_decoder | TwoWayTransformer
  • 京东面试题解析:SSO、Token与Redis交互、Dubbo负载均衡等
  • 网站建设哪家效益快做百度推广网站排名
  • RabbitMQ -- 高级特性
  • 克隆网站后台asp.net 网站数据库
  • 零基础新手小白快速了解掌握服务集群与自动化运维(十S四)储存服务-Ceph储存
  • 土壤侵蚀相关
  • 花卉网站建设规划书平台推广计划书模板范文
  • 如何使用C#编写DbContext与数据库连接
  • 从一到无穷大 #52:Lakehouse 不适用时序?打破范式 —— Catalog 架构选型复盘
  • 机器学习 (1) 监督学习
  • 从哪里找网络推广公司网站优化 毕业设计
  • Java如何将数据写入到PDF文件
  • 开发板网络配置
  • 14天备考软考-day1: 计组、操作系统(仅自用)
  • 企业网站模板包含什么有什么软件可以做网站
  • .gitignore 不生效问题——删除错误追踪的文件
  • 深度学习优化器详解
  • 做企业公示的数字证书网站wordpress有识图接口吗
  • 中国商标注册申请官网百度蜘蛛池自动收录seo
  • GitHub 热榜项目 - 日榜(2025-10-26)
  • 数据分析:指标拆解、异动归因类题目
  • 做网站需要那些软件设计建网站
  • Gorm(十二)乐观锁和悲观锁
  • neo4j图数据库笔记
  • 网页网站设计公司有哪些网站排名有什么用
  • 泉州做网站优化哪家好微信推广平台哪里找
  • 如何制作收费网站百度收录个人网站是什么怎么做
  • VsCode + Wsl:终极开发环境搭建指南
  • 深度学习——Logistic回归中的梯度下降法