redis的pipline使用结合线程池优化实战
文章目录
- 代码
- 讲解
- 与事务 (`MULTI/EXEC`) 的区别
- 在你这段代码里的价值
- 可能的坑
- 实战建议
代码
/*** 批量根据用户 ID 查询用户信息** @param findUsersByIdsReqDTO* @return*/@Overridepublic Response<List<FindUserByIdRspDTO>> findByIds(FindUsersByIdsReqDTO findUsersByIdsReqDTO) {// 需要查询的用户 ID 集合List<Long> userIds = findUsersByIdsReqDTO.getIds();// 构建 Redis Key 集合List<String> redisKeys = userIds.stream().map(RedisKeyConstants::buildUserInfoKey).toList();// 先从 Redis 缓存中查, multiGet 批量查询提升性能List<Object> redisValues = redisTemplate.opsForValue().multiGet(redisKeys);// 如果缓存中不为空if (CollUtil.isNotEmpty(redisValues)) {// 过滤掉为空的数据redisValues = redisValues.stream().filter(Objects::nonNull).toList();}// 返参List<FindUserByIdRspDTO> findUserByIdRspDTOS = null;// 将过滤后的缓存集合,转换为 DTO 返参实体类if (CollUtil.isNotEmpty(redisValues)) {findUserByIdRspDTOS = redisValues.stream().map(value -> JsonUtils.parseObject(String.valueOf(value), FindUserByIdRspDTO.class)).toList();}// 如果被查询的用户信息,都在 Redis 缓存中, 则直接返回if (CollUtil.size(userIds) == CollUtil.size(findUserByIdRspDTOS)) {return Response.success(findUserByIdRspDTOS);}// 还有另外两种情况:一种是缓存里没有用户信息数据,还有一种是缓存里数据不全,需要从数据库中补充// 筛选出缓存里没有的用户数据,去查数据库List<Long> userIdsNeedQuery = null;if (CollUtil.isNotEmpty(findUserByIdRspDTOS)) {// 将 findUserInfoByIdRspDTOS 集合转 MapMap<Long, FindUserByIdRspDTO> map = findUserByIdRspDTOS.stream().collect(Collectors.toMap(FindUserByIdRspDTO::getId, p -> p));// 筛选出需要查 DB 的用户 IDuserIdsNeedQuery = userIds.stream().filter(id -> Objects.isNull(map.get(id))).toList();} else { // 缓存中一条用户信息都没查到,则提交的用户 ID 集合都需要查数据库userIdsNeedQuery = userIds;}// 从数据库中批量查询List<UserDO> userDOS = userDOMapper.selectByIds(userIdsNeedQuery);List<FindUserByIdRspDTO> findUserByIdRspDTOS2 = null;// 若数据库查询的记录不为空if (CollUtil.isNotEmpty(userDOS)) {// DO 转 DTOfindUserByIdRspDTOS2 = userDOS.stream().map(userDO -> FindUserByIdRspDTO.builder().id(userDO.getId()).nickName(userDO.getNickname()).avatar(userDO.getAvatar()).introduction(userDO.getIntroduction()).build()).collect(Collectors.toList());// 异步线程将用户信息同步到 Redis 中List<FindUserByIdRspDTO> finalFindUserByIdRspDTOS = findUserByIdRspDTOS2;threadPoolTaskExecutor.submit(() -> {// DTO 集合转 MapMap<Long, FindUserByIdRspDTO> map = finalFindUserByIdRspDTOS.stream().collect(Collectors.toMap(FindUserByIdRspDTO::getId, p -> p));// 执行 pipeline 操作redisTemplate.executePipelined((RedisCallback<Void>) connection -> {for (UserDO userDO : userDOS) {Long userId = userDO.getId();// 用户信息缓存 Redis KeyString userInfoRedisKey = RedisKeyConstants.buildUserInfoKey(userId);// DTO 转 JSON 字符串FindUserByIdRspDTO findUserInfoByIdRspDTO = map.get(userId);String value = JsonUtils.toJsonString(findUserInfoByIdRspDTO);// 过期时间(保底1天 + 随机秒数,将缓存过期时间打散,防止同一时间大量缓存失效,导致数据库压力太大)long expireSeconds = 60*60*24 + RandomUtil.randomInt(60*60*24);redisTemplate.opsForValue().set(userInfoRedisKey, value, expireSeconds, TimeUnit.SECONDS);}return null;});});}// 合并数据if (CollUtil.isNotEmpty(findUserByIdRspDTOS) && CollUtil.isNotEmpty(findUserByIdRspDTOS2)) {findUserByIdRspDTOS.addAll(findUserByIdRspDTOS2);}return Response.success(findUserByIdRspDTOS);}
讲解
在 RedisTemplate.executePipelined(…) 里把多条 SET
操作放进 pipeline(管道),主要有 3 个好处——简记“省时、省包、增吞吐”。
优势 | 说明 | 什么时候最明显 |
---|---|---|
1. 省时:减少 RTT | 一次 pipeline 会先把 N 条命令 打包写入 Socket,然后再统一读回 N 条结果。 原来一条命令要一次 Request → Response 往返(RTT),现在 N 条命令只要 1 次 RTT。 | 网络时延不小(跨 IDC、云 Redis)或批量写入上万条键时,节省几十到上百毫秒 |
2. 省包:减少 TCP/IO 开销 | 连发 1000 条 SET ,不做管道 = 1000 个包;管道 = 1 个包写 + 1 个包读。少了大量系统调用与 TCP 包头,CPU 使用率和带宽占用都更低。 | 批量插入/更新、导入数据、刷新缓存 |
3. 增吞吐:服务器侧一次性处理 | Redis 收到整段命令流后,在单线程内部顺序执行,避免了命令‐等待‐回复的空闲间隙,同一时间窗口可以执行更多指令。 | 高并发场景下,比单条调用 QPS 可提升数倍 |
👉 简化理解:把 1000 趟“银行柜台排队”换成“把 1000 张单子一次塞给柜员”。
与事务 (MULTI/EXEC
) 的区别
特性 | Pipeline | MULTI…EXEC 事务 |
---|---|---|
原子性 | ❌ 不保证;命令只是顺序执行 | ✔ 执行期内一次性、原子提交 |
回包时机 | 一口气回 N 条结果 | EXEC 时一次性回所有结果 |
用途 | 性能优化(批量、同步刷库) | 一致性(要么全部成功要么失败) |
很多业务里两者会 一起用:先
MULTI
、把命令写进管道、最后EXEC
,既批量又原子。
在你这段代码里的价值
redisTemplate.executePipelined((RedisCallback<Void>) conn -> {for (UserDO u : userDOS) {conn.stringCommands().set(rawKey(u.getId()), // set keyrawValue(dtoJson(u)), // set valueExpiration.seconds(ttl), // expireSetOption.UPSERT // NX/XX 选项);}return null;
});
- 用管道写入全部用户缓存:
- 只走 1 次 Socket 写 + 1 次读,避免 300 条
set
* 300 次 RTT。
- 只走 1 次 Socket 写 + 1 次读,避免 300 条
- 线程池异步执行:主线程即刻返回,提升接口响应速度。
- TTL 打散:即便一起写,也给每条 key 加随机过期;结合管道不会产生“雪崩”。
可能的坑
- 结果一次性回传 —— 如果 N 很大,客户端要等所有命令执行完才能拿到结果,期间内存会上涨。
- 非原子 —— 若中间有一条命令写失败,不会回滚之前已成功的命令;业务需要能接受“部分成功”。
- 大批量要分段 —— 数十万条写同一管道可能撑爆客户端/服务器缓冲区,建议 5k~10k 条一批。
实战建议
- 小批量 (< 20) 就别开管道,省下序列化/反序列化集合的开销。
- 中批量 (几十~几千):管道最划算,RTT 成本明显下降。
- 大批量 (万级以上):用
SCAN
+ 管道分批,或直接走异步任务/数据导入工具。
一句话:Pipeline 让“成百上千条命令”在网络上看起来像“一条”,极大降低网络和系统调用成本,是 Java 里批量读写 Redis 的首选加速器。