当前位置: 首页 > news >正文

redis的pipline使用结合线程池优化实战

文章目录

  • 代码
  • 讲解
      • 与事务 (`MULTI/EXEC`) 的区别
      • 在你这段代码里的价值
      • 可能的坑
        • 实战建议

代码

 /*** 批量根据用户 ID 查询用户信息** @param findUsersByIdsReqDTO* @return*/@Overridepublic Response<List<FindUserByIdRspDTO>> findByIds(FindUsersByIdsReqDTO findUsersByIdsReqDTO) {// 需要查询的用户 ID 集合List<Long> userIds = findUsersByIdsReqDTO.getIds();// 构建 Redis Key 集合List<String> redisKeys = userIds.stream().map(RedisKeyConstants::buildUserInfoKey).toList();// 先从 Redis 缓存中查, multiGet 批量查询提升性能List<Object> redisValues = redisTemplate.opsForValue().multiGet(redisKeys);// 如果缓存中不为空if (CollUtil.isNotEmpty(redisValues)) {// 过滤掉为空的数据redisValues = redisValues.stream().filter(Objects::nonNull).toList();}// 返参List<FindUserByIdRspDTO> findUserByIdRspDTOS = null;// 将过滤后的缓存集合,转换为 DTO 返参实体类if (CollUtil.isNotEmpty(redisValues)) {findUserByIdRspDTOS = redisValues.stream().map(value -> JsonUtils.parseObject(String.valueOf(value), FindUserByIdRspDTO.class)).toList();}// 如果被查询的用户信息,都在 Redis 缓存中, 则直接返回if (CollUtil.size(userIds) == CollUtil.size(findUserByIdRspDTOS)) {return Response.success(findUserByIdRspDTOS);}// 还有另外两种情况:一种是缓存里没有用户信息数据,还有一种是缓存里数据不全,需要从数据库中补充// 筛选出缓存里没有的用户数据,去查数据库List<Long> userIdsNeedQuery = null;if (CollUtil.isNotEmpty(findUserByIdRspDTOS)) {// 将 findUserInfoByIdRspDTOS 集合转 MapMap<Long, FindUserByIdRspDTO> map = findUserByIdRspDTOS.stream().collect(Collectors.toMap(FindUserByIdRspDTO::getId, p -> p));// 筛选出需要查 DB 的用户 IDuserIdsNeedQuery = userIds.stream().filter(id -> Objects.isNull(map.get(id))).toList();} else { // 缓存中一条用户信息都没查到,则提交的用户 ID 集合都需要查数据库userIdsNeedQuery = userIds;}// 从数据库中批量查询List<UserDO> userDOS = userDOMapper.selectByIds(userIdsNeedQuery);List<FindUserByIdRspDTO> findUserByIdRspDTOS2 = null;// 若数据库查询的记录不为空if (CollUtil.isNotEmpty(userDOS)) {// DO 转 DTOfindUserByIdRspDTOS2 = userDOS.stream().map(userDO -> FindUserByIdRspDTO.builder().id(userDO.getId()).nickName(userDO.getNickname()).avatar(userDO.getAvatar()).introduction(userDO.getIntroduction()).build()).collect(Collectors.toList());// 异步线程将用户信息同步到 Redis 中List<FindUserByIdRspDTO> finalFindUserByIdRspDTOS = findUserByIdRspDTOS2;threadPoolTaskExecutor.submit(() -> {// DTO 集合转 MapMap<Long, FindUserByIdRspDTO> map = finalFindUserByIdRspDTOS.stream().collect(Collectors.toMap(FindUserByIdRspDTO::getId, p -> p));// 执行 pipeline 操作redisTemplate.executePipelined((RedisCallback<Void>) connection -> {for (UserDO userDO : userDOS) {Long userId = userDO.getId();// 用户信息缓存 Redis KeyString userInfoRedisKey = RedisKeyConstants.buildUserInfoKey(userId);// DTO 转 JSON 字符串FindUserByIdRspDTO findUserInfoByIdRspDTO = map.get(userId);String value = JsonUtils.toJsonString(findUserInfoByIdRspDTO);// 过期时间(保底1天 + 随机秒数,将缓存过期时间打散,防止同一时间大量缓存失效,导致数据库压力太大)long expireSeconds = 60*60*24 + RandomUtil.randomInt(60*60*24);redisTemplate.opsForValue().set(userInfoRedisKey, value, expireSeconds, TimeUnit.SECONDS);}return null;});});}// 合并数据if (CollUtil.isNotEmpty(findUserByIdRspDTOS) && CollUtil.isNotEmpty(findUserByIdRspDTOS2)) {findUserByIdRspDTOS.addAll(findUserByIdRspDTOS2);}return Response.success(findUserByIdRspDTOS);}

讲解

RedisTemplate.executePipelined(…) 里把多条 SET 操作放进 pipeline(管道),主要有 3 个好处——简记“省时、省包、增吞吐”。

优势说明什么时候最明显
1. 省时:减少 RTT一次 pipeline 会先把 N 条命令 打包写入 Socket,然后再统一读回 N 条结果
原来一条命令要一次 Request → Response 往返(RTT),现在 N 条命令只要 1 次 RTT
网络时延不小(跨 IDC、云 Redis)或批量写入上万条键时,节省几十到上百毫秒
2. 省包:减少 TCP/IO 开销连发 1000 条 SET不做管道 = 1000 个包;管道 = 1 个包写 + 1 个包读
少了大量系统调用与 TCP 包头,CPU 使用率和带宽占用都更低。
批量插入/更新、导入数据、刷新缓存
3. 增吞吐:服务器侧一次性处理Redis 收到整段命令流后,在单线程内部顺序执行,避免了命令‐等待‐回复的空闲间隙,同一时间窗口可以执行更多指令。高并发场景下,比单条调用 QPS 可提升数倍

👉 简化理解:把 1000 趟“银行柜台排队”换成“把 1000 张单子一次塞给柜员”。


与事务 (MULTI/EXEC) 的区别

特性PipelineMULTI…EXEC 事务
原子性❌ 不保证;命令只是顺序执行✔ 执行期内一次性、原子提交
回包时机一口气回 N 条结果EXEC 时一次性回所有结果
用途性能优化(批量、同步刷库)一致性(要么全部成功要么失败)

很多业务里两者会 一起用:先 MULTI、把命令写进管道、最后 EXEC,既批量又原子。


在你这段代码里的价值

redisTemplate.executePipelined((RedisCallback<Void>) conn -> {for (UserDO u : userDOS) {conn.stringCommands().set(rawKey(u.getId()),          // set keyrawValue(dtoJson(u)),       // set valueExpiration.seconds(ttl),    // expireSetOption.UPSERT            // NX/XX 选项);}return null;
});
  1. 用管道写入全部用户缓存
    • 只走 1 次 Socket 写 + 1 次读,避免 300 条 set * 300 次 RTT。
  2. 线程池异步执行:主线程即刻返回,提升接口响应速度。
  3. TTL 打散:即便一起写,也给每条 key 加随机过期;结合管道不会产生“雪崩”。

可能的坑

  1. 结果一次性回传 —— 如果 N 很大,客户端要等所有命令执行完才能拿到结果,期间内存会上涨。
  2. 非原子 —— 若中间有一条命令写失败,不会回滚之前已成功的命令;业务需要能接受“部分成功”。
  3. 大批量要分段 —— 数十万条写同一管道可能撑爆客户端/服务器缓冲区,建议 5k~10k 条一批。

实战建议
  • 小批量 (< 20) 就别开管道,省下序列化/反序列化集合的开销。
  • 中批量 (几十~几千):管道最划算,RTT 成本明显下降。
  • 大批量 (万级以上):用 SCAN + 管道分批,或直接走异步任务/数据导入工具。

一句话:Pipeline 让“成百上千条命令”在网络上看起来像“一条”,极大降低网络和系统调用成本,是 Java 里批量读写 Redis 的首选加速器。

相关文章:

  • 精益数据分析(63/126):移情阶段的深度潜入——从用户生活到产品渗透的全链路解析
  • linux——mysql高可用
  • 用 CodeBuddy 打造我的「TextBeautifier」文本美化引擎
  • SEO 优化实战:ZKmall模板商城的 B2C商城的 URL 重构与结构化数据
  • Webpack DefinePlugin插件介绍(允许在编译时创建JS全局常量,常量可以在源代码中直接使用)JS环境变量
  • TCP/UDP协议原理和区别 笔记
  • RAGFlow Arbitrary Account Takeover Vulnerability
  • python的漫画网站管理系统
  • 目标检测工作原理:从滑动窗口到Haar特征检测的完整实现
  • 现代健康养生新风尚
  • 【前端基础】10、CSS的伪元素(::first-line、::first-letter、::before、::after)【注:极简描述】
  • upload-labs通关笔记-第10关 文件上传之点多重过滤(空格点绕过)
  • 【JavaWeb】MySQL
  • Github 2025-05-17 Rust开源项目日报 Top10
  • STM32 | FreeRTOS 递归信号量
  • 理解 plank 自动生成的 copyWithBlock: 方法
  • java函数内的变量问题
  • 永久免费!专为 Apache Doris 打造的可视化数据管理工具 SelectDB Studio V1.1.0 重磅发布!
  • 素数筛(欧拉筛算法)
  • 游戏引擎学习第288天:继续完成Brains
  • “先增聘再离任”又添一例,景顺长城基金经理鲍无可官宣辞职
  • 中国恒大披露清盘进展:要求债权人提交债权证明表
  • 租车订单时隔7年从花呗免密扣费?“GoFun出行”引质疑
  • 上海静安将发放七轮文旅消费券,住宿券最高满800元减250元
  • 宜昌谱写新叙事:长江大保护与高质量发展如何相互成就
  • 端午假期购票日历发布,今日可购买5月29日火车票