【Fifty Project - D36】
今日完成记录
Time | Plan | 完成情况 |
---|---|---|
13:30 - 14:30 | 爬坡 | √ |
14:30 - 15:30 | Rabbit MQ | √ |
15:30 - 17:30 | 《少有人走的路2》 | √ |
17:30 - 19:30 | 🏸 | √ |
20:00 - 21:30 | 🏀 | √ |
RabbitMQ - 消费者可靠性
业务幂等性
幂等:同一个业务执行一次或者多次对业务状态的影响都是一致的,如根据id删除数据、查询数据、新增数据
但是数据的更新不是幂等的,重复执行可能带来不同的结果,如:取消订单、恢复库存这个操作,多次执行会导致库存重复增加;退款业务多次执行重复退款会导致商家经济损失
所以,我们需要避免业务的重复执行,然而实际上很多场景难免导致业务重复执行,如:页面卡顿导致表单重复提交,服务间调用的重试,MQ消息的重复投递。
举个例子,支付服务完成后通过MQ发消息给订单状态服务,将订单状态更改为已支付状态,这个过程中由于网络原因,支付服务没有得到MQ的ack,超时后支付服务会重新再发一个消息给MQ,在这个新消息被消费之前【此时订单状态为已支付】,顾客通过退款服务成功退款,订单状态变为【已退款】,此时刚刚那个消息达到了订单状态服务,消费后订单变成了已支付状态。业务异常。
因此,必须想办法保证消息处理的幂等性
- 唯一消息ID
- 业务状态判断
唯一消息ID
给每个消息增加一个唯一ID,消费者每次消费消息前检查该消息是否已经成功消费过(数据库是否存在该唯一ID),每次成功消费消息都将ID保存数据库。
SpringAMQP提供了相应的方法,只需要在消息转换器中开启设置就行
业务状态判断
就上面的案例而言,在将订单状态更改为已支付之前,必须检查原始状态是否为未支付,如果不是则说明当前消息曾被消费成功过,则不消费。
兜底方案
尽管前面有三种途径去增强消息的可靠性,但是依然无法保证消息100%可靠,万一还是出现了MQ失败,是否能有其他兜底方案确保订单的支付状态一致呢
答:让交易服务主动查询支付状态,通过定时任务的设计让交易服务对未支付的订单进行主动支付状态查询,如果发现有已支付的支付状态,那么就将订单状态更改为已支付
Rabbit MQ - 延迟消息
在电商业务中,有一些用户下单后不付款,就会一直占用库存资源,对于这些订单,需要设置延迟取消,也就是在一定时间内没有支付成功的订单自动取消并释放库存。像这种一段时间后执行的任务称之为延迟任务,而作为延迟任务的一种简单实现方法就是MQ的延迟消息
- 死信交换机 + TTL
- 延迟消息插件
死信交换机和延迟消息
死信交换机
死信:一个消息满足:无处可去【目标队列已满、nack或者reject的消息且requeue为false】或者无人消费【在队列中超时存在、未被消费】
如果一个队列的消息成为死信并且该队列通过dead-letter-exchange
指定了一个交换机,那么这个队列的死信就会投递到这个交换机中,而这个交换机就叫死信交换机,如果有队列跟该死信交换机绑定,那么就会根据路由规则将消息投递到对应队列中
作用:
- 收集因为处理失败而被拒绝的消息
- 收集因为队列满而被拒绝的消息
- 手机因为TTL而被丢弃的消息
延迟消息
通过设置消息的expiration
或者设置队列内消息的最大生存时间x-message-ttl
结合死信队列实现延迟消息。
DelayExchange插件
插件下载地址:GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
在上面下载好对应版本的插件
我的rabbitmq部署在docker中,所以先查看一下卷信息
将插件文件上传到挂载的路径下,执行插件安装命令docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
声明延迟交换机
基于注解
@RabbitListener(bindings = @QueueBinding(key = "delay",value = @Queue(value = "df.delay.queue", durable = "true"),exchange = @Exchange(value = "df.delay", delayed = "true")
))
public void listenDelayMsg(String msg){System.out.println("!!!");log.info("get a delay msg: " + msg + " at " + new Date().getTime());
}
基于Bean
@Configuration
public class Delay {@Beanpublic DirectExchange delayedExchange(){return ExchangeBuilder.directExchange("df.delay.direct").delayed().build();}@Beanpublic Queue delayQueue(){return new Queue("delay.queue1");}@Beanpublic Binding bindingDelay(){return BindingBuilder.bind(delayQueue()).to(delayedExchange()).with("delay");}
}
实验
生产者端使用消息后处理器设置消息延迟
@Testvoid testSendDelayMsg(){rt.convertAndSend("df.delay", "delay", "this is a delay msg send at time:" + new Date().getTime(), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(3000);return message;}});}
结果如下,观察交换机可以发现消息延迟后publish out
实验中测试了三次,前两次都是消费者进程在线,生产者生产完由路由器延迟推送,直接被消费者消费了,第三次将消费者进程关闭,可以看到消息滞留在队列中
rabbit MQ 暂告一段落,后面还有一小节内容是基于黑马商城的实践作业,然而自己连黑马商城还没做完,赶紧开始整黑马商城吧QAQ
《少有人走的路2》
等待预约羽毛球场地期间看的书,想看的东野圭吾没带,顺手拿了同门桌上的书,果然这种训诫类(经验类,只是读着很像长者劝诫的人生哲理)的书还是不适合我,看了一个序还有第一个案例,感觉被老头子训了一下午hhhhh
逃避问题会带来谎言,带来的谎言会扭曲人的心灵,吧啦吧啦的
还有很有意思的live【生活】反过来就是evil【邪恶】,所以谎言这种颠倒是非黑白的东西会导致邪恶
好处是陪我消磨了一个多小时的等待时光咯