Java延时定时刷新Redis缓存
延时定时刷新Redis缓存
一、背景
- 项目需求:订阅接收一批实时数据,每分钟最高可接收120万条数据,并且分别更新到redis和数据库中;而用户请求查询消息只是低频操作。
- 资源限制:由于项目预算有限,只有4台4C16G的主机用于消费处理这些消息;
- 需求容忍程度:当http请求查询消息时,可以接受查询到几秒钟内的数据,及延时一定时间的消息。
二、方案设计
- 缓存更新策略:先更新本地缓存,然后定时刷新到redis中
- 代码实现设计:
(1)技术选型:使用caffeine或者guava缓存管理工具
(2)代码设计:设置缓存过期时间,并在实现过期时的处理接口,在该接口中将本地缓存过期的key刷新到redis缓存中;
三、代码实现
- maven引用
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>
2.代码实现
- 定义接口 DelayedCache
package com.zzc.component.cache;
public interface DelayedCache<K, V> {
V get(K key);
void put(K key, V value);
void remove(K key);
}
- 定义抽象类 AbstractDelayedCache 实现通用本地缓存策略
package com.zzc.component.cache;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.checkerframework.checker.index.qual.NonNegative;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.concurrent.TimeUnit;
public abstract class AbstractDelayedCache<K, V> implements DelayedCache<K, V> {
/**
* 延时刷新到redis的时间
*/
private final long delayMillis;
private final int initialCapacity;
private final int maximumSize;
private final LoadingCache<K, V> LOCAL_CACHE;
public AbstractDelayedCache(int initialCapacity, int maximumSize, long delayMillis) {
this.initialCapacity = initialCapacity;
this.maximumSize = maximumSize;
this.delayMillis = delayMillis;
LOCAL_CACHE = initLocalCache();
}
private LoadingCache<K, V> initLocalCache() {
return Caffeine.newBuilder()
.expireAfter(new Expiry<K, V>() {
@Override
public long expireAfterCreate(@NonNull K key, @NonNull V value, long currentTime) {
return TimeUnit.MILLISECONDS.toNanos(delayMillis);
}
@Override
public long expireAfterUpdate(@NonNull K key, @NonNull V value, long currentTime, @NonNegative long currentDuration) {
return currentDuration;
}
@Override
public long expireAfterRead(@NonNull K key, @NonNull V value, long currentTime, @NonNegative long currentDuration) {
return currentDuration;
}
})
.removalListener((key, value, cause) -> {
switch (cause) {
case EXPLICIT://当缓存项被显式地调用 invalidate 或 invalidateAll 方法删除时触发
afterExplicit(key, value);
break;
case REPLACED://当一个新的值通过 put、replace 等方法替换现有的值时触发。
afterReplaced(key, value);
break;
case COLLECTED://如果缓存使用了弱引用(weak keys 或 weak values)或软引用(soft values),并且这些引用的对象被垃圾回收器回收时触发。
afterCollected(key, value);
break;
case EXPIRED://当缓存项达到其设定的有效期(TTL, TTI)而被自动移除时触发。
afterExpired(key, value);
break;
case SIZE://当缓存项因为缓存大小超过限制(如最大容量或权重限制),根据驱逐策略(通常是 LRU、LFU 等)被移除时触发
afterSize(key, value);
break;
default:
break;
}
})
.initialCapacity(initialCapacity)
.maximumSize(maximumSize)
.build(new CacheLoader<K, V>() {
@Override
public @Nullable V load(@NonNull K key) throws Exception {
return loadCache(key);
}
});
}
/**
* 当缓存被显示调用 invalidate 或 invalidateAll 方法删除时触发
* @param key
* @param value
*/
protected abstract void afterExplicit(K key, V value);
/**
* 当缓存项被替换时触发
* @param key
* @param value
*/
protected void afterReplaced(K key, V value) {
}
/**
* 如果缓存使用了弱引用(weak keys 或 weak values)或软引用(soft values),并且这些引用的对象被垃圾回收器回收时触发。
* @param key
* @param value
*/
protected void afterCollected(K key, V value) {
}
/**
* 当缓存项达到其设定的有效期(TTL, TTI)而被自动移除时触发。
* @param key
* @param value
*/
protected abstract void afterExpired(K key, V value);
/**
* 当缓存项因为缓存大小超过限制(如最大容量或权重限制),根据驱逐策略(通常是 LRU、LFU 等)被移除时触发
* @param key
* @param value
*/
protected abstract void afterSize(K key, V value);
/**
* 初始化本地缓存数据,从远程获取
* @param key
*/
protected abstract V loadCache(K key);
@Override
public V get(K key) {
return LOCAL_CACHE.get(key);
}
@Override
public void put(K key, V value) {
LOCAL_CACHE.put(key, value);
}
@Override
public void remove(K key) {
LOCAL_CACHE.invalidate(key);
}
}
- 继承实现缓存和redis的关系
package com.zzc.component.cache;
public class TestRedisCache extends AbstractDelayedCache<String, String> {
/**
* 初始缓存数量大小为 1000
* 最大缓存数量 10000
* 缓存过期时间 10000ms
*/
public TestRedisCache() {
super(1000, 10000, 10000);
}
@Override
protected void afterExplicit(String key, String value) {
//TODO 删除redis缓存 redisTemplate.del(key);
}
@Override
protected void afterExpired(String key, String value) {
//TODO 更新到redis缓存 redisTemplate.setValue(key, value);
}
@Override
protected void afterSize(String key, String value) {
//TODO 更新到redis缓存 redisTemplate.setValue(key, value);
}
@Override
protected String loadCache(String key) {
//TODO 从redis缓存中获取key的value
return null;
}
}
- Demo
package com.zzc.component.cache;
public class Demo {
public static void main(String[] args) {
TestRedisCache cache = new TestRedisCache();
cache.put("key", "value");
cache.get("key");
cache.remove("key");
}
}