调用qwen3-omni的api对本地文件生成视频文本描述(批量生成)
# 导入所有需要的库
import os
import pandas as pd
from tqdm import tqdm
import time
import mimetypes
import json
import subprocess # 新增:导入subprocess模块来执行命令行命令# 导入dashscope SDK
import dashscope
from dashscope import MultiModalConversation
from dashscope.api_entities.dashscope_response import DashScopeAPIResponse# ==============================================================================
# 1. 配置区域 (Configuration Area)
# ==============================================================================# 您的阿里云百炼API Key
API_KEY = "去百炼生成自己的api key"# 阿里云的API基地址 (使用dashscope SDK时,通常不需要显式设置base_url,除非是特定区域)
# dashscope.base_http_api_url = "https://dashscope-intl.aliyuncs.com/api/v1" # 使用支持视频的模型
MODEL_NAME = "qwen3-omni-flash" # 输入目录:包含所有.mp4视频文件的目录
INPUT_VIDEO_DIR = r"D:\打砸机器" # 请确保此路径存在且包含.mp4文件# 输出文件:保存处理结果的JSON文件名
OUTPUT_JSON_FILE = "video_analysis_results.json" # 您想对每个视频提出的问题
PROMPT_QUESTION_TEMPLATE = "尽可能详细的描述视频中关于'{folder_name}'这部分,结合音频和字幕来一段话连贯地生成视频的详细文本描述,不要分点回答。"# 最大视频大小(单位:MB)- 阿里云API通常有100MB限制,这里设置为1000MB以留有余地
MAX_VIDEO_SIZE_MB = 1000 # 视频抽帧参数 (fps: 每隔1/fps 秒抽取一帧)
VIDEO_FPS = 1# 新增:视频截取时长限制 (秒)。对于超过此长度的视频,将截取前N秒。
# !!! 重要:请根据DashScope官方文档确认qwen3-omni-flash模型对视频输入的实际最大时长限制 !!!
# !!! 如果API限制远小于150秒,即使截取到150秒也仍然会失败。 !!!
MAX_TRIM_DURATION_SECONDS = 150 # ==============================================================================
# 2. 辅助函数 (Helper Functions)
# ==============================================================================def get_video_files_from_directory(directory):"""从目录获取所有.mp4视频文件路径"""video_files = []if not os.path.isdir(directory):print(f"错误: 输入目录 '{directory}' 不存在或不是一个有效的目录。")return []for root, _, files in os.walk(directory):for file in files:if file.lower().endswith('.mp4'):full_path = os.path.join(root, file)# 检查文件大小file_size_mb = os.path.getsize(full_path) / (1024 * 1024)if file_size_mb <= MAX_VIDEO_SIZE_MB:video_files.append(full_path)else:print(f"跳过文件 {file} (大小 {file_size_mb:.2f}MB > {MAX_VIDEO_SIZE_MB}MB)")return video_files# ... (其他导入和配置保持不变) ...def get_video_duration(video_path):"""使用ffprobe获取视频时长 (秒)"""try:cmd = ['ffprobe','-v', 'error','-show_entries', 'format=duration','-of', 'default=noprint_wrappers=1:nokey=1',video_path]# 确保 encoding='utf-8' 以正确处理可能包含中文的路径或输出result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding='utf-8')duration = float(result.stdout.strip())print(f"DEBUG: [get_video_duration] 视频 '{os.path.basename(video_path)}' 实际时长: {duration:.2f}秒")return durationexcept FileNotFoundError:print("ERROR: [get_video_duration] ffprobe未找到。请确保FFmpeg已安装并添加到系统环境变量。")return 0except subprocess.CalledProcessError as e:print(f"ERROR: [get_video_duration] 无法获取视频 '{os.path.basename(video_path)}' 的时长。错误: {e.stderr.strip()}")return 0except Exception as e:print(f"ERROR: [get_video_duration] 获取视频 '{os.path.basename(video_path)}' 时长时发生意外错误: {e}")return 0def compress_video_if_needed(video_path):"""如果视频太长(超过MAX_TRIM_DURATION_SECONDS),则截取前MAX_TRIM_DURATION_SECONDS秒。返回处理后的视频路径(可能是原始路径,也可能是截取后的临时路径)。"""print(f"\nDEBUG: [compress_video_if_needed] 进入函数,处理文件: '{os.path.basename(video_path)}'")# 优先检查文件大小(您已有的逻辑)file_size_mb = os.path.getsize(video_path) / (1024 * 1024)if file_size_mb > MAX_VIDEO_SIZE_MB:print(f"WARNING: [compress_video_if_needed] 文件 '{os.path.basename(video_path)}' 大小 {file_size_mb:.2f}MB 超过 {MAX_VIDEO_SIZE_MB}MB。")# 如果文件过大,这里可以考虑添加压缩逻辑,但目前只关注时长截取。duration = get_video_duration(video_path)print(f"DEBUG: [compress_video_if_needed] 视频原始时长: {duration:.2f}秒,MAX_TRIM_DURATION_SECONDS: {MAX_TRIM_DURATION_SECONDS}秒")# 如果视频时长超过设定的最大截取时长if duration > MAX_TRIM_DURATION_SECONDS:print(f"DEBUG: [compress_video_if_needed] 视频时长超出限制,正在准备截取...")# 构建截取后的临时文件路径base_name = os.path.basename(video_path)dir_name = os.path.dirname(video_path)# 确保临时文件名称不会与原始文件冲突,且易于识别output_path = os.path.join(dir_name, f"trimmed_{base_name}")# 使用ffmpeg截取视频前N秒cmd_trim = ['ffmpeg','-i', video_path,'-ss', '00:00:00','-t', str(MAX_TRIM_DURATION_SECONDS),'-c', 'copy','-y', # 强制覆盖输出文件output_path]print(f"DEBUG: [compress_video_if_needed] 正在执行截取命令: {' '.join(cmd_trim)}")try:# 执行ffmpeg命令result = subprocess.run(cmd_trim, check=True, capture_output=True, text=True, encoding='utf-8')print(f"DEBUG: [compress_video_if_needed] FFmpeg标准输出:\n{result.stdout.strip()}")print(f"DEBUG: [compress_video_if_needed] FFmpeg标准错误:\n{result.stderr.strip()}")if os.path.exists(output_path) and os.path.getsize(output_path) > 0:print(f"DEBUG: [compress_video_if_needed] 视频已成功截取至: '{output_path}'")# 验证截取后的文件时长trimmed_duration = get_video_duration(output_path)# 允许少量误差,例如1秒if trimmed_duration > MAX_TRIM_DURATION_SECONDS + 1: print(f"ERROR: [compress_video_if_needed] 截取后的视频 '{os.path.basename(output_path)}' 时长 {trimmed_duration:.2f}秒 仍超出 {MAX_TRIM_DURATION_SECONDS}秒!请检查ffmpeg命令或API限制。")return video_path # 截取后仍太长,返回原路径,API会报错else:return output_path # 返回截取后的文件路径else:print(f"ERROR: [compress_video_if_needed] FFmpeg命令执行成功,但截取后的文件 '{output_path}' 不存在或大小为零!")return video_path # 文件不存在或为空,返回原路径except FileNotFoundError:print("ERROR: [compress_video_if_needed] ffmpeg未找到。请确保FFmpeg已安装并添加到系统环境变量。")return video_pathexcept subprocess.CalledProcessError as e:print(f"ERROR: [compress_video_if_needed] 截取视频 '{os.path.basename(video_path)}' 失败。错误码: {e.returncode}, 错误输出: {e.stderr.strip()}")return video_pathexcept Exception as e:print(f"ERROR: [compress_video_if_needed] 截取视频 '{os.path.basename(video_path)}' 时发生意外错误: {e}")return video_pathprint(f"DEBUG: [compress_video_if_needed] 视频时长符合要求,返回原始路径: '{video_path}'")return video_pathdef compress_video_if_needed(video_path):"""如果视频太长(超过MAX_TRIM_DURATION_SECONDS),则截取前MAX_TRIM_DURATION_SECONDS秒。返回处理后的视频路径(可能是原始路径,也可能是截取后的临时路径)。"""print(f"\nDEBUG: 进入 compress_video_if_needed,处理文件: {os.path.basename(video_path)}")# 优先检查文件大小(您已有的逻辑)file_size_mb = os.path.getsize(video_path) / (1024 * 1024)if file_size_mb > MAX_VIDEO_SIZE_MB:print(f"警告: 文件 {os.path.basename(video_path)} 大小 {file_size_mb:.2f}MB 超过 {MAX_VIDEO_SIZE_MB}MB。")# 如果文件过大,这里可以考虑添加压缩逻辑,但目前只关注时长截取。# 如果文件过大且无法截取到足够小,可能需要用户手动处理或跳过。duration = get_video_duration(video_path)print(f"DEBUG: 视频原始时长: {duration:.2f}秒,MAX_TRIM_DURATION_SECONDS: {MAX_TRIM_DURATION_SECONDS}秒")# 如果视频时长超过设定的最大截取时长if duration > MAX_TRIM_DURATION_SECONDS:print(f"视频 {os.path.basename(video_path)} 时长 {duration:.2f}秒 超过 {MAX_TRIM_DURATION_SECONDS}秒,正在截取前 {MAX_TRIM_DURATION_SECONDS}秒...")# 构建截取后的临时文件路径# 截取后的文件保存在原始文件同目录下,并添加前缀 "trimmed_"base_name = os.path.basename(video_path)dir_name = os.path.dirname(video_path)# 确保临时文件名称不会与原始文件冲突,且易于识别output_path = os.path.join(dir_name, f"trimmed_{base_name}")# 使用ffmpeg截取视频前N秒# -y: 覆盖输出文件而不询问# -ss 00:00:00: 从视频开头开始# -t MAX_TRIM_DURATION_SECONDS: 截取指定秒数# -c copy: 复制视频和音频流,不重新编码,速度快,无质量损失(推荐)cmd_trim = ['ffmpeg','-i', video_path,'-ss', '00:00:00','-t', str(MAX_TRIM_DURATION_SECONDS),'-c', 'copy','-y', # 强制覆盖输出文件output_path]print(f"DEBUG: 正在执行截取命令: {' '.join(cmd_trim)}")try:# 执行ffmpeg命令subprocess.run(cmd_trim, check=True, capture_output=True, text=True, encoding='utf-8')print(f"DEBUG: 视频已成功截取至: {output_path}")# 验证截取后的文件时长trimmed_duration = get_video_duration(output_path)if trimmed_duration > MAX_TRIM_DURATION_SECONDS + 1: # 允许少量误差print(f"ERROR: 截取后的视频 {os.path.basename(output_path)} 时长 {trimmed_duration:.2f}秒 仍超出 {MAX_TRIM_DURATION_SECONDS}秒!请检查ffmpeg命令或API限制。")return video_path # 截取后仍太长,返回原路径,API会报错return output_path # 返回截取后的文件路径except FileNotFoundError:print("错误: ffmpeg未找到。请确保FFmpeg已安装并添加到系统环境变量。")return video_path # FFmpeg未安装,返回原路径,可能导致后续API调用失败except subprocess.CalledProcessError as e:print(f"截取视频 {os.path.basename(video_path)} 失败。错误: {e.stderr.strip()}")return video_path # 截取失败,返回原路径except Exception as e:print(f"截取视频 {os.path.basename(video_path)} 时发生意外错误: {e}")return video_path # 截取失败,返回原路径print(f"DEBUG: 视频时长符合要求,返回原始路径: {video_path}")return video_path # 视频时长符合要求,返回原路径# ==============================================================================
# 3. 核心处理逻辑 (Core Processing Logic)
# ==============================================================================def process_single_video(video_path, api_key): # 传入api_key而不是client"""处理单个视频文件,调用API并返回结果"""current_prompt_question = "" # 初始化,以防在生成问题前出错try:# 获取视频文件名video_filename = os.path.basename(video_path)# 获取视频所在文件夹名称folder_path = os.path.dirname(video_path)folder_name = os.path.basename(folder_path)# 检查并截取视频(如果需要)processed_video_path = compress_video_if_needed(video_path) # 对于Qwen,视频路径应采用"file://{local_path}"的格式qwen_video_path = f"file://{processed_video_path}" print(f"正在处理: {video_filename} - 使用Qwen本地路径格式: {qwen_video_path}")# 动态生成问题current_prompt_question = PROMPT_QUESTION_TEMPLATE.format(folder_name=folder_name)print(f"当前问题: {current_prompt_question}")# 构建符合多模态模型要求的messages参数messages = [{"role": "user","content": [{"video": qwen_video_path, "fps": VIDEO_FPS}, {"text": current_prompt_question} ]}]# 调用APIprint(f"正在分析: {video_filename}...")response = MultiModalConversation.call(api_key=api_key, model=MODEL_NAME,messages=messages,)ai_response = "未能获取模型描述。" if isinstance(response, DashScopeAPIResponse): if response.status_code == 200: if response.output and response.output.choices and len(response.output.choices) > 0:if response.output.choices[0].message and response.output.choices[0].message.content and len(response.output.choices[0].message.content) > 0:ai_response = response.output.choices[0].message.content[0]["text"]else:ai_response = f"API返回结构不完整: {response.output.choices[0].message}"else:ai_response = f"API返回无有效choices: {response.output}"else:ai_response = f"API调用失败,状态码: {response.status_code}, 错误信息: {response.message}"if response.code and response.message:ai_response = f"API调用失败,错误码: {response.code}, 错误信息: {response.message}"elif response.output and response.output.error:ai_response = f"API调用失败,错误信息: {response.output.error}"else:ai_response = f"API返回非预期类型: {type(response)}, 响应内容: {response}"return {"video_filename": video_filename,"video_path": video_path, "prompt": current_prompt_question, "description": ai_response,"status": "success" if "API调用失败" not in ai_response and "未能获取模型描述" not in ai_response else "failed"}except Exception as e:print(f"\n处理失败: {os.path.basename(video_path)}\n错误原因: {e}")return {"video_filename": os.path.basename(video_path),"video_path": video_path, "prompt": current_prompt_question, "description": f"Error: {e}","status": "failed"}# ==============================================================================
# 4. 主执行流程 (Main Execution Flow)
# ==============================================================================if __name__ == "__main__":print("--- 视频批量处理脚本开始 ---")print(f"模型: {MODEL_NAME}")print(f"问题模板: {PROMPT_QUESTION_TEMPLATE}")print(f"视频截取时长限制: {MAX_TRIM_DURATION_SECONDS}秒") # 打印新的配置项# 初始化dashscope API Keyprint("1. 配置API Key...")dashscope.api_key = API_KEY# 获取视频文件列表print(f"2. 正在从 '{INPUT_VIDEO_DIR}' 目录读取.mp4视频文件...")video_files = get_video_files_from_directory(INPUT_VIDEO_DIR)if not video_files:print(f"错误!在 '{INPUT_VIDEO_DIR}' 目录中未找到任何符合要求的.mp4视频文件。")print(f"请确保文件大小不超过 {MAX_VIDEO_SIZE_MB}MB。")exit()print(f" 找到 {len(video_files)} 个符合条件的视频文件。")# 创建结果列表all_results = []print("3. 开始处理视频文件...")for video_path in tqdm(video_files, desc="视频处理进度"):result = process_single_video(video_path, API_KEY)all_results.append(result)# 防止API调用过于频繁time.sleep(2)print("\n4. 所有视频处理完成!")# 将结果保存为JSON文件print(f"5. 正在保存结果到 '{OUTPUT_JSON_FILE}'...")# 直接将all_results列表保存为JSON,因为列表本身就是JSON数组的结构with open(OUTPUT_JSON_FILE, 'w', encoding='utf-8') as f:json.dump(all_results, f, ensure_ascii=False, indent=4) # ensure_ascii=False 支持中文,indent=4 美化输出print(f"--- 任务成功结束!分析结果已保存。 ---")# 统计成功和失败的视频数量successful_videos = [r for r in all_results if r['status'] == 'success']failed_videos = [r for r in all_results if r['status'] == 'failed']print(f"成功处理: {len(successful_videos)} 个视频")print(f"处理失败: {len(failed_videos)} 个视频")# 如果有失败的情况,保存失败文件列表if len(failed_videos) > 0:failed_filenames = [r['video_filename'] for r in failed_videos]with open('failed_videos.txt', 'w', encoding='utf-8') as f:f.write("\n".join(failed_filenames))print(f"失败文件列表已保存到 'failed_videos.txt'")
①可以处理文件夹中所有的.mp4文件
②按照要求视频长度在150s以内,所以要配置了ffmpeg环境变量,截取视频的前150s内容(按需取用)
③我的prompt是围绕.mp4所在文件夹名字为主题去描述的
