rabbitmq的高级特性
一.发送者的可靠性
1.生产者重试机制
修改publisher
模块的application.yaml
文件
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数
注意:
①当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是 阻塞式 的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
②如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
2.生产者确认机制
RabbitMQ提供了生产者消息确认机制,包括 Publisher Confirm 和 Publisher Return 两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的 回执。
如何返回基本内容如下:
① 当消息投递到MQ,但是路由失败时,通过 Publisher Return 返回异常信息,同时通过 Publisher Confirm 返回ACK 的确认信息,代表投递成功。
② 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。
③ 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功。
④ 其它情况都会返回NACK,告知投递失败。其中 ack 和 nack 属于 Publisher Confirm 机制,ack 是投递成功;nack 是投递失败。而return 则属于 Publisher Return 机制。默认两种机制都是关闭状态,需要通过配置文件来开启。
①开启生产者确认机制
在publisher模块的 application.yaml 中添加配置:
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型 publisher-returns: true # 开启publisher return机制
这里 publisher-confirm-type 有三种模式可选:
① none:关闭confirm机制
② simple:同步阻塞等待MQ的回执
③ correlated:MQ异步回调返回回执
②定义ReturnCallback
每个 RabbitTemplate 只能配置一个 ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:
@Slf4j @AllArgsConstructor @Configuration public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});} }
③定义ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
@Test void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd); }
总结:Publisher Confirm 用来确认消息是否发送到MQ,而Publish Return 用来通知生产者哪些消息由于路由失败没有被接收。
注意:
开启生产者确认比较消耗MQ性能,一般不建议开启。
二.MQ的可靠性
1.数据持久化
交换机持久化,队列持久化,消息持久化(先保存到内存在写入磁盘),这三个持久化都是默认开启的。如果消息类型是非持久化的,只有在消息队列满了后会被迫写入磁盘。
总结:
持久化是持续将消息写入磁盘,非持久化是当mq内存被使用完毕后才将消息写入磁盘,因此性能较差。
2.LazyQueue
① 接收到(不论临时还是持久的消息)消息后直接存入磁盘而非内存
② 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)(如果消费者的速度很快也会把消息提前缓存到内存)
③ 支持数百万条的消息存储这种模式属于数据持久化的升级版。
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。
三.消费者的可靠性
1.消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
①none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
②manual:手动模式。需要自己在业务代码中调用api,发送 ack 或 reject ,存在业务入侵,但更灵活。
③auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack. 当业务出现异常时,根据异常判断返回不同结果:
- 如果是 业务异常,会自动返回 nack,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。
- 如果是 消息处理或校验异常,自动返回 reject。
通过下面的配置可以修改SpringAMQP的ACK处理方式:
spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做处理
2.失败重试机制(对消费者确认机制的增强)
如果上面的代码一直返回nack会导致无线循环。
所以我们配置消费者自己重试,如果超过了配置重试的次数,就会返回reject
修改consumer服务的application.yml文件,添加内容:
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
配置之后出现的现象:
- 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
- 本地重试3次以后,抛出了 AmqpRejectAndDontRequeueException 异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是 reject
3.自定义失败处理策略(对消费者重试机制的增强)
有上面失败之后是直接返回的reject,这可能并不是我们想要返回的结果,于是我们有了失败处理策略。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery 接口来定义的,它有3个不同实现:
①RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject ,丢弃消息。默认就是这种方式 (黑马说不可选,下面两个可选)
②ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack ,消息重新入队
③RepublishMessageRecoverer :重试耗尽后,将失败消息投递到指定的交换机,然后转入到指定的队列,后续由人工集中处理。代码实现:
@Configuration @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true") public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); //这里的with是routingkey}// 这个是配置消息处理失败之后投入到那个交换机@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");} }
4.确保业务幂等性(解决消费者重复消费的问题)
幂等性:指同一个业务,执行一次或多次对业务状态的影响是一致的。
数据的删除,查询一般是幂等的,但是修改和新增不是幂等的,案例如下:
所以,我们要尽可能避免业务被重复执行。
为了解决上面图中的问题我们有了如下的解决方案:
①使用唯一消息id
给每一个消息都设置一个唯一ID,用ID区分是否被消费过。当我们消费一个消息的时候,先在数据库中查询是否存在这个数据的ID,如果不存在就消费。如果存在就说明这个消息之前被消费过。
给消息设置唯一id:这就是在配置消息转换器的时候添加了一点代码
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
获取消息的id:
@RabbitListener(queues = "simple.queue")
public void listensimpleQueue(Message message) {log.info("监听到simple.queue的消息:ID:【{}】", message.getMessageProperties().getMessageId());log.info("监听到simple.queue的消息:【{}】", new String(message.getBody()));
}
缺点:
但是这种方法也有自己的缺点,也就是对业务逻辑有侵入性,而且还有额外的数据库操作。
②根据业务判断
根据上面出现非幂等性问题,我们可以把业务逻辑改成这样就可以保证业务的幂等性了。