RabbitMQ—消息可靠性保证
上篇文章:
RabbitMQ工作模式https://blog.csdn.net/sniper_fandc/article/details/149310903?fromshare=blogdetail&sharetype=blogdetail&sharerId=149310903&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link
目录
1 消息确认机制
1.1 介绍
1.2 手动确认
1.3 案例演示
(1)AcknowledgeMode.NONE
(2)AcknowledgeMode.AUTO(默认)
(3)AcknowledgeMode.MANUAL
2 持久化
2.1 介绍
2.2 交换机持久化
2.3 队列持久化
2.4 消息持久化
3 发送方确认
3.1 介绍
3.2 confirm确认模式
3.3 return退回模式
4 重试机制
4.1 介绍
4.2 案例演示
5 总结
(1)消息从生产者到Broker(存入Queue才认为存到RabbitMQ中)
(2)Broker内部问题(消息队列数据丢失)
(3)消息从Queue到消费者
1 消息确认机制
1.1 介绍
如果消息到达消费者,消费者可能成功处理,也可能处理异常,RabbitMQ如何保证消息成功到达消费者并成功处理?
RabbitMQ提供了消息确认机制,专门用来保证RabbitMQ到消费者之间的消息可靠性。消息确认机制由参数autoACK来决定:
autoACK=true:自动确认,当RabbitMQ发送消息后会认为消息成功发送,自动把发出去的消息进行确认,然后移除消息。适合消息可靠性要求不高的场景。
autoACK=false:手动确认,当RabbitMQ发送消息后会等待消费者手动调用Basic.Ack命令,只有RabbitMQ收到消费者的确认才会认为消息发送成功,然后移除消息。适合消息可靠性要求较高的场景。
1.2 手动确认
当autoACK=false是表示手动确认,此时Queue中的消息类型分为两类:等待消费的消息和已经到达消费者但还未收到确认的消息。如果还未确认的消息长时间未确认,RabbitMQ就会认为消费者到RabbitMQ的连接断了,此时会进行重新投递(重新入队,当然这个也是进行选择是否开启才能生效)。
关于手动确认在消费者代码实现中有三种策略:
(1)肯定确认Channel.basicAck(long deliveryTag, boolean multiple)
基于命令Basic.Ack实现,当确认发送给RabbitMQ时,RabbitMQ认为消息确认成功,从队列中删除消息。
deliveryTag:消息唯一ID,同一个通道内单调递增(64位长整型),通道间的消息唯一ID相互独立。
multiple:是否批量确认,true表示针对该deliveryTag以前的所有消息都进行确认;false表示只确认该deliveryTag的消息。
(2)否定确认Channel.basicReject(long deliveryTag, boolean requeue)
基于命令Basic.Reject实现,当确认发送给RabbitMQ时,RabbitMQ认为消费者拒绝该消息,根据requeue做出不同选择。
requeue:是否重新入队,true表示被拒绝的deliveryTag消息重新入队(重新等待消费);false表示不重新入队,即丢弃该消息。
(3)否定确认Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
基于命令Basic.Nack实现,是basicReject()的升级版,可以批量拒绝消息,拒绝后RabbitMQ如何处理该消息还是看参数requeue。
multiple:是否批量拒绝,true表示针对该deliveryTag以前的所有消息都进行拒绝;false表示只拒绝该deliveryTag的消息。
1.3 案例演示
在SpringBoot中,上述的消息确认机制被进一步封装变为了如下三种策略:None、Auto和Manual。
None是自动确认。Auto是默认策略,介于自动确认和手动确认(肯定确认)之间,消息处理成功会自动确认,处理过程发生异常则不会确认(消息不会被丢弃)。Manual是手动确认,且消息处理失败或长时间无确认消息不回丢失,都会重新入队。
相关配置的通用代码(队列名称、声明队列,使用默认的交换机):
public class RabbitMQConnection {public static final String ACK_QUEUE = "ack.queue";}
@Configurationpublic class RabbitMQConfig {@Bean("ackQueue")public Queue ackQueue(){return QueueBuilder.durable(RabbitMQConnection.ACK_QUEUE).build();}}
生产者代码:
@RestController@RequestMapping("/producer")public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("ack")public String ack(){rabbitTemplate.convertAndSend("", RabbitMQConnection.ACK_QUEUE,"Hello SpringBoot RabbitMQ");return "发送成功";}}
消费者代码:
@Componentpublic class AckListener {@RabbitListener(queues = RabbitMQConnection.ACK_QUEUE)public void queueListener(Message message, Channel channel) throws UnsupportedEncodingException {System.out.printf("listener ["+RabbitMQConnection.ACK_QUEUE+"]收到消息:%s, deliveryTag:%d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());int num = 1/0;System.out.println("消息处理完成");}}
(1)AcknowledgeMode.NONE
配置文件:
spring:rabbitmq:addresses: amqp://admin:admin@192.168.217.150:5672/testVirtuallistener:simple:acknowledge-mode: none
在这种策略下,消息发送到消费者后,由于消费者处理消息过程中发生异常,因此最终RabbitMQ中的消息被丢弃:
(2)AcknowledgeMode.AUTO(默认)
配置文件:
spring:rabbitmq:addresses: amqp://admin:admin@192.168.217.150:5672/testVirtuallistener:simple:acknowledge-mode: auto
在这种策略下,如果消息成功处理,则RabbitMQ进行自动确认(消息被确认然后丢弃);如果处理过程出现异常,RabbitMQ会不断重发消息,导致消息积压(deliveryTag一直在增加),而消息并没有被丢弃,处于Unacked的状态(没有确认)。
(3)AcknowledgeMode.MANUAL
配置文件:
spring:rabbitmq:addresses: amqp://admin:admin@192.168.217.150:5672/testVirtuallistener:simple:acknowledge-mode: manual
如果是该策略,则需要显示手动确认,修改后的消费者代码如下:
@Componentpublic class AckListener {@RabbitListener(queues = RabbitMQConnection.ACK_QUEUE)public void queueListener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("listener ["+RabbitMQConnection.ACK_QUEUE+"]收到消息:%s, deliveryTag:%d \n",new String(message.getBody(),"UTF-8"),deliveryTag);int num = 1/0;System.out.println("消息处理完成");channel.basicAck(deliveryTag,false);} catch (Exception e) {channel.basicNack(deliveryTag,false,true);}}}
当没有发生异常(请把1/0注释掉),消息被成功处理,RabbitMQ收到手动确认(肯定确认)后,就把消息确认后清除了:
当消息处理过程发生异常(请把1/0恢复),消息没有被成功处理,RabbitMQ收到手动确认(否定确认)后,由于requeue是true,因此消息又被重新入队然后重新发送消息,也就是下面的不断重试的情况(此时是在Ready和Unacked状态反复跳转,但是页面由于是定时刷新,因此无法正常显示):
当消息处理过程发生异常(请把1/0恢复),消息没有被成功处理,RabbitMQ收到手动确认(否定确认)后,如果requeue是false,消息就不会重新入队,直接被丢弃(下面程序并没有打印消息处理完毕日志):
注意:手动确认就意味着无论消息是成功处理还是失败处理,都需要显示调用确认的方法。成功调用basicAck(),失败调用basicNack()或basicReject()。如果没有手工调用确认的方法,消息就会处于Unacked的状态(Unacked状态表示没有确认,即肯定确认和否定确认都没有)。
2 持久化
2.1 介绍
持久化也是保证消息可靠性的一种机制,消息丢失或异常不只可能发生在生产者到RabbitMQ和RabbitMQ到消费者之间,也可能发生在RabbitMQ内部。从消息进入到RabbitMQ中,还需要经过交换机、队列,这些过程也可能发生消息丢失,因此就需要持久化保证消息可靠性。
持久化有三个部分:
2.2 交换机持久化
保证交换机的属性配置不因为RabbitMQ异常而丢失,如果设置持久化RabbitMQ重启后交换机就不会丢失(元数据不会丢失),相当于交换机一直存在。
默认是持久化的,在ExchangeBuilder的build()方法中durable参数默认为true,即持久化。而如果要手动设置通过在声明交换机时durable()方法来设置:
ExchangeBuilder.XXXExchange(交换机名称).durable(true).build();//持久化ExchangeBuilder.XXXExchange(交换机名称).durable(true).build();//非持久化
2.3 队列持久化
保证队列的属性配置不因为RabbitMQ异常而丢失,如果设置持久化RabbitMQ重启后队列就不会丢失(元数据不会丢失),相当于队列一直存在(如果队列消失,消息也就丢失了)。
默认是持久化的,在QueueBuilder的durable()方法中durable参数默认为true,即持久化。而如果要手动设置通过声明队列时durable()方法或nonDurable()方法来设置:
QueueBuilder.durable(队列名称).build();//持久化QueueBuilder.nonDurable(队列名称).build();//非持久化
2.4 消息持久化
保证消息在RabbitMQ中不会因为异常而丢失,如果只设置消息持久化没有设置队列持久化,消息可靠性仍不能保证(因为队列没了消息就没了)。主要是通过MessageProperties的deliveryMode属性来设置,设置方式如下:
channel.basicPublish(交换机名称,队列名称,null,msg.getBytes());//RabbitMQ的Java包的消息非持久化设置channel.basicPublish(交换机名称,队列名称,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());//RabbitMQ的Java包的消息持久化设置
MessageProperties.PERSISTENT_TEXT_PLAIN是该属性的封装,deliveryMode为2表示持久化PERSISTENT(1表示非持久化NON_PERSISTENT),实现代码如下(也可以自己手动定义BasicProperties对象来实现):
public static final BasicProperties PERSISTENT_TEXT_PLAIN =new BasicProperties("text/plain",null,null,2, //deliveryMode0, null, null, null,null, null, null, null,null, null);
如果是SpringBoot中使用RabbitTemplate来发送消息,在convertAndSend()方法的源码中,convertMessageIfNecessary()会把object封装成Message(默认持久化):
public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);}
因此可以手动封装发送的消息为Message,从而设置为消息是否持久化:
// 要发送的消息内容String message = "This is a persistent message";// 创建一个Message对象设置为持久化Message messageObject = new Message(message.getBytes(), newMessageProperties());messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 使用RabbitTemplate发送消息rabbitTemplate.convertAndSend(交换机名称, routingKey, messageObject);
注意:三种持久化方式是否能100%确保消息可靠性?不能,1.如果消费者接收到消息还未及时处理就宕机,消息也就丢失了(解决办法:手动确认)。2.消息进入RabbitMQ后,如果为每一条消息都持久化(调用内核的fsync方法)就会非常耗时。因此消息会先进入OS的缓冲区,待缓冲区满时再统一进行持久化,虽然这个时间很短,但是这期间如果RabbitMQ宕机,消息还为持久化到磁盘上,也会丢失(解决办法:(1)仲裁队列,主从结构。(2)发送方确认)。
3 发送方确认
3.1 介绍
实际上就是发布确认模式(Publisher Confirm),用于确保消息从生产者正确的存入RabbitMQ(也可以用事务机制解决,但是比较消耗性能)。
发送方确认包含两个部分:confirm确认模式和return退回模式。
3.2 confirm确认模式
confirm确认模式工作在生产者的消息发送到Exchange上的过程,通过对生产者设置监听ConfirmCallback(包含回调方法confirm()),无论消息是否到达Exchange,该回调方法都会执行。如果消息到达Exchange,ACK(Acknowledge character确认字符)为true;如果消息未到达Exchange,ACK为true。
相关配置文件:
spring:rabbitmq:addresses: amqp://admin:admin@192.168.217.150:5672/testVirtuallistener:simple:acknowledge-mode: manual #消息接收确认(MQ-消费者):none(自动确认)、auto(正常自动确认,异常不确认)、manual(手动确认)publisher-confirm-type: correlated #消息发送确认(生产者-MQ)
队列、交换机声明:
public class RabbitMQConnection {public static final String CONFIRM_QUEUE = "confirm.queue";public static final String CONFIRM_EXCHANGE = "confirm.exchange";}
@Configurationpublic class RabbitMQConfig {@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(RabbitMQConnection.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public DirectExchange confirmExchange(){return ExchangeBuilder.directExchange(RabbitMQConnection.CONFIRM_EXCHANGE).durable(true).build();}@Bean("confirmQueueBinding")public Binding confirmQueueBinding(@Qualifier("confirmExchange") DirectExchange directExchange, @Qualifier("confirmQueue") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("confirm");}}
设置回调方法:
@Configurationpublic class RabbitTemplateConfig {//此处必须设置两个rabbitTemplate(一个Spring提供的,一个自定义的)//这样做防止Spring的rabbitTemplate是单例导致注入多个同类型对象实际上注入的是自定义对象//自定义设置rabbitTemplate会影响到其它接口@Bean("rabbitTemplate")public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){return new RabbitTemplate(connectionFactory);}//这个设置rabbitTemplate的逻辑不能放到接口方法中//否则重复调用接口就会重复设置rabbitTemplate(这是不允许的)@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {// correlationData:回调关联数据(可以自定义一些消息的额外信息,比如id)// b:ack// s:错误原因@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("执行confirm()");if(b){System.out.printf("消息接收成功,消息ID:%s \n",correlationData==null?null:correlationData.getId());}else{System.out.printf("消息接收失败,消息ID:%s,原因:%s \n",correlationData==null?null:correlationData.getId(),s);//消息失败后的一些其它处理}}});return rabbitTemplate;}}
上述代码有两点注意事项:
1.为什么设置回调函数不直接放到接口中使用注入的rabbitTemplate对象来进行设置?由于Spring注入的Bean是单例,因此如果在某个接口对rabbitTemplate进行设置,就会导致其它接口用到rabbitTemplate都会受到影响,从而影响其它接口的原有执行逻辑和目的。其次是重复执行接口会重复设置rabbitTemplate,该对象的设置只允许一次不允许多次。
2.那为什么还需要再重新定义一个rabbitTemplate对象,直接使用Spring自带的不行吗?不行,由于Spring在加载Bean的时候,如果同一类型的Bean已经找到有一个(即自己定义的Bean:confirmRabbitTemplate),那么Spring就不会再创建Bean(Spring自带的Bean:rabbitTemplate)。即使使用@Autowired注入两个名称不同但是类型相同的Bean,由于@Autowired先按类型匹配,找到confirmRabbitTemplate这个Bean,于是后续同一类型Bean有多个按名称找的逻辑就不执行了,因此两次注入实际上注入的都是confirmRabbitTemplate。
生产者:
@RestController@RequestMapping("/producer")public class ProducerController {//其它接口使用的rabbitTemplate@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;//推荐使用@Resource按照Bean名称注入@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("confirm")public String confirm(){CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(RabbitMQConnection.CONFIRM_EXCHANGE, "confirm","Hello SpringBoot RabbitMQ",correlationData);return "发送成功";}}
由于定义了两个Bean,因此注入的时候建议按照@Resource按名称注入,也可以使用@Autowired搭配@Qualifier()注解按名称注入。
代码运行结果如下:
假设此时没有找到交换机Exchange,即修改交换机名称为不存在的交换机:
@RequestMapping("confirm")public String confirm(){CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(RabbitMQConnection.CONFIRM_EXCHANGE+1, "confirm","Hello SpringBoot RabbitMQ",correlationData);return "发送成功";}
此时回调函数就会执行ACK为false的逻辑:消息接收失败,消息ID:1,原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchange1' in vhost 'testVirtual', class-id=60, method-id=40) :
假设消息正确传到交换机上,但是交换机没有找到Queue(修改routingKey为不匹配的值),此时虽然返回消息接收成功,但是队列中实际上并没有消息:
因此confirm只能保证生产者-Exchange的消息可靠性,不能保证Exchange-Queue的消息可靠性。要保证Exchange-Queue的消息可靠性,需要靠return退回模式:
3.3 return退回模式
return退回模式工作在消息从Exchange路由到对应Queue的过程,如果消息不能被正确路由到Queue(routingKey不匹配或Queue不存在),则会把消息退回给生产者。设置回调方法ReturnsCallback的returnedMessage(),当消息退回时回调方法会通知生产者。
注意:confirm和return两种模式并不是互斥的,而是可以结合使用的。因为如果只使用confirm,那消息是否能被路由到Queue也是未知的。
return模式的代码大部分和confirm一样,唯一不同的是要设置rabbitTemplate:
@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置confirm模式rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {// correlationData:回调关联数据(可以自定义一些消息的额外信息,比如id)// b:ack// s:错误原因@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("执行confirm()");if (b) {System.out.printf("消息接收成功,消息ID:%s \n", correlationData == null ? null : correlationData.getId());} else {System.out.printf("消息接收失败,消息ID:%s,原因:%s \n", correlationData == null ? null : correlationData.getId(), s);//消息失败后的一些其它处理}}});//设置return模式rabbitTemplate.setMandatory(true);//这个必须设置为true,才能告诉RabbitMQ要退回消息并执行回调方法rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.printf("消息被退回: %s", returned);}});return rabbitTemplate;}
关于回调方法参数ReturnedMessage,其属性如下:
public class ReturnedMessage {//返回的消息对象,包含了消息体和消息属性private final Message message;//由Broker提供的回复码, 表示消息无法路由的原因, 通常是一个数字代码,每个数字代表不同的含义private final int replyCode;//一个文本字符串, 提供了无法路由消息的额外信息或错误描述private final String replyText;//消息被发送到的交换机名称private final String exchange;//消息的路由键,即发送消息时指定的键private final String routingKey;}
如果此时修改routingKey为不匹配的值,结果如下:
消息被退回: ReturnedMessage [message=(Body:'Hello SpringBoot RabbitMQ' MessageProperties [headers={spring_returned_message_correlation=1}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=confirm.exchange, routingKey=confirm1]执行confirm()。
4 重试机制
4.1 介绍
当SpringBoot中开启auto级别(默认级别,消息处理成功自动确认,处理发生异常重试)的消息确认机制时,如果消息处理发生异常(网络问题或程序逻辑问题),此时RabbitMQ提供重试机制,不断重试向消费者发送消息。如果是网络问题,重试就可以解决问题;如果是程序逻辑问题,重试解决不了问题,可以设置最大重试次数。
4.2 案例演示
配置文件:
spring:rabbitmq:addresses: amqp://admin:admin@192.168.217.150:5672/testVirtuallistener:simple:acknowledge-mode: auto #消息接收确认(MQ-消费者):none(自动确认)、auto(正常自动确认,异常不确认)、manual(手动确认)retry:enabled: true #开启消费者失败重试initial-interval: 5000ms #初始失败等待时长为5秒max-attempts: 5 # 最大重试次数(包括自身消费的第一次)
队列、交换机名称和声明:
public class RabbitMQConnection {public static final String RETRY_QUEUE = "retry.queue";public static final String RETRY_EXCHANGE = "retry.exchange";}
@Configurationpublic class RabbitMQConfig {@Bean("retryQueue")public Queue retryQueue(){return QueueBuilder.durable(RabbitMQConnection.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(RabbitMQConnection.RETRY_EXCHANGE).durable(true).build();}@Bean("retryQueueBinding")public Binding retryQueueBinding(@Qualifier("retryExchange") DirectExchange directExchange, @Qualifier("retryQueue") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("retry");}}
生产者:
@RestController@RequestMapping("/producer")public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("retry")public String retry(){rabbitTemplate.convertAndSend(RabbitMQConnection.RETRY_EXCHANGE, "retry","Hello SpringBoot RabbitMQ");return "发送成功";}}
消费者:
@Componentpublic class RetryListener {@RabbitListener(queues = RabbitMQConnection.RETRY_QUEUE)public void queueListener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("listener [" + RabbitMQConnection.RETRY_QUEUE + "]收到消息:%s, deliveryTag:%d \n",new String(message.getBody(), "UTF-8"),deliveryTag);int num = 1 / 0;System.out.println("消息处理完成");}}
程序执行结果如下:
可以发现,消息重试5次后就不再重试,代码抛出异常。如果把消息确认机制的auto级别改为manual级别,且设置requeue为true,结果如下:
根据两种比较结果可以得出两个结论:
1.重试设置只有在auto级别才能生效,manual级别即使配置了重试也不会生效。
2.manual级别的结果表面看也在不断“重试”,但是deliveryTag在增加,这实际上并不是重试,而是是重新入队重新发送消息的结果(重新入队就会重新分配deliveryTag)。而auto级别的重试deliveryTag并没有增加,证明是对原来那条消息反复重试。
5 总结
如何保证消息可靠性(确保消息不丢失)?
(1)消息从生产者到Broker(存入Queue才认为存到RabbitMQ中)
发布方确认模式确保消息可靠性,包括confirm模式和return模式。
a)消息从生产者到交换机
可能原因:网络问题、代码配置交换机名字出错等。
解决方案:confirm模式(通过回调方法来通知生产者ACK)。
b)消息从交换机到队列
可能原因:代码配置routingKey或队列名字出错等。
解决方案:return模式(通过回调方法来退回消息给生产者)。
(2)Broker内部问题(消息队列数据丢失)
可能原因:RabbitMQ宕机导致消息丢失等。
解决方案:持久化(Exchange持久化保证重启后Exchange不会消失,Queue持久化保证重启后Queue不会消失,消息持久化保证消息持久化到磁盘),需要注意Queue持久化和消息持久化往往需要同时保证才能消息不丢失。
存在问题:不能100%保证消息不会丢失。持久化性能开销大(采用缓冲区),如果消息在缓冲区还未持久化到磁盘,出现RabbitMQ宕机也会导致消息丢失(使用集群方式保证消息可靠性)。
(3)消息从Queue到消费者
可能原因:消费者收到消息还未处理消费者就宕机或处理过程发生异常等。
解决方案:消息确认机制(自动确认和手动确认),自动确认(消息一经发出RabbitMQ就认为消息能到消费者于是进行确认并清除消息,默认auto级别的自动确认),如果需要保证消息的高可靠性需要开启手动确认(消费者显示调用肯定确认或否定确认方法来通知RabbitMQ,RabbitMQ根据手动确认的参数来决定确认成功、重新入队或丢弃)。还可以考虑重试机制确保消息可靠性。
下篇文章: