Java分布式锁详解
Java开发者肯定都用过Redis做分布式锁,简单来说就是利用Redis的高性能,Redis加锁(插入一条数据),如果数据存在就说明已经被其它线程锁了,业务执行完毕后就释放锁(删除数据)。
但实际使用过程中,如果并发稍大,还是会存在一些问题,那么基于这些问题来学习分布式锁。有一个很好的开源工具 Redisson
用它来实现分布式锁可以有效的解决我们自定义锁的问题。
一、使用Redis做分布式锁的问题
1、原子性加锁
加锁
if (redisTemplate.get("lockKey") == null) {redisTemplate.set("lockKey", "value");
}
上面这种方式肯定就不是原子性了,很可能出现线程一在set但还没结束,线程二已经走过了get,会导致锁失败
可以使用Redis的原子命令:
redisTemplate.set("lockKey", "value", "NX", "PX", 30000);
- NX:只有 key 不存在时才设置成功
- PX:设置过期时间,防止死锁
释放锁
redisTemplate.del("lockKey");
这个操作虽然是原子性,但存在误删除操作
- 线程 A 加锁成功
- 线程 A 超时(锁过期)
- 线程 B 成功获得锁
- 线程 A 的 finally 中执行了 del(),把 B 的锁删了!
最最直接的做法,可以使用 lua脚本,在删除的时候做一个判断(lua脚本里面的命令会原子性执行)
String lua ="if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) else return 0 end";redisTemplate.eval(lua, Collections.singletonList("lockKey"), Collections.singletonList("value"));
Redisson 的底层就会用脚本的方式来解决原子性的问题,还提供isHeldByCurrentThread方法,在释放锁的时候要求和加锁的是同一个线程
2、代码执行超时怎么办
如果执行的过程中,并不知道要多久才可以执行完,那怎么设置超时时间合适呢?
其实并没有一个合适的时间,这时候可以用看门狗模式:比如设置锁过期时间是30s,在剩下10s的时候,它就会进行一次续费,直到你手动释放锁。
当使用Redisson加锁的时候,如果没有设置过期时间,就会默认开启看门狗模式
启动看门狗模式,它会不断的续期,直到手动释放。既然这样,那为什么不直接把超时时间设置到很大呢?比如1小时?
场景 | 你的方案 (1小时过期) | 看门狗方案 |
---|---|---|
正常执行3秒 | ✅ 3秒后释放锁 | ✅ 3秒后释放锁 |
程序异常崩溃 | ❌ 锁持有1小时 | ✅ 锁持有最多30秒 |
JVM进程被杀 | ❌ 锁持有1小时 | ✅ 锁持有最多30秒 |
网络断开 | ❌ 锁持有1小时 | ✅ 锁持有最多30秒 |
服务器重启 | ❌ 锁持有1小时 | ✅ 锁持有最多30秒 |
3、如果set时候线程断了怎么办
- 请求还没有到Redis,锁设置失败,无影响
- 请求已经到了Redis,设置了超时时间,依赖超时时间释放锁
- 请求已经到了Redis,没有设置超时时间,Redisson 会默认30s,同时启动看门狗。服务挂了,看门狗也不会续费了等 30s就释放锁
- 请求已经到了Redis,没有设置超时时间,没有使用Redisson,无法释放锁
4、锁如果在事务之前释放了会怎么办?
一般加锁的场景都会涉及到事务,如果锁在事务之前提交也会带来一些数据问题,要保证锁在事务提交之后释放
方式一:使用事务钩子
@Transactional(rollbackFor = Exception.class)
public void fun() {try {String key = "";// 获取锁if (redisUtils.get(key) == null) {// 上锁redisUtils.set(key,"",expireTime);// 业务逻辑处理}else {// 未获得锁throw new RuntimeException("未获得锁");}}catch (Exception e){// 释放锁,抛出异常redisUtils.remove(key);throw e;}finally {TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {@Overridepublic void afterCommit() {// 释放锁redisUtils.remove(key);}});}
}
方式二:使用Spring的AOP顺序
如果不是手动开启锁,而是用AOP的方式,可以设置AOP的Order顺序来控制
5、使用Redisson的其它优势
可以设置锁等待时间
如果A、B线程几乎同时进来,A线程拿到了锁,正常情况下A线程在 1s内就会执行完释放锁,如果不做特殊处理B线程是在拿不到锁就失败了,为了更好的兼容性,我们希望B线程等待一会,比如3s,自己去实现这个逻辑会很复杂。Redisson帮我们实现了等待的逻辑
boolean res = lock.tryLock(最大等待时间, 锁过期时间, TimeUnit.SECONDS);
可以防止别的线程来释放锁
为了更安全的释放锁,因此要求谁加的谁释放,而不是拿到key的任何人都可以去释放这个锁。Redisson提供了一个方法用来判断当前线程是否持有该锁 isHeldByCurrentThread
Redisson还支持可重入、公平锁、读写锁等等
二、分布式锁最佳实践
做开发的时候都希望只关注具体的业务,加锁这种操作,要尽可能的简单,那最简单的方式就是使用注解的方式。
1、自定义注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {/*** 获取锁的最大等待时间,单位秒*/long maxWait() default 0;/*** 锁自动释放时间,单位:秒;默认releaseTime = -1,会触发redission的watch dog机制(会不断的对锁延期),保持锁定直到显式解锁*/long releaseTime() default -1;/*** 加锁的key的值,请结合业务场景设置lock key* 可以把方法上的参数绑定到注解的变量中,注解的语法#{变量名}* #{task}或者#{task.taskName}或者{task.project.projectName}*/String lockKey();
}
2、定义解析器
用来解析自定义注解上的 lockKey参数
public class DistributedLockAnnotationResolver {private static DistributedLockAnnotationResolver resolver;public static DistributedLockAnnotationResolver newInstance() {if (resolver == null) {return resolver = new DistributedLockAnnotationResolver();} else {return resolver;}}/*** 解析注解上的值*/public Object resolver(JoinPoint joinPoint, String key) {if (key == null) {return null;}// 如果name匹配上了#{},则把内容当作变量if (!key.matches("#\\{\\D*\\}")) {return key;}Object value = null;String newStr = key.replaceAll("#\\{", "").replaceAll("\\}", "");// 复杂类型if (newStr.contains(".")) {try {value = complexResolver(joinPoint, newStr);} catch (Exception e) {logger.error("解析注解上的key值失败,key:{}", key, e);}} else {value = simpleResolver(joinPoint, newStr);}return value;}private Object complexResolver(JoinPoint joinPoint, String str) throws Exception {MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();String[] names = methodSignature.getParameterNames();Object[] args = joinPoint.getArgs();String[] strs = str.split("\\.");for (int i = 0; i < names.length; i++) {if (strs[0].equals(names[i])) {Object obj = args[i];Method dmethod = obj.getClass().getMethod(getMethodName(strs[1]), null);Object value = dmethod.invoke(args[i]);return getValue(value, 1, strs);}}return null;}private Object getValue(Object obj, int index, String[] strs) throws Exception {if (obj != null && index < strs.length - 1) {Method method = obj.getClass().getMethod(getMethodName(strs[index + 1]), null);obj = method.invoke(obj);getValue(obj, index + 1, strs);}return obj;}private String getMethodName(String name) {return "get" + name.replaceFirst(name.substring(0, 1), name.substring(0, 1).toUpperCase());}private Object simpleResolver(JoinPoint joinPoint, String str) {MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();String[] names = methodSignature.getParameterNames();Object[] args = joinPoint.getArgs();for (int i = 0; i < names.length; i++) {if (str.equals(names[i])) {return args[i];}}return null;}}
3、切面实现 (重要)
public class DistributedLockAspect {private static final Logger logger = LoggerFactory.getLogger(DistributedLockAspect.class);private RedissonClient redissonClient;public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable {String lockKey = getLockKey(joinPoint, distributedLock);RLock lock = redissonClient.getLock(lockKey);boolean res = lock.tryLock(distributedLock.maxWait(), distributedLock.releaseTime(), TimeUnit.SECONDS);if (!res) {logger.warn("lock fail, lockKey:{}", lockKey);throw new BizException(ErrorCode.REPEAT_OPERATION);}if (logger.isDebugEnabled()) {logger.debug("lock success, lockKey:{}", lockKey);}try {// 执行业务操作return joinPoint.proceed();} finally {if (lock.isLocked() && lock.isHeldByCurrentThread()) {lock.unlock();if (logger.isDebugEnabled()) {logger.debug("unlock success, lockKey:{}", lockKey);}}}}private String getLockKey(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) {Object key = DistributedLockAnnotationResolver.newInstance().resolver(joinPoint, distributedLock.lockKey());String lockKeyPrefix = distributedLock.lockKeyPrefix();if (StringUtils.isNotBlank(lockKeyPrefix)) {return lockKeyPrefix + ":" + key;}String className = joinPoint.getTarget().getClass().getName();String methodName = joinPoint.getSignature().getName();//分布式加锁的key=类名+方法名+lockKeyreturn className + ":" + methodName + ":" + key;}public RedissonClient getRedissonClient() {return redissonClient;}public void setRedissonClient(RedissonClient redissonClient) {this.redissonClient = redissonClient;}
}
4、注入AOP
@Order(value = -99)
保证在事务提交之后释放锁
@Configuration
@Aspect
@Order(value = -99)
public class DistributedLockConfig {@Value("${spring.redis.password}")private String password;@Value("${spring.redis.host}")private String host;@Value("${spring.redis.port}")private String port;@Beanpublic DistributedLockAspect distributedLockAspect() {DistributedLockAspect distributedLockAspect = new DistributedLockAspect();distributedLockAspect.setBizModule("appointment");distributedLockAspect.setRedissonClient(redissonClient());return distributedLockAspect;}@Beanpublic RedissonClient redissonClient() {Config config = new Config();String redisUrl = String.format("redis://%s:%s", host + "", port + "");config.useSingleServer().setAddress(redisUrl).setPassword(password);return Redisson.create(config);}@Pointcut("@annotation(distributedLock)")public void pointCut(DistributedLock distributedLock) {}@Around("pointCut(distributedLock)")public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable {return distributedLockAspect().around(joinPoint, distributedLock);}
}
5、使用
@Transactional(rollbackFor = Exception.class)
@DistributedLock(lockKey = "#{param.id}", releaseTime = 20)
public UserBO fun(Param param) {}