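Redisson's Watchdog Mechanism
To keep a Redis-based distributed lock from timing out while its holder is still working, Redisson introduced the Watchdog mechanism. It keeps extending the lock's expiration for as long as the Redisson instance has not been shut down.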
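Main features
- Automatic renewal: when a client acquires a lock without specifying a lease time, the Watchdog starts a background task on Netty's timing wheel. Every 10 seconds by default (one third of the default 30-second lease time), the task resets the lock's expiration back to 30 seconds.
- Renewal control: renewal stops automatically when the lock is released or the client shuts down.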
How it works
The Watchdog's core logic lives in the scheduleExpirationRenewal method:
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        // An entry already exists for this lock: just record the thread
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

// Scheduled renewal task
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) return;
    Timeout task = getServiceManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) return;
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) return;
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock {} expiration", getRawName(), e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                if (res) {
                    renewExpiration(); // reschedule after a successful renewal
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}

// Renew the lock with a Lua script
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}
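Key implementation points
- A TimerTask runs the renewal periodically, by default every 10 seconds (30 s / 3).
- The renewal itself is a Lua script that resets the lock's expiration to 30 seconds, but only if the lock hash still contains a field for the given thread.
- Before each renewal, the task checks whether EXPIRATION_RENEWAL_MAP still holds the corresponding entry; if it does not, renewal stops.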
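The timing described above can be sketched with a small single-threaded model. This is not Redisson code: WatchdogTimingModel, its virtual clock, and all of its method names are hypothetical, and Redis and the Netty timer are replaced by plain fields. It only illustrates how a task that reschedules itself every lease/3 keeps pushing the expiration forward, and what happens once the task disappears.

```java
/** Single-threaded model of the renewal timing; a sketch, not Redisson code. */
final class WatchdogTimingModel {
    private final long leaseMillis; // plays the role of internalLockLeaseTime
    private long now = 0;           // virtual clock, in milliseconds
    private long expiresAt;         // when the lock key would expire in Redis
    private long nextRenewalAt;     // due time of the pending timer task, -1 if none
    private int renewals = 0;

    WatchdogTimingModel(long leaseMillis) {
        if (leaseMillis < 3) {
            throw new IllegalArgumentException("lease must be at least 3 ms");
        }
        this.leaseMillis = leaseMillis;
        this.expiresAt = leaseMillis;         // lock acquired at t = 0
        this.nextRenewalAt = leaseMillis / 3; // first renewal task scheduled
    }

    /** Moves the virtual clock forward, firing every renewal that falls due. */
    void advance(long millis) {
        long target = now + millis;
        while (nextRenewalAt >= 0 && nextRenewalAt <= target) {
            now = nextRenewalAt;
            expiresAt = now + leaseMillis;         // like PEXPIRE key leaseMillis
            renewals++;
            nextRenewalAt = now + leaseMillis / 3; // the task reschedules itself
        }
        now = target;
    }

    /** Models the renewal task disappearing, for example because the JVM died. */
    void stopRenewal() {
        nextRenewalAt = -1;
    }

    boolean isHeld() { return now < expiresAt; }
    int renewals() { return renewals; }
    long expiresAt() { return expiresAt; }
}
```

With a 30-second lease, advancing the model by 25 seconds fires renewals at 10 s and 20 s, which moves the expiration to 50 s. If renewal is stopped at that point, the lock lapses on its own at 50 s, at most one lease period after the last renewal.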
What happens when the lock is released:
@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        }
        throw e;
    }
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
    return getServiceManager().execute(() -> unlockAsync0(threadId));
}

private RFuture<Void> unlockAsync0(long threadId) {
    CompletionStage<Boolean> future = unlockInnerAsync(threadId);
    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        // Runs whether or not the unlock itself succeeded
        cancelExpirationRenewal(threadId);
        // exception handling logic...
        return null;
    });
    return new CompletableFutureWrapper<>(f);
}

protected void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) return;
    if (threadId != null) {
        task.removeThreadId(threadId);
    }
    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) timeout.cancel();
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}
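Unlock flow
- unlock calls unlockAsync.
- That eventually runs cancelExpirationRenewal, which cancels the pending timeout and removes the entry from EXPIRATION_RENEWAL_MAP once no thread remains registered on it.
- With the entry gone, no further renewals take place.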
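cancelExpirationRenewal only removes the entry when hasNoThreads() returns true, which matters for reentrant locking. The sketch below shows one way such an entry could track holds per thread. HoldCounts is a hypothetical stand-in, not the real ExpirationEntry, and the per-thread counter is an assumption about how the bookkeeping might be done.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for ExpirationEntry: counts lock holds per thread. */
final class HoldCounts {
    private final Map<Long, Integer> counts = new LinkedHashMap<>();

    /** Registers one more hold for the thread (a reentrant lock call). */
    synchronized void addThreadId(long threadId) {
        counts.merge(threadId, 1, Integer::sum);
    }

    /** Releases one hold; the thread is dropped when its count reaches zero. */
    synchronized void removeThreadId(long threadId) {
        // Returning null from the remapping function removes the key.
        counts.computeIfPresent(threadId, (id, n) -> n > 1 ? n - 1 : null);
    }

    /** True once every registered hold has been released. */
    synchronized boolean hasNoThreads() {
        return counts.isEmpty();
    }
}
```

Under this assumption, a thread that locks twice and unlocks once still appears in the entry, so renewal continues until the final unlock.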
Conditions that trigger renewal
Not every lock that Redisson creates triggers the renewal mechanism. The lock-acquisition code shows exactly when it does:
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
    ttlRemainingFuture = new CompletableFutureWrapper<>(s);
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}
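The part to focus on is the if/else inside the thenApply callback: Redisson calls scheduleExpirationRenewal only when leaseTime <= 0. So if an explicit lease time is passed when acquiring the lock, no automatic renewal takes place.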
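The branch can be reduced to two small functions. This is a sketch of the decision only: LeaseDecisionSketch and its method names are hypothetical, the 30-second default is the value stated earlier in this article, and it is an assumption (not shown in the excerpt) that acquiring a lock without a lease time ends up passing a non-positive leaseTime into tryAcquireAsync.

```java
import java.util.concurrent.TimeUnit;

/** Sketch of the leaseTime decision in tryAcquireAsync; not Redisson code. */
final class LeaseDecisionSketch {
    // Default lease time in milliseconds, as stated in the article.
    static final long DEFAULT_LEASE_MILLIS = 30_000;

    /** Renewal is scheduled only when no positive lease time was given. */
    static boolean schedulesRenewal(long leaseTime) {
        return leaseTime <= 0;
    }

    /** TTL the lock key starts with, in milliseconds. */
    static long initialTtlMillis(long leaseTime, TimeUnit unit) {
        return leaseTime > 0 ? unit.toMillis(leaseTime) : DEFAULT_LEASE_MILLIS;
    }
}
```

In other words, a lock taken with a 10-second lease starts with a 10-second TTL and is never renewed, while a lock taken without one starts at the default and is kept alive by the Watchdog.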
Conditions that stop renewal
Condition 1: unlocking
Calling the lock's unlock method stops renewal automatically. The core logic is:
protected void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        return;
    }
    if (threadId != null) {
        task.removeThreadId(threadId);
    }
    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) {
            timeout.cancel();
        }
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}
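Termination relies mainly on the EXPIRATION_RENEWAL_MAP.remove call: once the entry is gone, the renewal task finds nothing to renew and does not reschedule itself.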
Condition 2: thread interruption
Renewal can also stop because the thread is interrupted:
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}
If the thread is interrupted while renewal is being set up, the renewal is cancelled automatically.
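Notes on the renewal mechanism
- Redisson currently sets no limit on the number of renewals or on the total renewal time. Under normal conditions, renewal continues for as long as the lock is not unlocked.
- Renewal is built on Netty's timing wheel (TimerTask, Timeout, Timer), and everything runs inside the JVM. When the application crashes, goes offline, or restarts, the renewal task ends with it, which goes some way toward preventing a lock from being held indefinitely after a machine failure.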
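If unlocking fails, will the watchdog keep renewing forever?
No. unlockAsync0 calls cancelExpirationRenewal inside the handle callback, which runs whether unlockInnerAsync completes normally or with an exception, so renewal is cancelled even when the unlock itself fails.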
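The behaviour of handle that this answer depends on can be checked with the standard library alone. The sketch below is not Redisson code: UnlockFailureSketch and the simulated failure are hypothetical, and a flag stands in for the call to cancelExpirationRenewal.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Shows that a handle() callback runs on success and on failure alike. */
final class UnlockFailureSketch {
    /** Returns true if the cleanup step ran for the given unlock outcome. */
    static boolean cleanupRan(boolean unlockFails) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        // Stands in for the future returned by unlockInnerAsync.
        CompletableFuture<Boolean> unlock = new CompletableFuture<>();
        CompletableFuture<Void> done = unlock.handle((opStatus, e) -> {
            cancelled.set(true); // stands in for cancelExpirationRenewal(threadId)
            return null;
        });
        if (unlockFails) {
            unlock.completeExceptionally(new IllegalStateException("simulated Redis failure"));
        } else {
            unlock.complete(true);
        }
        // The callback was registered before completion, so it has already
        // run on this thread by the time complete(...) returns.
        return done.isDone() && cancelled.get();
    }
}
```

Both cleanupRan(false) and cleanupRan(true) return true, which matches the structure of unlockAsync0: the cancellation is not conditional on the unlock having succeeded.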
