【Java微服务组件】分布式协调P4-一文打通Redisson:从API实战到分布式锁核心源码剖析
欢迎来到啾啾的博客🐱。
记录学习点滴。分享工作思考和实用技巧,偶尔也分享一些杂谈💬。
有很多很多不足的地方,欢迎评论交流,感谢您的阅读和评论😄。
目录
- 引言
- Redisson基本信息
- Redisson网站
- Redisson应用
- 常见用法
- 与Redis数据结构应用的差别
- Redisson使用
- 依赖
- 配置
- demo
- 注意事项
- Redisson源码解析
- Redisson设计概要
- 解决身份验证与重入性
- 锁续期(Watchdog机制)
- 总结
引言
在上一篇中,我们已经了解到Redis作为分布式协调存在的种种问题,如原子性、锁超时、归属权、锁续期、可重入性等等。
如果这些都需要自己在使用时补足,自己实现一个健壮Redis客户端,那不能说是宛如噩梦,只能说是没有必要。
Redisson基本信息
简单来说,Redisson 是一个在 Redis 基础上实现的、功能强大且易于使用的 Java 驻内存数据网格(In-Memory Data Grid)。它把 Redis 这个远程的、基于网络的数据库,“伪装”成了就像在你本地 JVM 内存里一样的 Java 对象(如 Map, List, Lock 等),让你用起来毫无违和感。
Redisson 让你可以像使用本地的 Java 集合和锁一样,来使用分布式的 Redis 数据结构和锁,而无需关心底层复杂的实现细节。
Redisson网站
-
github
https://github.com/redisson/redisson -
官网
https://redisson.pro/
https://redisson.pro/docs/
Redisson应用
Redisson价值在于“封装”。
常见用法
- 分布式锁(RLock):这是 Redisson 最著名的功能,提供了比自己实现更安全、更强大的分布式锁。
- 分布式集合(RMap, RList, RSet, RQueue):像使用 java.util.Map 一样使用 Redis 的 Hash 结构,非常直观。
- 分布式对象(RBucket):用于存储单个对象(序列化后),可以看作是对 String 类型的封装。
- 限流器(RRateLimiter):基于令牌桶算法,轻松实现分布式环境下的接口限流。
- 信号量(RSemaphore):控制并发访问特定资源的线程数量。
- 发布/订阅(RTopic):实现消息的发布和订阅功能。
- 缓存:配合 RMapCache 或 RBucket,可以实现带过期策略的缓存。
与Redis数据结构应用的差别
Redisson 并没有在 Redis 中创造新的数据结构。它的所有上层功能,都是通过巧妙地组合和封装 Redis 已有的数据结构(String, Hash, List, Set, Sorted Set)以及 Lua 脚本来实现的。
Redisson 接口 | 底层 Redis 数据结构 | 与原生 Redis API 的对比 |
---|---|---|
RBucket | String | 原生: SET key value, GET key。 Redisson: bucket.set(myObject), bucket.get()。Redisson 帮你处理了对象的序列化和反序列化。 |
RMap<K, V> | Hash | 原生: HSET map_key field value, HGET map_key field。 Redisson: map.put(key, value), map.get(key)。它实现了 java.util.Map 接口,用法和 HashMap 完全一致。 |
RList | List | 原生: LPUSH list_key value, RPOP list_key。 Redisson: list.add(value), list.get(index)。它实现了 java.util.List 接口,支持按索引访问等高级操作。 |
RSet | Set | 原生: SADD set_key member, SMEMBERS set_key。 Redisson: set.add(value), set.contains(value)。实现了 java.util.Set 接口。 |
RLock | Hash & String & PubSub | 原生: 需要自己组合 SETNX, EXPIRE, Lua脚本, Pub/Sub 等,非常复杂。 Redisson: lock.lock(), lock.unlock()。一行代码搞定,内部实现了可重入、锁续期等复杂逻辑。 |
Redisson使用
依赖
Redisson 的 Spring Boot Starter,是为 Spring Boot 应用定制的依赖,集成了 Redisson 核心库并提供 Spring Boot 的自动配置支持。
包含 org.redisson:redisson(核心 Redisson 库)作为依赖。
- 提供 Spring Boot 的自动配置(如 RedissonAutoConfiguration),通过 application.properties 或 application.yml 配置 Redisson 客户端。
- 支持 Spring 生态的集成,例如 RedisConnectionFactory、Spring Cache、Spring Data Redis 等。
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.49.0</version>
</dependency>
配置
在application.yml中配置Redis连接如下:
spring:redisson:address: redis://127.0.0.1:6379password: mypassworddatabase: 0
配置完成后会自动创建RedissonClient实例,注入到Spring容器中。
或者代码config
/*** 获取 RedissonClient 实例* 使用双重检查锁定(DCL)确保线程安全和高性能* @return RedissonClient*/public static RedissonClient getClient() {if (redissonClient == null) {synchronized (RedissonUtil.class) {if (redissonClient == null) {// 1. 创建配置Config config = new Config();// 使用单机模式,连接到本地的 Redis// 格式: redis://127.0.0.1:6379config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456") // 如果有密码.setDatabase(0);// 其他模式,如集群模式:// config.useClusterServers().addNodeAddress("redis://127.0.0.1:7001", "redis://127.0.0.1:7002");// 2. 创建 RedissonClient 实例redissonClient = Redisson.create(config);}}}return redissonClient;}
demo
一般注入RedissonClient使用,如下:
@Autowired
private RedissonClient redissonClient;
这里简单写一下单例与Demo
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;/*** 在生产环境中,通常建议将 RedissonClient 的创建和管理封装成一个单例的工具类,* 避免在各处重复创建昂贵的客户端实例。*/
public class RedissonUtil {private static volatile RedissonClient redissonClient;// 私有构造函数,防止外部实例化private RedissonUtil() {}/*** 获取 RedissonClient 实例* 使用双重检查锁定(DCL)确保线程安全和高性能* @return RedissonClient*/public static RedissonClient getClient() {if (redissonClient == null) {synchronized (RedissonUtil.class) {if (redissonClient == null) {// 1. 创建配置Config config = new Config();// 使用单机模式,连接到本地的 Redis// 格式: redis://127.0.0.1:6379config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);// 其他模式,如集群模式:// config.useClusterServers().addNodeAddress("redis://127.0.0.1:7001", "redis://127.0.0.1:7002");// 2. 创建 RedissonClient 实例redissonClient = Redisson.create(config);}}}return redissonClient;}/*** 关闭客户端*/public static void shutdown() {if (redissonClient != null && !redissonClient.isShutdown()) {redissonClient.shutdown();}}
}
使用demo
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;import java.util.concurrent.TimeUnit;public class RedissonDemo {public static void main(String[] args) throws InterruptedException {// 从工具类获取客户端RedissonClient redisson = RedissonUtil.getClient();// 1. RBucket 示例 (操作 String)System.out.println("------ RBucket Demo ------");RBucket<String> userBucket = redisson.getBucket("user:1001");userBucket.set("{\"name\":\"Alice\", \"age\":30}");System.out.println("User from Redis: " + userBucket.get());userBucket.delete(); // 清理// 2. RMap 示例 (操作 Hash)System.out.println("\n------ RMap Demo ------");RMap<String, String> configMap = redisson.getMap("app:config");configMap.put("version", "1.2.0");configMap.put("env", "production");System.out.println("Config version: " + configMap.get("version"));configMap.clear(); // 清理// 3. RLock 示例 (分布式锁)System.out.println("\n------ RLock Demo ------");RLock myLock = redisson.getLock("my-distributed-lock");try {// 尝试加锁,最多等待10秒,上锁以后15秒自动解锁boolean isLocked = myLock.tryLock(10, 15, TimeUnit.SECONDS);if (isLocked) {System.out.println(Thread.currentThread().getName() + ": Lock acquired!");// 执行业务逻辑System.out.println("Doing critical work...");Thread.sleep(5000); // 模拟业务耗时} else {System.out.println(Thread.currentThread().getName() + ": Failed to acquire lock.");}} finally {// 确保锁被释放if (myLock.isHeldByCurrentThread()) {myLock.unlock();System.out.println(Thread.currentThread().getName() + ": Lock released!");}}// 程序结束时,关闭 Redisson 客户端RedissonUtil.shutdown();}
}
注意事项
-
如果你不确定,就永远不要设置 leaseTime
在绝大多数生产场景下,我们都应该使用不带 leaseTime 参数的 lock() 或 tryLock() 方法,从而启用看门狗的自动续期功能。
这里带leaseTime参数只是为了演示。生产中只有一些特殊场景如“临时令牌”才需要。 -
谨慎创建RedissonClient实例
RedissonClient 是一个重量级对象,它维护着与 Redis 服务器的连接池。如果每次使用时都 Redisson.create(config),会频繁地创建和销毁连接,造成巨大的性能开销和资源浪费。 -
序列化
Redisson 默认使用 MarshallingCodec,这个新能较差,跨语言不兼容、可读性差。
建议更换为 JsonJacksonCodec 或其他基于 JSON 的 Codec。
Config config = new Config();
// ... 其他配置 ...
config.setCodec(new org.redisson.codec.JsonJacksonCodec()); // 推荐!
RedissonClient redisson = Redisson.create(config);
- 分布式共识考量
Redisson 提供了 RedissonRedLock 的实现。可以让集群在大多数实例上都成功获取锁时,才算真正加锁成功。
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.RedissonRedLock;public class RedLockDemo {public static void main(String[] args) throws InterruptedException {// --- 准备工作:创建多个 RedissonClient 和 RLock ---// 实例 1 的配置和客户端Config config1 = new Config();config1.useSingleServer().setAddress("redis://192.168.1.1:6379");RedissonClient client1 = Redisson.create(config1);// 实例 2 的配置和客户端Config config2 = new Config();config2.useSingleServer().setAddress("redis://192.168.1.2:6379");RedissonClient client2 = Redisson.create(config2);// 实例 3 的配置和客户端Config config3 = new Config();config3.useSingleServer().setAddress("redis://192.168.1.3:6379");RedissonClient client3 = Redisson.create(config3);// 为同一个业务锁 "my-red-lock" 在每个实例上都创建一个 RLock 对象RLock lock1 = client1.getLock("my-red-lock");RLock lock2 = client2.getLock("my-red-lock");RLock lock3 = client3.getLock("my-red-lock");// --- 核心步骤:创建和使用 RedissonRedLock ---// 将多个 RLock 对象聚合到 RedissonRedLock 中RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);try {// RedLock 的 tryLock 方法参数// waitTime: 尝试获取锁的总等待时间// leaseTime: 锁的持有时间(租约时间)// 注意:RedLock 不支持看门狗自动续期!必须手动指定 leaseTime。boolean isLocked = redLock.tryLock(10, 30, TimeUnit.SECONDS);if (isLocked) {System.out.println(Thread.currentThread().getName() + ": RedLock acquired successfully!");// 执行关键业务逻辑// 这个业务的执行时间理论上应该小于 leaseTime (30秒)System.out.println("Doing critical work...");Thread.sleep(5000); } else {System.out.println(Thread.currentThread().getName() + ": Failed to acquire RedLock.");}} finally {// 释放锁。它会尝试解锁所有实例上的锁。redLock.unlock();System.out.println(Thread.currentThread().getName() + ": RedLock released.");}// --- 清理资源 ---client1.shutdown();client2.shutdown();client3.shutdown();}
}
在生产中使用 Redisson 前,可以对照这个清单检查一下:
- 锁粒度:是否足够细,避免了全局锁?
- leaseTime:是否让看门狗自动管理,避免手动设置?
- tryLock 等待时间:是否设置了合理的 waitTime?
- finally 块:是否保证了 unlock() 的执行?
- RedissonClient 管理:是否是单例模式?
- 序列化器:是否换成了 JsonJacksonCodec 或类似的?
- 部署模式风险:是否已评估主从/集群模式下的锁风险,并选择了合适的锁类型(RLock vs RedissonRedLock)?
Redisson源码解析
Redisson设计概要
-
原子性保证 :所有加锁、解锁、续期操作,内部都通过 Lua 脚本 实现,确保了在 Redis 服务端执行的原子性。你再也不用担心命令执行到一半客户端崩溃的问题。
-
锁的归属权与安全性:它在加锁时会自动生成一个唯一的 ID(通常是 UUID:threadId),解锁时会通过 Lua 脚本验证这个 ID,确保只有加锁的那个线程才能解锁。这彻底解决了“误删他人锁”的问题。
-
自动续期(看门狗机制 - Watchdog):这是 Redisson 的王牌功能。当你获取一个锁时,如果你没有指定租约时间(Lease Time),Redisson 会默认设置一个30秒的过期时间,并启动一个后台“看门狗”线程。这个看门狗会每隔10秒(默认是锁过期时间的1/3)检查一下,如果持有锁的线程还在运行,它就会自动把锁的过期时间重置回30秒。
- 效果:只要你的业务线程还在正常工作,你的锁就永远不会过期。
- 宕机处理:如果你的业务线程所在的服务器宕机了,看门狗线程自然也停止了。那么在最长30秒后,这个锁会自动被 Redis 释放,从而避免了死锁。
-
可重入性 (Reentrant Lock):Redisson 实现的 RLock 接口,就像 Java 的 java.util.concurrent.locks.ReentrantLock 一样,是可重入的。一个线程可以多次获取同一个锁而不会被自己阻塞。Redisson 通过 Redis 的 Hash 结构来记录锁的持有者和重入次数。
-
公平/非公平锁:它同时支持公平锁(先到先得)和非公平锁(抢占式),可以根据业务场景选择。
-
易用性:它提供了与 java.util.concurrent.locks.Lock 完全一致的接口。你只需要写下面这样的代码,感觉就像在写多线程程序,完全察觉不到背后复杂的网络通信和分布式协调。
解决身份验证与重入性
通过lua脚本与唯一ID的形式。
当调用lock.lock()时,最终会执行到RedissonLock 类中的 tryLockInnerAsync 方法。
能看到唯一标识直接使用的threadId,源码解析如下:
// RedissonLock.java (简化后)
private <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {// ...// 这个 LUA 脚本是核心return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,// 脚本内容"if (redis.call('exists', KEYS[1]) == 0) then " + // 1. 如果锁(Hash)不存在"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 2. 创建 Hash, 将 field(线程ID) 的 value(重入次数)设为1"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 3. 设置过期时间"return nil; " + // 4. 返回 nil,表示加锁成功"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 5. 如果锁存在,且 field(线程ID)也存在(说明是当前线程持有)"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 6. 重入次数 +1"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 7. 刷新过期时间"return nil; " + // 8. 返回 nil,表示重入成功"end; " +"return redis.call('pttl', KEYS[1]);", // 9. 如果锁被其他线程持有,返回剩余的过期时间,用于计算等待时间// 脚本参数Collections.singletonList(getRawName()), // KEYS[1]: 锁名,如 "myLock"unit.toMillis(leaseTime), // ARGV[1]: 锁的租约时间(过期时间)getLockName(threadId) // ARGV[2]: 唯一的线程ID (UUID:threadId));
}
解锁源码unlock执行到unlockInnerAsync
// RedissonLock.java (简化后)
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,// 脚本内容"if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then " + // 1. 如果锁或 field 不存在,说明锁已被释放或不属于我"return nil;" + // 2. 返回 nil (可能被其他等待线程处理)"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + // 3. 将重入计数器 -1"if (counter > 0) then " + // 4. 如果计数器还大于0 (说明是重入锁的内层解锁)"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 5. 刷新过期时间,防止外层锁提前过期"return 0; " + // 6. 返回 0 (false),表示锁还未完全释放"else " + // 7. 如果计数器等于0 (说明是最后一层解锁)"redis.call('del', KEYS[1]); " + // 8. 删除整个 Hash,彻底释放锁"redis.call('publish', KEYS[2], ARGV[3]); " + // 9. 发布一个 "锁已释放" 的消息,唤醒其他等待线程"return 1; " + // 10. 返回 1 (true),表示锁已成功释放"end; " +"return nil;",// 脚本参数Arrays.asList(getRawName(), getChannelName()), // KEYS[1]: 锁名, KEYS[2]: 用于发布/订阅的 channel 名internalLockLeaseTime, // ARGV[1]: 锁的内部租约时间 (用于续期)getLockName(threadId), // ARGV[2]: 唯一的线程ID,用于身份验证UNLOCK_MESSAGE // ARGV[3]: 解锁时发布的消息内容);
}
锁续期(Watchdog机制)
Watchdog 机制主要在 RedissonBaseLock.java 中实现。当你调用不带 leaseTime 参数的 lock() 方法时,Redisson 会启用看门狗。
tryAcquireAsync 方法中,如果传入的 leaseTime 为 -1 (默认 lock() 方法的标志),在成功获取锁后,会调用 scheduleExpirationRenewal(threadId) 方法。
scheduleExpirationRenewal是Watchdog的核心,使用Netty的HashedWheelTimer创建一个定时任务,在默认租约时间(30秒)的三分之一(即10秒)后执行。
// RedissonBaseLock.java (简化后)
protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();// EXPIRATION_RENEWAL_MAP 是一个 ConcurrentHashMap,存储每个线程的续期任务ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {// ... 已经有续期任务了,不用重复创建} else {entry.acquire(); // 标记为活动renewExpiration(); // 立即启动第一次调度}
}private void renewExpiration() {// ...// 创建一个定时任务,在 internalLockLeaseTime / 3 之后执行newTimeout = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {// ...// 定时任务执行时,会再次调用一个 LUA 脚本去续期RFuture<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (res) {// 如果续期成功 (res=true),则递归调用,安排下一次续期renewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);// ...
}
续期 Lua 脚本 (renewExpirationAsync): 定时任务执行的脚本很简单,就是检查锁是否还被当前线程持有,如果是,就重置过期时间。
-- 续期脚本
-- KEYS[1]: 锁名
-- ARGV[1]: 新的过期时间 (30秒)
-- ARGV[2]: 线程唯一ID
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) thenredis.call('pexpire', KEYS[1], ARGV[1]);return 1;
end;
return 0;
如果续期成功,脚本返回 1,Java 端的 renewExpiration() 方法就会再次调度自己,形成一个循环,直到锁被释放。
在unlockInnerAsync方法中,当锁被最终释放时(计数器为0),会调用 cancelExpirationRenewal(threadId) 方法,该方法会从 EXPIRATION_RENEWAL_MAP 中移除任务并取消定时器,停止续期。
总结
问题 | Redisson 解决方案 | 核心源码/技术 |
---|---|---|
身份验证 | 使用 UUID:ThreadId 作为 Hash 的 Field,解锁时先验证 Field 是否匹配。 | RedissonLock.unlockInnerAsync 中的 Lua 脚本 和 hexists 命令。 |
可重入性 | 使用 Hash 的 Value 作为重入计数器,每次加锁 hincrby 1,解锁 hincrby -1。 | RedissonLock.tryLockInnerAsync 和 unlockInnerAsync 中的 Lua 脚本 和 hincrby 命令。 |
锁续期 | Watchdog 机制:在成功加锁后,启动一个后台定时任务,在锁过期前自动刷新其过期时间。 | RedissonBaseLock.scheduleExpirationRenewal、Netty HashedWheelTimer 和一个简单的续期 Lua 脚本。 |