当前位置: 首页 > news >正文

分布式锁踩坑记:当“防重“变成了“重复“

前言: 在微服务架构中,分布式锁就像是交通红绿灯,本意是维护秩序,但如果理解错误,可能会造成更大的混乱。今天分享一个真实的生产事故:本想用分布式锁防止消息重复消费,结果却因为锁的可重入特性,反而导致了重复消费。这个案例告诉我们:技术没有银弹,只有合适的场景。

🚨 事故现场还原

业务反馈同一个单据有两条一摸一样的数据,排查日志

现象描述

2024-04-20 13:38:36 [INFO][ConsumeMessageThread_2] saveKeys,["order1"]2024-04-20 13:38:34 [INFO][ConsumeMessageThread_2] saveKeys,["order1"]

诡异现象: 同一条消息被重复消费了,而且都是 ConsumeMessageThread_2 线程在处理!

核心代码分析

MQ消费者配置:

@RocketMQMessageListener(topic = RocketMqConstants.Topic.TOPIC_A,selectorExpression = RocketMqConstants.Tags.TAG1,consumerGroup = RocketMqConstants.Group.GROUP_A,// 🔥 关键点:最大线程数设置为2consumeThreadMax = 2
)
public class TopicAConsumer extends AbstractRocketmqConsumer<OrderCreateEvent> {// ...
}

分布式锁实现:

public OrderCommonRespDTO save(Order order) {String redisKey = getRedisKey(order);RLock lock = null;try {// 🔥 关键点:2小时超时,既防并发又想防重lock = redisLockHelper.tryLock(redisKey, 0, 2, TimeUnit.HOURS);if (Objects.isNull(lock)) {throw new BussinessException(ErrorEnum.BUSINESS_ERROR,"当前单据正在生成中,请稍后重试!");}// 业务逻辑...} finally {// unlock...}
}

🕵️ 破案过程:三个"凶手"的完美配合

凶手一:Redisson的可重入特性

// Redisson分布式锁是可重入的
RLock lock = redisson.getLock("myLock");// 同一个线程可以多次获取同一把锁
lock.lock();  // 第一次获取
lock.lock();  // 第二次获取,成功!(可重入)
lock.unlock();
lock.unlock();

凶手二:过长的锁持有时间

// 2小时的锁持有时间,试图让锁既防并发又防重
lock = redisLockHelper.tryLock(redisKey, 0, 2, TimeUnit.HOURS);

凶手三:受限的线程池配置

// 消费线程池被限制为只有2个线程
consumeThreadMax = 2  // 本意是控制并发,却成了"帮凶"

💥 完美犯罪:重复消费是这样发生的

RocketMQConsumeMessageThread_2Redis Lock业务逻辑第一次消息消费消息AtryLock("key_A", 2hours)获取锁成功执行业务逻辑完成,但锁未释放(2小时超时)2分钟后,同样消息再次投递消息A(重复)tryLock("key_A", 2hours)同一线程,可重入!再次获取锁成功 😱再次执行业务逻辑重复消费发生!RocketMQConsumeMessageThread_2Redis Lock业务逻辑

🎯 核心问题:概念混淆

防并发 vs 防重复

  • 防并发(Concurrency Control):防止同一时刻多个线程同时执行
  • 防重复(Idempotency):防止同一操作被执行多次
// ❌ 错误理解:用分布式锁既防并发又防重
public void processOrder(String orderId) {RLock lock = redisson.getLock("order:" + orderId);lock.lock(2, TimeUnit.HOURS);  // 试图用长时间锁防重try {// 业务逻辑} finally {lock.unlock();}
}// ✅ 正确理解:分离关注点
public void processOrder(String orderId) {// 1. 先检查幂等性if (orderProcessedCache.exists(orderId)) {return; // 已处理过,直接返回}// 2. 使用分布式锁防并发RLock lock = redisson.getLock("order:" + orderId);lock.lock(30, TimeUnit.SECONDS);  // 短时间锁try {// 双重检查if (orderProcessedCache.exists(orderId)) {return;}// 执行业务逻辑processOrderBusiness(orderId);// 标记已处理orderProcessedCache.set(orderId, "processed", 1, TimeUnit.HOURS);} finally {lock.unlock();}
}

🛠️ 解决方案:让专业的做专业的事

🎯 方案一:分离职责

public class OptimizedSettleBillService {@Autowiredprivate IdempotentHelper idempotentHelper;@Autowiredprivate RedisLockHelper redisLockHelper;public SettleBillCommonRespDTO saveSettleBillAndSplit(SettleBill settleBill) {String businessKey = getBusinessKey(settleBill);// 🎯 第一步:幂等性检查if (idempotentHelper.isProcessed(businessKey)) {return idempotentHelper.getResult(businessKey);}// 🔒 第二步:防并发控制String lockKey = getLockKey(settleBill);RLock lock = redisLockHelper.tryLock(lockKey, 0, 30, TimeUnit.SECONDS);try {if (Objects.isNull(lock)) {throw new BussinessException("系统繁忙,请稍后重试");}// 🔄 双重检查if (idempotentHelper.isProcessed(businessKey)) {return idempotentHelper.getResult(businessKey);}// 💼 执行业务逻辑SettleBillCommonRespDTO result = doBusinessLogic(settleBill);// 💾 保存幂等性记录idempotentHelper.saveResult(businessKey, result);return result;} finally {redisLockHelper.unlock(lock);}}
}

🎯 方案二:基于数据库的幂等性

@Entity
@Table(name = "idempotent_record",uniqueConstraints = @UniqueConstraint(columnNames = "business_key"))
public class IdempotentRecord {@Idprivate Long id;@Column(name = "business_key", unique = true, nullable = false)private String businessKey;@Column(name = "result", columnDefinition = "TEXT")private String result;@Column(name = "status")private String status; // PROCESSING, SUCCESS, FAILED// getters & setters...
}
@Service
public class DatabaseIdempotentHelper {@Transactionalpublic boolean tryCreateRecord(String businessKey) {try {IdempotentRecord record = new IdempotentRecord();record.setBusinessKey(businessKey);record.setStatus("PROCESSING");record.setCreateTime(new Date());idempotentRecordRepository.save(record);return true;} catch (DataIntegrityViolationException e) {// 唯一约束冲突,说明已经在处理中return false;}}
}

📚 最佳实践总结

1. 分布式锁使用原则

// ✅ DO: 短超时,专注防并发
RLock lock = redisson.getLock(key);
lock.lock(30, TimeUnit.SECONDS);// ❌ DON'T: 长超时,试图防重
RLock lock = redisson.getLock(key);
lock.lock(2, TimeUnit.HOURS);

2. 幂等性设计模式

public class IdempotentPattern {// 模式1: Token机制public Result submitOrder(String token, OrderRequest request) {if (!validateAndConsumeToken(token)) {throw new BusinessException("重复提交");}return processOrder(request);}// 模式2: 状态机public void updateOrderStatus(String orderId, OrderStatus newStatus) {Order order = orderRepository.findById(orderId);if (!order.canTransitionTo(newStatus)) {log.warn("Invalid status transition for order {}", orderId);return; // 幂等处理}order.setStatus(newStatus);orderRepository.save(order);}
}

3. 消息队列消费最佳实践

@Component
public class MQConsumerBestPractice {@RocketMQMessageListener(topic = "order-topic",consumerGroup = "order-consumer",// 合理设置线程数consumeThreadMax = 4)public class OrderConsumer implements RocketMQListener<OrderMessage> {@Overridepublic void onMessage(OrderMessage message) {try {// 1. 参数校验validateMessage(message);// 2. 幂等性检查if (isMessageProcessed(message.getOrderId())) {log.info("Message already processed: {}", message.getOrderId());return;}// 3. 业务处理(带分布式锁防并发)processOrderWithLock(message);// 4. 标记已处理markMessageProcessed(message.getOrderId());} catch (BusinessException e) {log.error("Business error processing message: {}", message, e);// 业务异常不重试} catch (Exception e) {log.error("System error processing message: {}", message, e);throw e; // 系统异常触发重试}}}
}

✅ DO’s (推荐做法)

  1. 🎯 职责分离

    // 分布式锁 → 防并发
    // 幂等性机制 → 防重复
  2. ⏰ 合理的锁超时时间

    // 根据业务处理时间设置,通常几十秒即可
    lock = redisson.getLock(key).tryLock(30, TimeUnit.SECONDS);
  3. 🏷️ 消息级别的幂等性

    // 基于消息ID或业务唯一标识做幂等
    String idempotentKey = message.getMsgId() + ":" + businessKey;

❌ DON’Ts (避免做法)

  1. 🚫 过长的锁持有时间

    // ❌ 不要这样做
    lock.tryLock(2, TimeUnit.HOURS);
  2. 🚫 用分布式锁做幂等性控制

    // ❌ 不要依赖锁的持续时间来防重

🎨 架构设计建议

消息处理架构图

Not Processed
Already Processed
Lock Failed
MQ Message
Message Validator
Idempotent Check
Distributed Lock
Return Success
Business Logic
Mark as Processed
Release Lock
Return Busy

💡 总结与反思

核心要点

  1. 分布式锁 ≠ 幂等性保证:分布式锁专注解决并发问题,幂等性需要专门设计
  2. 可重入锁的陷阱:在线程池环境下,同一线程可能处理相同业务,可重入特性可能带来意外问题
  3. 职责分离:防并发用锁,防重复用幂等性设计
  4. 监控先行:建立完善的监控体系,及时发现问题

技术选型建议

场景推荐方案原因
防并发Redisson分布式锁成熟稳定,功能丰富
防重复Redis SETNX + 业务表唯一约束简单可靠,性能好
消息幂等Token机制 + 状态机语义清晰,易于理解

🎭 彩蛋:锁的"性格"对比

锁类型可重入性适用场景注意事项
ReentrantLock✅ 可重入单JVM内防并发同线程可多次获取
Redisson RLock✅ 可重入分布式防并发注意线程复用场景
数据库唯一约束❌ 不可重入防重复插入天然幂等性保证

🎉 写在最后

分布式锁是个好工具,但不能指望它包治百病。正如古人云:“术业有专攻”,让专业的工具做专业的事,才是王道!

📝 本文基于真实生产环境踩坑经历整理,如有类似经历,纯属英雄所见略同 😄

⭐ 如果这篇文章对你有帮助,别忘了点个赞哦!

http://www.dtcms.com/a/278030.html

相关文章:

  • JAVA并发——什么是Java的原子性、可见性和有序性
  • Redis缓存设计与性能优化指南
  • 使用Starrocks替换Clickhouse的理由
  • C++封装、多态、继承
  • 在 Ubuntu 下安装 MySQL 数据库
  • 从文本中 “提取” 商业洞察“DatawhaleAI夏令营”
  • 电路分析基础(02)-电阻电路的等效变换
  • Matlab批量转换1km降水数据为tiff格式
  • 【LeetCode100】--- 5.盛水最多的容器【复习回顾】
  • ssm学习笔记day05
  • QT 多线程 管理串口
  • 《[系统底层攻坚] 张冬〈大话存储终极版〉精读计划启动——存储架构原理深度拆解之旅》-系统性学习笔记(适合小白与IT工作人员)
  • springboot高校竞赛赛事管理系统 计算机毕业设计源码23756
  • Java行为型模式---策略模式
  • 第1章 概 述
  • dll文件缺失解决方法
  • C++——static成员
  • HiPPO: Recurrent Memory with Optimal Polynomial Projections论文精读(逐段解析)
  • QT控件命名简写
  • Linux内核高效之道:Slab分配器与task_struct缓存管理
  • 编译器优化——LLVM IR,零基础入门
  • 学习C++、QT---23(QT中QFileDialog库实现文件选择框打开、保存讲解)
  • 7月13日日记
  • 时间管理四象限理论
  • 小白学Python,操作文件和文件夹
  • 阶段性渗透总结
  • 第五章 Python手写数字识别【CNN卷积神经网络实现】
  • Windows怎样同步时间服务器?
  • 最简约的Windows多标签页文件管理器推荐 - 360文件夹 - 免费开源绿色软件推荐
  • Lucene原理