Python 多线程与多进程入门指南
快速入门 Python concurrent.futures
concurrent.futures 是 Python 中用于实现并发编程的强大工具,它提供了高级接口来异步执行可调用对象。
1. 理解基本概念
-
Executor:执行器类,包括
ThreadPoolExecutor(线程池)
和ProcessPoolExecutor(进程池)
是核心类,负责管理线程/进程的生命周期。 -
提交任务:通过
submit()
提交单个任务,或map()
批量提交任务。 -
Future:表示异步计算的对象,用于获取结果或状态。可通过
result()
获取(阻塞),或as_completed()
迭代已完成的任务。 -
适用场景:理解线程(IO密集型)和进程(CPU密集型)的区别
- 线程池:I/O 密集型任务(如网络请求、文件读写),利用线程等待 I/O 的时间切换执行其他任务。
- 进程池:CPU 密集型任务(如数值计算),绕过 Python GIL(全局解释器锁)限制,利用多核 CPU。
-
安装与导入:concurrent.futures 是标准库,无需额外安装。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed import time # 用于计时
2. 基本使用模式
操作 | 方法/说明 |
---|---|
创建线程池/进程池 | ThreadPoolExecutor(max_workers=N) 或 ProcessPoolExecutor(max_workers=N) |
提交单个任务 | executor.submit(func, *args, **kwargs) → 返回 Future 对象 |
批量提交任务 | executor.map(func, iterable) → 返回按输入顺序的迭代器(不支持异常捕获) |
异步获取结果 | future.result(timeout=None) → 阻塞直到完成(超时可选) |
迭代已完成的任务 | as_completed(futures) → 返回按完成顺序的 Future 迭代器 |
取消未执行的任务 | future.cancel() → 仅当任务未开始时可能成功(返回布尔值) |
3. 举例
I/O 密集型任务
def fetch_url(url):"""模拟网络请求:延迟 1 秒后返回 URL"""time.sleep(1) # 模拟 I/O 等待return f"Data from {url}"# 主程序
if __name__ == "__main__":urls = ["https://example.com", "https://google.com", "https://github.com"]# 1. 创建线程池(默认线程数为 CPU 核心数 * 5,可通过 max_workers 指定)with ThreadPoolExecutor(max_workers=3) as executor:# 2. 提交任务(方式 1:逐个提交,返回 Future 对象)futures = [executor.submit(fetch_url, url) for url in urls]# 3. 获取结果(方式 1:按完成顺序遍历)for future in as_completed(futures):try:result = future.result() # 阻塞直到任务完成print(result)except Exception as e:print(f"Task failed: {e}")# (可选)方式 2:批量提交并按输入顺序获取结果(map 方法)results = executor.map(fetch_url, urls) # 返回迭代器,按 urls 顺序输出print(list(results)) # 输出:["Data from ...", ...]
CPU 密集型任务
def calculate_factorial(n):"""计算阶乘(CPU 密集型)"""result = 1for i in range(1, n+1):result *= ireturn resultif __name__ == "__main__":numbers = [1000, 2000, 3000, 4000]# 使用进程池(绕过 GIL,利用多核)with ProcessPoolExecutor(max_workers=4) as executor:# 提交任务并获取结果futures = [executor.submit(calculate_factorial, num) for num in numbers]for future in as_completed(futures):print(f"Factorial result: {future.result()}")
4. 计时
使用 time.perf_counter()(高精度计时器)测量整个并发任务的执行时间:
import time
from concurrent.futures import ThreadPoolExecutordef process_data(a, b):time.sleep(0.5)return f"Result: {a}-{b}"if __name__ == "__main__":start_time = time.perf_counter() # 记录开始时间with ThreadPoolExecutor(max_workers=3) as executor:# 提交 5 个任务(多变量)futures = [executor.submit(process_data, i, f"str_{i}") for i in range(5)]# 等待所有任务完成(可选:遍历结果)for future in as_completed(futures):print(future.result())end_time = time.perf_counter() # 记录结束时间total_time = end_time - start_timeprint(f"Total time: {total_time:.2f} seconds") # 输出约 0.5~1 秒(并发执行)
学习资源
-
官方文档:
- concurrent.futures — 启动并行任务:最权威的参考资料,包含所有类和方法的详细说明
-
实用教程:
- Real Python 并发教程:实时 Python 的并发编程指南,涵盖 concurrent.futures 的详细用法和场景对比。
- 《Python并行编程手册》
- 《流畅的Python》中 第 19 章
- 《流畅的Python》中关于并发编程的章节笔记
- 《Python并发编程实战》
-
视频资源:
- Corey Schafer 的并发编程教程(YouTube)
- Python 多线程与多进程(YouTube)