营销型网站建设排名网站建设公司需要申请icp吗
前言
早期的实现可能会使用 SETNX 和 EXPIRE 两条命令。这是一个巨大的隐患,因为在 SETNX 成功和 EXPIRE 执行之间,如果客户端崩溃,锁将永远不会过期。
1.基础防护 - 缓存空对象
当从数据库查询一个数据为空时,我们不直接返回,而是将一个特殊的“空对象”或“空字符串”存入缓存,并设置一个较短的过期时间(比如几十秒到几分钟)。不过这样如果数据库中该 Key 后来又有了数据,缓存层在过期前无法感知,存在数据不一致的风险。
2. 分布式互斥锁
这是解决缓存击穿最常用、也是最核心的方案。当缓存未命中时,不是立刻去查数据库,而是先尝试获取一个与该缓存键关联的分布式锁。
- 获取锁成功的线程:它获得了查询数据库的唯一权限。它会去查询数据库,将数据写入缓存,最后释放锁。
- 获取锁失败的线程:它不会去查数据库,而是会等待一小段时间(比如自旋或休眠),然后重新尝试从缓存中获取数据。这时候,很可能第一个线程已经将数据写入缓存了。
最常见的实现是使用 Redis 的 SETNX (SET if Not eXists) 命令。ET key value NX PX milliseconds
这个原子命令是实现分布式锁的最佳实践。其中 NX 代表只有当 Key 不存在时才设置,保证了原子性。PX milliseconds代表设置一个带毫秒级过期时间的 Key,这至关重要,它可以防止因服务宕机等异常情况导致锁无法被释放,从而造成死锁。
// 伪代码,重点体现思想
public String getData(String key) {// 1. 从缓存获取数据String value = redis.get(key);if (value != null) {return value;}// 2. 尝试获取分布式锁String lockKey = "lock:" + key;// SETNX + EXPIRE 原子操作,防止死锁String lockValue = UUID.randomUUID().toString(); // 锁的值使用唯一ID,用于安全释放Boolean locked = redis.set(lockKey, lockValue, "NX", "PX", 30000); // 尝试加锁,30秒过期if (locked) {try {// 3. 获取锁成功,查询数据库value = db.query(key);if (value != null) {// 数据库有值,写入缓存redis.set(key, value, "EX", 3600); // 缓存1小时} else {// 数据库无值,缓存空对象,防止缓存穿透redis.set(key, "", "EX", 60); // 缓存空值1分钟}return value;} finally {// 4. 释放锁(必须在 finally 块中)// 使用 Lua 脚本保证原子性,防止误删他人的锁String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";redis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(lockValue));}} else {// 5. 获取锁失败,休眠后重试try {Thread.sleep(50); // 休眠50毫秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}return getData(key); // 递归或循环重试}
}
关于获取锁失败的线程会休眠后重试
如果有 1000 个线程都在休眠和重试,这本身也是一种资源浪费。让成百上千的线程去 sleep 然后重试,这种方式我们称之为忙等或自旋。虽然避免了直接冲击数据库,但这些线程本身在不断地“空转”,它们会频繁地被唤醒、抢占CPU时间片、检查锁、然后再次休眠。这在高并发下会造成:
- CPU资源浪费:大量的上下文切换是昂贵的。
- 惊群效应:当锁被释放的瞬间,所有等待的线程被同时唤醒,一起去争抢锁或访问缓存,这又会对 CPU 和 Redis 造成一波新的冲击。
- 响应延迟增加:后来的请求需要等待一个固定的sleep时间,无法在数据准备好的第一时间就得到响应。
可以采用JVM 内部锁 + Future 机制或者Redis 的 Pub/Sub 机制来优化
这里就说下JVM 内部锁 + Future 机制
单机环境下的极致优化(JVM 内部锁 + Future 机制)
如果你的服务集群规模不大,或者有信心通过一致性哈希等路由策略,让同一个热点 Key 的请求尽可能落到同一台服务器实例上,那么在单机维度进行优化是最高效的
假设我们有这样一个场景:根据商品 ID 查询商品信息
// 伪代码:数据库查询的DAO
interface ProductDao {Product selectById(String id);
}// 伪代码:商品实体
class Product {// fields...
}
这个方案的核心是在单个 JVM 内部,用一个 Map 来持有正在加载数据的 Future,后续线程直接等待这个 Future 的结果,从而避免了轮询。
代码如下:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;@Service
public class CacheServiceV1 {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate ProductDao productDao;// 关键:在JVM内存中维护一个Map,存储正在进行的加载任务private final ConcurrentHashMap<String, CompletableFuture<String>> promiseMap = new ConcurrentHashMap<>();private static final String LOCK_PREFIX = "lock:";private static final long LOCK_EXPIRE_TIME = 30; // 分布式锁过期时间,秒private static final long CACHE_EXPIRE_TIME = 3600; // 正常缓存过期时间,秒private static final long CACHE_NULL_EXPIRE_TIME = 60; // 空值缓存过期时间,秒public String getProductData(String productId) {String key = "product:" + productId;// 1. 先从缓存查询String cachedValue = redisTemplate.opsForValue().get(key);if (StringUtils.hasText(cachedValue)) {return cachedValue;}// 2. 尝试获取分布式锁String lockKey = LOCK_PREFIX + key;String lockValue = UUID.randomUUID().toString();Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, LOCK_EXPIRE_TIME, TimeUnit.SECONDS);if (locked != null && locked) {// 3. 获取锁成功// 关键:创建一个CompletableFuture并放入MapCompletableFuture<String> future = new CompletableFuture<>();promiseMap.put(key, future);try {// 3.1 查询数据库String dbValue = productDao.selectById(productId) != null ? "some_product_data" : null; // 假设查询结果// 3.2 写入缓存if (dbValue != null) {redisTemplate.opsForValue().set(key, dbValue, CACHE_EXPIRE_TIME, TimeUnit.SECONDS);} else {redisTemplate.opsForValue().set(key, "", CACHE_NULL_EXPIRE_TIME, TimeUnit.SECONDS); // 缓存空值}// 3.3 完成Future,唤醒所有等待者future.complete(dbValue);return dbValue;} finally {// 3.4 清理工作:从Map中移除Future,并释放锁(Lua脚本保证原子性)promiseMap.remove(key);String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";redisTemplate.execute(script, java.util.Collections.singletonList(lockKey), lockValue);}} else {// 4. 获取锁失败,说明有其他线程正在加载try {// 4.1 从Map中获取Future并等待结果CompletableFuture<String> future = promiseMap.get(key);if (future != null) {// 等待Future完成,设置一个超时时间防止永久等待return future.get(5, TimeUnit.SECONDS);} else {// 如果Future不存在(可能刚被移除),则短暂休眠后重试整个方法TimeUnit.MILLISECONDS.sleep(50);return getProductData(productId); // 重试}} catch (Exception e) {// 异常处理,比如超时后可以再次尝试获取数据Thread.currentThread().interrupt();// 可以选择重试或返回兜底数据return getProductData(productId);}}}
}
其中
- promiseMap:这是核心,一个 ConcurrentHashMap 保证了线程安全。它存储了从 productId 到 CompletableFuture 的映射。
- 获取锁成功 (if locked):
立即创建 Future 并放入 Map:这是关键一步,在查询数据库之前就占位。这样,其他线程即使获取锁失败,也能立刻在 Map 中找到这个“凭证”。
future.complete(dbValue):当数据库操作完成,数据写入缓存后,调用此方法。所有调用了 future.get() 的线程都会被唤醒,并收到 dbValue 这个结果。
finally 块:确保无论成功还是异常,Map 中的 Future 都被移除,并且分布式锁被安全释放。 - 获取锁失败 (else):
不再 sleep:而是直接去 promiseMap 里找对应的 Future。
future.get(…):这是一个阻塞操作,但它不消耗 CPU。当前线程会进入 WAITING 状态,由 JVM 管理,直到 Future 被 complete。
兜底重试:如果 Future 为 null(可能在获取 Future 的瞬间,持有锁的线程刚好执行完并移除了它),或者等待超时,最简单的策略就是重试整个方法。