缓存三大问题及解决方案
缓存三大问题及解决方案
一. 缓存穿透的解决方案
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
1.1 缓存空对象
当缓存没有命中时,如果从数据库查询得到的结果是“空”数据(例如某个商品不存在),则可以将这个“空”结果也缓存起来。这样下次如果再有相同的请求,就能直接从缓存获取到结果,避免再次查询数据库。
优点:
- 防止重复的数据库查询。
- 可以有效缓解高频次无效请求带来的数据库压力。
缺点:
- 可能会造成缓存中存储无效数据,占用缓存空间。
Redis查询缓存——> 命中——> 是空值则结束| |未命中 不是空值则返回商铺信息|根据id查数据库——> 不存在将空值写入Redis——> 结束|商铺存在则写入Redis|返回信息,结束
// QueryByID 根据id查询商铺数据
func (s *ShopService) QueryByID(ctx context.Context, id int64) Result {key := fmt.Sprintf("%s%d", CacheShopKey, id)// 1. 从 Redis 查询店铺数据shopJSON, err := s.rdb.Get(ctx, key).Result()var shop Shop// 2. 判断缓存是否命中if err == nil && shopJSON != "" {// 2.1 缓存命中,反序列化返回if err := json.Unmarshal([]byte(shopJSON), &shop); err == nil {return Ok(shop)}}// 2.2 缓存未命中,判断缓存中查询的数据是否为空字符串if err == nil && shopJSON == "" {// 2.2.1 当前数据是空字符串,返回失败信息(说明数据是之前缓存的空对象)return Fail("店铺不存在")}// 2.2.2 当前数据是null,则从数据库中查询店铺数据if err := s.db.WithContext(ctx).First(&shop, id).Error; err != nil {// 4.1 数据库中不存在,缓存空对象(解决缓存穿透),返回失败信息s.rdb.Set(ctx, key, "", CacheNullTTL) // 将空对象存入缓存return Fail("店铺不存在")}// 4.2 数据库存在,重建缓存,并返回店铺数据bytes, _ := json.Marshal(shop)s.rdb.Set(ctx, key, bytes, CacheShopTTL)return Ok(shop)
}
第一次查询数据库中和缓存中不存在的数据,请求经过了数据库,但是缓存了空字符串;
第二次查询(短期内),发先请求未经过数据库,直接被拦截
还可以用的方法: 请求参数校验,布隆过滤器,统一拦截机制,数据过期策略优化
二. 缓存雪崩的解决方案
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
缓存雪崩的常见原因:
- 缓存过期时间集中:
如果大量缓存的过期时间设置相同或接近,缓存同时失效时,会导致大量的请求同时查询数据库,造成数据库压力骤增。 - 缓存服务器宕机:
如果整个缓存服务器宕机,或者某些节点出现故障,所有请求都会直接访问数据库,导致数据库压力激增。 - 不合理的缓存更新策略:
例如,缓存更新策略不当,导致大量缓存被清空,或缓存更新未能及时进行,造成缓存频繁失效。
解决方案
1. 缓存过期时间的随机化
缓存过期时间相同会导致大量缓存同一时间失效,触发大量数据库查询。为了避免这种情况,可以给缓存的过期时间设置 随机值,使得不同缓存的失效时间错开,从而减少同时失效的缓存数量。
示例:
假设每个缓存的默认过期时间是 10 分钟,为了避免缓存雪崩,可以将过期时间设置为 10 分钟 ± 随机的几秒或几分钟。
2. 设置合理的缓存预热
缓存预热是指系统启动或缓存初始化时,通过主动加载缓存来避免系统启动时缓存缺失的情况。常见的做法是定时或在后台任务中更新一些常用的缓存,确保热点数据始终有缓存。
3. 使用双重缓存策略(热点数据不易过期)
对于一些 热点数据,即频繁访问的数据,可以采用双重缓存策略。一个是短期缓存(常规缓存),另一个是长期缓存(如永久缓存)。当短期缓存失效时,可以从长期缓存中获取数据,减少对数据库的查询。
双重缓存策略:
- 热点数据放入长时间缓存。
- 普通数据放入较短时间的缓存。
4. 限流和熔断机制
如果缓存失效,导致大量请求涌入数据库,可能会给数据库带来巨大的压力,甚至使数据库崩溃。可以通过 限流 和 熔断机制 来减缓这种压力。
限流:
- 在请求数据库时加入限流策略,控制并发请求的数量,避免数据库崩溃。
熔断:
- 如果系统过载,可以启用熔断器,自动拒绝部分请求或降级处理,防止系统崩溃。
5. 采用分布式缓存(防止单点故障)
如果单个缓存服务器宕机,会导致所有请求都打到数据库,因此可以通过 分布式缓存 来实现多节点部署,避免单点故障影响缓存的可用性。
- 使用 Redis Sentinel 或 Redis Cluster 来实现缓存的高可用。
- 配置好缓存备份,避免主节点故障时影响缓存访问。
6. 缓存降级
当缓存失效或缓存服务器宕机时,可以启用缓存降级机制,即通过默认值或其他方式替代缓存中的数据,避免请求直接打到数据库。
三. 缓存击穿的解决方案
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
缓存中不存在某个数据,而此数据恰好在高并发的情况下同时被多个请求访问,导致多个请求直接穿透缓存,去查询数据库,从而增加数据库的压力,可能导致数据库崩溃或性能急剧下降。
缓存击穿发生在缓存未命中时,多个请求同时查询数据库,造成高并发数据库访问。
缓存击穿的典型场景:
- 某些热点数据突然失效:例如,某些高并发的请求访问某个已经过期的缓存数据,而这个缓存数据在数据库中确实存在。
- 某个特定时间段的请求高峰:多个请求同时查询相同的数据,导致数据库承受大量的并发访问。
- 缓存失效后高并发访问:缓存设置的过期时间较短,或缓存未及时更新,当缓存失效时大量请求直接访问数据库。
缓存击穿与缓存穿透的区别:
- 缓存击穿:指缓存过期或失效时,多个请求并发查询数据库,导致对数据库的重复请求。
- 缓存穿透:指查询的数据在缓存和数据库中都不存在,导致每个请求都去查询数据库,造成数据库的高压力。
3.1 使用互斥锁
当缓存失效时,可以通过 加锁 来确保同一时刻只有一个请求查询数据库,其他请求等待缓存重新加载,避免多个请求同时去访问数据库。
这样可以防止大量请求同时查询数据库。
从Redis查询缓存 <—— 休眠一段时间 <——否| |判断缓存是否命中—— 未命中,尝试获取互斥锁——判断是否获取锁| |命中 是| |返回数据 <—— 释放互斥锁 <—— 将数据写入Redis ——根据id查询数据库
需求: 修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题
// QueryShopByID 带互斥锁防缓存击穿的店铺查询逻辑
func QueryShopByID(id int64) (*Result, error) {initRedis()key := fmt.Sprintf("%s%d", CacheShopKeyPrefix, id)// 1. 尝试从缓存获取result, err := getShopFromCache(key)if err != nil {return nil, err}if result != nil {return result, nil // 缓存命中}// 2. 尝试加锁,防止缓存击穿lockKey := fmt.Sprintf("%s%d", LockShopKeyPrefix, id)isLock, err := tryLock(lockKey)if err != nil {return nil, err}if !isLock {// 获取锁失败,说明有别的线程在重建缓存time.Sleep(50 * time.Millisecond)return QueryShopByID(id) // 递归重试}defer unlock(lockKey) // 无论如何都释放锁// 3. 再次检查缓存(双重检测)result, err = getShopFromCache(key)if err != nil {return nil, err}if result != nil {return result, nil}// 4. 缓存未命中,从数据库查询shop, err := getShopByIDFromDB(id)if err != nil {return nil, err}if shop == nil {// 数据库不存在,缓存空对象防止穿透redisClient.Set(ctx, key, "", CacheNullTTL)return Fail("店铺不存在"), nil}// 5. 数据库存在,重建缓存shopJSON, _ := json.Marshal(shop)redisClient.Set(ctx, key, shopJSON, CacheShopTTL)return Ok(shop), nil
}// ===================== 辅助函数 =====================// 从缓存中获取店铺数据
func getShopFromCache(key string) (*Result, error) {shopJSON, err := redisClient.Get(ctx, key).Result()if err == redis.Nil {return nil, nil // 缓存未命中}if err != nil {return nil, err}if shopJSON == "" {// 缓存命中空值(防止穿透)return Fail("店铺不存在"), nil}// 缓存命中,反序列化var shop Shopif err := json.Unmarshal([]byte(shopJSON), &shop); err != nil {return nil, err}return Ok(shop), nil
}// 模拟数据库查询
func getShopByIDFromDB(id int64) (*Shop, error) {// 实际逻辑:查询数据库if id == 100 {return &Shop{ID: 100, Name: "咖啡店"}, nil}return nil, nil // 模拟没查到
}// 尝试获取锁
func tryLock(key string) (bool, error) {ok, err := redisClient.SetNX(ctx, key, "1", 10*time.Second).Result()if err != nil {return false, err}return ok, nil
}// 释放锁
func unlock(key string) {redisClient.Del(ctx, key)
}
加锁部分函数分析
// 2. 尝试加锁,防止缓存击穿
lockKey := fmt.Sprintf("%s%d", LockShopKeyPrefix, id)
isLock, err := tryLock(lockKey)
if err != nil {return nil, err
}
if !isLock {// 获取锁失败,说明有别的线程在重建缓存time.Sleep(50 * time.Millisecond)return QueryShopByID(id) // 递归重试
}
lockKey := fmt.Sprintf("%s%d", LockShopKeyPrefix, id)
-
作用:生成一个分布式锁的 key。
-
例子:如果
LockShopKeyPrefix是"lock:shop:",id是123,那最终的 key 是:lock:shop:123 -
意义:对每个商铺的缓存重建都设置一个独立的锁,避免不同商铺之间的锁互相影响。
isLock, err := tryLock(lockKey)
-
作用:尝试获取分布式锁。
-
tryLock()一般是用 Redis 的SETNX(Set if Not Exists)实现的 -
例如:
func tryLock(key string) (bool, error) {return rdb.SetNX(ctx, key, "1", 10*time.Second).Result() }- 如果 key 不存在,则成功上锁并返回
true。 - 如果 key 已存在(说明其他线程正在重建缓存),返回
false。
- 如果 key 不存在,则成功上锁并返回
-
锁的过期时间(10s 左右) 是为了防止某个 goroutine 崩溃后锁永远不释放。
if err != nil { return nil, err }
- 作用:如果 Redis 操作出错(比如网络故障),直接返回错误,避免进一步操作。
if !isLock {
- 表示 没拿到锁,也就是别的 goroutine 已经在重建这个缓存。
time.Sleep(50 * time.Millisecond)
- 作用:稍微等待一段时间(50 毫秒),给重建缓存的 goroutine 一点时间完成操作。
- 避免频繁访问数据库造成“雪崩式”请求。
return QueryShopByID(id)
- 作用:递归调用自己,重新尝试从缓存中获取数据。
- 逻辑:
- 等待完 50ms 后,缓存有可能已经被别的线程重建完成;
- 于是再次进入函数,从 Redis 获取数据;
- 如果缓存命中,则直接返回;
- 如果还没命中,再次判断是否能拿到锁(形成循环重试机制)。
总结流程图:
↓ (缓存未命中)尝试获取分布式锁↓┌───────────────┬───────────────┐│ 获取成功 │ 获取失败(别人加锁中)↓ ↓
重建缓存并释放锁 等待50ms后递归重试↓ ↓返回数据 缓存命中后返回
释放锁函数解析
defer unlock(lockKey) // 无论如何都释放锁
之前进行了加锁,说明当前线程已经拿到了重建缓存的权力,
然后就可以去访问数据库,重构缓存及更新Redis,在这个过程中可能发生异常,比如网络错误,序列化失败等,如果你没有释放锁(没有执行 unlock()),那其他线程永远无法加锁重建缓存,缓存将一直失效,造成“死锁式击穿”
所以defer是为了确保无论函数有什么错误锁都会被释放
双重检测部分函数解析
// 3. 再次检查缓存(双重检测)result, err = getShopFromCache(key)if err != nil {return nil, err}if result != nil {return result, nil}
为什么需要双重检测
在缓存击穿的防护逻辑里,我们是这么做的:
- 查询 Redis 缓存 → 没命中
- 尝试获取锁(只有一个线程能成功)
- 拿到锁的线程去查数据库并重建缓存
- 释放锁
但问题是:
当第一个线程刚拿到锁准备去查数据库时,后面很多线程也在等锁。
当第一个线程查完数据库、重建好缓存后,第二个线程拿到锁时,它其实不需要再查数据库了。
如果没有双重检测
假设:
- 请求 A 拿到了锁,去查数据库;
- 请求 B 在等锁;
- 请求 A 查完数据库 → 更新缓存 → 释放锁;
- 请求 B 拿到锁,但它并不知道缓存已经重建了;
- 请求 B 又去查数据库。
结果:数据库被多次无意义访问
“双重检测”这一段的作用,就是在并发环境下防止重复重建缓存,确保只有一个线程真正访问数据库。
如果只看代码,可以看到之前已经查过缓存并拿到锁,之后又查一次,未命中的话又去查询数据库,有一种第二次检查无意义的感觉
但是其实——它的意义不在“自己”那一轮,而在后续并发线程那几轮。
| 时间点 | A | B | C |
|---|---|---|---|
| t1 | 查缓存 → 未命中 | 查缓存 → 未命中 | 查缓存 → 未命中 |
| t2 | 拿到锁 | 等锁 | 等锁 |
| t3 | A 查数据库 + 重建缓存 | 等锁 | 等锁 |
| t4 | A 释放锁 | B 拿到锁 | 等锁 |
| t5 | B 再次查缓存(双检) → 命中,直接返回 | 等锁 | |
| t6 | C 拿到锁 | 查缓存(双检)→ 命中,直接返回 | 同理 |
| 最终 | 只有 A 查了一次数据库 |
“第二次检查缓存 是为了后面那些拿到锁的线程,而不是当前线程。”
- 第一个线程(A):确实两次都没命中缓存,所以最终访问数据库;
- 第二个线程(B):等锁时缓存已经被A重建;
- 它拿到锁后再次检查缓存,就能命中;
- 从而避免了再去访问数据库;
- 第三个线程(C):同理。
3.2 使用逻辑过期方式
判断缓存是否命中——命中——判断缓存是否过期——过期(未过期则直接返回信息)——尝试获取互斥锁——判断是否获取锁——是(若无需获取锁直接返回缓存中的信息)——开启独立线程(根据id查数据库——将数据写入redis并设置逻辑过期时间——释放互斥锁)——返回信息未命中——返回空值
什么是逻辑过期
逻辑过期是将缓存数据设置一个过期标记。这个过期标记并不是真正的过期时间,而是缓存数据中自定义的一部分字段,标记缓存是否已经过期。只有当业务逻辑判断数据过期时,才去查询数据库。
如何使用逻辑过期
在缓存中保存一个时间戳(expireAt),它表示缓存数据的过期时间。每次查询缓存时,不仅检查缓存是否存在,还需要检查该缓存的数据是否已经过期。
逻辑过期防止缓存击穿的工作流程
- 缓存中的数据存在,但判断数据是否“逻辑过期”。
- 如果数据没有过期,直接返回缓存数据。
- 如果数据过期,且当前没有其他请求正在更新缓存,则触发数据库查询并更新缓存;其他请求等待缓存更新。
首先创建一个管理数据和过期时间的结构体
// 定义缓存的数据结构
type RedisData struct {Data *Shop `json:"data"` // 店铺数据ExpireAt int64 `json:"expireAt"` // 逻辑过期时间戳
}
// 常量定义
const (CACHE_SHOP_KEY_PREFIX = "cache:shop:"LOCK_SHOP_KEY_PREFIX = "lock:shop:"CACHE_SHOP_LOGICAL_TTL = 10 * time.MinuteCACHE_REBUILD_WORKERS_NUM = 10
)// 缓存重建线程池
var cacheRebuildPool = make(chan struct{}, CACHE_REBUILD_WORKERS_NUM)
var mu sync.Mutex// 查询商铺数据(逻辑过期 + 异步重建)
func QueryShopByID(ctx context.Context, id int64) (*Shop, error) {key := fmt.Sprintf("%s%d", CACHE_SHOP_KEY_PREFIX, id)// 1. 从 Redis 查询缓存shopJson, err := rdb.Get(ctx, key).Result()if err == redis.Nil {// 缓存未命中return nil, fmt.Errorf("shop data not found")} else if err != nil {return nil, err}// 2. 解析 RedisDatavar redisData RedisDataif err := json.Unmarshal([]byte(shopJson), &redisData); err != nil {return nil, err}// 3. 反序列化商铺数据var shop ShopdataBytes, _ := json.Marshal(redisData.Data)if err := json.Unmarshal(dataBytes, &shop); err != nil {return nil, err}// 4. 判断逻辑过期时间if time.Now().Unix() < redisData.ExpireTime {// 缓存未过期,直接返回return &shop, nil}// 5. 缓存过期,尝试获取互斥锁lockKey := fmt.Sprintf("%s%d", LOCK_SHOP_KEY_PREFIX, id)ok, err := TryLock(ctx, lockKey, 10*time.Second)if err != nil {return &shop, nil // 返回旧数据}if ok {// 获取锁成功,异步重建缓存go func() {defer Unlock(ctx, lockKey)RebuildCache(ctx, id)}()}// 返回过期数据(保证高可用)return &shop, nil
}// 异步重建缓存
func RebuildCache(ctx context.Context, id int64) {select {case cacheRebuildPool <- struct{}{}:defer func() { <-cacheRebuildPool }()default:// 线程池已满,直接返回return}shop, err := GetShopFromDB(id)if err != nil {log.Printf("failed to query DB for shop %d: %v", id, err)return}// 构造逻辑过期数据data := RedisData{Data: shop,ExpireTime: time.Now().Add(CACHE_SHOP_LOGICAL_TTL).Unix(),}jsonData, _ := json.Marshal(data)key := fmt.Sprintf("%s%d", CACHE_SHOP_KEY_PREFIX, id)// 写入 Rediserr = rdb.Set(ctx, key, jsonData, 0).Err()if err != nil {log.Printf("failed to set cache for shop %d: %v", id, err)}
}// 尝试获取锁
func TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error) {return rdb.SetNX(ctx, key, "1", ttl).Result()
}// 释放锁
func Unlock(ctx context.Context, key string) error {return rdb.Del(ctx, key).Err()
}
如果数据量大且希望访问时减少延迟,可以进行数据预热
预热缓存的函数
func preloadCache() {// 预热店铺数据shopList, err := queryShopFromDB()if err != nil {log.Fatalf("Failed to query shop data: %v", err)}// 将每个店铺数据写入缓存for _, shop := range shopList {key := fmt.Sprintf("shop:%d", shop.ID)cachedShop := CachedShop{Data: shop,ExpireAt: time.Now().Add(10 * time.Minute).Unix(), // 设置逻辑过期时间}// 将缓存数据存储到 RedisjsonData, err := json.Marshal(cachedShop)if err != nil {log.Printf("Failed to marshal cached shop data: %v", err)continue}// 将数据写入 RedisredisClient.Set(key, jsonData, 0) // 0 表示永不过期(实际过期由 expireAt 控制)}
}
- 启动时调用预热函数
func main() {// 连接到 Redis 等初始化操作// 调用预热缓存的函数preloadCache()// 启动 HTTP 服务或其他逻辑log.Println("Starting application...")// Your server code here...
}
异步重建缓存函数分析
为什么需要异步重建缓存?
- 避免阻塞用户请求
- 当缓存过期时,如果直接在请求线程里查询数据库重建缓存,会导致用户请求被阻塞,尤其是数据库查询慢或者高并发访问时。
- 异步重建允许请求直接返回过期缓存,保证响应速度和系统可用性。
- 防止缓存击穿
- 多个请求同时发现缓存过期,如果同步重建,会瞬间压垮数据库。
- 异步重建 + 分布式锁确保只有一个线程去刷新缓存,其它线程可以继续使用旧缓存。
- 控制并发重建数量
- 使用
cacheRebuildPool控制同时执行的缓存重建线程数量,避免缓存重建操作无限增长导致系统资源被耗尽。
- 使用
- 逻辑过期策略配合异步重建
- 返回旧缓存给用户,保证可用性;
- 异步更新缓存,保证数据最终一致性;
- 不会影响请求响应时间,也避免数据库瞬间并发冲击。
// 异步重建缓存
func RebuildCache(ctx context.Context, id int64) {//1. 使用channel控制并发,限制线程池大小select {case cacheRebuildPool <- struct{}{}:defer func() { <-cacheRebuildPool }()default:// 线程池已满,直接返回return}/*当缓存逻辑过期时,需要从数据库获取最新数据。
如果查询数据库失败,直接返回,不重建缓存。*/shop, err := GetShopFromDB(id)if err != nil {log.Printf("failed to query DB for shop %d: %v", id, err)return}// 构造逻辑过期数据/*将数据库查询到的 Shop 数据封装到 RedisData 中,同时设置 逻辑过期时间。
逻辑过期时间表示这条缓存什么时候被认为“过期”,方便下次访问判断。*/data := RedisData{Data: shop,ExpireTime: time.Now().Add(CACHE_SHOP_LOGICAL_TTL).Unix(),}/*把封装好的 RedisData 序列化成 JSON,存入 Redis。
Redis key 使用商铺 ID 作为后缀,方便查询。*/jsonData, _ := json.Marshal(data)key := fmt.Sprintf("%s%d", CACHE_SHOP_KEY_PREFIX, id)// 写入 Redis/*将逻辑过期缓存写入 Redis,过期控制由 ExpireTime 字段管理,而不是 Redis TTL。写入失败时记录日志,便于排查问题。*/err = rdb.Set(ctx, key, jsonData, 0).Err()if err != nil {log.Printf("failed to set cache for shop %d: %v", id, err)}
}
cacheRebuildPool 是一个缓冲通道,大小固定(比如 10),用来限制同时进行缓存重建的 goroutine 数量。
select + default 确保当线程池已满时,新的缓存重建请求不会阻塞或无限等待。
defer func(){<-cacheRebuildPool}() 是释放线程池资源,确保执行完 goroutine 后释放位置。
逻辑过期时间根据具体业务而定,逻辑过期过长,会造成缓存数据的堆积,浪费内存,过短造成频繁缓存重建,降低性能,所以设置逻辑过期时间时需要实际测试和评估不同参数下的性能和资源消耗情况
