当前位置: 首页 > news >正文

redis解决常见的秒杀问题


title: redis解决常见的秒杀问题
date: 2025-03-07 14:24:13
tags: redis
categories: redis的应用

秒杀问题

每个店铺都可以发布优惠券,保存到 tb_voucher 表中;当用户抢购时,生成订单并保存到 tb_voucher_order 表中。

订单表如果使用数据库自增 ID,会存在以下问题:

  • ID 的规律太明显,容易暴露信息。
  • 单表数据量的限制,订单过多时单表很难存储得下。数据量过大后需要拆库拆表,但拆分表了之后,各表从逻辑上是同一张表,所以 id 不能一样, 于是需要保证 ID 的唯一性。

全局唯一ID

全局唯一 ID 的特点

  • 唯一性:Redis 独立于数据库之外,不论有多少个数据库、多少张表,访问 Redis 获取到的 ID 可以保证唯一。
  • 高可用:Redis 高可用(集群等方案)。
  • 高性能:Redis 速度很快。
  • 递增性:例如 String 的 INCR 命令,可以保证递增。
  • 安全性:为了增加 ID 的安全性,在使用 Redis 自增数值的基础上,在拼接一些其他信息。

全局唯一 ID 的组成(存储数值类型占用空间更小,使用 long 存储,8 byte,64 bit)

在这里插入图片描述

  • 符号位:1 bit,永远为 0,代表 ID 是正数。

  • 时间戳:31 bit,以秒为单位,可以使用 69 年。

  • 序列号:32 bit,当前时间戳对应的数量,也就是每秒可以对应 2^32 个不同的 ID。

Redis ID 自增策略:通过设置每天存入一个 Key,方便统计订单数量;ID 构造为 时间戳 + 计数器。

@Component
public class RedisIdWorker {/*** 指定时间戳(2023年1月1日 0:0:00) LocalDateTime.of(2023, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)*/private static final long BEGIN_TIMESTAMP_2023 = 1672531200L;/*** 序列号位数*/private static final int BIT_COUNT = 32;private final StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix) {// 1. 时间戳long timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) - BEGIN_TIMESTAMP_2023;// 2. 生成序列号:自增 1,Key 不存在会自动创建一个 Key。(存储到 Redis 中的 Key 为 keyPrefix:date,Value 为自增的数量)Long serialNumber = stringRedisTemplate.opsForValue().increment(keyPrefix + ":" + DateTimeFormatter.ofPattern("yyyy-MM-dd").format(LocalDate.now()));// 3. 时间戳左移 32 位,序列号与右边的 32 个 0 进行与运算return timestamp << BIT_COUNT | serialNumber;}
}

测试(300个线程生成共3w个id)

@Resource
private RedisIdWorker redisIdWorker;public static final ExecutorService ES = Executors.newFixedThreadPool(500);@Test
void testGloballyUniqueID() throws Exception {// 程序是异步的,分线程全部走完之后主线程再走,使用 CountDownLatch;否则异步程序没有执行完时主线程就已经执行完了CountDownLatch latch = new CountDownLatch(300);Runnable task = () -> {for (int i = 0; i < 100; i++) {long globallyUniqueID = redisIdWorker.nextId("sun");System.out.println("globallyUniqueID = " + globallyUniqueID);}latch.countDown();};long begin = System.currentTimeMillis();for (int i = 0; i < 300; i++) {ES.submit(task);}latch.await();long end = System.currentTimeMillis();System.out.println("Execution Time: " + (end - begin));
}

添加优惠卷

格式类似这种逻辑太简单了略

{"shopId":1,"title":"100元代金券","subTitle":"周一至周五均可使用","rules":"全场通用\n无需预约\n可无限叠加\n不兑现、不找零\n仅限堂食","payValue":8000,"actualValue":10000,"type":1,"stock":100,"beginTime":"2022-11-13T10:09:17","endTime":"2022-11-13T22:10:17"
}

秒杀下单功能

在这里插入图片描述

@Override
@Transactional
public Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始,是否结束if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}if(voucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已结束!");}//3.判断库存是否充足if(voucher.getStock()<=0){return Result.fail("优惠券库存不足!");}//4.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).update();//5.创建订单if(!success){return Result.fail("优惠券库存不足!");}//6.返回订单idVoucherOrder voucherOrder = new VoucherOrder();//6.1订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//6.2用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//6.3代金券idvoucherOrder.setVoucherId(voucherId);//7.订单写入数据库save(voucherOrder);//8.返回订单Idreturn Result.ok(orderId);
}

超卖问题

假设库存为 1,有线程1、2、3,时刻 t1、t2、t3、t4。

  • t1:线程1 查询库存,库存为 1;
  • t2:线程2、线程 3 查询库存,库存为 1;
  • t3:线程1 下单,库存扣减为 0。
  • t4:线程2 和 线程3 下单,库存扣减为 -2。

具体图示:
在这里插入图片描述
在这里插入图片描述

解决超卖问题

悲观锁

太简单了直接加锁保证操作数据是原子操作要串行执行

乐观锁
版本号法:

一般是在数据库表中加上一个 version 字段表示 数据被修改的次数。数据被修改时 version 值加 1。

  1. 线程 A 读取数据,同时读取到 version 值。

  2. 提交更新时,若刚才读到的 version 值未发生变化:则提交更新并且 version 值加 1。

  3. 提交更新时,若刚才读到的 version 值发生了变化:放弃更新,并通过报错、自旋重试等方式进行下一步处理。

在这里插入图片描述

CAS法(简单来说就是直接拿库存当版本号):

CAS 操作需要输入两个数值,一个旧值(操作前的值)和一个新值,操作时先比较下在旧值有没有发生变化,若未发生变化才交换成新值,发生了变化则不交换。

CAS 是原子操作,多线程并发使用 CAS 更新数据时,可以不使用锁。原子操作是最小的不可拆分的操作,操作一旦开始,不能被打断,直到操作完成。也就是多个线程对同一块内存的操作是串行的。

在这里插入图片描述

一人一单问题

在这里插入图片描述

一人一单逻辑:

  1. 发送下单请求,提交优惠券 ID。
  2. 下单前需要判断:秒杀是否开始或结束、库存是否充足
  3. 库存充足:根据优惠券 ID 和用户 ID 查询订单,判断该用户是否购买过该优惠券
  4. 该用户对该优惠券的订单不存在时,扣减库存、创建订单、返回订单 ID。

解决并发安全问题

  1. 单人下单(一个用户),高并发的情况下:该用户的 10 个线程同时执行到 查询该用户 ID 和秒杀券对应的订单数量,10 个线程查询到的值都为 0,即未下单。于是会出现一个用户下 10 单的情况。
  2. **此处仍需加锁,乐观锁适合更新操作,插入操作需要选择悲观锁。**若直接在方法上添加 synchronized 关键字,会让锁的范围(粒度)过大,导致性能较差。因此,采用 一个用户一把锁 的方式。

问题:能否用乐观锁执行?

不能,原因是乐观锁只能操作(修改)单个变量,而创建订单需要操作数据库(难以跟踪状态)

@Override
public CommonResult<Long> seckillVoucher(Long voucherId) {// 判断秒杀是否开始或结束、库存是否充足。SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);ThrowUtils.throwIf(seckillVoucher == null, ErrorCode.NOT_FOUND_ERROR);LocalDateTime now = LocalDateTime.now();ThrowUtils.throwIf(now.isBefore(seckillVoucher.getBeginTime()), ErrorCode.OPERATION_ERROR, "秒杀尚未开始");ThrowUtils.throwIf(now.isAfter(seckillVoucher.getEndTime()), ErrorCode.OPERATION_ERROR, "秒杀已经结束");ThrowUtils.throwIf(seckillVoucher.getStock() < 1, ErrorCode.OPERATION_ERROR, "库存不足");// 下单return this.createVoucherOrder(voucherId);
}/*** 下单(超卖 - CAS、一人一单 - synchronized)*/
@Override
@Transactional
public CommonResult<Long> createVoucherOrder(Long voucherId) {// 1. 判断当前用户是否下过单Long userId = UserHolder.getUser().getId();Integer count = this.lambdaQuery().eq(VoucherOrder::getVoucherId, voucherId).eq(VoucherOrder::getUserId, userId).count();ThrowUtils.throwIf(count > 0, ErrorCode.OPERATION_ERROR, "禁止重复下单");// 2. 扣减库存boolean result = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();ThrowUtils.throwIf(!result, ErrorCode.OPERATION_ERROR, "下单失败");// 3. 下单VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setUserId(userId);voucherOrder.setId(redisIdWorker.nextId("seckillVoucherOrder"));voucherOrder.setVoucherId(voucherId);result = this.save(voucherOrder);ThrowUtils.throwIf(!result, ErrorCode.OPERATION_ERROR, "下单失败");return CommonResult.success(voucherOrder.getId());
}

集群环境下的并发问题

在这里插入图片描述

分布式锁-原理

不去使用jvm内部的锁监视器,我们要在外部开一个锁监视器,让它监视所有的线程

在这里插入图片描述

常见的分布式锁

MySQL:MySQL 本身带有锁机制,但是由于 MySQL 性能一般,所以采用分布式锁的情况下,使用 MySQL 作为分布式锁比较少见。
Redis:Redis 作为分布式锁比较常见,利用 setnx 方法,如果 Key 插入成功,则表示获取到锁,插入失败则表示无法获取到锁。
Zookeeper:Zookeeper 也是企业级开发中比较好的一个实现分布式锁的方案。

MySQLRedisZookeeper
互斥利用 MySQL 本身的互斥锁机制利用 setnx 互斥命令利用节点的唯一性和有序性
高可用
高性能一般一般
安全性断开链接,自动释放锁利用锁超时时间,到期释放临时节点,断开链接自动释放
# 添加锁(NX 互斥、EX 设置 TTL 时间)
SET lock thread1 NX EX 10# 手动释放锁
DEL lock
public interface DistributedLock {/*** 获取锁(只有一个线程能够获取到锁)* @param timeout   锁的超时时间,过期后自动释放* @return          true 代表获取锁成功;false 代表获取锁失败*/boolean tryLock(long timeout);/*** 释放锁*/void unlock();
}public class SimpleDistributedLock4Redis implements DistributedLock {private static final String KEY_PREFIX = "lock:";private final String name;private final StringRedisTemplate stringRedisTemplate;public SimpleDistributedLockBased4Redis(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeout) {String threadId = Thread.currentThread().getId().toString();Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeout, TimeUnit.SECONDS);// result 是 Boolean 类型,直接返回存在自动拆箱,为防止空指针不直接返回return Boolean.TRUE.equals(result);}@Overridepublic void unlock() {stringRedisTemplate.delete(KEY_PREFIX + name);}
}
/*** VERSION3.0 - 秒杀下单优惠券(通过分布式锁解决一人一单问题)*/
@Override
public CommonResult<Long> seckillVoucher(Long voucherId) {// 判断秒杀是否开始或结束、库存是否充足。...// 下单SimpleDistributedLock4Redis lock = new SimpleDistributedLock4Redis("order:" + UserHolder.getUser().getId(), stringRedisTemplate);boolean tryLock = lock.tryLock(TTL_TWO);ThrowUtils.throwIf(!tryLock, ErrorCode.OPERATION_ERROR, "禁止重复下单");try {VoucherOrderService voucherOrderService = (VoucherOrderService) AopContext.currentProxy();return voucherOrderService.createVoucherOrder(voucherId);} finally {lock.unlock();}
}

误删问题

# 线程 1 获取到锁后执行业务,碰到了业务阻塞。
setnx lock:order:1 thread01# 业务阻塞的时间超过了该锁的 TTL 时间,触发锁的超时释放。超时释放后,线程 2 获取到锁并执行业务。
setnx lock:order:1 thread02# 线程 2 执行业务的过程中,线程 1 的业务执行完毕并且释放锁,但是释放的是线程 2 获取到的锁。(线程 2:你 TM 放我锁是吧!)
del lock:order:1# 线程 3 获取到锁(此时线程 23 并行执行业务)
setnx lock:order:1 thread03

在这里插入图片描述

解决方案:在线程释放锁时,判断当前这把锁是否属于自己,如果不属于自己,就不会进行锁的释放(删除)。

# 线程 1 获取到锁后执行业务,碰到了业务阻塞。
setnx lock:order:1 thread01# 业务阻塞的时间超过了该锁的 TTL 时间,触发锁的超时释放。超时释放后,线程 2 获取到锁并执行业务。
setnx lock:order:1 thread02# 线程 2 执行业务的过程中,线程 1 的业务执行完毕并且释放锁。但是线程 1 需要判断这把锁是否属于自己,不属于自己就不会释放锁。
# 于是线程 2 一直持有这把锁直到业务执行结束后才会释放,并且在释放时也需要判断当前要释放的锁是否属于自己。
del lock:order:1# 线程 3 获取到锁并执行业务
setnx lock:order:1 thread03

在这里插入图片描述

基于 Redis 的分布式锁的实现(解决误删问题)

  1. 相较于最开始分布式锁的实现,只需要增加一个功能:释放锁时需要判断当前锁是否属于自己。(而集群环境下不同 JVM 中的线程 ID 可能相同,增加一个 UUID 区分不同 JVM)

  2. 因此通过分布式锁存入 Redis 中的线程标识包括:UUID (服务器id)+ 线程 ID(线程id)。UUID 用于区分不同服务器中线程 ID 相同的线程,线程 ID 用于区分相同服务器的不同线程。

    public class SimpleDistributedLockBasedOnRedis implements DistributedLock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleDistributedLockBasedOnRedis(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";// ID_PREFIX 在当前 JVM 中是不变的,主要用于区分不同 JVMprivate static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";/*** 获取锁*/@Overridepublic boolean tryLock(long timeoutSeconds) {// UUID 用于区分不同服务器中线程 ID 相同的线程;线程 ID 用于区分同一个服务器中的线程。String threadIdentifier = ID_PREFIX + Thread.currentThread().getId();Boolean isSucceeded = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadIdentifier, timeoutSeconds, TimeUnit.SECONDS);return Boolean.TRUE.equals(isSucceeded);}/*** 释放锁(释放锁前通过判断 Redis 中的线程标识与当前线程的线程标识是否一致,解决误删问题)*/@Overridepublic void unlock() {// UUID 用于区分不同服务器中线程 ID 相同的线程;线程 ID 用于区分同一个服务器中的线程。String threadIdentifier = THREAD_PREFIX + Thread.currentThread().getId();String threadIdentifierFromRedis = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 比较 Redis 中的线程标识与当前的线程标识是否一致if (!StrUtil.equals(threadIdentifier, threadIdentifierFromRedis)) {throw new BusinessException(ErrorCode.OPERATION_ERROR, "释放锁失败");}// 释放锁标识stringRedisTemplate.delete(KEY_PREFIX + name);}
    }

用Lua脚本解决原子性问题

分布式锁的原子性问题

  1. 线程 1 获取到锁并执行完业务,判断锁标识一致后释放锁,释放锁的过程中阻塞,导致锁没有释放成功,并且阻塞的时间超过了锁的 TTL 释放,导致锁自动释放。

  2. 此时线程 2 获取到锁,执行业务;在线程 2 执行业务的过程中,线程 1 完成释放锁操作。

  3. 之后,线程 3 获取到锁,执行业务,又一次导致此时有两个线程同时在并行执行业务

因此,需要保证 unlock() 方法的原子性,即判断线程标识的一致性和释放锁这两个操作的原子性。

Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保 Redis 多条命令执行时的原子性。

unlock操作

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static{//写成静态代码块,类加载就可以完成初始定义,就不用每次释放锁都去加载这个,性能提高咯UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));//设置脚本位置UNLOCK_SCRIPT.setResultType(Long.class);
}public void unlock(){//调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());}

Lua脚本

-- 锁的key
-- local key = KEYS[1]
-- 当前线程标识
-- local threadId = ARGV[1]
-- 获取锁中的线程标识
if(redis.call('get',KEYS[1]) == ARGV[1]) then return redis.call('del',KEYS[1])
end return 0

相关文章:

  • STL?list!!!
  • “傅里叶变换算法”来检测纸箱变形的简单示例
  • 2025认证杯第二阶段数学建模B题:谣言在社交网络上的传播思路+模型+代码
  • Ruby 循环与迭代器
  • 图片爬虫通过模板及使用说明
  • 01-数据结构概述和时间空间复杂度
  • 数据驱动下的具身智能进化范式
  • 3DVR制作的工具或平台
  • 视差计算,求指导
  • [Java实战]Spring Boot + Netty 实现 TCP 长连接客户端及 RESTful 请求转发(二十六)
  • 3D曲面上的TSP问题(一):曲面上点集距离求解
  • 【Python 面向对象】
  • 如何判断一个网站后端是用什么语言写的
  • Modern C++(一)基本概念
  • LeRobot 框架的核心架构概念和组件(下)
  • Framebuffer显示bmp图片
  • MySQL主从复制与读写分离
  • 概率相关问题
  • antd 主题色定制
  • Node.js 循环依赖问题详解:原理、案例与解决方案
  • 李峰已任上海青浦区委常委
  • 特朗普中东行:“能源换科技”背后的权力博弈|907编辑部
  • 杭州“放大招”支持足球发展:足球人才可评“高层次人才”
  • 国台办:实现祖国完全统一是大势所趋、大义所在、民心所向
  • 哲学新书联合书单|远离苏格拉底
  • 习近平致电祝贺阿尔巴尼斯当选连任澳大利亚总理