RuoYi .net-实现商城秒杀下单(redis,rabbitmq)
经过一周的学习,终于对redis,rabbitmq的使用有了初步的了解,谨以此文记录我的学习过程。我将整个流程分为,秒杀模块和消息创建订单模块。整个demo只有两个接口一个是提交秒杀submit,二是处理消息scanMessages,其中submit由用户点击调用,scanMessages可以作为定时任务,自动扫描消息,处理订单相关逻辑。在这里我做成手动调用,方便学习。redis使用的是Stackexchange.redis rabbit使用的是masstransit
一、流程思想介绍
1.秒杀模块
在项目启动时,我会初始化三个商品。即在数据库手动加入三个商品,和手动在redis中创建这三个商品的库存键值对。
用户发起秒杀请求:
1. 获取分布式锁 :获取锁失败返回秒杀拥挤
2. 检查重复下单 :通过 Redis 检查用户是否已下单,已下单释放锁 → 返回"不可重复下单"
3. 检查Redis库存:判断库存是否充足 库存不足 释放锁 → 返回"库存不足"
4. Redis扣减库存
5. 检查扣减结果:判断扣减后库存是否<0 库存不足 Redis回滚 → 释放锁 → 返回"库存不足"
6. 数据库事务 :扣减数据库库存 + 记录本地消息 事务失败Redis回滚 → 释放锁 → 返回"秒杀失败"
7. 处理成功 :释放锁 → 记录用户下单 → 返回"秒杀成功"
在第六步中,解释一下:用户秒杀成功后,此时先减了商品库存,但是还没有生成订单,如何生成订单呢? 我做了以下操作:减完库存,立马new一个消息体,并保存在数据库中,此时消息的状态是0,意思是消息被创建成功了但是还没有发送,当scanMessages扫描到有状态为0的消息时,会把消息进行发布,详情我在订单模块解释。当发送成功后会改变消息的状态为1,意思是消息已发送。
秒杀模块通过 “Redis 前置拦截 + 分布式锁控制并发 + 数据库事务保证一致性 + 本地消息表异步通知” 的组合方案。我用jmeter测试500个线程同时秒杀,并未出现差错。当然还有瑕疵请批评指正。
------------------------------------------------到此秒杀模块就已结束-------------------------------------------------
2.订单创建模块
用户秒杀成功后,就应该创建相应的订单,首先扫描消息表中状态为0(消息已创建,但是还没有发送)的消息记录,发送该条消息的订单号,商品号和用户的id.消息发送成功则更改消息的状态为1(消息已发送)。第一个消费者出现了,CreateOrderConsumer意思是创建订单消费者,负责接收 “库存扣减成功” 的消息并创建订单。
1。幂等性检查:消息队列可能因网络重试、服务重启等原因重复投递消息,必须通过订单号唯一标识确保 “一次秒杀只创建一个订单”。
2.创建订单:在秒杀过程中只是new了一个订单号,并没有把订单存在数据中,此时要存入数据库,此时的订单状态为未支付。
3.发送订单确认消息:第二个消费者,告诉它订单已经创建成功,把消息的状态改为2意思是已完成。至此整个秒杀过程结束。
读到这里你可能会有疑问,在scanMessages接口中,如果创建消息失败了怎么办,此时已经扣减了redis和mysql的值。这确实是一个严重的问题,会直接导致库存已扣减但订单未创建的数据不一致。有以下解决办法:第一步失败重试,在消息表中增加retry字段,每次重试都+1,当达到规定的次数,才把消息标记为失败。第二步超时补偿,当消息重试达到最大次数仍失败,需要主动回滚库存,避免库存永久锁定。补偿逻辑大概为(回滚 MySQL 库存和 Redis 库存,删除用户下单记录,允许用户重新参与秒杀。消息状态改为 “4 - 已补偿”,避免重复处理。)第三步若是补偿也失败,那就手动处理吧。
------------------------------------------------到此订单创建模块就已结束-------------------------------------------
二、代码介绍
由于代码文件较多,我打算不全部贴出,而是以链接的形式,方便观看。以下我贴出两个接口的代码。所有自己封装的方法,我将以url的方式出现。
1.秒杀submit
秒杀这里封装了StackExchangeRedisHelper和UseTranAsync。前者是连接redis单例用的,后者是处理异步事务。对了还有个雪花算法_snowflakeGenerator.NextId(randomNum)
#region SubmitSeckill
/// <summary>
/// 秒杀
/// </summary>
[HttpPost("submit")]
[AllowAnonymous]
public async Task<IActionResult> SubmitSeckill(long goodsId, long userId)
{if (goodsId <= 0 || userId <= 0)return BadRequest("参数错误");//redis keystring userRecordKey = $"usersession:userRecord:userId{userId}"; // 用户已购标记string stockKey = $"usersession:seckill:stock:{goodsId}"; // 剩余库存缓存string lockKey = $"usersession:seckill:lock:{goodsId}"; // 分布式锁var lockVal = Guid.NewGuid().ToString(); // 锁唯一值,用于安全释放/* ---------- 1. 拿锁 ---------- */var getKey = await StackExchangeRedisHelper.Db.StringSetAsync(lockKey, lockVal, TimeSpan.FromSeconds(30), When.NotExists);if (!getKey)return ToResponse(ResultCode.CUSTOM_ERROR, "秒杀拥挤");/* ---------- 2. 判断重复下单 ---------- */bool reBuy = await StackExchangeRedisHelper.Db.KeyExistsAsync(userRecordKey);if (reBuy)return ToResponse(ResultCode.CUSTOM_ERROR, "不可重复下单");/* ---------- 3. 检查库存 ---------- */var stock = await StackExchangeRedisHelper.Db.StringGetAsync(stockKey);if ((int)stock <= 0)return ToResponse(ResultCode.CUSTOM_ERROR, "库存不足");/* ---------- 4. 减库存 ---------- */long newStock = await StackExchangeRedisHelper.Db.StringDecrementAsync(stockKey);if (newStock < 0){// 库存为负,回滚await StackExchangeRedisHelper.Db.StringIncrementAsync(stockKey);return ToResponse(ResultCode.CUSTOM_ERROR, "库存不足");}bool dbok = false; // 数据库事务是否成功string mess = ""; // 事务失败原因try{/* ---------- 5. 数据库事务 ---------- */(dbok, mess) = await _SqlSugarClient.UseTranAsync(async () =>{// 再次查库确保数据存在SeckillStock seckillStock = await _SeckillStockService.GetFirstAsync(x => x.GoodsId == goodsId);if (seckillStock == null)throw new Exception("商品不存在");/* 原子扣减库存(乐观锁) */var updateResult = await _SqlSugarClient.Updateable<SeckillStock>().SetColumns(x => x.StockCount == x.StockCount - 1).Where(x => x.GoodsId == goodsId && x.StockCount > 0).ExecuteCommandAsync();if (updateResult <= 0)throw new Exception("库存扣减失败");/* ---------- 6. 构造并记录消息,待发送到 RabbitMQ ---------- */var _snowflakeGenerator = new SnowflakeIdGenerator();var randomNum = RandomHelper.GenerateNum4();var orderNo = _snowflakeGenerator.NextId(randomNum);StockMessage stockMessage = new StockMessage{OrderNo = orderNo.ToString(),GoodsId = goodsId,UserId = userId,MessageStatus = "0",CreateTime = DateTime.Now,};_StockMessageService.Add(stockMessage); // 写入消息表});}catch (Exception e){// 事务异常日志(仅观察)var afterRollback = await _SeckillStockService.GetFirstAsync(x => x.GoodsId == goodsId);Console.WriteLine($"回滚后库存catchasdada:{afterRollback.StockCount}");}finally{/* ---------- 7. 只要数据库失败,就回滚Redis库存 ---------- */if (!dbok){await StackExchangeRedisHelper.Db.StringIncrementAsync(stockKey);}/* ---------- 8. 释放分布式锁 ---------- */try{var lua = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";await StackExchangeRedisHelper.Db.ScriptEvaluateAsync(lua,new RedisKey[] { lockKey },new RedisValue[] { lockVal });}catch (Exception ex){Console.WriteLine(ex + "释放 Redis 锁失败,lockKey: {lockKey}" + lockKey);}}if (!dbok)return ToResponse(ResultCode.CUSTOM_ERROR, mess);/* ---------- 9. 记录用户已购标记 ---------- */await StackExchangeRedisHelper.Db.StringSetAsync(userRecordKey, 1, TimeSpan.FromMinutes(30));return SUCCESS("秒杀成功");
}
#endregion
2.订单scanMessages
订单模块了用了封装的masstransit的配置 , , 订单创建消费者 ,消息确认消费者 ,消费者实体类,
#region ScanMessages
/// <summary>
/// GetOrderStatus
/// </summary>
[HttpGet("scanMessages")]
[AllowAnonymous]
public async Task<IActionResult> ScanMessages()
{// 1. 查出所有待发送(Status=0)的消息var messages = await _StockMessageService.GetListAsync(x => x.MessageStatus == "0");// 2. 逐条发送到 RabbitMQforeach (var item in messages){try{/* ---------- 构建并发送消息 ---------- */var uri = new Uri("exchange:createOrderConsumer.direct?type=direct");var endPoint = await _bus.GetSendEndpoint(uri);if (endPoint == null)throw new Exception("消息创建失败");await endPoint.Send(new CreateOrderMessage{OrderNo = item.OrderNo,GoodsId = item.GoodsId,UserId = item.UserId}, ctx =>{ctx.SetRoutingKey("orderpublic");});item.MessageStatus = "1"; // 发送成功}catch (Exception ex){item.MessageStatus = "3"; // 发送失败}item.UpdateTime = DateTime.Now;await _StockMessageService.UpdateAsync(item); // 无论成功/失败都落库}return SUCCESS("消息扫描完成");
}
#endregion
