当前位置: 首页 > news >正文

Java延时定时刷新Redis缓存

延时定时刷新Redis缓存

一、背景

  1. 项目需求:订阅接收一批实时数据,每分钟最高可接收120万条数据,并且分别更新到redis和数据库中;而用户请求查询消息只是低频操作。
  2. 资源限制:由于项目预算有限,只有4台4C16G的主机用于消费处理这些消息;
  3. 需求容忍程度:当http请求查询消息时,可以接受查询到几秒钟内的数据,及延时一定时间的消息。

二、方案设计

  1. 缓存更新策略:先更新本地缓存,然后定时刷新到redis中
  2. 代码实现设计:
    (1)技术选型:使用caffeine或者guava缓存管理工具
    (2)代码设计:设置缓存过期时间,并在实现过期时的处理接口,在该接口中将本地缓存过期的key刷新到redis缓存中;

三、代码实现

  1. maven引用
        <dependency>
            <groupId>com.github.ben-manes.caffeine</groupId>
            <artifactId>caffeine</artifactId>
            <version>2.9.3</version>
        </dependency>

2.代码实现

  • 定义接口 DelayedCache
package com.zzc.component.cache;
public interface DelayedCache<K, V> {

    V get(K key);

    void put(K key, V value);

    void remove(K key);

}

  • 定义抽象类 AbstractDelayedCache 实现通用本地缓存策略
package com.zzc.component.cache;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.checkerframework.checker.index.qual.NonNegative;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.concurrent.TimeUnit;

public abstract class AbstractDelayedCache<K, V> implements DelayedCache<K, V> {

    /**
     * 延时刷新到redis的时间
     */
    private final long delayMillis;

    private final int initialCapacity;

    private final int maximumSize;

    private final LoadingCache<K, V> LOCAL_CACHE;

    public AbstractDelayedCache(int initialCapacity, int maximumSize, long delayMillis) {
        this.initialCapacity = initialCapacity;
        this.maximumSize = maximumSize;
        this.delayMillis = delayMillis;
        LOCAL_CACHE = initLocalCache();
    }


    private LoadingCache<K, V> initLocalCache() {
        return Caffeine.newBuilder()
                .expireAfter(new Expiry<K, V>() {

                    @Override
                    public long expireAfterCreate(@NonNull K key, @NonNull V value, long currentTime) {
                        return TimeUnit.MILLISECONDS.toNanos(delayMillis);
                    }

                    @Override
                    public long expireAfterUpdate(@NonNull K key, @NonNull V value, long currentTime, @NonNegative long currentDuration) {
                        return currentDuration;
                    }

                    @Override
                    public long expireAfterRead(@NonNull K key, @NonNull V value, long currentTime, @NonNegative long currentDuration) {
                        return currentDuration;
                    }
                })
                .removalListener((key, value, cause) -> {
                    switch (cause) {
                        case EXPLICIT://当缓存项被显式地调用 invalidate 或 invalidateAll 方法删除时触发
                            afterExplicit(key, value);
                            break;
                        case REPLACED://当一个新的值通过 put、replace 等方法替换现有的值时触发。
                            afterReplaced(key, value);
                            break;
                        case COLLECTED://如果缓存使用了弱引用(weak keys 或 weak values)或软引用(soft values),并且这些引用的对象被垃圾回收器回收时触发。
                            afterCollected(key, value);
                            break;
                        case EXPIRED://当缓存项达到其设定的有效期(TTL, TTI)而被自动移除时触发。
                            afterExpired(key, value);
                            break;
                        case SIZE://当缓存项因为缓存大小超过限制(如最大容量或权重限制),根据驱逐策略(通常是 LRULFU 等)被移除时触发
                            afterSize(key, value);
                            break;
                        default:
                            break;
                    }
                })
                .initialCapacity(initialCapacity)
                .maximumSize(maximumSize)
                .build(new CacheLoader<K, V>() {
                    @Override
                    public @Nullable V load(@NonNull K key) throws Exception {
                        return loadCache(key);
                    }
                });
    }

    /**
     * 当缓存被显示调用 invalidate 或 invalidateAll 方法删除时触发
     * @param key
     * @param value
     */
    protected abstract void afterExplicit(K key, V value);

    /**
     * 当缓存项被替换时触发
     * @param key
     * @param value
     */
    protected void afterReplaced(K key, V value) {

    }

    /**
     * 如果缓存使用了弱引用(weak keys 或 weak values)或软引用(soft values),并且这些引用的对象被垃圾回收器回收时触发。
     * @param key
     * @param value
     */
    protected void afterCollected(K key, V value) {

    }

    /**
     * 当缓存项达到其设定的有效期(TTL, TTI)而被自动移除时触发。
     * @param key
     * @param value
     */
    protected abstract void afterExpired(K key, V value);

    /**
     * 当缓存项因为缓存大小超过限制(如最大容量或权重限制),根据驱逐策略(通常是 LRU、LFU 等)被移除时触发
     * @param key
     * @param value
     */
    protected abstract void afterSize(K key, V value);

    /**
     * 初始化本地缓存数据,从远程获取
     * @param key
     */
    protected abstract V loadCache(K key);

    @Override
    public V get(K key) {
        return LOCAL_CACHE.get(key);
    }

    @Override
    public void put(K key, V value) {
        LOCAL_CACHE.put(key, value);
    }

    @Override
    public void remove(K key) {
        LOCAL_CACHE.invalidate(key);
    }

}

  • 继承实现缓存和redis的关系
package com.zzc.component.cache;
public class TestRedisCache extends AbstractDelayedCache<String, String> {

    /**
     * 初始缓存数量大小为 1000
     * 最大缓存数量 10000
     * 缓存过期时间 10000ms
     */
    public TestRedisCache() {
        super(1000, 10000, 10000);
    }

    @Override
    protected void afterExplicit(String key, String value) {
        //TODO 删除redis缓存 redisTemplate.del(key);
    }

    @Override
    protected void afterExpired(String key, String value) {
        //TODO 更新到redis缓存 redisTemplate.setValue(key, value);
    }

    @Override
    protected void afterSize(String key, String value) {
        //TODO 更新到redis缓存 redisTemplate.setValue(key, value);
    }

    @Override
    protected String loadCache(String key) {
        //TODO 从redis缓存中获取key的value
        return null;
    }
}

  • Demo
package com.zzc.component.cache;
public class Demo {

    public static void main(String[] args) {
        TestRedisCache cache = new TestRedisCache();
        cache.put("key", "value");
        cache.get("key");
        cache.remove("key");
    }

}

相关文章:

  • 什么是掉期(Swap)?——金融衍生品的关键工具(中英双语)
  • Spring Boot Actuator 监控✨
  • 使用DeepSeek+本地知识库,尝试从0到1搭建高度定制化工作流(爬虫模块篇)
  • 钉钉应用开发
  • unity学习42:动画状态机:混合动画状态 blend tree
  • OpenGL ES学习大纲
  • 解锁机器学习核心算法 | K -近邻算法:机器学习的神奇钥匙
  • 在 Visual Studio Code (VSCode) 中创建 React 项目
  • 基于豆瓣2025电影数据可视化分析系统的设计与实现
  • 500. 键盘行 771. 宝石与石头 简单 find接口的使用
  • (萌新入门)如何从起步阶段开始学习STM32 —— 1如何迁移一个开发版的工程
  • 深入解析 vLLM:高性能 LLM 服务框架的架构之美(二)调度管理
  • 【JAVA工程师从0开始学AI】,第二步:从强类型到动态语言:Java工程师的Python语法避坑指南
  • Golang实现简单粗暴的接口去重函数
  • 管理WSL实例 以及安装 Ubuntu 作为 WSL 子系统 流程
  • Deepseek本地部署指南:在linux服务器部署,在mac远程web-ui访问
  • 现代多核 CPU 的变化
  • TreeSet(单列集合)
  • Spring Boot(七):Swagger 接口文档
  • 【信息学奥赛一本通 C++题解】1286:怪盗基德的滑翔翼
  • 上海电视节发布海报、宣传片:三十而励,光影新程
  • 贵州茅台股东大会回应八大热点:确保茅台酒价格体系稳固,相信自我调节能力
  • 多名幼师殴打女童被行拘后续,盘锦教育局工作人员:该局将专项整治全市幼儿园
  • 俄乌上周在土耳其直接谈判,外交部回应
  • 人民日报大家谈:为基层减负,治在根子上减到点子上
  • 复旦一校友捐赠1亿元,却不留名