Building a Local Video Analysis and Summarization Tool
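- Introduction
- Requirements Breakdown
- Notes
- 0. Hardware and Environment Setup
- 1. Downloading Online Audio and Video
- **yt-dlp**
- **Installation**
- **Common Commands**
- **Python Example**
- 2. Extracting Audio and Key Frames from Video
- FFmpeg
- **Installation**
- Common Commands
- **Python Example**
- 3. Transcribing Audio to Text
- Whisper
- **Installation**
- **Common Commands**
- **Python Example**
- Paddle
- Installation
- Python Example
- 4. Consolidating and Summarizing Text with a GPT-style Model
- Ollama + Qwen3 model
- Installation
- Python Example
- 5. Push Notification
- 6. Putting It All Together
- Main.py
- part1_download_video.py
- part2_extract_key_frames.py
- part3_extract_audio.py
- part4_ocr_frames.py
- part5_transcribe_audio.py
- part6_summarize_content.py
- Notes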
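Introduction
This project grew out of an online audio/video summarization platform I have used, which parses web videos and summarizes their content.
Because its free quota is limited and access depends on network conditions, I plan to build a locally deployed platform with similar functionality, with help from DeepSeek. (Deployed on a rented server.)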
Date | Notes |
---|---|
September 20, 2025 | ver 0.1: the code is still rough; no analysis of meaningful image content; no multilingual support |
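Requirements Breakdown
- Download online audio and video
- Transcribe the audio to text
- Convert the video to images
- Recognize the text in the images
- Recognize non-text image content and describe it as text
- Use a GPT-style model to consolidate, analyze and summarize the text
- Output the result in a suitable format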
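The breakdown above maps naturally onto a chain of stages. The sketch below only illustrates that flow and is not the project's actual Main.py: `run_pipeline` and the three placeholder stages are hypothetical names, and each stage is assumed to be a plain callable that takes and returns a shared context dict.

```python
from typing import Callable, Dict, List

Stage = Callable[[Dict], Dict]


def run_pipeline(stages: List[Stage], context: Dict) -> Dict:
    """Run each stage in order, passing the shared context along."""
    for stage in stages:
        context = stage(context)
    return context


# Hypothetical placeholder stages; real ones would call yt-dlp, FFmpeg, etc.
def download(ctx: Dict) -> Dict:
    return {**ctx, "video": f"results/videos/{ctx['name']}.mp4"}


def extract_audio(ctx: Dict) -> Dict:
    return {**ctx, "audio": f"results/audios/{ctx['name']}.mp3"}


def extract_frames(ctx: Dict) -> Dict:
    return {**ctx, "frames_dir": f"results/images/{ctx['name']}/"}


result = run_pipeline([download, extract_audio, extract_frames], {"name": "demo"})
print(result)
```

Keeping every stage behind the same signature makes it easy to swap one tool for another (for example, a different OCR engine) without touching the rest of the chain.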
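Notes
0. Hardware and Environment Setup
- NVIDIA RTX 3060 GPU
- 16 GB RAM
- A dedicated conda environment
- Python 3.10.18
Tools involved:
- yt-dlp: downloads a video from its URL (may be limited by site permissions)
- FFmpeg: extracts key frames and audio from a video
- GPU (NVIDIA/CUDA) builds of PyTorch, torchvision and torchaudio (depending on your own hardware)
- whisper: transcribes video or audio to text
- CLIP: analyzes image content
- PaddleOCR: recognizes text in images
- Note: a VideoLLM (such as Video-LLaVA) can be used in place of whisper, FFmpeg and CLIP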
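Before running anything, it can help to confirm that the tools listed above are actually visible from the conda environment. The following is a minimal sketch using only the standard library; `check_environment` is a hypothetical helper, and the package import names (`torch`, `whisper`, `paddleocr`, `yt_dlp`) are assumed from the tools named in this post.

```python
import importlib.util
import shutil
from typing import Dict


def check_environment() -> Dict[str, bool]:
    """Report which command-line tools and Python packages are available."""
    tools = ["ffmpeg", "ffprobe", "yt-dlp"]
    packages = ["torch", "whisper", "paddleocr", "yt_dlp"]
    # shutil.which returns None when the executable is not on PATH
    status = {f"cli:{t}": shutil.which(t) is not None for t in tools}
    # find_spec returns None when a top-level package is not installed
    status.update({f"pkg:{p}": importlib.util.find_spec(p) is not None for p in packages})
    return status


if __name__ == "__main__":
    for name, ok in check_environment().items():
        print(f"{'OK     ' if ok else 'MISSING'} {name}")
```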
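1. Downloading Online Audio and Video
yt-dlp
yt-dlp is an open-source command-line tool for downloading videos from the internet. It is a fork of youtube-dl.
It supports a large number of sites, including but not limited to YouTube, Bilibili and TikTok.
Installation
pip install yt-dlp
Common Commands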
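# Common yt-dlp commands
## Basic download
- Download a video: yt-dlp <video URL>
## Choosing a format
- Download the best single-file format (video and audio in one file): yt-dlp -f best <video URL>
- Download at a capped resolution (for example, up to 1080p): yt-dlp -f "bestvideo[height<=1080]+bestaudio/best[height<=1080]" <video URL>
## Downloading audio
- Download only the audio and convert it to MP3: yt-dlp -x --audio-format mp3 <video URL>
## Downloading subtitles
- Download the subtitle file (English): yt-dlp --write-sub --sub-lang en <video URL>
- Download subtitles in several languages (for example, English and Simplified Chinese): yt-dlp --write-sub --sub-lang en,zh-CN <video URL>
## Custom output file name
- Set the output path and file name: yt-dlp -o "~/Downloads/%(title)s.%(ext)s" <video URL>
## Downloading a playlist
- Download a whole playlist: yt-dlp -f best -o "~/Downloads/%(playlist)s/%(playlist_index)s-%(title)s.%(ext)s" <playlist URL>
## Limiting the download speed
- Limit the download speed to 1 MB/s: yt-dlp --limit-rate 1M <video URL>
## Skipping files that were already downloaded
- Record finished downloads in an archive file and skip them on later runs: yt-dlp --download-archive archive.txt <video URL> (note that --skip-download does something different: it skips the media download itself, which is useful when only subtitles or metadata are wanted)
## Updating yt-dlp
- Update to the latest version: pip install --upgrade yt-dlp
## Help
- List all available options and commands: yt-dlp --help
## Example
- Download a YouTube video and save it as an MP3 audio file: yt-dlp -x --audio-format mp3 -o "~/Music/%(title)s.%(ext)s" <video URL>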
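The resolution-capped selector shown above is easy to mistype, so it can be convenient to build it in code. This is a small sketch; `format_selector` is a hypothetical helper, and its return value is meant to be passed either to `-f` on the command line or as the `'format'` option of the Python API.

```python
def format_selector(max_height: int) -> str:
    """Build a yt-dlp format selector capped at max_height pixels.

    Prefers separate video and audio streams (merged by FFmpeg) and falls
    back to the best single-file format under the same cap.
    """
    if max_height <= 0:
        raise ValueError("max_height must be positive")
    return (f"bestvideo[height<={max_height}]+bestaudio"
            f"/best[height<={max_height}]")


print(format_selector(1080))
```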
Python Example
import yt_dlp


class YouTubeDownloader:
    def __init__(self):
        # Base yt-dlp options shared by every call
        self.ydl_opts = {}

    def _download(self, url, opts):
        """Run yt-dlp with a fresh copy of the options, so that settings from
        one call (such as skip_download) do not leak into later calls."""
        with yt_dlp.YoutubeDL({**self.ydl_opts, **opts}) as ydl:
            ydl.download([url])

    def download_video(self, url, output_path='.', format='bestvideo+bestaudio/best', video_title=None):
        """
        Download a video
        :param url: video URL
        :param output_path: output directory
        :param format: format selector
        :param video_title: file name prefix ('default' when omitted)
        """
        title = video_title if video_title is not None else 'default'
        self._download(url, {
            'format': format,
            'outtmpl': f'{output_path}/{title}_%(epoch)s.%(ext)s',
            'restrictfilenames': True,
        })

    def download_audio(self, url, output_path='.', format='mp3'):
        """
        Download the audio track
        :param url: video URL
        :param output_path: output directory
        :param format: audio format
        """
        self._download(url, {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': format,
                'preferredquality': '192',
            }],
            'outtmpl': f'{output_path}/%(title)s.%(ext)s',
        })

    def download_subtitles(self, url, output_path='.', lang='zh'):
        """
        Download subtitles
        :param url: video URL
        :param output_path: output directory
        :param lang: subtitle language
        """
        self._download(url, {
            'writesubtitles': True,  # enable subtitle download
            'writeautomaticsub': True,  # also fetch auto-generated subtitles
            'subtitleslangs': [lang, 'en', 'zh-Hans', 'zh'],  # fallback languages
            'skip_download': True,  # key setting: do not download the video itself
            'outtmpl': f'{output_path}/%(title)s_{lang}.%(ext)s',
        })

    def download_playlist(self, url, output_path='.', format='best'):
        """
        Download a playlist
        :param url: playlist URL
        :param output_path: output directory
        :param format: format selector
        """
        self._download(url, {
            'format': format,
            'outtmpl': f'{output_path}/%(playlist)s/%(playlist_index)s-%(title)s.%(ext)s',
        })

    def limit_download_speed(self, url, output_path='.', format='best', speed_limit=1024 * 1024):
        """
        Download with a speed limit
        :param url: video URL
        :param output_path: output directory
        :param format: format selector
        :param speed_limit: limit in bytes per second (1024 * 1024 is 1 MB/s);
                            the Python API expects a number, not a string such as '1M'
        """
        self._download(url, {
            'format': format,
            'outtmpl': f'{output_path}/%(title)s.%(ext)s',
            'ratelimit': speed_limit,
        })

    def skip_existing_files(self, url, output_path='.', format='best'):
        """
        Skip videos that were already downloaded
        :param url: video URL
        :param output_path: output directory
        :param format: format selector
        """
        self._download(url, {
            'format': format,
            'outtmpl': f'{output_path}/%(title)s.%(ext)s',
            # Finished downloads are recorded here and skipped on later runs.
            # ('skip_download' would instead skip the media download entirely.)
            'download_archive': f'{output_path}/downloaded.txt',
        })


# Example usage
if __name__ == "__main__":
    downloader = YouTubeDownloader()
    # Download a video
    downloader.download_video("https://www.bilibili.com/video/BV1E3a1zsEGJ", output_path='results/videos/')
    # Download the audio
    # downloader.download_audio('https://www.bilibili.com/video/BV1E3a1zsEGJ', output_path='results/audios/')
    # Download subtitles (Bilibili subtitles require login information)
    # downloader.download_subtitles('https://www.bilibili.com/video/BV1E3a1zsEGJ', output_path='results/subtitles/', lang='en')
    # Download a playlist
    # downloader.download_playlist('https://www.youtube.com/playlist?list=PLexample', output_path='~/Downloads')
    # Limit the download speed (bytes per second)
    # downloader.limit_download_speed('https://www.bilibili.com/video/BV1E3a1zsEGJ', output_path='~/Downloads', speed_limit=1024 * 1024)
    # Skip files that were already downloaded
    # downloader.skip_existing_files('https://www.bilibili.com/video/BV1E3a1zsEGJ', output_path='~/Downloads')
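2. Extracting Audio and Key Frames from Video
FFmpeg
FFmpeg is an open-source, cross-platform toolkit and set of libraries for processing audio and video.
Installation
conda install -c conda-forge ffmpeg
Common Commands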
# Convert between video formats
ffmpeg -i input.mp4 output.avi
ffmpeg -i input.mkv output.mp4
# Convert between audio formats
ffmpeg -i audio.wav audio.mp3
# Extract the audio
ffmpeg -i video.mp4 -vn audio.mp3
# Extract the video (drop the audio)
ffmpeg -i video.mp4 -an silent_video.mp4
# Cut a segment
ffmpeg -i input.mp4 -ss 00:01:00 -to 00:02:00 -c copy output.mp4
# Change the resolution
ffmpeg -i input.mp4 -vf "scale=1280:720" output.mp4
# Change the video bitrate
ffmpeg -i input.mp4 -b:v 1M output.mp4
# Change the audio bitrate
ffmpeg -i input.mp4 -b:a 128k output.mp4
# Change the frame rate
ffmpeg -i input.mp4 -r 30 output.mp4
# Compress with H.264 (a higher CRF gives a smaller file at lower quality; the slow preset compresses more efficiently)
ffmpeg -i input.mp4 -c:v libx264 -crf 28 -preset slow -c:a copy output.mp4
# Grab a thumbnail from the video
ffmpeg -i video.mp4 -ss 00:00:05 -vframes 1 thumbnail.jpg
# Add an image watermark
ffmpeg -i input.mp4 -i watermark.png -filter_complex "overlay=10:10" output.mp4
# Merge a video file and an audio file
ffmpeg -i video.mp4 -i audio.mp3 -c copy output.mp4
Extracting key frames
# Note: recent FFmpeg releases deprecate -vsync in favor of -fps_mode (for example, -fps_mode vfr)
# Extract every key frame (I-frame)
ffmpeg -i input.mp4 -vf "select=eq(pict_type\,I)" -vsync vfr keyframe_%04d.jpg
# Cap the output frame rate to avoid producing too many images
ffmpeg -i input.mp4 -vf "select=eq(pict_type\,I)" -vsync vfr -r 1 keyframe_%04d.jpg
# Set the output quality
ffmpeg -i input.mp4 -vf "select=eq(pict_type\,I)" -vsync vfr -q:v 2 keyframe_%04d.jpg
# Decode key frames only
ffmpeg -skip_frame nokey -i input.mp4 -vsync vfr keyframe_%04d.jpg
# Extract one frame every 10 seconds
ffmpeg -i input.mp4 -vf "fps=1/10" -q:v 2 frame_%04d.jpg
# Extract one frame every minute
ffmpeg -i input.mp4 -vf "fps=1/60" -q:v 2 frame_%04d.jpg
# Scale to a fixed width, keeping the aspect ratio
ffmpeg -i input.mp4 -vf "select=eq(pict_type\,I),scale=320:-1" -vsync vfr keyframe_%04d.jpg
# Scale to a fixed resolution
ffmpeg -i input.mp4 -vf "select=eq(pict_type\,I),scale=320:240" -vsync vfr keyframe_%04d.jpg
# Extract key frames from the first 30 seconds
ffmpeg -i input.mp4 -t 30 -vf "select=eq(pict_type\,I)" -vsync vfr keyframe_%04d.jpg
# Extract key frames between 1:00 and 2:00
ffmpeg -i input.mp4 -ss 00:01:00 -to 00:02:00 -vf "select=eq(pict_type\,I)" -vsync vfr keyframe_%04d.jpg
# Build a 3x3 thumbnail grid
ffmpeg -i input.mp4 -vf "select=eq(pict_type\,I),scale=150:-1,tile=3x3" -frames 1 preview.jpg
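The two extraction strategies above (I-frames versus a fixed time interval) differ only in the `-vf` filter, which makes them easy to wrap in one function. The sketch below just assembles the argument list and does not run FFmpeg; `build_frame_cmd` is a hypothetical helper, and the resulting list is intended for `subprocess.run`.

```python
from typing import List, Optional


def build_frame_cmd(video: str, out_pattern: str,
                    interval: Optional[float] = None,
                    quality: int = 2) -> List[str]:
    """Return an ffmpeg argument list for frame extraction.

    interval=None selects I-frames; otherwise one frame is taken
    every `interval` seconds.
    """
    if interval is None:
        # The comma is escaped so the filter parser does not treat it
        # as a separator between filters
        vf = r"select=eq(pict_type\,I)"
        extra = ["-vsync", "vfr"]
    else:
        if interval <= 0:
            raise ValueError("interval must be positive")
        vf = f"fps=1/{interval}"
        extra = []
    return ["ffmpeg", "-i", video, "-vf", vf, *extra,
            "-q:v", str(quality), out_pattern]


print(build_frame_cmd("input.mp4", "frame_%04d.jpg", interval=10))
```

Passing the arguments as a list avoids shell quoting problems with characters such as `(`, `)` and `%` in the filter and output pattern.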
Python Example
import json
import subprocess

import yt_dlp


def convert_video(input_path, output_path):
    """Convert a video to another format with FFmpeg"""
    cmd = [
        'ffmpeg',
        '-i', input_path,
        '-c:v', 'libx264',
        '-crf', '23',
        '-c:a', 'aac',
        '-b:a', '128k',
        output_path,
        '-y'  # overwrite an existing file
    ]
    try:
        subprocess.run(cmd, capture_output=True, text=True, check=True)
        print("Conversion succeeded!")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Conversion failed: {e.stderr}")
        return False


# Example usage
convert_video('input.avi', 'output.mp4')


def get_video_info(video_path):
    """Get video information with ffprobe"""
    cmd = [
        'ffprobe',
        '-v', 'quiet',
        '-print_format', 'json',
        '-show_format',
        '-show_streams',
        video_path
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except Exception as e:
        print(f"Failed to get video information: {e}")
        return None


def download_with_ffmpeg(url):
    """Download a video with yt-dlp and post-process it with FFmpeg"""
    ydl_opts = {
        'format': 'bestvideo+bestaudio',  # download the best video and audio separately
        'outtmpl': '%(title)s.%(ext)s',
        'merge_output_format': 'mp4',  # merged by FFmpeg
        'postprocessors': [{
            'key': 'FFmpegVideoConvertor',
            'preferedformat': 'mp4',  # convert to mp4 (the key really is spelled this way)
        }],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])
import json
import shutil
import subprocess
from pathlib import Path
from typing import Optional, List, Dict, Tuple


class FFmpegTools:
    def __init__(self, ffmpeg_path: str = "ffmpeg", ffprobe_path: str = "ffprobe"):
        self.ffmpeg_path = ffmpeg_path
        self.ffprobe_path = ffprobe_path
        self._check_ffmpeg_available()

    def _check_ffmpeg_available(self):
        """Check that FFmpeg and FFprobe are available"""
        if shutil.which(self.ffmpeg_path) is None:
            raise Exception("FFmpeg not found; please install FFmpeg first")
        if shutil.which(self.ffprobe_path) is None:
            raise Exception("FFprobe not found; please install FFmpeg first")

    def _run_command(self, cmd: List[str], timeout: int = 300) -> Tuple[bool, str]:
        """Run an FFmpeg command and return (success, stderr)"""
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
            return result.returncode == 0, result.stderr
        except subprocess.TimeoutExpired:
            return False, "Command timed out"
        except Exception as e:
            return False, str(e)

    def get_video_info(self, video_path: str) -> Optional[Dict]:
        """Get detailed video information"""
        cmd = [
            self.ffprobe_path,
            '-v', 'quiet',
            '-print_format', 'json',
            '-show_format',
            '-show_streams',
            video_path
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode == 0:
                return json.loads(result.stdout)
            return None
        except Exception:
            return None

    def get_duration(self, video_path: str) -> Optional[float]:
        """Get the video duration in seconds"""
        info = self.get_video_info(video_path)
        if info and 'format' in info:
            return float(info['format'].get('duration', 0))
        return None

    def get_resolution(self, video_path: str) -> Optional[Tuple[int, int]]:
        """Get the video resolution as (width, height)"""
        info = self.get_video_info(video_path)
        if info and 'streams' in info:
            for stream in info['streams']:
                if stream['codec_type'] == 'video':
                    width = int(stream.get('width', 0))
                    height = int(stream.get('height', 0))
                    return width, height
        return None

    def get_bitrate(self, video_path: str) -> Optional[int]:
        """Get the overall bitrate in bits per second"""
        info = self.get_video_info(video_path)
        if info and 'format' in info:
            return int(info['format'].get('bit_rate', 0))
        return None

    def convert_format(self, input_path: str, output_path: str,
                       output_format: str = "mp4", crf: int = 23) -> bool:
        """Convert a video to another format"""
        cmd = [
            self.ffmpeg_path,
            '-i', input_path,
            '-c:v', 'libx264',
            '-crf', str(crf),
            '-preset', 'medium',
            '-c:a', 'aac',
            '-b:a', '128k',
            output_path,
            '-y'
        ]
        success, error = self._run_command(cmd)
        if not success:
            print(f"Format conversion failed: {error}")
        return success

    def compress_video(self, input_path: str, output_path: str,
                       target_size_mb: int = 10) -> bool:
        """Compress a video to roughly the target size"""
        duration = self.get_duration(input_path)
        if not duration:
            return False
        # Target bitrate in bits per second (bitrate = size * 8 / duration).
        # The 64k audio track is not subtracted, so the result is slightly larger.
        target_bitrate = int((target_size_mb * 8 * 1024 * 1024) / duration)
        cmd = [
            self.ffmpeg_path,
            '-i', input_path,
            '-c:v', 'libx264',
            '-b:v', f'{target_bitrate}',
            '-maxrate', f'{target_bitrate}',
            '-bufsize', f'{target_bitrate * 2}',
            '-c:a', 'aac',
            '-b:a', '64k',
            output_path,
            '-y'
        ]
        success, error = self._run_command(cmd)
        return success

    def cut_video(self, input_path: str, output_path: str,
                  start_time: str, end_time: str) -> bool:
        """Cut a segment out of a video"""
        cmd = [
            self.ffmpeg_path,
            '-i', input_path,
            '-ss', start_time,  # start time, format: 00:00:00
            '-to', end_time,  # end time, format: 00:00:00
            '-c', 'copy',  # copy the streams without re-encoding
            output_path,
            '-y'
        ]
        success, error = self._run_command(cmd)
        return success

    def extract_audio(self, input_path: str, output_path: str) -> bool:
        """Extract the audio track.

        The stream is copied without re-encoding, so the output container must
        suit the source codec (for example .m4a for AAC audio).
        """
        cmd = [
            self.ffmpeg_path,
            '-i', input_path,
            '-vn',  # no video stream
            '-acodec', 'copy',  # copy the audio stream as-is
            output_path,
            '-y'
        ]
        success, error = self._run_command(cmd)
        return success

    def extract_video(self, input_path: str, output_path: str) -> bool:
        """Extract the video (drop the audio)"""
        cmd = [
            self.ffmpeg_path,
            '-i', input_path,
            '-an',  # no audio stream
            '-vcodec', 'copy',  # copy the video stream as-is
            output_path,
            '-y'
        ]
        success, error = self._run_command(cmd)
        return success

    def change_resolution(self, input_path: str, output_path: str,
                          width: int, height: int) -> bool:
        """Change the video resolution"""
        cmd = [
            self.ffmpeg_path,
            '-i', input_path,
            '-vf', f'scale={width}:{height}',
            '-c:a', 'copy',
            output_path,
            '-y'
        ]
        success, error = self._run_command(cmd)
        return success

    def change_framerate(self, input_path: str, output_path: str,
                         framerate: int) -> bool:
        """Change the video frame rate"""
        cmd = [
            self.ffmpeg_path,
            '-i', input_path,
            '-r', str(framerate),
            '-c:a', 'copy',
            output_path,
            '-y'
        ]
        success, error = self._run_command(cmd)
        return success

    def add_watermark(self, input_path: str, output_path: str,
                      watermark_path: str, position: str = "10:10") -> bool:
        """Add an image watermark"""
        cmd = [
            self.ffmpeg_path,
            '-i', input_path,
            '-i', watermark_path,
            '-filter_complex', f'overlay={position}',
            '-codec:a', 'copy',
            output_path,
            '-y'
        ]
        success, error = self._run_command(cmd)
        return success

    def extract_thumbnail(self, input_path: str, output_path: str,
                          time: str = "00:00:01") -> bool:
        """Extract a thumbnail from the video"""
        cmd = [
            self.ffmpeg_path,
            '-i', input_path,
            '-ss', time,
            '-vframes', '1',
            '-q:v', '2',
            output_path,
            '-y'
        ]
        success, error = self._run_command(cmd)
        return success

    def generate_preview(self, input_path: str, output_path: str,
                         cols: int = 3, rows: int = 3) -> bool:
        """Generate a preview image (a tiled grid of thumbnails)"""
        duration = self.get_duration(input_path)
        if not duration:
            return False
        # Sample cols * rows frames spread evenly over the video
        cmd = [
            self.ffmpeg_path,
            '-i', input_path,
            '-vf', f'fps=1/{duration / (cols * rows)},scale=320:-1,tile={cols}x{rows}',
            '-frames', '1',
            output_path,
            '-y'
        ]
        success, error = self._run_command(cmd)
        return success

    def convert_audio(self, input_path: str, output_path: str,
                      format: str = "mp3", bitrate: str = "128k") -> bool:
        """Convert between audio formats"""
        cmd = [
            self.ffmpeg_path,
            '-i', input_path,
            '-codec:a', 'libmp3lame' if format == 'mp3' else 'aac',
            '-b:a', bitrate,
            output_path,
            '-y'
        ]
        success, error = self._run_command(cmd)
        return success

    def change_audio_volume(self, input_path: str, output_path: str,
                            volume: float = 1.0) -> bool:
        """Adjust the audio volume"""
        cmd = [self.ffmpeg_path, '-i', input_path, '-af', f'volume={volume}']
        # Copy the video stream only when the input is a video file; passing an
        # empty codec name for audio-only input would make FFmpeg fail
        if input_path.endswith(('.mp4', '.mov', '.avi')):
            cmd.extend(['-codec:v', 'copy'])
        cmd.extend([output_path, '-y'])
        success, error = self._run_command(cmd)
        return success

    def batch_convert(self, input_dir: str, output_dir: str,
                      output_format: str = "mp4") -> Dict[str, bool]:
        """Convert every video in a directory"""
        results = {}
        input_path = Path(input_dir)
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv']
        for video_file in input_path.iterdir():
            if video_file.suffix.lower() in video_extensions:
                output_file = output_path / f"{video_file.stem}.{output_format}"
                success = self.convert_format(str(video_file), str(output_file))
                results[video_file.name] = success
        return results

    def batch_extract_thumbnails(self, input_dir: str, output_dir: str) -> Dict[str, bool]:
        """Extract a thumbnail from every video in a directory"""
        results = {}
        input_path = Path(input_dir)
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        video_extensions = ['.mp4', '.avi', '.mov', '.mkv']
        for video_file in input_path.iterdir():
            if video_file.suffix.lower() in video_extensions:
                output_file = output_path / f"{video_file.stem}_thumbnail.jpg"
                success = self.extract_thumbnail(str(video_file), str(output_file))
                results[video_file.name] = success
        return results


# Example usage
if __name__ == "__main__":
    # Create the FFmpeg tools instance
    ffmpeg_tools = FFmpegTools()
    # Get video information
    video_info = ffmpeg_tools.get_video_info('D:/Vidoes/index.mp4')
    print("Video information:", video_info)
    # Convert the format
    # ffmpeg_tools.convert_format("input.avi", "output.mp4")
    # Cut a segment
    # ffmpeg_tools.cut_video("input.mp4", "output_cut.mp4", "00:00:10", "00:01:00")
    # Extract the audio (stream copy, so use a container that matches the source codec)
    # ffmpeg_tools.extract_audio("input.mp4", "audio.m4a")
    # Change the resolution
    # ffmpeg_tools.change_resolution('D:/Vidoes/index.mp4', 'results/videos/index.mp4', 1280, 720)
    # Extract a thumbnail
    # ffmpeg_tools.extract_thumbnail('D:/Vidoes/index.mp4', "results/images/thumbnail.jpg")
    # Batch processing
    # results = ffmpeg_tools.batch_convert("./videos", "./converted", "mp4")
    # print("Batch conversion results:", results)
Extracting key frames
import json
import os
import re
import subprocess
from pathlib import Path
from typing import List, Optional, Dict


class FFmpegKeyframeExtractor:
    def __init__(self, ffmpeg_path: str = "ffmpeg", ffprobe_path: str = "ffprobe"):
        self.ffmpeg_path = ffmpeg_path
        self.ffprobe_path = ffprobe_path

    def extract_keyframes(self,
                          video_path: str,
                          output_dir: str = "./keyframes",
                          output_pattern: str = "keyframe_%04d.jpg",
                          quality: int = 2,
                          method: str = "pict_type",
                          interval: Optional[float] = None,
                          max_frames: Optional[int] = None) -> List[str]:
        """
        Extract key frames from a video
        :param video_path: path of the video file
        :param output_dir: output directory
        :param output_pattern: output file name pattern
        :param quality: output quality (1-31, 1 is best)
        :param method: extraction method ('pict_type', 'nokey', 'interval')
        :param interval: time interval in seconds, used when method='interval'
        :param max_frames: maximum number of frames to extract
        :return: list of extracted key frame files
        """
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file does not exist: {video_path}")
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, output_pattern)
        cmd = [self.ffmpeg_path, '-i', video_path]
        if method == "pict_type":
            # Select I-frames with the pict_type filter
            cmd.extend(['-vf', "select=eq(pict_type\\,I)", '-vsync', 'vfr'])
        elif method == "nokey":
            # Use the skip_frame option
            cmd.extend(['-skip_frame', 'nokey', '-vsync', 'vfr'])
        elif method == "interval" and interval:
            # Extract at a fixed time interval
            cmd.extend(['-vf', f"fps=1/{interval}"])
        else:
            raise ValueError("Unsupported extraction method")
        # Limit the number of frames; output options must come before the output file
        if max_frames:
            cmd.extend(['-vframes', str(max_frames)])
        # Add the output arguments
        cmd.extend(['-q:v', str(quality), '-f', 'image2', output_path, '-y'])
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
            if result.returncode != 0:
                raise Exception(f"FFmpeg error: {result.stderr}")
            # Collect the generated files
            return self._get_generated_frames(output_dir, output_pattern)
        except subprocess.TimeoutExpired:
            raise Exception("Extraction timed out; the video file may be too large")
        except Exception as e:
            raise Exception(f"Key frame extraction failed: {e}")

    def _get_generated_frames(self, output_dir: str, pattern: str) -> List[str]:
        """List the generated key frame files"""
        frames = []
        pattern_re = pattern.replace('%04d', r'(\d{4})').replace('%d', r'(\d+)')
        for file in os.listdir(output_dir):
            if re.match(pattern_re.replace('.jpg', r'\.jpg'), file):
                frames.append(os.path.join(output_dir, file))
        return sorted(frames)

    def get_keyframe_timestamps(self, video_path: str) -> List[float]:
        """
        Get key frame timestamps without writing images
        :param video_path: path of the video file
        :return: list of key frame timestamps in seconds
        """
        cmd = [
            self.ffmpeg_path,
            '-i', video_path,
            # showinfo logs one line per selected frame, including pts_time
            '-vf', "select=eq(pict_type\\,I),showinfo",
            '-f', 'null',
            '-'
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
            if result.returncode != 0:
                raise Exception(f"FFmpeg error: {result.stderr}")
            # Parse the timestamps from stderr
            timestamps = []
            for line in result.stderr.split('\n'):
                if 'pts_time:' in line:
                    # Matches both pts_time:1.234 and pts_time:0
                    match = re.search(r'pts_time:(\d+(?:\.\d+)?)', line)
                    if match:
                        timestamps.append(float(match.group(1)))
            return timestamps
        except Exception as e:
            raise Exception(f"Failed to get key frame timestamps: {e}")

    def extract_keyframes_at_timestamps(self,
                                        video_path: str,
                                        timestamps: List[float],
                                        output_dir: str = "./keyframes",
                                        output_pattern: str = "frame_%04d.jpg",
                                        quality: int = 2) -> List[str]:
        """
        Extract frames at the given timestamps
        :param video_path: path of the video file
        :param timestamps: list of timestamps in seconds
        :param output_dir: output directory
        :param output_pattern: output file name pattern
        :param quality: output quality
        :return: list of extracted frame files
        """
        if not timestamps:
            return []
        os.makedirs(output_dir, exist_ok=True)
        frames = []
        for i, timestamp in enumerate(timestamps):
            output_file = os.path.join(output_dir, output_pattern.replace('%04d', f"{i:04d}"))
            cmd = [
                self.ffmpeg_path,
                '-i', video_path,
                '-ss', str(timestamp),
                '-vframes', '1',
                '-q:v', str(quality),
                output_file,
                '-y'
            ]
            try:
                result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
                if result.returncode == 0:
                    frames.append(output_file)
                else:
                    print(f"Extraction failed at timestamp {timestamp}: {result.stderr}")
            except Exception as e:
                print(f"Error while extracting at timestamp {timestamp}: {e}")
        return frames

    def get_video_info(self, video_path: str) -> Dict:
        """
        Get video information
        :param video_path: path of the video file
        :return: dictionary of video information
        """
        cmd = [
            self.ffprobe_path,
            '-v', 'quiet',
            '-print_format', 'json',
            '-show_format',
            '-show_streams',
            video_path
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode == 0:
                return json.loads(result.stdout)
            else:
                raise Exception("Could not get video information")
        except Exception as e:
            raise Exception(f"Failed to get video information: {e}")

    def batch_extract_keyframes(self,
                                input_dir: str,
                                output_base_dir: str = "./keyframes_output",
                                video_extensions: Optional[List[str]] = None,
                                **kwargs) -> Dict[str, Dict]:
        """
        Extract key frames from several videos
        :param input_dir: input directory
        :param output_base_dir: base output directory
        :param video_extensions: video file extensions
        :return: dictionary with the result for each video
        """
        if video_extensions is None:
            video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv']
        results = {}
        for ext in video_extensions:
            for video_file in Path(input_dir).glob(f"*{ext}"):
                try:
                    video_name = video_file.stem
                    output_dir = os.path.join(output_base_dir, video_name)
                    frames = self.extract_keyframes(str(video_file), output_dir=output_dir, **kwargs)
                    results[video_file.name] = {
                        'success': True,
                        'frame_count': len(frames),
                        'output_dir': output_dir
                    }
                    print(f"Extracted {video_file.name}: {len(frames)} key frames")
                except Exception as e:
                    results[video_file.name] = {'success': False, 'error': str(e)}
                    print(f"Failed to extract {video_file.name}: {e}")
        return results


# Example usage
if __name__ == "__main__":
    # Create the extractor instance
    extractor = FFmpegKeyframeExtractor()
    video_name = 'index'
    # Example 1: extract every key frame
    try:
        frames = extractor.extract_keyframes(
            video_path=f"results/videos/{video_name}.mp4",
            output_dir=f"results/images/{video_name}/",
            method="pict_type",
            quality=2
        )
        print(f"Extracted {len(frames)} key frames")
    except Exception as e:
        print(f"Extraction failed: {e}")
    # Example 2: get the key frame timestamps
    try:
        timestamps = extractor.get_keyframe_timestamps("input.mp4")
        print(f"Found {len(timestamps)} key frame timestamps")
        print("First 5 timestamps:", timestamps[:5])
    except Exception as e:
        print(f"Failed to get timestamps: {e}")
    # Example 3: extract at a fixed interval
    try:
        frames = extractor.extract_keyframes(
            video_path="input.mp4",
            output_dir="./interval_frames",
            method="interval",
            interval=10,  # one frame every 10 seconds
            quality=2
        )
        print(f"Extracted {len(frames)} frames by interval")
    except Exception as e:
        print(f"Interval extraction failed: {e}")
    # Example 4: batch processing
    try:
        batch_results = extractor.batch_extract_keyframes(
            input_dir="./videos",
            output_base_dir="./batch_keyframes",
            method="pict_type",
            max_frames=100  # at most 100 frames per video
        )
        print(f"Batch finished, succeeded: {sum(1 for r in batch_results.values() if r['success'])} videos")
    except Exception as e:
        print(f"Batch processing failed: {e}")
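3. Transcribing Audio to Text
Whisper
Whisper is an open-source speech recognition system developed by OpenAI. It converts speech to text and supports many languages.
- Key features:
- Multilingual: speech recognition for 99 languages
- Several model sizes: from tiny to large, to trade accuracy against speed
- Zero-shot use: recognizes speech without language-specific training
- Several tasks: speech recognition, translation, language identification
- Model sizes:
Model | Parameters | VRAM required | Relative speed | Best for |
---|---|---|---|---|
tiny | 39M | ~1GB | 32x | Fast transcription with low accuracy requirements |
base | 74M | ~1GB | 16x | A balance of speed and accuracy |
small | 244M | ~2GB | 6x | Fairly good accuracy |
medium | 769M | ~5GB | 2x | High accuracy |
large | 1550M | ~10GB | 1x | Highest accuracy |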
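The VRAM column in the table above can drive an automatic choice of model. The following is a minimal sketch, assuming the approximate figures from the table; `pick_model` and `MODEL_VRAM_GB` are hypothetical names, and real memory use also depends on precision and decoding settings.

```python
from typing import List, Tuple

# Approximate VRAM needs in GB, copied from the table above (largest first)
MODEL_VRAM_GB: List[Tuple[str, float]] = [
    ("large", 10), ("medium", 5), ("small", 2), ("base", 1), ("tiny", 1),
]


def pick_model(free_vram_gb: float) -> str:
    """Return the largest model whose listed VRAM need fits in free_vram_gb."""
    for name, need in MODEL_VRAM_GB:
        if free_vram_gb >= need:
            return name
    return "tiny"  # fall back to the smallest model


print(pick_model(6.0))
```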
Installation
# Basic install
pip install openai-whisper
# Alternatively, install the latest commit from GitHub
pip install git+https://github.com/openai/whisper.git
# GPU support (NVIDIA cards): install a CUDA build of PyTorch (CUDA 11.8 shown here)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
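Common Commands
# Basic transcription (the language is detected automatically)
whisper audio.mp3
# Choose the output directory
whisper audio.mp3 --output_dir ./transcripts
# Choose the model size
whisper audio.mp3 --model small
# Specify the language (Chinese)
whisper audio.mp3 --language Chinese
# Write every output format at once (--output_format takes a single value: txt, vtt, srt, tsv, json or all)
whisper audio.mp3 --output_format all
# Translate the speech into English and write SRT subtitles
whisper audio.mp3 --task translate --output_format srt
# Run on the GPU
whisper audio.mp3 --device cuda
# Disable FP16 (FP16 is faster on a GPU; turn it off when running on a CPU)
whisper audio.mp3 --fp16 False
# Process several files in one run
whisper audio1.mp3 audio2.wav audio3.m4a --model base
# Transcribe the audio track of a video
whisper video.mp4 --model small
# Write SRT subtitles with timestamps
whisper audio.mp3 --output_format srt
# Write VTT subtitles (common on the web)
whisper audio.mp3 --output_format vtt
# Write TSV (start, end, text)
whisper audio.mp3 --output_format tsv
# Write detailed JSON output
whisper audio.mp3 --output_format json
# Use several CPU threads
whisper audio.mp3 --threads 8
# Note: openai-whisper has no --compute_type option (it belongs to CTranslate2-based tools such as whisper-ctranslate2); to reduce VRAM use, pick a smaller model
# Give the model context with an initial prompt (here: "This is a technical lecture")
whisper audio.mp3 --initial_prompt "这是技术讲座" --condition_on_previous_text True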
Parameter | Description | Example |
---|---|---|
--model | Model size | --model small |
--language | Language of the audio | --language zh |
--output_dir | Output directory | --output_dir ./results |
--output_format | Output format (one of txt, vtt, srt, tsv, json, all) | --output_format srt |
--fp16 | Use half precision | --fp16 False (turn off on CPU) |
--device | Compute device | --device cuda |
--threads | Number of CPU threads | --threads 8 |
--temperature | Sampling temperature | --temperature 0.2 |
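The same subtitle output can be produced from the Python API, whose `transcribe()` result contains a `segments` list with `start`, `end` and `text` fields. As an illustration of what `--output_format srt` writes, here is a small sketch that formats such segments by hand; `srt_timestamp` and `segments_to_srt` are hypothetical helpers, not part of the whisper package.

```python
from typing import Dict, List


def srt_timestamp(seconds: float) -> str:
    """Format seconds as an SRT timestamp: HH:MM:SS,mmm."""
    ms = int(round(seconds * 1000))
    h, ms = divmod(ms, 3_600_000)
    m, ms = divmod(ms, 60_000)
    s, ms = divmod(ms, 1000)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"


def segments_to_srt(segments: List[Dict]) -> str:
    """Turn whisper-style segments into the text of an SRT file."""
    blocks = []
    for i, seg in enumerate(segments, 1):
        blocks.append(
            f"{i}\n"
            f"{srt_timestamp(seg['start'])} --> {srt_timestamp(seg['end'])}\n"
            f"{seg['text'].strip()}\n"
        )
    return "\n".join(blocks)


print(segments_to_srt([{"start": 0.0, "end": 1.5, "text": " hello "}]))
```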
Python Example
import time
import warnings

import opencc
import torch
import whisper

warnings.filterwarnings("ignore")


def optimize_gpu_memory():
    """Release cached GPU memory and enable faster GPU kernels"""
    torch.cuda.empty_cache()
    if torch.cuda.is_available():
        # cuDNN autotuning and TF32 math speed up computation on recent GPUs
        torch.backends.cudnn.benchmark = True
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.backends.cudnn.allow_tf32 = True


def get_gpu_info():
    """Print GPU information and return the free memory in GB"""
    if torch.cuda.is_available():
        gpu_name = torch.cuda.get_device_name(0)
        total_memory = torch.cuda.get_device_properties(0).total_memory / 1024 ** 3
        allocated = torch.cuda.memory_allocated() / 1024 ** 3
        cached = torch.cuda.memory_reserved() / 1024 ** 3
        free = total_memory - allocated
        print(f"🎯 GPU: {gpu_name}")
        print(f"💾 Total memory: {total_memory:.1f} GB")
        print(f"📊 In use: {allocated:.1f} GB")
        print(f"📈 Cached: {cached:.1f} GB")
        print(f"🆓 Free: {free:.1f} GB")
        return free
    return 0


def transcribe_with_gpu_optimized(audio_path, output_path, model_size="medium"):
    """Transcription function tuned for an RTX 3060 6GB.

    Note: the model_size argument is overridden by the choice made from the
    free GPU memory below.
    """
    print("=" * 60)
    print("🤖 Whisper GPU-accelerated transcription")
    print("=" * 60)
    # Optimize GPU memory
    optimize_gpu_memory()
    free_memory = get_gpu_info()
    # Pick a model that fits the available memory
    if free_memory < 2.0:
        model_size = "base"
        print("⚠️ Low memory, switching to the base model")
    elif free_memory < 4.0:
        model_size = "small"
        print("ℹ️ Moderate memory, using the small model")
    elif free_memory < 8.0:
        model_size = "medium"
        print("✅ Enough memory, using the medium model")
    else:
        model_size = "large"
        print("✅ Using the large model")
    try:
        # Load the model onto the GPU
        print(f"\n📦 Loading the {model_size} model onto the GPU...")
        start_time = time.time()
        model = whisper.load_model(model_size).cuda()
        load_time = time.time() - start_time
        print(f"✅ Model loaded in {load_time:.2f} s")
        # Show the memory used by the model weights
        model_memory = sum(p.numel() * p.element_size() for p in model.parameters()) / 1024 ** 3
        print(f"🧠 Model memory: {model_memory:.2f} GB")
        # Transcribe on the GPU
        print(f"\n🎧 Transcribing: {audio_path}")
        start_time = time.time()
        # The prompt is kept in Chinese on purpose: it nudges Whisper toward
        # Simplified Chinese output ("Please transcribe in Simplified Chinese...")
        initial_prompt = """请使用简体中文进行转录。以下是普通话内容,请输出标准简体中文。"""
        result = model.transcribe(
            audio_path,
            fp16=True,  # half precision, much faster on a GPU
            language="zh",  # specifying Chinese improves accuracy
            verbose=False,  # show only a progress bar
            temperature=0.0,  # deterministic output
            best_of=5,  # only used when sampling with temperature > 0
            beam_size=3,  # a setting that suits 6GB of memory
            patience=1.0,  # beam search patience
            task="transcribe",  # transcribe, do not translate
            condition_on_previous_text=False,  # do not rely on earlier text, which limits error accumulation
            initial_prompt=initial_prompt  # steer the model toward Simplified Chinese
        )
        transcribe_time = time.time() - start_time
        print(f"✅ Transcription finished in {transcribe_time:.2f} s")
        # Performance analysis
        if result["segments"]:
            audio_duration = result["segments"][-1]["end"]
            speed_ratio = transcribe_time / audio_duration
            realtime_factor = audio_duration / transcribe_time
            print("\n📊 Performance report:")
            print(f"   Audio duration: {audio_duration:.1f} s")
            print(f"   Transcription time: {transcribe_time:.1f} s")
            print(f"   Real-time factor: {realtime_factor:.2f}x ({realtime_factor:.1f} times faster than real time)")
            print(f"   Processing speed: {speed_ratio:.3f} s per second of audio")
        # Print the result
        print(f"\n📝 Transcript ({len(result['text'])} characters):")
        print("=" * 50)
        print(result["text"])
        # Convert Traditional Chinese to Simplified Chinese
        converter = opencc.OpenCC('t2s')
        simplified_result = converter.convert(result['text'])
        print("\nAfter Traditional-to-Simplified conversion:")
        print("=" * 50)
        print(simplified_result)
        with open(output_path, 'w', encoding='utf-8') as file:
            file.write(simplified_result)
        # Show segment information (optional)
        print(f"\n📋 Segments ({len(result['segments'])}):")
        for i, segment in enumerate(result["segments"][:5]):  # first 5 only
            print(f"[{segment['start']:6.1f}s - {segment['end']:6.1f}s] {segment['text'][:50]}...")
        if len(result["segments"]) > 5:
            print(f"... and {len(result['segments']) - 5} more")
        return result
    except torch.cuda.OutOfMemoryError:
        print("❌ Out of GPU memory! Try the following:")
        print("   1. Use a smaller model: base or small")
        print("   2. Close other programs that use the GPU")
        print("   3. Restart the Python kernel to free memory")
        return None
    except Exception as e:
        print(f"❌ An error occurred: {e}")
        return None


# Example usage
if __name__ == "__main__":
    # Replace with the path of your audio file
    audio_file = "audios/Java_Review.mp3"
    audio_text_file = 'text_content/Java_Review.txt'
    # Model sizes to try:
    # "tiny" - fastest, lowest accuracy
    # "base" - balanced
    # "small" - recommended for a 6GB GPU
    # "medium" - high quality, may need more memory
    # "large" - most demanding
    result = transcribe_with_gpu_optimized(audio_file, audio_text_file, model_size="large")
    if result:
        print("\n🎉 Transcription completed successfully!")
    else:
        print("\n❌ Transcription failed")
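Paddle
Format of the image parsing result
[{
    # Input information
    'input_path': None,
    'page_index': None,
    # Document preprocessing result
    'doc_preprocessor_res': {...},
    # Polygons of the detected text regions
    'dt_polys': [...],
    # Model settings
    'model_settings': {...},
    # Text detection parameters
    'text_det_params': {...},
    # Text type
    'text_type': 'general',
    # Recognition score threshold
    'text_rec_score_thresh': 0.0,
    # Recognized text
    'rec_texts': [...],
    # Recognition confidence scores
    'rec_scores': [...],
    # Polygons of the recognized text
    'rec_polys': [...],
    # Font object used for visualization
    'vis_fonts': [...],
    # Orientation angle of each text line
    'textline_orientation_angles': [...],
    # Bounding boxes of the recognized text
    'rec_boxes': array(...)
}]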
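Given a result of this shape, the recognized lines can be filtered by confidence and put into reading order before being handed to the summarizer. The sketch below works on a plain dict with the same keys; `texts_in_reading_order` is a hypothetical helper, and it assumes that each row of `rec_boxes` is `[x_min, y_min, x_max, y_max]`, which should be checked against the installed PaddleOCR version.

```python
from typing import Dict, List, Tuple


def texts_in_reading_order(page: Dict, min_score: float = 0.7) -> List[str]:
    """Keep confident texts and sort them top-to-bottom, then left-to-right."""
    items: List[Tuple[int, int, str]] = []
    for text, score, box in zip(page["rec_texts"], page["rec_scores"], page["rec_boxes"]):
        if score >= min_score:
            # Assumed box layout: [x_min, y_min, x_max, y_max]
            x_min, y_min = int(box[0]), int(box[1])
            items.append((y_min, x_min, text))
    return [text for _, _, text in sorted(items)]


sample = {
    "rec_texts": ["world", "hello", "noise"],
    "rec_scores": [0.95, 0.98, 0.30],
    "rec_boxes": [[120, 10, 200, 30], [10, 10, 100, 30], [10, 50, 60, 70]],
}
print(texts_in_reading_order(sample))  # ['hello', 'world']
```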
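Parameters of Paddle's predict
Parameter | Type | Default | Description |
---|---|---|---|
input | str/np.array | required | Input image, either a file path or a numpy array |
use_doc_orientation_classify | bool | None | Whether to classify the document orientation (detects the rotation of the whole document) |
use_doc_unwarping | bool | None | Whether to unwarp the document (corrects curved pages) |
use_textline_orientation | bool | None | Whether to detect text line orientation (replaces the old use_angle_cls) |
text_det_limit_side_len | int | None | Side length limit, in pixels, for the text detection input |
text_det_limit_type | str | None | Type of size limit for text detection: 'max' constrains the longest side, 'min' constrains the shortest side |
text_det_thresh | float | None | Text detection threshold (0-1); higher values make detection stricter |
text_det_box_thresh | float | None | Text box threshold (0-1); filters out low-confidence boxes |
text_det_unclip_ratio | float | None | Text box expansion ratio, which controls the box size: >1 enlarges the box, <1 shrinks it |
text_rec_score_thresh | float | None | Recognition confidence threshold (0-1); filters out low-confidence results |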
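How some of the parameters are used
Parameter | Stage | Purpose | Default | Effect |
---|---|---|---|---|
Text detection threshold (text_det_thresh) | Text detection | Probability threshold for deciding whether a pixel belongs to text | ~0.3 | Higher values detect fewer text regions, but more accurately |
Text box threshold (text_det_box_thresh) | Text box generation | Threshold for filtering out low-quality text boxes | ~0.6 | Higher values keep better boxes, but fewer of them |
Recognition confidence (text_rec_score_thresh) | Text recognition | Threshold for filtering out low-confidence recognition results | ~0.5 (the sample result above reports 0.0) | - |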
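Installation
# Example: CUDA 11.8
pip install paddlepaddle-gpu==2.5.2.post118 -f https://www.paddlepaddle.org.cn/whl/linux/mkl/avx/stable.html
# The example below also needs the PaddleOCR package itself
pip install paddleocr
# Note: predict() and use_textline_orientation belong to the PaddleOCR 3.x API; if they are missing, install matching 3.x releases of PaddlePaddle and PaddleOCR following the official installation guide
Python Example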
import os
from datetime import datetime
from typing import List, Dict, Any
from paddleocr import PaddleOCRclass OCRResultProcessor:def __init__(self, output_file: str = "pic_content.txt"):"""初始化OCR结果处理器Args:output_file: 输出文件名"""self.output_file = output_fileself.processed_count = 0# 创建文件并写入文件头(如果文件不存在)if not os.path.exists(self.output_file):self._write_file_header()def _write_file_header(self):"""写入文件头"""with open(self.output_file, 'w', encoding='utf-8') as f:f.write("PaddleOCR 图片内容解析结果\n")f.write("=" * 60 + "\n")f.write(f"创建时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")f.write("=" * 60 + "\n\n")def process_and_append_result(self, result: List[Dict], image_path: str = None):try:# 提取高置信度文本high_confidence_texts = self._extract_high_confidence_texts(result)if not high_confidence_texts:print("⚠️ 未找到高置信度文本,跳过保存")return False# 准备写入内容content_to_write = self._format_content(result, high_confidence_texts, image_path)# 追加到文件with open(self.output_file, 'a', encoding='utf-8') as f:f.write(content_to_write)self.processed_count += 1print(f"✅ 第 {self.processed_count} 张图片内容已追加到 {self.output_file}")return Trueexcept Exception as e:print(f"❌ 处理OCR结果时出错: {e}")return Falsedef _extract_high_confidence_texts(self, result: List[Dict], confidence_threshold: float = 0.7) -> List[str]:high_confidence_texts = []for page_result in result:rec_texts = page_result.get('rec_texts', [])rec_scores = page_result.get('rec_scores', [])for text, score in zip(rec_texts, rec_scores):if score >= confidence_threshold:high_confidence_texts.append(text)return high_confidence_textsdef _format_content(self, result: List[Dict], high_confidence_texts: List[str], image_path: str = None) -> str:content = []# 添加分隔符和图片信息content.append(f"\n{'=' * 60}")content.append(f"📷 图片 {self.processed_count + 1}")content.append(f"{'=' * 60}")if image_path:content.append(f"文件: {os.path.basename(image_path)}")content.append(f"处理时间: {datetime.now().strftime('%H:%M:%S')}")content.append(f"识别到 {len(high_confidence_texts)} 个高置信度文本")content.append("-" * 40)# 添加识别结果if 
high_confidence_texts:content.append("【识别内容】")for i, text in enumerate(high_confidence_texts, 1):content.append(f"{i:2d}. {text}")else:content.append("⚠️ 未识别到高置信度文本")# 添加统计信息total_texts = sum(len(page.get('rec_texts', [])) for page in result)avg_confidence = self._calculate_average_confidence(result)content.append("-" * 40)content.append(f"统计: 总文本{total_texts}个, 平均置信度{avg_confidence:.3f}")content.append("\n") # 空行分隔return "\n".join(content)def _calculate_average_confidence(self, result: List[Dict]) -> float:"""计算平均置信度"""total_score = 0total_count = 0for page_result in result:rec_scores = page_result.get('rec_scores', [])if rec_scores:total_score += sum(rec_scores)total_count += len(rec_scores)return total_score / total_count if total_count > 0 else 0def get_summary(self) -> Dict[str, Any]:"""获取处理统计信息"""return {"processed_count": self.processed_count,"output_file": self.output_file,"file_size": os.path.getsize(self.output_file) if os.path.exists(self.output_file) else 0}# 使用示例
def analysis_keyframes(file_name=None):if file_name is None:file_name = 'default'# 初始化处理器processor = OCRResultProcessor(f'results/text_content/{file_name}.txt')# 初始化OCR(只需要一次)ocr = PaddleOCR(use_textline_orientation=True,lang="ch",device='gpu')folder_path = f'results/images/{file_name}/'all_items = os.listdir(folder_path)image_paths = []for item in all_items:item_path = os.path.join(folder_path, item)if os.path.isfile(item_path):image_paths.append(folder_path + item)for image_path in image_paths:if not os.path.exists(image_path):print(f"⚠️ 图片不存在: {image_path}")continuetry:# 进行OCR识别print(f"🔍 正在处理: {image_path}")# 图片路径有中文或特殊字符容易找不到图片result = ocr.predict(image_path)# 处理并追加结果processor.process_and_append_result(result, image_path)except Exception as e:print(f"❌ 处理图片 {image_path} 时出错: {e}")# 输出统计信息summary = processor.get_summary()print(f"\n🎯 处理完成!")print(f"• 总共处理: {summary['processed_count']} 张图片")print(f"• 输出文件: {summary['output_file']}")print(f"• 文件大小: {summary['file_size']} 字节")# 简单的单图片处理函数
def process_single_image(result, image_path=None):"""快速处理单张图片的便捷函数"""processor = OCRResultProcessor("pic_content.txt")success = processor.process_and_append_result(result, image_path)return successif __name__ == "__main__":analysis_keyframes()
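Adjacent keyframes frequently carry the same on-screen text, so the OCR output file can end up with many repeated lines before it ever reaches the language model. The sketch below applies the same confidence threshold as the processor above and then drops repeats while keeping first-seen order. The sample data is made up for illustration and only mimics the `rec_texts` / `rec_scores` keys used above; no PaddleOCR call is made, and `filter_and_dedupe` is a hypothetical helper, not part of the original script.

```python
from typing import Dict, List


def filter_and_dedupe(results: List[Dict], threshold: float = 0.7) -> List[str]:
    """Keep texts scoring at least `threshold`, dropping repeats in first-seen order."""
    seen = set()
    kept = []
    for page in results:
        texts = page.get("rec_texts", [])
        scores = page.get("rec_scores", [])
        for text, score in zip(texts, scores):
            if score >= threshold and text not in seen:
                seen.add(text)
                kept.append(text)
    return kept


# Hand-written stand-ins for two consecutive keyframes (illustrative only).
frames = [
    {"rec_texts": ["Java", "GC tuning", "??"], "rec_scores": [0.98, 0.91, 0.42]},
    {"rec_texts": ["Java", "Thread pools"], "rec_scores": [0.97, 0.88]},
]
print(filter_and_dedupe(frames))
```

Deduplicating before the summarization step is optional; the prompt in section 4 also asks the model to remove repeats, so this mainly saves context space.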
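4. Integrated analysis and summarization of the text with a GPT-style model
Install Ollama, pull the Qwen3 model, then call it locally from Python.
Ollama + Qwen3 model
Installation
- Ollama: https://ollama.com/
- Qwen3 model:
ollama pull qwen3:8b
- Python client (needed for the `import ollama` below):
pip install ollama
Python example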
import os
import re
from datetime import datetime
from typing import Optional

import ollama


class MultiTextAnalyzer:
    def __init__(self, model: str = "qwen3:8b"):
        self.model = model
        self.client = ollama

    def read_text_file(self, file_path: str) -> str:
        """Read a text file, trying UTF-8 first and falling back to GBK."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except UnicodeDecodeError:
            try:
                with open(file_path, 'r', encoding='gbk') as f:
                    return f.read()
            except Exception as e:
                raise Exception(f"Cannot read file {file_path}: {e}")
        except Exception as e:
            raise Exception(f"Failed to read file {file_path}: {e}")

    def create_comprehensive_prompt(self, frame_text: str, audio_text: str) -> str:
        """Build the integration prompt (kept in Chinese to match the Chinese source videos)."""
        prompt = f"""两个文本分别为通过视频提取出的关键帧上的文字以及音频转录的文字。在综合理解后,整理保存为一个完整的markdown格式文档。

请仔细分析以下两个文本内容,进行去重、补全、纠错和结构化整理:

【关键帧文本内容】
{frame_text}

【音频转录文本内容】
{audio_text}

请按照以下要求生成Markdown文档:
1. 对两个文本进行深度融合,去除重复内容
2. 纠正明显的识别错误和错别字
3. 补充不完整的句子和概念
4. 按照逻辑主题进行结构化组织
5. 使用恰当的Markdown格式(标题、列表、代码块等)
6. 保持技术准确性和内容完整性

生成一个专业、清晰、易于阅读的技术文档。"""
        return prompt

    def analyze_and_integrate(self, frame_text_path: str, audio_text_path: str,
                              output_path: Optional[str] = None) -> Optional[str]:
        """Merge the two texts with the model and save the result as Markdown."""
        print("Reading text files...")
        try:
            frame_text = self.read_text_file(frame_text_path)
            audio_text = self.read_text_file(audio_text_path)
            print(f"✓ Keyframe text: {len(frame_text)} characters")
            print(f"✓ Audio transcript: {len(audio_text)} characters")
        except Exception as e:
            print(f"✗ Failed to read files: {e}")
            return None

        prompt = self.create_comprehensive_prompt(frame_text, audio_text)
        print("Calling the Qwen3 model...")
        print("=" * 60)

        # Stream the response so progress is visible while the model generates
        messages = [{"role": "user", "content": prompt}]
        full_response = ""
        print("Generated output:")
        print("=" * 60)
        try:
            stream = self.client.chat(
                model=self.model,
                messages=messages,
                stream=True,
                options={
                    "temperature": 0.3,  # low temperature for more faithful output
                    "top_p": 0.9,
                    "num_ctx": 4096  # context length; long transcripts may need more
                }
            )
            for chunk in stream:
                content = chunk['message']['content']
                print(content, end='', flush=True)
                full_response += content
        except Exception as e:
            print(f"✗ Model call failed: {e}")
            return None

        # Fall back to a timestamped file name when no output path is given
        if not output_path:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_path = f"integrated_document_{timestamp}.md"
        self.save_markdown(full_response, output_path)
        return full_response

    def save_markdown(self, content: str, output_path: str):
        """Save the Markdown document, without the model's <think> section."""
        try:
            content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
            # os.makedirs('') raises, so only create a directory when the path has one
            out_dir = os.path.dirname(output_path)
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(content)
            print(f"\n\n✓ Markdown document saved: {output_path}")
            print(f"✓ Size: {len(content)} characters")
        except Exception as e:
            print(f"✗ Failed to save file: {e}")

    def batch_process(self, text_pairs: list, output_dir: str = "output_docs"):
        """Process several pairs of texts.

        text_pairs: [(frame_path1, audio_path1), (frame_path2, audio_path2), ...]
        """
        results = []
        for i, (frame_path, audio_path) in enumerate(text_pairs):
            print(f"\n{'=' * 50}")
            print(f"Processing pair {i + 1}:")
            print(f"Keyframes: {frame_path}")
            print(f"Audio: {audio_path}")
            print(f"{'=' * 50}")

            output_filename = f"integrated_doc_{i + 1}_{datetime.now().strftime('%H%M%S')}.md"
            output_path = os.path.join(output_dir, output_filename)
            result = self.analyze_and_integrate(frame_path, audio_path, output_path)
            if result:
                results.append({
                    'frame_path': frame_path,
                    'audio_path': audio_path,
                    'output_path': output_path,
                    'content': result
                })
        return results


# Usage example
def main():
    # Create the analyzer with the qwen3:8b model
    analyzer = MultiTextAnalyzer(model="qwen3:8b")
    title = '给大家普及一下过Java面试需要达到的强度_哔哩哔哩_bilibili'

    # Input text files (replace with your own paths)
    frame_text_path = f"results/text_content/{title}_image_content.txt"
    audio_text_path = f"results/text_content/{title}_audio_content.txt"
    # Output path (optional)
    output_path = f"results/summary/{title}_summary.md"

    result = analyzer.analyze_and_integrate(
        frame_text_path=frame_text_path,
        audio_text_path=audio_text_path,
        output_path=output_path
    )
    if result:
        print("\n✅ Document integration finished!")
    else:
        print("\n❌ Processing failed")


def batch_example():
    """Batch processing example."""
    analyzer = MultiTextAnalyzer(model="qwen3:8b")
    text_pairs = [
        ("video1_frames.txt", "video1_audio.txt"),
        ("video2_frames.txt", "video2_audio.txt"),
        ("video3_frames.txt", "video3_audio.txt")
    ]
    results = analyzer.batch_process(text_pairs, "batch_output")
    print(f"\nBatch finished, {len(results)} documents generated")


if __name__ == "__main__":
    # Run the single-video example
    main()
    # Or run the batch example
    # batch_example()
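The example above sets `num_ctx` to 4096, while a long video can easily produce a transcript plus OCR text larger than that window. As far as I understand, input that does not fit is cut off rather than rejected, so part of the content may silently never reach the model. A minimal sketch of one way to stay inside a budget follows. It assumes a character count is an acceptable rough proxy for tokens (it is not exact), and `fit_to_budget` and the 6000-character default are hypothetical choices, not values from the original script.

```python
from typing import Tuple


def fit_to_budget(frame_text: str, audio_text: str, max_chars: int = 6000) -> Tuple[str, str]:
    """Trim both texts so their combined length is at most max_chars.

    Each text keeps a share of the budget proportional to its original length.
    """
    total = len(frame_text) + len(audio_text)
    if total <= max_chars:
        return frame_text, audio_text
    frame_share = int(max_chars * len(frame_text) / total)
    audio_share = max_chars - frame_share
    return frame_text[:frame_share], audio_text[:audio_share]


frames, audio = fit_to_budget("a" * 3000, "b" * 9000, max_chars=6000)
print(len(frames), len(audio))
```

Plain truncation drops the end of the video. Splitting the transcript into chunks and summarizing each one would lose less, at the cost of several model calls.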
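5. Push notifications
TODO: send the result by email, or publish it automatically as an article.
6. Full code listing
The complete set of scripts, as run locally.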
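For the email option, a minimal sketch using only the standard library might look like the following. It is not part of the current pipeline: the function names, the addresses, and the SMTP host and port are placeholders, and the send step assumes a server that accepts STARTTLS on the given port.

```python
import smtplib
from email.message import EmailMessage
from typing import Optional


def build_summary_email(summary_md: str, subject: str, sender: str, recipient: str) -> EmailMessage:
    """Wrap the Markdown summary in a plain-text email message."""
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content(summary_md)
    return msg


def send_email(msg: EmailMessage, host: str, port: int = 587,
               user: Optional[str] = None, password: Optional[str] = None) -> None:
    """Send the message over SMTP with STARTTLS (host, port and credentials are assumptions)."""
    with smtplib.SMTP(host, port) as server:
        server.starttls()
        if user:
            server.login(user, password)
        server.send_message(msg)


# Building the message needs no network access; sending is left to the caller.
message = build_summary_email("# Summary\n\nKey points...", "Video summary",
                              "bot@example.com", "me@example.com")
print(message["Subject"])
```

In `Main.py` this would slot in after the summary is written and before the cleanup step, reading the saved `_summary.md` file as the message body.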
Main.py
import multiprocessing as mp
import os
import queue
import shutil
import sys
import traceback

from common_utils import get_page_info_requests
from part1_download_video import MultimediaDownloader
from part2_extract_key_frames import KeyFramesExtractor
from part3_extract_audio import AudioExtractor
from part6_summarize_content import MultiTextAnalyzer


def _run_paddle_task(video_file_name, content_path, title):
    """Module-level function: run the PaddleOCR step."""
    try:
        from part4_ocr_frames import analysis_keyframes
        analysis_keyframes(os.path.join(content_path, f'{title}_image_content.txt'), video_file_name)
        return True
    except Exception as e:
        print(f"Paddle task failed: {e}")
        traceback.print_exc()
        return False


def _run_whisper_task(video_file_name, audio_path, content_path, title):
    """Module-level function: run the Whisper step."""
    try:
        from part5_transcribe_audio import transcribe_audio_to_txt
        transcribe_audio_to_txt(os.path.join(audio_path, f'{video_file_name}.mp3'),
                                os.path.join(content_path, f'{title}_audio_content.txt'))
        return True
    except Exception as e:
        print(f"Whisper task failed: {e}")
        traceback.print_exc()
        return False


def _worker_wrapper(func, args, result_queue):
    """Module-level function: run func in the child process and report its result."""
    try:
        result_queue.put(func(*args))
    except Exception as e:
        print(f"Process failed: {e}")
        traceback.print_exc()
        result_queue.put(False)


def run_in_separate_process(target_func, *args, timeout=300):
    """Run a function in its own process (works on Windows).

    :param target_func: target function (must be defined at module level)
    :param args: arguments passed to the target function
    :param timeout: timeout in seconds; raise it for long videos
    :return: the function's result, or False on timeout or crash
    """
    result_queue = mp.Queue()
    p = mp.Process(target=_worker_wrapper, args=(target_func, args, result_queue))
    p.start()
    p.join(timeout)

    if p.is_alive():
        print(f"Task timed out ({timeout} s), terminating the process")
        p.terminate()
        p.join(5)  # give it 5 more seconds to clean up
        if p.is_alive():
            print("Warning: the process could not be terminated and may be left as a zombie")
        return False

    try:
        return result_queue.get(timeout=1)
    except queue.Empty:
        # No result means the process probably crashed
        print("Warning: the process returned no result and may have crashed")
        return False


if __name__ == '__main__':
    video_path = 'results/videos'
    audio_path = 'results/audios'
    frame_path = 'results/images'
    content_path = 'results/text_content'
    summary_path = 'results/summary'
    # FFmpeg and open() do not create missing directories
    for path in (video_path, audio_path, frame_path, content_path, summary_path):
        os.makedirs(path, exist_ok=True)

    # 0. Choose the video URL
    video_url = 'https://www.bilibili.com/video/xxxxxx'
    title = get_page_info_requests(video_url).get('title')

    # 1. Download the video with yt-dlp
    downloader = MultimediaDownloader()
    video_file_name = downloader.download_video(video_url, video_path, video_title=title)
    # The steps below assume the download ends up as an .mp4 file
    video_file = f"{video_path}/{video_file_name}.mp4"

    # 2. Process the video with FFmpeg: extract keyframes and audio
    extractor = KeyFramesExtractor()
    try:
        frames = extractor.extract_keyframes(video_path=video_file,
                                             output_dir=f"{frame_path}/{video_file_name}/",
                                             method="pict_type", quality=2)
        print(f"Extracted {len(frames)} keyframes")
    except Exception as e:
        print(f"Keyframe extraction failed: {e}")

    ffmpeg_tools = AudioExtractor()
    ffmpeg_tools.extract_audio(video_file, f"{audio_path}/{video_file_name}.mp3")

    # 3. Recognize the text in the keyframes with PaddleOCR
    print("Starting the Paddle OCR task...")
    if not run_in_separate_process(_run_paddle_task, video_file_name, content_path, title):
        print("Paddle OCR task failed, exiting")
        sys.exit(1)

    # 4. Transcribe the audio with Whisper
    print("Starting the Whisper transcription task...")
    if not run_in_separate_process(_run_whisper_task, video_file_name, audio_path, content_path, title):
        print("Whisper task failed, exiting")
        sys.exit(1)

    # 5. Let the LLM analyze and summarize the text into a report
    analyzer = MultiTextAnalyzer()
    analyzer.analyze_and_integrate(f'{content_path}/{title}_image_content.txt',
                                   f'{content_path}/{title}_audio_content.txt',
                                   f'{summary_path}/{title}_summary.md')
    print('Summary Done')

    # 6. Send an email or push a notification (not implemented yet)

    # 7. Clean up intermediate files: video, audio, frames, text
    os.remove(video_file)
    os.remove(f'{audio_path}/{video_file_name}.mp3')
    shutil.rmtree(f'{frame_path}/{video_file_name}')
    os.remove(f'{content_path}/{title}_image_content.txt')
    os.remove(f'{content_path}/{title}_audio_content.txt')
    print("delete all success")
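`Main.py` uses the page title directly inside file paths such as `{title}_summary.md`. Titles scraped from a web page can contain characters that are not valid in file names on Windows, or path separators that change where the file lands. A minimal sketch of a sanitizer follows; `safe_filename` is a hypothetical helper that is not part of `common_utils`, and the 80-character cap is an arbitrary choice.

```python
import re


def safe_filename(title: str, max_len: int = 80) -> str:
    """Replace characters that are unsafe in file names with underscores.

    Covers the characters Windows rejects (\\ / : * ? " < > |) plus whitespace.
    Non-ASCII text such as Chinese titles is kept unchanged.
    """
    cleaned = re.sub(r'[\\/:*?"<>|\s]+', "_", title).strip("_")
    return cleaned[:max_len] or "untitled"


print(safe_filename("What is a JVM? (part 1/3)"))
```

It would be applied once, right after the title is fetched, so that every later path is built from the same cleaned string.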
part1_download_video.py
import time

import yt_dlp


class MultimediaDownloader:
    def __init__(self):
        # Base yt-dlp options shared by every method; each call works on its own copy
        self.ydl_opts = {}

    def _download(self, url, extra_opts):
        """Run yt-dlp with the base options plus the per-call options."""
        opts = {**self.ydl_opts, **extra_opts}
        with yt_dlp.YoutubeDL(opts) as ydl:
            ydl.download([url])

    @staticmethod
    def _parse_rate(speed_limit):
        """Convert limits such as '1M' or '500K' to bytes per second."""
        if isinstance(speed_limit, (int, float)):
            return speed_limit
        units = {'K': 1024, 'M': 1024 ** 2, 'G': 1024 ** 3}
        text = str(speed_limit).strip().upper()
        if text and text[-1] in units:
            return float(text[:-1]) * units[text[-1]]
        return float(text)

    def download_video(self, url, output_path='.', format='bestvideo+bestaudio/best',
                       video_title=None, save_origin_name=None):
        """Download a video.

        :param url: video URL
        :param output_path: output directory
        :param format: format selector
        :param video_title: title used in the file name when save_origin_name is set
        :param save_origin_name: if None, the file is saved as temp_<timestamp>
        :return: the file name without extension
        """
        title = video_title if video_title is not None else 'default'
        timestamp = int(time.time())
        print(f'origin_name {title}_{timestamp}')
        prefix = 'temp' if save_origin_name is None else title
        save_name = f'{prefix}_{timestamp}'
        self._download(url, {
            'format': format,
            'outtmpl': f'{output_path}/{save_name}.%(ext)s',
            'restrictfilenames': True,
            'noprogress': True,
            'quiet': True
        })
        return save_name

    def download_audio(self, url, output_path='.', format='mp3'):
        """Download audio only.

        :param url: video URL
        :param output_path: output directory
        :param format: audio format
        """
        self._download(url, {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': format,
                'preferredquality': '192',
            }],
            'outtmpl': f'{output_path}/%(title)s.%(ext)s',
        })

    def download_subtitles(self, url, output_path='.', lang='zh'):
        """Download subtitles only.

        :param url: video URL
        :param output_path: output directory
        :param lang: preferred subtitle language
        """
        self._download(url, {
            'writesubtitles': True,  # enable subtitle download
            'writeautomaticsub': True,  # also fetch auto-generated subtitles
            'subtitleslangs': [lang, 'en', 'zh-Hans', 'zh'],  # fallback languages
            'skip_download': True,  # key point: do not download the video itself
            'outtmpl': f'{output_path}/%(title)s_{lang}.%(ext)s',
        })

    def download_playlist(self, url, output_path='.', format='best'):
        """Download a whole playlist.

        :param url: playlist URL
        :param output_path: output directory
        :param format: format selector
        """
        self._download(url, {
            'format': format,
            'outtmpl': f'{output_path}/%(playlist)s/%(playlist_index)s-%(title)s.%(ext)s',
        })

    def limit_download_speed(self, url, output_path='.', format='best', speed_limit='1M'):
        """Download with a speed limit.

        :param url: video URL
        :param output_path: output directory
        :param format: format selector
        :param speed_limit: limit such as '1M' for 1 MB/s
        """
        self._download(url, {
            'format': format,
            'outtmpl': f'{output_path}/%(title)s.%(ext)s',
            'ratelimit': self._parse_rate(speed_limit),  # the Python API expects bytes per second
        })

    def skip_existing_files(self, url, output_path='.', format='best'):
        """Download, skipping videos that were already downloaded.

        :param url: video URL
        :param output_path: output directory
        :param format: format selector
        """
        self._download(url, {
            'format': format,
            'outtmpl': f'{output_path}/%(title)s.%(ext)s',
            # 'skip_download' would skip every download, so record finished videos
            # in an archive file instead (the file name is an arbitrary choice)
            'download_archive': f'{output_path}/downloaded.txt',
        })
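`Main.py` later opens `{video_file_name}.mp4`, but the `bestvideo+bestaudio/best` selector does not by itself promise an MP4 container after the two streams are merged. One option, sketched below, is to build the options in a small function and request MP4 through yt-dlp's `merge_output_format` setting. `build_video_opts` is a hypothetical helper; whether MP4 is the right container for every site is an assumption.

```python
def build_video_opts(output_path: str, save_name: str,
                     fmt: str = "bestvideo+bestaudio/best") -> dict:
    """Return a fresh yt-dlp options dict for one video download."""
    return {
        "format": fmt,
        "outtmpl": f"{output_path}/{save_name}.%(ext)s",
        "merge_output_format": "mp4",  # container to use when video and audio are merged
        "restrictfilenames": True,
        "noprogress": True,
        "quiet": True,
    }


opts = build_video_opts("results/videos", "temp_1700000000")
print(opts["outtmpl"])
# Intended use (requires yt-dlp and network access):
#   with yt_dlp.YoutubeDL(opts) as ydl:
#       ydl.download([url])
```

Returning a new dict per call also avoids options from one download leaking into the next, which can happen when a shared dict is updated in place.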
part2_extract_key_frames.py
import json
import os
import re
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Sequence


class KeyFramesExtractor:
    def __init__(self, ffmpeg_path: str = "ffmpeg", ffprobe_path: str = "ffprobe"):
        self.ffmpeg_path = ffmpeg_path
        self.ffprobe_path = ffprobe_path

    def extract_keyframes(self,
                          video_path: str,
                          output_dir: str = "./keyframes",
                          output_pattern: str = "keyframe_%04d.jpg",
                          quality: int = 2,
                          method: str = "pict_type",
                          interval: Optional[float] = None,
                          max_frames: Optional[int] = None) -> List[str]:
        """Extract keyframes from a video.

        :param video_path: path of the video file
        :param output_dir: output directory
        :param output_pattern: output file name pattern
        :param quality: output quality (1-31, 1 is best)
        :param method: extraction method ('pict_type', 'nokey', 'interval')
        :param interval: interval in seconds, used when method='interval'
        :param max_frames: maximum number of frames to extract
        :return: list of extracted keyframe files
        """
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, output_pattern)

        # FFmpeg applies options to the next file on the command line, so input
        # options go before -i and output options go before the output path
        cmd = [self.ffmpeg_path, '-y']
        if method == "pict_type":
            # Select I-frames with the select filter
            cmd.extend(['-i', video_path, '-vf', "select=eq(pict_type\\,I)", '-vsync', 'vfr'])
        elif method == "nokey":
            # Let the decoder skip everything except keyframes
            cmd.extend(['-skip_frame', 'nokey', '-i', video_path, '-vsync', 'vfr'])
        elif method == "interval" and interval:
            # One frame every `interval` seconds
            cmd.extend(['-i', video_path, '-vf', f"fps=1/{interval}"])
        else:
            raise ValueError("Unsupported extraction method")

        if max_frames:
            cmd.extend(['-vframes', str(max_frames)])
        cmd.extend(['-q:v', str(quality), '-f', 'image2', output_path])

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
            if result.returncode != 0:
                raise Exception(f"FFmpeg error: {result.stderr}")
            return self._get_generated_frames(output_dir, output_pattern)
        except subprocess.TimeoutExpired:
            raise Exception("Extraction timed out, the video may be too large")
        except Exception as e:
            raise Exception(f"Keyframe extraction failed: {e}")

    def _get_generated_frames(self, output_dir: str, pattern: str) -> List[str]:
        """Return the keyframe files in output_dir that match the pattern."""
        pattern_re = (re.escape(pattern)
                      .replace(re.escape('%04d'), r'\d{4,}')
                      .replace(re.escape('%d'), r'\d+'))
        frames = [os.path.join(output_dir, name)
                  for name in os.listdir(output_dir)
                  if re.fullmatch(pattern_re, name)]
        return sorted(frames)

    def get_keyframe_timestamps(self, video_path: str) -> List[float]:
        """Get keyframe timestamps without writing images.

        :param video_path: path of the video file
        :return: list of keyframe timestamps in seconds
        """
        cmd = [
            self.ffmpeg_path,
            '-i', video_path,
            # showinfo is what prints pts_time for each selected frame
            '-vf', "select=eq(pict_type\\,I),showinfo",
            '-f', 'null',
            '-'
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
            if result.returncode != 0:
                raise Exception(f"FFmpeg error: {result.stderr}")
            # Parse lines such as "... pts_time:1.234 ..." from stderr
            timestamps = []
            for line in result.stderr.split('\n'):
                match = re.search(r'pts_time:(\d+(?:\.\d+)?)', line)
                if match:
                    timestamps.append(float(match.group(1)))
            return timestamps
        except Exception as e:
            raise Exception(f"Failed to get keyframe timestamps: {e}")

    def extract_keyframes_at_timestamps(self,
                                        video_path: str,
                                        timestamps: List[float],
                                        output_dir: str = "./keyframes",
                                        output_pattern: str = "frame_%04d.jpg",
                                        quality: int = 2) -> List[str]:
        """Extract one frame at each given timestamp.

        :param video_path: path of the video file
        :param timestamps: list of timestamps in seconds
        :param output_dir: output directory
        :param output_pattern: output file name pattern
        :param quality: output quality
        :return: list of extracted frame files
        """
        if not timestamps:
            return []
        os.makedirs(output_dir, exist_ok=True)
        frames = []
        for i, timestamp in enumerate(timestamps):
            output_file = os.path.join(output_dir, output_pattern.replace('%04d', f"{i:04d}"))
            cmd = [
                self.ffmpeg_path, '-y',
                '-i', video_path,
                '-ss', str(timestamp),
                '-vframes', '1',
                '-q:v', str(quality),
                output_file
            ]
            try:
                result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
                if result.returncode == 0:
                    frames.append(output_file)
                else:
                    print(f"Extraction at timestamp {timestamp} failed: {result.stderr}")
            except Exception as e:
                print(f"Extraction at timestamp {timestamp} raised an error: {e}")
        return frames

    def get_video_info(self, video_path: str) -> Dict:
        """Get video information via ffprobe.

        :param video_path: path of the video file
        :return: dictionary with format and stream information
        """
        cmd = [
            self.ffprobe_path,
            '-v', 'quiet',
            '-print_format', 'json',
            '-show_format',
            '-show_streams',
            video_path
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode == 0:
                return json.loads(result.stdout)
            raise Exception("ffprobe returned a non-zero exit code")
        except Exception as e:
            raise Exception(f"Failed to get video information: {e}")

    def batch_extract_keyframes(self,
                                input_dir: str,
                                output_base_dir: str = "./keyframes_output",
                                video_extensions: Sequence[str] = ('.mp4', '.avi', '.mov', '.mkv', '.flv'),
                                **kwargs) -> Dict[str, Dict]:
        """Extract keyframes from every video in a directory.

        :param input_dir: input directory
        :param output_base_dir: base output directory
        :param video_extensions: video file extensions to look for
        :return: per-video result dictionary
        """
        results = {}
        for ext in video_extensions:
            for video_file in Path(input_dir).glob(f"*{ext}"):
                try:
                    output_dir = os.path.join(output_base_dir, video_file.stem)
                    frames = self.extract_keyframes(str(video_file), output_dir=output_dir, **kwargs)
                    results[video_file.name] = {
                        'success': True,
                        'frame_count': len(frames),
                        'output_dir': output_dir
                    }
                    print(f"Extracted {video_file.name}: {len(frames)} keyframes")
                except Exception as e:
                    results[video_file.name] = {'success': False, 'error': str(e)}
                    print(f"Extracting {video_file.name} failed: {e}")
        return results
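FFmpeg applies each option to the next input or output file on the command line, so an output option such as the frame limit only takes effect when it appears before the output path. The sketch below isolates the command construction in a pure function so the ordering can be checked without running FFmpeg. `build_keyframe_cmd` is a hypothetical helper, not part of the class above, and it covers only the `pict_type` method.

```python
from typing import List, Optional


def build_keyframe_cmd(video_path: str, output_pattern: str, quality: int = 2,
                       max_frames: Optional[int] = None, ffmpeg: str = "ffmpeg") -> List[str]:
    """Build an FFmpeg command that writes I-frames as numbered images."""
    cmd = [ffmpeg, "-y", "-i", video_path,
           "-vf", "select=eq(pict_type\\,I)", "-vsync", "vfr",
           "-q:v", str(quality)]
    if max_frames:
        # Output option: must come before the output path to be applied
        cmd += ["-frames:v", str(max_frames)]
    cmd += ["-f", "image2", output_pattern]
    return cmd


cmd = build_keyframe_cmd("in.mp4", "out/keyframe_%04d.jpg", max_frames=10)
print(" ".join(cmd))
```

The resulting list can be passed straight to `subprocess.run`. Because no shell is involved, the backslash in the filter string reaches FFmpeg's filter parser unchanged.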
part3_extract_audio.py
import json
import shutil
import subprocess
from pathlib import Path
from typing import Optional, List, Dict, Tuple
import osclass AudioExtractor:def __init__(self, ffmpeg_path: str = "ffmpeg", ffprobe_path: str = "ffprobe"):self.ffmpeg_path = ffmpeg_pathself.ffprobe_path = ffprobe_pathself._check_ffmpeg_available()def _check_ffmpeg_available(self):"""检查FFmpeg是否可用"""if shutil.which(self.ffmpeg_path) is None:raise Exception(f"FFmpeg未找到,请先安装FFmpeg")if shutil.which(self.ffprobe_path) is None:raise Exception(f"FFprobe未找到,请先安装FFmpeg")def _run_command(self, cmd: List[str], timeout: int = 300) -> Tuple[bool, str]:"""运行FFmpeg命令,包含详细的调试信息"""try:print(f"🚀 开始执行命令...")print(f"📝 命令: {' '.join(cmd)}")# 检查FFmpeg是否存在if not shutil.which(self.ffmpeg_path):print(f"❌ FFmpeg未找到: {self.ffmpeg_path}")return False, "FFmpeg not found"# 执行命令result = subprocess.run(cmd,capture_output=True,timeout=timeout,encoding='utf-8',errors='ignore')# 详细的输出信息print(f"✅ 命令执行完成")print(f"📊 返回码: {result.returncode}")if result.stdout:print("📄 标准输出内容:")print(result.stdout)else:print("📄 标准输出: 空")if result.stderr:print("⚠️ 错误输出内容:")print(result.stderr)else:print("⚠️ 错误输出: 空")# 检查输出文件是否创建output_file = Nonefor arg in cmd:if not arg.startswith('-') and arg != self.ffmpeg_path and os.path.exists(arg):output_file = argbreakif output_file and os.path.exists(output_file):file_size = os.path.getsize(output_file)print(f"📁 输出文件: {output_file} ({file_size} 字节)")else:print("📁 输出文件: 未找到")return result.returncode == 0, result.stderrexcept subprocess.TimeoutExpired:print("⏰ 命令执行超时")return False, "命令执行超时"except Exception as e:print(f"❌ 命令执行错误: {e}")import tracebacktraceback.print_exc()return False, str(e)def get_video_info(self, video_path: str) -> Optional[Dict]:"""获取视频详细信息"""cmd = [self.ffprobe_path,'-v', 'quiet','-print_format', 'json','-show_format','-show_streams',video_path]try:result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)if result.returncode == 0:return json.loads(result.stdout)return Noneexcept:return Nonedef get_duration(self, video_path: str) -> Optional[float]:"""获取视频时长(秒)"""info = 
self.get_video_info(video_path)if info and 'format' in info:return float(info['format'].get('duration', 0))return Nonedef get_resolution(self, video_path: str) -> Optional[Tuple[int, int]]:"""获取视频分辨率"""info = self.get_video_info(video_path)if info and 'streams' in info:for stream in info['streams']:if stream['codec_type'] == 'video':width = int(stream.get('width', 0))height = int(stream.get('height', 0))return width, heightreturn Nonedef get_bitrate(self, video_path: str) -> Optional[int]:"""获取视频码率"""info = self.get_video_info(video_path)if info and 'format' in info:return int(info['format'].get('bit_rate', 0))return Nonedef convert_format(self, input_path: str, output_path: str,output_format: str = "mp4", crf: int = 23) -> bool:"""转换视频格式"""cmd = [self.ffmpeg_path,'-i', input_path,'-c:v', 'libx264','-crf', str(crf),'-preset', 'medium','-c:a', 'aac','-b:a', '128k',output_path,'-y']success, error = self._run_command(cmd)if not success:print(f"格式转换失败: {error}")return successdef compress_video(self, input_path: str, output_path: str,target_size_mb: int = 10) -> bool:"""压缩视频到目标大小"""duration = self.get_duration(input_path)if not duration:return False# 计算目标码率 (bitrate = size * 8 / duration)target_bitrate = int((target_size_mb * 8 * 1024 * 1024) / duration)cmd = [self.ffmpeg_path,'-i', input_path,'-c:v', 'libx264','-b:v', f'{target_bitrate}','-maxrate', f'{target_bitrate}','-bufsize', f'{target_bitrate * 2}','-c:a', 'aac','-b:a', '64k',output_path,'-y']success, error = self._run_command(cmd)return successdef cut_video(self, input_path: str, output_path: str,start_time: str, end_time: str) -> bool:"""裁剪视频片段"""cmd = [self.ffmpeg_path,'-i', input_path,'-ss', start_time, # 开始时间,格式: 00:00:00'-to', end_time, # 结束时间,格式: 00:00:00'-c', 'copy', # 直接复制流,不重新编码output_path,'-y']success, error = self._run_command(cmd)return successdef extract_audio(self, input_path: str, output_path: str) -> bool:"""提取音频(先检查音频流)"""# 先获取音频流信息audio_info = self.get_audio_stream_info(input_path)if not 
audio_info:print("无法获取音频流信息")return Falseprint(f"音频流信息: {audio_info}")cmd = [self.ffmpeg_path, '-i', input_path, '-vn']# 根据音频流格式选择合适的编码器if audio_info.get('codec_name') == 'mp3':# 如果是MP3,尝试直接复制cmd.extend(['-acodec', 'copy'])else:# 其他格式转换为MP3cmd.extend(['-acodec', 'libmp3lame', '-q:a', '2'])cmd.extend(['-map', '0:a:0', output_path, '-y']) # 只选择第一个音频流success, error = self._run_command(cmd)return successdef get_audio_stream_info(self, input_path: str) -> Optional[Dict]:"""获取音频流信息"""cmd = [self.ffprobe_path, # 需要ffprobe'-v', 'quiet','-print_format', 'json','-show_streams','-select_streams', 'a', # 只选择音频流input_path]try:result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)if result.returncode == 0:import jsondata = json.loads(result.stdout)if data.get('streams'):return data['streams'][0] # 返回第一个音频流信息return Noneexcept:return Nonedef extract_video(self, input_path: str, output_path: str) -> bool:"""提取视频(去掉音频)"""cmd = [self.ffmpeg_path,'-i', input_path,'-an', # 不要音频流'-vcodec', 'copy', # 直接复制视频编码output_path,'-y']success, error = self._run_command(cmd)return successdef change_resolution(self, input_path: str, output_path: str,width: int, height: int) -> bool:"""改变视频分辨率"""cmd = [self.ffmpeg_path,'-i', input_path,'-vf', f'scale={width}:{height}','-c:a', 'copy',output_path,'-y']success, error = self._run_command(cmd)return successdef change_framerate(self, input_path: str, output_path: str,framerate: int) -> bool:"""改变视频帧率"""cmd = [self.ffmpeg_path,'-i', input_path,'-r', str(framerate),'-c:a', 'copy',output_path,'-y']success, error = self._run_command(cmd)return successdef add_watermark(self, input_path: str, output_path: str,watermark_path: str, position: str = "10:10") -> bool:"""添加图片水印"""cmd = [self.ffmpeg_path,'-i', input_path,'-i', watermark_path,'-filter_complex', f'overlay={position}','-codec:a', 'copy',output_path,'-y']success, error = self._run_command(cmd)return successdef extract_thumbnail(self, input_path: str, output_path: str,time: str = 
"00:00:01") -> bool:"""提取视频缩略图"""cmd = [self.ffmpeg_path,'-i', input_path,'-ss', time,'-vframes', '1','-q:v', '2',output_path,'-y']success, error = self._run_command(cmd)return successdef generate_preview(self, input_path: str, output_path: str,cols: int = 3, rows: int = 3) -> bool:"""生成视频预览图(多张缩略图拼贴)"""duration = self.get_duration(input_path)if not duration:return False# 生成缩略图命令cmd = [self.ffmpeg_path,'-i', input_path,'-vf', f'fps=1/{duration / (cols * rows)},scale=320:-1,tile={cols}x{rows}','-frames', '1',output_path,'-y']success, error = self._run_command(cmd)return successdef convert_audio(self, input_path: str, output_path: str,format: str = "mp3", bitrate: str = "128k") -> bool:"""转换音频格式"""cmd = [self.ffmpeg_path,'-i', input_path,'-codec:a', 'libmp3lame' if format == 'mp3' else 'aac','-b:a', bitrate,output_path,'-y']success, error = self._run_command(cmd)return successdef change_audio_volume(self, input_path: str, output_path: str,volume: float = 1.0) -> bool:"""调整音频音量"""cmd = [self.ffmpeg_path,'-i', input_path,'-af', f'volume={volume}','-codec:v', 'copy' if input_path.endswith(('.mp4', '.mov', '.avi')) else '',output_path,'-y']success, error = self._run_command(cmd)return successdef batch_convert(self, input_dir: str, output_dir: str,output_format: str = "mp4") -> Dict[str, bool]:"""批量转换目录中的所有视频"""results = {}input_path = Path(input_dir)output_path = Path(output_dir)output_path.mkdir(exist_ok=True)video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv']for video_file in input_path.iterdir():if video_file.suffix.lower() in video_extensions:output_file = output_path / f"{video_file.stem}.{output_format}"success = self.convert_format(str(video_file), str(output_file))results[video_file.name] = successreturn resultsdef batch_extract_thumbnails(self, input_dir: str, output_dir: str) -> Dict[str, bool]:"""批量提取缩略图"""results = {}input_path = Path(input_dir)output_path = Path(output_dir)output_path.mkdir(exist_ok=True)video_extensions = ['.mp4', '.avi', 
'.mov', '.mkv']for video_file in input_path.iterdir():if video_file.suffix.lower() in video_extensions:output_file = output_path / f"{video_file.stem}_thumbnail.jpg"success = self.extract_thumbnail(str(video_file), str(output_file))results[video_file.name] = successreturn results
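`compress_video` derives a bitrate from the target size and the duration, then gives that whole bitrate to the video stream while the audio is encoded at 64 kbit/s on top, so the result tends to come out somewhat larger than the target. A minimal sketch of the arithmetic with the audio share subtracted follows. `video_bitrate_for_target` is a hypothetical helper, and container overhead is ignored.

```python
def video_bitrate_for_target(target_size_mb: float, duration_s: float,
                             audio_bitrate: int = 64_000) -> int:
    """Return the video bitrate (bits/s) that leaves room for the audio track.

    total bitrate = size in bits / duration; video bitrate = total - audio.
    """
    if duration_s <= 0:
        raise ValueError("duration must be positive")
    total_bitrate = target_size_mb * 8 * 1024 * 1024 / duration_s
    return max(int(total_bitrate - audio_bitrate), 1)


# 10 MB over 60 s: 83,886,080 bits / 60 s is about 1,398,101 bit/s in total,
# which leaves about 1,334,101 bit/s for video after 64 kbit/s of audio.
print(video_bitrate_for_target(10, 60))
```

For very short targets the subtraction can approach zero, which is why the result is clamped to at least 1 here; in practice a target that small is not achievable at usable quality.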
part4_ocr_frames.py
import os
from datetime import datetime
from typing import Any, Dict, List

import cv2
import numpy as np
from paddleocr import PaddleOCR


def init_output_file(output_file: str = "pic_content.txt") -> str:
    """Create the output file with a header if it does not exist yet."""
    if not os.path.exists(output_file):
        _write_file_header(output_file)
    return output_file


def _write_file_header(output_file):
    """Write the file header."""
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("PaddleOCR image text extraction results\n")
        f.write("=" * 60 + "\n")
        f.write(f"Created: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("=" * 60 + "\n\n")


def process_and_append_result(output_file, result: List[Dict], image_path: str = None) -> bool:
    """Append the high-confidence text of one image to the output file."""
    try:
        high_confidence_texts = _extract_high_confidence_texts(result)
        if not high_confidence_texts:
            print("⚠️ No high-confidence text found, skipping")
            return False
        content_to_write = _format_content(result, high_confidence_texts, image_path)
        with open(output_file, 'a', encoding='utf-8') as f:
            f.write(content_to_write)
        print(f"✅ Image text appended to {output_file}")
        return True
    except Exception as e:
        print(f"❌ Error while processing the OCR result: {e}")
        return False


def _extract_high_confidence_texts(result: List[Dict], confidence_threshold: float = 0.7) -> List[str]:
    """Return the recognized texts whose score reaches the threshold."""
    high_confidence_texts = []
    for page_result in result:
        rec_texts = page_result.get('rec_texts', [])
        rec_scores = page_result.get('rec_scores', [])
        for text, score in zip(rec_texts, rec_scores):
            if score >= confidence_threshold:
                high_confidence_texts.append(text)
    return high_confidence_texts


def _format_content(result: List[Dict], high_confidence_texts: List[str], image_path: str = None) -> str:
    """Format one image's result as a text block."""
    content = []
    # Separator and image information
    content.append(f"\n{'=' * 60}")
    if image_path:
        content.append(f"File: {os.path.basename(image_path)}")
    content.append(f"Processed at: {datetime.now().strftime('%H:%M:%S')}")
    content.append(f"{len(high_confidence_texts)} high-confidence texts recognized")
    content.append("-" * 40)

    # Recognized text
    if high_confidence_texts:
        content.append("[Recognized text]")
        for i, text in enumerate(high_confidence_texts, 1):
            content.append(f"{i:2d}. {text}")
    else:
        content.append("⚠️ No high-confidence text recognized")

    # Statistics
    total_texts = sum(len(page.get('rec_texts', [])) for page in result)
    avg_confidence = _calculate_average_confidence(result)
    content.append("-" * 40)
    content.append(f"Stats: {total_texts} texts in total, average confidence {avg_confidence:.3f}")
    content.append("\n")  # blank line between images
    return "\n".join(content)


def _calculate_average_confidence(result: List[Dict]) -> float:
    """Compute the average confidence over all recognized texts."""
    total_score = 0
    total_count = 0
    for page_result in result:
        rec_scores = page_result.get('rec_scores', [])
        if rec_scores:
            total_score += sum(rec_scores)
            total_count += len(rec_scores)
    return total_score / total_count if total_count > 0 else 0


def get_summary(output_file: str, processed_count: int) -> Dict[str, Any]:
    """Return processing statistics."""
    return {
        "processed_count": processed_count,
        "output_file": output_file,
        "file_size": os.path.getsize(output_file) if os.path.exists(output_file) else 0
    }


def imread_chinese(path):
    """Read an image whose path may contain Chinese or other non-ASCII characters."""
    try:
        # Read the raw bytes with numpy and let OpenCV decode them
        img = cv2.imdecode(np.fromfile(path, dtype=np.uint8), cv2.IMREAD_COLOR)
        if img is None:
            raise IOError(f"Cannot read image: {path}")
        return img
    except Exception as e:
        print(f"Failed to read image: {e}")
        raise


def process_single_image(image_path, output_file: str = "pic_content.txt") -> bool:
    """Convenience function: run OCR on one image and append the result."""
    init_output_file(output_file)
    ocr = PaddleOCR(use_textline_orientation=True, lang="ch", device='gpu')
    result = ocr.predict(imread_chinese(image_path))
    return process_and_append_result(output_file, result, image_path)


def analysis_keyframes(output_path, folder_name=None):
    """Run OCR on every image in results/images/<folder_name>/."""
    if folder_name is None:
        print('The folder name is None')
        return
    output_file = init_output_file(output_path)

    # Create the OCR engine once and reuse it for all images
    ocr = PaddleOCR(use_textline_orientation=True, lang="ch", device='gpu')

    # Collect the files in the folder, in a stable order
    folder_path = f'results/images/{folder_name}/'
    image_paths = []
    for item in sorted(os.listdir(folder_path)):
        item_path = os.path.join(folder_path, item)
        if os.path.isfile(item_path):
            image_paths.append(item_path)

    processed_count = 0
    for image_path in image_paths:
        try:
            print(f"🔍 Processing: {image_path}")
            # Paths with Chinese or special characters are often not found
            # when passed directly, so decode the image ourselves
            img = imread_chinese(image_path)
            result = ocr.predict(img)
            if process_and_append_result(output_file, result, image_path):
                processed_count += 1
        except Exception as e:
            print(f"❌ Error while processing image {image_path}: {e}")
    print(f"Image processing finished: {get_summary(output_file, processed_count)}")


def process_imgs():
    file_name = 'test'
    file_path = f'results/text_content/{file_name}.txt'
    analysis_keyframes(file_path, file_name)
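The OCR text is appended image by image, so the order in which the folder is walked decides the order the model later sees. `os.listdir` makes no ordering promise, and a plain string sort puts `frame_10` before `frame_2` when names are not zero-padded. A small sketch of a numeric sort that also skips non-image files is shown below; `sort_frames` and the extension list are assumptions for illustration.

```python
import re
from typing import List

IMAGE_EXTS = (".jpg", ".jpeg", ".png")


def sort_frames(names: List[str]) -> List[str]:
    """Keep image files only and sort them by the first number in the name."""
    def key(name: str):
        match = re.search(r"(\d+)", name)
        return (int(match.group(1)) if match else -1, name)

    images = [n for n in names if n.lower().endswith(IMAGE_EXTS)]
    return sorted(images, key=key)


print(sort_frames(["frame_10.jpg", "frame_2.jpg", "notes.txt", "frame_1.JPG"]))
```

With the default `keyframe_%04d.jpg` pattern the names are zero-padded and a plain sort already works up to 9999 frames; the numeric key only matters for other naming schemes.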
part5_transcribe_audio.py
import time
import warningsimport numpy as np
import opencc
import torch
import whisper
from pydub import AudioSegment

warnings.filterwarnings("ignore")


def optimize_gpu_memory():
    """Release cached GPU memory and enable faster math kernels."""
    if torch.cuda.is_available():
        # Return cached blocks held by PyTorch's allocator to the driver
        torch.cuda.empty_cache()
        # These flags speed up computation (cuDNN auto-tuning, TF32 math);
        # they do not change the memory allocation strategy
        torch.backends.cudnn.benchmark = True
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.backends.cudnn.allow_tf32 = True


def get_gpu_info():
    """Print GPU details and return the estimated free memory in GB (0 without a GPU)."""
    if torch.cuda.is_available():
        gpu_name = torch.cuda.get_device_name(0)
        total_memory = torch.cuda.get_device_properties(0).total_memory / 1024 ** 3
        allocated = torch.cuda.memory_allocated() / 1024 ** 3
        cached = torch.cuda.memory_reserved() / 1024 ** 3
        # Only memory allocated by this process is subtracted
        free = total_memory - allocated
        print(f"🎯 GPU: {gpu_name}")
        print(f"💾 Total memory: {total_memory:.1f} GB")
        print(f"📊 Allocated: {allocated:.1f} GB")
        print(f"📈 Reserved (cache): {cached:.1f} GB")
        print(f"🆓 Free: {free:.1f} GB")
        return free
    return 0


def load_audio_with_pydub(audio_path, sr=16000):
    """Load audio with pydub (more robust than soundfile in this setup).

    Returns a mono float32 array of shape [T,], scaled to [-1, 1].
    """
    try:
        # pydub handles mp3, wav and other formats through FFmpeg
        audio = AudioSegment.from_file(audio_path)
        # Convert to mono at the target sample rate (16 kHz by default)
        audio = audio.set_channels(1)
        audio = audio.set_frame_rate(sr)
        # Force 16-bit samples so that dividing by 32768 below is correct
        audio = audio.set_sample_width(2)
        samples = np.array(audio.get_array_of_samples(), dtype=np.float32)
        # Normalize to [-1, 1]
        samples = samples / 32768.0
        print(f"✅ Audio loaded: {audio_path}, duration: {len(samples) / sr:.2f}s")
        return samples
    except Exception as e:
        print(f"❌ Failed to load audio: {str(e)}")
        raise


def transcribe_audio_to_txt(audio_path, output_path, model_size="medium"):
    print("=" * 60)
    print("🤖 Whisper GPU-accelerated transcription")
    print("=" * 60)
    # Free GPU memory before loading the model
    optimize_gpu_memory()
    free_memory = get_gpu_info()
    # Pick a model that fits the free memory.
    # Note: this always overrides the model_size argument.
    if free_memory < 2.0:
        model_size = "base"
        print("⚠️ Low memory, switching to the base model")
    elif free_memory < 4.0:
        model_size = "small"
        print("ℹ️ Moderate memory, using the small model")
    elif free_memory < 8.0:
        model_size = "medium"
        print("✅ Enough memory, using the medium model")
    else:
        model_size = "large"
        print("✅ Using the large model")
    try:
        # Load the model onto the GPU
        print(f"\n📦 Loading the {model_size} model onto the GPU...")
        start_time = time.time()
        model = whisper.load_model(model_size, device="cuda")
        load_time = time.time() - start_time
        print(f"✅ Model loaded in {load_time:.2f} s")
        # Report the size of the model weights
        model_memory = sum(p.numel() * p.element_size() for p in model.parameters()) / 1024 ** 3
        print(f"🧠 Model memory footprint: {model_memory:.2f} GB")
        # Key fix: load the audio with pydub and pass a numpy array, not a file path
        print(f"\n🎧 Loading audio: {audio_path}")
        audio_np = load_audio_with_pydub(audio_path)
        print("\n📝 Starting transcription...")
        start_time = time.time()
        # Kept in Chinese on purpose: it nudges Whisper toward Simplified Chinese output.
        # Meaning: "Please transcribe in Simplified Chinese. The following is Mandarin;
        # output standard Simplified Chinese."
        initial_prompt = "请使用简体中文进行转录。以下是普通话内容,请输出标准简体中文。"
        result = model.transcribe(
            audio_np,                           # numpy array, not a string path
            fp16=True,                          # half precision on GPU (Whisper's default); faster
            language="zh",                      # fix the language to Chinese instead of auto-detecting
            verbose=False,                      # show a progress bar instead of printing each segment
            temperature=0.0,                    # deterministic decoding, no temperature fallback
            best_of=5,                          # only used when sampling (temperature > 0); ignored here
            beam_size=3,                        # small beam, chosen for a 6 GB card
            patience=1.0,                       # beam-search patience (1.0 is the default)
            task="transcribe",                  # transcribe, do not translate
            condition_on_previous_text=False,   # do not condition on earlier text, limits error build-up
            initial_prompt=initial_prompt,      # steer the output toward Simplified Chinese
        )
        transcribe_time = time.time() - start_time
        print(f"✅ Transcription finished in {transcribe_time:.2f} s")
        # Performance report
        if result["segments"]:
            audio_duration = result["segments"][-1]["end"]
            speed_ratio = transcribe_time / audio_duration
            realtime_factor = audio_duration / transcribe_time
            print("\n📊 Performance report:")
            print(f"   Audio duration: {audio_duration:.1f} s")
            print(f"   Transcription time: {transcribe_time:.1f} s")
            print(f"   Speed: {realtime_factor:.2f}x real time")
            print(f"   Cost: {speed_ratio:.3f} s per second of audio")
        # Print the result
        print(f"\n📝 Transcript ({len(result['text'])} characters):")
        print("=" * 50)
        print(result["text"])
        # Convert Traditional Chinese to Simplified Chinese
        converter = opencc.OpenCC('t2s')
        simplified_result = converter.convert(result['text'])
        print("\nAfter Traditional-to-Simplified conversion:")
        print("=" * 50)
        print(simplified_result)
        with open(output_path, 'w', encoding='utf-8') as file:
            file.write(simplified_result)
        # Show segment information (optional)
        print(f"\n📋 Segments ({len(result['segments'])} total):")
        for i, segment in enumerate(result["segments"][:5]):  # first 5 segments only
            print(f"[{segment['start']:6.1f}s - {segment['end']:6.1f}s] {segment['text'][:50]}...")
        if len(result["segments"]) > 5:
            print(f"... and {len(result['segments']) - 5} more segments")
        return result
    except torch.cuda.OutOfMemoryError:
        print("❌ Out of GPU memory! Try the following:")
        print("   1. Use a smaller model: base or small")
        print("   2. Close other programs that use the GPU")
        print("   3. Restart the Python kernel to release memory")
        return None
    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return None
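The memory thresholds used in `transcribe_audio_to_txt` can be pulled out into a pure function, which makes them easy to check without a GPU. The sketch below is only an illustration: `pick_whisper_model` is a hypothetical helper name that does not appear in the original script, and the thresholds are copied from the code above.

```python
def pick_whisper_model(free_gb: float) -> str:
    """Map free GPU memory (in GB) to a Whisper model name.

    Thresholds mirror the if/elif chain in transcribe_audio_to_txt;
    the function name itself is hypothetical.
    """
    thresholds = [(2.0, "base"), (4.0, "small"), (8.0, "medium")]
    for limit, name in thresholds:
        if free_gb < limit:
            return name
    return "large"


if __name__ == "__main__":
    for gb in (0.0, 3.5, 6.0, 11.0):
        print(f"{gb:>5.1f} GB -> {pick_whisper_model(gb)}")
```

Keeping the selection separate from model loading would also make it straightforward to treat the `model_size` argument as an upper bound instead of overwriting it.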
part6_summarize_content.py
import os
import re
from datetime import datetime

import ollama


class MultiTextAnalyzer:
    def __init__(self, model: str = "qwen3:8b"):
        self.model = model
        self.client = ollama

    def read_text_file(self, file_path: str) -> str:
        """Read a text file, trying UTF-8 first and falling back to GBK."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except UnicodeDecodeError:
            try:
                with open(file_path, 'r', encoding='gbk') as f:
                    return f.read()
            except Exception as e:
                raise Exception(f"Cannot read file {file_path}: {e}")
        except Exception as e:
            raise Exception(f"Failed to read file {file_path}: {e}")

    def create_comprehensive_prompt(self, frame_text: str, audio_text: str) -> str:
        """Build the prompt that asks the model to merge both texts."""
        # The prompt is kept in Chinese so the model answers in the language of the source videos.
        # Gist: the two texts are the OCR text from key frames and the audio transcript;
        # deduplicate, complete, correct and restructure them into one Markdown document:
        # 1. merge both texts and remove duplicates, 2. fix obvious recognition errors and typos,
        # 3. complete unfinished sentences and concepts, 4. organize by logical topic,
        # 5. use suitable Markdown (headings, lists, code blocks),
        # 6. stay technically accurate and complete.
        prompt = f"""两个文本分别为通过视频提取出的关键帧上的文字以及音频转录的文字。在综合理解后,整理保存为一个完整的markdown格式文档。

请仔细分析以下两个文本内容,进行去重、补全、纠错和结构化整理:

【关键帧文本内容】
{frame_text}

【音频转录文本内容】
{audio_text}

请按照以下要求生成Markdown文档:
1. 对两个文本进行深度融合,去除重复内容
2. 纠正明显的识别错误和错别字
3. 补充不完整的句子和概念
4. 按照逻辑主题进行结构化组织
5. 使用恰当的Markdown格式(标题、列表、代码块等)
6. 保持技术准确性和内容完整性

生成一个专业、清晰、易于阅读的技术文档。"""
        return prompt

    def analyze_and_integrate(self, frame_text_path: str, audio_text_path: str,
                              output_path: str | None = None) -> str | None:
        """Merge the two texts with the model and save the result as Markdown."""
        print("Reading text files...")
        try:
            frame_text = self.read_text_file(frame_text_path)
            audio_text = self.read_text_file(audio_text_path)
            print(f"✓ Key-frame text: {len(frame_text)} characters")
            print(f"✓ Audio transcript: {len(audio_text)} characters")
        except Exception as e:
            print(f"✗ Failed to read files: {e}")
            return None
        prompt = self.create_comprehensive_prompt(frame_text, audio_text)
        print("Calling the Qwen3 model...")
        print("=" * 60)
        # Stream the response so output appears as it is generated
        messages = [{"role": "user", "content": prompt}]
        full_response = ""
        print("Generated result:")
        print("=" * 60)
        try:
            stream = self.client.chat(
                model=self.model,
                messages=messages,
                stream=True,
                options={
                    "temperature": 0.3,  # low temperature for more faithful output
                    "top_p": 0.9,
                    "num_ctx": 4096,     # context length; longer inputs are truncated
                },
            )
            for chunk in stream:
                content = chunk['message']['content']
                print(content, end='', flush=True)
                full_response += content
        except Exception as e:
            print(f"✗ Model call failed: {e}")
            return None
        # Save the result, falling back to a timestamped default file name
        if not output_path:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_path = f"integrated_document_{timestamp}.md"
        self.save_markdown(full_response, output_path)
        return full_response

    def save_markdown(self, content: str, output_path: str):
        """Save the Markdown document."""
        try:
            # Strip the model's reasoning block
            content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
            # Create the output directory only if the path contains one;
            # os.makedirs("") raises FileNotFoundError for bare file names
            output_dir = os.path.dirname(output_path)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(content)
            print(f"\n\n✓ Markdown document saved: {output_path}")
            print(f"✓ Document length: {len(content)} characters")
        except Exception as e:
            print(f"✗ Failed to save file: {e}")

    def batch_process(self, text_pairs: list, output_dir: str = "output_docs"):
        """Process several text pairs.

        text_pairs: [(frame_path1, audio_path1), (frame_path2, audio_path2), ...]
        """
        results = []
        for i, (frame_path, audio_path) in enumerate(text_pairs):
            print(f"\n{'=' * 50}")
            print(f"Processing pair {i + 1}:")
            print(f"Key frames: {frame_path}")
            print(f"Audio: {audio_path}")
            print(f"{'=' * 50}")
            # Build the output file name
            output_filename = f"integrated_doc_{i + 1}_{datetime.now().strftime('%H%M%S')}.md"
            output_path = os.path.join(output_dir, output_filename)
            result = self.analyze_and_integrate(frame_path, audio_path, output_path)
            if result:
                results.append({
                    'frame_path': frame_path,
                    'audio_path': audio_path,
                    'output_path': output_path,
                    'content': result,
                })
        return results


# Usage example
def main():
    # Create the analyzer with the qwen3:8b model
    analyzer = MultiTextAnalyzer(model="qwen3:8b")
    title = '给大家普及一下过Java面试需要达到的强度_哔哩哔哩_bilibili'
    # Input text files (replace with your own paths)
    frame_text_path = f"results/text_content/{title}_image_content.txt"
    audio_text_path = f"results/text_content/{title}_audio_content.txt"
    # Output path (optional)
    output_path = f"results/summary/{title}_summary.md"
    # Run the analysis and merge
    result = analyzer.analyze_and_integrate(
        frame_text_path=frame_text_path,
        audio_text_path=audio_text_path,
        output_path=output_path,
    )
    if result:
        print("\n✅ Document merged successfully!")
    else:
        print("\n❌ Processing failed")


def batch_example():
    """Batch processing example."""
    analyzer = MultiTextAnalyzer(model="qwen3:8b")
    # Several text pairs
    text_pairs = [
        ("video1_frames.txt", "video1_audio.txt"),
        ("video2_frames.txt", "video2_audio.txt"),
        ("video3_frames.txt", "video3_audio.txt"),
    ]
    results = analyzer.batch_process(text_pairs, "batch_output")
    print(f"\nBatch finished, {len(results)} documents generated")
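Two details of `save_markdown` are easy to verify in isolation: stripping the model's `<think>...</think>` block, and creating the output directory only when the path actually contains one (`os.makedirs("")` raises `FileNotFoundError`, which matters for a bare file name such as the timestamped default). The following is a minimal sketch under those assumptions; `strip_think` and `save_text` are hypothetical helper names, not part of the original class.

```python
import os
import re
import tempfile

THINK_BLOCK = re.compile(r"<think>.*?</think>", flags=re.DOTALL)


def strip_think(text: str) -> str:
    """Remove <think>...</think> reasoning blocks and surrounding whitespace."""
    return THINK_BLOCK.sub("", text).strip()


def save_text(content: str, output_path: str) -> None:
    """Write content as UTF-8, creating the parent directory only if there is one."""
    parent = os.path.dirname(output_path)
    if parent:  # a bare file name has an empty dirname; makedirs("") would raise
        os.makedirs(parent, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(content)


if __name__ == "__main__":
    raw = "<think>\nplanning the outline...\n</think>\n\n# Summary\n\n- point one"
    cleaned = strip_think(raw)
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "summary", "demo.md")
        save_text(cleaned, target)
        with open(target, encoding="utf-8") as f:
            print(f.read())
```

The non-greedy `.*?` together with `re.DOTALL` keeps the match confined to one reasoning block even when it spans several lines.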
Notes
① CUDA environment conflict
- Problem description
import paddle  # imported first
from whisper_utils import transcribe_audio_to_txt  # imported second
Error (WinError 127, "The specified procedure could not be found"):
OSError: [WinError 127] 找不到指定的程序。 Error loading "D:\ProgramData\anaconda3\envs\whisper\lib\site-packages\torch\lib\cufftw64_11.dll" or one of its dependencies.
from whisper_utils import transcribe_audio_to_txt  # imported first
import paddle  # imported second
Error:
OSError: [WinError 127] 找不到指定的程序。 Error loading "D:\ProgramData\anaconda3\envs\whisper\lib\site-packages\paddle..\nvidia\cublas\bin\cublas64_12.dll" or one of its dependencies.
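- Cause
  - PyTorch requires the CUDA 11.x libraries (e.g. cufftw64_11.dll)
  - PaddlePaddle requires the CUDA 12.x libraries (e.g. cublas64_12.dll)
  - When both are imported together, their CUDA runtime libraries conflict
- Solutions
  - Ⅰ. Use the same CUDA version for paddle and torch
    - After downgrading Paddle to 12.0, the original PaddleOCR code may no longer work
    - The GPU (CUDA) builds of paddle and torch are hard to align
    - If further dependencies require yet other CUDA versions, aligning them becomes even harder
  - Ⅱ. Isolate the environment-variable paths in code and call the two libraries one after the other (did not work)
  - Ⅲ. Wrap each step in a function and launch it in a child thread (✓ adopted)
  - Ⅳ. Split the program into parts and run them manually in order (the workflow is no longer end to end)
  - Ⅴ. Keep the GPU build only for the code that mainly uses the GPU and switch the rest to CPU builds (wastes resources)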
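One way to get the kind of isolation that option Ⅲ aims for is to run each GPU stage in its own Python process. DLLs are loaded per process, so the CUDA 11.x libraries pulled in by torch and the CUDA 12.x libraries pulled in by paddle never end up in the same address space. The sketch below is an assumption about how this could be wired up, not the author's actual code: `run_isolated` is a hypothetical helper, and the two stage snippets are placeholders for code that would import paddle and whisper respectively.

```python
import subprocess
import sys


def run_isolated(code: str, timeout: float = 600.0) -> str:
    """Run a Python snippet in a fresh interpreter and return its stdout.

    Each call starts a new process, so libraries imported by one stage
    are never loaded alongside those of another stage.
    """
    completed = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        timeout=timeout,
        check=True,  # raise CalledProcessError if the child exits non-zero
    )
    return completed.stdout.strip()


if __name__ == "__main__":
    # Placeholder stages: in a real pipeline the first would import paddle
    # (OCR) and the second whisper/torch (transcription).
    ocr_stage = "print('ocr stage done')"
    asr_stage = "print('asr stage done')"
    for stage in (ocr_stage, asr_stage):
        print(run_isolated(stage))
```

Because the stages exchange data through files in this pipeline (OCR text and transcript are written to disk), no extra inter-process communication is needed beyond passing file paths.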