当前位置: 首页 > news >正文

Redis--黑马点评--基于stream消息队列的秒杀优化业务详解

基于redis的stream结构作为消息队列,实现异步秒杀下单

需求:

  • 创建一个Stream类型的消息队列,名为stream.oreders

  • 修改之前的秒杀下单Lua脚本,在认定有抢够资格后,直接向stream.orders中添加消息,内容包括voucherId、userId、orderId

  • 项目启动后,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

首先,使用命令行来创建消息队列:

image-20250702162909923

其次,需要修改Lua脚本:

------ Generated by EmmyLua(https://github.com/EmmyLua)--- Created by 20893.--- DateTime: 2025/6/27 15:48----- 1.参数列表--1.1优惠券IDlocal voucherId = ARGV[1]--1.2用户IDlocal userId = ARGV[2]-- 1.3 订单IDlocal orderId = ARGV[3]--2.数据key--2.1库存keylocal stockKey = 'seckill:stock:' .. voucherId--2.2订单keylocal orderKey = 'seckill:order:' .. voucherId--3.业务逻辑--3.1判断库存是否充足-- redis.call('get',stockKey)中取出的值是String类型,需要将其转化成int类型,调用tonumber()方法if  (tonumber(redis.call('get', stockKey))<= 0) then--3.2. 库存不足 返回1return 1end--3.3判断用户是否重复下单if (redis.call('sismember', orderKey, userId) == 1) then--3.4. 重复下单 返回2return 2end--3.5扣减库存redis.call('incrby', stockKey, -1)--3.6记录订单redis.call('sadd', orderKey, userId)--3.7发送消息到队列中,xadd stream.orders * k1 v1 ...redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)return 0

还需要修改Java业务代码:

 public Result seckillVoucher(Long voucherId) {//获取用户Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),voucherId.toString(),userId.toString(),String.valueOf(orderId));//2.判断结果是否为0int r = result.intValue();if (result != 0){//2.1.不为0,代表没有购买资格return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}//获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();//3.返回订单IDreturn Result.ok(orderId);

最终,根据我们的伪代码进行对异步线程中业务流程的修改

while(true){//尝试今天队列,使用阻塞模式,最长等待2000毫秒Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 >");if(msg == null){//null说明没有消息,继续下一次continue;}try{//处理消息,完成后需要确认消息(ACK)handleMessage(msg);}catch(Exception e){while(true){Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 0");if(msg == null){//null说明没有异常消息,所有消息已确认,结束循环break;}try{//说明有异常消息,再次处理handleMessage(msg);}catch(Exception e){//再次出现异常,记录日志,继续循环continue;}}}}

代码展示:

 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//该注解表示在类初始化之后执行@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private  class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@Overridepublic void run() {while ( true){try {//获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams stream.orders >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//判断消息获取是否成功if(list == null || list.isEmpty()){//2.1如果获取失败,说明没有消息,继续下一次循环continue;}//2.2.解析消息中的数据MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> value = entries.getValue();//将map对象转换成订单对象VoucherOrder order = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.2.如果获取成功,可以下单handleVoucherOrder(order);//3.ACK确认 sack stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());​} catch (Exception e) {log.error("获取订单信息异常",e);handlePendingList();}}​}​private void handlePendingList() {while( true){try {//获取pending-list中的订单信息 xreadgroup group g1 c1 count 1 streams stream.orders 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.from("0")));//判断消息获取是否成功if(list == null || list.isEmpty()){//2.1如果获取失败,说明pending-list没有消息,继续下一次循环break;}//2.2.解析消息中的数据MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> value = entries.getValue();//将map对象转换成订单对象VoucherOrder order = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.2.如果获取成功,可以下单handleVoucherOrder(order);//3.ACK确认 sack stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());​} catch (Exception e) {log.error("获取pending-list订单信息异常",e);//如果害怕因为报错导致陷入循环,可以设置休眠时间try {Thread.sleep(20);} catch (InterruptedException ex) {throw new RuntimeException(ex);}}}}}

整体业务流程描述:

首先尝试从消息队列中读取数据,如果数据获取失败,直接进行下一次循环,再来读一次消息队列,如果获取成功,说明有订单信息需要处理,就去解析订单信息,完成下单,在进行ack确认。在进行下一次读取,继续循环,如果在处理消息的过程抛出异常,导致该消息没有确认,那么该消息就会进入pending-list,就被catch到,在catch中执行handlePendinglist()函数,在该函数中,首先去pending-list获取未确认消息,如果读到,则解析消息,处理,下单,ack确认,如果没有异常消息,就会直接跳出循环,异常处理结束,如果再抛异常,就再度循环。直到pending-list中所有异常全部处理完成为止。

进行测试:

image-20250702173402911

image-20250702173423837

image-20250702173452042

image-20250702173524978

再次测试:

image-20250702173728568

至此优化秒杀下单的业务需求完成。

希望对大家有所帮助。

http://www.dtcms.com/a/267370.html

相关文章:

  • 升级到MySQL 8.4,MySQL启动报错:io_setup() failed with EAGAIN
  • 每日算法刷题Day42 7.5:leetcode前缀和3道题,用时2h
  • Node.js worker_threads:并发 vs 并行
  • 洛谷刷题9
  • 如何在idea里快速地切换Windows CMD、git bash、powershell
  • 谷物干燥的滚筒式烘干机的设计cad【11张】三维图+设计说明书+绛重
  • LinkedList剖析
  • OneCode 图表组件核心优势解析
  • Kafka消息积压全面解决方案:从应急处理到系统优化
  • <script setup>中的setup作用以及和不带的区别对比
  • DeepSeek飞机大战小游戏HTML5(附源码)
  • 【动态规划】笔记—完全背包问题
  • opensuse tumbleweed上安装显卡驱动
  • 针对工业触摸屏维修的系统指南和资源获取途径
  • 【Linux】自旋锁和读写锁
  • Day52 神经网络调参指南
  • oracle的诊断文件的学习
  • SpringCloud系列(50)--SpringCloud Stream消息驱动之实现消费者
  • 零基础 “入坑” Java--- 七、数组(二)
  • grom 事务 RowsAffected 踩坑记录
  • 数据结构——栈的讲解(超详细)
  • 深入解析C语言位域
  • 计算故障诊断振动信号的时频域特征,得到特征向量
  • Redis服务器
  • 个人独创-CV领域快速测试缝合模型实战框架讲解-基础篇-Pytorch必学知识
  • 从新闻到知识图谱:用大模型和知识工程“八步成诗”打造科技并购大脑
  • MySQL 数据库传统方式部署主从架构的实现很详细
  • C语言socket编程-补充
  • MOS管(MOSFET)和三极管(BJT)和IGBT的区别
  • 【赵渝强老师】Oracle RMAN的目录数据库