当前位置: 首页 > news >正文

【从零开始学习Redis】秒杀优化——阻塞队列、消息队列实现异步秒杀

秒杀优化-异步秒杀

需求:

① 新增秒杀优惠券的同时,将优惠券信息保存到Redis中

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到redis
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

② 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]-- 2 数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务
-- 3.1 判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2 库存不足return 1
end
-- 3.2 判断用户是否下单
if(redis.call('sismember', orderKey, userId) == 1) then-- 3.3 存在,说明是重复下单,返回2return 2
end
-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5 下单(保存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0

③ 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列

④ 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// 初始化脚本static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstruct // 保证在当前类初始化完毕就执行此方法,目的是让VoucherOrderHandler赶快执行private void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while(true){try {// 1.获取队列中的订单信息VoucherOrder voucherOrder = orderTasks.take();// 2.创建订单handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理订单异常", e);}}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {// 1.获取用户Long userId = voucherOrder.getUserId();// 2.创建锁对象// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);RLock lock = redissonClient.getLock("Lock:order" + userId);//3.获取锁boolean isLock = lock.tryLock();//4.判断是否获取锁成功if (!isLock) {// 获取锁失败,返回错误或重试log.error("不允许重复下单");return;}try {//获取代理对象(事务)// IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();子线程拿不到,只能从主线程里拿proxy.createVoucherOrder(voucherOrder);} finally {// 释放锁lock.unlock();}}private IVoucherOrderService proxy;@Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();// 1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());// 2.判断结果是否为0int r = result.intValue();if (r != 0) {// 2.1 不为0,代表没有购买资格return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!");}// 2.2 为0,有购买资格,把下单信息保存到阻塞队列// 2.3订单VoucherOrder voucherOrder = new VoucherOrder();// 2.4订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 2.5用户idvoucherOrder.setUserId(userId);// 2.6优惠券idvoucherOrder.setVoucherId(voucherId);// 2.7 放入阻塞队列orderTasks.add(voucherOrder);// 3.获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();// 4.返回订单idreturn Result.ok(orderId);}@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {// 一人一单Long userId = voucherOrder.getUserId();// 1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder).count();// 2.判断是否存在if (count > 0) {// 用户已经购买过了log.error("用户已经购买过一次!");return;}// 扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherOrder).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {log.error("库存不足");return;}// 创建订单save(voucherOrder);}
}

这里提出三个问题,是我当时不理解的地方,分享出来。

  1. 在实现 handleVocherOrder 方法时,视频里老师说这里其实没必要加锁,为什么可以不加锁?
 private void handleVoucherOrder(VoucherOrder voucherOrder) {// 1.获取用户Long userId = voucherOrder.getUserId();// 2.创建锁对象// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);RLock lock = redissonClient.getLock("Lock:order" + userId);//3.获取锁boolean isLock = lock.tryLock();//4.判断是否获取锁成功if (!isLock) {// 获取锁失败,返回错误或重试log.error("不允许重复下单");return;}try {//获取代理对象(事务)// IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();子线程拿不到,只能从主线程里拿proxy.createVoucherOrder(voucherOrder);} finally {// 释放锁lock.unlock();}}

我们回看 Lua 脚本,里面实现了检查超卖和一人一单的功能,并且在 SeckillVoucher 方法里面,执行了 Lua 脚本,并且这两个操作是原子操作,也就是说在当前阶段,已经获取到了 voucherOrder 的情况下,voucherOrder 已经被成功创建并放到了阻塞队列中,说明已经保证了不超卖、一人一单

其次,在类初始化时,SECKILL_ORDER_EXECUTOR会启动一个单线程VoucherOrderHandler,执行handleVoucherOrder,本来就是单线程创建订单,就更没必要加锁了。

但是,原视频说的是把加锁当作保底,避免出现意外的问题。

  1. proxy = (IVoucherOrderService) AopContext.currentProxy();这里的代理对象是什么?为什么创建的这个单线程获取不了代理对象?
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);

Spring 的事务(@Transactional)是基于 AOP 实现的。

  • 本质上是用动态代理,在目标方法前后织入事务的开启、提交、回滚操作。
  • 只有通过代理对象执行方法,Spring 才能拦截并生效。
  • 如果在类的内部直接调用this.xxx(),Spring 拦不住->事务不会生效。

比如,

@Transactional
public void createVoucherOrder(...) {// 保存订单
}public void handleVoucherOrder(...) {// this.createVoucherOrder(...); // ❌ 事务失效
}

如果直接 this.createVoucherOrder(),那么事务就会失效。

通过代理对象调用 proxy.createVoucherOrder(...) → 走 AOP → @Transactional 生效。

  1. 为什么不能在创建的线程获取代理对象?
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
  • AopContext.currentProxy()底层是通过 ThreadLocal 保存当前线程的代理对象。
  • SpringAop 拦截器会先把代理对象塞到当前线程的 ThreadLocal 中,方法调用结束后清理。
  • 如果自己新建了进程,他的 ThreadLocal 是空的,所以AopContext.currentProxy()拿到的代理对象为null

所以要在主线程获取代理对象。

  1. 这个异步下单到底是什么东西?怎么就异步了?

在最开始的同步下单方案里,流程是这样的:

请求进来 → 校验库存 & 一人一单 → 创建订单(写库)

  • 秒杀场景下并发极高,如果每个请求都直接写数据库,数据库压力会非常大;
  • 订单创建慢,用户响应也慢,吞吐量上不去。

所以这里优化就是,请求快相应、订单慢慢处理

① 先利用Redis完成库存余量、一人一单判断, 完成抢单业务

② 再将下单业务放入阻塞队列, 利用独立线程异步下单

也就是说,请求线程不再负责创建订单,而是把任务放到 BlockingQueue里,让单线程消费者慢慢处理。

异步过程:

用户请求↓
seckillVoucher (快速校验 → 入队 → 立刻返回订单ID)↓
阻塞队列 (缓存所有订单任务)↓
单线程消费者 VoucherOrderHandler↓
handleVoucherOrder (获取锁 → 调用事务方法)↓
createVoucherOrder (写数据库)
  • 请求线程不再写数据库,只负责“丢任务+返回结果”;
  • 写数据库操作交给后台线程去做(异步处理);
  • 阻塞队列起到缓冲作用,可以削峰填谷,保护数据库。

但是但是,基于阻塞队列的异步秒杀还存在问题。

  1. 内存限制问题,我们使用的是 jdk 的阻塞队列,使用的 JVM 的内存,如果不加以限制,高并发下容易导致内存溢出。所以我们在创建阻塞队列的时候设置了队列的长度。但是满了之后就存不下新的订单了。
  2. 数据安全问题, 现在是基于内存保存订单信息,如果服务突然宕机,那么存储的信息全部会丢失,订单丢失会导致数据不一致问题。
Redis 消息队列实现异步秒杀
认识消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

同时, Redis提供了三种不同的方式来实现消息队列:

  • list结构: 基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream: 比较完善的消息队列模型
基于 List 结构模拟消息队列

消息队列字面意思就是存放消息的队列,而 List 的底层数据结构是双向链表 ,所以可以基于此模拟消息队列。

因此我们可以利用 LPUSH 和 RPOP 或者 RPUSH 和 LPOP 实现。

不过需要注意的是,当队列中没有消息时再获取消息就会返回 null,不会像 JVM 那样阻塞并等待消息,也就是非阻塞式的。

因此这里应该使用 BLPUSH 和 **BRPOP **实现。

基于 List 的消息队列有哪些优缺点?

优点:

  • 利用 Redis 存储,不受限于 JVM 内存
  • Redis 的持久化机制保证了数据的安全
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持但消费者

基于 PubSub 的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

  • SUBSCRIBE channel [channel]: 订阅一个或多个频道
  • PUBLISH channel msg: 向一个频道发送消息
  • PSUBSCRIBEpattern[pattern]: 订阅与pattern格式匹配的所有频道

基于PubSub的消息队列有哪些优缺点

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化。不像 List 是基于双向链表模拟的消息队列,本身 List 就是用来做数据存储的,所以支持数据持久化。而这个是单纯用来做消息发送的,所以不支持数据持久化。
  • 无法避免消息丢失。如果未被订阅,那么 Redis 不会去存储这个消息,发出的消息直接丢失了。
  • 消息堆积有上线,超出时数据丢失。
基于 Stream 的消息队列
Stream 的单消费模式

Stream 是 Redis 5.0 引入的一种新数据类型, 可以实现一个功能非常完善的消息队列。

发送消息的命令:

例如:

读取消息的方式之一:XREAD

注意:

当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险
Stream 的消费者组模式

消费者组(Consumer Group): 将多个消费者划分到一个组中, 监听同一个队列。具备下列特点:

  • 消息分流

队列中的消息会分流给组内的不同消费者, 而不是重复消费, 从而加快消息处理的速度。

  • 消息标示

消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启, 还会从标示之后读取消息。确保每一个消息都会被消费

  • 消息确认

消费者获取消息后,消息处于 pending状态,并存入一个 pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。解决了消息丢失的问题。

创建消费者组:

其他常用命令:

从消费者组读取消息:

  • group: 消费组名称

  • consumer: 消费者名称, 如果消费者不存在, 会自动创建一个消费者

  • count:本次查询的最大数量

  • BLOCK milliseconds: 当没有消息时最长等待时间

  • NOACK: 无需手动ACK, 获取到消息后自动确认STREAMS key: 指定队列名称

  • ID: 获取消息的起始ID:

           ">": 从下一个未消费的消息开始其它: 根据指定id从pending-list中获取已消费但未确认的消息, 例如0, 是从pending-list中的第一个消息开始
    
STREAM类型消息队列的XREADGROUP命令特点:
  • 消息可回溯
  • 可以多消费者争抢消息, 加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次
小结
ListPubSubStream
消息持久化支持不支持支持
阻塞读取支持支持支持
消息堆积处理受限于内存空间,可以利用多消费者加快处理受限于消费者缓冲区受限于队列长度,可以利用消费者组提高消费速度,减少堆积
消息确认机制不支持不支持支持
消息回溯不支持不支持支持
基于 Stream 消息队列实现异步秒杀

① 创建一个Stream类型的消息队列, 名为stream.orders

② 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId

@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 获取订单id
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
// 2.判断结果是否为0
int r = result.intValue();
if (r != 0) {// 2.1 不为0,代表没有购买资格return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!");
}
// 3.获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 4.返回订单id
return Result.ok(orderId);}

执行 lua 脚本这块,voucherId.toString(), userId.toString(), String.valueOf(orderId),前两个都是 toString,怎么第三个就变成 valueOf 了呢,是作者笔误了吗,还是这两种写法都对?

其实这么写是对的,这里有个小细节容易忽略

  1. voucherIduserIdLong对象(包装类),可以直接调用toString()方法,因为它们是对象
  2. orderIdlong基本数据类型,就不能直接调用toString(),必须使用String.valueOf(orderId)来转换

③ 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

private class VoucherOrderHandler implements Runnable {String queueName = "stream.orders";@Overridepublic void run() {while (true) {try {// 1.获取消息队列中的订单信息 xread group g1 c1 count 1 block 2000 streams stream.orders >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 2.判断消息是否获取成功if (list == null || list.isEmpty()) {//如果获取失败,说明没有消息,继续下一次循环continue;}// 3. 解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 如果获取成功,可以下单handleVoucherOrder(voucherOrder);// 5. ACK确认 ack stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理订单异常", e);HandlePendingList();}}}private void HandlePendingList() {while (true) {try {// 1.获取pendingList中的订单信息 xread group g1 c1 count 1  streams stream.orders 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));// 2.判断消息是否获取成功if (list == null || list.isEmpty()) {//如果获取失败,说明pendingList没有消息,停止循环break;}// 3. 解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 如果获取成功,可以下单handleVoucherOrder(voucherOrder);// 5. ACK确认 ack stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理pending-list订单异常", e);try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}}}}
}

如果这篇文章对你有帮助,请点赞、评论、收藏,创作不易,你的支持是我创作的动力。


文章转载自:

http://et7Ho1UP.yjwdg.cn
http://2Qqt3QCQ.yjwdg.cn
http://GM2nBxqo.yjwdg.cn
http://4beM8PwO.yjwdg.cn
http://dmBXirJE.yjwdg.cn
http://WYBvxnO1.yjwdg.cn
http://OGj9eomM.yjwdg.cn
http://Cr31XRR0.yjwdg.cn
http://oyX7ap7s.yjwdg.cn
http://ePS6ig26.yjwdg.cn
http://9fRVvMQX.yjwdg.cn
http://bpafgdjI.yjwdg.cn
http://XCYeAWfS.yjwdg.cn
http://1e3CR55f.yjwdg.cn
http://LAJugWFF.yjwdg.cn
http://RO1kt7wc.yjwdg.cn
http://mEMZLZYe.yjwdg.cn
http://cyDyqsYN.yjwdg.cn
http://2G9LARZ1.yjwdg.cn
http://n1YJGat3.yjwdg.cn
http://r1RSTCrK.yjwdg.cn
http://o0A5074c.yjwdg.cn
http://oCNOVudk.yjwdg.cn
http://0lz4uwSt.yjwdg.cn
http://F4qeQ3O3.yjwdg.cn
http://sTLeHLT1.yjwdg.cn
http://JTygt257.yjwdg.cn
http://y9fNejU4.yjwdg.cn
http://VSc2LeN0.yjwdg.cn
http://6Uszu09k.yjwdg.cn
http://www.dtcms.com/a/371332.html

相关文章:

  • 【基于深度学习的中草药识别系统】
  • AI 驱动数据分析:开源 SQLBot 项目探索,基于大模型和 RAG 实现精准问数与图表挖掘
  • 延迟 队列
  • 宋红康 JVM 笔记 Day14|垃圾回收概述
  • 【ICCV2025】计算机视觉|即插即用|ESC:颠覆Transformer!超强平替,ESC模块性能炸裂!
  • 手机能看、投屏 / 车机不能看与反向链接验证类似吗?
  • Xilinx ZYNQ 开发环境中搭建 Qt 环
  • leetcode909.蛇梯棋
  • JAVA NIO学习笔记基础强化学习总结
  • 基于51单片机手机无线蓝牙APP控制风扇调速设计
  • 力扣hot100:相交链表与反转链表详细思路讲解(160,206)
  • 如何在 DevOps 管道中实现 AI?
  • 【Java基础07】面向对象进阶
  • 动态维护有效区间:滑动窗口
  • 桌面时间 Catime
  • 解锁服务器网络配置新姿势:Wisdom SSH 助力之旅
  • 设计模式:状态模式(State Pattern)
  • 【ARM基础知道】
  • SpringCloud Alibaba微服务--Gateway使用
  • 基于脚手架微服务的视频点播系统-播放控制部分
  • 【C++详解】C++ 智能指针:使用场景、实现原理与内存泄漏防治
  • 【iOS】push,pop和present,dismiss
  • HiCMAE 论文复现:基于 RAVDESS 数据集的音视频情感识别
  • axios的两种异步方式对比
  • uniapp结合uview制作美食页面
  • Spark mapreduce 的一个用法
  • [iOS] push 和 present Controller 的区别
  • 五.贪心算法
  • vue中axios与fetch比较
  • 【iOS】block复习