Redission
前面我们说的基于redis `setnx`
实现的的分布式锁会存在下述问题:
-
不可重入问题:获得锁的线程可以再次进入到相同的锁的代码块而不会死锁就叫可重入
-
不可重试:之前基于redis实现的分布式锁只能尝试一次,我们希望当前线程在获取锁失败后,能在我们允许大的访问内再次尝试获取锁
-
超时释放:过期时间虽然可以避免死锁的产生,但是也可能由于业务执行时间过长而超时释放锁,进而出现锁的误删问题
-
主从一致性:数据从主机异步的同步到从机的过程中,由于主机宕机产生问题
Redisson 是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
redission-分布式锁的快速入门
- 引入依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
- 配置redission客户端
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置redisConfig config = new Config();config.useSingleServer().setAddress("redis://192.168.150.101:6379")//指定redis的ip和port.setPassword("123456");//访问redis的密码// 创建RedissonClient对象return Redisson.create(config);}
}
- 使用redission中的分布式锁
@Resource//注入RedissionClient
private RedissionClient redissonClient;@Test
void testRedisson() throws Exception{//获取锁(可重入),指定锁的名称RLock lock = redissonClient.getLock("anyLock");//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);//判断获取锁成功if(isLock){try{System.out.println("执行业务"); }finally{//释放锁lock.unlock();}}
redission-分布式锁原理
可重入锁原理
我们知道
`synchronized{...}`
底层是借助计数器count来记录重入状态的,当没有人持有这把锁时,count == 0;当有人持有这把锁时,count == 1,如果持有这把锁的人再次申请这把锁,count += 1,当释放这把锁时count -= 1,当count == 0时锁才会真正释放。
Redission 实现可重入锁的原理与`synchronized{}`
类似,不过它是采用hash结构来存储锁,其中key表示这把锁,field表示这把锁被那个线程所持有,value表示这个线程重入这把锁的次数。
前置声明:
KEYS[1] : 锁名称
ARGV[1]: 过期时间
ARGV[2]: id + “:” + threadId; (field,线程标识)
接下来让我们通过redission底层获取锁的源码来分析一下可重入的实现机制:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
当我们调用了这个方法时,我们会判断当前锁是否存在,如果不存在,则获取该锁并添加当前线程标识,设置锁的过期时间;如果存在,则判断当前锁持有者是不是自己,如果不是则返回锁的剩余存活时间,如果是则将当前锁的value+1,然后再重置过期时间。
超时续约-WatchDog机制
在申请锁的过程中,当我们没有指定锁的过期时间时,redission底层会默认走WatchDog的过期时间,如下:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}//当没有指定时间会走默认的看门狗实现RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {//实现锁的超时续约scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;
}
private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);renewExpiration();}
}
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));
}
当我们的线程宕机后,他就不会继续进行超时续约了,因为此时没有人能够调用renewExpiration实现超时续约功能。
锁重试原理
前面我们在赘述redission实现可重入机制的时候,发现底层是调用lua脚本来申请锁,这个脚本逻辑如下:
-
先判断锁是否存在,不存在则申请锁成功,返回null
-
锁存在则判断该锁是否属于该线程,如果是,则返回null;如果不是则获取锁失败,返回锁的剩余存活时间(TTL)
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}
// return get(tryLockAsync(waitTime, leaseTime, unit));}
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
我们在外层接收这个TTL,通过这个TTL判断是否申请锁成功(返回null表示获取锁成功,否则获取锁失败)。
当获取锁失败时,redission实现会重新获取剩余等待时间,并在有效剩余等待时间内尝试重新申请锁。
//获取剩余等待时间
time -= System.currentTimeMillis() - current;//判断等待时间是否有效
if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;
}current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
//在剩余有效时间内等待锁持有线程的释放(订阅锁释放信息)
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;
}try {//锁持有线程释放锁后,判断等待时间是否有效time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();//等待时间有效,并且锁持有线程已经释放锁,重新尝试获取锁ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();//订阅锁释放信息if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}
} finally {//取消订阅unsubscribe(subscribeFuture, threadId);
}
MutiLock连锁原理
redis的主从集群中,在主机同步数据到从机的过程中,由于主机宕机而导致锁信息同步丢失进而产生问题。
redission中为了解决这个问题,提出了MutiLock锁的概念,只有对集群中的多个节点都申请锁成功时,才算加锁成功;当某个节点宕机了,在获取锁时,只要有一个节点拿不到,都不算获取锁成功,这样就大大保证了加锁的可靠性。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// try {
// return tryLockAsync(waitTime, leaseTime, unit).get();
// } catch (ExecutionException e) {
// throw new IllegalStateException(e);
// }long newLeaseTime = -1;if (leaseTime != -1) {if (waitTime == -1) {newLeaseTime = unit.toMillis(leaseTime);} else {newLeaseTime = unit.toMillis(waitTime)*2;}}long time = System.currentTimeMillis();long remainTime = -1;if (waitTime != -1) {remainTime = unit.toMillis(waitTime);}long lockWaitTime = calcLockWaitTime(remainTime);int failedLocksLimit = failedLocksLimit();List<RLock> acquiredLocks = new ArrayList<>(locks.size());for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {//尝试获取连锁中每一把锁if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {lockAcquired = false;}if (lockAcquired) {//获取连锁中的锁成功acquiredLocks.add(lock);} else {//获取锁失败if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1) {//判断是否重试获取return false;}failedLocksLimit = failedLocksLimit();//释放之前成功获取的所有锁acquiredLocks.clear();// reset iteratorwhile (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}//还有等待时间就继续重试获取锁if (remainTime != -1) {remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());for (RLock rLock : acquiredLocks) {//如果有指定过期时间需要手动更新过期时间RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();}}return true;}