Redis分布式锁核心原理源码
文章目录
- 概述
- 一、Redis实现分布式锁
- 1.1、第一版
- 1.2、第二版
- 1.3、第三版
- 1.3、第四版
- 二、Redisson实现分布式锁核心源码分析
- 2.1、加锁核心源码
- 2.2、锁续期核心源码
- 2.3、重试机制核心源码
- 2.4、解锁核心源码
- 总结
概述
传统的单机锁(Synchronized,ReentrantLock)都是进程级别的锁,无法应对服务多实例部署的场景(每个服务实例都有自己的进程),如果需要跨进程加锁,则需要引入第三方工具对于进程统一管理。
使用Redis可以实现简易的分布式锁,而最常见的成熟的分布式锁方案是Redisson
。
一、Redis实现分布式锁
案例工程:减库存,没有加锁控制,在高并发的场景下必然会出现超卖的问题。如果是在单点部署的情况下,可以通过本地锁
解决,但是目前服务多点部署,本地锁
的方案无法进行控制。
@Service
public class DistributedLockDemo {@Resourceprivate StringRedisTemplate stringRedisTemplate;private final String STOCK_PREFIX = "stock:";private final String STOCK_LOCK = "stock:lock:";public void deduceStock(int orderNum,int orderId){//业务代码int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0){stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));}
}
1.1、第一版
以stock:lock:
前缀加上orderId
作为key,使用setIfAbsent
进行加锁,setIfAbsent
命令是Redis原生的setNx
命令在客户端的体现,setNx
命令是仅仅当设置的key不存在时,才可以成功,保证互斥性。
这样做存在的问题是,如果在执行业务代码的过程中,出现了异常,那么解锁的代码则永远无法执行,造成死锁
@Service
public class DistributedLockDemo {@Resourceprivate StringRedisTemplate stringRedisTemplate;private final String STOCK_PREFIX = "stock:";private final String STOCK_LOCK = "stock:lock:";public void deduceStock(int orderNum,int orderId){Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock");if (lock){//业务代码int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0){stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));stringRedisTemplate.delete(STOCK_LOCK + orderId);}}
}
1.2、第二版
针对第一版的问题进行改造,将解锁的代码放到finally
代码块中。这种方案依旧会存在问题,因为finally
代码块只能保证程序出错时最终执行,无法保证服务器宕机造成的死锁,所以最好在加锁时设置一个超时时间,到期自动释放。
public void deduceStock(int orderNum,int orderId){try {Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock");if (lock){//业务代码int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0){stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));stringRedisTemplate.delete(STOCK_LOCK + orderId);}} catch (Exception e) {}finally {stringRedisTemplate.delete(STOCK_LOCK + orderId);}}
1.3、第三版
设置超时时间,可以使用stringRedisTemplate.expire
方法,但是这样写:
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock");
stringRedisTemplate.expire(STOCK_LOCK + orderId, 10, TimeUnit.SECONDS);
是不具有原子性的,需要分为两条命令执行,如果在执行两条命令之间出现问题,依旧会造成死锁的问题。在Redis的层面提供了一条命令 set NX EX
,保证设置超时时间和加锁是原子性操作:
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock",10, TimeUnit.SECONDS);
加锁时存在的问题看似是解决了,但是解锁的代码:
stringRedisTemplate.delete(STOCK_LOCK + orderId);
会存在一种情况:
- 线程一获取到了锁,然后在执行业务代码的时候陷入了阻塞。
- 线程一的锁到期自动释放。
- 线程二获取到了锁,执行业务代码
- 线程一从阻塞状态恢复,执行完业务代码,要执行最终的解锁逻辑
- 线程一将线程二的锁解锁。
1.3、第四版
为了避免当前线程将其他线程的锁误解锁,需要在加锁时加入自己的线程唯一标识,并且在解锁时进行判断:
注意,不要用当前Thread.currentThread().getId()
方法去获取线程ID,因为不同机器上的线程ID可能会重复。Redisson底层也不是直接用上述的API获取的线程ID,而是和UUID进行了拼接。
//加锁
String threadId = UUID.randomUUID().toString().replace("-","");
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, threadId,10, TimeUnit.SECONDS);//解锁
String threadIdFromRedis = stringRedisTemplate.opsForValue().get("STOCK_LOCK + orderId");
if (threadId.equals(threadIdFromRedis)){stringRedisTemplate.delete(STOCK_LOCK + orderId);
}
但是这样写, 解锁和之前的加锁设置超时时间有同样的问题,都是操作分为了两步,不能保证原子性。 在解锁的判断上,Redis并没有提供原子性的命令,需要自己去通过lua脚本实现。
经过四版改动,自己用Redis实现的分布式锁已经基本可用了,但是深究下来依旧存在一些问题或不足:
- 如果执行业务代码的时间,超过了设置的锁超时时间,当前逻辑是没有自动续期的。
- 当前的逻辑不支持锁重入。
- 当前的逻辑没有实现重试机制,获取不到锁的线程无法进行重试。
二、Redisson实现分布式锁核心源码分析
相比较于自己通过set NX EX + lua
脚本实现的分布式缓存锁,Redisson是更为成熟的方案,也推荐在生产环境使用。Redisson分布式锁在API层面是非常简单的:
public class RedissonLockDemo {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate Redisson redisson;private final String STOCK_PREFIX = "stock:";private final String STOCK_LOCK = "stock:lock:";public void deduceStock(int orderNum, int orderId) {//获取分布式锁RLock lock = redisson.getLock(STOCK_LOCK + orderId);try {//加分布式锁,可以指定超时时间,没有指定超时时间默认30s,底层会自动续期。lock.lock();//业务代码int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0) {stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));} catch (Exception e) {} finally {lock.unlock();}}
}
关键代码:
//获取分布式锁
RLock lock = redisson.getLock(STOCK_LOCK + orderId);
//加分布式锁,可以指定超时时间,没有指定超时时间默认30s,底层会自动续期。
lock.lock();
//解锁
lock.unlock();
2.1、加锁核心源码
跟踪lock.lock();
,进入lockInterruptibly
:
首先第一次加锁,进入的是tryAcquire
方法,最终的核心逻辑是:
底层执行的是一段lua脚本,lua脚本和pipeline类似,也是可以将命令批量执行。虽然脚本中分了很多条命令,但是其他客户端要等到当前客户端的lua脚本全部执行完,才能执行脚本。
- KEYS[1]:是作为当前分布式锁的Key,也就是用户在
redisson.getLock
时传入的。 - ARGV[1]:是默认的超时时间,30s。
- ARGV[2]:是当前线程的唯一标识,用线程ID拼接上UUID。
# 加锁的逻辑
# 当前分布式锁的key不存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
# 调用hset命令,key是分布式锁的key,value的key是当前线程的唯一标识,用线程ID拼接上UUID。 value是1(重入次数)
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
# 设置超时时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
# 返回null
"return nil; " +
"end; " +
# 重入的逻辑
# 当前分布式锁的key存在,并且是当前线程持有
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
# 重入次数 + 1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
# 设置超时时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
# 返回null
"return nil; " +
"end; " +
# 查询指定键的剩余生存时间,并且返回
"return redis.call('pttl', KEYS[1]);"
上面这个脚本的执行,包含了可重入锁
和初次加锁
的逻辑,最终还会返回当前锁的剩余时间
。
2.2、锁续期核心源码
Redisson锁的续期,也称为看门狗机制。如果要实现锁续期,常见的设计思想是在业务线程执行时,开启一个守护线程,对业务线程进行监控,如果锁到期,业务线程还没有执行完,就执行续期的逻辑
在Redisson中的实现,调用完tryLockInnerAsync
方法后,会回调operationComplete
,通过future.getNow();
获取到加锁的结果,上面的lua脚本,在加锁成功和重入成功后,都会返回null。
进入scheduleExpirationRenewal
方法,该方法就是实现续期的核心方法实现,类似于一个延迟任务的线程池,延迟30/3 = 10s执行,整个方法分为两部分
首先依旧是执行一段lua脚本**(KEYS[1],ARGV[2],ARGV[1] 和第一段加锁时的lua脚本参数含义相同)**
# 当前分布式锁的key存在,并且是当前线程持有
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
# 进行续期30s
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
# 续期成功就返回1
"return 1; " +
"end; " +
# 否则返回 0
"return 0;",
第二部分则是拿到lua脚本执行的结果,递归调用scheduleExpirationRenewal
方法,延迟10s执行,最终的结果是每隔10s进行一次续期,每次续期30s
2.3、重试机制核心源码
加锁时的lua脚本,如果没有加锁或者重入成功,那么最终返回的是key的剩余生存时间
:
返回到方法的最外层,在lockInterruptibly
中执行自旋重试的逻辑。这里的自旋重试并非是在while循环中不断地循环,而是有一定的间隔时间。
在进入while循环后首先会再次尝试获取锁,如果失败了,就通过Semaphore
的API,在规定的TTL毫秒内尝试获取许可,如果有其他线程释放(即唤醒),当前线程就会继续执行。如果超时仍未获取到许可,则返回 false。
如果业务代码执行的时间短于设置的锁超时时间,那么其他等待锁的线程并不会阻塞到超时时间后再去竞争锁,在执行while循环之前,会通过redis的发布订阅模型
,将自身存入一个队列中。
唤醒队列中元素的逻辑,在解锁中。
2.4、解锁核心源码
解锁同样是通过lua脚本,将判断线程标识和解锁组成原子性的操作,解锁的lua脚本在unlockInnerAsync
方法中:
- KEYS[1]:当前分布式锁的key
- KEYS[2]:当前分布式锁的key 拼接上
redisson_lock__channel
- ARGV[1]:解锁消息标识,默认0L
- ARGV[2]:锁超时释放时间
- ARGV[3]:当前线程的唯一标识,用线程ID拼接上UUID。
主线程在解锁的时候会往队列中发送给一条消息,唤醒等待线程:
- 当前锁不存在,超时释放了
- 存在并且解锁成功
# 当前key对应的分布式锁不存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
# 发布解锁消息标识到当前分布式锁的key 拼接上 redisson_lock__channel
"redis.call('publish', KEYS[2], ARGV[1]); " +
# 返回1
"return 1; " +
"end;" +
# 当前key对应的分布式锁非本线程持有
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
# 返回null
"return nil;" +
"end; " +
# 可重入锁的解锁,重入次数 - 1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
# 重入次数>0
"if (counter > 0) then " +
# 重新设置超时时间
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
# 返回
"return 0; " +
"else " +
# 删除当前key对应的分布式锁
"redis.call('del', KEYS[1]); " +
# 发布解锁消息标识到当前分布式锁的key 拼接上 redisson_lock__channel
"redis.call('publish', KEYS[2], ARGV[1]); " +
# 返回1
"return 1; "+
"end; " +
"return nil;",
消费者 (正在阻塞等待的线程) 接受到了消息,会回调LockPubSub
的onmessage
方法,被唤醒然后重新争抢锁。