20.缓存问题与解决方案详解教程
文章目录
- 1. 缓存基础概念
- 1.1 什么是缓存
- 1.2 缓存的作用
- 1.3 常见的缓存类型
- 1.4 缓存架构示例
- 2. 缓存雪崩 (Cache Avalanche)
- 2.1 什么是缓存雪崩
- 2.2 缓存雪崩的原因
- 2.3 缓存雪崩的危害
- 2.4 缓存雪崩的解决方案
- 方案1:设置随机过期时间
- 方案2:缓存集群和主从复制
- 方案3:熔断降级机制
- 方案4:本地缓存兜底
- 2.5 缓存雪崩预防最佳实践
- 3. 缓存穿透 (Cache Penetration)
- 3.1 什么是缓存穿透
- 3.2 缓存穿透的场景示例
- 3.3 缓存穿透的危害
- 3.4 缓存穿透的解决方案
- 方案1:缓存空值
- 方案2:布隆过滤器
- 方案3:参数校验
- 方案4:接口限流
- 3.5 Redis布隆过滤器实现
- 4. 缓存预热 (Cache Warming)
- 4.1 什么是缓存预热
- 4.2 缓存预热的时机
- 4.3 缓存预热的策略
- 策略1:启动时预热
- 策略2:分批预热
- 策略3:定时预热
- 策略4:智能预热
- 4.4 预热监控和管理
- 5. 缓存更新 (Cache Update)
- 5.1 什么是缓存更新
- 5.2 缓存更新的策略
- 策略1:Cache Aside(旁路缓存)
- 策略2:Write Through(写透缓存)
- 策略3:Write Behind(异步写回)
- 5.3 缓存一致性问题解决方案
- 方案1:延时双删
- 方案2:基于消息队列的异步更新
- 方案3:分布式锁保证一致性
- 5.4 缓存更新最佳实践
- 实践1:批量更新优化
- 6. 缓存降级 (Cache Degradation)
- 6.1 什么是缓存降级
- 6.2 缓存降级的场景
- 6.3 缓存降级策略
- 策略1:本地缓存降级
- 策略2:多级缓存降级
- 策略3:静态数据降级
- 策略4:限流降级
- 6.4 降级监控和告警
- 监控组件
- 6.5 降级策略配置化
- 7. 缓存最佳实践总结
- 7.1 设计原则
- 7.2 性能优化
- 7.3 运维建议
1. 缓存基础概念
1.1 什么是缓存
缓存是一种高速存储技术,用于临时存储频繁访问的数据,以提高系统性能和响应速度。在软件架构中,缓存通常位于应用程序和数据库之间,作为数据的快速访问层。
1.2 缓存的作用
- 提高响应速度:从内存中读取数据比从磁盘快几个数量级
- 减少数据库压力:减少对数据库的直接访问
- 提升用户体验:快速响应用户请求
- 节约成本:减少服务器资源消耗
1.3 常见的缓存类型
- 本地缓存:如HashMap、Guava Cache
- 分布式缓存:如Redis、Memcached
- 数据库缓存:如MySQL查询缓存
- CDN缓存:内容分发网络缓存
1.4 缓存架构示例
// 典型的缓存使用模式
public class UserService {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;public User getUserById(Long userId) {String key = "user:" + userId;// 1. 先查缓存User user = (User) redisTemplate.opsForValue().get(key);if (user != null) {return user; // 缓存命中}// 2. 缓存未命中,查数据库user = userRepository.findById(userId);if (user != null) {// 3. 将数据写入缓存redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(30));}return user;}
}
2. 缓存雪崩 (Cache Avalanche)
2.1 什么是缓存雪崩
缓存雪崩是指在同一时间,大量的缓存key同时失效,导致大量请求直接打到数据库上,造成数据库瞬间压力过大甚至宕机的现象。
2.2 缓存雪崩的原因
- 缓存服务器宕机:Redis服务器突然宕机
- 大量key同时过期:设置了相同的过期时间
- 缓存预热不充分:系统重启后缓存为空
2.3 缓存雪崩的危害
- 数据库瞬间压力暴增
- 系统响应时间急剧增加
- 可能导致数据库连接池耗尽
- 严重时可能导致整个系统崩溃
2.4 缓存雪崩的解决方案
方案1:设置随机过期时间
@Service
public class ProductService {private RedisTemplate<String, Object> redisTemplate;private ProductRepository productRepository;public Product getProductById(Long productId) {String key = "product:" + productId;Product product = (Product) redisTemplate.opsForValue().get(key);if (product == null) {product = productRepository.findById(productId);if (product != null) {// 设置随机过期时间:30分钟 + 0-10分钟的随机时间int randomMinutes = new Random().nextInt(10);Duration expireTime = Duration.ofMinutes(30 + randomMinutes);redisTemplate.opsForValue().set(key, product, expireTime);}}return product;}
}
方案2:缓存集群和主从复制
# Redis集群配置示例
spring:redis:cluster:nodes:- 192.168.1.100:7001- 192.168.1.100:7002- 192.168.1.100:7003- 192.168.1.101:7001- 192.168.1.101:7002- 192.168.1.101:7003max-redirects: 3timeout: 3000mslettuce:pool:max-active: 16max-idle: 8min-idle: 0
方案3:熔断降级机制
@Component
public class ProductServiceWithCircuitBreaker {private RedisTemplate<String, Object> redisTemplate;private ProductRepository productRepository;private CircuitBreaker circuitBreaker;public ProductServiceWithCircuitBreaker() {// 配置熔断器this.circuitBreaker = CircuitBreaker.ofDefaults("productService");circuitBreaker.getEventPublisher().onStateTransition(event ->System.out.println("CircuitBreaker state transition: " + event));}public Product getProductById(Long productId) {return circuitBreaker.executeSupplier(() -> {String key = "product:" + productId;Product product = (Product) redisTemplate.opsForValue().get(key);if (product == null) {product = productRepository.findById(productId);if (product != null) {redisTemplate.opsForValue().set(key, product, Duration.ofMinutes(30));}}return product;});}
}
方案4:本地缓存兜底
@Service
public class ProductServiceWithLocalCache {private RedisTemplate<String, Object> redisTemplate;private ProductRepository productRepository;private Cache<String, Product> localCache;public ProductServiceWithLocalCache() {// 创建本地缓存this.localCache = Caffeine.newBuilder().maximumSize(1000).expireAfterWrite(5, TimeUnit.MINUTES).build();}public Product getProductById(Long productId) {String key = "product:" + productId;try {// 1. 先查Redis缓存Product product = (Product) redisTemplate.opsForValue().get(key);if (product != null) {// 同时更新本地缓存localCache.put(key, product);return product;}} catch (Exception e) {// Redis异常时,查询本地缓存Product localProduct = localCache.getIfPresent(key);if (localProduct != null) {return localProduct;}}// 2. 查询数据库Product product = productRepository.findById(productId);if (product != null) {try {redisTemplate.opsForValue().set(key, product, Duration.ofMinutes(30));} catch (Exception e) {// Redis写入失败,只更新本地缓存localCache.put(key, product);}}return product;}
}
2.5 缓存雪崩预防最佳实践
@Configuration
public class CacheConfiguration {@Beanpublic RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory connectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);// 设置序列化方式template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new GenericJackson2JsonRedisSerializer());return template;}@Beanpublic CacheManager cacheManager(RedisConnectionFactory connectionFactory) {RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(30)) // 默认30分钟过期.disableCachingNullValues();return RedisCacheManager.builder(connectionFactory).cacheDefaults(config).build();}
}
3. 缓存穿透 (Cache Penetration)
3.1 什么是缓存穿透
缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中时才查询数据库,而且不存在的数据不会写入缓存,导致这个不存在的数据每次请求都要查询数据库,给数据库造成压力。
3.2 缓存穿透的场景示例
// 问题代码示例
public User getUserById(Long userId) {String key = "user:" + userId;// 1. 查缓存User user = (User) redisTemplate.opsForValue().get(key);if (user != null) {return user;}// 2. 查数据库user = userRepository.findById(userId);if (user != null) {// 3. 只有数据存在才缓存redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(30));}// 如果user为null,不缓存,下次还会查数据库return user;
}
3.3 缓存穿透的危害
- 大量无效请求穿透到数据库
- 数据库查询压力增大
- 系统整体性能下降
- 可能被恶意攻击利用
3.4 缓存穿透的解决方案
方案1:缓存空值
@Service
public class UserServiceWithNullCache {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;public User getUserById(Long userId) {String key = "user:" + userId;// 1. 查缓存Object cached = redisTemplate.opsForValue().get(key);if (cached != null) {// 如果是特殊标记,说明数据不存在if ("NULL".equals(cached)) {return null;}return (User) cached;}// 2. 查数据库User user = userRepository.findById(userId);if (user != null) {// 3. 缓存有效数据redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(30));} else {// 4. 缓存空值,但设置较短的过期时间redisTemplate.opsForValue().set(key, "NULL", Duration.ofMinutes(5));}return user;}
}
方案2:布隆过滤器
@Service
public class UserServiceWithBloomFilter {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;private BloomFilter<Long> bloomFilter;@PostConstructpublic void initBloomFilter() {// 创建布隆过滤器,预计100万个元素,误判率0.01%bloomFilter = BloomFilter.create(Funnels.longFunnel(), 1000000, 0.0001);// 将所有用户ID加入布隆过滤器List<Long> userIds = userRepository.findAllUserIds();userIds.forEach(bloomFilter::put);}public User getUserById(Long userId) {// 1. 先用布隆过滤器判断if (!bloomFilter.mightContain(userId)) {// 布隆过滤器说不存在,一定不存在return null;}String key = "user:" + userId;// 2. 查缓存User user = (User) redisTemplate.opsForValue().get(key);if (user != null) {return user;}// 3. 查数据库user = userRepository.findById(userId);if (user != null) {redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(30));}return user;}
}
方案3:参数校验
@RestController
public class UserController {private UserService userService;@GetMapping("/user/{userId}")public ResponseEntity<User> getUser(@PathVariable Long userId) {// 1. 参数校验if (userId == null || userId <= 0) {return ResponseEntity.badRequest().build();}// 2. 业务范围校验if (userId > 999999999L) { // 假设用户ID不会超过这个值return ResponseEntity.notFound().build();}User user = userService.getUserById(userId);return user != null ? ResponseEntity.ok(user) : ResponseEntity.notFound().build();}
}
方案4:接口限流
@Component
public class RateLimitInterceptor implements HandlerInterceptor {private RateLimiter rateLimiter = RateLimiter.create(100); // 每秒100个请求@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {response.setStatus(HttpStatus.TOO_MANY_REQUESTS.value());response.getWriter().write("请求过于频繁,请稍后再试");return false;}return true;}
}
3.5 Redis布隆过滤器实现
@Component
public class RedisBloomFilter {private RedisTemplate<String, Object> redisTemplate;private static final String BF_KEY_PREFIX = "bf:";/*** 添加元素到布隆过滤器*/public void add(String filterName, String value) {String key = BF_KEY_PREFIX + filterName;int[] hashes = getHashes(value);for (int hash : hashes) {redisTemplate.opsForValue().setBit(key, Math.abs(hash), true);}// 设置过期时间redisTemplate.expire(key, Duration.ofDays(7));}/*** 判断元素是否可能存在*/public boolean mightContain(String filterName, String value) {String key = BF_KEY_PREFIX + filterName;int[] hashes = getHashes(value);for (int hash : hashes) {if (!redisTemplate.opsForValue().getBit(key, Math.abs(hash))) {return false;}}return true;}/*** 生成多个哈希值*/private int[] getHashes(String value) {int[] hashes = new int[3]; // 使用3个哈希函数int hash1 = value.hashCode();int hash2 = hash1 >>> 16;for (int i = 0; i < 3; i++) {hashes[i] = hash1 + i * hash2;}return hashes;}
}
4. 缓存预热 (Cache Warming)
4.1 什么是缓存预热
缓存预热是指在系统启动或者在业务高峰期之前,提前将热点数据加载到缓存中,避免在业务高峰期时因为缓存未命中而导致大量请求打到数据库上。
4.2 缓存预热的时机
- 系统启动时:应用启动完成后立即预热
- 定时预热:在业务低峰期定时刷新缓存
- 手动预热:通过管理接口手动触发预热
4.3 缓存预热的策略
策略1:启动时预热
@Component
public class CacheWarmUpService {private RedisTemplate<String, Object> redisTemplate;private ProductRepository productRepository;private UserRepository userRepository;@EventListener(ApplicationReadyEvent.class)public void warmUpCache() {System.out.println("开始缓存预热...");// 预热热门商品warmUpHotProducts();// 预热活跃用户warmUpActiveUsers();// 预热系统配置warmUpSystemConfig();System.out.println("缓存预热完成!");}private void warmUpHotProducts() {List<Product> hotProducts = productRepository.findHotProducts(100);for (Product product : hotProducts) {String key = "product:" + product.getId();redisTemplate.opsForValue().set(key, product, Duration.ofHours(2));}System.out.println("热门商品预热完成,共预热 " + hotProducts.size() + " 个商品");}private void warmUpActiveUsers() {List<User> activeUsers = userRepository.findActiveUsers(1000);for (User user : activeUsers) {String key = "user:" + user.getId();redisTemplate.opsForValue().set(key, user, Duration.ofHours(1));}System.out.println("活跃用户预热完成,共预热 " + activeUsers.size() + " 个用户");}private void warmUpSystemConfig() {Map<String, Object> configs = getSystemConfigs();configs.forEach((key, value) -> {redisTemplate.opsForValue().set("config:" + key, value, Duration.ofDays(1));});System.out.println("系统配置预热完成");}private Map<String, Object> getSystemConfigs() {// 模拟获取系统配置Map<String, Object> configs = new HashMap<>();configs.put("max_order_amount", 50000);configs.put("free_shipping_threshold", 100);configs.put("vip_discount_rate", 0.9);return configs;}
}
策略2:分批预热
@Service
public class BatchCacheWarmUpService {private RedisTemplate<String, Object> redisTemplate;private ProductRepository productRepository;private TaskExecutor taskExecutor;public void warmUpProductsInBatches() {int totalCount = productRepository.countAllProducts();int batchSize = 100;int totalBatches = (totalCount + batchSize - 1) / batchSize;System.out.println("开始分批预热商品缓存,总数:" + totalCount + ",批次数:" + totalBatches);for (int i = 0; i < totalBatches; i++) {final int batchIndex = i;taskExecutor.execute(() -> {List<Product> products = productRepository.findProductsByPage(batchIndex * batchSize, batchSize);for (Product product : products) {String key = "product:" + product.getId();redisTemplate.opsForValue().set(key, product, Duration.ofHours(2));}System.out.println("第 " + (batchIndex + 1) + " 批预热完成,预热了 " + products.size() + " 个商品");});// 控制预热速度,避免对系统造成压力try {Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}
策略3:定时预热
@Component
public class ScheduledCacheWarmUp {private RedisTemplate<String, Object> redisTemplate;private ProductRepository productRepository;/*** 每天凌晨2点预热热门商品缓存*/@Scheduled(cron = "0 0 2 * * ?")public void scheduledWarmUp() {System.out.println("开始定时缓存预热...");// 获取热门商品List<Product> hotProducts = productRepository.findHotProductsByLastWeek(200);for (Product product : hotProducts) {String key = "product:" + product.getId();// 设置不同的过期时间,避免同时失效int randomHours = 12 + new Random().nextInt(12); // 12-24小时redisTemplate.opsForValue().set(key, product, Duration.ofHours(randomHours));}System.out.println("定时预热完成,预热了 " + hotProducts.size() + " 个热门商品");}/*** 每小时更新一次实时排行榜*/@Scheduled(fixedRate = 3600000) // 1小时public void updateRankingCache() {// 更新销量排行榜List<Product> topSelling = productRepository.findTopSellingProducts(50);redisTemplate.opsForList().leftPushAll("ranking:top_selling", topSelling.toArray());redisTemplate.expire("ranking:top_selling", Duration.ofHours(2));// 更新热门搜索榜List<String> hotKeywords = getHotSearchKeywords();redisTemplate.opsForList().leftPushAll("ranking:hot_keywords", hotKeywords.toArray());redisTemplate.expire("ranking:hot_keywords", Duration.ofHours(1));}private List<String> getHotSearchKeywords() {// 模拟获取热门搜索关键词return Arrays.asList("手机", "电脑", "耳机", "键盘", "鼠标");}
}
策略4:智能预热
@Service
public class IntelligentCacheWarmUp {private RedisTemplate<String, Object> redisTemplate;private ProductRepository productRepository;private AnalyticsService analyticsService;/*** 基于用户行为数据的智能预热*/public void intelligentWarmUp() {// 1. 分析用户访问模式Map<Long, Integer> productAccessCount = analyticsService.getProductAccessCount(Duration.ofDays(7));// 2. 按访问量排序List<Map.Entry<Long, Integer>> sortedProducts = productAccessCount.entrySet().stream().sorted(Map.Entry.<Long, Integer>comparingByValue().reversed()).limit(500).collect(Collectors.toList());// 3. 分级预热for (int i = 0; i < sortedProducts.size(); i++) {Long productId = sortedProducts.get(i).getKey();Integer accessCount = sortedProducts.get(i).getValue();Product product = productRepository.findById(productId);if (product != null) {String key = "product:" + productId;Duration expireTime = calculateExpireTime(i, accessCount);redisTemplate.opsForValue().set(key, product, expireTime);}}}/*** 根据商品热度计算过期时间*/private Duration calculateExpireTime(int rank, int accessCount) {if (rank < 50) {return Duration.ofHours(24); // 最热门的商品缓存24小时} else if (rank < 200) {return Duration.ofHours(12); // 次热门商品缓存12小时} else {return Duration.ofHours(6); // 一般热门商品缓存6小时}}
}
4.4 预热监控和管理
@RestController
@RequestMapping("/cache/warmup")
public class CacheWarmUpController {private BatchCacheWarmUpService batchWarmUpService;private IntelligentCacheWarmUp intelligentWarmUp;private RedisTemplate<String, Object> redisTemplate;/*** 手动触发预热*/@PostMapping("/manual")public ResponseEntity<String> manualWarmUp(@RequestParam String type) {try {switch (type) {case "product":batchWarmUpService.warmUpProductsInBatches();break;case "intelligent":intelligentWarmUp.intelligentWarmUp();break;default:return ResponseEntity.badRequest().body("不支持的预热类型");}return ResponseEntity.ok("预热任务已启动");} catch (Exception e) {return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("预热失败:" + e.getMessage());}}/*** 查看缓存状态*/@GetMapping("/status")public ResponseEntity<Map<String, Object>> getCacheStatus() {Map<String, Object> status = new HashMap<>();// 统计不同类型缓存的数量Set<String> productKeys = redisTemplate.keys("product:*");Set<String> userKeys = redisTemplate.keys("user:*");Set<String> configKeys = redisTemplate.keys("config:*");status.put("productCacheCount", productKeys != null ? productKeys.size() : 0);status.put("userCacheCount", userKeys != null ? userKeys.size() : 0);status.put("configCacheCount", configKeys != null ? configKeys.size() : 0);status.put("timestamp", System.currentTimeMillis());return ResponseEntity.ok(status);}/*** 清空指定类型的缓存*/@DeleteMapping("/clear")public ResponseEntity<String> clearCache(@RequestParam String type) {try {Set<String> keys = redisTemplate.keys(type + ":*");if (keys != null && !keys.isEmpty()) {redisTemplate.delete(keys);return ResponseEntity.ok("已清空 " + keys.size() + " 个 " + type + " 类型的缓存");} else {return ResponseEntity.ok("没有找到 " + type + " 类型的缓存");}} catch (Exception e) {return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("清空缓存失败:" + e.getMessage());}}
}
5. 缓存更新 (Cache Update)
5.1 什么是缓存更新
缓存更新是指当数据发生变化时,需要同步更新缓存中的数据,确保缓存数据与数据库数据的一致性。这是分布式系统中的一个重要问题,需要选择合适的策略来处理。
5.2 缓存更新的策略
策略1:Cache Aside(旁路缓存)
这是最常用的缓存模式,应用程序直接与缓存和数据库交互。
@Service
public class UserServiceCacheAside {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;/*** 读取数据*/public User getUserById(Long userId) {String key = "user:" + userId;// 1. 先查缓存User user = (User) redisTemplate.opsForValue().get(key);if (user != null) {return user;}// 2. 缓存未命中,查数据库user = userRepository.findById(userId);if (user != null) {// 3. 写入缓存redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(30));}return user;}/*** 更新数据*/@Transactionalpublic void updateUser(User user) {// 1. 先更新数据库userRepository.save(user);// 2. 删除缓存String key = "user:" + user.getId();redisTemplate.delete(key);// 或者更新缓存// redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(30));}/*** 删除数据*/@Transactionalpublic void deleteUser(Long userId) {// 1. 删除数据库数据userRepository.deleteById(userId);// 2. 删除缓存String key = "user:" + userId;redisTemplate.delete(key);}
}
策略2:Write Through(写透缓存)
数据同时写入缓存和数据库。
@Service
public class UserServiceWriteThrough {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;public User getUserById(Long userId) {String key = "user:" + userId;// 先查缓存User user = (User) redisTemplate.opsForValue().get(key);if (user != null) {return user;}// 缓存未命中,从数据库加载并写入缓存user = loadUserFromDatabase(userId);return user;}@Transactionalpublic void updateUser(User user) {try {// 1. 同时更新数据库和缓存userRepository.save(user);String key = "user:" + user.getId();redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(30));} catch (Exception e) {// 如果任一操作失败,需要回滚throw new RuntimeException("更新失败", e);}}private User loadUserFromDatabase(Long userId) {User user = userRepository.findById(userId);if (user != null) {String key = "user:" + userId;redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(30));}return user;}
}
策略3:Write Behind(异步写回)
数据先写入缓存,然后异步写入数据库。
@Service
public class UserServiceWriteBehind {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;private TaskExecutor taskExecutor;private BlockingQueue<User> writeQueue = new LinkedBlockingQueue<>();@PostConstructpublic void startAsyncWriter() {// 启动异步写入线程taskExecutor.execute(this::processWriteQueue);}public User getUserById(Long userId) {String key = "user:" + userId;// 先查缓存User user = (User) redisTemplate.opsForValue().get(key);if (user != null) {return user;}// 从数据库加载user = userRepository.findById(userId);if (user != null) {redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(30));}return user;}public void updateUser(User user) {// 1. 立即更新缓存String key = "user:" + user.getId();redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(30));// 2. 异步更新数据库try {writeQueue.offer(user, 1, TimeUnit.SECONDS);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private void processWriteQueue() {while (true) {try {User user = writeQueue.take();userRepository.save(user);System.out.println("异步写入数据库完成:" + user.getId());} catch (InterruptedException e) {Thread.currentThread().interrupt();break;} catch (Exception e) {System.err.println("异步写入失败:" + e.getMessage());}}}
}
5.3 缓存一致性问题解决方案
方案1:延时双删
@Service
public class UserServiceDelayedDoubleDelete {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;private TaskExecutor taskExecutor;@Transactionalpublic void updateUser(User user) {String key = "user:" + user.getId();// 1. 先删除缓存redisTemplate.delete(key);// 2. 更新数据库userRepository.save(user);// 3. 延时再删除一次缓存taskExecutor.execute(() -> {try {Thread.sleep(1000); // 延时1秒redisTemplate.delete(key);System.out.println("延时删除缓存完成:" + key);} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}
}
方案2:基于消息队列的异步更新
@Service
public class UserServiceWithMQ {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;private RabbitTemplate rabbitTemplate;@Transactionalpublic void updateUser(User user) {// 1. 更新数据库userRepository.save(user);// 2. 发送缓存更新消息CacheUpdateMessage message = new CacheUpdateMessage();message.setType("USER_UPDATE");message.setKey("user:" + user.getId());message.setUserId(user.getId());rabbitTemplate.convertAndSend("cache.update.exchange", "cache.update", message);}@RabbitListener(queues = "cache.update.queue")public void handleCacheUpdate(CacheUpdateMessage message) {try {if ("USER_UPDATE".equals(message.getType())) {// 删除缓存,下次访问时重新加载redisTemplate.delete(message.getKey());System.out.println("处理缓存更新消息:" + message.getKey());}} catch (Exception e) {System.err.println("处理缓存更新失败:" + e.getMessage());}}public static class CacheUpdateMessage {private String type;private String key;private Long userId;// getter和setter方法...}
}
方案3:分布式锁保证一致性
@Service
public class UserServiceWithDistributedLock {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;private RedissonClient redissonClient;@Transactionalpublic void updateUser(User user) {String lockKey = "lock:user:" + user.getId();RLock lock = redissonClient.getLock(lockKey);try {// 获取分布式锁if (lock.tryLock(10, TimeUnit.SECONDS)) {// 1. 更新数据库userRepository.save(user);// 2. 删除缓存String cacheKey = "user:" + user.getId();redisTemplate.delete(cacheKey);} else {throw new RuntimeException("获取锁失败");}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {if (lock.isHeldByCurrentThread()) {lock.unlock();}}}
}
5.4 缓存更新最佳实践
实践1:批量更新优化
@Service
public class BatchUpdateService {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;/*** 批量更新用户数据*/@Transactionalpublic void batchUpdateUsers(List<User> users) {if (users == null || users.isEmpty()) {return;}// 1. 批量更新数据库userRepository.saveAll(users);// 2. 批量删除缓存List<String> cacheKeys = users.stream().map(user -> "user:" + user.getId()).collect(Collectors.toList());redisTemplate.delete(cacheKeys);System.out.println("批量更新完成,影响用户数:" + users.size());}/*** 批量预热缓存*/public void batchWarmUpUsers(List<Long> userIds) {List<User> users = userRepository.findAllById(userIds);// 使用Pipeline批量写入RedisredisTemplate.executePipelined((RedisCallback<Object>) connection -> {for (User user : users) {String key = "user:" + user.getId();byte[] keyBytes = key.getBytes();byte[] valueBytes = serializeUser(user);connection.setEx(keyBytes, 1800, valueBytes); // 30分钟过期}return null;});System.out.println("批量预热完成,预热用户数:" + users.size());}private byte[] serializeUser(User user) {// 序列化用户对象try {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsBytes(user);} catch (Exception e) {throw new RuntimeException("序列化失败", e);}}
}
6. 缓存降级 (Cache Degradation)
6.1 什么是缓存降级
缓存降级是指当缓存系统出现故障或性能问题时,系统自动切换到备用方案,确保系统的可用性。这是一种保障系统稳定性的重要机制。
6.2 缓存降级的场景
- 缓存服务器宕机:Redis服务器不可用
- 缓存响应超时:网络延迟或服务器负载过高
- 缓存连接池耗尽:并发请求过多
- 缓存数据异常:数据损坏或格式错误
6.3 缓存降级策略
策略1:本地缓存降级
@Service
public class UserServiceWithLocalFallback {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;private Cache<String, User> localCache;private CircuitBreaker circuitBreaker;public UserServiceWithLocalFallback() {// 初始化本地缓存this.localCache = Caffeine.newBuilder().maximumSize(1000).expireAfterWrite(10, TimeUnit.MINUTES).build();// 初始化熔断器this.circuitBreaker = CircuitBreaker.ofDefaults("redis");}public User getUserById(Long userId) {String key = "user:" + userId;// 1. 尝试从Redis获取数据User user = circuitBreaker.executeSupplier(() -> {return (User) redisTemplate.opsForValue().get(key);});if (user != null) {// 更新本地缓存localCache.put(key, user);return user;}// 2. Redis失败,尝试本地缓存user = localCache.getIfPresent(key);if (user != null) {System.out.println("使用本地缓存降级:" + key);return user;}// 3. 本地缓存也没有,查询数据库user = userRepository.findById(userId);if (user != null) {// 同时更新本地缓存和Redis(如果可用)localCache.put(key, user);try {redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(30));} catch (Exception e) {System.out.println("Redis写入失败,仅使用本地缓存");}}return user;}
}
策略2:多级缓存降级
@Service
public class MultiLevelCacheService {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;private Cache<String, User> l1Cache; // 一级缓存(本地)private Cache<String, User> l2Cache; // 二级缓存(本地备份)public MultiLevelCacheService() {// L1缓存:小容量,短过期时间this.l1Cache = Caffeine.newBuilder().maximumSize(500).expireAfterWrite(5, TimeUnit.MINUTES).build();// L2缓存:大容量,长过期时间this.l2Cache = Caffeine.newBuilder().maximumSize(2000).expireAfterWrite(30, TimeUnit.MINUTES).build();}public User getUserById(Long userId) {String key = "user:" + userId;// 1. 查询L1缓存User user = l1Cache.getIfPresent(key);if (user != null) {return user;}// 2. 查询Redistry {user = (User) redisTemplate.opsForValue().get(key);if (user != null) {l1Cache.put(key, user);l2Cache.put(key, user);return user;}} catch (Exception e) {System.out.println("Redis查询失败,尝试L2缓存");}// 3. 查询L2缓存user = l2Cache.getIfPresent(key);if (user != null) {l1Cache.put(key, user);System.out.println("使用L2缓存降级:" + key);return user;}// 4. 查询数据库user = userRepository.findById(userId);if (user != null) {l1Cache.put(key, user);l2Cache.put(key, user);// 尝试写入Redistry {redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(30));} catch (Exception e) {System.out.println("Redis写入失败,仅使用本地缓存");}}return user;}
}
策略3:静态数据降级
@Service
public class UserServiceWithStaticFallback {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;private Map<Long, User> staticUserCache;@PostConstructpublic void initStaticCache() {// 初始化静态数据缓存(VIP用户、管理员等重要用户)List<User> vipUsers = userRepository.findVipUsers();staticUserCache = vipUsers.stream().collect(Collectors.toMap(User::getId, Function.identity()));System.out.println("静态缓存初始化完成,VIP用户数:" + vipUsers.size());}public User getUserById(Long userId) {String key = "user:" + userId;try {// 1. 尝试RedisUser user = (User) redisTemplate.opsForValue().get(key);if (user != null) {return user;}} catch (Exception e) {System.out.println("Redis不可用,尝试降级方案");}// 2. 检查静态缓存(重要用户)User staticUser = staticUserCache.get(userId);if (staticUser != null) {System.out.println("使用静态缓存降级:" + userId);return staticUser;}// 3. 查询数据库return userRepository.findById(userId);}/*** 定期更新静态缓存*/@Scheduled(fixedRate = 3600000) // 每小时更新public void refreshStaticCache() {try {List<User> vipUsers = userRepository.findVipUsers();Map<Long, User> newCache = vipUsers.stream().collect(Collectors.toMap(User::getId, Function.identity()));this.staticUserCache = newCache;System.out.println("静态缓存刷新完成");} catch (Exception e) {System.err.println("静态缓存刷新失败:" + e.getMessage());}}
}
策略4:限流降级
@Service
public class UserServiceWithRateLimit {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;private RateLimiter rateLimiter;private AtomicBoolean degraded = new AtomicBoolean(false);public UserServiceWithRateLimit() {// 限流器:每秒允许100个Redis请求this.rateLimiter = RateLimiter.create(100);}public User getUserById(Long userId) {String key = "user:" + userId;// 检查是否需要降级if (!degraded.get() && rateLimiter.tryAcquire()) {try {User user = (User) redisTemplate.opsForValue().get(key);if (user != null) {return user;}} catch (Exception e) {System.out.println("Redis异常,启用降级模式");degraded.set(true);// 10秒后尝试恢复scheduleRecovery();}}// 降级:直接查询数据库if (degraded.get()) {System.out.println("降级模式:直接查询数据库");} else {System.out.println("限流降级:跳过缓存查询");}return userRepository.findById(userId);}private void scheduleRecovery() {CompletableFuture.delayedExecutor(10, TimeUnit.SECONDS).execute(() -> {try {// 测试Redis连接redisTemplate.opsForValue().get("test:connection");degraded.set(false);System.out.println("Redis恢复正常,退出降级模式");} catch (Exception e) {System.out.println("Redis仍不可用,继续降级模式");}});}
}
6.4 降级监控和告警
监控组件
@Component
public class CacheMonitor {private RedisTemplate<String, Object> redisTemplate;private MeterRegistry meterRegistry;private Counter cacheHitCounter;private Counter cacheMissCounter;private Counter degradationCounter;public CacheMonitor(MeterRegistry meterRegistry) {this.meterRegistry = meterRegistry;this.cacheHitCounter = Counter.builder("cache.hit").description("Cache hit count").register(meterRegistry);this.cacheMissCounter = Counter.builder("cache.miss").description("Cache miss count").register(meterRegistry);this.degradationCounter = Counter.builder("cache.degradation").description("Cache degradation count").register(meterRegistry);}public void recordCacheHit() {cacheHitCounter.increment();}public void recordCacheMiss() {cacheMissCounter.increment();}public void recordDegradation(String reason) {degradationCounter.increment(Tags.of("reason", reason));}/*** 健康检查*/@Scheduled(fixedRate = 30000) // 每30秒检查public void healthCheck() {try {redisTemplate.opsForValue().get("health:check");System.out.println("Redis健康检查:正常");} catch (Exception e) {System.err.println("Redis健康检查:异常 - " + e.getMessage());recordDegradation("health_check_failed");}}/*** 获取缓存统计信息*/public Map<String, Object> getCacheStats() {Map<String, Object> stats = new HashMap<>();stats.put("hitCount", cacheHitCounter.count());stats.put("missCount", cacheMissCounter.count());stats.put("degradationCount", degradationCounter.count());double hitRate = 0.0;double totalRequests = cacheHitCounter.count() + cacheMissCounter.count();if (totalRequests > 0) {hitRate = cacheHitCounter.count() / totalRequests;}stats.put("hitRate", hitRate);return stats;}
}
6.5 降级策略配置化
@ConfigurationProperties(prefix = "cache.degradation")
@Component
public class CacheDegradationConfig {private boolean enabled = true;private int timeoutMs = 1000;private int maxRetries = 3;private boolean useLocalCache = true;private boolean useStaticCache = true;private int recoveryDelaySeconds = 10;// getter和setter方法...
}@Service
public class ConfigurableCacheService {private RedisTemplate<String, Object> redisTemplate;private UserRepository userRepository;private CacheDegradationConfig config;private Cache<String, User> localCache;public User getUserById(Long userId) {String key = "user:" + userId;if (config.isEnabled()) {try {// 设置超时时间User user = getFromRedisWithTimeout(key, config.getTimeoutMs());if (user != null) {return user;}} catch (Exception e) {return handleDegradation(userId, key, e);}}// 直接查询数据库return userRepository.findById(userId);}private User getFromRedisWithTimeout(String key, int timeoutMs) {// 实现带超时的Redis查询CompletableFuture<User> future = CompletableFuture.supplyAsync(() -> {return (User) redisTemplate.opsForValue().get(key);});try {return future.get(timeoutMs, TimeUnit.MILLISECONDS);} catch (TimeoutException e) {throw new RuntimeException("Redis查询超时", e);} catch (Exception e) {throw new RuntimeException("Redis查询失败", e);}}private User handleDegradation(Long userId, String key, Exception e) {System.out.println("Redis异常,启用降级策略:" + e.getMessage());// 尝试本地缓存if (config.isUseLocalCache()) {User user = localCache.getIfPresent(key);if (user != null) {System.out.println("使用本地缓存降级");return user;}}// 查询数据库return userRepository.findById(userId);}
}
7. 缓存最佳实践总结
7.1 设计原则
- 缓存穿透防护:使用布隆过滤器和空值缓存
- 缓存雪崩防护:设置随机过期时间和多级缓存
- 数据一致性:选择合适的缓存更新策略
- 降级保护:设计多层降级方案
- 监控告警:实时监控缓存状态
7.2 性能优化
- 使用批量操作减少网络开销
- 合理设置连接池大小
- 选择合适的序列化方式
- 控制缓存key的大小和数量
7.3 运维建议
- 定期备份重要缓存数据
- 监控缓存命中率和响应时间
- 设置合理的内存使用限制
- 建立缓存故障处理流程
通过学习这个详细的缓存教程,你应该能够:
- 理解各种缓存问题的原因和影响
- 掌握多种解决方案的实现方法
- 根据业务场景选择合适的缓存策略
- 设计可靠的缓存降级机制
- 建立完善的缓存监控体系
记住,缓存是一把双刃剑,正确使用能大幅提升系统性能,但处理不当也可能带来数据一致性问题。在实际应用中,需要根据具体的业务场景和技术要求,选择最适合的缓存策略。