当前位置: 首页 > news >正文

如何用单张gpu跑sglang的数据并行

有时候,你只有一张GPU显卡或者一张低配(显存)显卡,为了方便开发测试,需要模拟在单卡上执行分布式数据并行操作,默认sglang是不支持的,需要做简单的修改,并控制执行脚本(参数配置),从而支持单卡上跑多个sglang的data parallel scheduler process,前提是你对sglang的代码很熟悉。

  • 代码改动及说明

代码文件:python/sglang/srt/managers/data_parallel_controller.py
类名及功能:class DataParallelController:这个类是data parallel(数据并行的)调度器,一般情况下,一个主进程process,会根据ServerArgs.py中dp_size的大小,为不同的dp_rank初始化不同的Scheduler(进程),主进程负责向dp_size的子进程(Scheduler)发送来自于tokenizer进程的request,子进程scheduler接到request后便开始通过worker->model runner执行torch module的forward处理(即,模型的前向推理),进程间通信是通过zmq context socket进行的。

修改如下:

def launch_dp_schedulers(self, server_args, port_args):base_gpu_id = 0threads = []sockets = []dp_port_args = []ready_events = []for dp_rank in range(server_args.dp_size):tmp_port_args = PortArgs.init_new(server_args)tmp_port_args.tokenizer_ipc_name = port_args.tokenizer_ipc_nametmp_port_args.detokenizer_ipc_name = port_args.detokenizer_ipc_namedp_port_args.append(tmp_port_args)# This port is checked free in PortArgs.init_new.# We hold it first so that the next dp worker gets a different portsockets.append(bind_port(tmp_port_args.nccl_port))ready_event = threading.Event()ready_events.append(ready_event)# Create a thread for each workerthread = threading.Thread(target=self.launch_tensor_parallel_group_thread,args=(server_args, tmp_port_args, base_gpu_id, dp_rank, ready_event),)threads.append(thread)# 重点是这里,需要强制为base_gpu_id=0(如果雅观的修改,需要通过ServerArgs.py传入控制信息,从而区别是不是单卡模拟多卡的data parallel)# 假设dp_size = 2,则,修改完,运行后,dp_rank 0和dp_rank 1都会运行在device 0上# 因为gpu_id是一直透传的,经过worker->model runner->torch.distributed,如果不需要这句话,那么底层torch.distributed初始化的时候,torch.cuda.set_device会报错(因为只有1张卡,但gpu_id=1)#base_gpu_id += server_args.tp_size * server_args.gpu_id_step# Free all sockets before starting the threads to launch TP workersfor sock in sockets:sock.close()def launch_tensor_parallel_group_thread(self,server_args: ServerArgs,port_args: PortArgs,base_gpu_id: int,dp_rank: int,ready_event: threading.Event,):self.launch_tensor_parallel_group(server_args, port_args, base_gpu_id, dp_rank)ready_event.set()# This thread cannot be closed because otherwise the `kill_itself_when_parent_died`# function in scheduler.py will kill the scheduler.while True:time.sleep(30 * 24 * 3600)def launch_dp_attention_schedulers(self, server_args, port_args):self.launch_tensor_parallel_group(server_args, port_args, 0, None)dp_port_args = []for dp_rank in range(server_args.dp_size):dp_port_args.append(PortArgs.init_new(server_args, dp_rank))return dp_port_argsdef launch_tensor_parallel_group(self,server_args: ServerArgs,port_args: PortArgs,base_gpu_id: int,dp_rank: int,):if not server_args.enable_dp_attention:logger.info(f"Launch DP{dp_rank} starting at GPU #{base_gpu_id}.")memory_saver_adapter = TorchMemorySaverAdapter.create(enable=server_args.enable_memory_saver)scheduler_pipe_readers = []nnodes_per_tp_group = max(server_args.nnodes // server_args.pp_size, 1)tp_size_per_node = server_args.tp_size // nnodes_per_tp_grouptp_rank_range = range(tp_size_per_node * (server_args.node_rank % nnodes_per_tp_group),tp_size_per_node * (server_args.node_rank % nnodes_per_tp_group + 1),)pp_size_per_node = max(server_args.pp_size // server_args.nnodes, 1)pp_rank_range = range(pp_size_per_node * (server_args.node_rank // nnodes_per_tp_group),pp_size_per_node * (server_args.node_rank // nnodes_per_tp_group + 1),)for pp_rank in pp_rank_range:for tp_rank in tp_rank_range:rank_port_args = port_argsif server_args.enable_dp_attention:# dp attention has different sharding logic_, _, dp_rank = compute_dp_attention_world_info(server_args.enable_dp_attention,tp_rank,server_args.tp_size,server_args.dp_size,)# compute zmq ports for this dp rankrank_port_args = PortArgs.init_new(server_args, dp_rank)# Data parallelism reuses the tensor parallelism group,# so all dp ranks should use the same nccl port.rank_port_args.nccl_port = port_args.nccl_portreader, writer = mp.Pipe(duplex=False)gpu_id = (server_args.base_gpu_id+ base_gpu_id+ ((pp_rank % pp_size_per_node) * tp_size_per_node)+ (tp_rank % tp_size_per_node) * server_args.gpu_id_step)moe_ep_rank = tp_rank // (server_args.tp_size // server_args.ep_size)proc = mp.Process(target=run_scheduler_process,args=(server_args,rank_port_args,gpu_id,tp_rank,moe_ep_rank,pp_rank,dp_rank,writer,self.balance_meta,),)with memory_saver_adapter.configure_subprocess():proc.start()self.scheduler_procs.append(proc)scheduler_pipe_readers.append(reader)# Wait for model to finish loadingscheduler_info = []for i in range(len(scheduler_pipe_readers)):scheduler_info.append(scheduler_pipe_readers[i].recv())self.max_total_num_tokens = scheduler_info[0]["max_total_num_tokens"]self.max_req_input_len = scheduler_info[0]["max_req_input_len"]
  • 脚本改动及说明

说明:为了支持两个scheduler进程运行在同一张卡上(两个进程会各自加载一份weight,各自初始化kv cache等static memory,会导致GPU显存OOM),需要配置不同脚本参数,降低memory使用,具体参数含义参考ServerArgs.py中的注释 (前提:需要明白大模型LLM推理的一些概念,如:prefill/decode/max total tokens/input sequence length/output sequence length/total sequence length等)

脚本:python test.py --model-path (你的模型路径) --host 127.0.0.1 --dp 2 --tp 1 --mem-fraction-static 0.7 --max-total-tokens 128 --cuda-graph-max-bs 4

http://www.dtcms.com/a/348860.html

相关文章:

  • Java全栈开发面试实战:从基础到高并发场景的深度解析
  • MATLAB 与 Python 数据交互:数据导入、导出及联合分析技巧
  • `free` 内存释放函数
  • 【蓝桥杯 2024 省 C】挖矿
  • K8s 实战:六大核心控制器
  • yggjs_rlayout框架v0.1.2使用教程 01快速开始
  • python---类
  • 服装生产跟单系统是什么?主要功能有哪些?
  • 【51单片机按键控制LED按下位移】2022-11-12
  • 若依4.7.8(springboot2.5.15)升级到4.8.1(springboot3.3.5)并集成Dubbo3客户端
  • cmake--CPack/deb
  • Linux系统编程——网络协议
  • The United Nations Is Already Dead
  • comfyUI背后的一些技术——CLIP
  • LeetCode 热题100——56.合并区间
  • 【Docker项目实战】使用Docker部署轻量级LetsMarkdown文本编辑器
  • kafka基本思路即概念
  • PCIE总线接口TSN网卡
  • 【DeepResearch调研】大模型多跳推理能力的深度解析:瓶颈、去偏研究与前沿进展
  • C++(vector):
  • 笔试——Day48
  • 【C++组件】ODB 安装与使用
  • LeetCode 42.接雨水
  • 【Flex SerialPort】一个基于Qt6的支持自定义按键指令的串口工具
  • 浏览器发送网页详细过程分解
  • 释放工作精力:火语言 RPA 的实用功能与效率提升​
  • VMware centos磁盘容量扩容教程
  • 解决虚拟机network服务启动失败问题
  • Linux中的指令
  • 从字节码层面剖析以太坊智能合约创建原理