当前位置: 首页 > news >正文

幂等的几种解决方案以及实践

目录

什么是幂等?

解决幂等的常见解决方案:

唯一标识符案例

数据库唯一约束 案例

乐观锁案例

分布式锁(Distributed Locking)

实践精选方案

首先 为什么不直接使用分布式锁呢?

自定义实现幂等组件!

RepeatExecuteLimitAutoConfiguration

repeatExecuteLimitAspect防重复幂等 切面

本地锁

localLockCache是本地锁缓存,可根据锁名和锁类型(公平锁/非公平锁)来获得ReentrantLock的实例

本地锁的作用:

分布式锁

设置幂等标识的作用

注意


什么是幂等?

在web项目中,幂等性同样是一个重要的概念。这主要是因为在网络和分布式系统中,由于网络的不稳定性和其他潜在问题,可能会导致请求被重复发送。如果一个操作不是幂等的,那么重复执行该操作可能会产生不一致的结果或副作用。例如,一个非幂等的操作可能会导致数据被重复添加、更新或删除,从而破坏数据的一致性。

因此,JavaWeb项目需要保证幂等性,主要是为了确保无论请求被发送一次还是多次,系统都能产生相同的结果。这有助于避免由于重复请求导致的数据不一致或其他潜在问题。实现幂等性的方法有很多种,包括但不限于使用数据库唯一索引、乐观锁、分布式锁、令牌等技术。

总的来说,幂等性是JavaWeb项目中一个非常重要的概念,它有助于确保系统的稳定性和数据的一致性。通过实现幂等性,我们可以有效地处理重复请求,并减少由于网络不稳定或其他原因导致的潜在问题。

解决幂等的常见解决方案:

  1. 唯一标识符(Unique Identifiers): 为每个请求生成一个唯一的标识符(如UUID),并将其作为请求的一部分发送。当接收到请求时,服务器可以检查该标识符是否已处理过。如果已处理,则拒绝或忽略该请求;如果未处理,则处理该请求并记录标识符。

  2. 数据库唯一约束: 使用数据库的唯一约束(如主键或唯一索引)来确保即使多次尝试插入相同的数据,也只有一条记录会被保存。如果尝试插入重复的数据,数据库会抛出异常,服务器可以捕获这个异常并返回幂等性的响应。

  3. 乐观锁(Optimistic Locking): 使用版本号或时间戳来检查数据是否已被其他操作修改过。在更新数据时,如果版本号或时间戳与预期的不符,则拒绝更新并返回冲突信息。这样,即使多次尝试更新相同的数据,也只有一次会成功。

  4. 分布式锁(Distributed Locking): 在分布式系统中,可以使用分布式锁来确保同一时间只有一个节点能够执行某个操作。这可以防止多个节点同时处理相同的请求,从而实现幂等性。

唯一标识符案例

  • 使用 setex 命令存储ID并设置过期时间,避免内存泄漏。

  • 适用于高并发场景,如支付、订单创建等。

// 生成唯一ID(客户端)
String requestId = UUID.randomUUID().toString();// 服务端校验(基于Redis)
public class IdempotencyService {private Jedis jedis; // Redis客户端public boolean checkRequest(String requestId) {if (jedis.exists(requestId)) {return false; // 已处理,拒绝重复请求}jedis.setex(requestId, 3600, "processed"); // 存储ID并设置过期时间return true;}
}

数据库唯一约束 案例

  • 场景:用户注册防重复。

实现思路:数据库表中为关键字段(如用户名、邮箱)添加唯一索引,插入重复数据时抛出异常。

  • 数据库自动拒绝重复数据,无需额外代码判断

  • 适用于新增操作(如注册、订单号生成)

// JPA实体类定义
@Entity
@Table(name = "users", uniqueConstraints = @UniqueConstraint(columnNames = "email"))
public class User {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String email; // 唯一字段
}// 插入逻辑
try {userRepository.save(user); // 尝试插入
} catch (DataIntegrityViolationException e) {throw new DuplicateKeyException("邮箱已存在"); // 捕获唯一约束异常
}

乐观锁案例

场景:库存扣减防超卖。

实现思路:通过版本号字段控制并发更新,仅当版本匹配时才允许操作。

  • JPA的 @Version 注解自动管理版本号

  • 更新失败时抛出 OptimisticLockingFailureException,需重试或提示用户。

// 实体类添加版本号字段
@Entity
public class Product {@Idprivate Long id;private int stock;@Versionprivate int version; // 乐观锁版本号
}// 更新逻辑(JPA)
@Transactional
public void deductStock(Long productId, int quantity) {Product product = productRepository.findById(productId).orElseThrow();if (product.getStock() >= quantity) {product.setStock(product.getStock() - quantity);productRepository.save(product); // 自动检查版本号} else {throw new InsufficientStockException();}
}

分布式锁(Distributed Locking)

场景:分布式系统中全局资源操作(如优惠券发放)。

实现思路:使用Redisson实现分布式锁,确保同一时刻仅一个节点执行关键逻辑。

  • tryLock 设置超时时间防止死锁

  • 适用于跨服务或集群环境下的资源竞争场景

// 基于Redisson的分布式锁
public class CouponService {private RedissonClient redissonClient;public void grantCoupon(String userId) {RLock lock = redissonClient.getLock("COUPON_GRANT_" + userId);try {if (lock.tryLock(0, 10, TimeUnit.SECONDS)) { // 尝试获取锁// 执行业务逻辑(如发放优惠券)grantCouponToUser(userId);}} finally {if (lock.isHeldByCurrentThread()) {lock.unlock();}}}
}

方案适用场景优点缺点
唯一标识符高并发请求(如支付)实现简单,适合分布式环境需维护ID存储(如Redis)
数据库唯一约束数据唯一性要求(如注册)依赖数据库特性,无需额外逻辑不适用于更新操作
乐观锁低冲突更新(如库存扣减)无锁竞争,性能较高需处理版本冲突和重试逻辑
分布式锁跨服务资源竞争(如秒杀)强一致性保证实现复杂,可能影响性能

实践精选方案

除了单纯实现幂等,而是要在保证实现幂等的前提下,还要考虑高并发下的高效率执行,不能影响程序的性能和吞吐量

基于上述这些要求,最终选择利用redis来实现,而在对使用redis上,Redisson又是非常优秀的开源中间件,其中的分布式锁是非常的经典,项目中也对分布式锁做了封装,使用起来灵活而方便,而这次幂等组件也是对Redisson基础上进行封装,保证了性能,支持MQ中间件和用户请求的幂等。

首先 为什么不直接使用分布式锁呢?

为什么还要额外设计出幂等组件?首先直接使用分布式锁是可以实现幂等的,当然业务逻辑验证也要做验证,但其实分布式锁会浪费一些性能

分布式锁的特点是多个请求并发执行,这些请求是来自不同的用户,也就是这些请求虽然要依次等待锁执行,但最终还是要把这些请求都执行完的(执行时间太长超时的异常情况排除),总结起来就是都要获得锁,没有获得锁的请求,也要争取获得锁接着执行。

幂等的特点也是多个请求并发执行,但这些请求是来自同一个用户,也就是说这些请求只要保证第一个请求能执行,其余的请求要直接拒绝掉,总结起来就是只有第一个请求获得锁执行就可以,其余的请求看到已经上了锁,那么就要直接结束掉。

自定义实现幂等组件!

通过定义注解实现哦!

@Target(value = {ElementType.TYPE, ElementType.METHOD})
@Retention(value = RetentionPolicy.RUNTIME)
public @interface RepeatExecuteLimit {/*** 业务名称** @return name*/String name() default "";/*** key设置** @return key*/String[] keys();/*** 在多长时间内一直保持幂等,如果不配置则以执行方法为准*/long durationTime() default 0L;/*** 当消息执行已经出发防重复执行的限制时,提示信息*/String message() default "提交频繁,请稍后重试";}

RepeatExecuteLimitAutoConfiguration

public class RepeatExecuteLimitAutoConfiguration {@Bean(LockInfoType.REPEAT_EXECUTE_LIMIT)public LockInfoHandle repeatExecuteLimitHandle(){return new RepeatExecuteLimitLockInfoHandle();}@Beanpublic RepeatExecuteLimitAspect repeatExecuteLimitAspect(LocalLockCache localLockCache,LockInfoHandleFactory lockInfoHandleFactory,ServiceLockFactory serviceLockFactory,RedissonDataHandle redissonDataHandle){return new RepeatExecuteLimitAspect(localLockCache, lockInfoHandleFactory,serviceLockFactory,redissonDataHandle);}
}

RepeatExecuteLimitAutoConfiguration是自动装配类,用于加载需要的对象,repeatExecuteLimitHandle是锁键名处理器、repeatExecuteLimitAspect是幂等切面。

repeatExecuteLimitAspect防重复幂等 切面

 
@Slf4j
@Aspect
@Order(-11)
@AllArgsConstructor
public class RepeatExecuteLimitAspect {private final LocalLockCache localLockCache;private final LockInfoHandleFactory lockInfoHandleFactory;private final ServiceLockFactory serviceLockFactory;private final RedissonDataHandle redissonDataHandle;@Around("@annotation(repeatLimit)")public Object around(ProceedingJoinPoint joinPoint, RepeatExecuteLimit repeatLimit) throws Throwable {// 指定保持幂等的时间long durationTime = repeatLimit.durationTime();// 提示信息String message = repeatLimit.message();Object obj;// 获取锁信息LockInfoHandle lockInfoHandle = lockInfoHandleFactory.getLockInfoHandle(LockInfoType.REPEAT_EXECUTE_LIMIT);// 获取锁名String lockName = lockInfoHandle.getLockName(joinPoint,repeatLimit.name(), repeatLimit.keys());// 幂等标识String repeatFlagName = PREFIX_NAME + lockName;// 获得幂等标识String flagObject = redissonDataHandle.get(repeatFlagName);//如果幂等标识的值为success,说明已经有请求在执行了,这次请求直接结束if (SUCCESS_FLAG.equals(flagObject)) {throw new DaMaiFrameException(message);}// 获得本地锁ReentrantLock localLock = localLockCache.getLock(lockName,true);// 本地锁尝试获取boolean localLockResult = localLock.tryLock();// 如果本地锁获取失败,说明有其他请求在执行,这次请求直接结束if (!localLockResult) {throw new DaMaiFrameException(message);}try {// 获得分布式锁ServiceLocker lock = serviceLockFactory.getLock(LockType.Fair);// 尝试获取分布式锁boolean result = lock.tryLock(lockName, TimeUnit.SECONDS, 0);// 加锁成功执行if (result) {try{// 再次获取幂等标识flagObject = redissonDataHandle.get(repeatFlagName);// 如果幂等标识的值为success,说明已经有请求在执行了,这次请求直接结束if (SUCCESS_FLAG.equals(flagObject)) {throw new DaMaiFrameException(message);}obj = joinPoint.proceed();if (durationTime > 0) {try {// 业务逻辑执行成功后,设置 指定幂等保持时间 设置请求标识redissonDataHandle.set(repeatFlagName,SUCCESS_FLAG,durationTime,TimeUnit.SECONDS);}catch (Exception e) {log.error("getBucket error",e);}}return obj;} finally {lock.unlock(lockName);}}else{// 获得锁失败,说明有其他请求在执行,这次请求直接结束throw new DaMaiFrameException(message);}}finally {localLock.unlock();}}
}

RepeatExecuteLimitAspect是负责幂等执行的切面也是核心流程。

负责幂等的切面顺序要优先于分布式锁前,所以这里是-11。

这个实践中给我最大的震撼是本地锁、分布式锁、还有本地缓存的使用!

本地锁
ReentrantLock localLock = localLockCache.getLock(lockName,true);
boolean localLockResult = localLock.tryLock();
if (!localLockResult) {throw new DaMaiFrameException(message);
}

localLockCache是本地锁缓存,可根据锁名和锁类型(公平锁/非公平锁)来获得ReentrantLock的实例
public class LocalLockCache {/*** 本地锁缓存* */private Cache<String, ReentrantLock> localLockCache;/*** 本地锁的过期时间(小时单位)* */@Value("${durationTime:2}")private Integer durationTime;@PostConstructpublic void localLockCacheInit(){localLockCache = Caffeine.newBuilder().expireAfterWrite(durationTime, TimeUnit.HOURS).build();}/*** 获得锁,Caffeine的get是线程安全的* */public ReentrantLock getLock(String lockKey,boolean fair){return localLockCache.get(lockKey, key -> new ReentrantLock(fair));}
}

LocalLockCache其实是用Caffeine缓存来保存的锁信息,并可以设置锁实例的保存时间,默认是2小时,这个时间可以根据durationTime来进行配置,如果时间过大,那么锁的实例就会过多,对项目的内存就会有压力。如果时间过小,那么构建锁的频率就会增加,性能就会受到影响,使用时,可根据业务特点进行灵活配置

Caffeine是基于Java 1.8的高性能本地缓存库,由Guava改进而来,而且在Spring5开始的默认缓存实现就将Caffeine代替原来的Google Guava,官方说明指出,其缓存命中率已经接近最优值。实际上Caffeine这样的本地缓存和ConcurrentMap很像,即支持并发,并且支持O(1)时间复杂度的数据存取。二者的主要区别在于:

  • ConcurrentMap将存储所有存入的数据,直到你显式将其移除;

  • Caffeine将通过给定的配置,自动移除“不常用”的数据,以保持内存的合理占用。

因此,一种更好的理解方式是:Cache是一种带有存储和移除策略的Map。

本地锁的作用:

之所以先使用本地锁去加锁的原因是,可以很大程度上节省分布式锁的资源,虽然分布式锁是利用reids实现的,redis的性能又非常的高,但是它再高,依旧存在网络损耗,而本地锁的操作都是基于内存中,一个是内存中操作,一个是网络操作,前者的效率可是后者的几十倍差距。

如果一秒内有100个请求,服务的实例有5个,那么每个实例就有20个请求,这20个请求就可以靠本地锁来拦截掉,那么到分布式锁那里,就有5个请求来获得锁了,其余的95个请求都可以被提前结束掉。

这是一个经典的思想,优先考虑本地内存操作,经过本地内存操作后,再去操作第三方中间件。

分布式锁

当本地锁获得了锁之后,还要用分布式锁去尝试获得锁,因为本地锁只能保证当前自己的实例范围内能锁住请求,微服务多个实例部署的话,就需要分布式锁了

//加锁成功执行
if (result) {try{//再次获取幂等标识flagObject = redissonDataHandle.get(repeatFlagName);//如果幂等标识的值为success,说明已经有请求在执行了,这次请求直接结束if (SUCCESS_FLAG.equals(flagObject)) {throw new DaMaiFrameException(message);}//执行业务逻辑obj = joinPoint.proceed();if (durationTime > 0) {try {//业务逻辑执行成功 并且 指定了设置幂等保持时间 设置请求标识redissonDataHandle.set(repeatFlagName,SUCCESS_FLAG,durationTime,TimeUnit.SECONDS);}catch (Exception e) {log.error("getBucket error",e);}}return obj;} finally {lock.unlock(lockName);}
}else{//获取锁失败,说明已经有请求在执行了,这次请求直接结束throw new DaMaiFrameException(message);
}

当通过分布式锁工厂获取到客户端实例后,就会尝试去获取分布式锁了,如果加锁失败,说明之前已经有请求获得了锁在执行中没有释放掉,那么这次请求直接结束

如果加锁成功则执行业务逻辑joinPoint.proceed()

  • 如果执行业务逻辑成功,如果设置了幂等保持时间,那么设置幂等标识

  • 如果执行业务逻辑失败,那么直接释放锁

设置幂等标识的作用

有的小伙伴可能会好奇,为什么要设置幂等标识?直接使用本地锁+分布式锁不就可以实现幂等了吗?为什么多此一举?只使用本地锁+分布式锁的方法确实能实现幂等,但此项目是为了高并发的,考虑的细节要全面。

幂等包括用户请求幂等和MQ消息幂等

要介绍这两种的特点

  • 用户请求的幂等特点是用户在短时间内多次的点击,比如说一秒内发出了10次请求,那么就第1次请求正常执行,而剩余9次请求要全部被拦截将请求直接结束掉,特点是短时间内的多次请求

  • MQ消息幂等特点是MQ为了保证消息的可靠性。在有些异常情况确实会重复投递的,比如说 某个服务监听到了消息,接着执行业务逻辑,但在执行过程中,这个服务宕机了,没有给MQ发送消息提交机制,MQ就会认为消息没有消费成功,就会再次投递。但其实这个逻辑都执行成功了,就差给MQ提交确认了。

这就需要保证幂等。而MQ的重试就没有用户多次重复请求那么频繁,可能会1分钟 5分钟 10分钟,这种情况就需要幂等标识,当有了标识后逻辑直接结束。

注意

有的小伙伴刚接触开发,对并发产生的细节问题不太清楚,比如幂等和分布式锁,直接使用就觉得没有问题了,但其实并不是这样,我们来举一个例子,比如说添加用户的操作

  1. 请求获得锁得到执行

  2. 开启事务

  3. 向数据库中添加用户

  4. 提交事务

这个流程其实是有问题的,比如说第一个请求添加了用户后释放了锁,第二个请求是重复提交的,也添加了用户,这两个用户就是重复的。所以要在第2步后,要有验证用户是否存在的步骤,这个属于业务验证,需要业务来实现,组件只能防止并发重复,并不能防止业务重复。正确的流程:

  1. 请求获得锁得到执行

  2. 开启事务

  3. 查询用户是否已存在

  4. 向数据库中添加用户

  5. 提交事务

相关文章:

  • Memgraph 的安装教程
  • node.js 实战——在express 中将input file 美化,并完成裁剪、上传进度条
  • uni-pages-hot-modules插件:uni-app的pages.json的模块化及模块热重载
  • python实现的音乐播放器
  • 【Pandas】pandas DataFrame abs
  • 无实体对话式社交机器人 拟人化印象形成机制:基于多模态交互与文化适配的拓展研究
  • 使用ESPHome烧录固件到ESP32-C3并接入HomeAssistant
  • 使用pytorch保存和加载预训练的模型方法
  • 基于Transformer的多资产收益预测模型实战(附PyTorch实现与避坑指南)
  • OpenHarmony平台驱动开发(九),MIPI DSI
  • 如何使用npm下载指定版本的cli工具
  • 【MySQL】存储引擎 - MyISAM详解
  • FPGA_Verilog实现QSPI驱动,完成FLASH程序固化
  • [ctfshow web入门] web57
  • 到达最后一个房间的最少时间II 类似棋盘转移规律查找
  • QTDesinger如何给label加边框
  • Java后端程序员学习前端之JavaScript
  • k8s的pod挂载共享内存
  • Mysql-OCP PPT课程讲解并翻译
  • 数据结构 - 9( 位图 布隆过滤器 并查集 LRUCache 6000 字详解 )
  • 金地集团:今年前4个月实现销售额109.3亿元,同比下降52.44%
  • 溢价26.3%!保利置业42.4亿元竞得上海杨浦宅地,楼板价80199元/平方米
  • 招行:拟出资150亿元全资发起设立金融资产投资公司
  • 上海将发布新一版不予行政处罚清单、首份减轻行政处罚清单
  • AI智能体,是不是可以慢一点? | ToB产业观察
  • 全军军级以上单位新任纪委书记监委主任培训班结业