Cache utility service (handling cache breakdown, cache penetration, and cache avalanche)
Design approach of the integrated solution
- Encapsulate an advanced cache service (AdvancedCacheService): all the logic (lookup, null-value caching, locking, expiration policy) lives inside it and is transparent to upstream business code.
- Use a distributed lock: solves cache breakdown by ensuring that only one request rebuilds the cache.
- Cache null values: solves cache penetration; the null entries get a short expiration time.
- Randomized expiration times: solves cache avalanche by adding random jitter to the base expiration time.
- Fallback strategy: when lock acquisition fails or the database query throws, degrade gracefully (e.g. return a stale value or null).
①. Distributed lock service
@Service
@RequiredArgsConstructor
public class RedisDistributedLock {

    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * Try to acquire the distributed lock.
     *
     * @param lockKey    lock key
     * @param requestId  request identifier (e.g. a UUID)
     * @param expireTime lock hold time
     * @param timeUnit   time unit
     * @return whether the lock was acquired
     */
    public boolean tryLock(String lockKey, String requestId, long expireTime, TimeUnit timeUnit) {
        // SET with NX (only set if absent) and an expiration, in one atomic command
        return Boolean.TRUE.equals(redisTemplate.opsForValue()
                .setIfAbsent(lockKey, requestId, expireTime, timeUnit));
    }

    /**
     * Release the distributed lock (a Lua script guarantees atomicity).
     *
     * @param lockKey   lock key
     * @param requestId request identifier
     * @return whether the lock was released
     */
    public boolean releaseLock(String lockKey, String requestId) {
        // Lua script: delete the lock only if it still holds our own request id
        String luaScript =
                "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                "return redis.call('del', KEYS[1]) " +
                "else " +
                "return 0 " +
                "end";
        Long result = redisTemplate.execute(
                new DefaultRedisScript<>(luaScript, Long.class),
                Collections.singletonList(lockKey),
                requestId);
        return result != null && result == 1;
    }

    /**
     * Generate a request id for the lock.
     */
    public String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}
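Used on its own, the lock follows an acquire / try / finally-release pattern. A minimal sketch, assuming an injected RedisDistributedLock named distributedLock; the protected work in the body is a placeholder:

public void doExclusively(String key) {
    String lockKey = "lock:" + key;
    String requestId = distributedLock.generateRequestId();
    // The lock auto-expires after 10 seconds, so a crashed holder cannot block others forever
    if (!distributedLock.tryLock(lockKey, requestId, 10, TimeUnit.SECONDS)) {
        return; // another instance is already doing the work; fall back or retry later
    }
    try {
        // ... work that must run on only one instance at a time ...
    } finally {
        // Releases only our own lock: the Lua script compares the stored requestId first
        distributedLock.releaseLock(lockKey, requestId);
    }
}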
②. Advanced cache service (integrated solution for all three problems)
@Slf4j
@Service
@RequiredArgsConstructor
public class AdvancedCacheService {

    private final RedisTemplate<String, Object> redisTemplate;
    private final RedisDistributedLock distributedLock;

    // Null placeholder, used to guard against cache penetration
    private static final String NULL_PLACEHOLDER = "__NULL__";
    // Default lock expiration (prevents deadlock)
    private static final long DEFAULT_LOCK_EXPIRE_TIME = 10L;
    // Default base cache expiration
    private static final long DEFAULT_CACHE_EXPIRE_TIME = 3600L;
    // Expiration for cached null values (kept short)
    private static final long NULL_CACHE_EXPIRE_TIME = 300L;
    // Upper bound of the random TTL jitter (guards against avalanche, 5 minutes)
    private static final long RANDOM_EXPIRE_BOUND = 300L;

    /**
     * Get a value from the cache or compute it (the core method that integrates
     * the solutions to all three cache problems).
     *
     * @param key         cache key
     * @param valueLoader loader invoked on a cache miss (e.g. a database query)
     * @param clazz       return type
     * @param <T>         generic type
     * @return the cached or freshly computed value
     */
    public <T> T getOrCalculate(String key, Supplier<T> valueLoader, Class<T> clazz) {
        return getOrCalculate(key, valueLoader, clazz, DEFAULT_CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
    }

    /**
     * Overload that accepts a custom expiration time.
     */
    public <T> T getOrCalculate(String key, Supplier<T> valueLoader, Class<T> clazz, long baseExpireTime, TimeUnit timeUnit) {
        // 1. Try the cache first
        Object cachedValue = redisTemplate.opsForValue().get(key);

        // 2. Cache hit
        if (cachedValue != null) {
            // 2.1 Hit the null placeholder: return null (penetration protection)
            if (NULL_PLACEHOLDER.equals(cachedValue)) {
                log.debug("Cache hit null placeholder for key: {}", key);
                return null;
            }
            // 2.2 Hit a real value: return it directly
            log.debug("Cache hit for key: {}", key);
            return clazz.cast(cachedValue);
        }

        // 3. Cache miss: protect against cache breakdown
        String lockKey = "lock:" + key; // lock key
        String requestId = distributedLock.generateRequestId();
        T value;

        try {
            // 3.1 Try to acquire the distributed lock
            boolean isLockAcquired = distributedLock.tryLock(lockKey, requestId, DEFAULT_LOCK_EXPIRE_TIME, TimeUnit.SECONDS);

            if (!isLockAcquired) {
                // 3.2 Lock acquisition failed: another thread is already rebuilding the cache
                log.warn("Failed to acquire lock for key: {}, waiting and retrying...", key);
                // Strategy: sleep briefly and retry to avoid a thundering herd
                // (a single retry here; more elaborate retry logic is possible)
                try {
                    Thread.sleep(100); // sleep 100 ms
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // Retry the cache read once
                Object retryValue = redisTemplate.opsForValue().get(key);
                if (retryValue != null) {
                    return NULL_PLACEHOLDER.equals(retryValue) ? null : clazz.cast(retryValue);
                }
                // Still nothing after the retry: return null or throw, depending on the fallback strategy
                log.error("Cache breakdown occurred for key: {}, after retry still failed.", key);
                return null; // fallback strategy: return null
            }

            // 3.3 Lock acquired: check the cache again (double-checked locking)
            log.info("Lock acquired for key: {}, recalculating...", key);
            cachedValue = redisTemplate.opsForValue().get(key);
            if (cachedValue != null) {
                return NULL_PLACEHOLDER.equals(cachedValue) ? null : clazz.cast(cachedValue);
            }

            // 4. Penetration protection: run the value loader (e.g. query the database)
            try {
                value = valueLoader.get();
            } catch (Exception e) {
                log.error("Error executing value loader for key: {}", key, e);
                // On a database error, cache a very short-lived null to stop continuous penetration
                setCache(key, NULL_PLACEHOLDER, 30L, TimeUnit.SECONDS);
                throw e; // or return null, depending on the business requirement
            }

            // 5. Write to the cache
            if (value == null) {
                // 5.1 Database returned nothing: cache the null placeholder (penetration protection)
                setCache(key, NULL_PLACEHOLDER, NULL_CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
            } else {
                // 5.2 Database returned a value: cache it with a randomized TTL (avalanche protection).
                // Convert the base TTL to seconds so the jitter (0-5 minutes) applies regardless of timeUnit.
                long finalExpireTime = timeUnit.toSeconds(baseExpireTime)
                        + ThreadLocalRandom.current().nextLong(0, RANDOM_EXPIRE_BOUND);
                setCache(key, value, finalExpireTime, TimeUnit.SECONDS);
            }
            return value;
        } finally {
            // 6. Always release the lock
            if (requestId != null) {
                distributedLock.releaseLock(lockKey, requestId);
            }
        }
    }

    /**
     * Single entry point for writing to the cache.
     */
    private void setCache(String key, Object value, long expireTime, TimeUnit timeUnit) {
        try {
            redisTemplate.opsForValue().set(key, value, expireTime, timeUnit);
        } catch (Exception e) {
            log.error("Failed to set cache for key: {}", key, e);
            // A cache write failure must not break the main flow; just log it
        }
    }

    /**
     * Evict a cache entry (used when the underlying data is updated).
     */
    public void evict(String key) {
        try {
            redisTemplate.delete(key);
        } catch (Exception e) {
            log.error("Failed to evict cache for key: {}", key, e);
        }
    }
}
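Both services assume a RedisTemplate<String, Object> bean whose value serializer can round-trip the cached objects; otherwise the clazz.cast on the read path will fail. A minimal configuration sketch, assuming JSON serialization (this bean is not part of the original service, just one possible setup):

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        // Plain string keys, so cache keys and lock keys stay readable in redis-cli
        template.setKeySerializer(new StringRedisSerializer());
        // JSON values with embedded type info, so reads come back as the original class
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}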
③. Using it in business code
@Slf4j
@Service
@RequiredArgsConstructor
public class ProductService {

    private final ProductMapper productMapper;
    private final AdvancedCacheService advancedCacheService;

    public Product getProductById(Long id) {
        String cacheKey = "product:" + id;
        // With the advanced cache service, the business code only supplies the value loader
        return advancedCacheService.getOrCalculate(
                cacheKey,
                () -> {
                    // This Supplier runs only on a cache miss, after the lock has been acquired
                    log.info("Querying product from database, id: {}", id);
                    return productMapper.selectById(id);
                },
                Product.class,
                1L, TimeUnit.HOURS // base expiration: 1 hour
        );
    }

    public void updateProduct(Product product) {
        // 1. Update the database
        productMapper.updateById(product);
        // 2. Evict the cache (subsequent reads rebuild it automatically)
        String cacheKey = "product:" + product.getId();
        advancedCacheService.evict(cacheKey);
    }

    public void deleteProduct(Long id) {
        // 1. Delete the database record
        productMapper.deleteById(id);
        // 2. Evict the cache
        String cacheKey = "product:" + id;
        advancedCacheService.evict(cacheKey);
    }
}
Monitoring the service above
①. Add the monitoring dependencies
<!-- Micrometer core -->
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-core</artifactId>
</dependency>
<!-- Micrometer Prometheus registry -->
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<!-- Actuator (exposes the /metrics endpoints) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
management:
  endpoints:
    web:
      exposure:
        include: health, info, metrics, prometheus
  metrics:
    tags:
      application: ${spring.application.name} # add an application tag to all metrics
    distribution:
      percentiles-histogram:
        "[spring.data.redis.used.memory]": true # enable histograms
        # Histogram buckets for the cache timers below are needed by the latency panels
        # and the LockAcquireSlow alert defined later in this article
        "[cache.operation]": true
        "[cache.lock.acquire.time]": true
②. Extend the advanced cache service with metrics collection
@Slf4j
@Service
@RequiredArgsConstructor
public class AdvancedCacheService {

    private final RedisTemplate<String, Object> redisTemplate;
    private final RedisDistributedLock distributedLock;
    private final MeterRegistry meterRegistry; // Micrometer meter registry

    // ... (the constants from the previous version stay unchanged) ...

    // Metric tag constants
    private static final String TAG_CACHE_NAME = "cache.name";
    private static final String TAG_OPERATION = "operation";
    private static final String TAG_RESULT = "result";
    private static final String TAG_HIT = "hit";
    private static final String TAG_MISS = "miss";
    private static final String TAG_SUCCESS = "success";
    private static final String TAG_FAILURE = "failure";

    // Metric names
    private static final String METRIC_CACHE_OPERATION = "cache.operation";
    private static final String METRIC_CACHE_HIT = "cache.hit";
    private static final String METRIC_CACHE_LOAD_TIME = "cache.load.time";
    private static final String METRIC_LOCK_ACQUIRE_TIME = "cache.lock.acquire.time";
    private static final String METRIC_LOCK_HOLD_TIME = "cache.lock.hold.time";
    private static final String METRIC_DB_QUERY = "cache.db.query";

    /**
     * Get or compute a cached value (version with metrics).
     */
    public <T> T getOrCalculate(String key, Supplier<T> valueLoader, Class<T> clazz, long baseExpireTime, TimeUnit timeUnit) {
        String cacheName = extractCacheName(key); // derive the cache name from the key, e.g. "product"
        Timer.Sample totalTimer = Timer.start(meterRegistry); // start timing the whole operation

        try {
            Object cachedValue = redisTemplate.opsForValue().get(key);
            if (cachedValue != null) {
                if (NULL_PLACEHOLDER.equals(cachedValue)) {
                    // Record a null-placeholder hit
                    recordCacheHit(cacheName, "null");
                    return null;
                }
                // Record a real hit
                recordCacheHit(cacheName, "hit");
                return clazz.cast(cachedValue);
            }
            // Record a miss
            recordCacheHit(cacheName, "miss");

            String lockKey = "lock:" + key;
            String requestId = distributedLock.generateRequestId();
            T value;
            long lockWaitStartTime = System.currentTimeMillis();

            try {
                boolean isLockAcquired = distributedLock.tryLock(lockKey, requestId, DEFAULT_LOCK_EXPIRE_TIME, TimeUnit.SECONDS);
                long lockAcquireTime = System.currentTimeMillis() - lockWaitStartTime;
                // Record how long lock acquisition took
                recordLockAcquireTime(cacheName, lockAcquireTime, isLockAcquired);

                if (!isLockAcquired) {
                    log.warn("Failed to acquire lock for key: {}", key);
                    // Record a lost lock competition
                    recordLockCompetition(cacheName, false);
                    // ... (retry logic stays unchanged) ...
                    return null;
                }
                // Record a won lock competition
                recordLockCompetition(cacheName, true);

                // Double check
                cachedValue = redisTemplate.opsForValue().get(key);
                if (cachedValue != null) {
                    return NULL_PLACEHOLDER.equals(cachedValue) ? null : clazz.cast(cachedValue);
                }

                long loadStartTime = System.currentTimeMillis();
                try {
                    value = valueLoader.get();
                    // Record a successful database query
                    recordDbQuery(cacheName, true, System.currentTimeMillis() - loadStartTime);
                } catch (Exception e) {
                    // Record a failed database query
                    recordDbQuery(cacheName, false, System.currentTimeMillis() - loadStartTime);
                    setCache(key, NULL_PLACEHOLDER, 30L, TimeUnit.SECONDS);
                    throw e;
                }

                if (value == null) {
                    setCache(key, NULL_PLACEHOLDER, NULL_CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
                } else {
                    // Randomized TTL jitter applied in seconds, regardless of timeUnit (avalanche protection)
                    long finalExpireTime = timeUnit.toSeconds(baseExpireTime)
                            + ThreadLocalRandom.current().nextLong(0, RANDOM_EXPIRE_BOUND);
                    setCache(key, value, finalExpireTime, TimeUnit.SECONDS);
                }
                return value;
            } finally {
                if (requestId != null) {
                    long lockHoldTime = System.currentTimeMillis() - lockWaitStartTime;
                    // Record how long the lock was held
                    recordLockHoldTime(cacheName, lockHoldTime);
                    distributedLock.releaseLock(lockKey, requestId);
                }
            }
        } finally {
            // Record the total operation time
            totalTimer.stop(Timer.builder(METRIC_CACHE_OPERATION)
                    .tags(TAG_CACHE_NAME, cacheName, TAG_OPERATION, "getOrCalculate")
                    .register(meterRegistry));
        }
    }

    // --- metric recording helpers ---

    private void recordCacheHit(String cacheName, String result) {
        Counter.builder(METRIC_CACHE_HIT)
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, result)
                .register(meterRegistry)
                .increment();
    }

    private void recordLockAcquireTime(String cacheName, long millis, boolean success) {
        Timer.builder(METRIC_LOCK_ACQUIRE_TIME)
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, success ? TAG_SUCCESS : TAG_FAILURE)
                .register(meterRegistry)
                .record(millis, TimeUnit.MILLISECONDS);
    }

    private void recordLockHoldTime(String cacheName, long millis) {
        Timer.builder(METRIC_LOCK_HOLD_TIME)
                .tags(TAG_CACHE_NAME, cacheName)
                .register(meterRegistry)
                .record(millis, TimeUnit.MILLISECONDS);
    }

    private void recordLockCompetition(String cacheName, boolean success) {
        Counter.builder("cache.lock.competition")
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, success ? TAG_SUCCESS : TAG_FAILURE)
                .register(meterRegistry)
                .increment();
    }

    private void recordDbQuery(String cacheName, boolean success, long durationMs) {
        Timer.builder(METRIC_DB_QUERY)
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, success ? TAG_SUCCESS : TAG_FAILURE)
                .register(meterRegistry)
                .record(durationMs, TimeUnit.MILLISECONDS);
    }

    private String extractCacheName(String key) {
        // Simple implementation: extract "product" from "product:123"
        int index = key.indexOf(":");
        return index > 0 ? key.substring(0, index) : "unknown";
    }

    // ... setCache and evict need similar instrumentation ...
}
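For completeness, the instrumentation mentioned in that last comment could look like this for evict. A sketch following the same helper style; the metric name cache.evict is chosen here for illustration and is not used elsewhere in this article:

public void evict(String key) {
    String cacheName = extractCacheName(key);
    try {
        redisTemplate.delete(key);
        // Count successful evictions per cache name
        Counter.builder("cache.evict")
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, TAG_SUCCESS)
                .register(meterRegistry)
                .increment();
    } catch (Exception e) {
        log.error("Failed to evict cache for key: {}", key, e);
        // Count failed evictions so stale-data risks become visible
        Counter.builder("cache.evict")
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, TAG_FAILURE)
                .register(meterRegistry)
                .increment();
    }
}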
③. Configure Prometheus and Grafana
prometheus.yml configuration
scrape_configs:
  - job_name: 'springboot-cache-app'
    metrics_path: '/actuator/prometheus'
    scrape_interval: 15s # scrape interval
    static_configs:
      - targets: ['your-app-host:8080'] # application server address
        labels:
          application: 'user-service-cache'
Grafana dashboard
Build a comprehensive cache-monitoring dashboard with the following panels:
- Hit rate panel: overall hit rate per cache (counting null-placeholder hits as hits)
  Formula: (sum(rate(cache_hit_total{cache_name="$cache_name", result="hit"}[5m])) + sum(rate(cache_hit_total{cache_name="$cache_name", result="null"}[5m]))) / sum(rate(cache_hit_total{cache_name="$cache_name"}[5m])) * 100
- QPS & latency panel: cache-operation QPS and P95/P99 latency (example queries below)
- Lock contention panel: lock wait times and lock-competition success rate
- Database query panel: count and latency of the database queries triggered by the cache layer
- Redis memory panel: Redis memory usage (requires the Redis Exporter)
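For reference, the QPS & latency panel can be built from queries along these lines, assuming the metric names defined above, histogram buckets enabled for cache.operation (see the management config earlier), and a $cache_name dashboard variable:

# QPS of getOrCalculate per cache
sum(rate(cache_operation_seconds_count{cache_name="$cache_name"}[5m]))

# P99 latency of the whole cache operation
histogram_quantile(0.99, sum(rate(cache_operation_seconds_bucket{cache_name="$cache_name"}[5m])) by (le))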
④. Alerting rules (prometheus-rules.yml)
groups:
- name: cache-alerts
  rules:
    - alert: CacheHitRateLow
      expr: (sum(rate(cache_hit_total{result="hit"}[5m])) by (cache_name) + sum(rate(cache_hit_total{result="null"}[5m])) by (cache_name)) / sum(rate(cache_hit_total[5m])) by (cache_name) * 100 < 80
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "Cache hit rate too low (instance {{ $labels.instance }})"
        description: "Hit rate of cache {{ $labels.cache_name }} dropped below 80%, currently {{ $value }}%"

    - alert: LockCompetitionHigh
      expr: rate(cache_lock_competition_total[5m]) > 10
      for: 2m
      labels:
        severity: warning
      annotations:
        summary: "Heavy cache lock contention (instance {{ $labels.instance }})"
        description: "Lock contention for cache {{ $labels.cache_name }} is too frequent; there may be a hot key or a misconfigured lock expiration"

    - alert: DbQueryRateHigh
      expr: rate(cache_db_query_seconds_count[5m]) > 50
      for: 2m
      labels:
        severity: warning
      annotations:
        summary: "Too many database queries behind the cache (instance {{ $labels.instance }})"
        description: "Cache {{ $labels.cache_name }} is triggering database queries too often; a large share of requests are probably missing the cache"

    - alert: LockAcquireSlow
      expr: histogram_quantile(0.95, rate(cache_lock_acquire_time_seconds_bucket[5m])) > 0.5
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "Slow distributed lock acquisition (instance {{ $labels.instance }})"
        description: "P95 latency for acquiring the cache lock exceeds 500ms, currently {{ $value }}s"
⑤. Continuous tuning strategies
Based on the monitoring data, apply the following tuning measures:
- Tune the cache hit rate
  Problem: hit rate < 80%
  Actions:
  - Analyze the cache.hit metric to find the cache names with a low hit rate
  - Check whether that cache's expiration time is too short, e.g. by comparing request volume with loader volume:
    increase(cache_operation_seconds_count{cache_name="product", operation="getOrCalculate"}[1h]) / increase(cache_db_query_seconds_count{cache_name="product"}[1h])
  - Improve the cache key design so that hot data is actually cached
  - Consider adding a local cache (Caffeine) as a first-level cache (see the sketch below)
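A minimal sketch of the two-level idea, keeping AdvancedCacheService as the Redis-backed second level; the Caffeine dependency, the field name, and the sizing numbers are assumptions, not part of the original design:

// Hypothetical L1 cache inside ProductService: Caffeine in front of Redis
private final Cache<String, Product> localCache = Caffeine.newBuilder()
        .maximumSize(10_000)                      // bound local memory usage
        .expireAfterWrite(Duration.ofSeconds(60)) // short local TTL to limit staleness
        .build();

public Product getProductByIdWithLocalCache(Long id) {
    String cacheKey = "product:" + id;
    // L1: in-process Caffeine; L2: Redis via AdvancedCacheService; L3: database via the loader
    return localCache.get(cacheKey, key ->
            advancedCacheService.getOrCalculate(
                    key,
                    () -> productMapper.selectById(id),
                    Product.class,
                    1L, TimeUnit.HOURS));
}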
- Tune lock contention
  Problem: frequent lock contention or long lock wait times
  Actions:
  - Analyze cache.lock.competition and cache.lock.acquire.time
  - For extremely hot keys, use logical expiration instead of a mutex: store a logical expiry timestamp alongside the cached value and refresh it asynchronously (see the sketch after this list)
  - Adjust the lock expiration DEFAULT_LOCK_EXPIRE_TIME so the lock is held neither too long nor too briefly
  - Implement a backoff strategy for lock acquisition (e.g. exponential backoff)
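A sketch of the logical-expiration approach; the wrapper class, method, and helper names are illustrative, not part of the service above. The Redis key itself never expires; a timestamp stored next to the value decides when to refresh, exactly one caller rebuilds asynchronously, and everyone else keeps serving the slightly stale value, so nobody blocks on a hot key:

// Wrapper stored in Redis instead of the raw value
@Data
public class CacheWrapper<T> {
    private T data;
    private long logicalExpireAt; // epoch millis after which the value counts as stale
}

public <T> T getWithLogicalExpire(String key, Supplier<T> valueLoader, long ttlSeconds) {
    @SuppressWarnings("unchecked")
    CacheWrapper<T> wrapper = (CacheWrapper<T>) redisTemplate.opsForValue().get(key);
    if (wrapper == null) {
        return null; // hot keys are expected to be pre-loaded
    }
    if (wrapper.getLogicalExpireAt() > System.currentTimeMillis()) {
        return wrapper.getData(); // still logically fresh
    }
    // Stale: let exactly one caller trigger an asynchronous rebuild
    String lockKey = "lock:" + key;
    String requestId = distributedLock.generateRequestId();
    if (distributedLock.tryLock(lockKey, requestId, 10, TimeUnit.SECONDS)) {
        CompletableFuture.runAsync(() -> {
            try {
                CacheWrapper<T> fresh = new CacheWrapper<>();
                fresh.setData(valueLoader.get());
                fresh.setLogicalExpireAt(System.currentTimeMillis() + ttlSeconds * 1000);
                redisTemplate.opsForValue().set(key, fresh);
            } finally {
                distributedLock.releaseLock(lockKey, requestId);
            }
        });
    }
    return wrapper.getData(); // serve the stale value while the refresh runs in the background
}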
- Tune database load
  Problem: the cache.db.query rate is too high
  Actions:
  - Optimize the SQL inside valueLoader and add the missing database indexes
  - Pre-warm the cache for batch queries (see the sketch below)
  - Consider a read-through pattern in which the cache layer owns all data loading
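A minimal pre-warming sketch that pushes known hot products into Redis at startup; the ApplicationRunner wiring and the hard-coded id list are illustrative assumptions:

@Component
@RequiredArgsConstructor
public class ProductCacheWarmer implements ApplicationRunner {

    private final ProductMapper productMapper;
    private final AdvancedCacheService advancedCacheService;

    @Override
    public void run(ApplicationArguments args) {
        // In practice the hot ids would come from configuration or access statistics
        List<Long> hotProductIds = List.of(1L, 2L, 3L);
        for (Long id : hotProductIds) {
            advancedCacheService.getOrCalculate(
                    "product:" + id,
                    () -> productMapper.selectById(id),
                    Product.class,
                    1L, TimeUnit.HOURS);
        }
    }
}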
- Tune Redis memory
  Problem: memory usage > 80%
  Actions:
  - Find big keys: redis-cli --bigkeys
  - Optimize the data structures: use a Hash instead of several String keys per object (see the sketch below)
  - Set an appropriate eviction policy: maxmemory-policy allkeys-lru
  - Consider sharding or a cluster deployment
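For example, storing each attribute under its own String key (product:1:name, product:1:price, ...) pays per-key overhead for every field, while a single Hash keeps one key per object and still allows reading individual fields. A sketch using the same RedisTemplate; the field names are illustrative:

// One Hash per product instead of one String key per attribute
String key = "product:1";
Map<String, Object> fields = new HashMap<>();
fields.put("name", "iPhone");
fields.put("price", 5999);
redisTemplate.opsForHash().putAll(key, fields);
redisTemplate.expire(key, 1, TimeUnit.HOURS);

// Read a single attribute without deserializing the whole object
Object price = redisTemplate.opsForHash().get(key, "price");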