当前位置: 首页 > news >正文

MQ重复消费问题

做MQ开发时,很多人都会遇到这样的坑:明明只发了一条“扣库存”消息,结果库存被扣了两次;用户付了一次钱,却收到两条支付成功通知——这就是MQ的“重复消费”问题。其实重复消费不可怕,搞懂原因、做好防护,就能轻松应对。这篇文章用大白话讲清重复消费的4个常见原因,再给一个万能解决办法,新手也能直接套用。

一、先看个糟心场景:重复消费导致多扣钱

之前做电商库存系统,用RabbitMQ传递“扣库存”消息。有次用户反馈“买1件商品,库存却少了2件”,查日志发现:同一条“扣库存”消息被消费了两次——第一次正常扣减,第二次因为重复消费,又多扣了一次。

后来排查才知道,是消费方处理完消息后,没及时给MQ发“确认(ack)”,MQ以为消费失败,又重发了一次消息。这就是重复消费最典型的场景,其实只要提前做好防护,完全能避免。

二、重复消费的4个常见原因,看完就懂

不管用RabbitMQ、Kafka还是RocketMQ,重复消费的原因基本就4类,搞懂这些,就能针对性避坑:

1. 原因1:消费方没及时ack,MQ重试发送

这是最常见的原因!MQ判断消息是否消费成功,全看消费方有没有发“ack(确认)”:

  • 如果消费方处理完消息,没发ack就崩溃了;
  • 或者网络波动,ack没传到MQ;
    MQ会认为“消息没处理成功”,等消费方重启或网络恢复后,就会重发这条消息——结果就是消费方收到两次相同消息。

实例:之前做通知服务,处理完消息后,代码里漏写了ack逻辑,导致MQ一直重发,用户收到十几条相同的短信,投诉不断。

2. 原因2:位点提交失败,MQ“回头重发”

分布式MQ(如Kafka、RocketMQ)用“消费位点”记录“消费到哪条消息了”:

  • 比如Kafka消费组会定期提交“offset(偏移量)”,告诉MQ“我已经消费到offset=100的消息了”;
  • 如果提交offset时网络断了,或者消费方崩溃了,MQ没收到新的offset,下次消费方重启,MQ会从上次提交的offset(比如offset=99)开始重发消息——offset=99到100的消息就会被重复消费。

实例:Kafka消费组配置了“自动提交offset”,但因为消费方处理速度慢,没等提交offset就崩溃了,重启后从之前的offset重发,导致10条消息被重复消费。

3. 原因3:发送方逻辑错,重复发消息

有时候不是MQ的问题,是发送方自己的代码有问题,导致消息重复发送:

  • 比如发送方发消息时,网络波动没收到MQ的“发送确认”,以为发送失败,就重试发送,结果MQ其实已经收到了,导致两条相同消息;
  • 或者业务逻辑错了,比如循环里多写了一次发送代码,本来该发1条,结果发了2条。

实例:之前做支付服务,发“支付成功”消息时,因为没处理MQ的“发送确认”,以为发送失败,重试了一次,结果MQ收到两条相同消息,下游服务重复消费,给用户发了两条支付通知。

4. 原因4:网络不稳定,消息“重传”

MQ的消息在网络传输中,可能因为网络抖动、延迟,导致消息重传:

  • 比如发送方发消息到MQ,网络卡了一下,MQ没及时响应,发送方的底层网络协议(如TCP)会自动重传消息;
  • 或者MQ发消息到消费方,消费方没及时收到,MQ也会重传。

这种原因比较隐蔽,但概率不高,不过一旦发生,也会导致重复消费。

三、万能解决办法:消费方做好“幂等处理”

很多人会问:“能不能让MQ不发重复消息?”答案是“很难”——因为网络波动、消费方崩溃这些情况无法完全避免,MQ为了确保消息不丢失,必须有重发机制。

所以,解决重复消费的核心,不是“阻止MQ重发”,而是让消费方做到“重复消费也不影响业务”——这就是“幂等处理”。

什么是幂等?

简单说:同一条消息,消费1次和消费100次,结果一样。比如“扣库存”消息,重复消费后,库存也不会多扣。

2个落地的幂等方案,直接能用

方案1:用“唯一ID”防重(最常用)

给每条消息加一个唯一ID(比如订单ID、UUID),消费前先查“这个ID有没有处理过”,处理过就直接跳过。

实例(扣库存业务)

  1. 发送方发消息时,带上唯一ID(比如订单ID=order123):
// RabbitMQ发送消息,消息属性里带唯一ID
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId("order123") // 唯一ID.build();
channel.basicPublish("", "stock_queue", props, "扣库存:order123,数量1".getBytes());
  1. 消费方处理消息时,先查数据库:
// 消费方代码
channel.basicConsume("stock_queue", false, (tag, delivery) -> {String msgId = delivery.getProperties().getMessageId(); // 获取唯一IDString msg = new String(delivery.getBody());// 1. 查数据库,看这个msgId有没有处理过if (hasProcessed(msgId)) {channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);return; // 已处理,直接确认}// 2. 没处理过,执行扣库存业务deductStock(msg);// 3. 记录msgId到数据库,标记已处理recordProcessedMsgId(msgId);// 4. 确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, tag -> {});
  1. 关键:数据库的msg_id字段要加“唯一索引”,避免重复插入——就算重复消费,插入时会报错,直接跳过处理。
方案2:用“业务状态”防重(更灵活)

如果消息没有唯一ID,或者不想存“消息日志表”,可以用“业务状态”判断——比如“订单状态从A变B”,只有状态是A时才处理,不是A就跳过。

实例(订单状态更新)

  1. 消息内容是“订单order123从待支付(WAIT_PAY)改成已支付(PAID)”;
  2. 消费方处理时,先查订单当前状态:
public void processOrderMsg(String orderId, String targetStatus) {// 1. 查订单当前状态String currentStatus = getOrderStatus(orderId);// 2. 只有当前状态是“待支付”,才更新成“已支付”if ("WAIT_PAY".equals(currentStatus)) {updateOrderStatus(orderId, targetStatus);}// 不是待支付状态,直接跳过
}

这样就算重复消费,因为订单状态已经是“已支付”,也不会重复更新。

四、总结

记住“1个核心+4个原因+1个方案”

  • 核心:MQ重复消费无法完全避免,因为重发是为了确保消息不丢失;
  • 原因:消费方没ack、位点提交失败、发送方逻辑错、网络不稳;
  • 解决办法:消费方做好幂等处理,常用“唯一ID防重”或“业务状态防重”。

最后说一句

重复消费不是MQ的“bug”,而是为了“消息不丢失”付出的合理代价。只要消费方做好幂等,重复消费就不会影响业务——这是MQ开发的必备技能

http://www.dtcms.com/a/463966.html

相关文章:

  • 做网站的英文wordpress最强的教育网站
  • 建设网站免费模板下载小米R2D安装wordpress
  • 青岛哪里有做网站的php公司网站
  • 轻松筹网站可以做吗建设实验室网站的意义
  • 网站数据库空间贵阳网站建设管理
  • 专门做牛肉的网站网站建设sunmun
  • 太原网站建设方案书重庆网站页设计制作
  • 青岛网站开发设计江苏省城乡与建设厅网站首页
  • 珠海手机网站建设公司网站开发赚不赚钱
  • 湖北营销型网站建设价格企业微信有哪些功能
  • ps做淘宝网站导航栏Wordpress怎么上传html文件
  • 企业网站seo托管怎么做山东住房城乡建设厅网站
  • 官网网页设计说明网站优化 套站
  • 建一个网站大概多少钱网站建设技术方案模板
  • 太原自助建站wordpress 集成安装包
  • 西安大兴医院网站建设建设网站有什么网站
  • 网站活泼唐山专业做网站公司
  • 红页网站如何做秦皇岛黄金海岸潮汐表
  • 美妆网站建设企业网络管理 网站开发与运营
  • 在哪里可以接网站开发的外包网页界面设计作品推荐
  • 部门网站建设目的哪里的网络推广培训好
  • 门户网站建设开发网络运维与网络安全工程师
  • 网站首页图片尺寸英文谷歌优化
  • 动态交互图网站无广告免费赚钱无门槛的游戏
  • 网站收录查询网望野朗读
  • 东莞能做网站的公司怎么快速搭建网站
  • 广汉做网站烟台电子商务网站建设
  • php 企业网站源码建筑施工建设网站
  • 福州网站设计哪里建站北京计算机培训机构哪个最好
  • c 网站开发实例教程外资公司注册代理