当前位置: 首页 > news >正文

【Fifty Project - D34】

今日完成记录

TimePlan完成情况
7:00 - 7:40爬坡
8:30 - 11:30Rabbit MQ
17:30 - 18:30羽毛球

RabbitMQ

消费者端如何保证可靠性?

  • 消息投递过程出现网络故障
  • 消费者接收到消息但是突然宕机未消费消息
  • 消费者接收到消息后处理不当抛出异常
  • 。。。

消费者确认机制

Consumer Acknowledgement:消费者处理消息结束应该给MQ发送一个回执,告知自己的消息处理状态:ack【成功处理消息,MQ从队列中删除消息】nack【消息处理失败,MQ需要重新推送消息】reject【消息处理失败并拒绝该消息,MQ从队列删除消息】

springAMQP提供了三种ACK处理方式:

  • none:不处理,消息投递给消费者后直接返回ack【不安全,不建议】
  • manual:手动处理,自己在业务代码中调用api发送ack或者reject【存在业务入侵但是更灵活】
  • auto:自动处理,利用aop自动对业务代码进行增强,正常执行则返回ack,出现异常则根据异常类型处理【业务异常返回nack, 消息处理或者校验异常返回reject】

返回reject常见异常:MessageConversionException、MethodArgumentNotValidException、MethodArgumentTypeMissmatchException、NoSuchMethodException、ClassCastException
基本上就是消息校验异常以及不匹配处理方法或者参数的异常

通过如下配置可以设置ack处理方法:

spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做处理

1、测试none处理方式,修改消费者端代码,使其抛出触发reject的异常。在抛出异常前打断点,并观察发现rabbitmq的客户端发现消息在发送到消费者则触发了自动ack并且删除了消息 ,触发异常后客户端并没有做任何处理。
在这里插入图片描述

2、修改acknowledge-mode为auto,再观察发现阻塞异常触发前消息处于uack状态,但同时观察到收到了一个manual ack。
在这里插入图片描述
(1)当代码继续执行,抛出MessageConversionException,会向MQ发送reject,删除消息。

这里发生了一个有意思的现象,因为我消息阻塞了太久触发了MQ消息重新投递,因此又出现了一个manual ack以及交替出现的ready和unack。
在这里插入图片描述

(2)当抛出异常是RuntimeException,可以观察到unack一直是1,且一直尝试重新投递。(重新投递没有触发那个自动的manual ack)
在这里插入图片描述

这里留两个小问题:为什么会自动发送了一个manual ack?这个重传是否是超时重传还是什么其他机制?

3、设置acknowledge-mode为manual,修改消费者端代码手动调用api返回消息回执

@RabbitListener(bindings = @QueueBinding(key="*.top",value = @Queue(value="df.topic.queue1"),exchange = @Exchange(value = "df.topic1", type = ExchangeTypes.TOPIC)
))
public void listenDirectQueue1(Object msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {System.out.println("这是第" + (cnt++) + "条消息");channel.basicAck(deliveryTag, false);
}

(1)测试ack
首先是接着2的调试代码继续调试【也就是此时消息队列中有一个消息没有被接收】,所以启动测试代码后这个消息会被重新投递,消息被消费者接收后手动回复确定,整个过程如下图
在这里插入图片描述
接下来重新投递一条消息观察正常的手动ack全过程,图中上面的图蓝色线(unacked)被红色线遮挡,它们其实是同样的走势。也就是当消息成功投递到消费者,会触发一次自动的ack(Deliver manual ack),但是消息处于uack,等到业务代码完成手动进行ack后该消息被ack并且删除。
在这里插入图片描述
(2)测试nack

@RabbitListener(bindings = @QueueBinding(key="*.top",value = @Queue(value="df.topic.queue1"),exchange = @Exchange(value = "df.topic1", type = ExchangeTypes.TOPIC)
))
public void listenDirectQueue1(Object msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {System.out.println("这是第" + (cnt++) + "条消息");throw new BusinessException();
//            channel.basicAck(deliveryTag, false);}catch (BusinessException e){// nack且重新入队 重新推送channel.basicNack(deliveryTag, false, true);}catch (MessageConversionException e){// reject 并且不重新入队channel.basicReject(deliveryTag, false);}
}

上面的代码抛出了自定义的业务异常,这个异常会被捕获并且返回nack,然后重新推送,如下图
在这里插入图片描述
(3)测试reject

@RabbitListener(bindings = @QueueBinding(key="*.top",value = @Queue(value="df.topic.queue1"),exchange = @Exchange(value = "df.topic1", type = ExchangeTypes.TOPIC)
))
public void listenDirectQueue1(Object msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {System.out.println("这是第" + (cnt++) + "条消息");throw new MessageConversionException("just a test for msg reject");
//            channel.basicAck(deliveryTag, false);}catch (MessageConversionException e){// reject 并且不重新入队channel.basicReject(deliveryTag, false);}}

这里抛出MessageConversionException,捕获后手动返回拒绝并且不重新投递,过程如下
在这里插入图片描述
**总结:**实际上SpringAMQP只是提供了三个接口basicAckbasicNackbasicReject,这三个接口何时触发,基于何种规则触发都是可以自定义的,上面的三个实现是基本与acknowledge-mode: auto一样的逻辑:业务异常nack且重新投递、消息异常reject且不重新投递、正常接收和消费则ack

消息失败重试机制


在这里插入图片描述

上面提到的两个小问题

为什么会触发一次自动的Deliver(Manual ACK)

原来这个Deliver(Manual ACK)是一个投递事件ACK,当消息进入消息队列未被消费,其状态为ready,当其被投递到消费者,状态会更新为unacked,如果被成功消费并且确认,则会被删除。
当首次投递,则会触发一个投递事件(ready变为unacked)
当消息被重新投递,不会再触发投递事件
这是因为:

  1. 性能优化:重复发送投递事件会导致网络带宽浪费、Broker的CPU浪费、监控系统负载
  2. 语义精确性:RabbitMQ的事件新系统旨在“报告状态变化的边界,而非状态本身”,重复投递的状态变化是首次投递的重复,因此没有必要重复报告
  3. 避免误导性监控:重复报告投递事件会导致消息计数错误,无法区分实际新消息以及重新投递消息

这个重传是否是超时重传还是什么其他机制?

明天再补了

相关文章:

  • 基于Flask,MySQL和MongoDB实现的在线阅读系统
  • WEB3全栈开发——面试专业技能点P6后端框架 / 微服务设计
  • DisplayPort 2.0协议介绍(2)
  • TJCTF 2025
  • 申请Let’s Encrypt 证书
  • 分布式增量爬虫实现方案
  • 【大模型:知识库管理】--Dify接入RAGFlow 知识库
  • Ray框架:分布式AI训练与调参实践
  • IP选择注意事项
  • QEMU源码全解析 —— 块设备虚拟化(26)
  • 渗透实战PortSwigger Labs指南:自定义标签XSS和SVG XSS利用
  • 37 C 语言字符串基础操作函数详解:strlen、strcpy、strncpy、strcat、strncat、strcmp、strncmp
  • 实现p2p的webrtc-srs版本
  • Coze工作流-语音故事创作-文本转语音的应用
  • LabVIEW双光子成像系统技术
  • BI系统帮助企业释放数据价值
  • 游戏开发中常见的战斗数值英文缩写对照表
  • SAP Fiori UI5 开发环境搭建和部署(含增强开发)
  • 用神经网络读懂你的“心情”:揭秘情绪识别系统背后的AI魔法
  • 健康档案实训室:构建全周期健康管理的数据基石
  • 网站更新要怎么做/企业网站建设步骤
  • 做网站如何注意排版问题/如何让百度收录自己的网站信息
  • 日本女做网站/网站改进建议有哪些
  • 做网站被用作非法用途/网站优化排名公司哪家好
  • 沙井网站建设/网络建站工作室
  • 威海住房和城乡建设局官方网站/百度在线搜索