使用 gemini api + 异步执行,批量翻译文档
1. 灵感闪现:打造一个 AI 批量翻译官
故事的开头,我雄心勃勃,想用 Gemini AI 打造一个能自动翻译整个项目文档的 Agent。目标很明确:
- 输入一个
docs
文件夹。 - 输出一个
docs-zh
文件夹,里面是所有翻译好的中文版 Markdown 文件。 - 图片等非文本文件,要能自动复制过去。
很快,我的第一个版本诞生了,一个勤勤恳恳的“老实人”脚本。
2. 初版诞生:勤恳的“蜗牛” 🐌
这个脚本逻辑清晰,简单粗暴:
- 用一个
for
循环遍历所有文件。 - 读一个英文文件。
- 向 Gemini API 发起一个翻译请求。
- 等啊等… 等到 API 返回结果。
- 把翻译好的内容写入新文件。
- 主动
sleep(1)
休息一秒,生怕 API 服务器太累。 - 回到第 2 步,处理下一个文件。
它还带上了 tqdm
进度条,看起来很专业。但一跑起来,我就发现了问题:40 多个文件,进度条像被施了定身法,慢得令人发指!
3. 灵魂拷问:瓶颈到底在哪?🤔
看着龟速爬行的进度条,我陷入了沉思。这点文件读写,CPU 根本不眨眼。那问题出在哪?很快,我锁定了三大元凶:
-
元凶一:【网络延迟】
每次 API 调用,数据都要在我的电脑和谷歌的服务器之间跑一个来回。这段旅程的时间,远比代码执行本身要长得多。在等待的每一毫秒,我的程序都在“摸鱼”。 -
元凶二:【主动“自残”】
那个time.sleep(1)
,本意是好的,为了防止请求太快被封。但它也意味着,每翻译一个文件,就要强制罚站 1 秒。40 个文件就是 40 秒纯粹的等待! -
元凶三(也是罪魁祸首):【串行执行】
我的脚本是个“一根筋”,它必须等上一个文件从请求->等待->返回->写入的全过程结束后,才开始处理下一个。这就像一个只有一个窗口的办事大厅,效率奇低。
4. 救星登场:asyncio
并发编程 🚀
问题的核心是等待,而不是计算。那么解决方案就是:在等待的时候,去做别的事!
这就是 Python 的异步编程 asyncio
登场的时候了。
它的理念就像一个优秀的厨师:
不会傻傻地盯着一锅汤等它煮沸(等待网络),而是在炖汤的同时,去洗菜、切菜(发起其他网络请求)。
我果断地对脚本进行了“引擎升级”:
- 切换到异步函数:把核心的翻译函数
def
改为async def
。 - 调用异步 API:使用
await model.generate_content_async()
,告诉程序“你先去请求,我不等你,我去忙别的”。 - 批量下达任务:用
asyncio.gather(*tasks)
把所有文件的翻译任务一次性“扔”进事件循环,让 Python 自己去调度。 - 智能限流:抛弃
time.sleep(1)
,改用asyncio.Semaphore(10)
,这就像给办事大厅开了 10 个窗口,同时处理 10 个任务,既保证了效率,又不会因为瞬间请求太多而挤爆服务器。
5. 最终章:从“蜗牛”到“火箭”的蜕变 🎉
新脚本一运行,效果立竿见影!
- 之前:进度条一格一格地挪,总耗时好几分钟。
- 现在:进度条“唰”地一下飞速前进,整个过程在几十秒内就完成了!
结论:
这次改造的核心,是认清了任务的本质。当你的程序大部分时间都在等待 I/O(网络请求、文件读写)时,并发就是那把开启效率之门的钥匙。通过 asyncio
,我们把程序从一个“单线程的笨蛋”,变成了一个“懂得多任务调度的天才”,实现了质的飞跃。
import os
import time
import shutil
import asyncio # 引入 asyncio 库
import google.generativeai as genai
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio # 引入tqdm的异步版本
from dotenv import load_dotenv# --- 配置 ---
load_dotenv()
os.environ["http_proxy"] = "http://127.0.0.1:7899"
os.environ["https_proxy"] = "http://127.0.0.1:7899"GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")genai.configure(api_key=GOOGLE_API_KEY)
model = genai.GenerativeModel("gemini-2.5-flash") # 建议使用 latest 标签prompt_template = """
你是一名专业的IT技术文档翻译员。
请将下面的英文 Markdown 文本翻译成简体中文。翻译要求:
1. 完整保留原始的 Markdown 格式。
2. 技术术语需翻译得精准、专业且符合行业习惯 (例如: 'repository' -> '仓库', 'commit' -> '提交', 'pull request' -> '拉取请求')。
3. 不要翻译代码块 (```...```)、行内代码 (`...`)、URL链接和终端命令。
4. 确保翻译后的语言流畅、专业,适合开发者阅读。
5. 直接返回翻译后的文本内容,不要包含任何额外的解释或前言。英文原文如下:
---
{english_text}
---
"""# 将翻译函数改为异步函数
async def translate_text_async(text: str, model: genai.GenerativeModel) -> str:prompt = prompt_template.format(english_text=text)safety_settings = [{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},]# 使用异步版本的 generate_contentresponse = await model.generate_content_async(prompt, safety_settings=safety_settings)return response.text.strip()# 定义一个处理单个文件的异步任务
async def process_file_async(input_path, output_dir, model, stats, pbar):relative_path = os.path.relpath(input_path, os.path.dirname(os.path.commonprefix([input_path, output_dir])))output_path = os.path.join(output_dir, relative_path)os.makedirs(os.path.dirname(output_path), exist_ok=True)try:if input_path.lower().endswith(('.md', '.markdown')):with open(input_path, 'r', encoding='utf-8') as f:content = f.read()if not content.strip():translated_content = ""else:# 等待异步翻译完成translated_content = await translate_text_async(content, model)with open(output_path, 'w', encoding='utf-8') as f:f.write(translated_content)stats['success'] += 1else:shutil.copy2(input_path, output_path)stats['copied'] += 1except Exception as e:stats['failed'] += 1stats['failures'].append(f"处理失败: {relative_path}, 原因: {e}")finally:pbar.update(1)# 主执行函数
async def main(input_dir: str, output_dir: str):files_to_process = []for root, _, files in os.walk(input_dir):for file in files:files_to_process.append(os.path.join(root, file))if not files_to_process:print(f"在目录 '{input_dir}' 中没有找到任何文件。")returnstart_time = time.time()stats = {'success': 0, 'failed': 0, 'copied': 0, 'failures': []}os.makedirs(output_dir, exist_ok=True)print(f"开始并发处理目录 '{input_dir}'...")# 创建一个 Semaphore 来控制并发数量,防止瞬间请求过多导致API拒绝# 比如,我们限制最多同时有 10 个请求在进行CONCURRENT_LIMIT = 10semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)async def throttled_process_file(input_path, output_dir, model, stats, pbar):async with semaphore:await process_file_async(input_path, output_dir, model, stats, pbar)# 在这里可以加一个非常小的延时,进一步避免触发频率限制await asyncio.sleep(0.1)with tqdm(total=len(files_to_process), desc="翻译进度", unit="个文件") as pbar:# 创建所有文件的处理任务tasks = [throttled_process_file(fp, output_dir, model, stats, pbar) for fp in files_to_process]# 等待所有任务完成await asyncio.gather(*tasks)# 打印总结报告duration = time.time() - start_timeprint("\n" + "=" * 25 + " 任务总结 " + "=" * 25)print(f"| 总耗时: {duration:.2f} 秒")print(f"| 总处理文件数: {len(files_to_process)}")print(f"| -> 翻译成功: {stats['success']}")print(f"| -> 资源文件复制成功: {stats['copied']}")print(f"| -> 处理失败: {stats['failed']}")if stats['failures']:print("\n" + "-" * 25 + " 失败详情 " + "-" * 25)for fail_info in stats['failures']:print(f"| {fail_info}")print("\n翻译任务全部完成!")if __name__ == "__main__":input_directory = r"docs-eng"output_directory = "docs-zh-async"# 运行异步主函数asyncio.run(main(input_directory, output_directory))