当前位置: 首页 > news >正文

Spring Cache 扩展:Redis 批量操作优化方案与 BatchCache 自定义实现

Spring Cache是一个强大的缓存抽象层,提供了统一的缓存操作接口,但原生支持主要集中在单键操作。在高并发场景下,批量操作能力对于提升系统性能至关重要。本文将深入探讨如何通过继承Cache接口,以RedisCache为基础,实现兼容Spring Cache规范的BatchCache扩展。

一、Spring Cache架构与RedisCache源码剖析

1.1 Spring Cache核心组件

Spring Cache的核心是org.springframework.cache.Cache接口,定义了缓存的基本操作:

public interface Cache {String getName();Object getNativeCache();ValueWrapper get(Object key);<T> T get(Object key, Class<T> type);<T> T get(Object key, Callable<T> valueLoader);void put(Object key, Object value);ValueWrapper putIfAbsent(Object key, Object value);void evict(Object key);void clear();
}

主要实现类包括:

  • ConcurrentMapCache:基于ConcurrentHashMap的简单实现
  • RedisCache:Redis集成实现
  • CaffeineCache:Caffeine缓存实现

1.2 RedisCache源码深入分析

RedisCache是Spring Data Redis提供的缓存实现,核心类结构如下:

public class RedisCache extends AbstractValueAdaptingCache {private final String name;private final RedisCacheWriter cacheWriter;private final RedisCacheConfiguration cacheConfig;// 构造函数和核心方法实现
}

核心组件解析:

  • RedisCacheWriter:负责与Redis通信的底层接口
  • RedisCacheConfiguration:缓存配置,如序列化器、TTL等
  • AbstractValueAdaptingCache:提供缓存值处理的基础实现

1.3 RedisCacheWriter源码分析

RedisCacheWriter是Redis操作的核心接口:

public interface RedisCacheWriter {void put(String name, byte[] key, byte[] value, @Nullable Duration ttl);byte[] get(String name, byte[] key);byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Duration ttl);void remove(String name, byte[] key);void clean(String name, byte[] pattern);
}

主要实现类:

  • DefaultRedisCacheWriter:基于RedisTemplate的默认实现
  • LettuceRedisCacheWriter:基于Lettuce客户端的优化实现
  • JedisRedisCacheWriter:基于Jedis客户端的实现

二、BatchCache接口设计与实现思路

2.1 BatchCache接口定义

为了实现批量操作能力,我们需要定义一个扩展接口:

/*** 扩展Spring Cache接口,提供批量操作能力** @author doubao*/
public interface BatchCache extends Cache {/*** 批量获取缓存值** @param keys 缓存键集合* @return 键值对映射,不存在的键对应的值为null*/Map<Object, Object> getAll(Collection<?> keys);/*** 批量存入缓存值** @param values 键值对映射*/void putAll(Map<?, ?> values);/*** 批量存入缓存值,仅当键不存在时** @param values 键值对映射* @return 实际存入的键值对映射,已存在的键对应的值为null*/Map<Object, Object> putAllIfAbsent(Map<?, ?> values);/*** 批量删除缓存** @param keys 缓存键集合*/void evictAll(Collection<?> keys);
}

2.2 实现思路概述

实现BatchCache接口的核心思路:

  1. 继承RedisCache类,复用现有功能
  2. 扩展RedisCacheWriter接口,添加批量操作方法
  3. 实现BatchRedisCache类,实现BatchCache接口
  4. 提供配置类,注册自定义CacheManager
  5. 确保与Spring Cache生态系统兼容

三、核心代码实现

3.1 扩展RedisCacheWriter接口

/*** 扩展RedisCacheWriter接口,添加批量操作方法** @author doubao*/
public interface BatchRedisCacheWriter extends RedisCacheWriter {/*** 批量获取缓存值** @param name 缓存名称* @param keys 缓存键集合(字节数组形式)* @return 键值对映射,不存在的键对应的值为null*/Map<byte[], byte[]> getAll(String name, Collection<byte[]> keys);/*** 批量存入缓存值** @param name 缓存名称* @param values 键值对映射(字节数组形式)* @param ttl 过期时间,null表示不过期*/void putAll(String name, Map<byte[], byte[]> values, @Nullable Duration ttl);/*** 批量删除缓存** @param name 缓存名称* @param keys 缓存键集合(字节数组形式)*/void removeAll(String name, Collection<byte[]> keys);
}

3.2 实现BatchRedisCacheWriter

/*** RedisCacheWriter的批量操作实现** @author doubao*/
public class DefaultBatchRedisCacheWriter implements BatchRedisCacheWriter {private final RedisTemplate<byte[], byte[]> redisTemplate;private final Duration sleepTime;/*** 构造函数** @param redisOperations Redis操作模板*/public DefaultBatchRedisCacheWriter(RedisOperations<byte[], byte[]> redisOperations) {this(redisOperations, Duration.ZERO);}/*** 构造函数** @param redisOperations Redis操作模板* @param sleepTime 重试间隔时间*/public DefaultBatchRedisCacheWriter(RedisOperations<byte[], byte[]> redisOperations, Duration sleepTime) {Assert.notNull(redisOperations, "RedisOperations must not be null!");Assert.notNull(sleepTime, "SleepTime must not be null!");this.redisTemplate = (RedisTemplate<byte[], byte[]>) redisOperations;this.sleepTime = sleepTime;}@Overridepublic void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {execute(name, connection -> {if (shouldExpireWithin(ttl)) {connection.setEx(key, ttl.getSeconds(), value);} else {connection.set(key, value);}return "OK";});}@Overridepublic byte[] get(String name, byte[] key) {return execute(name, connection -> connection.get(key));}// 其他方法实现...@Overridepublic Map<byte[], byte[]> getAll(String name, Collection<byte[]> keys) {return execute(name, connection -> {List<byte[]> values = connection.mGet(keys.toArray(new byte[0][]));Map<byte[], byte[]> result = new LinkedHashMap<>(keys.size());int index = 0;for (byte[] key : keys) {result.put(key, index < values.size() ? values.get(index) : null);index++;}return result;});}@Overridepublic void putAll(String name, Map<byte[], byte[]> values, @Nullable Duration ttl) {execute(name, connection -> {if (shouldExpireWithin(ttl)) {Pipeline pipeline = connection.openPipeline();for (Map.Entry<byte[], byte[]> entry : values.entrySet()) {pipeline.setEx(entry.getKey(), ttl.getSeconds(), entry.getValue());}pipeline.close();} else {Map<byte[], byte[]> nonNullValues = values.entrySet().stream().filter(e -> e.getValue() != null).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));if (!nonNullValues.isEmpty()) {connection.mSet(nonNullValues);}}return "OK";});}@Overridepublic void removeAll(String name, Collection<byte[]> keys) {execute(name, connection -> {if (!keys.isEmpty()) {connection.del(keys.toArray(new byte[0][]));}return "OK";});}// 辅助方法...private <T> T execute(String name, RedisCallback<T> callback) {try {return redisTemplate.execute(callback);} catch (Exception ex) {throw new CacheOperationFailedException(name, "Redis batch operation failed", ex);}}private boolean shouldExpireWithin(@Nullable Duration ttl) {return ttl != null && !ttl.isZero() && !ttl.isNegative();}
}

3.3 实现BatchRedisCache类

/*** 支持批量操作的Redis缓存实现** @author doubao*/
public class BatchRedisCache extends RedisCache implements BatchCache {private final BatchRedisCacheWriter cacheWriter;private final RedisSerializationContext<Object, Object> serializationContext;/*** 构造函数** @param name 缓存名称* @param cacheWriter 缓存写入器* @param cacheConfig 缓存配置*/protected BatchRedisCache(String name, BatchRedisCacheWriter cacheWriter,RedisCacheConfiguration cacheConfig) {super(name, cacheWriter, cacheConfig);this.cacheWriter = cacheWriter;this.serializationContext = cacheConfig.getSerializationContext();}@Overridepublic Map<Object, Object> getAll(Collection<?> keys) {// 转换键为字节数组Map<Object, byte[]> keyMappings = new LinkedHashMap<>(keys.size());for (Object key : keys) {keyMappings.put(key, serializeCacheKey(createCacheKey(key)));}// 批量获取缓存值Map<byte[], byte[]> results = cacheWriter.getAll(getName(), keyMappings.values());// 反序列化结果Map<Object, Object> valueMappings = new LinkedHashMap<>(results.size());for (Map.Entry<Object, byte[]> entry : keyMappings.entrySet()) {byte[] valueBytes = results.get(entry.getValue());valueMappings.put(entry.getKey(), deserializeCacheValue(valueBytes));}return valueMappings;}@Overridepublic void putAll(Map<?, ?> values) {// 序列化键值对Map<byte[], byte[]> serializedValues = new LinkedHashMap<>(values.size());for (Map.Entry<?, ?> entry : values.entrySet()) {if (entry.getValue() != null) {String cacheKey = createCacheKey(entry.getKey());byte[] keyBytes = serializeCacheKey(cacheKey);byte[] valueBytes = serializeCacheValue(entry.getValue());serializedValues.put(keyBytes, valueBytes);}}// 批量存入缓存cacheWriter.putAll(getName(), serializedValues, getTtl());}@Overridepublic Map<Object, Object> putAllIfAbsent(Map<?, ?> values) {// 实现略,需要使用Redis事务或Lua脚本确保原子性throw new UnsupportedOperationException("Batch putIfAbsent operation is not supported yet.");}@Overridepublic void evictAll(Collection<?> keys) {// 转换键为字节数组Collection<byte[]> keyBytes = keys.stream().map(key -> serializeCacheKey(createCacheKey(key))).collect(Collectors.toList());// 批量删除缓存cacheWriter.removeAll(getName(), keyBytes);}// 辅助方法...private byte[] serializeCacheKey(String cacheKey) {return serializationContext.getKeySerializationPair().write(cacheKey);}private byte[] serializeCacheValue(Object value) {return serializationContext.getValueSerializationPair().write(value);}private Object deserializeCacheValue(byte[] valueBytes) {if (valueBytes == null) {return null;}return serializationContext.getValueSerializationPair().read(valueBytes);}private String createCacheKey(Object key) {String convertedKey = convertKey(key);if (!getCacheConfiguration().usePrefix()) {return convertedKey;}return getCacheConfiguration().getKeyPrefixFor(name) + convertedKey;}
}

3.4 实现BatchRedisCacheManager

/*** 支持批量操作的Redis缓存管理器** @author doubao*/
public class BatchRedisCacheManager extends RedisCacheManager {private final BatchRedisCacheWriter cacheWriter;private final RedisCacheConfiguration defaultCacheConfig;/*** 构造函数** @param cacheWriter 缓存写入器* @param defaultCacheConfiguration 默认缓存配置*/public BatchRedisCacheManager(BatchRedisCacheWriter cacheWriter,RedisCacheConfiguration defaultCacheConfiguration) {super(cacheWriter, defaultCacheConfiguration);this.cacheWriter = cacheWriter;this.defaultCacheConfig = defaultCacheConfiguration;}@Overrideprotected RedisCache createRedisCache(String name, @Nullable RedisCacheConfiguration cacheConfig) {return new BatchRedisCache(name, cacheWriter,cacheConfig != null ? cacheConfig : defaultCacheConfig);}/*** 从RedisCacheManager转换为BatchRedisCacheManager** @param cacheManager Redis缓存管理器* @return 支持批量操作的Redis缓存管理器*/public static BatchRedisCacheManager fromRedisCacheManager(RedisCacheManager cacheManager) {// 获取RedisCacheManager的私有字段Field cacheWriterField;Field defaultCacheConfigField;try {cacheWriterField = RedisCacheManager.class.getDeclaredField("cacheWriter");defaultCacheConfigField = RedisCacheManager.class.getDeclaredField("defaultCacheConfig");cacheWriterField.setAccessible(true);defaultCacheConfigField.setAccessible(true);RedisCacheWriter cacheWriter = (RedisCacheWriter) cacheWriterField.get(cacheManager);RedisCacheConfiguration defaultCacheConfig =(RedisCacheConfiguration) defaultCacheConfigField.get(cacheManager);// 创建BatchRedisCacheWriterBatchRedisCacheWriter batchCacheWriter;if (cacheWriter instanceof DefaultRedisCacheWriter) {DefaultRedisCacheWriter defaultWriter = (DefaultRedisCacheWriter) cacheWriter;// 使用反射获取RedisOperationsField redisOperationsField = DefaultRedisCacheWriter.class.getDeclaredField("redisOperations");redisOperationsField.setAccessible(true);RedisOperations<byte[], byte[]> redisOperations =(RedisOperations<byte[], byte[]>) redisOperationsField.get(defaultWriter);batchCacheWriter = new DefaultBatchRedisCacheWriter(redisOperations);} else {// 回退方案,使用RedisTemplate创建RedisTemplate<byte[], byte[]> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(cacheManager.getCacheWriter().getConnectionFactory());redisTemplate.afterPropertiesSet();batchCacheWriter = new DefaultBatchRedisCacheWriter(redisTemplate);}// 创建BatchRedisCacheManagerreturn new BatchRedisCacheManager(batchCacheWriter, defaultCacheConfig);} catch (NoSuchFieldException | IllegalAccessException e) {throw new RuntimeException("Failed to convert RedisCacheManager to BatchRedisCacheManager", e);}}
}

3.5 配置类实现

/*** 批量缓存配置类** @author doubao*/
@Configuration
public class BatchCacheConfiguration {@Beanpublic BatchRedisCacheManager batchRedisCacheManager(RedisConnectionFactory redisConnectionFactory) {// 创建默认配置RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(10)).serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())).serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));// 创建BatchRedisCacheWriterRedisTemplate<byte[], byte[]> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.afterPropertiesSet();BatchRedisCacheWriter cacheWriter = new DefaultBatchRedisCacheWriter(redisTemplate);// 创建BatchRedisCacheManagerBatchRedisCacheManager cacheManager = new BatchRedisCacheManager(cacheWriter, config);cacheManager.setTransactionAware(true);// 设置缓存名称和配置的映射Map<String, RedisCacheConfiguration> cacheConfigurations = new HashMap<>();// 可以为不同的缓存名称配置不同的策略cacheConfigurations.put("batchCache", config.entryTtl(Duration.ofHours(1)));cacheManager.setCacheConfigurations(cacheConfigurations);return cacheManager;}/*** 自定义CacheResolver,支持BatchCache** @param cacheManager 缓存管理器* @return 缓存解析器*/@Beanpublic CacheResolver batchCacheResolver(BatchRedisCacheManager cacheManager) {return new SimpleCacheResolver(cacheManager) {@Overrideprotected Collection<? extends Cache> getCaches(CacheOperationInvocationContext<?> context) {Collection<? extends Cache> caches = super.getCaches(context);return caches.stream().map(cache -> {if (cache instanceof RedisCache && !(cache instanceof BatchRedisCache)) {// 将普通RedisCache转换为BatchRedisCacheRedisCache redisCache = (RedisCache) cache;return new BatchRedisCache(redisCache.getName(),(BatchRedisCacheWriter) cacheManager.getCacheWriter(),redisCache.getCacheConfiguration());}return cache;}).collect(Collectors.toList());}};}
}

四、使用示例

4.1 定义业务服务

/*** 示例服务类,演示BatchCache的使用** @author doubao*/
@Service
public class UserService {private final UserRepository userRepository;@Autowiredpublic UserService(UserRepository userRepository) {this.userRepository = userRepository;}/*** 批量获取用户信息,使用缓存** @param userIds 用户ID集合* @return 用户信息映射*/@Cacheable(value = "users", key = "#root.methodName + '_' + #userIds", unless = "#result == null")public Map<Long, User> getUsersBatch(Collection<Long> userIds) {// 模拟从数据库获取数据return userIds.stream().collect(Collectors.toMap(Function.identity(),userId -> userRepository.findById(userId).orElse(null)));}/*** 批量保存用户信息,并更新缓存** @param users 用户信息集合*/@CachePut(value = "users", key = "#root.methodName + '_' + #users.![id]", condition = "#users != null && !#users.isEmpty()")public Map<Long, User> saveUsersBatch(Collection<User> users) {// 模拟批量保存到数据库users.forEach(userRepository::save);// 返回保存后的用户信息return users.stream().collect(Collectors.toMap(User::getId, Function.identity()));}/*** 批量删除用户信息,并清除缓存** @param userIds 用户ID集合*/@CacheEvict(value = "users", allEntries = false, key = "#root.methodName + '_' + #userIds")public void deleteUsersBatch(Collection<Long> userIds) {// 模拟批量删除userIds.forEach(userRepository::deleteById);}/*** 直接使用BatchCache接口的批量操作** @param userIds 用户ID集合* @return 用户信息映射*/public Map<Long, User> getUsersBatchWithBatchCache(Collection<Long> userIds) {// 通过ApplicationContext获取BatchCacheBatchCache batchCache = (BatchCache) applicationContext.getBean("cacheManager").getCache("users");// 直接使用BatchCache的批量获取方法Map<Object, Object> cacheResults = batchCache.getAll(userIds);// 处理缓存结果Map<Long, User> result = new HashMap<>();for (Map.Entry<Object, Object> entry : cacheResults.entrySet()) {Long userId = (Long) entry.getKey();User user = (User) entry.getValue();if (user == null) {// 缓存未命中,从数据库获取user = userRepository.findById(userId).orElse(null);if (user != null) {// 手动放入缓存batchCache.put(userId, user);}}result.put(userId, user);}return result;}
}

4.2 配置文件示例

spring:redis:host: localhostport: 6379password:timeout: 10000mslettuce:pool:max-active: 8max-wait: -1msmax-idle: 8min-idle: 0cache:type: redisredis:time-to-live: 600000  # 10分钟cache-null-values: trueuse-key-prefix: truekey-prefix: batch_cache:

五、性能测试与优化

5.1 性能测试框架

/*** BatchCache性能测试** @author doubao*/
@SpringBootTest
public class BatchCachePerformanceTest {@Autowiredprivate UserService userService;@Autowiredprivate CacheManager cacheManager;private static final int TEST_SIZE = 1000;private static final int WARMUP_TIMES = 10;private static final int TEST_TIMES = 100;@BeforeEachpublic void setUp() {// 准备测试数据List<User> users = new ArrayList<>(TEST_SIZE);for (int i = 0; i < TEST_SIZE; i++) {User user = new User();user.setId((long) i);user.setName("User" + i);user.setAge(20 + i % 30);users.add(user);}// 预热for (int i = 0; i < WARMUP_TIMES; i++) {userService.saveUsersBatch(users);userService.getUsersBatch(users.stream().map(User::getId).collect(Collectors.toList()));}// 清除缓存Cache usersCache = cacheManager.getCache("users");if (usersCache != null) {usersCache.clear();}}@Testpublic void testBatchGetPerformance() {List<Long> userIds = IntStream.range(0, TEST_SIZE).mapToObj(Long::valueOf).collect(Collectors.toList());// 测试单键获取性能long singleStartTime = System.currentTimeMillis();for (int i = 0; i < TEST_TIMES; i++) {for (Long userId : userIds) {userService.getUser(userId);}}long singleEndTime = System.currentTimeMillis();long singleTotalTime = singleEndTime - singleStartTime;// 测试批量获取性能long batchStartTime = System.currentTimeMillis();for (int i = 0; i < TEST_TIMES; i++) {userService.getUsersBatch(userIds);}long batchEndTime = System.currentTimeMillis();long batchTotalTime = batchEndTime - batchStartTime;// 输出性能结果System.out.printf("单键获取 %d 次,总耗时: %d ms,平均每次: %f ms%n",TEST_SIZE * TEST_TIMES, singleTotalTime, (double) singleTotalTime / (TEST_SIZE * TEST_TIMES));System.out.printf("批量获取 %d 次,总耗时: %d ms,平均每次: %f ms%n",TEST_TIMES, batchTotalTime, (double) batchTotalTime / TEST_TIMES);System.out.printf("批量操作性能提升: %.2f%%%n",(1 - (double) batchTotalTime / singleTotalTime) * 100);}// 其他性能测试方法...
}

5.2 性能优化策略

  1. 批量操作优化
    • 使用Redis的MGET、MSET等批量命令
    • 合理设置批量操作的大小,避免单次操作过大
    • 考虑使用Redis Pipeline提升性能
  2. 序列化优化
    • 使用高效的序列化方式,如Kryo、Protostuff等
    • 避免序列化大对象,可考虑拆分数据
  3. 缓存配置优化
    • 根据业务场景设置合理的TTL
    • 使用分区缓存,避免不同业务数据相互影响
    • 考虑使用二级缓存(如Caffeine + Redis)提升性能

六、注意事项与最佳实践

6.1 使用注意事项

  1. 事务支持
    • Spring Cache的@Cacheable、@CachePut等注解不支持事务回滚
    • 如果需要事务支持,建议在业务代码中直接使用BatchCache接口
  2. 异常处理
    • 批量操作可能部分成功部分失败,需要业务层处理这种情况
    • 考虑实现重试机制,确保操作的最终一致性
  3. 缓存穿透与雪崩
    • 批量操作同样需要防范缓存穿透问题
    • 合理设置不同数据的TTL,避免缓存雪崩

6.2 最佳实践

  1. 批量操作大小控制
    • 对于大量数据的批量操作,建议分批处理
    • 每批大小可根据网络情况和Redis性能调整,一般建议在100-1000之间
  2. 缓存预热
    • 对于热点数据,启动时进行缓存预热
    • 使用BatchCache的批量操作能力快速填充缓存
  3. 监控与告警
    • 监控批量操作的性能指标,如QPS、响应时间等
    • 设置合理的告警阈值,及时发现性能问题

通过以上方案,我们成功地扩展了Spring Boot Cache的功能,实现了兼容RedisCache的BatchCache接口。这种实现方式不仅保持了与Spring Cache生态的兼容性,还显著提升了批量数据操作的性能,为高并发场景下的应用提供了有力支持。

http://www.dtcms.com/a/291350.html

相关文章:

  • 2130、链表最大孪生和
  • rsync报错解决
  • Shopify 知识点
  • 草木知音的认知进化:Deepoc具身智能如何让除草机读懂花园的呼吸
  • 设备监控之数据处理(1)-概述
  • MQ 核心知识点笔记
  • Android开发中卡顿治理方案
  • 用基础模型构建应用(第十章)AI Engineering: Building Applications with Foundation Models学习笔记
  • 如何用纯 HTML 文件实现 Vue.js 应用,并通过 CDN 引入 Element UI
  • 【PHP 流程控制完全指南】
  • 多端适配灾难现场:可视化界面在PC/平板/大屏端的响应式布局实战
  • .NET依赖注入IOC你了解吗?
  • 开发避坑短篇(3):解决@vitejs plugin-vue@5.0.5对Vite^5.0.0的依赖冲突
  • 万界星空科技锂电池MES解决方案
  • Shell判断结构
  • voice模块
  • 【图论】CF——B. Chamber of Secrets (0-1BFS)
  • 标准文件I/O补充知识
  • paddleocr安装,数据集制作,训练自己的模型,调用训练好的模型
  • 20250721-day19
  • 【PTA数据结构 | C语言版】双连通分量
  • C# 实现:动态规划解决 0/1 背包问题
  • nextjs编程式跳转
  • 《小白学习产品经理》第七章:方法论之波特五力模型
  • springcloud -- 微服务02
  • Iridium Certus 9704 卫星物联网开发套件
  • cuda编程笔记(9)--使用 Shared Memory 实现 tiled GEMM
  • 补环境基础(二) this的作用和绑定规则
  • 关于Ajax的学习笔记
  • synchronized 修饰符的使用