专业的外贸网站建设全国各城市疫情高峰感染高峰进度
延时定时刷新Redis缓存
一、背景
- 项目需求:订阅接收一批实时数据,每分钟最高可接收120万条数据,并且分别更新到redis和数据库中;而用户请求查询消息只是低频操作。
- 资源限制:由于项目预算有限,只有4台4C16G的主机用于消费处理这些消息;
- 需求容忍程度:当http请求查询消息时,可以接受查询到几秒钟内的数据,及延时一定时间的消息。
二、方案设计
- 缓存更新策略:先更新本地缓存,然后定时刷新到redis中
- 代码实现设计:
(1)技术选型:使用caffeine或者guava缓存管理工具
(2)代码设计:设置缓存过期时间,并在实现过期时的处理接口,在该接口中将本地缓存过期的key刷新到redis缓存中;
三、代码实现
- maven引用
<dependency><groupId>com.github.ben-manes.caffeine</groupId><artifactId>caffeine</artifactId><version>2.9.3</version></dependency>
2.代码实现
- 定义接口 DelayedCache
package com.zzc.component.cache;
public interface DelayedCache<K, V> {V get(K key);void put(K key, V value);void remove(K key);}
- 定义抽象类 AbstractDelayedCache 实现通用本地缓存策略
package com.zzc.component.cache;import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.checkerframework.checker.index.qual.NonNegative;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;import java.util.concurrent.TimeUnit;public abstract class AbstractDelayedCache<K, V> implements DelayedCache<K, V> {/*** 延时刷新到redis的时间*/private final long delayMillis;private final int initialCapacity;private final int maximumSize;private final LoadingCache<K, V> LOCAL_CACHE;public AbstractDelayedCache(int initialCapacity, int maximumSize, long delayMillis) {this.initialCapacity = initialCapacity;this.maximumSize = maximumSize;this.delayMillis = delayMillis;LOCAL_CACHE = initLocalCache();}private LoadingCache<K, V> initLocalCache() {return Caffeine.newBuilder().expireAfter(new Expiry<K, V>() {@Overridepublic long expireAfterCreate(@NonNull K key, @NonNull V value, long currentTime) {return TimeUnit.MILLISECONDS.toNanos(delayMillis);}@Overridepublic long expireAfterUpdate(@NonNull K key, @NonNull V value, long currentTime, @NonNegative long currentDuration) {return currentDuration;}@Overridepublic long expireAfterRead(@NonNull K key, @NonNull V value, long currentTime, @NonNegative long currentDuration) {return currentDuration;}}).removalListener((key, value, cause) -> {switch (cause) {case EXPLICIT://当缓存项被显式地调用 invalidate 或 invalidateAll 方法删除时触发afterExplicit(key, value);break;case REPLACED://当一个新的值通过 put、replace 等方法替换现有的值时触发。afterReplaced(key, value);break;case COLLECTED://如果缓存使用了弱引用(weak keys 或 weak values)或软引用(soft values),并且这些引用的对象被垃圾回收器回收时触发。afterCollected(key, value);break;case EXPIRED://当缓存项达到其设定的有效期(TTL, TTI)而被自动移除时触发。afterExpired(key, value);break;case SIZE://当缓存项因为缓存大小超过限制(如最大容量或权重限制),根据驱逐策略(通常是 LRU、LFU 等)被移除时触发afterSize(key, value);break;default:break;}}).initialCapacity(initialCapacity).maximumSize(maximumSize).build(new CacheLoader<K, V>() {@Overridepublic @Nullable V load(@NonNull K key) throws Exception {return loadCache(key);}});}/*** 当缓存被显示调用 invalidate 或 invalidateAll 方法删除时触发* @param key* @param value*/protected abstract void afterExplicit(K key, V value);/*** 当缓存项被替换时触发* @param key* @param value*/protected void afterReplaced(K key, V value) {}/*** 如果缓存使用了弱引用(weak keys 或 weak values)或软引用(soft values),并且这些引用的对象被垃圾回收器回收时触发。* @param key* @param value*/protected void afterCollected(K key, V value) {}/*** 当缓存项达到其设定的有效期(TTL, TTI)而被自动移除时触发。* @param key* @param value*/protected abstract void afterExpired(K key, V value);/*** 当缓存项因为缓存大小超过限制(如最大容量或权重限制),根据驱逐策略(通常是 LRU、LFU 等)被移除时触发* @param key* @param value*/protected abstract void afterSize(K key, V value);/*** 初始化本地缓存数据,从远程获取* @param key*/protected abstract V loadCache(K key);@Overridepublic V get(K key) {return LOCAL_CACHE.get(key);}@Overridepublic void put(K key, V value) {LOCAL_CACHE.put(key, value);}@Overridepublic void remove(K key) {LOCAL_CACHE.invalidate(key);}}
- 继承实现缓存和redis的关系
package com.zzc.component.cache;
public class TestRedisCache extends AbstractDelayedCache<String, String> {/*** 初始缓存数量大小为 1000* 最大缓存数量 10000* 缓存过期时间 10000ms*/public TestRedisCache() {super(1000, 10000, 10000);}@Overrideprotected void afterExplicit(String key, String value) {//TODO 删除redis缓存 redisTemplate.del(key);}@Overrideprotected void afterExpired(String key, String value) {//TODO 更新到redis缓存 redisTemplate.setValue(key, value);}@Overrideprotected void afterSize(String key, String value) {//TODO 更新到redis缓存 redisTemplate.setValue(key, value);}@Overrideprotected String loadCache(String key) {//TODO 从redis缓存中获取key的valuereturn null;}
}
- Demo
package com.zzc.component.cache;
public class Demo {public static void main(String[] args) {TestRedisCache cache = new TestRedisCache();cache.put("key", "value");cache.get("key");cache.remove("key");}}