MQ重复消费问题
做MQ开发时,很多人都会遇到这样的坑:明明只发了一条“扣库存”消息,结果库存被扣了两次;用户付了一次钱,却收到两条支付成功通知——这就是MQ的“重复消费”问题。其实重复消费不可怕,搞懂原因、做好防护,就能轻松应对。这篇文章用大白话讲清重复消费的4个常见原因,再给一个万能解决办法,新手也能直接套用。
一、先看个糟心场景:重复消费导致多扣钱
之前做电商库存系统,用RabbitMQ传递“扣库存”消息。有次用户反馈“买1件商品,库存却少了2件”,查日志发现:同一条“扣库存”消息被消费了两次——第一次正常扣减,第二次因为重复消费,又多扣了一次。
后来排查才知道,是消费方处理完消息后,没及时给MQ发“确认(ack)”,MQ以为消费失败,又重发了一次消息。这就是重复消费最典型的场景,其实只要提前做好防护,完全能避免。
二、重复消费的4个常见原因,看完就懂
不管用RabbitMQ、Kafka还是RocketMQ,重复消费的原因基本就4类,搞懂这些,就能针对性避坑:
1. 原因1:消费方没及时ack,MQ重试发送
这是最常见的原因!MQ判断消息是否消费成功,全看消费方有没有发“ack(确认)”:
- 如果消费方处理完消息,没发ack就崩溃了;
- 或者网络波动,ack没传到MQ;
MQ会认为“消息没处理成功”,等消费方重启或网络恢复后,就会重发这条消息——结果就是消费方收到两次相同消息。
实例:之前做通知服务,处理完消息后,代码里漏写了ack逻辑,导致MQ一直重发,用户收到十几条相同的短信,投诉不断。
2. 原因2:位点提交失败,MQ“回头重发”
分布式MQ(如Kafka、RocketMQ)用“消费位点”记录“消费到哪条消息了”:
- 比如Kafka消费组会定期提交“offset(偏移量)”,告诉MQ“我已经消费到offset=100的消息了”;
- 如果提交offset时网络断了,或者消费方崩溃了,MQ没收到新的offset,下次消费方重启,MQ会从上次提交的offset(比如offset=99)开始重发消息——offset=99到100的消息就会被重复消费。
实例:Kafka消费组配置了“自动提交offset”,但因为消费方处理速度慢,没等提交offset就崩溃了,重启后从之前的offset重发,导致10条消息被重复消费。
3. 原因3:发送方逻辑错,重复发消息
有时候不是MQ的问题,是发送方自己的代码有问题,导致消息重复发送:
- 比如发送方发消息时,网络波动没收到MQ的“发送确认”,以为发送失败,就重试发送,结果MQ其实已经收到了,导致两条相同消息;
- 或者业务逻辑错了,比如循环里多写了一次发送代码,本来该发1条,结果发了2条。
实例:之前做支付服务,发“支付成功”消息时,因为没处理MQ的“发送确认”,以为发送失败,重试了一次,结果MQ收到两条相同消息,下游服务重复消费,给用户发了两条支付通知。
4. 原因4:网络不稳定,消息“重传”
MQ的消息在网络传输中,可能因为网络抖动、延迟,导致消息重传:
- 比如发送方发消息到MQ,网络卡了一下,MQ没及时响应,发送方的底层网络协议(如TCP)会自动重传消息;
- 或者MQ发消息到消费方,消费方没及时收到,MQ也会重传。
这种原因比较隐蔽,但概率不高,不过一旦发生,也会导致重复消费。
三、万能解决办法:消费方做好“幂等处理”
很多人会问:“能不能让MQ不发重复消息?”答案是“很难”——因为网络波动、消费方崩溃这些情况无法完全避免,MQ为了确保消息不丢失,必须有重发机制。
所以,解决重复消费的核心,不是“阻止MQ重发”,而是让消费方做到“重复消费也不影响业务”——这就是“幂等处理”。
什么是幂等?
简单说:同一条消息,消费1次和消费100次,结果一样。比如“扣库存”消息,重复消费后,库存也不会多扣。
2个落地的幂等方案,直接能用
方案1:用“唯一ID”防重(最常用)
给每条消息加一个唯一ID(比如订单ID、UUID),消费前先查“这个ID有没有处理过”,处理过就直接跳过。
实例(扣库存业务):
- 发送方发消息时,带上唯一ID(比如订单ID=order123):
// RabbitMQ发送消息,消息属性里带唯一ID
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId("order123") // 唯一ID.build();
channel.basicPublish("", "stock_queue", props, "扣库存:order123,数量1".getBytes());
- 消费方处理消息时,先查数据库:
// 消费方代码
channel.basicConsume("stock_queue", false, (tag, delivery) -> {String msgId = delivery.getProperties().getMessageId(); // 获取唯一IDString msg = new String(delivery.getBody());// 1. 查数据库,看这个msgId有没有处理过if (hasProcessed(msgId)) {channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);return; // 已处理,直接确认}// 2. 没处理过,执行扣库存业务deductStock(msg);// 3. 记录msgId到数据库,标记已处理recordProcessedMsgId(msgId);// 4. 确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, tag -> {});
- 关键:数据库的
msg_id
字段要加“唯一索引”,避免重复插入——就算重复消费,插入时会报错,直接跳过处理。
方案2:用“业务状态”防重(更灵活)
如果消息没有唯一ID,或者不想存“消息日志表”,可以用“业务状态”判断——比如“订单状态从A变B”,只有状态是A时才处理,不是A就跳过。
实例(订单状态更新):
- 消息内容是“订单order123从待支付(WAIT_PAY)改成已支付(PAID)”;
- 消费方处理时,先查订单当前状态:
public void processOrderMsg(String orderId, String targetStatus) {// 1. 查订单当前状态String currentStatus = getOrderStatus(orderId);// 2. 只有当前状态是“待支付”,才更新成“已支付”if ("WAIT_PAY".equals(currentStatus)) {updateOrderStatus(orderId, targetStatus);}// 不是待支付状态,直接跳过
}
这样就算重复消费,因为订单状态已经是“已支付”,也不会重复更新。
四、总结
记住“1个核心+4个原因+1个方案”
- 核心:MQ重复消费无法完全避免,因为重发是为了确保消息不丢失;
- 原因:消费方没ack、位点提交失败、发送方逻辑错、网络不稳;
- 解决办法:消费方做好幂等处理,常用“唯一ID防重”或“业务状态防重”。
最后说一句
重复消费不是MQ的“bug”,而是为了“消息不丢失”付出的合理代价。只要消费方做好幂等,重复消费就不会影响业务——这是MQ开发的必备技能