当前位置: 首页 > news >正文

【redis】pipeline管道

Redis Pipeline(管道)是一种将多个命令批量发送到服务器的技术。与逐个发送命令的传统方式不同,Pipeline允许客户端一次性打包多条命令,通过单次网络往返完成批量操作。

为什么需要Pipeline?

在常规操作模式下,每个Redis命令的执行流程为:

客户端发送命令 -> 服务器处理 -> 客户端接收响应

当执行N条命令时会产生N次网络往返延迟(RTT)。

而Pipeline通过批量发送机制:

客户端打包命令1,2,3... -> 服务器顺序处理 -> 批量返回响应

将网络开销从O(N)降低到O(1),在批量操作场景下可带来数十倍的性能提升

适用场景:

  • 批量写入/读取数据

  • 需要执行多个无关联命令

  • 高并发且对实时性要求不高的操作

Pipeline命令的使用

命令行使用

使用redis-cli--pipe参数实现Pipeline:

# 生成测试命令数据(生成5条SET命令)
$ printf "SET key%d value%d\n" $(seq 1 10) > commands.txt

# 查询生成的测试命令数据
$ cat commands.txt
SET key1 value2
SET key3 value4
SET key5 value6
SET key7 value8
SET key9 value10

# 通过Pipeline执行
$ cat commands.txt | redis-cli --pipe
All data transferred. Waiting for the last reply...
Last reply received from server.
errors: 0, replies: 5

Java代码实现

使用Jedis客户端库示例:

package com.morris.redis.demo.pipeline;


import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

import java.util.List;
import java.util.concurrent.ExecutionException;

/**
 * jedis使用pipeline
 */
public class JedisPipelineDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Jedis jedis = new Jedis("localhost", 6379);
        Pipeline pipeline = jedis.pipelined();

        // 批量写入
        for (int i = 1; i <= 10000; i++) {
            pipeline.set("key" + i, "value" + i);
        }

        // 提交并获取响应(注意响应顺序与命令顺序一致)
        List<Object> responses = pipeline.syncAndReturnAll();
        System.out.println("写入完成,响应数量:" + responses.size());
    }
}

性能对比测试

测试代码:

package com.morris.redis.demo.pipeline;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

/**
 * pipeline和单命令执行性能对比
 */
public class PerformanceTestDemo {

    public static final int count = 1000;

    public static void main(String[] args) {
        // 普通模式
        long start = System.currentTimeMillis();
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            for (int i = 0; i < count; i++) {
                jedis.set("normal_" + i, "value");
            }
        }
        long normalCost = System.currentTimeMillis() - start;

        // Pipeline模式
        start = System.currentTimeMillis();
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            Pipeline pipeline = jedis.pipelined();
            for (int i = 0; i < count; i++) {
                pipeline.set("pipe_" + i, "value");
            }
            pipeline.sync();
        }
        long pipeCost = System.currentTimeMillis() - start;

        System.out.printf("操作次数: %d | 普通模式: %dms | Pipeline模式: %dms\n",
                count, normalCost, pipeCost);
    }
}

测试结果:

操作次数: 1000 | 普通模式: 2784ms | Pipeline模式: 2140ms
操作次数: 10000 | 普通模式: 8315ms | Pipeline模式: 185ms
操作次数: 100000 | 普通模式: 52180ms | Pipeline模式: 2479ms

注意事项

  1. 批量大小控制:建议单次Pipeline命令数控制在10,000以内

  2. 非原子性:Pipeline不是事务,中间命令失败不会影响后续执行

  3. 内存消耗:服务端会将所有响应缓存在内存中,超大Pipeline可能引发OOM

  4. 错误处理:通过解析响应列表检查每个命令的执行结果

相关文章:

  • 第八章:C++ 实践
  • 调试正常 ≠ 运行正常:Keil5中MicroLIB的“量子态BUG”破解实录
  • 【Java面试题汇总】Java面试100道最新合集!
  • 笔记六:单链表链表介绍与模拟实现
  • cocos creator使用mesh修改图片为圆形,减少使用mask,j减少drawcall,优化性能
  • Linux 进程信息查看
  • docker私有仓库配置
  • π0源码剖析——从π0模型架构的实现(如何基于PaLI-Gemma和扩散策略去噪生成动作),到基于C/S架构下的模型训练与部署
  • 深度学习数值精度详细对比:BF16、FP16、FP32
  • 【商城实战(18)】后台管理系统基础搭建:从0到1构建电商中枢
  • 大空间多人互动技术、大空间LBE、VR大空间什么意思?如何实现?
  • from psbody.mesh import MeshModuleNotFoundError: No module named ‘psbody‘
  • AI算法与应用 全栈开发 前端开发 后端开发 测试开发 运维开发
  • Ubuntu22.04修改root用户并安装cuda
  • 解锁「3D格式转换SDK」HOOPS Exchange高质量B-REP功能的三大应用场景
  • 基于单片机的智慧音乐播放系统研究
  • Java多线程与高并发专题——阻塞队列常用方法与区别
  • 推动人工智能从“通用”向“专用”转变:GAI认证如何助力个人职业生涯
  • 1688店铺所有商品数据接口详解
  • Android 源码下载以及编译指南
  • 消费维权周报丨上周涉汽车类投诉较多,涉加油“跳枪”等问题
  • 上海迪士尼蜘蛛侠主题园区正式动工,毗邻“疯狂动物城”
  • 倒计时1天:走进“中国荔乡”茂名,探寻农交文旅商融合发展新模式
  • 假冒政府机构账号卖假货?“假官号”为何屡禁不绝?媒体调查
  • 国内规模最大女子赛艇官方赛事在沪启航,中外41支队伍逐浪
  • 杨建全已任天津市委副秘书长、市委市政府信访办主任