Redis 学习笔记 5:分布式锁
Redis 学习笔记 5:分布式锁
在前文中学习了如何基于 Redis 创建一个简单的分布式锁。虽然在大多数情况下这个锁已经可以满足需要,但其依然存在以下缺陷:
事实上一般而言,我们可以直接使用 Redisson 提供的分布式锁而非自己创建。
Redisson
添加 Redisson 依赖:
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
创建 Redisson 客户端实例:
@Configuration
public class RedisConfig {@AutowiredRedisProperties redisProperties;@Beanpublic RedissonClient redissonClient(){Config config = new Config();String address = String.format("redis://%s:%s", redisProperties.getHost(), redisProperties.getPort());config.useSingleServer().setAddress(address).setPassword(redisProperties.getPassword());return Redisson.create(config);}
}
这里的RedisProperties
是 Spring-data-redis 自带的一个配置类,可以借助其直接从配置文件中读取 redis 相关配置信息。
也可以通过修改配置文件的方式配置 Redisson 客户端,但缺点是会变更 spring-data-redis 对 Redis 客户端的默认配置,所以不建议那样做。
使用 Redisson 提供的分布式锁限制优惠券抢购:
// ...
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {// ...@Autowiredprivate RedissonClient redissonClient;// ...@Overridepublic Result createOrder(Long voucherId) {// ...// 使用用户标识进行加锁String lockName = "lock:voucher-order:" + userId.toString();RLock lock = redissonClient.getLock(lockName);boolean isLock = lock.tryLock();if (!isLock) {return Result.fail("同一用户不能重复抢购");}try {return proxy.doCreateOrder(voucherId);} finally {lock.unlock();}}// ...
}
Redisson 提供的分布式锁RLock
有多种加锁方式,这里展示的tryLock()
是非阻塞式的加锁,如果获取锁失败,会立即返回。如果需要阻塞式获取锁(获取锁失败时等待并尝试获取),可以:
isLock = lock.tryLock(1,10, TimeUnit.SECONDS);
这里第一个参数是等待时长,第二个参数是 Redis 锁的过期时长。
可重入锁原理
Redisson 提供的分布式锁是可重入的,其原理和 JDK 提供的用于处理并发的可重入锁ReentrantLock
是类似的。即在锁内部使用一个计数器,当一个线程多次获取同一个锁时,将计数器自增以记录已经重复获取锁的次数。在释放锁的时候将计数器减1,当计数器为0时才真正释放锁。
下面通过改造前文的 Redis 分布式锁,让其支持再入以说明 Redisson 锁可再入的实现原理。
在改造 Redis 锁前,先编写一个测试用例来证明目前的 Redis 锁不支持重入:
// ...
@Log4j2
@SpringBootTest
public class RedisLockTests {@Autowiredprivate StringRedisTemplate redisTemplate;/*** 测试 Redis 锁是否可以再入*/@Testpublic void test() {SimpleRedisLock lock = new SimpleRedisLock(redisTemplate, "test");boolean tryLock = lock.tryLock(2000);if (!tryLock) {log.error("test获取锁失败");return;}try {log.info("test获取锁成功");log.info("test执行业务代码");test2(lock);} finally {lock.unlock();log.info("test释放锁");}}private void test2(SimpleRedisLock lock) {boolean tryLock = lock.tryLock(2000);if (!tryLock) {log.error("test2获取锁失败");return;}try {log.info("test2获取锁成功");log.info("test2执行业务代码");} finally {lock.unlock();log.info("test2释放锁");}}
}
因为像前面说的,重入锁需要有一个计数器,同时还需要持有一个线程 ID 以检查是否当前线程持有的锁,因此不能再使用 Redis 中的 key-value 结构作为锁,改为使用 Hash 来同时保存这两个信息:
[外链图片转存中…(img-otcOkwkN-1747619754081)]
这里将线程 ID 直接作为字典的 key 以节省存储空间。
现在的问题就是在获取锁和释放锁的部分加入计数器维护的逻辑。但就像在前文引入 Lua 脚本时讨论的那样,显然这些操作不能通过 Java 实现,因为那样做不能保证操作的原子性,因此需要用 Lua 脚本来实现:
--[[@描述: Redis 锁获取脚本(支持再入)@版本: 1.0.0
]] --
local key = KEYS[1] -- Redis 锁的对应的 key
local threadId = ARGV[1] -- 持有锁的线程标识
local timeoutSec = ARGV[2] -- 锁的自动过期时长(单位秒)
-- 检查锁是否已经存在
local exists = redis.call('exists', key)
if (exists == 0) then-- 如果锁不存在,添加(正常获取到锁)redis.call('hset', key, threadId, 1)-- 更新锁的过期时间redis.call('expire', key, timeoutSec)return 1
end
-- 如果锁存在,检查是否当前线程的锁
if (redis.call('HEXISTS', key, threadId) == 0) then-- 如果不是当前线程的锁,返回错误信息(互斥,没有获取到锁)return 0
end
-- 是当前线程的锁(再入)
-- 计数器+1
redis.call('HINCRBY', key, threadId, 1)
-- 更新过期时长
redis.call('expire', key, timeoutSec)
return 1
--[[@描述:Redis 锁释放(支持再入)
]] --
local key = KEYS[1] -- Redis 锁的对应的 key
local threadId = ARGV[1] -- 持有锁的线程标识
local timeoutSec = ARGV[2] -- 锁的自动过期时长(单位秒)
-- 检查锁是否已经存在
if (redis.call('exists', key) == 0) then-- 锁不存在,返回错误信息return 0
end
-- 锁存在,检查是否当前线程持有的锁
if (redis.call('HEXISTS', key, threadId) == 0) then-- 不是当前线程持有的锁,返回错误信息return 0
end
-- 是当前线程持有的锁,计数器-1
redis.call('HINCRBY', key, threadId, -1)
-- 如果计数器小于等于0,删除锁
if (tonumber(redis.call('HGET', key, threadId)) <= 0) thenredis.call('del', key)return 1
end
-- 如果计数器还未归0,更新锁的有效时长
redis.call('expire', key, timeoutSec)
return 1
需要注意的是,与之前不同的是,再次获取锁和释放锁的时候都需要更新锁的有效时长,以确保之后的业务能在锁生效期内正常执行完毕。
修改锁的实现类,用 Lua 脚本获取和释放锁:
// ...
public class SimpleRedisLock implements ILock {// ...// Redis 锁获取脚本private static final DefaultRedisScript<Long> LOCK_SCRIPT;static {LOCK_SCRIPT = new DefaultRedisScript<>();// 指定脚本的位置LOCK_SCRIPT.setLocation(new ClassPathResource("reentrant-lock.lua"));// 指定脚本的返回值类型LOCK_SCRIPT.setResultType(Long.class);}// Redis 锁释放脚本private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();// 指定脚本的位置UNLOCK_SCRIPT.setLocation(new ClassPathResource("reentrant-unlock.lua"));// 指定脚本的返回值类型UNLOCK_SCRIPT.setResultType(Long.class);}// ...@Overridepublic boolean tryLock(long timeoutSec) {final String jvmThreadId = getJvmThreadId();Long res = stringRedisTemplate.execute(LOCK_SCRIPT,Collections.singletonList(redisKey),jvmThreadId,Long.toString(timeoutSec));return res != null && res > 0;}// ...@Overridepublic void unlock(long timeoutSec) {// 使用 lua 脚本删除 Redis 锁stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(redisKey),getJvmThreadId(),Long.toString(timeoutSec));}@Overridepublic void unlock() {unlock(200);}
}
查看 Redisson 源码可以发现,其实现方式和上文描述的是类似的。
重试机制
Redisson 提供的分布式锁具备获取锁失败后进行重试的机制,且这种机制是基于 Redis 订阅和信号量的方式实现的,会有效避免 CPU 计算资源的浪费。此外,调用 API 时如果指定锁的过期时长为 -1,Redisson 会将锁的在 Redis 中的有效时长设置一个默认值(30秒),并启动一个守护进程(WatchDog)来定期重新刷新其有效时长,以保证该锁的长期有效。在释放锁的时候,该守护进程会被终止。
整个过程可以用下图表示:
这里的
ttl
指获取锁的 Lua 脚本的返回值,如果锁成功获取,会返回 null,获取锁失败,会返回锁的剩余有效时长。
详细的源码分析和说明可以参考这个视频。
联锁
如果 Redisson 实现的锁是基于单个 Redis 的,那么是没有问题的,反之,如果是主从同步的集群,之前所使用的锁就会存在问题:
如果从主节点获取锁成功,但还未将锁同步到其他从节点时主节点宕机,锁就会“丢失”。
这个问题可以用联锁来解决:
即不使用主从,而是使用多台独立的 Redis 获取锁,只有从所有 Redis 获取锁成功,才算是成功,否则视为获取锁失败。在这种情况下,任意 Redis 宕机都不会导致锁失效。
为了演示,额外启动两个 Redis 实例:
docker run --name my-redis -p 6380:6379 -d redis
docker run --name my-redis2 -p 6381:6379 -d redis
关于如何用 Docker 部署 Redis 可以参考这里。
在配置为文件中添加两个 Redis 服务的配置信息:
my-config:redis1:host: 192.168.0.88port: 6380redis2:host: 192.168.0.88port: 6381
创建配置类读取该信息:
@Configuration
@ConfigurationProperties(prefix = "my-config")
@Data
public class MyConfigProperties {@Datapublic static class RedisConfig{private String host;private String port;}private RedisConfig redis1;private RedisConfig redis2;
}
创建对应的 Redisson 客户端:
@Configuration
public class RedisConfig {// ...@AutowiredMyConfigProperties myConfig;// ...@Beanpublic RedissonClient redissonClient2(){Config config = new Config();String address = String.format("redis://%s:%s",myConfig.getRedis1().getHost(),myConfig.getRedis1().getPort());config.useSingleServer().setAddress(address).setPassword(redisProperties.getPassword());return Redisson.create(config);}@Beanpublic RedissonClient redissonClient3(){Config config = new Config();String address = String.format("redis://%s:%s",myConfig.getRedis2().getHost(),myConfig.getRedis2().getPort());config.useSingleServer().setAddress(address).setPassword(redisProperties.getPassword());return Redisson.create(config);}
}
修改测试用例,使用联锁:
// ...
@Log4j2
@SpringBootTest
public class RedisLockTests {// ...@Resourceprivate RedissonClient redissonClient;@Resourceprivate RedissonClient redissonClient2;@Resourceprivate RedissonClient redissonClient3;/*** 测试 Redis 锁是否可以再入*/@Testpublic void test() throws InterruptedException {// 创建 Redisson 联锁String lockName = "lock:test";RLock lock1 = redissonClient.getLock(lockName);RLock lock2 = redissonClient2.getLock(lockName);RLock lock3 = redissonClient3.getLock(lockName);RLock lock = redissonClient.getMultiLock(lock1, lock2, lock3);boolean tryLock = lock.tryLock(10, TimeUnit.SECONDS);// ...}private void test2(RLock lock) throws InterruptedException {// ...}
}
和单个锁类似,联锁同样可以指定等待时间以进行重试。
关于 Redisson 联锁的源码分析可以看这里。
本文的所有示例代码可以从这里获取。
The End.
参考资料
- 黑马程序员Redis入门到实战教程