当前位置: 首页 > news >正文

【Java微服务组件】分布式协调P4-一文打通Redisson:从API实战到分布式锁核心源码剖析

欢迎来到啾啾的博客🐱。
记录学习点滴。分享工作思考和实用技巧,偶尔也分享一些杂谈💬。
有很多很多不足的地方,欢迎评论交流,感谢您的阅读和评论😄。

目录

  • 引言
  • Redisson基本信息
    • Redisson网站
  • Redisson应用
    • 常见用法
    • 与Redis数据结构应用的差别
  • Redisson使用
    • 依赖
    • 配置
    • demo
    • 注意事项
  • Redisson源码解析
    • Redisson设计概要
    • 解决身份验证与重入性
    • 锁续期(Watchdog机制)
    • 总结

引言

在上一篇中,我们已经了解到Redis作为分布式协调存在的种种问题,如原子性、锁超时、归属权、锁续期、可重入性等等。
如果这些都需要自己在使用时补足,自己实现一个健壮Redis客户端,那不能说是宛如噩梦,只能说是没有必要。

Redisson基本信息

简单来说,Redisson 是一个在 Redis 基础上实现的、功能强大且易于使用的 Java 驻内存数据网格(In-Memory Data Grid)。它把 Redis 这个远程的、基于网络的数据库,“伪装”成了就像在你本地 JVM 内存里一样的 Java 对象(如 Map, List, Lock 等),让你用起来毫无违和感。

Redisson 让你可以像使用本地的 Java 集合和锁一样,来使用分布式的 Redis 数据结构和锁,而无需关心底层复杂的实现细节。

Redisson网站

  • github
    https://github.com/redisson/redisson

  • 官网
    https://redisson.pro/
    https://redisson.pro/docs/

Redisson应用

Redisson价值在于“封装”。

常见用法

  • 分布式锁(RLock):这是 Redisson 最著名的功能,提供了比自己实现更安全、更强大的分布式锁。
  • 分布式集合(RMap, RList, RSet, RQueue):像使用 java.util.Map 一样使用 Redis 的 Hash 结构,非常直观。
  • 分布式对象(RBucket):用于存储单个对象(序列化后),可以看作是对 String 类型的封装。
  • 限流器(RRateLimiter):基于令牌桶算法,轻松实现分布式环境下的接口限流。
  • 信号量(RSemaphore):控制并发访问特定资源的线程数量。
  • 发布/订阅(RTopic):实现消息的发布和订阅功能。
  • 缓存:配合 RMapCache 或 RBucket,可以实现带过期策略的缓存。

与Redis数据结构应用的差别

Redisson 并没有在 Redis 中创造新的数据结构。它的所有上层功能,都是通过巧妙地组合和封装 Redis 已有的数据结构(String, Hash, List, Set, Sorted Set)以及 Lua 脚本来实现的。

Redisson 接口底层 Redis 数据结构与原生 Redis API 的对比
RBucketString原生: SET key value, GET key。
Redisson: bucket.set(myObject), bucket.get()。Redisson 帮你处理了对象的序列化和反序列化。
RMap<K, V>Hash原生: HSET map_key field value, HGET map_key field。
Redisson: map.put(key, value), map.get(key)。它实现了 java.util.Map 接口,用法和 HashMap 完全一致。
RListList原生: LPUSH list_key value, RPOP list_key。
Redisson: list.add(value), list.get(index)。它实现了 java.util.List 接口,支持按索引访问等高级操作。
RSetSet原生: SADD set_key member, SMEMBERS set_key。
Redisson: set.add(value), set.contains(value)。实现了 java.util.Set 接口。
RLockHash & String & PubSub原生: 需要自己组合 SETNX, EXPIRE, Lua脚本, Pub/Sub 等,非常复杂。
Redisson: lock.lock(), lock.unlock()。一行代码搞定,内部实现了可重入、锁续期等复杂逻辑。

Redisson使用

依赖

Redisson 的 Spring Boot Starter,是为 Spring Boot 应用定制的依赖,集成了 Redisson 核心库并提供 Spring Boot 的自动配置支持。
包含 org.redisson:redisson(核心 Redisson 库)作为依赖。

  • 提供 Spring Boot 的自动配置(如 RedissonAutoConfiguration),通过 application.properties 或 application.yml 配置 Redisson 客户端。
  • 支持 Spring 生态的集成,例如 RedisConnectionFactory、Spring Cache、Spring Data Redis 等。
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.49.0</version>
</dependency>

配置

在application.yml中配置Redis连接如下:

spring:redisson:address: redis://127.0.0.1:6379password: mypassworddatabase: 0

配置完成后会自动创建RedissonClient实例,注入到Spring容器中。

或者代码config

    /*** 获取 RedissonClient 实例* 使用双重检查锁定(DCL)确保线程安全和高性能* @return RedissonClient*/public static RedissonClient getClient() {if (redissonClient == null) {synchronized (RedissonUtil.class) {if (redissonClient == null) {// 1. 创建配置Config config = new Config();// 使用单机模式,连接到本地的 Redis// 格式: redis://127.0.0.1:6379config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456") // 如果有密码.setDatabase(0);// 其他模式,如集群模式:// config.useClusterServers().addNodeAddress("redis://127.0.0.1:7001", "redis://127.0.0.1:7002");// 2. 创建 RedissonClient 实例redissonClient = Redisson.create(config);}}}return redissonClient;}

demo

一般注入RedissonClient使用,如下:

@Autowired
private RedissonClient redissonClient;

这里简单写一下单例与Demo

import org.redisson.Redisson;  
import org.redisson.api.RedissonClient;  
import org.redisson.config.Config;/*** 在生产环境中,通常建议将 RedissonClient 的创建和管理封装成一个单例的工具类,* 避免在各处重复创建昂贵的客户端实例。*/
public class RedissonUtil {private static volatile RedissonClient redissonClient;// 私有构造函数,防止外部实例化private RedissonUtil() {}/*** 获取 RedissonClient 实例* 使用双重检查锁定(DCL)确保线程安全和高性能* @return RedissonClient*/public static RedissonClient getClient() {if (redissonClient == null) {synchronized (RedissonUtil.class) {if (redissonClient == null) {// 1. 创建配置Config config = new Config();// 使用单机模式,连接到本地的 Redis// 格式: redis://127.0.0.1:6379config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);// 其他模式,如集群模式:// config.useClusterServers().addNodeAddress("redis://127.0.0.1:7001", "redis://127.0.0.1:7002");// 2. 创建 RedissonClient 实例redissonClient = Redisson.create(config);}}}return redissonClient;}/*** 关闭客户端*/public static void shutdown() {if (redissonClient != null && !redissonClient.isShutdown()) {redissonClient.shutdown();}}
}

使用demo


import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;import java.util.concurrent.TimeUnit;public class RedissonDemo {public static void main(String[] args) throws InterruptedException {// 从工具类获取客户端RedissonClient redisson = RedissonUtil.getClient();// 1. RBucket 示例 (操作 String)System.out.println("------ RBucket Demo ------");RBucket<String> userBucket = redisson.getBucket("user:1001");userBucket.set("{\"name\":\"Alice\", \"age\":30}");System.out.println("User from Redis: " + userBucket.get());userBucket.delete(); // 清理// 2. RMap 示例 (操作 Hash)System.out.println("\n------ RMap Demo ------");RMap<String, String> configMap = redisson.getMap("app:config");configMap.put("version", "1.2.0");configMap.put("env", "production");System.out.println("Config version: " + configMap.get("version"));configMap.clear(); // 清理// 3. RLock 示例 (分布式锁)System.out.println("\n------ RLock Demo ------");RLock myLock = redisson.getLock("my-distributed-lock");try {// 尝试加锁,最多等待10秒,上锁以后15秒自动解锁boolean isLocked = myLock.tryLock(10, 15, TimeUnit.SECONDS);if (isLocked) {System.out.println(Thread.currentThread().getName() + ": Lock acquired!");// 执行业务逻辑System.out.println("Doing critical work...");Thread.sleep(5000); // 模拟业务耗时} else {System.out.println(Thread.currentThread().getName() + ": Failed to acquire lock.");}} finally {// 确保锁被释放if (myLock.isHeldByCurrentThread()) {myLock.unlock();System.out.println(Thread.currentThread().getName() + ": Lock released!");}}// 程序结束时,关闭 Redisson 客户端RedissonUtil.shutdown();}
}

注意事项

  • 如果你不确定,就永远不要设置 leaseTime
    在绝大多数生产场景下,我们都应该使用不带 leaseTime 参数的 lock() 或 tryLock() 方法,从而启用看门狗的自动续期功能。
    这里带leaseTime参数只是为了演示。生产中只有一些特殊场景如“临时令牌”才需要。

  • 谨慎创建RedissonClient实例
    RedissonClient 是一个重量级对象,它维护着与 Redis 服务器的连接池。如果每次使用时都 Redisson.create(config),会频繁地创建和销毁连接,造成巨大的性能开销和资源浪费。

  • 序列化
    Redisson 默认使用 MarshallingCodec,这个新能较差,跨语言不兼容、可读性差。
    建议更换为 JsonJacksonCodec 或其他基于 JSON 的 Codec。

Config config = new Config();
// ... 其他配置 ...
config.setCodec(new org.redisson.codec.JsonJacksonCodec()); // 推荐!
RedissonClient redisson = Redisson.create(config);
  • 分布式共识考量
    Redisson 提供了 RedissonRedLock 的实现。可以让集群在大多数实例上都成功获取锁时,才算真正加锁成功。
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.RedissonRedLock;public class RedLockDemo {public static void main(String[] args) throws InterruptedException {// --- 准备工作:创建多个 RedissonClient 和 RLock ---// 实例 1 的配置和客户端Config config1 = new Config();config1.useSingleServer().setAddress("redis://192.168.1.1:6379");RedissonClient client1 = Redisson.create(config1);// 实例 2 的配置和客户端Config config2 = new Config();config2.useSingleServer().setAddress("redis://192.168.1.2:6379");RedissonClient client2 = Redisson.create(config2);// 实例 3 的配置和客户端Config config3 = new Config();config3.useSingleServer().setAddress("redis://192.168.1.3:6379");RedissonClient client3 = Redisson.create(config3);// 为同一个业务锁 "my-red-lock" 在每个实例上都创建一个 RLock 对象RLock lock1 = client1.getLock("my-red-lock");RLock lock2 = client2.getLock("my-red-lock");RLock lock3 = client3.getLock("my-red-lock");// --- 核心步骤:创建和使用 RedissonRedLock ---// 将多个 RLock 对象聚合到 RedissonRedLock 中RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);try {// RedLock 的 tryLock 方法参数// waitTime: 尝试获取锁的总等待时间// leaseTime: 锁的持有时间(租约时间)// 注意:RedLock 不支持看门狗自动续期!必须手动指定 leaseTime。boolean isLocked = redLock.tryLock(10, 30, TimeUnit.SECONDS);if (isLocked) {System.out.println(Thread.currentThread().getName() + ": RedLock acquired successfully!");// 执行关键业务逻辑// 这个业务的执行时间理论上应该小于 leaseTime (30秒)System.out.println("Doing critical work...");Thread.sleep(5000); } else {System.out.println(Thread.currentThread().getName() + ": Failed to acquire RedLock.");}} finally {// 释放锁。它会尝试解锁所有实例上的锁。redLock.unlock();System.out.println(Thread.currentThread().getName() + ": RedLock released.");}// --- 清理资源 ---client1.shutdown();client2.shutdown();client3.shutdown();}
}

在生产中使用 Redisson 前,可以对照这个清单检查一下:

  1. 锁粒度:是否足够细,避免了全局锁?
  2. leaseTime:是否让看门狗自动管理,避免手动设置?
  3. tryLock 等待时间:是否设置了合理的 waitTime?
  4. finally 块:是否保证了 unlock() 的执行?
  5. RedissonClient 管理:是否是单例模式?
  6. 序列化器:是否换成了 JsonJacksonCodec 或类似的?
  7. 部署模式风险:是否已评估主从/集群模式下的锁风险,并选择了合适的锁类型(RLock vs RedissonRedLock)?

Redisson源码解析

Redisson设计概要

  1. 原子性保证 :所有加锁、解锁、续期操作,内部都通过 Lua 脚本 实现,确保了在 Redis 服务端执行的原子性。你再也不用担心命令执行到一半客户端崩溃的问题。

  2. 锁的归属权与安全性:它在加锁时会自动生成一个唯一的 ID(通常是 UUID:threadId),解锁时会通过 Lua 脚本验证这个 ID,确保只有加锁的那个线程才能解锁。这彻底解决了“误删他人锁”的问题。

  3. 自动续期(看门狗机制 - Watchdog):这是 Redisson 的王牌功能。当你获取一个锁时,如果你没有指定租约时间(Lease Time),Redisson 会默认设置一个30秒的过期时间,并启动一个后台“看门狗”线程。这个看门狗会每隔10秒(默认是锁过期时间的1/3)检查一下,如果持有锁的线程还在运行,它就会自动把锁的过期时间重置回30秒。

    • 效果:只要你的业务线程还在正常工作,你的锁就永远不会过期。
    • 宕机处理:如果你的业务线程所在的服务器宕机了,看门狗线程自然也停止了。那么在最长30秒后,这个锁会自动被 Redis 释放,从而避免了死锁。
  4. 可重入性 (Reentrant Lock):Redisson 实现的 RLock 接口,就像 Java 的 java.util.concurrent.locks.ReentrantLock 一样,是可重入的。一个线程可以多次获取同一个锁而不会被自己阻塞。Redisson 通过 Redis 的 Hash 结构来记录锁的持有者和重入次数。

  5. 公平/非公平锁:它同时支持公平锁(先到先得)和非公平锁(抢占式),可以根据业务场景选择。

  6. 易用性:它提供了与 java.util.concurrent.locks.Lock 完全一致的接口。你只需要写下面这样的代码,感觉就像在写多线程程序,完全察觉不到背后复杂的网络通信和分布式协调。

解决身份验证与重入性

通过lua脚本与唯一ID的形式。
当调用lock.lock()时,最终会执行到RedissonLock 类中的 tryLockInnerAsync 方法。

![[分布式协调P4-Redisson解析.png]]

能看到唯一标识直接使用的threadId,源码解析如下:

// RedissonLock.java (简化后)
private <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {// ...// 这个 LUA 脚本是核心return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,// 脚本内容"if (redis.call('exists', KEYS[1]) == 0) then " +         // 1. 如果锁(Hash)不存在"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +     // 2. 创建 Hash, 将 field(线程ID) 的 value(重入次数)设为1"redis.call('pexpire', KEYS[1], ARGV[1]); " +        // 3. 设置过期时间"return nil; " +                                     // 4. 返回 nil,表示加锁成功"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 5. 如果锁存在,且 field(线程ID)也存在(说明是当前线程持有)"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +     // 6. 重入次数 +1"redis.call('pexpire', KEYS[1], ARGV[1]); " +        // 7. 刷新过期时间"return nil; " +                                     // 8. 返回 nil,表示重入成功"end; " +"return redis.call('pttl', KEYS[1]);",                    // 9. 如果锁被其他线程持有,返回剩余的过期时间,用于计算等待时间// 脚本参数Collections.singletonList(getRawName()), // KEYS[1]: 锁名,如 "myLock"unit.toMillis(leaseTime),                // ARGV[1]: 锁的租约时间(过期时间)getLockName(threadId)                    // ARGV[2]: 唯一的线程ID (UUID:threadId));
}

解锁源码unlock执行到unlockInnerAsync

// RedissonLock.java (简化后)
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,// 脚本内容"if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then " +         // 1. 如果锁或 field 不存在,说明锁已被释放或不属于我"return nil;" +                                                // 2. 返回 nil (可能被其他等待线程处理)"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +   // 3. 将重入计数器 -1"if (counter > 0) then " +                                         // 4. 如果计数器还大于0 (说明是重入锁的内层解锁)"redis.call('pexpire', KEYS[1], ARGV[1]); " +                  // 5. 刷新过期时间,防止外层锁提前过期"return 0; " +                                                 // 6. 返回 0 (false),表示锁还未完全释放"else " +                                                          // 7. 如果计数器等于0 (说明是最后一层解锁)"redis.call('del', KEYS[1]); " +                               // 8. 删除整个 Hash,彻底释放锁"redis.call('publish', KEYS[2], ARGV[3]); " +                  // 9. 发布一个 "锁已释放" 的消息,唤醒其他等待线程"return 1; " +                                                 // 10. 返回 1 (true),表示锁已成功释放"end; " +"return nil;",// 脚本参数Arrays.asList(getRawName(), getChannelName()), // KEYS[1]: 锁名, KEYS[2]: 用于发布/订阅的 channel 名internalLockLeaseTime,                         // ARGV[1]: 锁的内部租约时间 (用于续期)getLockName(threadId),                         // ARGV[2]: 唯一的线程ID,用于身份验证UNLOCK_MESSAGE                                 // ARGV[3]: 解锁时发布的消息内容);
}

锁续期(Watchdog机制)

Watchdog 机制主要在 RedissonBaseLock.java 中实现。当你调用不带 leaseTime 参数的 lock() 方法时,Redisson 会启用看门狗。
![[分布式协调P4-Redisson解析-1.png]]

tryAcquireAsync 方法中,如果传入的 leaseTime 为 -1 (默认 lock() 方法的标志),在成功获取锁后,会调用 scheduleExpirationRenewal(threadId) 方法。
![[分布式协调P4-Redisson解析-2.png]]续期调度
scheduleExpirationRenewal是Watchdog的核心,使用Netty的HashedWheelTimer创建一个定时任务,在默认租约时间(30秒)的三分之一(即10秒)后执行。

// RedissonBaseLock.java (简化后)
protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();// EXPIRATION_RENEWAL_MAP 是一个 ConcurrentHashMap,存储每个线程的续期任务ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {// ... 已经有续期任务了,不用重复创建} else {entry.acquire(); // 标记为活动renewExpiration(); // 立即启动第一次调度}
}private void renewExpiration() {// ...// 创建一个定时任务,在 internalLockLeaseTime / 3 之后执行newTimeout = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {// ...// 定时任务执行时,会再次调用一个 LUA 脚本去续期RFuture<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (res) {// 如果续期成功 (res=true),则递归调用,安排下一次续期renewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);// ...
}

续期 Lua 脚本 (renewExpirationAsync): 定时任务执行的脚本很简单,就是检查锁是否还被当前线程持有,如果是,就重置过期时间。

-- 续期脚本  
-- KEYS[1]: 锁名
-- ARGV[1]: 新的过期时间 (30)
-- ARGV[2]: 线程唯一ID
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) thenredis.call('pexpire', KEYS[1], ARGV[1]);return 1;
end;
return 0;

如果续期成功,脚本返回 1,Java 端的 renewExpiration() 方法就会再次调度自己,形成一个循环,直到锁被释放。

在unlockInnerAsync方法中,当锁被最终释放时(计数器为0),会调用 cancelExpirationRenewal(threadId) 方法,该方法会从 EXPIRATION_RENEWAL_MAP 中移除任务并取消定时器,停止续期。

总结

问题Redisson 解决方案核心源码/技术
身份验证使用 UUID:ThreadId 作为 Hash 的 Field,解锁时先验证 Field 是否匹配。RedissonLock.unlockInnerAsync 中的 Lua 脚本 和 hexists 命令。
可重入性使用 Hash 的 Value 作为重入计数器,每次加锁 hincrby 1,解锁 hincrby -1。RedissonLock.tryLockInnerAsync 和 unlockInnerAsync 中的 Lua 脚本 和 hincrby 命令。
锁续期Watchdog 机制:在成功加锁后,启动一个后台定时任务,在锁过期前自动刷新其过期时间。RedissonBaseLock.scheduleExpirationRenewal、Netty HashedWheelTimer 和一个简单的续期 Lua 脚本

相关文章:

  • MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)
  • GOOUUU ESP32-S3-CAM 果云科技开发板开发指南(一)(超详细!)Vscode+espidf 通过摄像头拍摄照片并存取到SD卡中,文末附源码
  • Go 语言并发编程基础:无缓冲与有缓冲通道
  • VScode 使用 git 提交数据到指定库的完整指南
  • 【Fiddler工具判断前后端Bug】
  • 飞牛云一键设置动态域名+ipv6内网直通访问内网的ssh服务-家庭云计算专家
  • Linux下VSCode开发环境配置(LSP)
  • 阿里云ACP云计算备考笔记 (4)——企业应用服务
  • 客户端和服务器已成功建立 TCP 连接【输出解析】
  • WPF八大法则:告别模态窗口卡顿
  • 使用有限计算实现视频生成模型的高效训练
  • 安全访问家中 Linux 服务器的远程方案 —— 专为单用户场景设计
  • uniapp 对接腾讯云IM群公告功能
  • wpf的converter
  • 手机号在网状态查询接口如何用PHP实现调用?
  • Cisco Packer Tracer 综合实验
  • 使用Python和Flask构建简单的机器学习API
  • 基于物联网设计的智慧家庭健康医疗系统
  • 数据库系统概论(十七)超详细讲解数据库规范化与五大范式(从函数依赖到多值依赖,再到五大范式,附带例题,表格,知识图谱对比带你一步步掌握)
  • 大模型Agent智能体介绍和应用场景
  • 四川省住房和城乡建设厅新网站/google推广技巧
  • 网站搭建公司排行榜/站内推广有哪些方式
  • 淘宝客怎样做自己的网站推广/seo点击器
  • 宜昌 网站建设 公司/优化步骤
  • 网络优化工程师是做什么的/长沙关键词优化新报价
  • 自己怎样给网站做推广/营销方法有哪几种