视频素材网站怎么建建站的公司
目录
- 什么是分布式锁
- redisson的分布式锁的特点
- 可重入的实现原理
- 可重试的实现原理
- 超时续期(看门狗机制)
什么是分布式锁
分布式锁是一种锁机制,用户解决同一个共享资源被多个线程并发访问的问题,使用分布式锁可以避免并发安全,数据不一致的情况。
redisson的分布式锁的特点
- 可重入:同一个线程可以多次获取同一把锁
- 可重试:如果设置了等待时间,可以在这一段时间尝试从新获取锁
- 高可用:程序不易崩溃,时时刻刻都保证较高的可用性
- 自动续期:看门狗机制,如果没有设置过期时间,会一直续期,知道锁被释放
- 互斥性:同一把锁只有一个线程可以获取
可重入的实现原理
redisson的分布式锁,采用了hash结构来存储数据,大key表示锁的名称,小key表示被哪一个锁持有,小key的value记录着这一个锁被同一个线程获取了几次。
获取锁的大概的流程是这样的:先判断先要获取的锁是否存在,如果不存 在,则获取锁,设置超时时间,返回,如果存在,判断锁是不是自己的,如果不是,则获取失败,如果是,增加锁的持有次数,重置超时时间,返回。
获取锁流程图:
释放锁的流程:判断锁是不是自己的,如果不是,说明锁已经被释放,如果是,判断现在还有多少个地方使用到,如果只有自己,直接释放,如果还有别的地方使用到,减少持有的次数,重置过期时间,通知其他线程已经释放锁的消息。
释放锁流程图:
rediss源码的lua脚本:
获取锁:
if (redis.call('exists', KEYS[1]) == 0) then -- 判断锁是否存在,进入if表示不存在redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 锁的持有数量加一redis.call('pexpire', KEYS[1], ARGV[1]); -- 设置过期时间return nil; -- 返回获取成功
end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 锁已经存在,判断是不是自己的,进入if表示是自己的redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 锁的持有数量加一redis.call('pexpire', KEYS[1], ARGV[1]); -- 重置过期时间return nil; -- 返回获取成功
end; return redis.call('pttl', KEYS[1]); -- 锁存在并且不是自己的,返回获取失败
释放锁:
local key = KEYS[1] -- "myLock" 锁的名称
local pubsub_key = KEYS[2] -- "lock_channel" 发布订阅的通道名称
local release_message = ARGV[1] -- "lock_released" 发布的消息
local expire_time = ARGV[2] -- 30000 锁的过期时间
local threadId = ARGV[3] -- "thread123" 持有锁的线程if (redis.call('hexists', key, threadId) == 0) then -- 判定要释放锁是否存在return nil; -- 不存在表示无需释放返回nil
end;
local counter = redis.call('hincrby', key, threadId, -1); -- 判断持有数量加一之后剩余数量是否大于0
if (counter > 0) then redis.call('pexpire', key, expire_time); -- 大于0表示还有其他地方是由锁return 0; -- 返回0表示为完全释放
else redis.call('del', key); -- 小于0表示完全释放,删除锁redis.call('publish', pubsub_key, release_message); -- 同时订阅释放锁消息的线程return 1; -- 返回1表示完全释放
end; -- 默认返回值,一般不会使用到
可重试的实现原理
大概流程是这样子的:先尝试获取锁,如果获取成功直接返回,如果获取失败,尝试重新获取,但是不会马上获取,先判断等待是否超时,如果超时返回失败,如果未超时,订阅释放锁的消息,如果在等待的时间还是订阅不到消息,则获取失败,取消订阅,返回失败,如果在等待期间获取到消息,判断是否超时,如果超时,尝试获取锁,获取成功返回,获取失败,继续订阅消息,判断是否超时,尝试重新获取,如此反复,知道获取成功或则超时。
流程图:
redisson源码
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();// 尝试获取锁,方法返回两种结果,如果是null表示获取成功,如果是ttl(锁剩余时间),表示获取失败Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// 如果成功,返回trueif (ttl == null) {return true;}// 如果失败,尝试再次获取// 如果等待时间已过,返回falsetime -= System.currentTimeMillis() - current;if (time <= 0) {// 执行获取锁失败后的逻辑acquireFailed(waitTime, unit, threadId);return false;}// 不会立刻尝试获取,刚刚获取不到,立马获取大概率获取不到current = System.currentTimeMillis();// 调用异步方法,订阅消息,如果有锁释放,尝试获取RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);// 调用await等待异步方法,两种结果,一种是等不到订阅信息,取消等于放回false,一种是收到订阅信息,继续执行if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {// 超时,订阅不到消息,取消订阅,返回falseif (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}// 订阅到了消息try {// 判断是否超时time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 循环尝试获取while (true) {long currentTime = System.currentTimeMillis();// 尝试获取ttl = tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl == null) {// 成功获取return true;}// 获取失败,判断是否超时time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 阻塞等待订阅信息,两种结果,一种超时放回false,一种接收到,返回true,// 但是这里的放回值没有获取,只是在这里阻塞一段时间currentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {// 如果锁过期时间小于等待时间,以过期时间为标准subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {// 如果锁过期时间大约等于等待时间,以等待为标准subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}// 判断是否超时time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {// 取消订阅unsubscribe(subscribeFuture, threadId);}
}
超时续期(看门狗机制)
如果一个锁没有设置过期时间,则在释放锁之前需要一直存在,如果不设置过期时间,可能在释放锁之前,服务宕机了,到时锁没有得到释放,所以才有设置过期时间,之后一直续期,即使服务宕机了,也会自己释放。
大概流程是这样子的,如果需要使用续期机制,在线程第一次获取锁的时候,开启看门狗,也就是定时任务,默认的过期时间是30秒,定时任务每隔10秒重置为30秒,再继续开启一个定时任务。在锁释放的时候,需要删除锁,关闭定时任务。
流程图:
rediss源码
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) { // 不需要看门狗走这里return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// 需要开门狗走下面RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),// 默认过期时间30秒TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}if (ttlRemaining == null) {scheduleExpirationRenewal(threadId); // 开启看门狗}});return ttlRemainingFuture;
}private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry(); // 里面可以存放一个定时任务和锁ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); // 如果map里面有这个锁,返回null,没有则返回ExpirationEntryif (oldEntry != null) {oldEntry.addThreadId(threadId); // 已经有过,线程持有次数加一} else {entry.addThreadId(threadId); // 线程持有次数加一renewExpiration(); // 开启看门狗}
}private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId); // 重置过期时间future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration(); // 在开启一个定时任务}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));
}