RabbitMQ学习(第二天)
文章目录
- 1、生产者可靠性
- ①、生产者重连
- ②、生产者确认
- 2、MQ可靠性
- ①、数据持久化
- ②、LazyQueue(惰性队列)
- 3、消费者可靠性
- ①、消费者确认
- ②、失败重试机制
- ③、保证业务幂等性
- 总结
之前的学习中,熟悉了java中搭建和操作RabbitMQ发送接收消息,熟悉使用之后,重点要关注一下面试中常考的点,以及工作经常遇到的问题。今天的学习主要从保证消息可靠性出发,保证可靠性分三部分,分别是生产者可靠性、MQ可靠性、消费者可靠性三部分出发。
1、生产者可靠性
①、生产者重连
有时候因为网络波动,可能导致客户端连接MQ失败,通过配置可以开启连接失败后的重试机制。
yml配置文件:
spring:rabbitmq:connection-timeout: 1s # 连接MQ的连接超时间template:retry:enabled: true # 开启模板的重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数
我们故意让它连接不上,结果如下:
当网络不稳定的时刻,利用重试机制可以有数据显示消息发送的成功率。不过SpringAMQP重试的重复机制是
阻塞式的重试,也就是说多次重试等待的过程是中,当前线程是被阻塞的,会影响到业务性能。
如果对于业务性能有要求,建议参考用重试机制。如果一定需要使用,请合理配置重试等待时长和重试次数,当然也
可以考虑异步用异步多线程来执行发送消息的代码。
②、生产者确认
RabbitMQ有Publisher Confirm和Publisher Return两种确认机制。开启确认机制后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
- 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 其它情况都会返回NACK,告知投递失败
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,异步confirm类型publisher-returns: true # 开启publisher return机制
publisher-confim:
这里设置 correlated 表示MQ异步回调方式返回回执消息
**publisher-return: **
开启后,会返回路由失败消息。
@Slf4j
@Configuration
public class CommonConfig {@ResourceRabbitTemplate rabbitTemplate;@PostConstructpublic void setRabbitTemplate() {rabbitTemplate.setReturnsCallback(returnedMessage -> {log.error("收到消息的return callback: msg:{} exchange:{}, test:{}, key:{}, code:{}",returnedMessage.getMessage(), returnedMessage.getExchange(), returnedMessage.getReplyText(),returnedMessage.getRoutingKey(), returnedMessage.getReplyCode());});}
}
我们定义一个config类,里面添加一个处理publish-return消息的处理,
测试代码:
@Testpublic void testConfirmCallBack() throws InterruptedException {CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {log.debug("消息回调失败", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {log.debug("收到confim callback回执");if(result.isAck()) {log.debug("消息发送成功,收到ack");} else{log.debug("消息发送失败,收到nack, 原因:{}", result.getReason());}}});rabbitTemplate.convertAndSend("amq.direct", "red2", "hello", cd);Thread.sleep(2000);}
这里我们测试三种情况:
现有正确的交换机为 amq.direct,routingKey为red,cd逻辑如代码所示。
①、交换机与routingKey均正确,运行结果:
收到confim callback回执
消息发送成功,收到ack
②、交换机正确,routingKey不正确,运行结果:
收到confim callback回执
消息发送成功,收到ack
收到消息的return callback: msg:(Body:'"hello"' MessageProperties [headers={spring_returned_message_correlation=0a8a70fd-a66b-43a3-a681-324e46db7b79, __TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) exchange:amq.direct, test:NO_ROUTE, key:red2, code:312
③、交换机不正确,运行结果:
收到confim callback回执
消息发送失败,收到nack, 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'amq.direct.123' in vhost '/', class-id=60, method-id=40)
关于生产者确认消息要会的几点:
-
生产者确认需要额外的网络和系统资源开销,尽量不要使用
-
如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题
-
对于nack消息可以有限次数重试,依然失败则记录异常消息
2、MQ可靠性
MO通常将消息保存在内存中,这样可以降低收发消息的延迟,但是会有一些问题:
- 最常见的问题就是可能因为各种原因宕机重启,而这极大可能会导致消息丢失。
- 消息堆积在内存中,如果消费者出故障,导致消息积压,引发MQ阻塞。
①、数据持久化
持久化分三部分:
- 交换机持久化
- 队列持久化,
- 消息持久化
前两个的话,Spring中创建时默认将队列和交换机都创建为持久化,当然,手动设置也可以,设置为Durable:
消息持久化也可以设置:
我们向他发送了1百万条非持久化消息:
可以看到其中凹下去的地方就是消息积压导致的。
发送了1百万条持久化消息:
可以看到此时不存在消息积压。
每次下降的时候,是在将数据写入到磁盘中,但是不会出现宕机为0的情况。
②、LazyQueue(惰性队列)
惰性队列的特征如下:
-
接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息)
-
消费者要消费消息时才会从磁盘中读取并加载到内存
-
支持数百万条的消息存储
-
在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
控制台创建:
spring中创建:
@Bean
public Queue lazyQueue() {return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg) {log.info("接收到 lazy.queue 的消息: {}", msg);
}
向lazy.queue发送1百万条非持久消息:
可以看到,性能特别好,因为数据都是直接通过page out到磁盘。
3、消费者可靠性
①、消费者确认
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack:成功处理消息,RabbitMQ从队列中删除该消息
nack:消息处理失败,RabbitMQ需要再次投递消息
reject:消息处理失败并且拒绝该消息,RabbitMQ从队列中删除消息
我们在Spring中配置,auto代表Spring帮我们自动处理这三种情况:
spring:rabbitmq:simple:acknowledge-mode: auto
一般都是直接配置auto。
但是这样就会导致一个问题,如果程序中有异常,那么mq一直重试,这样的话,会导致资源的消耗,对系统造成大的压力,因此我们后面引入了失败重试机制。
②、失败重试机制
配置以下yml内容可以开启失败重试机制:
spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true #开启失败重试机制initial-interval: 1000ms #初始的失败重试等待时长为1秒multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3stateless: true # true无状态,false有状态,如果业务中包含事务,这里为false
失败消息处理策略:
在开启重试模式后,重试次数耗尽,如果消息仍然失败,则需要MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer: 重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer: 重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer: 重试耗尽后,将失败消息投递到指定的交换机
收发消息定义:
@RabbitListener(queues = {"object.queue"})public void ObjectConsumer(String msg) {System.out.println(msg);throw new RuntimeException("......");}@Testpublic void testMessage() {Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();rabbitTemplate.convertAndSend("object.queue", message);}
java代码中实现失败消息处理:
@Slf4j
@Configuration
public class CommonConfig {@ResourceRabbitTemplate rabbitTemplate;@PostConstructpublic void setRabbitTemplate() {rabbitTemplate.setReturnsCallback(returnedMessage -> {log.error("收到消息的return callback: msg:{} exchange:{}, test:{}, key:{}, code:{}",returnedMessage.getMessage(), returnedMessage.getExchange(), returnedMessage.getReplyText(),returnedMessage.getRoutingKey(), returnedMessage.getReplyCode());});}
}
运行结果:
可以看到报错信息被发送到了error交换机。
③、保证业务幂等性
幂等是一个数学概念,用函数表达式来描述是:f(x) = f(f(x))。在程序开发中,则是指同一个业务,执行一次或多次的结果是一致的。
像查询一般都是幂等的,因为它的执行对于业务状态没有影响,差多少次都没事,就是幂等的。
像用户下单,退单涉及扣钱,退款操作,如果执行多次,会造成经济损失,就是非幂等的。
为了保证幂等性,我们通过一些方法来实现:
(1)、 唯一消息id
是给每个消息都设置一个唯一id,利用id区分是否重复消费:
-
每一条消息都生成一个唯一的id,与消息一起投递给消费者。
-
消费者接收到消息后根据自己的业务,业务处理成功后将消息ID保存到数据库库
-
如果下次又收到相同消息,去数据库查找判断是否存档,存档则为重复消费放弃处理。
@Bean
public MessageConverter messageConverter() {// 1. 定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2. 配置自动创建消息id,用于识别不同消息,也可以自己在业务中生成ID判断是否重复消费jjmc.setCreateMessageIds(true);return jjmc;
}
但是,这种方案对数据库压力太大,同时速度也相对慢很多,因此只做了解,只需要知道这种方式是jjmc内部帮你创建了一个UUID的随机值作id。
其它方式的话,需要我们结合实际项目在业务逻辑中进行操作来保证幂等性。
总结
最后,用两个问题来总结:
如何保证对服务与交易服务之间的订单状态一致性?
-
首先,支付服务会在用户支付成功后利用MQ消息通知交易服务,完成订单状态同步。
-
其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消息重确认、消费者失败重试等策略,确保消息投递和处理可靠性。同时也开启了MQ的持久化,避免因服务宕机导致消息丢失。
-
最后,我们还在交易服务端新增了业务幂等判断,避免重复消费。
如果交易服务消息处理失败,有没有什么兜底方案?
- 我们可以在交易服务设置定时任务,定期查询订单支付状态。这样即使用MQ通知失败,还可以用定时任务作为兜底方案,确保订单状态最终一致性。
即Spring Task中的@Schedule注解,定时扫描订单状态,用来弥补消息失败,兜底方案。