【从零开始学习Redis】秒杀优化——阻塞队列、消息队列实现异步秒杀
秒杀优化-异步秒杀
需求:
① 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到redis
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
② 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]-- 2 数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务
-- 3.1 判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2 库存不足return 1
end
-- 3.2 判断用户是否下单
if(redis.call('sismember', orderKey, userId) == 1) then-- 3.3 存在,说明是重复下单,返回2return 2
end
-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5 下单(保存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
③ 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
④ 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// 初始化脚本static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstruct // 保证在当前类初始化完毕就执行此方法,目的是让VoucherOrderHandler赶快执行private void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while(true){try {// 1.获取队列中的订单信息VoucherOrder voucherOrder = orderTasks.take();// 2.创建订单handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理订单异常", e);}}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {// 1.获取用户Long userId = voucherOrder.getUserId();// 2.创建锁对象// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);RLock lock = redissonClient.getLock("Lock:order" + userId);//3.获取锁boolean isLock = lock.tryLock();//4.判断是否获取锁成功if (!isLock) {// 获取锁失败,返回错误或重试log.error("不允许重复下单");return;}try {//获取代理对象(事务)// IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();子线程拿不到,只能从主线程里拿proxy.createVoucherOrder(voucherOrder);} finally {// 释放锁lock.unlock();}}private IVoucherOrderService proxy;@Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();// 1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());// 2.判断结果是否为0int r = result.intValue();if (r != 0) {// 2.1 不为0,代表没有购买资格return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!");}// 2.2 为0,有购买资格,把下单信息保存到阻塞队列// 2.3订单VoucherOrder voucherOrder = new VoucherOrder();// 2.4订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 2.5用户idvoucherOrder.setUserId(userId);// 2.6优惠券idvoucherOrder.setVoucherId(voucherId);// 2.7 放入阻塞队列orderTasks.add(voucherOrder);// 3.获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();// 4.返回订单idreturn Result.ok(orderId);}@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {// 一人一单Long userId = voucherOrder.getUserId();// 1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder).count();// 2.判断是否存在if (count > 0) {// 用户已经购买过了log.error("用户已经购买过一次!");return;}// 扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherOrder).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {log.error("库存不足");return;}// 创建订单save(voucherOrder);}
}
这里提出三个问题,是我当时不理解的地方,分享出来。
- 在实现 handleVocherOrder 方法时,视频里老师说这里其实没必要加锁,为什么可以不加锁?
private void handleVoucherOrder(VoucherOrder voucherOrder) {// 1.获取用户Long userId = voucherOrder.getUserId();// 2.创建锁对象// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);RLock lock = redissonClient.getLock("Lock:order" + userId);//3.获取锁boolean isLock = lock.tryLock();//4.判断是否获取锁成功if (!isLock) {// 获取锁失败,返回错误或重试log.error("不允许重复下单");return;}try {//获取代理对象(事务)// IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();子线程拿不到,只能从主线程里拿proxy.createVoucherOrder(voucherOrder);} finally {// 释放锁lock.unlock();}}
我们回看 Lua 脚本,里面实现了检查超卖和一人一单的功能,并且在 SeckillVoucher 方法里面,执行了 Lua 脚本,并且这两个操作是原子操作,也就是说在当前阶段,已经获取到了 voucherOrder 的情况下,voucherOrder 已经被成功创建并放到了阻塞队列中,说明已经保证了不超卖、一人一单。
其次,在类初始化时,SECKILL_ORDER_EXECUTOR
会启动一个单线程VoucherOrderHandler
,执行handleVoucherOrder
,本来就是单线程创建订单,就更没必要加锁了。
但是,原视频说的是把加锁当作保底,避免出现意外的问题。
proxy = (IVoucherOrderService) AopContext.currentProxy();
这里的代理对象是什么?为什么创建的这个单线程获取不了代理对象?
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
Spring 的事务(@Transactional)是基于 AOP 实现的。
- 本质上是用动态代理,在目标方法前后织入事务的开启、提交、回滚操作。
- 只有通过代理对象执行方法,Spring 才能拦截并生效。
- 如果在类的内部直接调用
this.xxx()
,Spring 拦不住->事务不会生效。
比如,
@Transactional
public void createVoucherOrder(...) {// 保存订单
}public void handleVoucherOrder(...) {// this.createVoucherOrder(...); // ❌ 事务失效
}
如果直接 this.createVoucherOrder(),那么事务就会失效。
通过代理对象调用 proxy.createVoucherOrder(...)
→ 走 AOP → @Transactional
生效。
- 为什么不能在创建的线程获取代理对象?
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
AopContext.currentProxy()
底层是通过 ThreadLocal 保存当前线程的代理对象。- SpringAop 拦截器会先把代理对象塞到当前线程的 ThreadLocal 中,方法调用结束后清理。
- 如果自己新建了进程,他的 ThreadLocal 是空的,所以
AopContext.currentProxy()
拿到的代理对象为null
。
所以要在主线程获取代理对象。
- 这个异步下单到底是什么东西?怎么就异步了?
在最开始的同步下单方案里,流程是这样的:
请求进来 → 校验库存 & 一人一单 → 创建订单(写库)
- 秒杀场景下并发极高,如果每个请求都直接写数据库,数据库压力会非常大;
- 订单创建慢,用户响应也慢,吞吐量上不去。
所以这里优化就是,请求快相应、订单慢慢处理
① 先利用Redis完成库存余量、一人一单判断, 完成抢单业务
② 再将下单业务放入阻塞队列, 利用独立线程异步下单
也就是说,请求线程不再负责创建订单,而是把任务放到 BlockingQueue
里,让单线程消费者慢慢处理。
异步过程:
用户请求↓
seckillVoucher (快速校验 → 入队 → 立刻返回订单ID)↓
阻塞队列 (缓存所有订单任务)↓
单线程消费者 VoucherOrderHandler↓
handleVoucherOrder (获取锁 → 调用事务方法)↓
createVoucherOrder (写数据库)
- 请求线程不再写数据库,只负责“丢任务+返回结果”;
- 写数据库操作交给后台线程去做(异步处理);
- 阻塞队列起到缓冲作用,可以削峰填谷,保护数据库。
但是但是,基于阻塞队列的异步秒杀还存在问题。
- 内存限制问题,我们使用的是 jdk 的阻塞队列,使用的 JVM 的内存,如果不加以限制,高并发下容易导致内存溢出。所以我们在创建阻塞队列的时候设置了队列的长度。但是满了之后就存不下新的订单了。
- 数据安全问题, 现在是基于内存保存订单信息,如果服务突然宕机,那么存储的信息全部会丢失,订单丢失会导致数据不一致问题。
Redis 消息队列实现异步秒杀
认识消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
同时, Redis提供了三种不同的方式来实现消息队列:
- list结构: 基于List结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream: 比较完善的消息队列模型
基于 List 结构模拟消息队列
消息队列字面意思就是存放消息的队列,而 List 的底层数据结构是双向链表 ,所以可以基于此模拟消息队列。
因此我们可以利用 LPUSH 和 RPOP 或者 RPUSH 和 LPOP 实现。
不过需要注意的是,当队列中没有消息时再获取消息就会返回 null,不会像 JVM 那样阻塞并等待消息,也就是非阻塞式的。
因此这里应该使用 BLPUSH 和 **BRPOP **实现。
基于 List 的消息队列有哪些优缺点?
优点:
- 利用 Redis 存储,不受限于 JVM 内存
- Redis 的持久化机制保证了数据的安全
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持但消费者
基于 PubSub 的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
- SUBSCRIBE channel [channel]: 订阅一个或多个频道
- PUBLISH channel msg: 向一个频道发送消息
- PSUBSCRIBEpattern[pattern]: 订阅与pattern格式匹配的所有频道
基于PubSub的消息队列有哪些优缺点?
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化。不像 List 是基于双向链表模拟的消息队列,本身 List 就是用来做数据存储的,所以支持数据持久化。而这个是单纯用来做消息发送的,所以不支持数据持久化。
- 无法避免消息丢失。如果未被订阅,那么 Redis 不会去存储这个消息,发出的消息直接丢失了。
- 消息堆积有上线,超出时数据丢失。
基于 Stream 的消息队列
Stream 的单消费模式
Stream 是 Redis 5.0 引入的一种新数据类型, 可以实现一个功能非常完善的消息队列。
发送消息的命令:
例如:
读取消息的方式之一:XREAD
注意:
当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。
STREAM类型消息队列的XREAD命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
Stream 的消费者组模式
消费者组(Consumer Group): 将多个消费者划分到一个组中, 监听同一个队列。具备下列特点:
- 消息分流
队列中的消息会分流给组内的不同消费者, 而不是重复消费, 从而加快消息处理的速度。
- 消息标示
消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启, 还会从标示之后读取消息。确保每一个消息都会被消费
- 消息确认
消费者获取消息后,消息处于 pending状态,并存入一个 pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。解决了消息丢失的问题。
创建消费者组:
其他常用命令:
从消费者组读取消息:
-
group: 消费组名称
-
consumer: 消费者名称, 如果消费者不存在, 会自动创建一个消费者
-
count:本次查询的最大数量
-
BLOCK milliseconds: 当没有消息时最长等待时间
-
NOACK: 无需手动ACK, 获取到消息后自动确认STREAMS key: 指定队列名称
-
ID: 获取消息的起始ID:
">": 从下一个未消费的消息开始其它: 根据指定id从pending-list中获取已消费但未确认的消息, 例如0, 是从pending-list中的第一个消息开始
STREAM类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息, 加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
小结
List | PubSub | Stream | |
---|---|---|---|
消息持久化 | 支持 | 不支持 | 支持 |
阻塞读取 | 支持 | 支持 | 支持 |
消息堆积处理 | 受限于内存空间,可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度,可以利用消费者组提高消费速度,减少堆积 |
消息确认机制 | 不支持 | 不支持 | 支持 |
消息回溯 | 不支持 | 不支持 | 支持 |
基于 Stream 消息队列实现异步秒杀
① 创建一个Stream类型的消息队列, 名为stream.orders
② 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 获取订单id
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
// 2.判断结果是否为0
int r = result.intValue();
if (r != 0) {// 2.1 不为0,代表没有购买资格return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!");
}
// 3.获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 4.返回订单id
return Result.ok(orderId);}
执行 lua 脚本这块,voucherId.toString(), userId.toString(), String.valueOf(orderId)
,前两个都是 toString,怎么第三个就变成 valueOf 了呢,是作者笔误了吗,还是这两种写法都对?
其实这么写是对的,这里有个小细节容易忽略
voucherId
和userId
是Long
对象(包装类),可以直接调用toString()
方法,因为它们是对象orderId
是long
基本数据类型,就不能直接调用toString()
,必须使用String.valueOf(orderId)
来转换
③ 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
private class VoucherOrderHandler implements Runnable {String queueName = "stream.orders";@Overridepublic void run() {while (true) {try {// 1.获取消息队列中的订单信息 xread group g1 c1 count 1 block 2000 streams stream.orders >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 2.判断消息是否获取成功if (list == null || list.isEmpty()) {//如果获取失败,说明没有消息,继续下一次循环continue;}// 3. 解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 如果获取成功,可以下单handleVoucherOrder(voucherOrder);// 5. ACK确认 ack stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理订单异常", e);HandlePendingList();}}}private void HandlePendingList() {while (true) {try {// 1.获取pendingList中的订单信息 xread group g1 c1 count 1 streams stream.orders 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));// 2.判断消息是否获取成功if (list == null || list.isEmpty()) {//如果获取失败,说明pendingList没有消息,停止循环break;}// 3. 解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 如果获取成功,可以下单handleVoucherOrder(voucherOrder);// 5. ACK确认 ack stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理pending-list订单异常", e);try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}}}}
}
如果这篇文章对你有帮助,请点赞、评论、收藏,创作不易,你的支持是我创作的动力。