Simple serial execution
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch, time, threading


def llm(
    model_path,
    prompt=None,
    image=None,
    video=None,
    images=None,
    videos=None,
    max_new_tokens=2048,
    temperature=0.6,
):
    """
    model_path: path to the model
    prompt: text prompt
    image: path to a single image
    video: path to a single video
    images: list of image paths
    videos: list of video paths
    max_new_tokens: maximum number of tokens to generate
    temperature: sampling temperature
    """
    old_time = time.time()
    gpu_memories = []

    def monitor_gpu_memory():
        # Sample allocated GPU memory (MB) every 5 seconds until stopped.
        while not getattr(monitor_gpu_memory, "stop", False):
            mem = torch.cuda.memory_allocated() / 1024 / 1024
            gpu_memories.append(mem)
            time.sleep(5)

    model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
        model_path,
        torch_dtype=torch.bfloat16,
        attn_implementation="flash_attention_2",
        device_map="cuda",
    )
    monitor_gpu_memory.stop = False
    mem_thread = threading.Thread(target=monitor_gpu_memory)
    mem_thread.start()
    processor = AutoProcessor.from_pretrained(model_path, use_fast=True)

    # Build the multimodal user message: optional text plus images or videos.
    contents = []
    if prompt is not None:
        contents.append({"type": "text", "text": prompt})
    if images is not None:
        for img in images:
            contents.append({"type": "image", "image": img})
    elif image is not None:
        contents.append({"type": "image", "image": image})
    elif videos is not None:
        for vid in videos:
            contents.append({"type": "video", "video": vid})
    elif video is not None:
        contents.append({"type": "video", "video": video})
    messages = [{"role": "user", "content": contents}]

    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    image_inputs, video_inputs = process_vision_info(messages)
    inputs = processor(
        text=[text],
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt",
    )
    inputs = inputs.to(model.device)
    # do_sample=True so that temperature actually takes effect;
    # otherwise greedy decoding ignores it.
    generated_ids = model.generate(
        **inputs, max_new_tokens=max_new_tokens, do_sample=True, temperature=temperature
    )
    # Strip the prompt tokens, keeping only the newly generated part.
    generated_ids_trimmed = [
        out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    output_text = processor.batch_decode(
        generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )
    monitor_gpu_memory.stop = True
    mem_thread.join()

    total_time = time.time() - old_time
    total_tokens = sum(len(ids) for ids in generated_ids_trimmed)
    speed = total_tokens / total_time if total_time > 0 else 0
    avg_mem = sum(gpu_memories) / len(gpu_memories) if gpu_memories else 0
    return {
        "output_text": output_text,
        "total_time": total_time,
        "total_tokens": total_tokens,
        "speed": speed,
        "avg_mem": avg_mem,
    }
if __name__ == "__main__":model_path = "/mnt/d/LLaMA-Factory/Qwen/Qwen2.5-VL-7B-Instruct-unsloth-bnb-4bit"result = llm(model_path=model_path,prompt="识别图中文字,如果有表格等特殊格式需要保留原格式。不用解释和总结,直接输出识别结果。",image=r"/mnt/c/Users/CJK/Desktop/3.png",max_new_tokens=2048,temperature=1.0)print(result["output_text"])print(f"总耗时: {result['total_time']:.2f}s, 生成token数: {result['total_tokens']}, 输出速度: {result['speed']:.2f} token/s, 平均占用显存: {result['avg_mem']:.2f} MB")result = llm(model_path=model_path,prompt="识别图中文字,如果有表格等特殊格式需要保留原格式。不用解释和总结,直接输出识别结果。",images=[r"/mnt/c/Users/CJK/Desktop/1.png", r"/mnt/c/Users/CJK/Desktop/3.png"],max_new_tokens=2048,temperature=0.6)print(result["output_text"])print(f"总耗时: {result['total_time']:.2f}s, 生成token数: {result['total_tokens']}, 输出速度: {result['speed']:.2f} token/s, 平均占用显存: {result['avg_mem']:.2f} MB")result = llm(model_path=model_path,prompt="识别图中文字,如果有表格等特殊格式需要保留原格式。不用解释和总结,直接输出识别结果。",video=r"/mnt/c/Users/CJK/Desktop/2.mp4",max_new_tokens=2048,temperature=0.6)print(result["output_text"])print(f"总耗时: {result['total_time']:.2f}s, 生成token数: {result['total_tokens']}, 输出速度: {result['speed']:.2f} token/s, 平均占用显存: {result['avg_mem']:.2f} MB")print(f"总耗时: {result['total_time']:.2f}s, 生成token数: {result['total_tokens']}, 输出速度: {result['speed']:.2f} token/s, 平均占用显存: {result['avg_mem']:.2f} MB")result = llm(model_path=model_path,prompt="识别图中文字,如果有表格等特殊格式需要保留原格式。不用解释和总结,直接输出识别结果。",videos=[r"/mnt/c/Users/CJK/Desktop/1.mp4", r"/mnt/c/Users/CJK/Desktop/2.mp4"],max_new_tokens=2048,temperature=0.6)print(result["output_text"])print(f"总耗时: {result['total_time']:.2f}s, 生成token数: {result['total_tokens']}, 输出速度: {result['speed']:.2f} token/s, 平均占用显存: {result['avg_mem']:.2f} MB")
Async/parallel execution
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch, time, threading
import concurrent.futures
from typing import List, Dict, Union, Optional, Any
_MODEL = None
_PROCESSOR = None
_MODEL_LOCK = threading.Lock()


def load_model_and_processor(model_path):
    """Load the model and processor, returning cached instances if already loaded."""
    global _MODEL, _PROCESSOR
    with _MODEL_LOCK:
        if _MODEL is None or _PROCESSOR is None:
            _MODEL = Qwen2_5_VLForConditionalGeneration.from_pretrained(
                model_path,
                torch_dtype=torch.bfloat16,
                attn_implementation="flash_attention_2",
                device_map="cuda",
            )
            _PROCESSOR = AutoProcessor.from_pretrained(model_path, use_fast=True)
    return _MODEL, _PROCESSOR
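With this locking pattern only the first call pays the load cost; subsequent calls from any thread get the cached objects back. A quick sanity check, assuming model_path points at a valid checkpoint:

t0 = time.time()
m1, p1 = load_model_and_processor(model_path)
print(f"first load: {time.time() - t0:.1f}s")

t0 = time.time()
m2, p2 = load_model_and_processor(model_path)
print(f"cached: {time.time() - t0:.4f}s")  # near-instant cache hit
assert m1 is m2 and p1 is p2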
):"""model_path: 模型路径prompt: 文本promptimage: 单张图片路径video: 单个视频路径images: 图片路径列表videos: 视频路径列表max_new_tokens: 最大生成token数temperature: 采样温度parallel: 是否并行处理多个图片/视频max_workers: 并行处理的最大工作线程数"""if parallel and ((images and len(images) > 1) or (videos and len(videos) > 1)):return parallel_process(model_path=model_path,prompt=prompt,images=images,videos=videos,max_new_tokens=max_new_tokens,temperature=temperature,max_workers=max_workers)old_time = time.time()gpu_memories = []def monitor_gpu_memory():import torchwhile not getattr(monitor_gpu_memory, 'stop', False):mem = torch.cuda.memory_allocated() / 1024 / 1024 gpu_memories.append(mem)time.sleep(5)model, processor = load_model_and_processor(model_path)monitor_gpu_memory.stop = Falsemem_thread = threading.Thread(target=monitor_gpu_memory)mem_thread.start()contents = []if prompt is not None:contents.append({"type": "text", "text": prompt})if images is not None:for img in images:contents.append({"type": "image", "image": img})elif image is not None:contents.append({"type": "image", "image": image})elif videos is not None:for vid in videos:contents.append({"type": "video", "video": vid})elif video is not None:contents.append({"type": "video", "video": video})messages = [{"role": "user", "content": contents}]text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)image_inputs, video_inputs = process_vision_info(messages)inputs = processor(text=[text],images=image_inputs,videos=video_inputs,padding=True,return_tensors="pt",)inputs = inputs.to(model.device)generated_ids = model.generate(**inputs,max_new_tokens=max_new_tokens,temperature=temperature)generated_ids_trimmed = [out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)]output_text = processor.batch_decode(generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False)monitor_gpu_memory.stop = Truemem_thread.join()total_time = time.time() - old_timetotal_tokens = sum(len(ids) for ids in generated_ids_trimmed)speed = total_tokens / total_time if total_time > 0 else 0avg_mem = sum(gpu_memories) / len(gpu_memories) if gpu_memories else 0return {"output_text": output_text,"total_time": total_time,"total_tokens": total_tokens,"speed": speed,"avg_mem": avg_mem,}def process_single_item(model_path: str,prompt: Optional[str],item_path: str,is_video: bool = False,max_new_tokens: int = 2048,temperature: float = 0.6,
) -> Dict[str, Any]:"""处理单个图片或视频"""model, processor = load_model_and_processor(model_path)old_time = time.time()gpu_memories = []def monitor_gpu_memory():import torchwhile not getattr(monitor_gpu_memory, 'stop', False):mem = torch.cuda.memory_allocated() / 1024 / 1024 gpu_memories.append(mem)time.sleep(5)monitor_gpu_memory.stop = Falsemem_thread = threading.Thread(target=monitor_gpu_memory)mem_thread.start()contents = []if prompt is not None:contents.append({"type": "text", "text": prompt})if is_video:contents.append({"type": "video", "video": item_path})else:contents.append({"type": "image", "image": item_path})messages = [{"role": "user", "content": contents}]text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)image_inputs, video_inputs = process_vision_info(messages)inputs = processor(text=[text],images=image_inputs,videos=video_inputs,padding=True,return_tensors="pt",)inputs = inputs.to(model.device)with _MODEL_LOCK: generated_ids = model.generate(**inputs,max_new_tokens=max_new_tokens,temperature=temperature)generated_ids_trimmed = [out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)]output_text = processor.batch_decode(generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False)monitor_gpu_memory.stop = Truemem_thread.join()total_time = time.time() - old_timetotal_tokens = sum(len(ids) for ids in generated_ids_trimmed)speed = total_tokens / total_time if total_time > 0 else 0avg_mem = sum(gpu_memories) / len(gpu_memories) if gpu_memories else 0return {"output_text": output_text,"total_time": total_time,"total_tokens": total_tokens,"speed": speed,"avg_mem": avg_mem,}def parallel_process(model_path: str,prompt: Optional[str] = None,images: Optional[List[str]] = None,videos: Optional[List[str]] = None,max_new_tokens: int = 2048,temperature: float = 0.6,max_workers: int = 4,
) -> Dict[str, Any]:"""并行处理多个图片或视频"""start_time = time.time()results = []load_model_and_processor(model_path)items = []is_video_flags = []if images:items.extend(images)is_video_flags.extend([False] * len(images))if videos:items.extend(videos)is_video_flags.extend([True] * len(videos))with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:future_to_item = {executor.submit(process_single_item, model_path, prompt, item, is_video, max_new_tokens, temperature): (item, is_video) for item, is_video in zip(items, is_video_flags)}for future in concurrent.futures.as_completed(future_to_item):item, is_video = future_to_item[future]try:result = future.result()results.append(result)except Exception as e:print(f"处理 {'视频' if is_video else '图片'} {item} 时出错: {e}")all_output_texts = [result["output_text"] for result in results]total_time = time.time() - start_timetotal_tokens = sum(result["total_tokens"] for result in results)avg_speed = total_tokens / total_time if total_time > 0 else 0avg_mem = sum(result.get("avg_mem", 0) for result in results) / len(results) if results else 0return {"output_text": all_output_texts,"total_time": total_time,"total_tokens": total_tokens,"speed": avg_speed,"avg_mem": avg_mem,"individual_results": results}
if __name__ == "__main__":model_path = "/mnt/d/LLaMA-Factory/Qwen/Qwen2.5-VL-7B-Instruct-unsloth-bnb-4bit"result = llm(model_path=model_path,prompt="识别图中文字,如果有表格等特殊格式需要保留原格式。不用解释和总结,直接输出识别结果。",images=[r"/mnt/c/Users/CJK/Desktop/1.png", r"/mnt/c/Users/CJK/Desktop/2.png", r"/mnt/c/Users/CJK/Desktop/3.png", r"/mnt/c/Users/CJK/Desktop/4.png", r"/mnt/c/Users/CJK/Desktop/5.png", r"/mnt/c/Users/CJK/Desktop/6.png", r"/mnt/c/Users/CJK/Desktop/7.png", r"/mnt/c/Users/CJK/Desktop/8.png"],max_new_tokens=2048,temperature=0.6,parallel=True,max_workers=8)print("并行处理结果:")for i, text in enumerate(result["output_text"]):print(f"图片 {i+1} 结果: {text}")print(f"总耗时: {result['total_time']:.2f}s, 生成token数: {result['total_tokens']}, 平均输出速度: {result['speed']:.2f} token/s, 平均占用显存: {result['avg_mem']:.2f} MB")