当前位置: 首页 > news >正文

rabbitmq的高级特性

一.发送者的可靠性

1.生产者重试机制

修改publisher模块的application.yaml文件

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

注意:

①当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是 阻塞式 的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
②如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。 

2.生产者确认机制

RabbitMQ提供了生产者消息确认机制,包括 Publisher Confirm 和 Publisher Return 两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的 回执

如何返回基本内容如下:

① 当消息投递到MQ,但是路由失败时,通过 Publisher Return 返回异常信息,同时通过 Publisher Confirm 返回ACK 的确认信息,代表投递成功。
② 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。
③ 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功。
④ 其它情况都会返回NACK,告知投递失败。

其中 ack 和 nack 属于 Publisher Confirm 机制,ack 是投递成功;nack 是投递失败。而return 则属于 Publisher Return 机制。默认两种机制都是关闭状态,需要通过配置文件来开启。

①开启生产者确认机制

在publisher模块的 application.yaml 中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型  publisher-returns: true # 开启publisher return机制

这里 publisher-confirm-type 有三种模式可选:

① none:关闭confirm机制
② simple:同步阻塞等待MQ的回执
③ correlated:MQ异步回调返回回执

②定义ReturnCallback

每个 RabbitTemplate 只能配置一个 ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
③定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

总结:Publisher Confirm 用来确认消息是否发送到MQ,而Publish Return 用来通知生产者哪些消息由于路由失败没有被接收

注意:

开启生产者确认比较消耗MQ性能,一般不建议开启。 

二.MQ的可靠性

1.数据持久化

交换机持久化,队列持久化,消息持久化(先保存到内存在写入磁盘),这三个持久化都是默认开启的。如果消息类型是非持久化的,只有在消息队列满了后会被迫写入磁盘。

总结:

持久化是持续将消息写入磁盘,非持久化是当mq内存被使用完毕后才将消息写入磁盘,因此性能较差。

2.LazyQueue

① 接收到(不论临时还是持久的消息)消息后直接存入磁盘而非内存

② 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)(如果消费者的速度很快也会把消息提前缓存到内存)
③ 支持数百万条的消息存储

这种模式属于数据持久化的升级版。

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。

三.消费者的可靠性

1.消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  - ack:成功处理消息,RabbitMQ从队列中删除该消息
  - nack:消息处理失败,RabbitMQ需要再次投递消息
  - reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

     ①none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
     ②manual:手动模式。需要自己在业务代码中调用api,发送 ack 或 reject ,存在业务入侵,但更灵活。
     ③auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack.  当业务出现异常时,根据异常判断返回不同结果:

  • 如果是 业务异常,会自动返回 nack,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。
  • 如果是 消息处理或校验异常,自动返回 reject。

 通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做处理

2.失败重试机制(对消费者确认机制的增强)

如果上面的代码一直返回nack会导致无线循环。

所以我们配置消费者自己重试,如果超过了配置重试的次数,就会返回reject

修改consumer服务的application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

配置之后出现的现象:

- 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
- 本地重试3次以后,抛出了 AmqpRejectAndDontRequeueException 异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是 reject 

3.自定义失败处理策略(对消费者重试机制的增强)

有上面失败之后是直接返回的reject,这可能并不是我们想要返回的结果,于是我们有了失败处理策略。

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery 接口来定义的,它有3个不同实现:

  ①RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject ,丢弃消息。默认就是这种方式 (黑马说不可选,下面两个可选)
  ②ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack ,消息重新入队 
  ③RepublishMessageRecoverer :重试耗尽后,将失败消息投递到指定的交换机,然后转入到指定的队列,后续由人工集中处理。

代码实现:

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); //这里的with是routingkey}// 这个是配置消息处理失败之后投入到那个交换机@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

4.确保业务幂等性(解决消费者重复消费的问题)

幂等性:指同一个业务,执行一次或多次对业务状态的影响是一致的。

数据的删除,查询一般是幂等的,但是修改和新增不是幂等的,案例如下:

所以,我们要尽可能避免业务被重复执行。

为了解决上面图中的问题我们有了如下的解决方案:

①使用唯一消息id

给每一个消息都设置一个唯一ID,用ID区分是否被消费过。当我们消费一个消息的时候,先在数据库中查询是否存在这个数据的ID,如果不存在就消费。如果存在就说明这个消息之前被消费过。

给消息设置唯一id:这就是在配置消息转换器的时候添加了一点代码

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

 获取消息的id:

@RabbitListener(queues = "simple.queue")
public void listensimpleQueue(Message message) {log.info("监听到simple.queue的消息:ID:【{}】", message.getMessageProperties().getMessageId());log.info("监听到simple.queue的消息:【{}】", new String(message.getBody()));
}

缺点:

 但是这种方法也有自己的缺点,也就是对业务逻辑有侵入性,而且还有额外的数据库操作。 

②根据业务判断

根据上面出现非幂等性问题,我们可以把业务逻辑改成这样就可以保证业务的幂等性了。

相关文章:

  • 016搜索之广度优先BFS——算法备赛
  • UPS的工作原理和UPS系统中旁路的作用
  • 数据库优化常用技巧【面试】
  • 上讯信息运维管理审计系统imo.php存在命令执行漏洞(CNVD-2025-07703)
  • hive 笔记
  • JAVA运算符详解
  • 实验设计与分析(第6版,Montgomery)第3章单因子实验:方差分析3.11思考题3.4 R语言解题
  • 如何彻底禁用WordPress中的评论
  • 12. CSS 布局与样式技巧
  • [ Qt ] | 常见控件(二): window相关
  • 每天掌握一个Linux命令 - sqlite3
  • Baklib在数字化内容管理中的关键作用是什么?
  • JAVA SE 文件IO
  • 笔记: 在WPF中ContentElement 和 UIElement 的主要区别
  • MYSQL丢失pid处理方式
  • LVGL(lv_tabview)
  • 商品条形码查询接口如何用C#进行调用?
  • 【Harmony OS】组件自定义属性、事件和状态管理
  • 525全国护肤日 国际医学皮肤科助力银龄肌肤科学护肤
  • 用ChatGPT辅助UI设计:从需求分析到风格提案的提效秘籍
  • 建站属于什么行业/简单制作html静态网页
  • 网站建设类文章要发多少片/电商
  • 一个独立IP做几个网站比较合适/最常用的搜索引擎有哪些
  • 个人网站要备案嘛/杭州网站seo
  • 网站建设的素材/雅虎搜索引擎首页
  • 网站建设企/电工培训机构