Spring Cache 扩展:Redis 批量操作优化方案与 BatchCache 自定义实现
Spring Cache是一个强大的缓存抽象层,提供了统一的缓存操作接口,但原生支持主要集中在单键操作。在高并发场景下,批量操作能力对于提升系统性能至关重要。本文将深入探讨如何通过继承Cache接口,以RedisCache为基础,实现兼容Spring Cache规范的BatchCache扩展。
一、Spring Cache架构与RedisCache源码剖析
1.1 Spring Cache核心组件
Spring Cache的核心是org.springframework.cache.Cache
接口,定义了缓存的基本操作:
public interface Cache {String getName();Object getNativeCache();ValueWrapper get(Object key);<T> T get(Object key, Class<T> type);<T> T get(Object key, Callable<T> valueLoader);void put(Object key, Object value);ValueWrapper putIfAbsent(Object key, Object value);void evict(Object key);void clear();
}
主要实现类包括:
- ConcurrentMapCache:基于ConcurrentHashMap的简单实现
- RedisCache:Redis集成实现
- CaffeineCache:Caffeine缓存实现
1.2 RedisCache源码深入分析
RedisCache是Spring Data Redis提供的缓存实现,核心类结构如下:
public class RedisCache extends AbstractValueAdaptingCache {private final String name;private final RedisCacheWriter cacheWriter;private final RedisCacheConfiguration cacheConfig;// 构造函数和核心方法实现
}
核心组件解析:
- RedisCacheWriter:负责与Redis通信的底层接口
- RedisCacheConfiguration:缓存配置,如序列化器、TTL等
- AbstractValueAdaptingCache:提供缓存值处理的基础实现
1.3 RedisCacheWriter源码分析
RedisCacheWriter是Redis操作的核心接口:
public interface RedisCacheWriter {void put(String name, byte[] key, byte[] value, @Nullable Duration ttl);byte[] get(String name, byte[] key);byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Duration ttl);void remove(String name, byte[] key);void clean(String name, byte[] pattern);
}
主要实现类:
- DefaultRedisCacheWriter:基于RedisTemplate的默认实现
- LettuceRedisCacheWriter:基于Lettuce客户端的优化实现
- JedisRedisCacheWriter:基于Jedis客户端的实现
二、BatchCache接口设计与实现思路
2.1 BatchCache接口定义
为了实现批量操作能力,我们需要定义一个扩展接口:
/*** 扩展Spring Cache接口,提供批量操作能力** @author doubao*/
public interface BatchCache extends Cache {/*** 批量获取缓存值** @param keys 缓存键集合* @return 键值对映射,不存在的键对应的值为null*/Map<Object, Object> getAll(Collection<?> keys);/*** 批量存入缓存值** @param values 键值对映射*/void putAll(Map<?, ?> values);/*** 批量存入缓存值,仅当键不存在时** @param values 键值对映射* @return 实际存入的键值对映射,已存在的键对应的值为null*/Map<Object, Object> putAllIfAbsent(Map<?, ?> values);/*** 批量删除缓存** @param keys 缓存键集合*/void evictAll(Collection<?> keys);
}
2.2 实现思路概述
实现BatchCache接口的核心思路:
- 继承RedisCache类,复用现有功能
- 扩展RedisCacheWriter接口,添加批量操作方法
- 实现BatchRedisCache类,实现BatchCache接口
- 提供配置类,注册自定义CacheManager
- 确保与Spring Cache生态系统兼容
三、核心代码实现
3.1 扩展RedisCacheWriter接口
/*** 扩展RedisCacheWriter接口,添加批量操作方法** @author doubao*/
public interface BatchRedisCacheWriter extends RedisCacheWriter {/*** 批量获取缓存值** @param name 缓存名称* @param keys 缓存键集合(字节数组形式)* @return 键值对映射,不存在的键对应的值为null*/Map<byte[], byte[]> getAll(String name, Collection<byte[]> keys);/*** 批量存入缓存值** @param name 缓存名称* @param values 键值对映射(字节数组形式)* @param ttl 过期时间,null表示不过期*/void putAll(String name, Map<byte[], byte[]> values, @Nullable Duration ttl);/*** 批量删除缓存** @param name 缓存名称* @param keys 缓存键集合(字节数组形式)*/void removeAll(String name, Collection<byte[]> keys);
}
3.2 实现BatchRedisCacheWriter
/*** RedisCacheWriter的批量操作实现** @author doubao*/
public class DefaultBatchRedisCacheWriter implements BatchRedisCacheWriter {private final RedisTemplate<byte[], byte[]> redisTemplate;private final Duration sleepTime;/*** 构造函数** @param redisOperations Redis操作模板*/public DefaultBatchRedisCacheWriter(RedisOperations<byte[], byte[]> redisOperations) {this(redisOperations, Duration.ZERO);}/*** 构造函数** @param redisOperations Redis操作模板* @param sleepTime 重试间隔时间*/public DefaultBatchRedisCacheWriter(RedisOperations<byte[], byte[]> redisOperations, Duration sleepTime) {Assert.notNull(redisOperations, "RedisOperations must not be null!");Assert.notNull(sleepTime, "SleepTime must not be null!");this.redisTemplate = (RedisTemplate<byte[], byte[]>) redisOperations;this.sleepTime = sleepTime;}@Overridepublic void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {execute(name, connection -> {if (shouldExpireWithin(ttl)) {connection.setEx(key, ttl.getSeconds(), value);} else {connection.set(key, value);}return "OK";});}@Overridepublic byte[] get(String name, byte[] key) {return execute(name, connection -> connection.get(key));}// 其他方法实现...@Overridepublic Map<byte[], byte[]> getAll(String name, Collection<byte[]> keys) {return execute(name, connection -> {List<byte[]> values = connection.mGet(keys.toArray(new byte[0][]));Map<byte[], byte[]> result = new LinkedHashMap<>(keys.size());int index = 0;for (byte[] key : keys) {result.put(key, index < values.size() ? values.get(index) : null);index++;}return result;});}@Overridepublic void putAll(String name, Map<byte[], byte[]> values, @Nullable Duration ttl) {execute(name, connection -> {if (shouldExpireWithin(ttl)) {Pipeline pipeline = connection.openPipeline();for (Map.Entry<byte[], byte[]> entry : values.entrySet()) {pipeline.setEx(entry.getKey(), ttl.getSeconds(), entry.getValue());}pipeline.close();} else {Map<byte[], byte[]> nonNullValues = values.entrySet().stream().filter(e -> e.getValue() != null).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));if (!nonNullValues.isEmpty()) {connection.mSet(nonNullValues);}}return "OK";});}@Overridepublic void removeAll(String name, Collection<byte[]> keys) {execute(name, connection -> {if (!keys.isEmpty()) {connection.del(keys.toArray(new byte[0][]));}return "OK";});}// 辅助方法...private <T> T execute(String name, RedisCallback<T> callback) {try {return redisTemplate.execute(callback);} catch (Exception ex) {throw new CacheOperationFailedException(name, "Redis batch operation failed", ex);}}private boolean shouldExpireWithin(@Nullable Duration ttl) {return ttl != null && !ttl.isZero() && !ttl.isNegative();}
}
3.3 实现BatchRedisCache类
/*** 支持批量操作的Redis缓存实现** @author doubao*/
public class BatchRedisCache extends RedisCache implements BatchCache {private final BatchRedisCacheWriter cacheWriter;private final RedisSerializationContext<Object, Object> serializationContext;/*** 构造函数** @param name 缓存名称* @param cacheWriter 缓存写入器* @param cacheConfig 缓存配置*/protected BatchRedisCache(String name, BatchRedisCacheWriter cacheWriter,RedisCacheConfiguration cacheConfig) {super(name, cacheWriter, cacheConfig);this.cacheWriter = cacheWriter;this.serializationContext = cacheConfig.getSerializationContext();}@Overridepublic Map<Object, Object> getAll(Collection<?> keys) {// 转换键为字节数组Map<Object, byte[]> keyMappings = new LinkedHashMap<>(keys.size());for (Object key : keys) {keyMappings.put(key, serializeCacheKey(createCacheKey(key)));}// 批量获取缓存值Map<byte[], byte[]> results = cacheWriter.getAll(getName(), keyMappings.values());// 反序列化结果Map<Object, Object> valueMappings = new LinkedHashMap<>(results.size());for (Map.Entry<Object, byte[]> entry : keyMappings.entrySet()) {byte[] valueBytes = results.get(entry.getValue());valueMappings.put(entry.getKey(), deserializeCacheValue(valueBytes));}return valueMappings;}@Overridepublic void putAll(Map<?, ?> values) {// 序列化键值对Map<byte[], byte[]> serializedValues = new LinkedHashMap<>(values.size());for (Map.Entry<?, ?> entry : values.entrySet()) {if (entry.getValue() != null) {String cacheKey = createCacheKey(entry.getKey());byte[] keyBytes = serializeCacheKey(cacheKey);byte[] valueBytes = serializeCacheValue(entry.getValue());serializedValues.put(keyBytes, valueBytes);}}// 批量存入缓存cacheWriter.putAll(getName(), serializedValues, getTtl());}@Overridepublic Map<Object, Object> putAllIfAbsent(Map<?, ?> values) {// 实现略,需要使用Redis事务或Lua脚本确保原子性throw new UnsupportedOperationException("Batch putIfAbsent operation is not supported yet.");}@Overridepublic void evictAll(Collection<?> keys) {// 转换键为字节数组Collection<byte[]> keyBytes = keys.stream().map(key -> serializeCacheKey(createCacheKey(key))).collect(Collectors.toList());// 批量删除缓存cacheWriter.removeAll(getName(), keyBytes);}// 辅助方法...private byte[] serializeCacheKey(String cacheKey) {return serializationContext.getKeySerializationPair().write(cacheKey);}private byte[] serializeCacheValue(Object value) {return serializationContext.getValueSerializationPair().write(value);}private Object deserializeCacheValue(byte[] valueBytes) {if (valueBytes == null) {return null;}return serializationContext.getValueSerializationPair().read(valueBytes);}private String createCacheKey(Object key) {String convertedKey = convertKey(key);if (!getCacheConfiguration().usePrefix()) {return convertedKey;}return getCacheConfiguration().getKeyPrefixFor(name) + convertedKey;}
}
3.4 实现BatchRedisCacheManager
/*** 支持批量操作的Redis缓存管理器** @author doubao*/
public class BatchRedisCacheManager extends RedisCacheManager {private final BatchRedisCacheWriter cacheWriter;private final RedisCacheConfiguration defaultCacheConfig;/*** 构造函数** @param cacheWriter 缓存写入器* @param defaultCacheConfiguration 默认缓存配置*/public BatchRedisCacheManager(BatchRedisCacheWriter cacheWriter,RedisCacheConfiguration defaultCacheConfiguration) {super(cacheWriter, defaultCacheConfiguration);this.cacheWriter = cacheWriter;this.defaultCacheConfig = defaultCacheConfiguration;}@Overrideprotected RedisCache createRedisCache(String name, @Nullable RedisCacheConfiguration cacheConfig) {return new BatchRedisCache(name, cacheWriter,cacheConfig != null ? cacheConfig : defaultCacheConfig);}/*** 从RedisCacheManager转换为BatchRedisCacheManager** @param cacheManager Redis缓存管理器* @return 支持批量操作的Redis缓存管理器*/public static BatchRedisCacheManager fromRedisCacheManager(RedisCacheManager cacheManager) {// 获取RedisCacheManager的私有字段Field cacheWriterField;Field defaultCacheConfigField;try {cacheWriterField = RedisCacheManager.class.getDeclaredField("cacheWriter");defaultCacheConfigField = RedisCacheManager.class.getDeclaredField("defaultCacheConfig");cacheWriterField.setAccessible(true);defaultCacheConfigField.setAccessible(true);RedisCacheWriter cacheWriter = (RedisCacheWriter) cacheWriterField.get(cacheManager);RedisCacheConfiguration defaultCacheConfig =(RedisCacheConfiguration) defaultCacheConfigField.get(cacheManager);// 创建BatchRedisCacheWriterBatchRedisCacheWriter batchCacheWriter;if (cacheWriter instanceof DefaultRedisCacheWriter) {DefaultRedisCacheWriter defaultWriter = (DefaultRedisCacheWriter) cacheWriter;// 使用反射获取RedisOperationsField redisOperationsField = DefaultRedisCacheWriter.class.getDeclaredField("redisOperations");redisOperationsField.setAccessible(true);RedisOperations<byte[], byte[]> redisOperations =(RedisOperations<byte[], byte[]>) redisOperationsField.get(defaultWriter);batchCacheWriter = new DefaultBatchRedisCacheWriter(redisOperations);} else {// 回退方案,使用RedisTemplate创建RedisTemplate<byte[], byte[]> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(cacheManager.getCacheWriter().getConnectionFactory());redisTemplate.afterPropertiesSet();batchCacheWriter = new DefaultBatchRedisCacheWriter(redisTemplate);}// 创建BatchRedisCacheManagerreturn new BatchRedisCacheManager(batchCacheWriter, defaultCacheConfig);} catch (NoSuchFieldException | IllegalAccessException e) {throw new RuntimeException("Failed to convert RedisCacheManager to BatchRedisCacheManager", e);}}
}
3.5 配置类实现
/*** 批量缓存配置类** @author doubao*/
@Configuration
public class BatchCacheConfiguration {@Beanpublic BatchRedisCacheManager batchRedisCacheManager(RedisConnectionFactory redisConnectionFactory) {// 创建默认配置RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(10)).serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())).serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));// 创建BatchRedisCacheWriterRedisTemplate<byte[], byte[]> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.afterPropertiesSet();BatchRedisCacheWriter cacheWriter = new DefaultBatchRedisCacheWriter(redisTemplate);// 创建BatchRedisCacheManagerBatchRedisCacheManager cacheManager = new BatchRedisCacheManager(cacheWriter, config);cacheManager.setTransactionAware(true);// 设置缓存名称和配置的映射Map<String, RedisCacheConfiguration> cacheConfigurations = new HashMap<>();// 可以为不同的缓存名称配置不同的策略cacheConfigurations.put("batchCache", config.entryTtl(Duration.ofHours(1)));cacheManager.setCacheConfigurations(cacheConfigurations);return cacheManager;}/*** 自定义CacheResolver,支持BatchCache** @param cacheManager 缓存管理器* @return 缓存解析器*/@Beanpublic CacheResolver batchCacheResolver(BatchRedisCacheManager cacheManager) {return new SimpleCacheResolver(cacheManager) {@Overrideprotected Collection<? extends Cache> getCaches(CacheOperationInvocationContext<?> context) {Collection<? extends Cache> caches = super.getCaches(context);return caches.stream().map(cache -> {if (cache instanceof RedisCache && !(cache instanceof BatchRedisCache)) {// 将普通RedisCache转换为BatchRedisCacheRedisCache redisCache = (RedisCache) cache;return new BatchRedisCache(redisCache.getName(),(BatchRedisCacheWriter) cacheManager.getCacheWriter(),redisCache.getCacheConfiguration());}return cache;}).collect(Collectors.toList());}};}
}
四、使用示例
4.1 定义业务服务
/*** 示例服务类,演示BatchCache的使用** @author doubao*/
@Service
public class UserService {private final UserRepository userRepository;@Autowiredpublic UserService(UserRepository userRepository) {this.userRepository = userRepository;}/*** 批量获取用户信息,使用缓存** @param userIds 用户ID集合* @return 用户信息映射*/@Cacheable(value = "users", key = "#root.methodName + '_' + #userIds", unless = "#result == null")public Map<Long, User> getUsersBatch(Collection<Long> userIds) {// 模拟从数据库获取数据return userIds.stream().collect(Collectors.toMap(Function.identity(),userId -> userRepository.findById(userId).orElse(null)));}/*** 批量保存用户信息,并更新缓存** @param users 用户信息集合*/@CachePut(value = "users", key = "#root.methodName + '_' + #users.![id]", condition = "#users != null && !#users.isEmpty()")public Map<Long, User> saveUsersBatch(Collection<User> users) {// 模拟批量保存到数据库users.forEach(userRepository::save);// 返回保存后的用户信息return users.stream().collect(Collectors.toMap(User::getId, Function.identity()));}/*** 批量删除用户信息,并清除缓存** @param userIds 用户ID集合*/@CacheEvict(value = "users", allEntries = false, key = "#root.methodName + '_' + #userIds")public void deleteUsersBatch(Collection<Long> userIds) {// 模拟批量删除userIds.forEach(userRepository::deleteById);}/*** 直接使用BatchCache接口的批量操作** @param userIds 用户ID集合* @return 用户信息映射*/public Map<Long, User> getUsersBatchWithBatchCache(Collection<Long> userIds) {// 通过ApplicationContext获取BatchCacheBatchCache batchCache = (BatchCache) applicationContext.getBean("cacheManager").getCache("users");// 直接使用BatchCache的批量获取方法Map<Object, Object> cacheResults = batchCache.getAll(userIds);// 处理缓存结果Map<Long, User> result = new HashMap<>();for (Map.Entry<Object, Object> entry : cacheResults.entrySet()) {Long userId = (Long) entry.getKey();User user = (User) entry.getValue();if (user == null) {// 缓存未命中,从数据库获取user = userRepository.findById(userId).orElse(null);if (user != null) {// 手动放入缓存batchCache.put(userId, user);}}result.put(userId, user);}return result;}
}
4.2 配置文件示例
spring:redis:host: localhostport: 6379password:timeout: 10000mslettuce:pool:max-active: 8max-wait: -1msmax-idle: 8min-idle: 0cache:type: redisredis:time-to-live: 600000 # 10分钟cache-null-values: trueuse-key-prefix: truekey-prefix: batch_cache:
五、性能测试与优化
5.1 性能测试框架
/*** BatchCache性能测试** @author doubao*/
@SpringBootTest
public class BatchCachePerformanceTest {@Autowiredprivate UserService userService;@Autowiredprivate CacheManager cacheManager;private static final int TEST_SIZE = 1000;private static final int WARMUP_TIMES = 10;private static final int TEST_TIMES = 100;@BeforeEachpublic void setUp() {// 准备测试数据List<User> users = new ArrayList<>(TEST_SIZE);for (int i = 0; i < TEST_SIZE; i++) {User user = new User();user.setId((long) i);user.setName("User" + i);user.setAge(20 + i % 30);users.add(user);}// 预热for (int i = 0; i < WARMUP_TIMES; i++) {userService.saveUsersBatch(users);userService.getUsersBatch(users.stream().map(User::getId).collect(Collectors.toList()));}// 清除缓存Cache usersCache = cacheManager.getCache("users");if (usersCache != null) {usersCache.clear();}}@Testpublic void testBatchGetPerformance() {List<Long> userIds = IntStream.range(0, TEST_SIZE).mapToObj(Long::valueOf).collect(Collectors.toList());// 测试单键获取性能long singleStartTime = System.currentTimeMillis();for (int i = 0; i < TEST_TIMES; i++) {for (Long userId : userIds) {userService.getUser(userId);}}long singleEndTime = System.currentTimeMillis();long singleTotalTime = singleEndTime - singleStartTime;// 测试批量获取性能long batchStartTime = System.currentTimeMillis();for (int i = 0; i < TEST_TIMES; i++) {userService.getUsersBatch(userIds);}long batchEndTime = System.currentTimeMillis();long batchTotalTime = batchEndTime - batchStartTime;// 输出性能结果System.out.printf("单键获取 %d 次,总耗时: %d ms,平均每次: %f ms%n",TEST_SIZE * TEST_TIMES, singleTotalTime, (double) singleTotalTime / (TEST_SIZE * TEST_TIMES));System.out.printf("批量获取 %d 次,总耗时: %d ms,平均每次: %f ms%n",TEST_TIMES, batchTotalTime, (double) batchTotalTime / TEST_TIMES);System.out.printf("批量操作性能提升: %.2f%%%n",(1 - (double) batchTotalTime / singleTotalTime) * 100);}// 其他性能测试方法...
}
5.2 性能优化策略
- 批量操作优化:
- 使用Redis的MGET、MSET等批量命令
- 合理设置批量操作的大小,避免单次操作过大
- 考虑使用Redis Pipeline提升性能
- 序列化优化:
- 使用高效的序列化方式,如Kryo、Protostuff等
- 避免序列化大对象,可考虑拆分数据
- 缓存配置优化:
- 根据业务场景设置合理的TTL
- 使用分区缓存,避免不同业务数据相互影响
- 考虑使用二级缓存(如Caffeine + Redis)提升性能
六、注意事项与最佳实践
6.1 使用注意事项
- 事务支持:
- Spring Cache的@Cacheable、@CachePut等注解不支持事务回滚
- 如果需要事务支持,建议在业务代码中直接使用BatchCache接口
- 异常处理:
- 批量操作可能部分成功部分失败,需要业务层处理这种情况
- 考虑实现重试机制,确保操作的最终一致性
- 缓存穿透与雪崩:
- 批量操作同样需要防范缓存穿透问题
- 合理设置不同数据的TTL,避免缓存雪崩
6.2 最佳实践
- 批量操作大小控制:
- 对于大量数据的批量操作,建议分批处理
- 每批大小可根据网络情况和Redis性能调整,一般建议在100-1000之间
- 缓存预热:
- 对于热点数据,启动时进行缓存预热
- 使用BatchCache的批量操作能力快速填充缓存
- 监控与告警:
- 监控批量操作的性能指标,如QPS、响应时间等
- 设置合理的告警阈值,及时发现性能问题
通过以上方案,我们成功地扩展了Spring Boot Cache的功能,实现了兼容RedisCache的BatchCache接口。这种实现方式不仅保持了与Spring Cache生态的兼容性,还显著提升了批量数据操作的性能,为高并发场景下的应用提供了有力支持。