当前位置: 首页 > news >正文

Redis分布式锁核心原理源码

文章目录

  • 概述
  • 一、Redis实现分布式锁
    • 1.1、第一版
    • 1.2、第二版
    • 1.3、第三版
    • 1.3、第四版
  • 二、Redisson实现分布式锁核心源码分析
    • 2.1、加锁核心源码
    • 2.2、锁续期核心源码
    • 2.3、重试机制核心源码
    • 2.4、解锁核心源码
  • 总结


概述

  传统的单机锁(Synchronized,ReentrantLock)都是进程级别的锁,无法应对服务多实例部署的场景(每个服务实例都有自己的进程),如果需要跨进程加锁,则需要引入第三方工具对于进程统一管理。
  使用Redis可以实现简易的分布式锁,而最常见的成熟的分布式锁方案是Redisson

一、Redis实现分布式锁

  案例工程:减库存,没有加锁控制,在高并发的场景下必然会出现超卖的问题。如果是在单点部署的情况下,可以通过本地锁解决,但是目前服务多点部署,本地锁的方案无法进行控制。

@Service
public class DistributedLockDemo {@Resourceprivate StringRedisTemplate stringRedisTemplate;private final String STOCK_PREFIX = "stock:";private final String STOCK_LOCK = "stock:lock:";public void deduceStock(int orderNum,int orderId){//业务代码int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0){stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));}
}

1.1、第一版

  以stock:lock:前缀加上orderId作为key,使用setIfAbsent进行加锁,setIfAbsent命令是Redis原生的setNx命令在客户端的体现,setNx命令是仅仅当设置的key不存在时,才可以成功,保证互斥性。
  这样做存在的问题是,如果在执行业务代码的过程中,出现了异常,那么解锁的代码则永远无法执行,造成死锁

@Service
public class DistributedLockDemo {@Resourceprivate StringRedisTemplate stringRedisTemplate;private final String STOCK_PREFIX = "stock:";private final String STOCK_LOCK = "stock:lock:";public void deduceStock(int orderNum,int orderId){Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock");if (lock){//业务代码int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0){stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));stringRedisTemplate.delete(STOCK_LOCK + orderId);}}
}

1.2、第二版

  针对第一版的问题进行改造,将解锁的代码放到finally代码块中。这种方案依旧会存在问题,因为finally代码块只能保证程序出错时最终执行,无法保证服务器宕机造成的死锁,所以最好在加锁时设置一个超时时间,到期自动释放。

    public void deduceStock(int orderNum,int orderId){try {Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock");if (lock){//业务代码int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0){stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));stringRedisTemplate.delete(STOCK_LOCK + orderId);}} catch (Exception e) {}finally {stringRedisTemplate.delete(STOCK_LOCK + orderId);}}

1.3、第三版

  设置超时时间,可以使用stringRedisTemplate.expire方法,但是这样写:

Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock");
stringRedisTemplate.expire(STOCK_LOCK + orderId, 10, TimeUnit.SECONDS);

  是不具有原子性的,需要分为两条命令执行,如果在执行两条命令之间出现问题,依旧会造成死锁的问题。在Redis的层面提供了一条命令 set NX EX,保证设置超时时间和加锁是原子性操作:

Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock",10, TimeUnit.SECONDS);

  加锁时存在的问题看似是解决了,但是解锁的代码:

stringRedisTemplate.delete(STOCK_LOCK + orderId);

  会存在一种情况:

  • 线程一获取到了锁,然后在执行业务代码的时候陷入了阻塞。
  • 线程一的锁到期自动释放。
    • 线程二获取到了锁,执行业务代码
  • 线程一从阻塞状态恢复,执行完业务代码,要执行最终的解锁逻辑
  • 线程一将线程二的锁解锁。

1.3、第四版

  为了避免当前线程将其他线程的锁误解锁,需要在加锁时加入自己的线程唯一标识,并且在解锁时进行判断
  注意,不要用当前Thread.currentThread().getId()方法去获取线程ID,因为不同机器上的线程ID可能会重复。Redisson底层也不是直接用上述的API获取的线程ID,而是和UUID进行了拼接。

//加锁
String threadId = UUID.randomUUID().toString().replace("-","");
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, threadId,10, TimeUnit.SECONDS);//解锁
String threadIdFromRedis = stringRedisTemplate.opsForValue().get("STOCK_LOCK + orderId");
if (threadId.equals(threadIdFromRedis)){stringRedisTemplate.delete(STOCK_LOCK + orderId);
}

  但是这样写, 解锁和之前的加锁设置超时时间有同样的问题,都是操作分为了两步,不能保证原子性。 在解锁的判断上,Redis并没有提供原子性的命令,需要自己去通过lua脚本实现。


  经过四版改动,自己用Redis实现的分布式锁已经基本可用了,但是深究下来依旧存在一些问题或不足:

  1. 如果执行业务代码的时间,超过了设置的锁超时时间,当前逻辑是没有自动续期的。
  2. 当前的逻辑不支持锁重入。
  3. 当前的逻辑没有实现重试机制,获取不到锁的线程无法进行重试。

二、Redisson实现分布式锁核心源码分析

  相比较于自己通过set NX EX + lua脚本实现的分布式缓存锁,Redisson是更为成熟的方案,也推荐在生产环境使用。Redisson分布式锁在API层面是非常简单的:

public class RedissonLockDemo {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate Redisson redisson;private final String STOCK_PREFIX = "stock:";private final String STOCK_LOCK = "stock:lock:";public void deduceStock(int orderNum, int orderId) {//获取分布式锁RLock lock = redisson.getLock(STOCK_LOCK + orderId);try {//加分布式锁,可以指定超时时间,没有指定超时时间默认30s,底层会自动续期。lock.lock();//业务代码int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0) {stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));} catch (Exception e) {} finally {lock.unlock();}}
}

  关键代码:

//获取分布式锁
RLock lock = redisson.getLock(STOCK_LOCK + orderId);
//加分布式锁,可以指定超时时间,没有指定超时时间默认30s,底层会自动续期。
lock.lock();
//解锁
lock.unlock();

2.1、加锁核心源码

  跟踪lock.lock();,进入lockInterruptibly
在这里插入图片描述
  首先第一次加锁,进入的是tryAcquire方法,最终的核心逻辑是:
在这里插入图片描述
  底层执行的是一段lua脚本,lua脚本和pipeline类似,也是可以将命令批量执行。虽然脚本中分了很多条命令,但是其他客户端要等到当前客户端的lua脚本全部执行完,才能执行脚本。

  • KEYS[1]:是作为当前分布式锁的Key,也就是用户在redisson.getLock时传入的。
  • ARGV[1]:是默认的超时时间,30s。
  • ARGV[2]:是当前线程的唯一标识,用线程ID拼接上UUID。
# 加锁的逻辑
# 当前分布式锁的key不存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
#	调用hset命令,key是分布式锁的key,value的key是当前线程的唯一标识,用线程ID拼接上UUID。 value是1(重入次数)
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
#  设置超时时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
# 返回null
"return nil; " +
"end; " +
# 重入的逻辑
# 当前分布式锁的key存在,并且是当前线程持有
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
# 重入次数 + 1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
# 设置超时时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
# 返回null 
"return nil; " +
"end; " +
# 查询指定键的剩余生存时间,并且返回
"return redis.call('pttl', KEYS[1]);"

  上面这个脚本的执行,包含了可重入锁初次加锁的逻辑,最终还会返回当前锁的剩余时间

2.2、锁续期核心源码

  Redisson锁的续期,也称为看门狗机制。如果要实现锁续期,常见的设计思想是在业务线程执行时,开启一个守护线程,对业务线程进行监控,如果锁到期,业务线程还没有执行完,就执行续期的逻辑
  在Redisson中的实现,调用完tryLockInnerAsync方法后,会回调operationComplete,通过future.getNow();获取到加锁的结果,上面的lua脚本,在加锁成功和重入成功后,都会返回null。
在这里插入图片描述
  进入scheduleExpirationRenewal方法,该方法就是实现续期的核心方法实现,类似于一个延迟任务的线程池,延迟30/3 = 10s执行,整个方法分为两部分
  首先依旧是执行一段lua脚本**(KEYS[1],ARGV[2],ARGV[1] 和第一段加锁时的lua脚本参数含义相同)**

# 当前分布式锁的key存在,并且是当前线程持有
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
# 进行续期30s
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
# 续期成功就返回1
"return 1; " +
"end; " +
# 否则返回 0 
"return 0;",

  第二部分则是拿到lua脚本执行的结果,递归调用scheduleExpirationRenewal方法,延迟10s执行,最终的结果是每隔10s进行一次续期,每次续期30s
在这里插入图片描述

2.3、重试机制核心源码

  加锁时的lua脚本,如果没有加锁或者重入成功,那么最终返回的是key的剩余生存时间
在这里插入图片描述

  返回到方法的最外层,在lockInterruptibly中执行自旋重试的逻辑。这里的自旋重试并非是在while循环中不断地循环,而是有一定的间隔时间。
  在进入while循环后首先会再次尝试获取锁,如果失败了,就通过Semaphore的API,在规定的TTL毫秒内尝试获取许可,如果有其他线程释放(即唤醒),当前线程就会继续执行。如果超时仍未获取到许可,则返回 false。
在这里插入图片描述
  如果业务代码执行的时间短于设置的锁超时时间,那么其他等待锁的线程并不会阻塞到超时时间后再去竞争锁,在执行while循环之前,会通过redis的发布订阅模型,将自身存入一个队列中。
在这里插入图片描述
  唤醒队列中元素的逻辑,在解锁中。

2.4、解锁核心源码

  解锁同样是通过lua脚本,将判断线程标识和解锁组成原子性的操作,解锁的lua脚本在unlockInnerAsync方法中:

  • KEYS[1]:当前分布式锁的key
  • KEYS[2]:当前分布式锁的key 拼接上 redisson_lock__channel
  • ARGV[1]:解锁消息标识,默认0L
  • ARGV[2]:锁超时释放时间
  • ARGV[3]:当前线程的唯一标识,用线程ID拼接上UUID。

  主线程在解锁的时候会往队列中发送给一条消息,唤醒等待线程:

  • 当前锁不存在,超时释放了
  • 存在并且解锁成功
# 当前key对应的分布式锁不存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
# 发布解锁消息标识到当前分布式锁的key 拼接上 redisson_lock__channel
"redis.call('publish', KEYS[2], ARGV[1]); " +
# 返回1
"return 1; " +
"end;" +
# 当前key对应的分布式锁非本线程持有
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
# 返回null
"return nil;" +
"end; " +
# 可重入锁的解锁,重入次数 - 1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
# 重入次数>0
"if (counter > 0) then " +
# 重新设置超时时间
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
# 返回
"return 0; " +
"else " +
# 删除当前key对应的分布式锁
"redis.call('del', KEYS[1]); " +
# 发布解锁消息标识到当前分布式锁的key 拼接上 redisson_lock__channel
"redis.call('publish', KEYS[2], ARGV[1]); " +
# 返回1
"return 1; "+
"end; " +
"return nil;",

  消费者 (正在阻塞等待的线程) 接受到了消息,会回调LockPubSubonmessage方法,被唤醒然后重新争抢锁。
在这里插入图片描述

总结

在这里插入图片描述

相关文章:

  • SpringCloud系列(40)--SpringCloud Gateway的Filter的简介及使用
  • 和ai对话:讨论一个简单的理财方案
  • Halcon 常用算子总结
  • 基于 SpringBoot 实现一个 JAVA 代理 HTTP / WS
  • MyBatis实战指南(八)MyBatis日志
  • 热传导方程能量分析与边界条件研究
  • HarmonyOS实战:自定义表情键盘
  • < OS 有关 4 台 Ubuntu VPSs 正在被攻击:nginx 之三> 记录、分析、防护的过程 配置 ufw Fail2Ban 保护网络上的主机
  • 个人计算机系统安全、网络安全、数字加密与认证
  • Github 2025-06-29php开源项目日报 Top10
  • RK3588集群服务器性能优化案例:电网巡检集群、云手机集群、工业质检集群
  • Mac电脑手动安装原版Stable Diffusion,开启本地API调用生成图片
  • 基于云的平板挠度模拟:动画与建模-AI云计算数值分析和代码验证
  • Linux中部署Nacos保姆级教程
  • Wpf布局之WrapPanel面板!
  • Java面试宝典:基础二
  • JSON + 存储过程:SaaS 架构下的统一接口与租户定制之道
  • 2025年渗透测试面试题总结-2025年HW(护网面试) 19(题目+回答)
  • OpenCV读取照片和可视化详解和代码示例
  • Java 数据结构 泛型