分布式锁介绍与实现
相信大家经常听到分布式锁这个东西,但可能并不太了解,接下来我们通过解决问题的方式来了解分布式锁
现在要实现一个秒杀功能:
public void placeOrder() throws InterruptedException {Long stock = Long.parseLong(redisTemplate.opsForValue().get("stock"));if(stock > 0) {Thread.sleep(100);redisTemplate.opsForValue().set("stock", String.valueOf(--stock));System.out.println(Thread.currentThread().getName() + "秒杀成功");}else{System.out.println(Thread.currentThread().getName() + "秒杀失败,库存不足");}}@RequestMapping("/main")public void main(String[] args) {for(int i = 0; i < 3; i++) {new Thread(() -> {try {placeOrder();} catch (InterruptedException e) {throw new RuntimeException(e);}}).start();}}
上面代码很明显存在超卖的问题,我们运行也可也看到(这里提前在redis中加入了stock,值为1):
解决方案我们也很容易想到,那就是加锁:
public static Long stock = 1L;public static synchronized void placeOrder() throws InterruptedException {if(stock > 0) {Thread.sleep(100);stock--;System.out.println(Thread.currentThread().getName() + "秒杀成功");}else{System.out.println(Thread.currentThread().getName() + "秒杀失败,库存不足");}}public static void main(String[] args) {for(int i = 0; i < 3; i++) {new Thread(() -> {try {placeOrder();} catch (InterruptedException e) {throw new RuntimeException(e);}}).start();}}
可以看到防止了超卖问题,
但是随着用户量日益增多,我们的系统出现了瓶颈,这时就可以使用负载均衡技术部署多个服务器,由于我们使用的同步锁只对当前服务器生效,多个服务之间的并发依然是不安全的,所以又会导致超卖问题。
这种情况下就需要分布式锁,具体实现我们可以使用Redis的setNX,这个命令表示只有当key不存在才会设置成功:
public void placeOrder() throws InterruptedException {String productId = "acf435";//使用分布式锁,while(redisTemplate.opsForValue().setIfAbsent(productId, Thread.currentThread().getName(), Duration.ofSeconds(10))) {Long stock = Long.parseLong(redisTemplate.opsForValue().get("stock"));if(stock > 0) {Thread.sleep(100);redisTemplate.opsForValue().set("stock", String.valueOf(--stock));System.out.println(Thread.currentThread().getName() + "秒杀成功");}else{System.out.println(Thread.currentThread().getName() + "秒杀失败,库存不足");}//释放锁redisTemplate.delete(productId);break;};}
那么这里我们就可以使用商品id作为key,当需要操作某个商品时把商品id作为key存入redis,存入成功的则允许进行下一步操作,注意,这里一定要设置超时时间,如果某个服务器存入key成功后恰巧这个服务器挂了,那么这个key就无法被释放了,所以一定要设置过期时间。
这里还存在一个问题,就是,如果超过过期时间业务还没有处理完,那么key就被释放了,其它服务器就能够继续操作。对于这个问题,我们可以使用一个线程,每隔一段时间去探测一下当前处理服务的线程是否在线,在线则延长过期时间即可:
public void placeOrder() throws InterruptedException {String productId = "acf435";Thread mainThread = Thread.currentThread();//使用分布式锁,while(redisTemplate.opsForValue().setIfAbsent(productId, Thread.currentThread().getName(), Duration.ofSeconds(10))) {//续命线程new Thread(() -> {//每隔5秒检测主线程是否在线,在线则延长key过期时间while (true) {try {Thread.sleep(5000);if(mainThread.isAlive()) {redisTemplate.opsForValue().setIfPresent(productId, mainThread.getName(), Duration.ofSeconds(10));}else{Thread.currentThread().interrupt();}} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();try {Long stock = Long.parseLong(redisTemplate.opsForValue().get("stock"));if (stock > 0) {Thread.sleep(100);redisTemplate.opsForValue().set("stock", String.valueOf(--stock));System.out.println(Thread.currentThread().getName() + "秒杀成功");} else {System.out.println(Thread.currentThread().getName() + "秒杀失败,库存不足");}break;}finally {//释放锁redisTemplate.delete(productId);}};}