高并发场景下抢单业务解决方案实现(乐观锁 + 分布式锁)
目录
抢单基本优化思路
第一种 设置数据库事务的隔离级别
第二种 使用乐观锁解决,通过版本号进行控制
第三种 加锁解决:
synchronized 及lock锁,本地锁
基于 Redis 做分布式锁
总结:
基于 REDISSON 做分布式锁
概述:
本文总结了抢单场景下的并发优化方案:1. 数据库层面通过设置Serializable隔离级别或乐观锁(版本号控制)实现;2. 本地锁(synchronized/Lock)在集群环境失效,需使用Redis分布式锁;3. Redis分布式锁通过SETNX+过期时间实现,需配合UUID防误删和Lua脚本保证原子性;4. 推荐使用Redisson框架,它内置看门狗机制实现锁自动续期,提供更完善的分布式锁功能。最后通过司机抢单案例演示了Redisson分布式锁的实际应用,包括锁获取、业务处理和解锁的完整流程。
抢单基本优化思路
第一种 设置数据库事务的隔离级别
设置为Serializable,效率低下。
第二种 使用乐观锁解决,通过版本号进行控制
基础sql语句,实现抢单:
update order_info set status =2 ,driver_id = ?,accept_time = ? where id=?
如果使用上面语句,产生问题:只要订单接单标识没有删除,如果有很多线程请求过来,都会去更新sql语句,造成,最后提交的把之前提交数据覆盖。
如果使用乐观锁解决,添加版本号:
# 版本号
update order_info set status =2 ,driver_id = ?,accept_time = ? where id=? and status = 1
在 where 后面添加条件,status=1 ,相当于添加版本号。
//司机抢单:乐观锁方案解决并发问题
public Boolean robNewOrder1(Long driverId, Long orderId) {//判断订单是否存在,通过Redis,减少数据库压力if(!redisTemplate.hasKey(RedisConstant.ORDER_ACCEPT_MARK)) {//抢单失败throw new GuiguException(ResultCodeEnum.COB_NEW_ORDER_FAIL);}//司机抢单//update order_info set status =2 ,driver_id = ?,accept_time = ?// where id=? and status = 1LambdaQueryWrapper<OrderInfo> wrapper = new LambdaQueryWrapper<>();wrapper.eq(OrderInfo::getId,orderId);wrapper.eq(OrderInfo::getStatus,1);//修改值OrderInfo orderInfo = new OrderInfo();orderInfo.setStatus(2);orderInfo.setDriverId(driverId);orderInfo.setAcceptTime(new Date());//调用方法修改int rows = orderInfoMapper.update(orderInfo,wrapper);if(rows != 1) {//抢单失败throw new GuiguException(ResultCodeEnum.COB_NEW_ORDER_FAIL);}//删除抢单标识redisTemplate.delete(RedisConstant.ORDER_ACCEPT_MARK);return true;
}
第三种 加锁解决:
synchronized 及lock锁,本地锁
(以num不断+1为例,使用 jmeter 压力测试工具调试)
@Service
public class TestServiceImpl implements TestService{@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic synchronized void testLock() {//从redis里面获取数据String value = redisTemplate.opsForValue().get("num");if(StringUtils.isBlank(value)) {return;}//把从redis获取数据+1int num = Integer.parseInt(value);//数据+1之后放回到redis里面redisTemplate.opsForValue().set("num",String.valueOf(++num));}
}
可以发现,在集群模式下,本地锁无法满足业务需求,所以使用分布式锁方式解决相关问题:
基于 Redis 做分布式锁
基于 REDIS 的 SETNX()、EXPIRE() 方法做分布式锁:
- setnx(lockkey, 1) 如果返回 0,则说明占位失败;如果返回 1,则说明占位成功
- expire() 命令对 lockkey 设置超时时间,为的是避免死锁问题。
- 执行完业务代码后,可以通过 delete 命令删除 key
步骤解析:
- 使用 redisTemplate.opsForValue().setIfAbsent("lock", "lock") 方法获取lock锁,返回一个布尔值,如果值为true代表拿到当前锁。
- 判断是否获取到锁,如果获取到锁,则执行业务操作。
- 使用 redisTemplate.delete("lock") 释放锁。(注意!!!这样写一但在释放锁前出现异常,那么会无法释放锁,两种解决方案:
①try/finally语句
②设置过期时间:设置过期时间有两种方式:首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放);其次可以在set时指定过期时间(推荐)
@Override
public void testLock() {//从redis里面获取数据//1 获取当前锁 setnx + 设置过期时间// Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "lock");Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "lock",10, TimeUnit.SECONDS);//2 如果获取到锁,从redis获取数据 数据+1 放回redis里面if(ifAbsent) {//获取锁成功,执行业务代码//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0String value = redisTemplate.opsForValue().get("num");//2.如果值为空则非法直接返回即可if (StringUtils.isBlank(value)) {return;}//3.对num值进行自增加一int num = Integer.parseInt(value);redisTemplate.opsForValue().set("num", String.valueOf(++num));//3 释放锁redisTemplate.delete("lock");} else {try {Thread.sleep(100);this.testLock();} catch (InterruptedException e) {e.printStackTrace();}}
}
但是这样会出现一种情况:虽然我们的线程1还没有结束,但是由于锁设定的时间到期而被释放销毁,此时线程2就能够开始获取锁,过一段时间后线程1结束就会要释放锁,这个时候释放的锁就是线程2刚加上去的锁,所以导致线程安全问题。
使用setnx+过期时间实现分布式锁,存在问题:删除不是自己的锁,锁误删。
解决方案:在获取锁的过程存入唯一标识(可用UUID表示),以便于在释放锁去判断这个锁是不是自己的,如果是则释放,如果不是则不释放。(总而言之:设置唯一标识的目的是判断释放锁是不是同一个线程获取的,以此来避免类似线程2创建的锁被线程1释放引发的线程安全问题)
步骤解析:
- 使用UUID创建唯一标识uuid,随后使用setnx获取当前锁(key:lock,value:uuid)。
- 获取到锁后,执行业务逻辑。
- 释放锁过程,需先判断当前拿到的redis锁的value是否为uuid,如果是则释放当前锁。
//uuid防止误删
@Override
public void testLock() {//从redis里面获取数据String uuid = UUID.randomUUID().toString();//1 获取当前锁 setnx + 设置过期时间// Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "lock");Boolean ifAbsent =redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS);//2 如果获取到锁,从redis获取数据 数据+1 放回redis里面if(ifAbsent) {//获取锁成功,执行业务代码//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0String value = redisTemplate.opsForValue().get("num");//2.如果值为空则非法直接返回即可if (StringUtils.isBlank(value)) {return;}//3.对num值进行自增加一int num = Integer.parseInt(value);redisTemplate.opsForValue().set("num", String.valueOf(++num));//出现异常//3 释放锁String redisUuid = redisTemplate.opsForValue().get("lock");if(uuid.equals(redisUuid)) {redisTemplate.delete("lock");}} else {try {Thread.sleep(100);this.testLock();} catch (InterruptedException e) {e.printStackTrace();}}
}
通过uuid防止误删,但是还是存在问题,不具备原子性的
所以这个时候可以使用 Lua 脚本保证原子性删除操作:
步骤解析:
- 前面操作一致,从删锁开始....
- 使用 redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid) 方法原子性删除锁。
/*** 采用SpringDataRedis实现分布式锁* 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key)*/
@Override
public void testLock() {//0.先尝试获取锁 setnx key val//问题:锁可能存在线程间相互释放//Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock", 10, TimeUnit.SECONDS);//解决:锁值设置为uuidString uuid = UUID.randomUUID().toString();Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS);if(flag){//获取锁成功,执行业务代码//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0String value = stringRedisTemplate.opsForValue().get("num");//2.如果值为空则非法直接返回即可if (StringUtils.isBlank(value)) {return;}//3.对num值进行自增加一int num = Integer.parseInt(value);stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));//4.将锁释放 判断uuid//问题:删除操作缺乏原子性。//if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))){ //线程一:判断是满足是当前线程锁的值// //条件满足,此时锁正好到期,redis锁自动释放了线程2获取锁成功,线程1将线程2的锁删除// stringRedisTemplate.delete("lock");//}//解决:redis执行lua脚本保证原子,lua脚本执行会作为一个整体执行//执行脚本参数 参数1:脚本对象封装lua脚本,参数二:lua脚本中需要key参数(KEYS[i]) 参数三:lua脚本中需要参数值 ARGV[i]//4.1 先创建脚本对象 DefaultRedisScript泛型脚本语言返回值类型 Long 0:失败 1:成功DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();//4.2设置脚本文本String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +"then\n" +" return redis.call(\"del\",KEYS[1])\n" +"else\n" +" return 0\n" +"end";redisScript.setScriptText(script);//4.3 设置响应类型redisScript.setResultType(Long.class);stringRedisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);}else{try {//睡眠Thread.sleep(100);//自旋重试this.testLock();} catch (InterruptedException e) {e.printStackTrace();}}
}
总结:
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
第一个:互斥性,在任何时刻,只有一个客户端能持有锁。
第二个:不会发生死锁,即使有一个客户端在获取锁操作时候崩溃了,也能保证其他客户端能获取到锁。
第三个:解铃还须系铃人,解锁加锁必须同一个客户端操作。
第四个:加锁和解锁必须具备原子性
基于 REDISSON 做分布式锁
概述:
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
-
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。
-
Redisson的宗旨是:促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
官方文档地址:https://github.com/Redisson/Redisson/wikihttps://github.com/Redisson/Redisson/wiki
Github 地址:https://github.com/Redisson/Redissonhttps://github.com/Redisson/Redisson
首先引入Redission依赖:
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId>
</dependency>
随后创建 Redisson 配置类:
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.data.redis")
public class RedissonConfig {private String host;private String password;private String port;private int timeout = 3000;private static String ADDRESS_PREFIX = "redis://";/*** 自动装配**/@BeanRedissonClient redissonSingle() {Config config = new Config();if(!StringUtils.hasText(host)){throw new RuntimeException("host is empty");}SingleServerConfig serverConfig = config.useSingleServer().setAddress(ADDRESS_PREFIX + this.host + ":" + port).setTimeout(this.timeout);if(StringUtils.hasText(this.password)) {serverConfig.setPassword(this.password);}return Redisson.create(config);}
}
注意:这里读取了一个名为 RedisProperties 的属性,因为我们引入了SpringDataRedis,Spring已经自动加载了RedisProperties,并且读取了配置文件中的Redis信息。
测试:
@Autowired
private RedissonClient redissonClient;//Redisson实现
@Override
public void testLock() {//1 通过redisson创建锁对象RLock lock = redissonClient.getLock("lock1");//2 尝试获取锁//(1) 阻塞一直等待直到获取到,获取锁之后,默认过期时间30slock.lock();//(2) 获取到锁,锁过期时间10s// lock.lock(10,TimeUnit.SECONDS);//(3) 第一个参数获取锁等待时间// 第二个参数获取到锁,锁过期时间// try {// // true// boolean tryLock = lock.tryLock(30, 10, TimeUnit.SECONDS);// } catch (InterruptedException e) {// throw new RuntimeException(e);// }//3 编写业务代码//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0String value = redisTemplate.opsForValue().get("num");//2.如果值为空则非法直接返回即可if (StringUtils.isBlank(value)) {return;}//3.对num值进行自增加一int num = Integer.parseInt(value);redisTemplate.opsForValue().set("num", String.valueOf(++num));//4 释放锁lock.unlock();
}
基于 Redis 的 Redisson 分布式可重入锁 RLock Java 对象实现了 java.util.concurrent.locks.Lock 接口。如果负责储存这个分布式锁的 Redisson 节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
看门狗原理:
只要线程一加锁成功,就会启动一个watch dog
看门狗,它是一个后台线程,会每隔10
秒检查一下,如果线程一还持有锁,那么就会不断的延长锁key
的生存时间。因此,Redisson
就是使用Redisson
解决了锁过期释放,业务没执行完问题。
1、如果我们指定了锁的超时时间,就发送给Redis执行脚本,进行占锁,默认超时就是我们制定的时间,不会自动续期;
2、如果我们未指定锁的超时时间,就使用
lockWatchdogTimeout = 30 * 1000
【看门狗默认时间】
之后在业务场景下使用Redission:
以司机抢单为例:
步骤详情:
- 首先判断订单是否存在。
- 使用 redissonClient.getLock() 创建锁:
其中 RedisConstant.ROB_NEW_ORDER_LOCK + orderId 是分布式锁的名字。RLock lock = redissonClient.getLock(RedisConstant.ROB_NEW_ORDER_LOCK + orderId);
随后使用 lock.tryLock() 方法获取锁:
boolean flag = lock.tryLock(RedisConstant.ROB_NEW_ORDER_LOCK_WAIT_TIME,RedisConstant.ROB_NEW_ORDER_LOCK_LEASE_TIME, TimeUnit.SECONDS);
参数:等待超时时间,锁自动释放时间,时间单位
判断获取锁是否成功,若成功则执行业务逻辑。
如果锁存在,则释放锁 lock.unlock()。
@Autowired
private RedissonClient redissonClient;//Redisson分布式锁
//司机抢单
@Override
public Boolean robNewOrder(Long driverId, Long orderId) {//判断订单是否存在,通过Redis,减少数据库压力if(!redisTemplate.hasKey(RedisConstant.ORDER_ACCEPT_MARK)) {//抢单失败throw new GuiguException(ResultCodeEnum.COB_NEW_ORDER_FAIL);}//创建锁RLock lock = redissonClient.getLock(RedisConstant.ROB_NEW_ORDER_LOCK + orderId);try {//获取锁boolean flag = lock.tryLock(RedisConstant.ROB_NEW_ORDER_LOCK_WAIT_TIME,RedisConstant.ROB_NEW_ORDER_LOCK_LEASE_TIME, TimeUnit.SECONDS);if(flag) {if(!redisTemplate.hasKey(RedisConstant.ORDER_ACCEPT_MARK)) {//抢单失败throw new GuiguException(ResultCodeEnum.COB_NEW_ORDER_FAIL);}//司机抢单//修改order_info表订单状态值2:已经接单 + 司机id + 司机接单时间//修改条件:根据订单idLambdaQueryWrapper<OrderInfo> wrapper = new LambdaQueryWrapper<>();wrapper.eq(OrderInfo::getId,orderId);OrderInfo orderInfo = orderInfoMapper.selectOne(wrapper);//设置orderInfo.setStatus(OrderStatus.ACCEPTED.getStatus());orderInfo.setDriverId(driverId);orderInfo.setAcceptTime(new Date());//调用方法修改int rows = orderInfoMapper.updateById(orderInfo);if(rows != 1) {//抢单失败throw new GuiguException(ResultCodeEnum.COB_NEW_ORDER_FAIL);}//删除抢单标识redisTemplate.delete(RedisConstant.ORDER_ACCEPT_MARK);}}catch (Exception e) {//抢单失败throw new GuiguException(ResultCodeEnum.COB_NEW_ORDER_FAIL);}finally {//释放if(lock.isLocked()) {lock.unlock();}}return true;
}