当前位置: 首页 > news >正文

从Redisson源码角度深入理解Redis分布式锁的正确实现

文章目录

  • 1. 为什么使用分布式锁
  • 2. 简单实现分布式锁
  • 3. 防止误删的分布式锁
  • 4. 自动锁续命的分布式锁
  • 5. 深度理解Redisson底层逻辑
    • 5.1. 大概逻辑
    • 5.2. 获取锁的实现
    • 5.3. 看门狗的实现
    • 5.4. 等待锁的实现


1. 为什么使用分布式锁

拿实际开发商品库存的例子来说,我们在开发中进行删减库存可能会写出这样的代码:

但是这样无疑会出现并发问题

如图所示,两个请求进行删减库存, 最后数据库存储的数据却只删减了一次

解决这个问题的直接方法就是加锁

但是加什么锁合适? 直接使用Java的synchronized行不行? 就像这样

@Service
public class ProductService {@Autowiredprivate StringRedisTemplate stringRedisTemplate;public void deductStock(String productId) {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock:" + productId));synchronized (this) {if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock:" + productId, realStock + "");System.out.println("扣除成功,剩余库存:" + realStock);}else {System.out.println("扣除失败,库存不足");}}}
}

在单机环境下确实能解决多线程并发问题

但是现在的互联网架构大多都是集群架构,也就是不止一台机器,而synchronized只在单个JVM进行内有效,多台服务器部署时无法跨进程同步.

同时,由于针对的是this加锁,那么一旦别的线程删减库存的商品id与获取锁的线程不一致,也会进行阻塞,但是这是没必要的,我们只需要针对一个商品id的操作进行加锁即可

因此我们需要引入分布式锁,本质上就是使用一个公共服务器来记录加锁状态

这个公共的服务器可以是Redis,也可以是其他组件(如Mysql或者ZooKeeper等) 还可以是我们自己写的一个服务

2. 简单实现分布式锁

redis中提供了setnx命令, 当且进仅当key不存在的时候,将key的值设置为value, 如果key已经存在,不做任何操作

与我们锁实现的功能基本一致

因此我们可以使用这个命令来针对某个productId进行加锁

    public String deductStock(String productId) {String lockKey = "lock:product:" + productId;boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"lock");if(!result) {return "waiting";}int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock:" + productId));if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock:" + productId, realStock + "");System.out.println("扣除成功,剩余库存:" + realStock);}else {System.out.println("扣除失败,库存不足");}stringRedisTemplate.delete(lockKey);return "success";}

但是这样的做法会出现死锁问题

最简单直接的现象就是,如果某个线程获取锁后,执行删减库存的过程中,出现了异常, 那么锁将无法释放

因此我们需要将释放锁的步骤放在finally中,确保锁能释放

  public String deductStock(String productId) {String lockKey = "lock:product:" + productId;boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"lock");if(!result) {return "waiting";}try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock:" + productId));if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock:" + productId, realStock + "");System.out.println("扣除成功,剩余库存:" + realStock);}else {System.out.println("扣除失败,库存不足");}}finally {stringRedisTemplate.delete(lockKey);}return "success";}

但是,如果在执行删减的过程中,服务器突然宕机了呢?? finally也不能确保执行到

因此为了保险,我们需要加上过期时间,并且需要通过原子命令进行,确保加锁的同时设置过期时间

3. 防止误删的分布式锁

我们上面实现的加上过期时间的分布式锁,实际上还会出现误删的问题,如下:

由于线程1设置的锁提前过期,导致线程2可以占据锁,但是线程1业务执行完后, 又将锁给删除了,但是实际上线程1删除的锁是线程2后来设置的锁, 导致线程2设置的锁直接失效了

此时别的线程又可以占据锁, 而线程2执行完业务逻辑后又将锁给删除了, 由此反复,可能导致锁一直不生效

解决这个方法的核心就是,确保当前线程删除的锁是自己设置的锁, 不能释放别的线程设置的锁

我们可以使用UUID作为唯一标识

能不能使用ThreadId作为唯一标识?? 当然不行,最简单的问题就是, 分布式环境下,不同机器的多个线程id之间可能是重复的

但是很容易发现,释放锁的步骤,不是原子的

这种不是原子的操作,在极端的场景下很可能出现问题

我们将删除锁的步骤单独拿出来说

但是在极端情况下:

也就是由于获取锁对应的value进行判断, 和删除锁, 由于不是原子操作,导致中间如果出现其他情况(如卡顿) , 而此时锁更好过期, 别的线程又刚好获取到锁, 此时线程1就把别的线程的锁释放掉了

究其原因就是 释放锁的步骤不是原子的

但是redis又没有原子的命令能够保证这两个步骤是原子的

实际上通过lua脚本就可以实现(文章后面会讲)

4. 自动锁续命的分布式锁

我们上面的误删问题,本质上还是因为在业务逻辑还没执行完之前, 锁就释放了

那如果我们可以让业务逻辑执行完之前,不断让锁进行续命, 也就是增加超时时间, 不让锁提前过期

那如果直接增加超时时间,如从10s提升到30s? 实际上这样不能从根本上解决问题,因为业务执行的时间是不确定的

因此我们可以在执行业务逻辑的同时,通过后台线程定时去判断锁是否还存在(这个定时时间一定要小于超时时间)

如果还存在,说明业务逻辑还没执行完, 那么就要进行续命

这就是看门狗机制(Watchdog)的思想

会不会出现业务机器宕机,导致一直续命?? 实际上续命的线程和业务机器是同一台,因此当业务机器宕机了, 看门狗自然也会消失

我们当然可以自己实现Watchdog解决方案,但是市面上已经存在很多成熟的开源方案

Redisson就是一个热门完善的实现方案,是一个基于Java的Redis客户端,专门用于分布式场景的Redis操作,如分布式对象,分布式集合,分布式锁等

引入依赖:

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.50.0</version>
</dependency>

现在我们通过Redisson来使用分布式锁

@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate RedissonClient  redissonClient;public String deductStock(String productId) {String lockKey = "lock:product:" + productId;RLock lock = redissonClient.getLock(lockKey);lock.lock();try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock:" + productId));if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock:" + productId, realStock + "");System.out.println("扣除成功,剩余库存:" + realStock);}else {System.out.println("扣除失败,库存不足");}}finally {lock.unlock();}return "success";}

5. 深度理解Redisson底层逻辑

redisson一个简单的lock,帮我们实现了阻塞等待, 看门狗续命,防误删等操作

我们先来看他的大概逻辑

5.1. 大概逻辑

5.2. 获取锁的实现

接下来我们看源码,直接查看Redisson中lock的实现方法

可以看到,如果我们不传参数leaseTime那么默认就是-1

第二个lock方法再次调用了重载的lock方法

最核心的方法就是这里的tryAcquire方法

这里传进的waiting是-1,threadId是当前线程的id

接着一层层往下走,就会找到这个方法

由于我们的leaseTime是-1,那么我们执行else方法的逻辑

可以看到,与if分治不同的地方在于,leastTime被替换成了别的变量

一层一层往上找,就能发现这个变量的值默认为30秒

所以我们接下来的重心就在与tryLockInnerAsync方法

这些命令实际上就是我们上面提了一嘴的lua脚本

Lua 的语法类似于 JS, 是一个动态弱类型的语言. Lua 的解释器一般使用 C 语言实现. Lua 语法

简单精炼, 执行速度快, 解释器也比较轻量(Lua 解释器的可执行程序体积只有 200KB 左右).

因此 Lua 经常作为其他程序内部嵌入的脚本语言. Redis 本身就支持 Lua 作为内嵌脚本.

最重要的是,一个 lua 脚本会被 Redis 服务器以原子的方式来执行.

我们先来看参数的对应关系

实际上我们逻辑就类似于

public Long execute(long expireTime,String key,String hKey) {if (!redisTemplate.hasKey(hKey) || redisTemplate.opsForHash().hasKey(key, hKey)) {redisTemplate.opsForHash().increment(key, hKey, 1);redisTemplate.expire(key, expireTime, TimeUnit.MILLISECONDS);return null;}else {return redisTemplate.getExpire(key);}
}

但是通过Lua脚本实现就是原子的

这里的key就是源代码里的getRawName方法的返回结果,实际上也就对应着我们一开始传进来的key,也就是我们的锁

而ARG[2],也就是我们自己实现的hKey,是getLockName方法的返回结果,实际上就是一个随机值

我们先可以简单理解, 这个脚本就是尝试去加锁,只是加的锁是通过hash结构存储的(至于为什么后面会讲)

这个锁的超时时间是30s,如果我们自己指定那就是我们指定的值

这里要注意,如果我们获取锁成功,那么返回值就是null, 如果获取失败,那么返回值就是这个锁的剩余时间

5.3. 看门狗的实现

我们回到这个方法调用的地方

通过方法名也可以看出,这个方法实际上是异步执行的

当异步执行完后,会执行这一部分区域的内容

我们上面说过,如果获取锁成功, 返回值就是null

因此我们会进入第一个if分支,又由于leaseTime是-1, 因此会执行scheduleExpirationRenewal方法

从方法名可以猜出, 这个方法就是重置超时时间, 实际上也就对应着看门狗的逻辑!

这里可以看出,看门狗的定时任务逻辑就封装在LockTask中

并且这里的add相当于开启了一个定时任务

我们来看看这个schedule方法

此时会通过<font style="color:rgba(0, 0, 0, 0.85);">schedule</font>方法设置第一个定时任务,间隔为<font style="color:rgba(0, 0, 0, 0.85);">internalLockLeaseTime / 3</font>,如果是默认值,也就是10s

定时任务的<font style="color:rgba(0, 0, 0, 0.85);">run</font>方法(<font style="color:rgba(0, 0, 0, 0.85);">TimerTask</font>接口的核心方法)会在定时时间到达后被执行,执行完成后会再次调用<font style="color:rgba(0, 0, 0, 0.85);">schedule</font>方法,形成循环:

这是定时任务的执行,那么定式任务具体做了什么呢??? 我们要回到添加定时任务前,也就是下面这个方法

定时任务的逻辑就封装在这里面

也就是这个方法

我们来看看下面的Lua脚本

简单说,这个循环的目的是遍历所有传入脚本的键(KEYS 数组中的每个键),对每个键执行后续的<font style="color:rgba(0, 0, 0, 0.85);">hexists</font><font style="color:rgba(0, 0, 0, 0.85);">pexpire</font>操作。也就是实现了批量key的续命

到此,Watchdog的逻辑我们大概就能理解了

5.4. 等待锁的实现

回到一开始的地方,如果我们的线程没有抢占到锁呢??

我们上面讲过会得到这会锁剩余的超时时间,那么接下来会执行什么逻辑??

上面我们讲过,如果ttl是null,那么就说明获取锁成功

因此如果我们获取锁失败,执行的是下面的逻辑

我们来看最主要的while循环

这里实际上就得到了一个信号量

Semaphore维护了一个计数器,线程可以通过调用acquire()方法来获取Semaphore中的许可证,当计数器为0时,调用acquire()的线程将被阻塞,直到有其他线程释放许可证;线程可以通过调用release()方法来释放Semaphore中的许可证,这会使Semaphore中的计数器增加,从而允许更多的线程访问共享资源。

简单来说,如果线程没有加锁成功, 那么就会通过Semaphore进行阻塞ttl的时间,这段时间内是不会阻塞CPU的,

等到ttl到了以后,再通过while循环再次尝试获取锁,由此不断的间歇性的抢占锁

那么可能到这里就会有疑问,如果锁提前释放了呢? 也是会傻等ttl的时间吗??? 那性能也太差了. 实际上不是这样的

实际上,在并发编程里,如果这里阻塞住,一定会有其他地方会在一定条件下唤醒这个线程

实际上就在我们上面的逻辑中

实际上这里就是使用了redis的发布订阅的功能

简单来说,这些没抢到锁的线程, 会在这里订阅了一个频道,相当于进行了监听

这个类实际上就是对发布订阅的封装

那这个频道什么时候会通知这个线程呢?? 可想而知肯定是在解锁的逻辑中

我们通过伪代码来解释

/*** 分布式锁解锁核心逻辑(伪代码)* @param threadId 当前线程ID* @param requestId 本次请求标识(用于生成解锁信号键)* @param timeout 超时时间(毫秒)* @return 异步结果:1=完全解锁,0=部分解锁,null=解锁失败*/
protected CompletableFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {// 1. 定义关键参数String lockKey = getRawName(); // 锁的主键(哈希表)String channelKey = getChannelName(); // 通知频道(用于发布解锁消息)String unlockLatchKey = getUnlockLatchName(requestId); // 解锁信号键(用于同步)String threadLockId = getLockName(threadId); // 当前线程的锁标识(如"threadId:uuid")long leaseTime = internalLockLeaseTime; // 锁的默认租期String publishCmd = getSubscribeService().getPublishCommand(); // 发布命令("PUBLISH")String unlockMsg = LockPubSub.UNLOCK_MESSAGE; // 解锁消息内容// 2. 执行核心逻辑return CompletableFuture.supplyAsync(() -> {// 2.1 检查解锁信号状态(对应Lua中"get KEYS[3]")String latchVal = redis.get(unlockLatchKey);if (latchVal != null) {return Long.parseLong(latchVal) > 0; // 若有信号,返回信号值}// 2.2 验证是否是当前线程持有锁(防止误删)boolean hasLock = redis.hexists(lockKey, threadLockId);if (!hasLock) {return null; // 未持有锁,解锁失败}// 2.3 重入计数器减1(对应Lua中"hincrby -1")long counter = redis.hincrBy(lockKey, threadLockId, -1);if (counter > 0) {// 2.4 部分释放(仍有重入):更新过期时间,标记部分释放redis.pexpire(lockKey, leaseTime);redis.set(unlockLatchKey, "0", "PX", timeout); // 标记"部分释放"return false; // 对应原逻辑返回0} else {// 2.5 完全释放:删除锁,发布解锁消息redis.del(lockKey); // 删除锁redis.executeCommand(publishCmd, channelKey, unlockMsg); // 通知等待线程redis.set(unlockLatchKey, "1", "PX", timeout); // 标记"完全释放"return true; // 对应原逻辑返回1}});
}

这里可以看到, 如果hash中对应的counter > 0, 那么就是-1, 实际上就是实现了可重入锁

因此可以回到我们一开始加锁的lua脚本上,如果当前hash的hKey存在,那么就在原来的基础上加1

解锁的时候也是, 只是进行-1, 直到为0 ,才代表完全释放锁

这里同时也解决了我们上面留下的一个问题, 就是我们自己写的代码,在释放锁的时候不是原子的

那么将释放锁的逻辑放到lua脚本之后, 就能作为一个原子操作,自然不会出现上面的问题!

因此,当锁释放后,会释放出一个信号, 通知等待的线程, 当前锁释放了,可以去尝试获取锁了

那么接收到消息的线程会执行什么逻辑呢?

我们来看看上面发布订阅类的onMessage方法

至此,形成了一个完美的闭环!!

http://www.dtcms.com/a/324571.html

相关文章:

  • JetPack系列教程(三):Lifecycle——给Activity和Fragment装个“生命探测仪“
  • redis主从模型与对象模型
  • Beelzebub靶机练习
  • 代码随想录算法训练营第五十九天|图论part9
  • 下一代防火墙总结
  • 【软考中级网络工程师】知识点之 PPP 协议:网络通信的基石
  • Stlink识别不到-安装驱动
  • Hutool-RedisDS:简化Redis操作的Java工具类
  • 【Python 小脚本·大用途 · 第 1 篇】
  • 在VMware中安装统信UOS桌面专业版
  • Python 的浅拷贝 vs 深拷贝(含嵌套可变对象示例与踩坑场景)
  • 基础算法(11)——栈
  • 【3D图像技术分析与实现】CityGaussianV2 工作解析
  • log4cpp、log4cplus 与 log4cxx 三大 C++ 日志框架
  • 机器学习数学基础:46.Mann-Kendall 序贯检验(Sequential MK Test)
  • Java集合框架、Collection体系的单列集合
  • 有限元方法中的数值技术:追赶法求解三对角方程
  • 【鸿蒙/OpenHarmony/NDK】什么是NDK? 为啥要用NDK?
  • PCB知识07 地层与电源层
  • LLIC:基于自适应权重大感受野图像变换编码的学习图像压缩
  • 每日一题:使用栈实现逆波兰表达式求值
  • Redis高级
  • AAAI 2025丨具身智能+多模态感知如何精准锁定目标
  • 如何解决pip安装报错ModuleNotFoundError: No module named ‘ray’问题
  • Python数据分析常规步骤整理
  • Mysql系列--5、表的基本查询(下)
  • Speaking T2 - Dining Hall to CloseDuring Spring Break
  • 机器学习 DBScan
  • 一键复制产品信息到剪贴板
  • 【接口自动化】初识pytest,一文讲解pytest的安装,识别规则以及配置文件的使用