当前位置: 首页 > news >正文

分布式专题——4 大厂生产级Redis高并发分布式锁实战

1 “超卖”问题

1.1 问题引出

  • 来看下面这段代码:

    • 检查库存 -> 扣减库存 -> 写入库存;
    • stock是 Redis 中的一个 Key;
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
    if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);
    } else {System.out.println("扣减失败,库存不足");
    }
    
  • 高并发环境下,多个请求(可能来自不同的服务器或同一服务器的不同线程)会交叉执行这三个步骤,导致数据不一致;

  • 假设初始库存 stock = 1。现在有两个用户(User A 和 User B)几乎同时发起购买请求:

    时间序列Tomcat 服务器 1 (处理 User A 的请求)Tomcat 服务器 2 (处理 User B 的请求)Redis 中的 Stock 值导致的结果
    T1读取库存: int stock = get("stock"); // 读到 11
    T2读取库存: int stock = get("stock"); // 也读到 11致命问题: 两个请求都认为库存充足,都准备进行扣减。
    T3计算新库存: realStock = 1 - 1 = 01
    T4计算新库存: realStock = 1 - 1 = 01
    T5写入库存: set("stock", "0")0User A 的请求成功扣减,库存变为 0。
    T6写入库存: set("stock", "0")0超卖发生! User B 的请求覆盖了 User A 的写入操作。库存最终为 0,但实际发生了两次扣减,商品多卖了一份。
    T7系统输出:扣减成功,剩余库存:00
    T8系统输出:扣减成功,剩余库存:00两个用户都收到了“成功”的提示,但库存只够一个人买。

1.2 单机环境下解决

  • 如果是在单机环境下,通过synchronized关键字构建一个代码块就可以解决:

    synchronized (this) {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}
    }
    

1.3 分布式环境下解决

1.3.1 基本解决

  • 可以用SETNX命令来解决,其是一个用于设置键值对的命令,它是 “SET if Not eXists”(如果不存在,则 SET)的缩写;

    • 语法格式setnx key value,其中key是要设置的键,value是对应的值;
    • 执行逻辑:当执行SETNX命令时,Redis 会检查指定的key是否已经存在于数据库中;
      • 如果该key不存在,那么就将key的值设置为value,并且该命令返回1,表示设置成功;
      • 如果key已经存在,那么SETNX不会对已有的key-value对做任何修改,并且返回0,表示设置失败;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;// 商品ID,可以作为分布式锁的 Key
    String lockKey = "lock:product_101";
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "shisan");
    if (!result) {return "获取锁失败";
    }// 业务逻辑
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
    if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);
    } else {System.out.println("扣减失败,库存不足");
    }//解锁
    stringRedisTemplate.delete(lockKey);
    

1.3.2 异常解决

  • 如果在执行业务逻辑时抛出了异常,最后没有执行到解锁的代码,就会造成死锁,可以用try-finally解决:

    @Autowired
    private StringRedisTemplate stringRedisTemplate;// 商品ID,可以作为分布式锁的 Key
    String lockKey = "lock:product_101";
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "shisan");
    if (!result) {return "获取锁失败";
    }// 业务逻辑
    try{int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}
    } finally {//解锁stringRedisTemplate.delete(lockKey);
    }
    

1.3.3 宕机解决

  • 如果在执行业务逻辑时系统宕机,即使将解锁的代码放在了finally中,但系统宕机导致该解锁代码也执行不到,一样也会造成死锁,此时可以给锁加一个过期时间:

    @Autowired
    private StringRedisTemplate stringRedisTemplate;// 商品ID,可以作为分布式锁的 Key
    String lockKey = "lock:product_101";
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "shisan");
    stringRedisTemplate.expire(lockKey, 30, TimeUnit.SECONDS);
    if (!result) {return "获取锁失败";
    }// 业务逻辑
    try{int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}
    } finally {//解锁stringRedisTemplate.delete(lockKey);
    }
    

1.3.4 “加锁+设置过期时间”的原子性解决

  • 如果在执行完加锁逻辑后系统就发生了宕机,此时还没有执行给锁加过期时间的代码,一样也会出现没有解锁而导致死锁的场景,这是因为这两行代码不是原子性的,可以用一行代码来一次性做完这两个“加锁+设置过期时间”的操作,这样就保证了原子性:

    @Autowired
    private StringRedisTemplate stringRedisTemplate;// 商品ID,可以作为分布式锁的 Key
    String lockKey = "lock:product_101";
    //Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "shisan");
    //stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, shisan, 30, TimeUnit.SECONDS);
    if (!result) {return "获取锁失败";
    }// 业务逻辑
    try{int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}
    } finally {//解锁stringRedisTemplate.delete(lockKey);
    }
    

1.3.5 持锁时间过长解决

  • 在高并发环境下,如果某个持有锁的线程执行时间过长(比如那台机器执行较慢),可能会引发一种严重的锁交叉释放问题:

    时间线线程1 (T1)线程2 (T2)线程3 (T3)Redis 中锁的状态 (lock:product_101)造成的结果与问题
    T0获取锁成功:执行 setIfAbsent(lockKey, "T1-val", 30s) 锁值: “T1-val”值: “T1-val” 剩余TTL: 30sT1 开始执行业务逻辑
    T+10s仍在执行业务逻辑值: “T1-val” 剩余TTL: 20s
    T+25s仍在执行业务逻辑…(执行缓慢)值: “T1-val” 剩余TTL: 5s
    T+30s仍在执行业务逻辑…(锁已过期,但T1不知情)获取锁成功setIfAbsent(..., "T2-val", 30s)值: “T2-val” 剩余TTL: 30s(新锁)锁的互斥性被破坏! T1 和 T2 现在同时进入了临界区代码,可能导致超卖等数据不一致问题
    T+32s开始执行业务逻辑值: “T2-val” 剩余TTL: 28s
    T+35s终于执行完业务逻辑 进入 finally执行 delete(lockKey)仍在执行业务逻辑值: null(线程2的锁被线程1删除)致命错误:T1 误删了 T2 的锁! T2 失去了保护,其他线程可以获取锁
    T+36s获取锁成功setIfAbsent(..., "T3-val", 30s)值: “T3-val” 剩余TTL: 30s(新锁)T3 开始执行,此时 T2 和 T3 同时在执行临界区代码,问题加剧
    T+37s执行完业务逻辑 进入 finally执行 delete(lockKey)正在执行业务逻辑值: null(线程3的锁被线程2删除)连锁反应:T2 又误删了 T3 的锁! 系统陷入“获取锁 -> 被前一个线程误删 -> 下一个线程获取锁”的恶性循环,分布式锁完全失效
  • 根本原因:自己的锁被别的线程释放掉了,所以可以将每一个锁的 value 设置成一个“身份ID”,在解锁的时候判断一下当前解锁的线程和“身份ID”是否一致,一致才允许解锁:

    @Autowired
    private StringRedisTemplate stringRedisTemplate;// 商品ID,可以作为分布式锁的 Key
    String lockKey = "lock:product_101";
    String clientId = UUID.randomUUID().toString();
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
    if (!result) {return "获取锁失败";
    }// 业务逻辑
    try{int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}
    } finally {// 谁加锁就只允许谁解锁if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {stringRedisTemplate.delete(lockKey);}
    }
    

1.3.6 锁续命(看门狗机制)

  • 假设有这么一个场景:

    • 线程1执行完if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey)))这行验证身份的代码,机器卡顿了一下;
    • 在卡顿的过程中,锁到了过期时间被自动释放掉了,线程2就获取到了锁且正在执行业务代码;
    • 此时线程1的机器从卡顿中恢复,执行释放锁的代码,此时释放的却是线程2的锁,1.3.5 持锁时间过长解决的问题又再现了;
  • 根本原因:验证身份的代码和释放锁的代码,没有保证原子性,可以用锁续命来解决:

  • 锁续命,也称为 看门狗机制,是一种在分布式锁场景下,由客户端主动且定期地延长其所持有锁的过期时间的策略。

    • 它的核心目的是:防止因为业务执行时间超过锁的初始过期时间而导致的锁提前失效问题,从而确保业务逻辑在执行期间能一直持有锁,维持互斥性;
    • 锁续命机制通常由一个独立的守护线程(或定时任务)来实现,其工作流程如下:
      1. 成功获取锁:业务线程成功获取分布式锁,并设置一个初始的过期时间(例如 10 秒);
      2. 启动看门狗:同时,客户端会启动一个守护线程(看门狗),这个线程会定期(例如,在过期时间的三分之一时,即 3 秒后)去检查业务线程是否还持有这把锁;
      3. 定期续期
        • 如果锁还存在且仍是当前线程持有:守护线程就执行 EXPIRE 命令,将锁的过期时间重新设置为初始值(例如,再续上 10 秒);
        • 如果锁不存在或已被其他线程持有:说明业务已经执行完毕释放了锁,或者锁已经被抢占,则守护线程停止续期;
      4. 业务完成,停止续期:当业务线程执行完毕,显式释放锁后,它会通知守护线程停止续期工作。
  • 在实际开发中,我们通常不会自己实现完整的锁续命机制,而是使用成熟的框架,例如 Redisson

    • 官网:Redisson | Valkey & Redis Java client. Ultimate Real-Time Data Platform;
    • GitHub:GitHub - redisson/redisson: Redisson - Valkey & Redis Java client. Real-Time Data Platform. Sync/Async/RxJava/Reactive API. Over 50 Valkey and Redis based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Bloom filter, Spring, Tomcat, Scheduler, JCache API, Hibernate, RPC, local cache…。

2 Redisson 框架

2.1 锁续命实现

  • 引入依赖:

    <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.6.5</version>
    </dependency>
    
  • 注入 Bean:

    @Bean
    public Redisson redisson() {// 此为单机模式Config config = new Config();config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);return (Redisson) Redisson.create(config);
    }
    
  • 使用:

    @Autowired
    private Redisson redisson;@Autowired
    private StringRedisTemplate stringRedisTemplate;// 商品ID,可以作为分布式锁的 Key
    String lockKey = "lock:product_101";
    //获取锁对象
    RLock redissonLock = redisson.getLock(lockKey);
    //加分布式锁
    redissonLock.lock();// 业务逻辑
    try{int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}
    } finally {//解锁redissonLock.unlock();
    }
    

2.2 Redisson 分布式锁原理

在这里插入图片描述

  • 线程 1 加锁流程

    • 线程 1 通过 Redisson 发起 lock(加锁)请求;
    • 若加锁成功,Redisson 会启动一个后台线程:每隔 10 秒检查线程 1 是否仍持有锁。如果仍持有锁,就延长锁的过期时间(即“锁续命”,保证业务执行期间锁一直有效);
    • 加锁成功后,线程 1 执行相关业务逻辑;
    • 业务逻辑执行完后,通过 unlock(释放锁)操作,将锁归还给 Redis(Master 节点),再由 Master 同步给 Slave 节点,保证集群数据一致;
  • 线程 2 竞争锁流程

    • 线程 2 也通过 Redisson 发起 lock 请求,尝试加锁;

    • 加锁成功?

      • 若成功,后续流程与“线程 1 加锁”一致(启动后台续期线程、执行业务、释放锁);
      • 若失败(说明锁已被线程 1 持有),线程 2 会进入 while 循环(自旋):间歇性地再次尝试加锁,直到成功获取锁。

2.3 RedissonLock.class源码解析

  • 通过上面代码的redissonLock.lock();,即可追溯到;

2.3.1 加锁逻辑

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {this.internalLockLeaseTime = unit.toMillis(leaseTime); // 锁的过期时间return this.commandExecutor.evalWriteAsync(this.getName(), // KEYS[1]:锁的keyLongCodec.INSTANCE, command, // Lua 脚本开始// 锁不存在时,直接加锁"if (redis.call('exists', KEYS[1]) == 0) then " + // 检查锁 Key 是否存在。如果不存在(值为0),说明当前没有线程持有此锁"redis.call('hset', KEYS[1], ARGV[2], 1); " + // 使用Hash结构存储锁信息。Field=ARGV[2](线程唯一ID),Value=1(重入次数)"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 为整个锁 Key 设置毫秒级的过期时间,防止死锁"return nil; " + // 返回 nil 表示加锁成功"end; " +// 锁已存在,且是当前线程持有(重入锁)"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 检查锁Key的Hash中,是否存在当前线程的Field。如果存在,说明是重入场景"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 将当前线程对应的重入次数 +1。例如,从 1 变成 2"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 重置锁的过期时间。这是实现看门狗自动续期的基础。每次重入都会刷新过期时间"return nil; " + // 返回 nil 表示重入加锁成功"end; " +// 锁已存在,且被其他线程持有"return redis.call('pttl', KEYS[1]);", // 如果前两个条件都不满足,说明锁存在且被其他线程持有。此时返回这个锁的剩余存活时间(毫秒)// Lua 脚本结束Collections.singletonList(this.getName()), // KEYS 数组:只有一个元素,就是锁的keynew Object[]{this.internalLockLeaseTime, this.getLockName(threadId)} // ARGV 数组:两个参数);
}
参数说明
KEYS[1]锁在 Redis 中的 Key 名称
ARGV[1]锁的租约时间(毫秒),即过期时间
ARGV[2]锁的唯一标识符,格式为 UUID:threadId
  • this.internalLockLeaseTime是 Redisson 设置的锁的过期时间:

    • 其是RedissonLock.class中的一个属性:

      protected long internalLockLeaseTime;
      
    • 由 Redisson 的构造方法赋值:

      public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.id = commandExecutor.getConnectionManager().getId();this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
      }
      
    • 再根据getLockWatchdogTimeout()可以追溯到其默认值为 30000 毫秒,即30秒,对应源码在Config.class中:

      public Config() {this.transportMode = TransportMode.NIO;this.lockWatchdogTimeout = 30000L;this.keepPubSubOrder = true;this.addressResolverGroupFactory = new DnsAddressResolverGroupFactory();
      }
      
    • 在注入 Redisson 的 Bean 时,可以修改锁的过期时间:

      config.setLockWatchdogTimeout(10000);
      
  • this.getLockName(threadId)是锁信息(相当于1.3.5 持锁时间过长解决中的clientId),被上面的 Lua 脚本使用 Hash 结构存储到 Redis 中,其值为:

    String getLockName(long threadId) {return this.id + ":" + threadId;
    }
    
    final UUID id;
    

2.3.2 续命(看门狗)逻辑

  • 当成功加锁后:

    • 会返回 Lua 脚本的nil,其相当于 Java 的 null
    • 即下面代码中Boolean ttlRemaining = (Boolean)future.getNow();ttlRemaining的值就为 true
    • 就会执行RedissonLock.this.scheduleExpirationRenewal(threadId);,即执行续命逻辑;
    /*** 尝试获取一次锁的异步方法* @param leaseTime 指定的锁持有时间。-1L 表示使用看门狗机制* @param unit 时间单位* @param threadId 当前线程ID* @return RFuture<Boolean> 异步结果,true表示获取成功,false表示失败*/
    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {// 分支一:如果调用方明确指定了锁的租约时间(leaseTime != -1L)if (leaseTime != -1L) {// 直接使用用户指定的租约时间获取锁,不启动看门狗机制// 锁会在指定时间后自动过期,即使业务未执行完return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} // 分支二:如果调用方未指定租约时间(leaseTime == -1L),使用看门狗机制else {// 使用看门狗默认超时时间(默认30秒)作为初始租约时间尝试加锁RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), // 获取配置的看门狗超时时间TimeUnit.MILLISECONDS, // 时间单位为毫秒threadId, // 线程IDRedisCommands.EVAL_NULL_BOOLEAN // Redis命令类型);// 为加锁操作添加监听器,用于在加锁完成后处理看门狗的启动ttlRemainingFuture.addListener(new FutureListener<Boolean>() {public void operationComplete(Future<Boolean> future) throws Exception {// 只有当加锁操作成功完成时(无网络异常等错误)if (future.isSuccess()) {// 获取加锁结果:true-成功,false-失败(锁已被占用)Boolean ttlRemaining = (Boolean)future.getNow();// 如果加锁成功,启动看门狗续期机制if (ttlRemaining) {// 调度过期时间续期任务:启动看门狗线程定期续期RedissonLock.this.scheduleExpirationRenewal(threadId);}// 如果加锁失败(ttlRemaining为false),什么都不做,锁已被其他线程持有}// 如果加锁操作本身失败(如网络异常),也不启动看门狗}});// 返回异步结果Futurereturn ttlRemainingFuture;}
    }
    
  • 接下来看一下scheduleExpirationRenewal方法的源码:

    /*** 调度锁过期时间续期任务(即启动看门狗机制)* @param threadId 当前线程ID*/
    private void scheduleExpirationRenewal(final long threadId) {// 使用expirationRenewalMap来确保同一个锁只启动一个看门狗任务,避免重复调度// this.getEntryName() 获取锁的唯一标识,用于作为Map的Keyif (!expirationRenewalMap.containsKey(this.getEntryName())) {// 创建一个定时任务(Timeout),这就是看门狗的核心Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {// 1. 执行Lua脚本进行续期操作RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), // KEYS[1]: 锁的KeyLongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,// Lua脚本:检查锁是否仍被当前线程持有,如果是则续期"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 续期操作"return 1; " + // 返回1表示续期成功"end; " +"return 0;", // 返回0表示续期失败(锁不存在或不属于当前线程)Collections.singletonList(RedissonLock.this.getName()), // KEYS数组new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)} // ARGV参数:续期时间和线程标识);// 2. 为续期操作添加监听器,处理续期结果future.addListener(new FutureListener<Boolean>() {public void operationComplete(Future<Boolean> future) throws Exception {// 无论续期成功与否,都先从Map中移除当前任务记录RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());// 如果续期操作本身失败(如网络异常)if (!future.isSuccess()) {RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());} else {// 获取续期结果:true(1)-成功,false(0)-失败if ((Boolean)future.getNow()) {// 3. 续期成功:递归调用自己,安排下一次续期任务RedissonLock.this.scheduleExpirationRenewal(threadId);}// 如果续期失败(返回false),说明锁已被释放或不属于当前线程,不再续期}}});}}, // 定时任务的延迟执行时间:租约时间的1/3// 例如:internalLockLeaseTime=30000ms,则延迟10000ms(10秒)后执行第一次续期this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);// 使用putIfAbsent原子性地将任务放入Map,避免并发问题if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {// 如果Map中已存在该任务(其他线程先放入),则取消当前创建的任务,避免重复task.cancel();}}// 如果Map中已存在该任务,说明看门狗已经启动,直接返回
    }
    

2.3.3 其它线程自旋获取锁的逻辑

/*** 可中断的获取锁方法(支持等待且可被中断)* @param leaseTime 锁的租约时间* @param unit 时间单位* @throws InterruptedException 如果等待过程中线程被中断*/
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {// 获取当前线程IDlong threadId = Thread.currentThread().getId();// 1. 首先尝试直接获取锁Long ttl = this.tryAcquire(leaseTime, unit, threadId);// 如果ttl为null,表示获取锁成功,直接返回(如果不为空,返回的ttl是锁剩余的过期时间)if (ttl != null) {// 2. 获取锁失败,订阅锁释放消息通道RFuture<RedissonLockEntry> future = this.subscribe(threadId);this.commandExecutor.syncSubscription(future); // 等待订阅完成try {// 3. 进入循环等待状态while(true) {// 再次尝试获取锁ttl = this.tryAcquire(leaseTime, unit, threadId);// 如果获取成功(ttl为null),跳出循环if (ttl == null) {return;}// 4. 根据剩余时间采取不同的等待策略if (ttl >= 0L) {// ttl >= 0: 锁被占用但有明确的剩余时间// 使用Semaphore尝试在指定时间内等待,如果超时或中断则抛出异常this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {// ttl < 0: 锁被占用但无过期时间(理论上不应该发生)// 无限期等待,直到被通知或中断this.getEntry(threadId).getLatch().acquire();}// 循环继续,再次尝试获取锁}} finally {// 5. 最终清理:取消订阅,释放资源this.unsubscribe(future, threadId);}}// 如果第一次就获取成功,直接退出方法
}

2.3.4 解锁逻辑

/*** 释放锁的核心异步方法(执行Lua脚本)* @param threadId 当前线程ID* @return RFuture<Boolean> 异步结果:true-完全释放,false-重入计数减1,null-释放失败*/
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), // 锁的KeyLongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // Lua 脚本开始 "if (redis.call('exists', KEYS[1]) == 0) then " + // 锁 Key 已经不存在(可能已过期)"redis.call('publish', KEYS[2], ARGV[1]); " + // 仍然发布解锁消息(通知可能还在等待的线程)"return 1; " + // 返回1表示释放完成"end;" +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 当前线程(ARGV[3])不持有该锁"return nil; " + // 返回nil表示当前线程不持有锁"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 将当前线程的重入次数减1"if (counter > 0) then " + // 重入次数仍大于0(锁仍被当前线程持有)"redis.call('pexpire', KEYS[1], ARGV[2]); " + // 更新锁的过期时间(续期)"return 0; " + // 返回0表示重入次数减少但锁仍被持有(即还未释放)"else " + // 重入次数减到0"redis.call('del', KEYS[1]); " + // 删除锁 Key"redis.call('publish', KEYS[2], ARGV[1]); " + // 发布解锁消息,通知等待线程"return 1; " + // 返回1表示锁被完全释放"end; " +"return nil;", // 理论上不会执行到这行// Lua 脚本结束// KEYS 数组:两个KeyArrays.asList(this.getName(),          // KEYS[1]: 锁的Keythis.getChannelName()    // KEYS[2]: 发布订阅的频道名,用于通知其他等待线程), // ARGV 数组:三个参数new Object[]{LockPubSub.unlockMessage,    // ARGV[1]: 发布的消息内容(通常是数字1)this.internalLockLeaseTime,  // ARGV[2]: 锁的租约时间(用于续期)this.getLockName(threadId)   // ARGV[3]: 当前线程的唯一标识(UUID:threadId)});
}

2.3.5 其它线程被唤醒获取锁的逻辑

  • 2.3.3 其它线程自旋获取锁的逻辑中,获取锁失败的线程会执行RFuture<RedissonLockEntry> future = this.subscribe(threadId);去订阅锁释放消息通道;

  • 进入到subscribe方法中:

    protected RFuture<RedissonLockEntry> subscribe(long threadId) {return PUBSUB.subscribe(this.getEntryName(),                 // 锁的唯一标识,作为订阅的入口名this.getChannelName(),               // Redis的发布订阅频道名称this.commandExecutor.getConnectionManager().getSubscribeService() // 订阅服务);
    }
    
  • PUBSUB 是 Redisson 内部的一个静态工具类,它专门负责管理所有基于 Redis 发布订阅(Pub/Sub)功能的连接和订阅关系;

    protected static final LockPubSub PUBSUB = new LockPubSub();
    
  • Ctrl+鼠标左键点击上面LockPubSub,可以追溯到LockPubSub.class中,其中有一个onMessage方法:

    /*** 收到锁释放消息时的回调处理方法* @param value RedissonLockEntry对象,包含等待线程的信息* @param message 收到的消息内容(应该是unlockMessage)*/
    protected void onMessage(RedissonLockEntry value, Long message) {// 1. 检查收到的消息是否为解锁消息if (message.equals(unlockMessage)) {// 2. 释放信号量,唤醒一个正在等待的线程value.getLatch().release();// 3. 处理监听器任务(确保公平性)while(true) {Runnable runnableToExecute = null;// 使用synchronized保证对value对象的线程安全访问synchronized(value) {// 3.1 从监听器队列中取出一个任务Runnable runnable = (Runnable)value.getListeners().poll();if (runnable != null) {// 3.2 尝试获取信号量许可(非阻塞方式)if (value.getLatch().tryAcquire()) {// 获取成功:将这个任务标记为待执行runnableToExecute = runnable;} else {// 获取失败:说明其他线程已经抢到了锁,将任务重新放回队列value.addListener(runnable);}}}// 4. 执行取出的任务或退出循环if (runnableToExecute == null) {// 队列为空或无法获取信号量,退出循环return;}// 5. 执行获取锁的任务(不在同步块内执行,避免阻塞)runnableToExecute.run();}}// 如果不是解锁消息,忽略处理
    }
    

3 Redis 主从架构下的分布式锁失效问题

3.1 问题

  • 在 Redis 主从复制架构中:

    • 主节点(Master):处理所有写操作(如加锁、释放锁);
    • 从节点(Slave):异步复制主节点的数据,处理读操作;
  • 下面通过一个时序图来展示锁失效的具体过程:

    • Redis 主从复制是异步的,主节点写入成功后立即返回,数据同步到从节点有毫秒级的延迟;
    • 当主节点宕机时,从节点可能尚未收到最新的锁数据,但已经被提升为新的主节点;
    • 在某些网络分区情况下,可能会出现两个客户端分别向不同的节点申请同一把锁,都成功获得;
    Client ARedis MasterRedis SlaveClient B第1步:Client A 成功获取锁执行SETNX my_lock token_value成功获取锁第2步:主节点宕机,数据未同步异步复制中...主节点突然宕机锁数据未同步到从节点第3步:故障切换,从节点升级升级为新的主节点新主节点上没有my_lock数据第4步:Client B 获取同一把锁执行SETNX my_lock token_value成功获取锁(实际上锁已存在!)第5步:两个客户端同时持有锁Client A 认为持有锁Client B 认为持有锁锁失效!出现数据竞争!Client ARedis MasterRedis SlaveClient B

3.2 不推荐使用红锁解决

3.2.1 简介

  • 官网:Locks and synchronizers - Redisson Reference Guide;

  • Redlock 算法是 Redis 作者提出的解决方案,核心思想是在多个独立的 Redis 实例上同时获取锁

  • 算法步骤

    1. 客户端获取当前时间戳 T1;
    2. 依次向 N 个独立的 Redis 实例申请锁(使用相同的 Key 和随机值);
    3. 客户端计算获取锁花费的时间(当前时间 T2 - T1);
      • 只有当在大多数实例(N/2 + 1)上获取成功;
      • 且总耗时小于锁的有效时间时,才认为加锁成功;
    4. 如果加锁失败,向所有实例发送释放锁命令;
    加锁
    加锁
    加锁
    Java Client
    redis1
    redis2
    redis3

3.2.2 使用示例

@RequestMapping("/redlock")
public String redlock() {String lockKey = "product_001"; // 定义锁的键名// 创建多个RLock实例(在实际生产中,每个RLock应该对应不同的Redis实例)// 注意:这里只是伪代码,实际需要连接不同的Redis服务器实例RLock lock1 = redisson.getLock(lockKey); // 第一个Redis实例的锁RLock lock2 = redisson.getLock(lockKey); // 第二个Redis实例的锁RLock lock3 = redisson.getLock(lockKey); // 第三个Redis实例的锁/*** 创建RedissonRedLock(红锁)对象* 这是Redlock算法的核心实现,需要传入多个独立的RLock实例* 通常要求这些Redis实例是相互独立的主节点,而不是主从关系*/RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);try {/*** 尝试获取分布式锁(Redlock算法)* @param waitTimeout 10 最大等待时间:客户端尝试获取锁的最长等待时间,超过则放弃* @param leaseTime 30 锁的租约时间:获取成功后锁的自动释放时间,应大于业务执行时间* @param unit TimeUnit.SECONDS 时间单位* @return boolean 是否成功获取锁* * Redlock算法核心:必须在大多数节点(N/2 + 1)上成功获取锁,且总耗时小于租约时间*/boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);if (res) {// 成功获得锁,在这里处理业务逻辑// 注意:此时锁已经在多个Redis实例上被成功持有// 可以安全地执行需要分布式锁保护的临界区代码} else {// 获取锁失败,可以记录日志或抛出异常// 在实际应用中,可能需要重试或返回特定错误信息给客户端}} catch (Exception e) {// 捕获获取锁过程中可能出现的异常(如网络异常、中断异常等)throw new RuntimeException("lock fail", e); // 包装异常并抛出} finally {// 无论如何,最后都要尝试解锁// RedissonRedLock会向所有参与红锁的Redis实例发送解锁命令// 即使某些实例上的锁获取失败,也会确保清理其他实例上的锁redLock.unlock();}return "end"; // 返回处理结果
}

3.2.3 不推荐使用的原因一

  • 如果给 redis1,redis2,redis3 后面分别加上一个从节点,然后来看这么一个场景:

    加锁
    加锁
    加锁
    从节点
    从节点
    从节点
    Java Client
    redis1 Master
    redis2 Master
    redis3 Master
    redis1 Slave
    redis2 Slave
    redis3 Slave
  • 第一阶段:线程1成功获取锁

    1. 线程 1 执行 Redlock 算法,向三个主节点申请锁:
      • redis1 Master:加锁成功
      • redis2 Master:加锁成功
      • redis3 Master:加锁失败(可能由于网络延迟或竞争)
    2. 线程1在 2/3 个节点上加锁成功(满足大多数条件),认为获取锁成功,开始执行业务逻辑;
    3. redis2 Master 的锁数据正在异步复制redis2 Slave,由于 Redis 复制的异步性,redis2 Slave 可能尚未收到锁数据;
  • 第二阶段:redis2 Master 宕机,故障转移

    1. redis2 Master 突然宕机(在数据完全同步前);
    2. 哨兵(Sentinel)或集群模式检测到主节点宕机,将 redis2 Slave 提升为新的主节点(现在称为 redis2 New Master);
    3. 关键问题:新的 redis2 New Master没有线程1的锁数据,因为复制尚未完成;
  • 第三阶段:线程2尝试获取同一把锁

    1. 线程 2 执行 Redlock 算法,向三个主节点申请同一把锁:
      • redis1 Master:加锁失败(线程1仍持有锁)
      • redis2 New Master:加锁成功(新主节点上没有锁数据)
      • redis3 Master:加锁成功
    2. 线程2在 2/3 个节点上加锁成功(满足大多数条件),也认为获取锁成功;
  • 最终结果:锁失效

    1. 两个线程同时持有锁
      • 线程1:认为自己在 redis1 Masterredis2 Master 上持有锁
      • 线程2:认为自己在 redis2 New Masterredis3 Master 上持有锁
    2. 数据竞争:两个线程同时进入临界区,导致数据不一致(超卖等问题);
  • 时序图:

    线程1redis1 Masterredis2 Masterredis2 Slaveredis3 Master线程2redis2 New Master阶段一:线程1获取锁SETNX lock_key (成功)SETNX lock_key (成功)SETNX lock_key (失败)线程1获取锁成功(2/3)阶段二:故障转移异步复制中...(锁数据未同步完)redis2 Master宕机从节点升级为新主节点新主节点上没有锁数据阶段三:线程2获取同一把锁SETNX lock_key (失败-已有锁)SETNX lock_key (成功-无锁数据)SETNX lock_key (成功)线程2获取锁成功(2/3)阶段四:锁失效线程1认为持有锁线程2认为持有锁两个线程同时进入临界区!锁失效!线程1redis1 Masterredis2 Masterredis2 Slaveredis3 Master线程2redis2 New Master

3.2.4 不推荐使用的原因二

  • 如果没有从节点:

    加锁
    加锁
    加锁
    Java Client
    redis1
    redis2
    redis3
  • 第一阶段:线程1成功获取锁

    1. 线程 1 执行 Redlock 算法,向三个节点申请锁:

      • redis1:加锁成功
      • redis2:加锁成功
      • redis3:加锁失败
    2. 线程1在 2/3 个节点上加锁成功,认为获取锁成功,开始执行业务逻辑;

    3. redis2 使用的是 RDB 持久化,加锁操作(SETNX)只存在于 redis2内存中,尚未到达 RDB 持久化的时间点,数据未持久化到磁盘

      RDB 的持久化方式是每隔一段时间就将 Reids 执行的命令持久化到磁盘中;

  • 第二阶段:redis2 宕机并重启

    1. 在 RDB 持久化之前,redis2 突然宕机。由于 RDB 是定时持久化,内存中的数据全部丢失;
    2. redis2 重启后,从最近的 RDB 快照文件恢复数据,但这个快照中不包含线程 1 的锁信息
    3. 关键问题:重启后的 redis2 是一个"干净"的状态,完全不知道线程 1 曾经加过锁;
  • 第三阶段:线程2尝试获取同一把锁

    1. 线程 2 执行 Redlock 算法,向三个节点申请同一把锁:
      • redis1:加锁失败(线程1仍持有锁)
      • redis2:加锁成功(重启后无锁数据)
      • redis3:加锁成功
    2. 线程2在 2/3 个节点上加锁成功,也认为获取锁成功;
  • 最终结果:锁失效

    1. 两个线程同时持有锁
      • 线程1:认为自己在 redis1redis2 上持有锁(但实际上 redis2 的锁已丢失)
      • 线程2:认为自己在 redis2redis3 上持有锁
    2. 数据竞争:两个线程同时进入临界区,导致数据不一致;
  • 时序图:

    线程1redis1redis2 (RDB)redis3线程2redis2重启后阶段一:线程1获取锁SETNX lock_key (成功)SETNX lock_key (成功-仅在内存)SETNX lock_key (失败)线程1获取锁成功(2/3)阶段二:redis2宕机重启RDB尚未持久化加锁数据仅在内存中redis2宕机→内存数据全部丢失从旧RDB文件重启恢复重启后的redis2无线程1的锁数据阶段三:线程2获取同一把锁SETNX lock_key (失败-已有锁)SETNX lock_key (成功-无锁数据)SETNX lock_key (成功)线程2获取锁成功(2/3)阶段四:锁失效线程1认为持有锁(但redis2上的锁已丢失)线程2认为持有锁两个线程同时进入临界区!锁失效!线程1redis1redis2 (RDB)redis3线程2redis2重启后

4 分布式锁性能提升方法之——分段锁

4.1 简介

  • 分段锁是一种将单个粗粒度锁拆分为多个细粒度锁的并发控制技术。其核心思想是:将共享资源划分为多个独立的区间(段),每个区间使用独立的锁进行保护,从而减少锁竞争,提高并发性能;

  • 其思想就是:空间换时间 + 拆分粒度

  • Java 的 ConcurrentHashMap 就是分段锁的经典实现:

    // ConcurrentHashMap 内部结构(Java 7及之前版本)
    public class ConcurrentHashMap<K, V> {final Segment<K, V>[] segments; // 分段数组static final class Segment<K, V> extends ReentrantLock {// 每个Segment包含一个HashEntry数组transient volatile HashEntry<K, V>[] table;}
    }
    

4.2 秒杀库存扣减场景举例

  • 假设有一个商品库存 stock = 1000传统方式是用一把锁保护整个库存

    // 传统方式 - 性能瓶颈
    @RestController
    public class SeckillController {private final String LOCK_KEY = "product_001_lock";private final String STOCK_KEY = "product_001_stock";@PostMapping("/seckill")public String seckill() {RLock lock = redisson.getLock(LOCK_KEY);try {lock.lock();int stock = Integer.parseInt(redisTemplate.opsForValue().get(STOCK_KEY));if (stock > 0) {redisTemplate.opsForValue().set(STOCK_KEY, String.valueOf(stock - 1));return "秒杀成功";}return "库存不足";} finally {lock.unlock();}}
    }
    
  • 使用分段锁优化,我们将库存拆分为10个分段:

    • 在某些业务场景下, 也可以根据地理区域来划分子区域;
    // 分段锁实现
    @RestController
    public class SegmentSeckillController {private final int SEGMENT_COUNT = 10; // 分为10个段private final String SEGMENT_LOCK_PREFIX = "product_001_segment_lock_";private final String SEGMENT_STOCK_PREFIX = "product_001_segment_stock_";// 初始化分段库存@PostConstructpublic void initStock() {int totalStock = 1000;int stockPerSegment = totalStock / SEGMENT_COUNT;int remainder = totalStock % SEGMENT_COUNT;for (int i = 0; i < SEGMENT_COUNT; i++) {int segmentStock = stockPerSegment + (i < remainder ? 1 : 0);redisTemplate.opsForValue().set(SEGMENT_STOCK_PREFIX + i, String.valueOf(segmentStock));}}@PostMapping("/seckill-segment")public String seckillSegment() {// 1. 随机选择一个分段(或根据用户ID哈希)int segmentIndex = ThreadLocalRandom.current().nextInt(SEGMENT_COUNT);// 2. 获取该分段的锁RLock segmentLock = redisson.getLock(SEGMENT_LOCK_PREFIX + segmentIndex);String segmentStockKey = SEGMENT_STOCK_PREFIX + segmentIndex;try {// 3. 尝试获取分段锁(设置较短超时时间)if (segmentLock.tryLock(100, 500, TimeUnit.MILLISECONDS)) {int stock = Integer.parseInt(redisTemplate.opsForValue().get(segmentStockKey));if (stock > 0) {redisTemplate.opsForValue().set(segmentStockKey, String.valueOf(stock - 1));return "秒杀成功,来自分段[" + segmentIndex + "]";}// 当前分段库存不足,尝试其他分段return tryOtherSegments(segmentIndex);}return "系统繁忙,请重试";} finally {segmentLock.unlock();}}private String tryOtherSegments(int excludedSegment) {// 尝试其他分段的逻辑for (int i = 0; i < SEGMENT_COUNT; i++) {if (i == excludedSegment) continue;RLock otherLock = redisson.getLock(SEGMENT_LOCK_PREFIX + i);try {if (otherLock.tryLock(50, 200, TimeUnit.MILLISECONDS)) {int stock = Integer.parseInt(redisTemplate.opsForValue().get(SEGMENT_STOCK_PREFIX + i));if (stock > 0) {redisTemplate.opsForValue().set(SEGMENT_STOCK_PREFIX + i, String.valueOf(stock - 1));return "秒杀成功,来自分段[" + i + "](重试)";}}} finally {otherLock.unlock();}}return "库存不足";}
    }
    
  • 对比:

    分段锁架构
    传统单锁架构
    分段锁0
    客户端1
    库存段0: 100
    分段锁1
    客户端2
    库存段1: 100
    分段锁2
    客户端3
    库存段2: 100
    分段锁9
    客户端1000
    库存段9: 100
    全局锁
    客户端1
    客户端2
    客户端3
    客户端1000
    库存: 1000

文章转载自:

http://baJ9XDPC.sqqkr.cn
http://cnpJIf9k.sqqkr.cn
http://x9c1KzbI.sqqkr.cn
http://z98lbov5.sqqkr.cn
http://cswF6nFm.sqqkr.cn
http://NUk5hhyJ.sqqkr.cn
http://dh4c6ZaM.sqqkr.cn
http://JHbCPM25.sqqkr.cn
http://wJE9s6Hx.sqqkr.cn
http://JlcfUszQ.sqqkr.cn
http://X8GRKYGN.sqqkr.cn
http://33PmBdGE.sqqkr.cn
http://ZmyJycyv.sqqkr.cn
http://LlaovWY3.sqqkr.cn
http://ZPbj2mJb.sqqkr.cn
http://sXhvYlHC.sqqkr.cn
http://ZTQTggFi.sqqkr.cn
http://inoqyExx.sqqkr.cn
http://w8VoKLxM.sqqkr.cn
http://9zN0wBTV.sqqkr.cn
http://B4dQQRRW.sqqkr.cn
http://lu8gJG1v.sqqkr.cn
http://vPYIYgUr.sqqkr.cn
http://r5faIr44.sqqkr.cn
http://aNEe9m1E.sqqkr.cn
http://ZA9EKlCE.sqqkr.cn
http://d14aYHHE.sqqkr.cn
http://ka1IkS9y.sqqkr.cn
http://sCyNlrwS.sqqkr.cn
http://ohFeMZWK.sqqkr.cn
http://www.dtcms.com/a/374135.html

相关文章:

  • Infortrend普安科技IEC私有云平台VM解决方案
  • 实战对比:百炼知识库与钉钉知识库的全方位对比
  • GitLab升级后仓库镜像信任证书导入问题
  • 2. 计算机系统基础知识
  • 软考中级习题与解答——第三章_操作系统(2)
  • 第七届全球校园人工智能算法精英大赛-算法巅峰赛产业命题赛--算法题科普
  • 【CentOS7】使用yum安装出错,报HTTPS Error 404 - Not Found
  • 今天继续学习shell脚本
  • 解决哈希冲突
  • C++算法专题学习:栈相关的算法
  • CentOS部署ELK Stack完整指南
  • 多模态大模型Keye-VL-1.5发布!视频理解能力更强!
  • JAK/STAT信号通路全解析:核心分子、激活与负调控
  • 人工智能知识图谱应用平台国家标准发布实施
  • Chiplet封装革命:路登多芯片同步固晶治具支持异构集成
  • 语法分析:编译器中的“语法警察”
  • python数据分析工具特点分析
  • 高并发场景下的“命令执行”注入绕道记
  • Java创建对象的5种方式
  • Redis+Envoy实现智能流量治理:动态读写分离方案
  • ros2中qos的调优配置
  • 【GPT入门】第65课 vllm指定其他卡运行的方法,解决单卡CUDA不足的问题
  • 网络地址转换(NAT)详解
  • 综合体项目 3D 数字孪生可视化运维管理平台解决方案
  • 平衡车 -- MPU6050
  • 【PyTorch】图像二分类
  • 自动驾驶中的传感器技术39——Radar(0)
  • 【进阶版两种方法 | 题解】洛谷 P4285 [SHOI2008] 汉诺塔 [数学分析递推]
  • DFT学习--文献
  • 多轻量算轻量