redisson锁的可重入、可重试、超时续约原理详解
Redisson可重入锁原理
案例说明:
首先会去创建一个锁的对象,进行测试,在方法1中首先尝试获取锁,获取锁之后去执行业务,业务中的方法2也需要去获取锁,但是获取锁其实就是redis数据库中的setnx命令,因为是在同一线程,key相同,方法一获得锁,那么方法2就获取锁失败,这就是不可重入问题。
RLock lock = redissonClient.getLock("anyLock");@Testpublic void method1() {boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败,1");return;}try {log.info("获取锁成功,1");method2();} finally {//释放锁log.info("释放锁,1");lock.unlock();}}@Testpublic void method2() { boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败,2");return;}try {log.info("获取锁成功,2");} finally {log.info("释放锁,2");//释放锁lock.unlock();}}
解决方案:
可以参考jdk里提供的ReentrantLock(可重入锁)简单了解一下原理:可重入锁就是在获取锁的时候,当判断锁是否被获取的同时,检查获取锁的是哪个线程,如果是同一个线程的话,就让其通过获取锁,这样就需要在线程获取锁的时候添加一个计数器记录重入的次数,在有线程获取锁时就会累加,释放锁时减一,这就是可重入的基本原理。
这就需要我们在锁中不仅记录获取锁的线程,还需要记录该线程重入的次数。我们现在需要往一个key中存储两个value,那么现在的String结构就不够用了,因此我们需要换成hash类型。
具体实现原理:
在高并发多线程场景中,线程一尝试获取锁,发现没有人获取锁,则获取成功,将线程的标识记录下来,此时如果线程一中的业务执行时还需要获取锁,发现有人获得锁,这时在进行一次判断,判断得到的线程标识与锁中存储的线程标识对比,如果相同,则允许其获取锁,并且在重入次数加一,如果业务执行完成,则开始释放锁,释放锁之前需要进行判断,先将重入次数减一,在判断重入次数是否为0,如果为0,执行释放锁方法,如果没有,说明还有业务在使用锁。
业务流程:
首先判断锁是否存在(用exists命令判断),返回两种结果,存在或者不存在,如果不存在,获取锁并添加线程标识,设置有效时间,执行业务,如果锁已经存在,判断锁标识是否是同一线程,如果是同一个线程,只需要将重入次数加一,再重置有效时间,执行业务,业务完成后,需要释放锁,释放锁需要判断,需要先判断锁的线程标识是否一致,如果不一致,说明锁已经释放,如果标识一致,先将重入次数减一,再去进行判断重入次数的值,如果不为0,则证明不是最外层的业务,需要去重置锁的有效期,给后续业务执行留下充足的执行时间,如果为0,说明已经到最外层,此时可以直接释放锁。至此,业务完成。
业务流程图:
这样的逻辑使用Java代码实现无法保证其操作之间的原子性,因此要采用Lua脚本来编译。
获取锁的Lua脚本:
local key = KEYS[1]; -- 锁的keylocal value = ARGV[1]; -- 线程唯一标识local expireTime = ARGV[2]; -- 锁的过期时间-- 判断是否存在if (redis.call('exists',key) == 0)then-- 不存在,设置锁redis.call('set',key,value,'1');-- 设置锁的过期时间redis.call('expire',key,expireTime);-- 返回truereturn 1;end -- 存在,判断value是否一致if (redis.call('hexists',key,value) == 1)then--一致 ,获取锁,重入次数+1redis.call('hincrby',key,'count',1);-- 设置锁的过期时间redis.call('expire',key,expireTime);-- 返回truereturn 1;end return 0;
释放锁的Lua脚本:
local key = KEYS[1] -- 锁的keylocal value = ARGV[1] -- 线程唯一标识local expireTime = ARGV[2] -- 锁的过期时间if(redis.call('hexists',key,value) == 0)thenreturn nil -- 锁不存在,返回0end-- 是自己的锁,则重入次数-1local count = redis.call('hincrby',key,value,-1)-- 重入次数减为0,则删除锁if (count > 0) then-- 大于0,说明不能释放锁,重置有效期后返回redis.call('expire',key,expireTime)return nilelse -- 重入次数减为0,则删除锁redis.call('del',key)return nilend
对Redisson锁进行测试,看是否满足可重入锁:
@Slf4j@SpringBootTestpublic class ReentrantLockTest {@Resourceprivate RedissonClient redissonClient;private RLock lock;@BeforeEachvoid setUp(){lock = redissonClient.getLock("lock:");}@Testpublic void method1() {boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败,1");return;}try {log.info("获取锁成功,1");method2();log.info("执行业务逻辑,1");} finally {//释放锁log.warn("释放锁,1");lock.unlock();}}@Testpublic void method2() {boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败,2");return;}try {log.info("获取锁成功,2");log.info("执行业务逻辑,2");} finally {log.info("释放锁,2");//释放锁lock.unlock();}}}
进行测试:测试成功
同一个线程的两个方法成功获得锁,可重入次数变为2,并在释放锁时,第一个业务释放锁时,可重入次数减一,第二个业务释放锁时,可重入数减一,变为零。因此锁被释放
解析源码得:
根据测试案例中的tryLock来追踪redissonLock的源码:
我们选择的是较为基础的RedissonLock
继续向下追踪:
依旧是RedissonLock类
至此:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return this.evalWriteSyncedNoRetryAsync(this.getRawName(), LongCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}
我们发现这段硬编码就是Lua脚本,并且与我们开始写的获取锁的逻辑别无二致。
再看释放锁:同上所示,直接找到最底层,中间就不展示了:
至此:RedissonLock的可重入锁的原理解析完毕。
Redisson的锁重试和WatchDog机制
解决了不可重入的问题,但是依然还存在几个问题:
-
不可重试问题:获取锁只尝试一次就返回false,没有重试机制
-
超时释放问题:锁超时释放虽然是可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患。
那么这两个问题在redisson内部是如何解决的呢?
我们还是需要去根据以上案例来跟踪源码:
我们在实例案例中的tryLock()开始使用的是空参,但是时可以传递参数的,共计三个参数,第一个参数waitTime,获取锁的最大等待时长,一旦传递出这个参数,那么在该线程第一次获取锁失败后就不会立即返回,而是在等待时间内,不断地去尝试,在指定时间过去之后,才会返回false,一旦传入第一个参数,就会变成一个可重试锁。
第二个参数:LeaseTime,锁自动失效释放的时间,最后一个参数是时间的单位。
开始跟踪源码:
继续追踪到最底层:
这种设计的巧妙之处就在于利用了消息订阅以及信号量的机制,并不是无休止的盲目等待机制,无休止的等待是对CPU的一种浪费,而现在所用的机制就是等待其他线程释放再去尝试,对CPU较为友好。
这就是redisson锁的重试机制。
锁超时机制(WatchDog机制)
如果获取锁成功后,执行事务时发生堵塞,导致ttl到期,这时其他线程发现锁过期,则重试获取锁,这样又导致了线程安全问题。
因此我们必须确保锁是因为业务执行完释放,而不能因为阻塞释放。
跟踪源码
当锁的持有时间未明确指定(leaseTime <= 0)时,Redisson 会自动启用看门狗机制。
scheduleExpirationRenewal(threadId) 的作用是:为当前线程持有的锁启动一个定时任务,定期刷新锁的过期时间,防止锁因超时而被释放
该方法用于调度锁的自动续期任务。具体功能如下: renewalScheduler.renewLock(...):调用续期调度器,对指定的锁进行自动续期;
getRawName():获取锁的原始名称;
threadId:当前持有锁的线程ID;
getLockName(threadId):生成该线程持有的锁的具体名称。
该方法用于续订指定锁的持有时间。其逻辑如下: 如果当前没有任务(reference为null),则创建一个新的LockTask任务; 获取当前的任务对象; 调用add方法将锁信息添加到任务中,用于后续自动续租。
创建一个新的LockEntry对象,并将其赋值给变量entry。 调用entry对象的addThreadId方法,将threadId和lockName添加到entry中。这可能意味着记录哪个线程持有哪个锁。 最后,调用另一个重载版本的add方法,传入rawName, lockName, threadId, 和新创建的entry对象。这个重载版本的方法可能负责将锁条目存储到某个数据结构中,比如一个映射表或集合。
使用threadId2counter.compute方法来更新与threadId关联的计数器。
compute方法接受两个参数:键(threadId)和一个BiFunction,该函数定义了如何计算新的值。
在BiFunction中: 使用Optional.ofNullable(counter).orElse(0)来获取当前计数器的值。
如果计数器不存在,则默认为0。 将计数器的值加1。
(可重入锁) 将threadId添加到threadsQueue队列中。 返回更新后的计数器值。
使用threadId2lockName.putIfAbsent方法来确保每个threadId只与一个lockName关联。 如果threadId在threadId2lockName映射中已经存在,则不会进行任何操作。
如果threadId在threadId2lockName映射中不存在,则将threadId和lockName添加到映射中。
总结来说,这个方法的主要功能是:
更新与特定线程ID关联的计数器,每次调用时计数器加1。 将线程ID添加到一个队列中。 确保每个线程ID只与一个锁名称关联,如果该线程ID尚未关联任何锁名称,则进行关联。
我也没有怎么搞懂,其实就是一旦线程成功获取锁,Redisson 会启动一个后台线程(看门狗)来监控和续期该锁。 看门狗机制会在锁的过期时间到达之前自动续期锁的过期时间。
并且看门狗机制还可以判断重入次数。并将每一个业务的锁分配好。
释放锁的原理流程图
释放锁的原理流程图
总结:
redisson分布式锁原理:
-
可重入:利用hash结构记录线程id和重入次数,当线程获取锁失败后去判断锁包含的线程id是否一致,如果一致,则让其获取锁,并且重入次数加一。在释放锁时,先去判断锁中的线程ID,再去判断重入次数是否为0,如果为0,则释放,不为0,则无视
-
可重试:利用信号量和pubsub功能实现等待、唤醒、获取锁失败的重试机制。
在第一次获取锁失败以后,并不是直接返回false,尝试获取锁的返回结果就是ttl(剩余有效时间),如果返回值为null。则获取锁成功,若不为null,则获取锁失败,
获取锁失败后会开启一个订阅功能,就是去接收其他线程释放锁时发送的消息,再进行判断,在指定时间内没有获取到释放锁的消息时则取消订阅,并且返回false.
如果成功,在去判断等待消耗的时间,如果时间超时,则返回false,如果时间还剩余,则开启while(true)循环.
进行再次尝试获取锁,其中会一直判断时间是否超时,如果再次失败,则会等待一段时间,其中如果剩余时间大于ttl时间,则等待ttl时间后再次重试,这里采用了信号量的方案,去获取其他线程释放锁后释放的信号量,获得后就会再次尝试获取锁,直到时间超时,则返回false,没有则重复循环获取。
这种设计的巧妙之处就在于利用了消息订阅以及信号量的机制,并不是无休止的盲目等待机制,无休止的等待是对CPU的一种浪费,而现在所用的机制就是等待其他线程释放再去尝试,对CPU较为友好。
-
超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间.
希望对大家有所帮助