2.商户查询缓存
2.0 问题记录
2.0.1 为什么要给缓存 TTL
- 1. 防止内存泄漏:如果不设置过期时间,缓存数据会永久存在于 Redis 中,随着时间推移可能导致 Redis 内存耗尽。
- 2. 数据一致性:设置合理的过期时间可以确保缓存不会长期存储过时数据,当数据库中的数据更新后,缓存会自动过期并从数据库重新加载最新数据。
- 3. 资源优化:不是所有数据都需要永久缓存,设置过期时间可以自动清理不常用的数据,优化 Redis 资源使用。
- 4. 缓存雪崩防护:通过设置不同的 TTL(虽然这里使用的是固定值),可以避免大量缓存同时过期导致数据库瞬时压力过大。
2.0.2 缓存一致性问题
2.商户查询缓存
仍有问题
2.0.3 缓存穿透问题
2.商户查询缓存
仍然有安全风险
2.0.4 缓存击穿问题
2.商户查询缓存
没考虑分布式
2.0.5 自旋 sleep 问题
1.避免 CPU 资源浪费
// 4.2 自旋锁解决缓存击穿问题
while (!tryLock(lockKey)) {log.info("Thread:{} 被阻塞", Thread.currentThread().getId());// 4.3 休眠并重试try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}// 再次检查缓存...
}
如果不进行休眠,线程会不断地尝试获取锁,这会导致 CPU 资源的大量消耗。通过让线程短暂休眠,可以减轻 CPU 负担,让出 CPU 时间片给其他线程使用。
2.减轻 Redis 服务器压力
每次调用 tryLock 方法都会向 Redis 发送一次 setIfAbsent 命令。如果没有休眠,失败的线程会立即重试,导致大量请求涌向 Redis 服务器,可能造成 Redis 服务器压力过大。休眠可以降低 Redis 的访问频率。
3. 避免活锁
如果多个线程同时竞争锁,没有休眠的话可能会导致所有线程都在不断尝试获取锁但都无法成功,形成活锁状态。通过引入随机或固定的休眠时间,可以打破这种同步竞争模式。
4. 提高获取锁的公平性
休眠可以让竞争失败的线程有序地重新尝试获取锁,而不是立即重试,这在一定程度上提高了获取锁的公平性,避免某些线程因为系统调度原因一直无法获取到锁。
5. 降低系统整体负载
在高并发场景下,如果所有线程都不休眠持续自旋,会导致系统整体负载升高,影响其他业务的正常运行。休眠可以平滑并发压力。
总结
在自旋锁实现中加入休眠是一种常见的优化手段,它在保持自旋锁非阻塞特性的同时,有效减少了资源消耗。对于您的缓存击穿问题,这种实现方式既能保证只有一个线程去查询数据库并重建缓存,又能让其他线程在等待过程中不会过度消耗系统资源。
2.1 什么是缓存
缓存 (Cache),就是数据交换的缓冲区,俗称的缓存就是缓冲区内的数据,一般从数据库中获取,存储于本地代码,例如:
例1:Static final ConcurrentHashMap<K,V> map = new ConcurrentHashMap<>();
本地用于高并发例2:static final Cache<K,V> USER_CACHE = CacheBuilder.newBuilder().build();
用于redis等缓存例3:Static final Map<K,V> map = new HashMap();
本地缓存
由于其被 Static 修饰,所以随着类的加载而被加载到内存之中,作为本地缓存,由于其又被 final 修饰,所以其引用 (例 3:map) 和对象 (例 3:new HashMap()) 之间的关系是固定的,不能改变,因此不用担心赋值 (=) 导致缓存失效;
2.1.1 为什么要使用缓存
一句话: 因为速度快,好用
缓存数据存储于代码中,而代码运行在内存中,内存的读写性能远高于磁盘,缓存可以大大降低用户访问并发量带来的服务器读写压力
实际开发过程中,企业的数据量,少则几十万,多则几千万,这么大数据量,如果没有缓存来作为 " 避震器 ",系统是几乎撑不住的,所以企业会大量运用到缓存技术;
但是缓存也会增加代码复杂度和运营的成本:
2.1.2 如何使用缓存
实际开发中,会构筑多级缓存来使系统运行速度进一步提升,例如:
- 本地缓存与 redis 中的缓存并发使用
- 浏览器缓存:主要是存在于浏览器端的缓存
- 应用层缓存:可以分为 tomcat 本地缓存,比如之前提到的 map,或者是使用 redis 作为缓存
- 数据库缓存:在数据库中有一片空间是 buffer pool,增改查数据都会先加载到 mysql 的缓存中
- CPU 缓存:当代计算机最大的问题是 cpu 性能提升了,但内存读写速度没有跟上,所以为了适应当下的情况,增加了 cpu 的 L1,L2,L3 级的缓存
2.2 添加商户缓存
在我们查询商户信息时,我们是直接操作从数据库中去进行查询的,大致逻辑是这样,直接查询数据库那肯定慢咯,所以我们需要增加缓存
@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {//这里是直接查询数据库return shopService.queryById(id);
}
2.2.1 缓存模型和思路
标准的操作方式就是查询数据库之前先查询缓存,如果缓存数据存在,则直接从缓存中返回,如果缓存数据不存在,再查询数据库,然后将数据存入 redis。
2.3 缓存更新策略
缓存更新是 redis 为了节约内存而设计出来的一个东西,主要是因为内存数据宝贵,当我们向 redis 插入太多数据,此时就可能会导致缓存中的数据过多,所以 redis 会对部分数据进行更新,或者把他叫为淘汰更合适。
内存淘汰:redis 自动进行,当 redis 内存达到咱们设定的 max-memery 的时候,会自动触发淘汰机制,淘汰掉一些不重要的数据 (可以自己设置策略方式)
超时剔除:当我们给 redis 设置了过期时间 ttl 之后,redis 会将超时的数据进行删除,方便咱们继续使用缓存
主动更新:我们可以手动调用方法把缓存删掉,通常用于解决缓存和数据库不一致问题
2.3.1 缓存双写一致性
于我们的缓存的数据源来自于数据库,而数据库的数据是会发生变化的,因此,如果当数据库中数据发生变化,而缓存却没有同步,此时就会有一致性问题存在,其后果是:用户使用缓存中的过时数据,就会产生类似多线程数据安全问题,从而影响业务,产品口碑等; 怎么解决呢?有如下几种方案:
- 1. Cache Aside Pattern 人工编码方式:缓存调用者在更新完数据库后再去更新缓存,也称之为双写方案
- 2. Read/Write Through Pattern: 由系统本身完成,数据库与缓存的问题交由系统本身去处理
- 3. Write Behind Caching Pattern:调用者只操作缓存,其他线程去异步处理数据库,实现最终一致
2.3.2 解决方案
综合考虑使用方案一,但是方案一调用者如何处理呢?这里有几个问题
操作缓存和数据库时有三个问题需要考虑:
如果采用第一个方案,那么假设我们每次操作数据库后,都操作缓存,但是中间如果没有人查询,那么这个更新动作实际上只有最后一次生效,中间的更新动作意义并不大,我们可以把缓存删除,等待再次查询时,将缓存中的数据加载出来
- 删除缓存还是更新缓存?
- 更新缓存:每次更新数据库都更新缓存,无效写操作较多
- 删除缓存:更新数据库时让缓存失效,查询时再更新缓存
- 如何保证缓存与数据库的操作的同时成功或失败?
- 单体系统,将缓存与数据库操作放在一个事务
- 分布式系统,利用 TCC(Try-Confirm-Cancel)) 等分布式事务方案
应该具体操作缓存还是操作数据库,我们应当是先操作数据库,再删除缓存,原因在于,如果你选择第一种方案,在两个线程并发来访问时,假设线程 1 先来,他先把缓存删了,此时线程 2 过来,他查询缓存数据并不存在,此时他写入缓存,当他写入缓存后,线程 1 再执行更新动作时,实际上写入的就是旧的数据,新的数据被旧数据覆盖了。
- 先操作缓存还是先操作数据库?
- 先删除缓存,再操作数据库
- 先操作数据库,再删除缓存
🎨
缓存双写一致性问题解决方案
缓存双写一致性是分布式系统中常见的问题,指的是当数据同时存在于数据库和缓存中时,如何保证两者的数据一致性。以下是几种常见的解决方案:
1. 缓存更新策略
1.1 Cache Aside Pattern (旁路缓存模式)
- 读操作:先读缓存,缓存没有则读数据库,然后写入缓存
- 写操作:先更新数据库,再删除缓存
- 优点:简单易实现,适合读多写少场景
- 缺点:存在短暂不一致窗口
1.2 Read/Write Through Pattern
- 读/写操作都通过缓存层,缓存层负责与数据库同步
- 优点:业务代码更简洁
- 缺点:实现复杂,需要缓存层支持
2. 双写一致性保障方案
2.1 延迟双删策略
- 先删除缓存
- 更新数据库
- 延迟一段时间后再次删除缓存(处理可能的脏数据)
2.2 基于消息队列的最终一致性
- 数据库更新后发送消息到 MQ
- 消费者异步更新缓存
- 优点:解耦,保证最终一致性
- 缺点:系统复杂度增加
3. 高级解决方案
3.1 分布式锁
- 更新数据时获取分布式锁
- 防止并发更新导致的数据不一致
3.2 版本号/时间戳机制
- 数据增加版本号或时间戳
- 更新时比较版本,防止旧数据覆盖新数据
3.3 数据库 binlog 监听
- 通过 Canal 等工具监听数据库 binlog
- 实时更新缓存,保证强一致性
选择建议
- 对一致性要求不高的场景:Cache Aside Pattern
- 对一致性要求较高的场景:延迟双删+消息队列
- 强一致性要求场景:分布式锁+版本控制
实际应用中,通常需要根据业务场景和性能要求选择合适的方案或组合多种方案。
2.4 实现商铺双写一致性
- 根据 id 查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间
- 根据 id 修改店铺时,先修改数据库,再删除缓存
- 新查询缓存未命中则把 mysql 的数据写入 redis
@Override@Transactionalpublic Result update(Shop shop) {Long id = shop.getId();if (id == null) {return Result.fail("店铺id不能为空");}// 1.更新数据库updateById(shop);// 2.删除缓存stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY + id);return Result.ok();}
2.4.1 存在的问题
当前实现采用了"先更新数据库,再删除缓存"的策略。虽然这种策略在大多数情况下能工作,但在高并发场景下可能出现数据不一致:
- 如果删除缓存操作失败(比如 Redis 服务不可用),会导致缓存中保留旧数据
- 在多线程环境下,可能出现缓存刚被删除,又被其他线程以旧数据写入的情况
2.4.2 解决方案(额外考虑)
针对这种情况,有几种常见的解决方案:
- 1. 设置较短的缓存过期时间 :减少缓存与数据库不一致的时间窗口
- 2. 使用分布式锁 :在更新操作时加锁,确保同一时间只有一个线程能更新数据
- 3. 延迟双删策略 :
// 1.删除缓存 stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY + id); // 2.更新数据库 updateById(shop); // 3.休眠一段时间(大于读操作的时间) Thread.sleep(200); // 4.再次删除缓存 stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY + id);
- 4. 使用消息队列 :数据库更新后发送消息,由专门的消费者负责更新缓存
- 5. 使用数据库变更捕获(CDC)工具 :如 Canal,监听数据库变更,实时更新缓存
在实际应用中,应根据业务特点、并发量和一致性要求选择合适的方案。对于大多数业务场景,设置合理的缓存过期时间和延迟双删策略已经能够满足需求。
2.4.2 @Transactional
Spring 的@Transactional 注解详解
@Transactional 是 Spring 框架提供的一个非常重要的注解,用于声明式事务管理。让我详细解释一下这个注解及其与 MySQL 原生事务回滚的区别。
@Transactional 注解的基本概念
@Transactional 注解可以应用在类或方法级别,用于定义事务的边界和属性。当一个方法被@Transactional 注解标记时,Spring 会在方法执行前开启一个事务,方法执行完成后提交事务,如果方法执行过程中抛出异常,则回滚事务。
@Transactional 注解的主要属性
- propagation :事务传播行为,定义了方法如何参与现有事务
- REQUIRED(默认):如果当前存在事务,则加入该事务;如果当前没有事务,则创建一个新事务
- REQUIRES_NEW:创建一个新事务,如果当前存在事务,则挂起当前事务
- SUPPORTS:如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务方式执行
- 其他值:MANDATORY, NEVER, NOT_SUPPORTED, NESTED
- isolation :事务隔离级别
- DEFAULT(默认):使用数据库默认的隔离级别
- READ_UNCOMMITTED:读未提交
- READ_COMMITTED:读已提交
- REPEATABLE_READ:可重复读
- SERIALIZABLE:串行化
- timeout :事务超时时间(秒)
- readOnly :是否为只读事务
- rollbackFor/rollbackForClassName :指定哪些异常类型会导致事务回滚
- noRollbackFor/noRollbackForClassName :指定哪些异常类型不会导致事务回滚
@Transactional 与 MySQL 事务回滚的区别
1. 实现层次不同
- MySQL 事务 :是数据库层面的事务机制,通过 SQL 语句(BEGIN/START TRANSACTION, COMMIT, ROLLBACK)直接控制
- @Transactional :是应用层面的事务管理,由 Spring 框架通过 AOP(面向切面编程)实现,底层仍然调用 JDBC 的事务 API
2. 触发方式不同
- MySQL 事务 :需要手动编写事务控制语句
- @Transactional :声明式事务,只需添加注解,无需编写事务控制代码
3. 回滚机制不同
- MySQL 事务 :需要手动执行 ROLLBACK 语句才会回滚
- @Transactional :当方法抛出未捕获的 RuntimeException 或 Error 时自动回滚(默认情况下)
4. 事务范围不同
- MySQL 事务 :仅限于单个数据库连接内的操作
- @Transactional :可以通过配置实现跨多个数据源的分布式事务(需要 JTA 等支持)
5. 异常处理不同
- MySQL 事务 :需要在 catch 块中手动执行 ROLLBACK
- @Transactional :可以通过 rollbackFor 属性指定哪些异常会触发回滚
@Transactional 的工作原理
- Spring 通过 AOP(面向切面编程)创建代理对象
- 当调用被@Transactional 注解的方法时,代理拦截该调用
- 代理开启事务(通过 PlatformTransactionManager)
- 执行目标方法
- 如果方法执行成功,代理提交事务
- 如果方法抛出异常,代理根据配置决定是否回滚事务
使用@Transactional 的注意事项
- 只对 public 方法有效 :@Transactional 只能应用于 public 方法,对于非 public 方法,即使使用@Transactional 注解,也不会创建事务
- 自调用问题 :在同一个类中,一个非事务方法调用事务方法,事务不会生效,因为 Spring 的 AOP 代理机制导致
- 运行时异常回滚 :默认情况下,只有 RuntimeException 和 Error 会触发事务回滚,checked 异常不会
- 事务失效情况 :
- 数据库不支持事务
- 类没有被 Spring 管理
- 方法不是 public 的
- 异常被捕获但未重新抛出
- 使用了错误的事务传播行为
- 注意事项 :在您的代码中,Redis 操作不在事务范围内,因为 Redis 是一个独立的存储系统,不参与 Spring 的事务管理
总结
@Transactional 是 Spring 提供的强大事务管理工具,相比 MySQL 原生事务,它提供了更高级别的抽象和更便捷的使用方式。在实际开发中,合理使用@Transactional 可以大大简化事务管理的复杂性,提高代码的可维护性。
在您的 ShopServiceImpl 中,@Transactional 确保了数据库更新操作的原子性,但需要注意的是,Redis 缓存操作不在事务范围内,如果对数据一致性要求较高,可能需要考虑额外的机制来保证缓存与数据库的一致性。
2.5 缓存穿透
缓存穿透:缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
常见的解决方案有两种:
- 缓存空对象
- 优点:实现简单,维护方便
- 缺点:
- 额外的内存消耗
- 可能造成短期的不一致
- 布隆过滤
- 优点:内存占用较少,没有多余 key
- 缺点:
- 实现复杂
- 存在误判可能
2.5.1 缓存空对象
- 当我们客户端访问不存在的数据时,先请求 redis
- 但是此时 redis 中没有数据,此时会访问到数据库
- 但是数据库中也没有数据,这个数据穿透了缓存,直击数据库
- 我们都知道数据库能够承载的并发不如 redis 这么高,如果大量的请求同时过来访问这种不存在的数据,这些请求就都会访问到数据库
- 简单的解决方案就是哪怕这个数据在数据库中也不存在,我们也把这个数据存入到 redis 中去,这样,下次用户过来访问这个不存在的数据,那么在 redis 中也能找到这个数据就不会进入到缓存了
2.5.2 布隆过滤器
- 布隆过滤器其实采用的是哈希思想来解决这个问题
- 通过一个庞大的二进制数组,走哈希思想去判断当前这个要查询的这个数据是否存在
- 如果布隆过滤器判断存在,则放行
- 这个请求会去访问 redis,哪怕此时 redis 中的数据过期了
- 但是数据库中一定存在这个数据,在数据库中查询出来这个数据后,再将其放入到 redis 中
- 假设布隆过滤器判断这个数据不存在,则直接返回
- 这种方式优点在于节约内存空间,存在误判,误判原因在于:布隆过滤器走的是哈希思想,只要哈希思想,就可能存在哈希冲突
2.5.3 解决缓存穿透
核心思路如下:
- 在原来的逻辑中
- 我们如果发现这个数据在 mysql 中不存在,直接就返回 404 了
- 这样是会存在缓存穿透问题的
- 现在的逻辑中
- 如果这个数据不存在,我们不会返回 404
- 还是会把这个数据写入到 Redis 中,并且将 value 设置为空
- 当再次发起查询时,我们如果发现命中之后,判断这个 value 是否是 null
- 如果是 null,则是之前写入的数据,证明是缓存穿透数据
- 如果不是,则直接返回数据。
public Result queryById(Long id) {String key = RedisConstants.CACHE_SHOP_KEY + id;// 1.从redis查询商品缓存String shopJson = stringRedisTemplate.opsForValue().get(key);// 2.判断是否存在if (StrUtil.isNotBlank(shopJson)) {// 3.存在,直接返回Shop shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}// 判断命中是否是空值if (shopJson != null) {// 返回错误信息return Result.fail("店铺不存在");}// 4.不存在,根据id查询数据库Shop shop = getById(id);// 5.不存在,返回错误if (shop == null) {// 解决缓存穿透问题 - 这里采用空值解决stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);return Result.fail("店铺不存在");}// 6.存在,写入redisstringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);// 7.返回return Result.ok(shop);
}
仍然有安全风险:
对于恶意攻击者使用大量不同的不存在的 key 进行攻击时,这种方法无法完全解决问题,因为攻击者可以不断生成新的 key,导致缓存中仍然无法命中,最终请求还是会打到数据库。
2.6 小总结
缓存穿透产生的原因是什么?
- 用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力
缓存穿透的解决方案有哪些?
- 缓存 null 值
- 布隆过滤
- 增强 id 的复杂度,避免被猜测 id 规律
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数的限流
2.7 缓存雪崩
缓存雪崩是指在同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
- 给不同的 Key 的 TTL 添加随机值
- 利用 Redis 集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存
2.8 缓存击穿
就是一个被高并发访问并且缓存重建业务较复杂的 key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
常见的解决方案有两种:
- 互斥锁
- 逻辑过期
逻辑分析
- 假设线程 1 在查询缓存之后,本来应该去查询数据库,然后把这个数据重新加载到缓存的
- 此时只要线程 1 走完这个逻辑,其他线程就都能从缓存中加载这些数据了
- 但是假设在线程 1 没有走完的时候,后续的线程 2,线程 3,线程 4 同时过来访问当前这个方法
- 那么这些线程都不能从缓存中查询到数据
- 那么他们就会同一时刻来访问查询缓存,都没查到,接着同一时间去访问数据库
- 同时的去执行数据库代码,对数据库访问压力过大
2.8.1 解决方案
为了避免这种问题,可以采用以下方法:
- 1. 加锁机制:在查询数据库并更新缓存时加锁,确保只有一个线程执行数据库查询和缓存更新操作,其他线程等待。
- 因为锁能实现互斥性。假设线程过来,只能一个人一个人的来访问数据库,从而避免对于数据库访问压力过大,但这也会影响查询的性能,因为此时会让查询的性能从并行变成了串行,我们可以采用 tryLock 方法 + double check 来解决这样的问题。
- 假设现在线程 1 过来访问,他查询缓存没有命中,但是此时他获得到了锁的资源,那么线程 1 就会一个人去执行逻辑,假设现在线程 2 过来,线程 2 在执行过程中,并没有获得到锁,那么线程 2 就可以进行到休眠,直到线程 1 把锁释放后,线程 2 获得到锁,然后再来执行逻辑,此时就能够从缓存中拿到数据了。
- 2. 逻辑过期
- 方案分析:我们之所以会出现这个缓存击穿问题,主要原因是在于我们对 key 设置了过期时间,假设我们不设置过期时间,其实就不会有缓存击穿的问题,但是不设置过期时间,这样数据不就一直占用我们内存了吗,我们可以采用逻辑过期方案。
- 我们把过期时间设置在 redis 的 value 中,注意:这个过期时间并不会直接作用于 redis,而是我们后续通过逻辑去处理。
- 线程 1 查询缓存:
- 从缓存中获取数据,并检查逻辑过期时间。
- 如果数据已过期,线程 1 尝试获取锁。
- 线程 1 获取锁:
- 线程 1 成功获取锁,并开启一个异步线程去更新缓存。
- 线程 1 直接返回旧数据。
- 线程 2 和线程 3 查询缓存:
- 线程 2 和线程 3 发现数据已过期,尝试获取锁。
- 由于锁被线程 1 占用,线程 2 和线程 3 无法获取锁,直接返回旧数据。
- 异步线程更新缓存:
- 异步线程查询数据库,获取最新数据。
- 更新缓存,并释放锁。
- 后续线程查询缓存:
- 缓存已更新,后续线程可以直接从缓存中获取最新数据。
- 线程 1 查询缓存:
- 这种方案巧妙在于,异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据。
- 3. 缓存预热:提前将热点数据加载到缓存中,避免缓存失效。
- 4. 缓存空值:如果数据库查询结果为空,也缓存空值,避免频繁查询数据库。
- 5. 分布式锁:在分布式环境下,使用分布式锁(如 Redis 的 SETNX)来保证只有一个线程执行数据库查询。
2.8.2 方案对比
互斥锁方案:
- 由于保证了互斥性,所以数据一致,且实现简单
- 因为仅仅只需要加一把锁而已,也没其他的事情需要操心,所以没有额外的内存消耗
- 缺点在于有锁就有死锁问题的发生,且只能串行执行性能肯定受到影响
逻辑过期方案:
- 线程读取过程中不需要等待,性能好
- 有一个额外的线程持有锁去进行重构数据,但是在重构数据完成前
- 缺点是其他的线程只能返回之前的数据,且实现起来麻烦
2.8.3 互斥锁解决缓存击穿
核心思路:
- 相较于原来从缓存中查询不到数据后直接查询数据库而言
- 现在的方案是 进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取
- 获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试
- 直到获取到锁为止,才能进行查询
如果获取到了锁的线程,再去进行查询,查询后将数据写入 redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿
/*** 尝试获取锁* @param key* @return*/
private boolean tryLock(String key) {Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);
}/*** 释放锁* @param key*/
private void unlock(String key) {stringRedisTemplate.delete(key);
}
/*** 解决缓存穿透问题 - 这里采用互斥锁解决 + 递归* @param id* @return*/
public Shop queryWithPassMutex(Long id) {String key = RedisConstants.CACHE_SHOP_KEY + id;// 1.从redis查询商品缓存String shopJson = stringRedisTemplate.opsForValue().get(key);// 2.判断是否存在if (StrUtil.isNotBlank(shopJson)) {// 3.存在,直接返回return JSONUtil.toBean(shopJson, Shop.class);}// 判断命中是否是空值if (shopJson != null) {// 返回错误信息return null;}// 4.实现缓存重建// 4.1.获取互斥锁String lockKey = RedisConstants.LOCK_SHOP_KEY + id;Shop shop = null;try {boolean isLock = tryLock(lockKey);// 4.2.判断是否获取成功if (!isLock) {// 4.3.失败,则休眠并重试Thread.sleep(50);return queryWithPassMutex(id);}// 4.4.成功,根据id查询数据库shop = getById(id);// 5.不存在,返回错误if (shop == null) {// 将空值写入redisstringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);return null;}// 6.存在,写入redisstringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {// 7.释放互斥锁unlock(lockKey);}return shop;
}
2.8.4 JMeter 压测
- QPS 达到 200 没问题
- 只查询了一次数据库
- 缓存击穿问题解决
2.8.5 问题
- 递归容易造成 StackOverflowError,改为自旋 queryWithMutexV3 并检查 redis
public Shop queryWithMutexV3(Long id) {String key = RedisConstants.CACHE_SHOP_KEY + id;// 1.从redis查询商品缓存String shopJson = stringRedisTemplate.opsForValue().get(key);// 2.判断是否存在if (StrUtil.isNotBlank(shopJson)) {log.info("命中缓存,Thread:{} 拿到缓存", Thread.currentThread().getId());// 3.存在,直接返回return JSONUtil.toBean(shopJson, Shop.class);}// 判断命中是否是空值if (shopJson != null) {// 返回错误信息return null;}// 4.实现缓存重建// 4.1.获取互斥锁String lockKey = RedisConstants.LOCK_SHOP_KEY + id;Shop shop = null;// 4.2 自旋锁解决缓存击穿问题while (!tryLock(lockKey)) {log.info("Thread:{} 被阻塞", Thread.currentThread().getId());// 4.3 休眠并重试try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}// 再次检查缓存,可能有其他线程已经写入shopJson = stringRedisTemplate.opsForValue().get(key);if (StrUtil.isNotBlank(shopJson)) {log.info("更新缓存成功,锁被释放,自旋Thread:{} 拿到缓存", Thread.currentThread().getId());return JSONUtil.toBean(shopJson, Shop.class);}// 判断命中是否是空值if (shopJson != null) {return null;}}try {// 4.4.成功,根据id查询数据库shop = getById(id);// 5.不存在,返回错误if (shop == null) {// 将空值写入redisstringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);return null;}// 6.存在,写入redisstringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);} finally {// 7.释放互斥锁unlock(lockKey);}return shop; }
- 获取锁+TTL 为原子,但 实际业务处理时间超过锁的过期时间问题,此时其他线程可以获取到锁,导致多个线程同时操作共享资源,在此业务中无非是多个线程去打 DB,再次造成击穿问题,若是其他业务如扣减库存,则造成库存失真问题
2.8.5 自旋压测-假成功
出现了 假成功 情况
没出现,是我误判了~~~~,但也了解一小 假成功 吧
假成功(False Positive):
- 表面上看,所有请求都成功了(HTTP 状态码正常),但实际上业务逻辑或数据处理存在问题。
- 例如:接口返回了 200,但实际数据未正确保存、处理逻辑错误、或返回了错误的结果
开始加锁逻辑就有问题无问题:
2025-04-23T20:11:54.974+08:00 DEBUG 30024 : Thread:48 加锁,避免缓存击穿
2025-04-23T20:11:54.974+08:00 DEBUG 30024 : Thread:46 加锁,避免缓存击穿
2025-04-23T20:11:54.974+08:00 DEBUG 30024 : Thread:49 加锁,避免缓存击穿
2025-04-23T20:11:54.974+08:00 DEBUG 30024 : Thread:47 加锁,避免缓存击穿
2025-04-23T20:11:54.975+08:00 INFO 30024 : Thread:48 被阻塞
2025-04-23T20:11:54.975+08:00 INFO 30024 : Thread:47 被阻塞
2025-04-23T20:11:54.975+08:00 INFO 30024 : Thread:49 被阻塞
逻辑梳理:
- 线程 46、47、48、49 同时进入了 tryLock 方法,尝试获取同一个锁
- 线程 46 先执行了 setnx 命令,成功设置了锁(返回 true )
- 线程 47、48、49 随后执行 setnx 命令,但因为锁已被线程 46 占用,所以返回 false
- 线程 46 获取锁成功,继续执行查询数据库和更新缓存的操作
- 线程 47、48、49 因为获取锁失败,进入自旋状态,不断重试获取锁或检查缓存是否已更新
- 如果此时有新的线程进来,也会像线程 47、48、49 一样进入自旋状态
- 当线程 46 完成数据库查询和缓存更新后,释放锁,其他线程才有机会获取锁或直接从更新后的缓存中获取数据
2.8.6 逻辑过期解决缓存击穿
需求:修改根据 id 查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题
思路分析:
- 前提:热点 key 问题,提前将 key 存入了 redis
- 当用户开始查询 redis 时,判断是否命中
- 如果没有命中则直接返回空数据,不查询数据库
- 而一旦命中后,将 value 取出,判断 value 中的过期时间是否满足
- 如果没有过期,则直接返回 redis 中的数据
- 如果过期,则在开启独立线程后,直接返回之前的数据
- 独立线程去重构数据,重构完成后释放互斥锁
value 中要存过期时间,两种方案
- 修改原来的 shop 类再加一个属性 expireTime
- 新建一个实体类
@Data public class RedisData {private LocalDateTime expireTime;private Object data; // 存原来的shop属性信息 }
2.9 工具类封装
基于 StringRedisTemplate 封装一个缓存工具类,满足下列需求:
- 方法 1:将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置 TTL 过期时间
- 方法 2:将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
- 方法 3:根据指定的 key 查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
- 方法 4:根据指定的 key 查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
2.9 考虑分布式与微服务
2.9.1 当前方案的局限性
目前的互斥锁实现使用了 Redis 的setIfAbsent命令来实现分布式锁:
private boolean tryLock(String key) {// 获取锁+设置过期时间,保证原子性Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);log.debug("Thread:{} 加锁,避免缓存击穿", Thread.currentThread().getId());return BooleanUtil.isTrue(flag);
}
在分布式环境下,这种实现存在以下问题:
- 1. 锁的可靠性问题:如果获取锁的服务实例在释放锁之前崩溃,可能导致锁无法释放
- 2. 锁的原子性问题:在某些 Redis 客户端实现中,setIfAbsent和设置过期时间可能不是原子操作
- 3. 缓存一致性问题:不同服务实例可能有自己的本地缓存,导致数据不一致
2.9.2 分布式环境下的可行方案
1. 使用 Redis 分布式锁
您当前的实现已经使用了 Redis 作为分布式锁,但需要一些改进:
private boolean tryLock(String key) {// 使用唯一标识作为锁值,便于安全释放String lockValue = UUID.randomUUID().toString();Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, lockValue, 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);
}private void unlock(String key) {// 使用Lua脚本确保原子性释放String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";// 执行脚本stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Collections.singletonList(key), lockValue);
}
2. 使用 Redisson 框架
Redisson 提供了更可靠的分布式锁实现,包括看门狗机制自动续期:
@Resource
private RedissonClient redissonClient;public Shop queryWithRedissonLock(Long id) {String key = RedisConstants.CACHE_SHOP_KEY + id;// 1.从redis查询商品缓存// ... 现有代码 ...// 4.实现缓存重建// 4.1.获取互斥锁String lockKey = RedisConstants.LOCK_SHOP_KEY + id;RLock lock = redissonClient.getLock(lockKey);Shop shop = null;try {// 尝试获取锁,最多等待500毫秒,锁自动释放时间10秒boolean isLock = lock.tryLock(500, 10000, TimeUnit.MILLISECONDS);if (!isLock) {// 获取锁失败,休眠后重试Thread.sleep(50);return queryWithRedissonLock(id);}// 获取锁成功,查询数据库// ... 现有代码 ...} catch (Exception e) {throw new RuntimeException(e);} finally {// 释放锁lock.unlock();}return shop;
}
3. 使用消息队列实现缓存更新
在微服务环境下,可以使用消息队列来协调缓存更新:
@Override
@Transactional
public Result update(Shop shop) {Long id = shop.getId();if (id == null) {return Result.fail("店铺id不能为空");}// 1.更新数据库updateById(shop);// 2.发送消息到MQkafkaTemplate.send("shop-update-topic", JSONUtil.toJsonStr(shop));return Result.ok();
}// 在消费者服务中
@KafkaListener(topics = "shop-update-topic")
public void handleShopUpdate(String message) {Shop shop = JSONUtil.toBean(message, Shop.class);// 删除缓存stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY + shop.getId());
}
4. 使用 Canal 监听数据库变更
对于大型分布式系统,可以使用 Canal 监听 MySQL 的 binlog,实时同步数据库变更到缓存:
// Canal客户端配置
@Bean
public CanalConnector canalConnector() {return CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),"example", "", "");
}// 监听数据库变更
@Scheduled(fixedDelay = 1000)
public void processDataChange() {CanalConnector connector = canalConnector();try {connector.connect();connector.subscribe(".*\\..*");Message message = connector.get(100);if (message.getEntries().size() > 0) {for (CanalEntry.Entry entry : message.getEntries()) {// 处理数据变更if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {// 删除对应的缓存// ...}}}} finally {connector.disconnect();}
}
2.9.3 总结建议
在分布式和微服务环境下,我建议:
- 1. 短期方案:使用 Redisson 框架替代自定义的 Redis 分布式锁实现,提高锁的可靠性
- 2. 中期方案:引入消息队列(如 Kafka、RabbitMQ)实现缓存更新的异步通知
- 3. 长期方案:考虑使用 Canal 等工具监听数据库变更,实现自动缓存同步
您当前的方案在做了上述调整后,完全可以在分布式和微服务环境下正常工作。关键是要解决分布式锁的可靠性问题和缓存一致性问题。