Deploying whisper + speaker_large + qwen on Ubuntu (Gradio UI version)
The previous post covered deploying whisper + speaker + Qwen3-1.7B on Ubuntu; this post adds a web UI with Gradio. The original plan was to build the UI with tkinter and package it into an executable with PyInstaller, but packaging kept failing with the error below (likely because the packaged payload exceeds the signed 32-bit integer range shown in the message, which multi-gigabyte model weights easily do). The issue remains unsolved, so Gradio is used for now:

python struct.error: 'i' format requires -2147483648 <= number <= 2147483647
The final interface is shown below:

Install the dependencies:
pip install gradio torch transformers pyannote.audio huggingface_hub
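The script also shells out to ffmpeg to convert uploads to 16 kHz mono WAV, so make sure it is installed as well (on Ubuntu, typically via apt):

sudo apt install ffmpeg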
The full code is as follows:
import os

# Use the HF mirror endpoint; this must be set before huggingface_hub is
# imported, since HF_ENDPOINT is read at import time
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"

import json
import gradio as gr
import subprocess
import torch
import huggingface_hub
from pyannote.audio import Pipeline
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
from transformers import AutoModelForCausalLM, AutoTokenizer
import warnings
import tempfile
import shutil

# Suppress unnecessary warnings
warnings.filterwarnings("ignore")


class AudioProcessorGradio:
    def __init__(self):
        self.is_running = False
        self.current_process = None

    def extract_and_filter_text(self, json_data):
        """Extract text from the aligned JSON data, filtering out irrelevant platform boilerplate."""
        irrelevant_keywords = [
            "点赞", "订阅", "转发", "打赏", "小红书", "抖音",
            "YoYo Television Series Exclusive", "孔优优独播剧场",
            "中文字幕志愿者", "李宗盛"
        ]
        all_text = []
        for item in json_data:
            text = item.get("text", "").strip()
            if text and not any(keyword in text for keyword in irrelevant_keywords):
                all_text.append(text)
        return " ".join(all_text)

    def process_document(self, aligned_json_path, prompt, combined_text):
        """Load the aligned result, run the Qwen model to organize the document, and save it."""
        # Note: this is a generator, so errors are yielded rather than returned
        # (a `return value` inside a generator would be silently discarded)
        if not os.path.exists(aligned_json_path):
            yield f"❌ Aligned result file not found: {aligned_json_path}"
            return
        with open(aligned_json_path, "r", encoding="utf-8") as f:
            json_data = json.load(f)
        combined_text = self.extract_and_filter_text(json_data)
        if not combined_text:
            yield "⚠️ No valid text after filtering, skipping document organization"
            return

        yield "🔄 Loading document organization model (Qwen3-1.7B)..."
        model_name = "Qwen/Qwen3-1.7B"
        try:
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                device_map="auto"
            )
            yield "✅ Document organization model loaded"
        except Exception as e:
            yield f"❌ Model loading failed: {str(e)}"
            return

        # Fill the placeholder in the prompt
        prompt = prompt.replace("{combined_text}", combined_text)
        messages = [{"role": "user", "content": prompt}]
        text = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )
        model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
        generated_ids = model.generate(
            **model_inputs,
            max_new_tokens=512,
            temperature=0.3,
            do_sample=True
        )
        generated_ids = generated_ids[0][len(model_inputs.input_ids[0]):]
        result = tokenizer.decode(generated_ids, skip_special_tokens=True).strip()
        output_content = f"=== Document organization result ===\n{result}"
        yield output_content

    def combine_results(self, rttm_file_path, asr_result, min_segment_duration=0.5):
        """Improved matching logic: dedupe + timestamp repair + best-overlap matching."""
        # 1. Read and filter RTTM segments (drop noise segments shorter than 0.5s).
        # RTTM lines look like:
        # SPEAKER <file> 1 <start> <duration> <NA> <NA> <speaker> <NA> <NA>
        rttm_data = []
        with open(rttm_file_path, 'r', encoding='utf-8') as rttm_file:
            for line in rttm_file.readlines():
                parts = line.strip().split()
                if len(parts) < 9:
                    continue
                start = float(parts[3])
                duration = float(parts[4])
                end = start + duration
                speaker = parts[7]
                if duration >= min_segment_duration:
                    rttm_data.append({
                        "start": start,
                        "end": end,
                        "speaker": speaker,
                        "duration": duration
                    })
        if not rttm_data:
            return []

        # 2. Read and filter ASR chunks (dedupe + timestamp repair)
        asr_chunks = []
        seen_text = set()
        for chunk in asr_result.get("chunks", []):
            if not chunk.get("timestamp") or None in chunk["timestamp"]:
                continue
            chunk_start, chunk_end = chunk["timestamp"]
            # Repair reversed timestamps
            if chunk_start > chunk_end:
                chunk_start, chunk_end = chunk_end, chunk_start
            duration = chunk_end - chunk_start
            if duration < 0.3:
                continue
            text = chunk["text"].strip()
            # Dedupe text (ignoring differences in spaces and punctuation)
            clean_text = text.replace(" ", "").replace(",", "").replace("。", "").replace("!", "")
            if clean_text not in seen_text and len(clean_text) > 1:
                seen_text.add(clean_text)
                asr_chunks.append({
                    "start": chunk_start,
                    "end": chunk_end,
                    "text": text,
                    "duration": duration
                })
        if not asr_chunks:
            return []

        # 3. Overlap-first matching (keep only the best speaker per chunk)
        combined_results = []
        for asr in asr_chunks:
            asr_start, asr_end = asr["start"], asr["end"]
            matched_segments = []
            # Compute the overlap with every speaker segment
            for seg in rttm_data:
                seg_start, seg_end = seg["start"], seg["end"]
                overlap_start = max(asr_start, seg_start)
                overlap_end = min(asr_end, seg_end)
                overlap_duration = max(0.0, overlap_end - overlap_start)
                overlap_ratio = overlap_duration / asr["duration"] if asr["duration"] > 0 else 0
                if overlap_ratio > 0.1:
                    matched_segments.append({
                        "speaker": seg["speaker"],
                        "overlap_start": overlap_start,
                        "overlap_end": overlap_end,
                        "overlap_ratio": overlap_ratio
                    })
            # Handle the match result
            if not matched_segments:
                combined_results.append({
                    "start_time": asr_start,
                    "end_time": asr_end,
                    "speaker": "UNKNOWN",
                    "text": asr["text"]
                })
            else:
                # Sort by overlap ratio and keep only the best match
                matched_segments.sort(key=lambda x: x["overlap_ratio"], reverse=True)
                top_seg = matched_segments[0]
                combined_results.append({
                    "start_time": top_seg["overlap_start"],
                    "end_time": top_seg["overlap_end"],
                    "speaker": top_seg["speaker"],
                    "text": asr["text"]
                })

        # 4. Merge consecutive segments from the same speaker (with extra dedupe)
        if not combined_results:
            return []
        merged = [combined_results[0]]
        for curr in combined_results[1:]:
            last = merged[-1]
            if (curr["speaker"] == last["speaker"]
                    and curr["start_time"] - last["end_time"] < 1.0):
                last["end_time"] = curr["end_time"]
                # Text dedupe: avoid appending identical content twice
                if curr["text"] not in last["text"]:
                    last["text"] += " " + curr["text"]
            else:
                merged.append(curr)
        return merged

    def convert_to_wav(self, input_path, output_dir):
        """Convert the audio to 16 kHz mono WAV (avoids timestamp offsets)."""
        filename = os.path.splitext(os.path.basename(input_path))[0]
        wav_path = os.path.join(output_dir, f"{filename}.wav")
        yield f"🔄 Converting audio: {os.path.basename(input_path)} -> {os.path.basename(wav_path)}"
        try:
            cmd = [
                "ffmpeg", "-y", "-i", input_path,
                "-ar", "16000", "-ac", "1", "-c:a", "pcm_s16le",
                "-avoid_negative_ts", "make_zero",
                wav_path
            ]
            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            if result.returncode != 0:
                yield f"❌ ffmpeg error: {result.stderr}"
                return
            yield "✅ Conversion succeeded"
            # Yield the output path so the caller can pick it up from the stream
            # (a `return wav_path` inside a generator would be discarded)
            yield wav_path
        except Exception as e:
            yield f"❌ Conversion failed: {str(e)}"

    def process_audio_gradio(self, audio_file_path, prompt_text):
        """Main Gradio processing function."""
        if self.is_running:
            yield "⚠️ A task is already running, please wait for it to finish", None
            return
        self.is_running = True
        output_content = ""
        output_file_path = None

        def add_output(text):
            nonlocal output_content
            output_content += text + "\n"
            return output_content, None

        try:
            # Create a temporary working directory
            with tempfile.TemporaryDirectory() as temp_dir:
                # Check whether the file is already inside the temp directory
                if not audio_file_path.startswith(temp_dir):
                    # Not in the temp directory yet, copy it in
                    audio_filename = os.path.basename(audio_file_path)
                    new_audio_path = os.path.join(temp_dir, audio_filename)
                    yield add_output("📁 Copying audio file into the working directory...")
                    shutil.copy2(audio_file_path, new_audio_path)
                    audio_path = new_audio_path
                else:
                    # Already in the temp directory
                    audio_path = audio_file_path

                yield add_output(f"📁 Processing audio file: {os.path.basename(audio_path)}")

                # Basic validation
                if not os.path.exists(audio_path):
                    yield add_output(f"❌ Audio file not found: {audio_path}")
                    return

                # Check ffmpeg
                try:
                    subprocess.run(["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
                except (FileNotFoundError, subprocess.CalledProcessError):
                    yield add_output("❌ ffmpeg not found or incompatible, please install ffmpeg first")
                    return

                # Convert the audio
                yield add_output("🔄 Starting audio format conversion...")
                conversion_results = []
                for result in self.convert_to_wav(audio_path, temp_dir):
                    conversion_results.append(result)
                    yield add_output(result)
                wav_path = None
                for result in conversion_results:
                    if isinstance(result, str) and result.endswith('.wav') and os.path.exists(result):
                        wav_path = result
                        break
                if not wav_path:
                    # No WAV path found in the stream; look for it in the temp directory
                    audio_filename = os.path.splitext(os.path.basename(audio_path))[0]
                    possible_wav_path = os.path.join(temp_dir, f"{audio_filename}.wav")
                    if os.path.exists(possible_wav_path):
                        wav_path = possible_wav_path
                        yield add_output(f"✅ Using existing WAV file: {os.path.basename(wav_path)}")
                    else:
                        yield add_output("❌ Audio conversion failed, no WAV file produced")
                        return

                audio_filename = os.path.splitext(os.path.basename(wav_path))[0]
                rttm_filename = os.path.join(temp_dir, f"{audio_filename}.rttm")
                aligned_json = os.path.join(temp_dir, f"{audio_filename}_combined_aligned.json")
                asr_json = os.path.join(temp_dir, f"{audio_filename}_asr_with_timestamps.json")
                output_txt = os.path.join(temp_dir, f"{audio_filename}_result.txt")

                # 1. Speaker diarization
                token = "hf_TBTbupvBJJoh#############"  # redacted; use your own HuggingFace access token
                try:
                    yield add_output("🔄 Logging in to HuggingFace...")
                    huggingface_hub.login(token=token)
                    yield add_output("✅ Login succeeded")
                    yield add_output("🔄 Loading speaker diarization model...")
                    diarization_pipeline = Pipeline.from_pretrained(
                        "pyannote/speaker-diarization-3.1", token=token)
                    yield add_output("✅ Speaker diarization model loaded")

                    # Run diarization
                    yield add_output(f"🎧 Processing audio: {audio_filename}")
                    diarization_result = diarization_pipeline(wav_path)
                    yield add_output("✅ Speaker diarization finished!")

                    # Write the RTTM file (dropping segments shorter than 0.5s)
                    speaker_segments = diarization_result.speaker_diarization
                    with open(rttm_filename, "w", encoding='utf-8') as rttm_file:
                        for segment, _, speaker in speaker_segments.itertracks(yield_label=True):
                            duration = segment.end - segment.start
                            if duration >= 0.5:
                                rttm_line = (
                                    f"SPEAKER {audio_filename} 1 {segment.start:.3f} "
                                    f"{duration:.3f} <NA> <NA> {speaker} <NA> <NA>\n"
                                )
                                rttm_file.write(rttm_line)
                    yield add_output("✅ RTTM file saved")
                except Exception as e:
                    yield add_output(f"❌ Speaker diarization error: {str(e)}")
                    return

                # 2. Speech recognition
                try:
                    device = "cuda:0" if torch.cuda.is_available() else "cpu"
                    torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
                    yield add_output(f"💻 Recognition device: {device}")
                    yield add_output("🔄 Loading speech recognition model...")
                    model_id = "openai/whisper-large-v3-turbo"
                    model = AutoModelForSpeechSeq2Seq.from_pretrained(
                        model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
                    ).to(device)
                    processor = AutoProcessor.from_pretrained(model_id)
                    yield add_output("✅ Speech recognition model loaded")

                    # Build the ASR pipeline
                    asr_pipeline = pipeline(
                        "automatic-speech-recognition",
                        model=model,
                        tokenizer=processor.tokenizer,
                        feature_extractor=processor.feature_extractor,
                        chunk_length_s=30,
                        batch_size=8,
                        torch_dtype=torch_dtype,
                        device=device,
                        return_timestamps=True,
                    )

                    # Run recognition
                    yield add_output("🔄 Running speech recognition...")
                    asr_result = asr_pipeline(
                        wav_path,
                        generate_kwargs={
                            "language": "chinese",
                            "task": "transcribe",
                        }
                    )
                    yield add_output("✅ Speech recognition finished!")

                    # Dedupe the ASR result
                    deduplicated_chunks = []
                    seen_text = set()
                    for chunk in asr_result.get("chunks", []):
                        text = chunk["text"].strip()
                        clean_text = text.replace(" ", "").replace(",", "").replace("。", "").replace("!", "")
                        if clean_text not in seen_text and len(clean_text) > 1:
                            seen_text.add(clean_text)
                            deduplicated_chunks.append(chunk)
                    asr_result["chunks"] = deduplicated_chunks

                    # Save the ASR result
                    with open(asr_json, "w", encoding="utf-8") as f:
                        json.dump(asr_result["chunks"], f, ensure_ascii=False, indent=2)
                    yield add_output("✅ ASR result saved")
                except Exception as e:
                    yield add_output(f"❌ Speech recognition error: {str(e)}")
                    return

                # 3. Time alignment
                yield add_output("🔄 Aligning timestamps...")
                combined = self.combine_results(rttm_filename, asr_result)
                if not combined:
                    yield add_output("⚠️ No valid matches produced")
                    return
                # Save the aligned result
                with open(aligned_json, "w", encoding="utf-8") as f:
                    json.dump(combined, f, ensure_ascii=False, indent=2)
                yield add_output("✅ Time alignment finished")

                # 4. Document organization
                yield add_output("🔄 Starting document organization...")
                with open(aligned_json, "r", encoding="utf-8") as f:
                    json_data = json.load(f)
                combined_text = self.extract_and_filter_text(json_data)
                if not combined_text:
                    yield add_output("⚠️ No valid text after filtering, skipping document organization")
                    return

                # Run the document organization model
                yield add_output("🔄 Calling the document organization model...")
                doc_results = list(self.process_document(aligned_json, prompt_text, combined_text))
                for result in doc_results:
                    yield add_output(result)

                # Save the final result
                final_result = doc_results[-1] if doc_results else "No result generated"
                with open(output_txt, "w", encoding="utf-8") as f:
                    f.write(final_result)
                output_file_path = output_txt
                yield add_output("🎉 Processing finished!")
                yield add_output("📁 Result file generated")

                # Emit the final result
                yield output_content, output_file_path
        except Exception as e:
            yield add_output(f"❌ Error during processing: {str(e)}")
        finally:
            self.is_running = False


def create_gradio_interface():
    """Create the Gradio interface."""
    processor = AudioProcessorGradio()
    with gr.Blocks(title="Audio Processing and Document Organization Tool", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🎵 Audio Processing and Document Organization Tool")
        gr.Markdown("Upload an audio file for automatic speaker diarization, speech recognition, and document organization")
        with gr.Row():
            with gr.Column(scale=1):
                audio_input = gr.File(
                    label="Upload audio file",
                    file_types=[".mp3", ".wav", ".m4a", ".aac", ".flac"],
                    type="filepath"
                )
                prompt_input = gr.Textbox(
                    label="Prompt",
                    lines=8,
                    value="""1. Identify the core topic of the document;
2. Summarize the main idea of the document; text content: {combined_text}""",
                    placeholder="Enter the prompt, using {combined_text} as the text placeholder"
                )
                with gr.Row():
                    process_btn = gr.Button("Start processing", variant="primary")
                    stop_btn = gr.Button("Stop processing", variant="stop")
            with gr.Column(scale=1):
                output_log = gr.Textbox(
                    label="Processing log",
                    lines=20,
                    max_lines=50,
                    interactive=False,
                    show_copy_button=True
                )
                download_output = gr.File(
                    label="Download result",
                    interactive=False
                )

        # Handler functions
        def process_audio(audio_file, prompt_text):
            if audio_file is None:
                yield "❌ Please upload an audio file first", None
                return
            for log_content, file_path in processor.process_audio_gradio(audio_file, prompt_text):
                yield log_content, file_path

        def stop_processing():
            processor.is_running = False
            return "Processing stopped", None

        # Event bindings
        process_btn.click(
            fn=process_audio,
            inputs=[audio_input, prompt_input],
            outputs=[output_log, download_output]
        )
        stop_btn.click(
            fn=stop_processing,
            outputs=[output_log, download_output]
        )
    return demo


def main():
    """Entry point."""
    demo = create_gradio_interface()
    # Print the local IP address so the service can be reached on the LAN
    try:
        import socket
        hostname = socket.gethostname()
        local_ip = socket.gethostbyname(hostname)
        print("🚀 Starting service...")
        print("📱 Local access: http://localhost:7860")
        print(f"🌐 LAN access: http://{local_ip}:7860")
    except Exception:
        print("🚀 Starting service...")
        print("📱 Local access: http://localhost:7860")
    print("⏹️ Press Ctrl+C to stop the service")

    # Launch the Gradio service, allowing LAN access
    demo.launch(
        server_name="0.0.0.0",  # listen on all network interfaces
        server_port=7860,
        share=False,            # do not create a public share link
        inbrowser=True          # open the browser automatically
    )


if __name__ == "__main__":
    main()
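
To start the service, save the code to a file (app.py is just an assumed name here) and run:

python app.py

Gradio then listens on all interfaces on port 7860, so the UI is reachable at http://localhost:7860 locally, or via the LAN address printed at startup from another machine on the same network.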
