当前位置: 首页 > news >正文

redis实战day2(秒杀)

目录

优惠卷秒杀:

1.全局唯一ID:

2.添加优惠卷:

3.秒杀下单:

4.一人一单(单体项目一步步解决)

分布式锁:

1.基本原理和实现方式对比:

2.Redis分布式锁的实现核心思路

redission

1-redission功能介绍:

2.redission可重入锁原理

3.redission锁重试和WatchDog机制(源码解读)

3.1redission锁重试

3.2看门狗机制

3.3redission锁的MutiLock原理

优惠卷秒杀:

1.全局唯一ID:

每个店铺都可以发布优惠券:

如果订单表使用数据库自增ID就存在一些问题: 

  • id的规律性太明显

  • 受单表数据量的限制

    场景分析:如果我们的id具有太明显的规则,用户或者说商业对手很容易猜测出来我们的一些敏感信息,比如商城在一天时间内,卖出了多少单,这明显不合适。

    场景分析二:随着我们商城规模越来越大,mysql的单表的容量不宜超过500W,数据量过大之后,我们要进行拆库拆表,但拆分表了之后,他们从逻辑上讲他们是同一张表,所以他们的id是不能一样的, 于是乎我们需要保证id的唯一性。

    全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息: 上面都是二进制形式

Redis实现全局唯一Id:

@Component
public class RedisIdWorker {/*** 开始时间戳*/private static final long BEGIN_TIMESTAMP = 1640995200L;/*** 序列号的位数*/private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix) {// 1.生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;// 2.生成序列号// 2.1.获取当前日期,精确到天String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));// 2.2.自增长long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);// 3.拼接并返回return timestamp << COUNT_BITS | count;}
}

ZoneOffset.UTC 表示世界协调时(UTC)的时区偏移量,也就是 +0 时区

long nowSecond = now.toEpochSecond(ZoneOffset.UTC);

这行代码的作用是:

  • 将 LocalDateTime 对象 now 转换为从 1970-01-01T00:00:00Z 开始的秒数

  • 使用 UTC 时区 进行计算,避免时区差异导致的时间戳不一致问题

例如:

LocalDateTime now = LocalDateTime.now();// 使用系统默认时区(可能产生差异)
long defaultSecond = now.toEpochSecond(ZoneOffset.ofHours(8)); // 使用UTC时区(统一标准)
long utcSecond = now.toEpochSecond(ZoneOffset.UTC);
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));

这行代码的作用:

  • DateTimeFormatter.ofPattern("yyyy:MM:dd"):创建一个日期格式化器,格式为 "年:月:日"

  • now.format(...):将当前时间格式化为指定格式的字符串

  • 结果示例"2024:03:15"

方式输出示例说明
不格式化2024-03-15T10:30:45.123ISO-8601 标准格式
格式化后2024:03:15自定义格式
StringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date)

这是一个 Redis 的原子自增操作

  • stringRedisTemplate.opsForValue():获取操作 String 类型值的接口

  • .increment(key):对指定 key 的值执行原子性的 +1 操作

  • 返回值:自增后的新值

特点:

  • 原子性:多个客户端同时操作也不会出现并发问题

  • 自动创建:如果 key 不存在,会先初始化为 0,然后执行 +1

  • 键名构成"icr:业务前缀:2024:03:15"

测试类

@Test
void testIdWorker() throws InterruptedException {CountDownLatch latch = new CountDownLatch(300);Runnable task = () -> {for (int i = 0; i < 100; i++) {long id = redisIdWorker.nextId("order");System.out.println("id = " + id);}latch.countDown();};long begin = System.currentTimeMillis();for (int i = 0; i < 300; i++) {es.submit(task);}latch.await();long end = System.currentTimeMillis();System.out.println("time = " + (end - begin));
}

1.创建任务(定义工作内容)

Runnable task = () -> {for (int i = 0; i < 100; i++) {long id = redisIdWorker.nextId("order");System.out.println("id = " + id);}latch.countDown();
};

✅ 只是定义:这里只是创建了一个Runnable对象,描述了"要做什么",但还没有执行

2. 提交并执行任务

for (int i = 0; i < 300; i++) {es.submit(task);  // 提交任务并开始执行
}

✅ 提交即执行es.submit(task) 不仅提交任务,还立即开始执行(由线程池分配线程来执行)

es.submit(task) 详细解释

es.submit(task) 是 Java 并发编程中向线程池提交任务的核心方法。

  • es 是一个 ExecutorService(执行器服务)实例

  • 也就是我们常说的线程池

  • 它负责管理和调度线程来执行任务

// 创建固定大小的线程池
ExecutorService es = Executors.newFixedThreadPool(10);// 或者创建缓存线程池
ExecutorService es = Executors.newCachedThreadPool();
es.submit(task);"请线程池帮我执行这个任务"

线程池处理

线程池内部:
├─ 如果有空闲线程 → 立即执行任务
├─ 如果没有空闲线程但线程数未达上限 → 创建新线程执行
└─ 如果线程数已达上限 → 任务进入队列等待

知识小贴士:关于countdownlatch

CountDownLatch 是 Java 并发编程中的一个同步工具类,可以理解为 "倒计时门闩" 或 "计数器锁"

countdownlatch名为信号枪:主要的作用是同步协调在多线程的等待于唤醒问题

我们如果没有CountDownLatch ,那么由于程序是异步的,当异步程序没有执行完时,主线程就已经执行完了,然后我们期望的是分线程全部走完之后,主线程再走,所以我们此时需要使用到CountDownLatch

CountDownLatch 中有两个最重要的方法

1、countDown

2、await

await 方法 是阻塞方法,我们担心分线程没有执行完时,main线程就先执行,所以使用await可以让main线程阻塞,那么什么时候main线程不再阻塞呢?当CountDownLatch 内部维护的 变量变为0时,就不再阻塞,直接放行,那么什么时候CountDownLatch 维护的变量变为0 呢,我们只需要调用一次countDown ,内部变量就减少1,我们让分线程和变量绑定, 执行完一个分线程就减少一个变量,当分线程全部走完,CountDownLatch 维护的变量就是0,此时await就不再阻塞,统计出来的时间也就是所有分线程执行完后的时间。

主线程开始
│
├─ 创建计数器(300)
├─ 定义任务模板
├─ 记录开始时间
│
├─ 提交300个任务到线程池
│   ├─ 线程1: 生成100个ID → 计数器-1
│   ├─ 线程2: 生成100个ID → 计数器-1
│   ├─ ...
│   └─ 线程300: 生成100个ID → 计数器-1
│
├─ 主线程等待(await)
│   (等待计数器从300减到0)
│
└─ 所有任务完成 → 计算总耗时

场景设定:

  • 线程池大小:10个线程

  • CPU能力:1个核心(只能真正同时运行1个线程)

  • 任务数量:15个任务

1.任务分配

// 提交15个任务
for (int i = 1; i <= 15; i++) {es.submit(task);
}

结果:

  • 立即执行:10个任务被分配给10个线程,进入就绪状态

  • 等待队列:5个任务进入线程池的等待队列

2.CPU调度(时间片轮转)

text

时间轴:      t1     t2     t3     t4     t5
CPU:      [线程A] [线程B] [线程C] [线程A] [线程D] ...
状态:      运行   运行   运行   运行   运行

具体过程:

  1. 操作系统调度:CPU通过时间片轮转在10个线程间快速切换

  2. 微观串行:每个时刻只有1个线程真正在CPU上运行

  3. 宏观并发:由于切换速度极快(纳秒级),看起来像是10个线程同时在运行

3.任务完成与队列处理

初始:线程池[10个活跃线程] + 队列[5个等待任务]步骤1:线程A完成任务 → 从队列取任务6 → 继续执行
步骤2:线程B完成任务 → 从队列取任务7 → 继续执行  
...
步骤5:线程E完成任务 → 从队列取任务10 → 继续执行此时:队列为空,所有15个任务都在执行或已完成

2.添加优惠卷:

每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:

tb_voucher:优惠券的基本信息,优惠金额、使用规则等 tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息

平价卷由于优惠力度并不是很大,所以是可以任意领取

而代金券由于优惠力度大,所以像第二种卷,就得限制数量,从表结构上也能看出,特价卷除了具有优惠卷的基本信息以外,还具有库存,抢购时间,结束时间等等字段

新增普通卷代码: VoucherController

@PostMapping
public Result addVoucher(@RequestBody Voucher voucher) {voucherService.save(voucher);return Result.ok(voucher.getId());
}

新增秒杀卷代码:

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);// 保存秒杀库存到Redis中stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

VoucherController

@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {voucherService.addSeckillVoucher(voucher);return Result.ok(voucher.getId());
}

VoucherServiceImpl

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);// 保存秒杀库存到Redis中stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

3.秒杀下单:

3.1基本代码实现

秒杀下单应该思考的内容:

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单

  • 库存是否充足,不足则无法下单

下单核心逻辑分析:

当用户开始进行下单,我们应当去查询优惠卷信息,查询到优惠卷信息,判断是否满足秒杀条件

比如时间是否充足,如果时间充足,则进一步判断库存是否足够,如果两者都满足,则扣减库存,创建订单,然后返回订单id,如果有一个条件不满足则直接结束。

VoucherOrderServiceImpl

@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}//5,扣减库存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).update();if (!success) {//扣减库存return Result.fail("库存不足!");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 6.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 6.2.用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 6.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);return Result.ok(orderId);}

3.2库存超卖问题分析

有关超卖问题分析:在我们原有代码中是这么写的

 if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}//5,扣减库存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).update();if (!success) {//扣减库存return Result.fail("库存不足!");}

假设线程1过来查询库存,判断出来库存大于1,正准备去扣减库存,但是还没有来得及去扣减,此时线程2过来,线程2也去查询库存,发现这个数量一定也大于1,那么这两个线程都会去扣减库存,最终多个线程相当于一起去扣减库存,此时就会出现库存的超卖问题。

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案:见下图:

特性悲观锁乐观锁
核心思想“先取锁,再访问”。假设每次数据访问都会导致冲突,因此在操作数据之前,必须先获取锁,确保在锁的持有期间,没有其他事务能修改数据。“先访问,更新时再检查”。假设大部分情况下数据不会冲突,因此直接操作数据,只在提交更新时,检查数据是否被其他事务修改过。
实现方式依赖数据库或编程语言提供的锁机制。
• 数据库SELECT ... FOR UPDATE
• JavasynchronizedReentrantLock
通过程序逻辑实现,通常使用版本号或CAS机制。
• 版本号UPDATE ... SET ..., version=version+1 WHERE id=? AND version=?
• CASUPDATE ... SET balance=80 WHERE id=? AND balance=100
操作步骤1. 开始事务。
2. 获取锁(如 FOR UPDATE)。
3. 读取、修改数据。
4. 提交事务(释放锁)。
1. 读取数据及版本号。
2. 在内存中修改数据。
3. 提交更新,检查版本号/旧值。
4. 若失败,则重试或报错。
类比独占模式:就像租房子,你签了合同(拿到锁)期间,别人不能住。协作模式:就像合租公寓,大家都可以进厨房,但如果你发现你准备用的调料被别人用光了(数据变了),你就得重新规划你的菜谱(重试)。
优点• 简单粗暴:实现简单,理解直观。
• 强一致性:能保证操作过程中的绝对隔离,无冲突。
• 性能高:无锁操作,并发能力强,适合读多写少的场景。
• 避免死锁:由于不长期持有锁,死锁概率大大降低。
缺点• 性能开销大:加锁、释放锁消耗资源,并发量下降。
• 死锁风险:多个事务相互等待锁,容易产生死锁。
• 可扩展性差
• 存在失败风险:更新可能失败,需要额外的重试逻辑。
• ABA问题(CAS法特有)。
• 在写冲突频繁的场景下,重试开销大,性能反而更差。

3.2.1 版本号法

这种方法通过一个独立的、单调递增的字段(版本号)来检测数据的变化。

工作原理:

  1. 读取数据:同时读取数据本身和当前的版本号(例如 version = 1)。

  2. 业务计算:在内存中修改数据。

  3. 更新数据:执行更新时,在 SET 子句中增加版本号,并在 WHERE 子句中指定之前读取的版本号。

  4. sql

    UPDATE table_name 
    SET column1 = new_value, version = version + 1 
    WHERE id = #{id} AND version = #{old_version};
  5. 判断结果

    • 如果该行数据被成功更新(Affected rows > 0),说明在本次操作期间没有其他事务更新过该数据。因为 WHERE 条件中的旧版本号匹配成功了。

    • 如果更新失败(Affected rows = 0),说明在读取之后、更新之前,数据已经被其他事务修改(版本号已经增加),导致 WHERE 条件不匹配。此时操作失败,需要进行重试。

3.2.2cas法

工作原理:

  1. 读取数据:读取数据以及要修改字段的当前值(例如 balance = 100)。

  2. 业务计算:在内存中计算出新值(例如 new_balance = 100 - 20 = 80)。

  3. 更新数据:执行更新时,在 SET 子句中设置新值,并在 WHERE 子句中指定要修改字段的旧值。

    sql

    UPDATE table_name 
    SET balance = 80 
    WHERE id = #{id} AND balance = 100;
  4. 判断结果

    • 如果更新成功,说明在本次操作期间,balance 字段没有被其他事务修改。

    • 如果更新失败,说明 balance 已经被其他事务修改,当前值已不是100

存在ABA问题:这是CAS法的经典缺陷。假设 balance 的变化是 100 -> 50 -> 100,虽然在你的两次操作之间它确实被改动过,但最终值又变回了100。你的CAS操作会因为 WHERE balance=100 成立而成功执行,但它没有感知到中间的“波折”。这在某些业务场景下是危险的(例如,链表操作、状态机流转)。

3.2.3对比表格

特性版本号法CAS 法
核心机制通过一个独立的、单调递增的版本号字段来检测数据变化。通过比较业务字段本身的旧值来检测变化。
检测粒度记录级:只要记录有任何字段被更新,版本号就会变。字段级:只关心你正在更新的那个字段是否被改变。
ABA 问题不存在。因为版本号只增不减,状态是单向的。存在。字段值可能从A变为B再变回A,CAS会误判。
与业务耦合度。版本号是独立的技术性字段。。直接使用业务字段进行并发控制。
适用场景通用场景,尤其是需要严格保证数据完整性和一致性的情况。对特定业务字段进行原子更新,且该字段的ABA问题不会造成业务影响。
实现示例WHERE id=? AND version=?WHERE id=? AND balance=?

3.2.4  实际代码解决

这里给出对应sql语句

sql

UPDATE seckill_voucher 
SET stock = stock - 1 
WHERE voucher_id = #{voucherId} AND stock = #{voucher.getStock}

以上逻辑的核心含义是:只要我扣减库存时的库存和之前我查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全的,但是以上这种方式通过测试发现会有很多失败的情况,失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败

之前的方式要修改前后都保持一致,但是这样我们分析过,成功的概率太低,所以我们的乐观锁需要变一下,改成stock大于0 即可

所以根据实际情况,我们这里再次优化

sql

UPDATE seckill_voucher 
SET stock = stock - 1 
WHERE voucher_id = #{voucherId} AND stock > 0

4.一人一单(单体项目一步步解决)

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

现在的问题在于:

优惠卷是为了引流,但是目前的情况是,一个人可以无限制的抢这个优惠卷,所以我们应当增加一层逻辑,让一个用户只能下一个单,而不是让一个用户下多个单

具体操作逻辑如下:比如时间是否充足,如果时间充足,则进一步判断库存是否足够,然后再根据优惠卷id和用户id查询是否已经下过这个订单,如果下过这个订单,则不再下单,否则进行下单

VoucherOrderServiceImpl

初步代码:增加一人一单逻辑

@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}// 5.一人一单逻辑// 5.1.用户idLong userId = UserHolder.getUser().getId();
//这里查的是订单表int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判断是否存在if (count > 0) {// 用户已经购买过了return Result.fail("用户已经购买过一次!");}//6,扣减库存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).update();if (!success) {//扣减库存return Result.fail("库存不足!");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);return Result.ok(orderId);}

存在问题:现在的问题还是和之前一样,并发过来,查询数据库,都不存在订单,所以我们还是需要加锁,但是乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作

就像黄牛一次性直接发了1000个请求,

//这里查的是订单表
    int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
    // 5.2.判断是否存在
    if (count > 0) {
        // 用户已经购买过了
        return Result.fail("用户已经购买过一次!");
    }

这里查询全通过了,那不就是他买了1000个吗?

注意:在这里提到了非常多的问题,我们需要慢慢的来思考,首先我们的初始方案是封装了一个createVoucherOrder方法,同时为了确保他线程安全,在方法上添加了一把synchronized 锁

@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();// 5.1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判断是否存在if (count > 0) {// 用户已经购买过了return Result.fail("用户已经购买过一次!");}// 6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣减失败return Result.fail("库存不足!");}// 7.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用户idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回订单idreturn Result.ok(orderId);
}

但是这样添加锁,锁的粒度太粗了,在使用锁过程中,控制锁粒度 是一个非常重要的事情,因为如果锁的粒度太大,会导致每个线程进来都会锁住,所以我们需要去控制锁的粒度,以下这段代码需要修改为:(对每个用户的id加锁,因为我们防止的本身就是黄牛一次性买太多)

intern() 这个方法是从常量池中拿到数据,如果我们直接使用userId.toString() 他拿到的对象实际上是不同的对象,new出来的对象,我们使用锁必须保证锁必须是同一把,所以我们需要使用intern()方法

@Transactional
public  Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();synchronized(userId.toString().intern()){// 5.1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判断是否存在if (count > 0) {// 用户已经购买过了return Result.fail("用户已经购买过一次!");}// 6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣减失败return Result.fail("库存不足!");}// 7.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用户idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回订单idreturn Result.ok(orderId);}
}

但是以上代码还是存在问题,问题的原因在于当前方法被spring的事务控制,如果你在方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放也会导致问题,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题:如下:

在seckillVoucher 方法中,添加以下逻辑,这样就能保证事务的特性,同时也控制了锁的粒度

但是以上做法依然有问题,因为你调用的方法,其实是this.的方式调用的,事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象, 来操作事务 Spring容器中的对象到底是什么?

@Service
public class UserService {public void methodA() {System.out.println("methodA - this: " + this.getClass());}public void methodB() {System.out.println("methodB - this: " + this.getClass());this.methodA();  // 这里的this是什么?}
}

输出结果可能是:(这是一个很重要的问题。不加this关键字,结果完全一样

注入的userService: class com.example.UserService$$EnhancerBySpringCGLIB$$12345678
methodB - this: class com.example.UserService$$EnhancerBySpringCGLIB$$12345678  
methodA - this: class com.example.UserService$$EnhancerBySpringCGLIB$$12345678

@Service
public class UserService {  @Transactionalpublic void methodA() {System.out.println("methodA - this: " + this.getClass());}public void methodB() {System.out.println("methodB - this: " + this.getClass());this.methodA();  // 这里的this是什么?}
}

输出结果可能是:(这是一个很重要的问题。不加this关键字,结果完全一样

注入的userService:class com.example.UserService$$EnhancerBySpringCGLIB$$12345678
methodB - this: class com.example.UserService$$EnhancerBySpringCGLIB$$12345678
methodA - this: class com.example.UserService

在Java中,以下两种写法是完全等价的:

java

methodA();        // 隐式调用
this.methodA();   // 显式调用

代理对象和目标对象的关系

java

// Spring创建的代理对象内部结构大致如下:
public class UserService$$EnhancerBySpringCGLIB extends UserService {// 持有目标对象(真实实例)的引用private UserService target;// 拦截器链(包含事务拦截器、日志拦截器等)private List<MethodInterceptor> interceptors;@Overridepublic void businessMethod() {// 1. 执行AOP前置处理// 2. 调用目标对象的方法:target.businessMethod()// 3. 执行AOP后置处理}
}

普通方法调用示例

@Service
public class UserService {public void methodA() {System.out.println("执行方法A");// 普通业务逻辑}public void methodB() {System.out.println("执行方法B");methodA();  // 这里直接调用完全没问题}public void methodC() {System.out.println("执行方法C");this.methodA();  // 使用this调用也没问题}
}

什么情况下会有问题?

只有当方法需要AOP增强功能时,比如:

java

@Service
public class UserService {@Transactional  // 需要事务增强public void transactionalMethod() {// 数据库操作}@Async  // 需要异步增强public void asyncMethod() {// 异步执行}@Cacheable  // 需要缓存增强  public void cacheMethod() {// 缓存操作}public void callerMethod() {// 这些调用都会失效!transactionalMethod();  // 事务失效asyncMethod();         // 异步失效cacheMethod();         // 缓存失效}
}
调用场景普通方法需要AOP增强的方法
this.method()✅ 正常❌ AOP增强失效
proxy.method()✅ 正常✅ AOP增强生效
直接调用 method()✅ 正常❌ AOP增强失效

解决方案

方案1:注入自身代理

java

@Service
public class OrderService {@Autowiredprivate OrderService self; // 注入代理对象public void createOrder(Order order) {orderMapper.insert(order);self.updateInventory(order); // 通过代理对象调用}@Transactionalpublic void updateInventory(Order order) {// 现在事务生效了!inventoryMapper.update(order.getProductId(), order.getQuantity());}
}

方案2:使用AopContext

java

@Service
@EnableAspectJAutoProxy(exposeProxy = true)
public class OrderService {public void createOrder(Order order) {orderMapper.insert(order);// 获取当前代理对象OrderService proxy = (OrderService) AopContext.currentProxy();proxy.updateInventory(order);}
}

4.0(这种方案在集群环境下的并发问题 )

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。

有关锁失效原因分析

由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥,这就是 集群环境下,syn锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。

分布式锁:

1.基本原理和实现方式对比:

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路

那么分布式锁他应该满足一些什么样的条件呢?

可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思

互斥:互斥是分布式锁的最基本的条件,使得程序串行执行

高可用:程序不易崩溃,时时刻刻都保证较高的可用性

高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能

安全性:安全也是程序中必不可少的一环

常见的分布式锁有三种

Mysql:mysql本身就带有锁机制,但是由于mysql性能本身一般,所以采用分布式锁的情况下,其实使用mysql作为分布式锁比较少见

Redis:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁

Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解zookeeper的原理和分布式锁的实现,所以不过多阐述

在MySQL集群环境下实现互斥锁,主要有基于数据库表行级锁乐观锁几种方式。为了让你快速了解,我用一个表格来汇总它们的主要特点:

实现方式核心原理优点缺点适用场景
基于防重表利用数据库表唯一索引约束,插入成功表示获锁。实现简单,利用数据库机制保证互斥。1. 数据库单点风险
2. 无失效时间,需额外处理
3. 非阻塞,插入失败直接返回
4. 通常不可重入
并发量不高,对锁的可用性要求不苛刻的短期任务。
基于悲观锁 (SELECT ... FOR UPDATE)基于数据库的行级排他锁,锁定锁表中的特定记录。1. 由数据库保证互斥与阻塞
2. 可避免死锁(表锁情况下)。
1. 数据库单点风险
2. 性能开销较大,可能影响系统可用性
3. 需要注意连接与会话的管理。
需要阻塞等待锁,且并发量不是特别高的场景。
基于乐观锁 (通过版本号等机制)通过版本号字段条件判断,在更新时比较版本号或校验数据。1. 避免数据库锁开销,性能较好
2. 实现相对简单。
1. 需自行处理更新失败(如重试或返回失败)
2. 高并发下成功率下降,增加数据库压力。

读多写少,并发冲突概率较低的场景。

一、基于防重表(唯一索引)

这种方法的核心是创建一张锁表,利用 method_name(或类似)字段的唯一性约束来保证互斥。

  1. 创建锁表

    sql

    CREATE TABLE `distributed_lock` (`id` int(11) unsigned NOT NULL AUTO_INCREMENT,`lock_type` varchar(64) NOT NULL COMMENT '锁类型,需唯一',`owner_id` varchar(255) NOT NULL COMMENT '持锁者标识,可用于实现可重入',`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `idx_lock_type` (`lock_type`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='分布式锁';
  2. 加锁操作:尝试向表中插入一条代表锁的记录。若插入成功,则获取锁;若因唯一索引冲突插入失败,则获取锁失败。

    sql

    INSERT INTO `distributed_lock` (`lock_type`, `owner_id`) VALUES ('order_lock', 'server_1');
  3. 解锁操作:删除对应的记录。

    sql

    DELETE FROM `distributed_lock` WHERE `lock_type` = 'order_lock' AND `owner_id` = 'server_1';
  4. 注意事项与改进

    • 锁超时:可以增加一个 expire_time 字段记录锁的过期时间,并通过后台任务清理超时的锁。

    • 可重入:通过 owner_id 字段标识当前锁的持有者。同一持有者再次加锁时,可改为更新操作(如更新update_time)并判断是否为自己的锁,从而实现可重入。

二、基于悲观锁(SELECT ... FOR UPDATE)

这种方式利用MySQL的行级排他锁SELECT ... FOR UPDATE)来实现。

  1. 加锁操作:在一个事务中查询目标锁记录并使用 FOR UPDATE

    sql

    BEGIN;
    SELECT * FROM `distributed_lock` WHERE `lock_type` = 'order_lock' FOR UPDATE;
  2. 执行业务逻辑:如果上一步成功执行,当前会话便获得了锁,可以执行后续业务代码。

  3. 解锁操作:提交事务,释放锁。

    sql

    COMMIT;
  4. 关键点

    • 务必确保 FOR UPDATE 的查询条件使用了索引(最好是唯一索引),否则可能锁表

    • 保持连接:加锁与解锁必须在同一个数据库连接(会话)中进行。

三、基于乐观锁(版本控制)

乐观锁不直接加锁,而是在更新数据时检查数据是否被其他会话修改过。

  1. 为业务数据表增加版本号字段

    sql

    ALTER TABLE `your_business_table` ADD `version` int(11) NOT NULL DEFAULT '0';
  2. 更新时检查版本号

    sql

    UPDATE `your_business_table`
    SET `stock` = `stock` - 1, `version` = `version` + 1
    WHERE `id` = 100 AND `version` = 5;
  3. 判断更新结果:检查该UPDATE语句的影响行数。如果影响行数为0,说明版本号不符或数据已更新,意味着获取锁(或更新权)失败,需要根据业务逻辑(如重试)处理。

基于悲观锁(SELECT ... FOR UPDATE)

🔒 FOR UPDATE 的详细机制

锁的行为特性:

  • 排他性:其他事务无法同时对同一行加任何类型的锁

  • 阻塞性:如果锁已被占用,其他事务的 FOR UPDATE 会阻塞等待

  • 事务绑定:锁的持有时间与事务生命周期一致

  • 自动释放:事务提交(COMMIT)或回滚(ROLLBACK)时自动释放锁

实际效果示例:

事务A(获取锁):

sql

-- 事务A开始
BEGIN;
SELECT * FROM `distributed_lock` WHERE `lock_type` = 'order_lock' FOR UPDATE;
-- 此时事务A获得了 'order_lock' 的排他锁

事务B(尝试获取同一锁):

sql

-- 事务B开始  
BEGIN;
SELECT * FROM `distributed_lock` WHERE `lock_type` = 'order_lock' FOR UPDATE;
-- ⚠️ 这里会被阻塞,直到事务A提交或回滚!

2.Redis分布式锁的实现核心思路

实现分布式锁时需要实现的两个基本方法:

  • 获取锁:

    • 互斥:确保只能有一个线程获取锁

    • 非阻塞:尝试一次,成功返回true,失败返回false

  • 释放锁:

    • 手动释放

    • 超时释放:获取锁时添加一个超时时间

核心思路:

我们利用redis 的setNx 方法,当有多个线程进入时,我们就利用该方法,第一个线程进入时,redis 中就有这个key 了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁的哥们,等待一定时间后重试即可

2.1实现分布式锁最初版本

新建SimpleRedisLock类:

  • 加锁逻辑

利用setnx方法进行加锁,同时增加过期时间,防止死锁,此方法可以保证加锁和增加过期时间具有原子性

    private  String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX="lock:"
@Override
public boolean tryLock(long timeoutSec) {// 获取线程标示String threadId = Thread.currentThread().getId()// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);
}
  • 释放锁逻辑

public void unlock() {//通过del删除锁stringRedisTemplate.delete(KEY_PREFIX + name);
}
  • 修改业务代码

     @Overridepublic Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//创建锁对象(新增代码)SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);//获取锁对象boolean isLock = lock.tryLock(1200);//加锁失败if (!isLock) {return Result.fail("不允许重复下单");}try {//获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {//释放锁lock.unlock();}}

    2.2Redis分布式锁误删情况(优化)版本2

逻辑说明:

持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删别人锁的情况说明

解决方案:解决方案就是在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果属于自己,则不进行锁的删除,假设还是上边的情况,线程1卡顿,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1反应过来,然后删除锁,但是线程1,一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程2走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁。

需求:修改之前的分布式锁实现,满足:在获取锁时存入线程标示(可以用UUID表示) 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致

  • 如果一致则释放锁

  • 如果不一致则不释放锁

核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。

具体代码如下:加锁

    private  String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX="lock:"
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {// 获取线程标示String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);
}

释放锁

public void unlock() {// 获取线程标示String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁中的标示String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 判断标示是否一致if(threadId.equals(id)) {// 释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}
}

有关代码实操说明:

在我们修改完此处代码后,我们重启工程,然后启动两个线程,第一个线程持有锁后,手动释放锁,第二个线程 此时进入到锁内部,再放行第一个线程,此时第一个线程由于锁的value值并非是自己,所以不能释放锁,也就无法删除别人的锁,此时第二个线程能够正确释放锁,通过这个案例初步说明我们解决了锁误删的问题。

JVM重启后的线程ID分配

java

// 第一次启动Spring项目
public class FirstStart {public static void main(String[] args) {System.out.println("第一次启动 - 主线程ID: " + Thread.currentThread().getId()); // 通常是1Thread t1 = new Thread(() -> {});System.out.println("第一次启动 - 线程1 ID: " + t1.getId()); // 比如11Thread t2 = new Thread(() -> {});System.out.println("第一次启动 - 线程2 ID: " + t2.getId()); // 比如12}
}// 关闭项目后,第二次启动
public class SecondStart {public static void main(String[] args) {System.out.println("第二次启动 - 主线程ID: " + Thread.currentThread().getId()); // 重新从1开始Thread t1 = new Thread(() -> {});System.out.println("第二次启动 - 线程1 ID: " + t1.getId()); // 重新从11开始(或类似的起始值)Thread t2 = new Thread(() -> {});System.out.println("第二次启动 - 线程2 ID: " + t2.getId()); // 12}
}

2.3分布式锁的原子性问题(优化)版本3

更为极端的误删逻辑说明:

线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生,

利用Lua脚本解决多条命令原子性问题:

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了,作为Java程序员这一块并不作一个简单要求,并不需要大家过于精通,只需要知道他有什么作用即可。

这里重点介绍Redis提供的调用函数,语法如下:

redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要执行set name jack,则脚本是这样:

# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行set name Rose,再执行get name,则脚本如下:

# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:

例如,我们要执行 redis.call('set', 'name', 'jack') 这个脚本,语法如下: 如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:

接下来我们来回一下我们释放锁的逻辑:

释放锁的业务流程是这样的

1、获取锁中的线程标示

2、判断是否与指定的标示(当前线程标示)一致

3、如果一致则释放锁(删除)

4、如果不一致则什么都不做

如果用Lua脚本来表示则是这样的:

最终我们操作redis的拿锁比锁删锁的lua脚本就会变成这样

-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then-- 一致,则删除锁return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

利用Java代码调用Lua脚本改造分布式锁

lua脚本本身并不需要大家花费太多时间去研究,只需要知道如何调用,大致是什么意思即可,所以在笔记中并不会详细的去解释这些lua表达式的含义。

我们的RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下图

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}public void unlock() {// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());
}
经过以上代码改造后,我们就能够实现 拿锁比锁删锁的原子性动作了~

测试逻辑:

第一个线程进来,得到了锁,手动删除锁,模拟锁超时了,其他线程会执行lua来抢锁,当第一天线程利用lua删除锁时,lua能保证他不能删除他的锁,第二个线程删除锁时,利用lua同样可以保证不会删除别人的锁,同时还能保证原子性。

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
  • private:私有,只在当前类使用

  • static final:静态常量,类加载时初始化且不可改变

  • DefaultRedisScript<Long>:Spring Data Redis提供的脚本执行器,<Long>表示脚本返回类型

static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);
}

静态代码块特点

  • 在类加载时执行,且只执行一次

  • 用于初始化静态变量

1. 创建脚本实例

UNLOCK_SCRIPT = new DefaultRedisScript<>();

创建一个空的Redis脚本执行器实例。

2. 设置脚本位置

UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
  • ClassPathResource("unlock.lua"):从classpath根目录查找unlock.lua文件

  • 文件位置通常:src/main/resources/unlock.lua

3. 设置返回类型

UNLOCK_SCRIPT.setResultType(Long.class);

指定Lua脚本执行后的返回值类型为Long,对应Redis的整数类型。

通过这种初始化方式:

  • 应用启动时就会加载unlock.lua脚本

  • 后续调用stringRedisTemplate.execute(UNLOCK_SCRIPT, ...)时直接使用预加载的脚本

  • 避免了每次执行时的文件IO和脚本解析开销

redission

1-redission功能介绍:

于setnx实现的分布式锁存在下面的问题:

重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。

不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。

超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患

主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

Redission提供了分布式锁的多种多样的功能

快速入门:

引入依赖:

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>

配置Redisson客户端:

@Configuration
public class RedissonConfig {
​@Beanpublic RedissonClient redissonClient(){// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.150.101:6379")(用自己虚拟机的端口号).setPassword("123321");(自己的密码)// 创建RedissonClient对象return Redisson.create(config);}
}
​

如何使用Redission的分布式锁

@Resource
private RedissionClient redissonClient;
​
@Test
void testRedisson() throws Exception{//获取锁(可重入),指定锁的名称RLock lock = redissonClient.getLock("anyLock");//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);//判断获取锁成功if(isLock){try{System.out.println("执行业务");          }finally{//释放锁lock.unlock();}}}

在 VoucherOrderServiceI

@Resource
private RedissonClient redissonClient;@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//创建锁对象 这个代码不用了,因为我们现在要使用分布式锁//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:" + userId);//获取锁对象boolean isLock = lock.tryLock();//加锁失败if (!isLock) {return Result.fail("不允许重复下单");}try {//获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {//释放锁lock.unlock();}}

2.redission可重入锁原理

我们之前的代码,如果锁还是当单个key+value的话,像上面method1获取到锁以后,调用method2方法,但是这个时候method2方法获取锁就失败了!! method1和method2都是同一个请求同一个线程,这样不就出问题了吗?

我们的解决方法采取用hash结构存储

其中大key表示表示这把锁是否存在,用小key表示当前这把锁被哪个线程持有,所以接下来我们一起分析一下当前的这个lua表达式

获取锁方法

exists: 判断数据是否存在 ,  如果==0(表示没获取到这把锁),就表示当前这把锁不存在

redis.call('hset',key,threadId,"1");不存在的话就可以获取锁了,找到这个所对大key小key,设为1(因为这个时候判断了不存在,是第一次拿锁)

之后设置一下有效期

返回结果

如果当前这把锁存在,则第一个条件不满足,再判断

redis.call('hexists',key, threadId) == 1 判断存在不存在,存在的话redis会返回1

如果返回1了,说明存在,此时需要通过大key+小key判断当前这把锁是否是属于自己的,如果是自己的,则进行

redis.call('hincrby',key,threadId,"1")  redis中的自增函数,让对应的大key小key自增1

将当前这个锁的value进行+1 ,

redis.call('expire', key,releaseTime); 然后再对其设置过期时间,之后返回结果return 1

如果以上两个条件都不满足,则表示当前这把锁抢锁失败,最后返回0 return 0

获取锁源码:

这个地方一共有3个参数

KEYS[1] : 锁名称

ARGV[1]: 锁失效时间

ARGV[2]: id + ":" + threadId; 锁的小key

"if (redis.call('exists', KEYS[1]) == 0) then " +

                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);"

最后返回pttl,即为当前这把锁的失效时间

释放锁方法

HEXISTS,先判断小key大key看这个锁是不是自己的,不是自己的就返回

如果是自己的锁,hincrby,让这个锁的value➖1

然后判断这时候value是不是大于0,如果大于零说明重入锁还没有结束,不能释放锁

如果发现大小等于0了,就可以释放锁了,直接删除即可

释放锁源码

这个地方一共有3个参数

KEYS[1] : 锁名称

ARGV[1]: 锁失效时间

ARGV[2]: id + ":" + threadId; 锁的小key

3.redission锁重试和WatchDog机制(源码解读)

3.1redission锁重试

 在调用trylock函数时,我们也会传进去一个waitTime(代表等待的时间)

time就是等待

current得到现在的时间

threadId得到当前线程的id

然后进入tryAcquire(尝试获取锁),tryAcquire里直接调用tryAcquireAsinc

 leaseTime默认是-1,如果不是负一,就调用我传进来的leaseTime,如果是-1,就用我们的看门狗机制

 之后就可以获取锁了,上面重入讲过了

我们重点在下面的重试

 如果获取锁成功了,那么ttl就会为null,if语句成立,返回true

然后算一下我的等待时间还有没有,如果小于0了,就返回false代表过去锁失败

先严谨了一波,先判断下time结束了没,结束了就没必要等了

 这里是不会立马去重试的,那样太傻了,这里利用subscrible订阅了别人释放锁的信号(在释放锁代码中有一个public,在释放后会发布一个消息通知)

后面会尝试等待,!subscribleFuture.await,会等待time的时间,如果上面的future在我指定的时间内完成,那么就会返回true,使得代码继续进行

但是如果超时等不到的话,就会返回false,进入if语句中,先unsubscrible取消订阅,在返回一个false,代表获取锁失败

 先严谨了一波,先判断下time结束了没,结束了就没必要等了

这时候就进去循环里面,开始第一次获取锁了,再返回一个ttl,如果ttl是null,说明获取锁成功了,不用管了,返回true

失败的话同样在判断一下time,

这时候我们也不会立马就重试,还是小判断一下

然后通过信号量的方式getLatch,释放锁的时候是会给一个信号的,在if中(如果经过规定的时间还没拿到就返回false,如果拿到了就返回true),根据ttl和time的大小,如果ttl小,没必要等太长时间,等个ttl,等他释放了就可以直接用了。

如果time小,那我就等个time时间,time到期了就没必要等了

等待对应时间后,time还有时间的话重新进去循环中(因为别的ttl到期后,我还需要和其他线程去抢锁,有可能我是抢不到的)

3.2看门狗机制

 执行ttlRemainingFuture.onComplete方法中,e是异常,如果异常不是空,代表有异常,返回

只要没有异常,只要ttkRemaining=null,代表我获取锁成功了,这时候解决有效期的问题

进入scheduleExpirationRenewal方法(任务调度,过期时间的续约)

 创建了一个静态的map,getEntryname()

相当于获取锁的名称,entryName=id➕":"+name

每个锁都有自己的名字,在map里面有着自己的名字和entry

putIfAbsent,如果不存在再放,放得就是全新的,如果存在的话就不会执行

oldEntry如果不是空,说明不是第一次来了把threadId加进去

如果oldEntry是空,说明我是第一次来,map.里面还没有对应的entry,不仅要加threadId,还要执行renewExpiration方法(续约)

进入renewExpiration方法后,Timeout定义了一个定式任务,TimerTask有两个参数,一个是任务本身task,另一个是时间范围delay

在delay到期后执行这个延时任务

对于internalLockLeaseTime其实就是我们当时系统传进去的看门狗时间30s

所以每过10s,执行下这个任务

 我们展开这个具体看看,先从map中拿出entry给ee,再从entry中拿出threadId,之后调用renewExpirationAsinc方法刷新

 KEYS[1] : 锁名称

ARGV[1]: 锁失效时间

ARGV[2]: id + ":" + threadId; 锁的小key

就是一段lua脚本去运行redis,先判断这个锁是不是我这个线程的(一般都是,因为就是从这里传进来的)

然后更新有效期

 出来renewExpirationAsinc以后,下面我又调用了renewExpiration方法(自己)

最终把任务放到ee中,ee就是上面得到的entry,这样entry中还有任务

这就是为什么之前,如果是oldentry如果map已经有了就不用再次更新了,因为已经有这个任务了

诶,所以,我每隔10s执行这个任务这个任务里面呢,我又刷新有效期,刷新完后又调用自己,再次刷新,每隔10s,刷新有效期到30s,所以一直是有效的,不会出现因为业务阻塞导致锁过期

 再释放锁方法中取消这些东西

先去除id

在cancel取消任务

在把对应entry从map删除

3.3redission锁的MutiLock原理

为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例

此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。

为了解决这个问题,redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {try {return tryLockAsync(waitTime, leaseTime, unit).get();} catch (ExecutionException e) {throw new IllegalStateException(e);}long newLeaseTime = -1;if (leaseTime != -1) {if (waitTime == -1) {newLeaseTime = unit.toMillis(leaseTime);} else {newLeaseTime = unit.toMillis(waitTime) * 2;}}long time = System.currentTimeMillis();long remainTime = -1;if (waitTime != -1) {remainTime = unit.toMillis(waitTime);}long lockWaitTime = calcLockWaitTime(remainTime);int failedLocksLimit = failedLocksLimit();List<RLock> acquiredLocks = new ArrayList<>(locks.size());for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {lockAcquired = false;}if (lockAcquired) {acquiredLocks.add(lock);} else {if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// reset iteratorwhile (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}if (remainTime != -1) {remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());for (RLock rLock : acquiredLocks) {RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();}}return true;
}

waitTime等待获取锁的最长时间,主要作用包括:

  1. 控制获取锁的等待时长 - 如果在waitTime时间内无法获取所有锁,就放弃

  2. 作为重试的时间窗口 - 在这个时间范围内会不断尝试获取锁

  3. 避免无限等待 - 防止线程因为无法获取锁而永久阻塞

完整代码逐行解析

java

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {// 方法签名:支持等待时间、租约时间和时间单位// 可能会抛出InterruptedException(线程中断异常)try {// 调用异步版本,然后通过.get()同步等待结果return tryLockAsync(waitTime, leaseTime, unit).get();} catch (ExecutionException e) {// 如果异步执行过程中出现异常,包装后抛出throw new IllegalStateException(e);}// 注意:上面的return语句已经返回了,所以下面的代码实际上不会执行// 这可能是因为代码片段不完整或者有误// 但为了完整理解,我们继续分析下面的逻辑

参数处理和初始化

java

    // 计算新的租约时间
    long newLeaseTime = -1;  // 默认-1表示无限期

    if (leaseTime != -1) {  // 如果用户指定了租约时间
        if (waitTime == -1) {
            // 情况1:无限等待,使用用户指定的租约时间
            newLeaseTime = unit.toMillis(leaseTime);
        } else {
            // 情况2:有限等待,租约时间 = 等待时间 * 2
            // 为什么是2倍?确保在重试过程中已获取的锁不会过期
            newLeaseTime = unit.toMillis(waitTime) * 2;
        }
    }

    // 记录开始时间,用于计算剩余时间
    long time = System.currentTimeMillis();

    // 剩余时间初始化
    long remainTime = -1;  // -1表示无限等待

    if (waitTime != -1) {
        // 将用户指定的等待时间转换为毫秒
        remainTime = unit.toMillis(waitTime);
    }

    // 计算每次获取单个锁的最大等待时间
    long lockWaitTime = calcLockWaitTime(remainTime);
    // 这个方法通常会返回一个合理的值,比如剩余时间的一部分

    // 获取失败锁的限制数量
    int failedLocksLimit = failedLocksLimit();
    // 这个方法通常返回0,表示不允许任何锁获取失败

    // 创建列表来保存成功获取的锁
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    // 预先分配足够容量,避免扩容开销

核心锁获取循环

java

    // 使用ListIterator遍历所有需要获取的锁// ListIterator支持向前和向后移动,这在重置时很重要for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {// 获取下一个锁RLock lock = iterator.next();boolean lockAcquired;  // 标记是否成功获取当前锁try {// 根据参数选择不同的获取策略if (waitTime == -1 && leaseTime == -1) {// 情况1:无限等待 + 无限租约,使用最简单的tryLocklockAcquired = lock.tryLock();} else {// 情况2:有限等待或有限租约// 计算本次获取锁的等待时间,取lockWaitTime和remainTime的较小值long awaitTime = Math.min(lockWaitTime, remainTime);// 尝试获取单个锁,使用计算出的等待时间和租约时间lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {// Redis响应超时异常处理unlockInner(Arrays.asList(lock));  // 释放当前锁(如果已获取)lockAcquired = false;  // 标记获取失败} catch (Exception e) {// 其他异常处理lockAcquired = false;  // 标记获取失败,但不释放(因为可能根本没获取到)}

获取结果处理

java

        if (lockAcquired) {// 成功获取锁,添加到已获取列表acquiredLocks.add(lock);} else {// 获取当前锁失败// 检查剩余未获取的锁数量是否已达到失败限制if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;  // 达到失败限制,跳出循环}// 处理失败情况if (failedLocksLimit == 0) {// 失败限制为0,表示不允许任何失败// 释放所有已经成功获取的锁unlockInner(acquiredLocks);if (waitTime == -1) {// 如果是无限等待,直接返回失败return false;}// 重置失败限制计数器failedLocksLimit = failedLocksLimit();// 清空已获取锁列表acquiredLocks.clear();// 重置迭代器到开始位置,准备重新尝试// 这是关键步骤:回到起点重新获取所有锁while (iterator.hasPrevious()) {iterator.previous();}} else {// 失败限制不为0,减少失败计数failedLocksLimit--;}}

时间管理和超时检查

java

        // 如果设置了等待时间,需要更新剩余时间if (remainTime != -1) {// 计算从循环开始到现在经过的时间long elapsed = System.currentTimeMillis() - time;// 更新剩余时间remainTime -= elapsed;// 更新时间戳,用于下一次计算time = System.currentTimeMillis();// 检查是否超时if (remainTime <= 0) {// 超时,释放所有已获取的锁unlockInner(acquiredLocks);// 返回获取失败return false;}}}  // 结束for循环

成功获取后的处理

java

    // 如果执行到这里,说明成功获取了所有需要的锁// 如果用户指定了租约时间,需要为所有锁设置统一的租约时间if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());// 为每个已获取的锁设置过期时间for (RLock rLock : acquiredLocks) {// 转换为RedissonLock类型,调用异步过期方法RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime),  // 用户指定的租约时间TimeUnit.MILLISECONDS);futures.add(future);}// 等待所有过期设置操作完成for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();  // 同步等待,不可中断}}// 返回成功return true;
}

完整流程示例(获取3个锁)

假设我们要获取lockA、lockB、lockC,waitTime=10秒leaseTime=30秒

第一次尝试:

  1. 获取lockA:成功 → acquiredLocks = [lockA]

  2. 获取lockB:成功 → acquiredLocks = [lockA, lockB]

  3. 获取lockC:失败(被占用)

  4. 处理失败

    • failedLocksLimit = 0,所以释放lockA和lockB

    • 清空acquiredLocks

    • 重置迭代器到开始位置

    • 更新剩余时间(假设已用2秒,remainTime = 8秒

第二次尝试:

  1. 获取lockA:成功 → acquiredLocks = [lockA]

  2. 获取lockB:成功 → acquiredLocks = [lockA, lockB]

  3. 获取lockC:成功 → acquiredLocks = [lockA, lockB, lockC]

成功完成:

  • 为lockA、lockB、lockC统一设置30秒过期时间

  • 返回true

http://www.dtcms.com/a/554734.html

相关文章:

  • 网站建设企划书网站配置系统
  • 深圳做网站优化报价网站增加导航栏
  • STM32H743 cubemx配置 LL库 ADC3 调试笔记
  • 江苏中益建设官方网站工信部网站备案审核
  • 门户网站前期网络采集商家信息免费发布做宣传的网站
  • svg图片做网站背景网站报价单模板
  • 济南 制作网站 公司吗室内装修设计书籍
  • 15.<Spring Boot 日志>
  • C语言实现扫雷游戏
  • 鱼吃鱼服务线上智能服务已更新
  • 手机建站平台微点怎么给一个网站做推广
  • 环形缓冲区(ring buffer)
  • 网站服务器租用方法wordpress 下载短代码
  • 零基础能考信创认证吗?报考条件是什么?
  • 免费制作微信小程序的网站企业购
  • 计算机网络技专业术网站开发张家口住房和城乡建设部网站
  • Gartner发布AI-ITSM最新趋势!
  • Vue3 异步组件(懒加载组件)
  • 如何做电影网站才不侵权贵州省省建设厅网站
  • osgearth\AFsim如何加载影像瓦片数据和高程数据
  • 是做网站设计好还是杂志美编好有没有傻瓜式建设网站
  • Derby - Derby 服务器(Derby 概述、Derby 服务器下载与启动、Derby 连接数据库与创建数据表、Derby 数据库操作)
  • 慈溪高端网站设计甘肃嘉峪关建设局网站
  • 重庆渝云建设有限公司官方网站深圳网站运营
  • 【开题答辩实录分享】以《自动售货机刷脸支付系统的设计与实现》为例进行答辩实录分享
  • 瑜伽 网站模板夏津网站建设电话
  • 长沙手机网站设计公司网站建设与客户价格谈判技巧
  • 网页抓包实战,工具选型、分层排查与真机取证流程
  • 荆门市城乡建设管理局网站广州建设工程中心网站
  • 可以自己建设购物网站家具营销策划方案