When would you use concurrent.futures? Do you need to memorize it?
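- I. When exactly would you use it?
- II. Which APIs must you memorize?
- 1. Create a thread pool: `ThreadPoolExecutor`
- 2. Submit a single task: `submit()` + `Future`
- 3. Handle whichever finishes first: `as_completed()`
- 4. Collect all results in a fixed order: `executor.map()`
- 5. Run a callback automatically when a task finishes: `add_done_callback()`
- 6. Cancel a task: `cancel()`
- 7. Switch to a process pool for CPU-bound work: `ProcessPoolExecutor`
- III. Common pitfalls (memorize them and skip the overtime)
- IV. Summary: the memorization checklist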
I. When exactly would you use it?
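Scenario | Example | Worth mastering? |
---|---|---|
IO-bound concurrency | Crawling 100 web pages at once, concurrent downloads, concurrent calls to third-party APIs | ✅ Recommended |
CPU-bound parallelism | Batch image compression, scientific computing (the parts NumPy doesn't already handle) | ✅ Recommended |
Replacing hand-written Thread/Process code | You'd rather not manage `threading.Thread` or `multiprocessing.Process` yourself | ✅ Recommended |
Completion callbacks | Automatically writing a log entry or sending a notification when a task finishes | ✅ Recommended |
Timeouts / cancellation | A crawler gives up on a page that hasn't downloaded within 3 seconds | ✅ Recommended |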
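In one sentence: if you have ever written a `for` loop that processes items one at a time and found it too slow, `concurrent.futures` is worth using to speed it up or simplify it. Reach for a thread pool when the loop mostly waits on IO, and a process pool when it is CPU-bound.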
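A rough way to see the speed-up for yourself, assuming the slow part of the loop is waiting on IO. Here `io_task` is a hypothetical stand-in that sleeps for 0.2 s instead of making a real request; exact timings will vary by machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_task(i):
    # Hypothetical IO-bound task: sleeping stands in for waiting on the network
    time.sleep(0.2)
    return i


start = time.perf_counter()
sequential = [io_task(i) for i in range(10)]  # one after another: about 10 x 0.2 s
seq_time = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=10) as pool:
    threaded = list(pool.map(io_task, range(10)))  # all 10 waits overlap
pool_time = time.perf_counter() - start

print(f"sequential: {seq_time:.2f}s, thread pool: {pool_time:.2f}s")
```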
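II. Which APIs must you memorize?
Below are the 7 core points you actually use, that interviews ask about most, and that every developer needs, each with a minimal runnable example.
Copy them and run them directly (example 1 needs the third-party `requests` package and network access); bookmark them and memorize them.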
1. Create a thread pool: `ThreadPoolExecutor`
```python
from concurrent.futures import ThreadPoolExecutor

import requests

URLS = [
    "https://www.baidu.com",
    "https://www.zhihu.com",
    "https://www.bilibili.com",
]


def fetch(url):
    # Return the HTTP status code; timeout=5 keeps a dead site from hanging the worker
    return requests.get(url, timeout=5).status_code


# ===== Memorize: the 3 most common lines =====
with ThreadPoolExecutor(max_workers=3) as pool:
    for status in pool.map(fetch, URLS):
        print(status)
```
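A network-free sketch of what `max_workers` means in practice: nine URLs are handled by at most three reused worker threads. `fake_fetch` and the `example.com` URLs are illustrative assumptions; `thread_name_prefix` is a real `ThreadPoolExecutor` parameter that makes the pool's threads easy to spot in logs.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def fake_fetch(url):
    # Hypothetical stand-in for requests.get: sleep instead of real network IO
    time.sleep(0.1)
    return url, threading.current_thread().name


urls = [f"https://example.com/{i}" for i in range(9)]

# At most 3 threads exist; each one is reused for several URLs
with ThreadPoolExecutor(max_workers=3, thread_name_prefix="fetcher") as pool:
    results = list(pool.map(fake_fetch, urls))

for url, thread_name in results:
    print(url, "handled by", thread_name)
```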
2. Submit a single task: `submit()` + `Future`
```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor()
future = pool.submit(pow, 2, 10)  # 2**10
print(future.result())  # 1024
pool.shutdown()  # don't forget this, or use `with`
```
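The timeout scenario from the table in Part I (give up after N seconds) maps onto `future.result(timeout=...)`, which raises `TimeoutError` if the result is not ready in time. A minimal sketch with `time.sleep(1)` standing in for a slow download. Note that the timeout only stops the caller from waiting: the task itself keeps running, and leaving the `with` block still waits for it, because a running thread cannot be killed.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout  # same as built-in TimeoutError on 3.11+

with ThreadPoolExecutor() as pool:
    future = pool.submit(time.sleep, 1)  # stand-in for a slow download
    try:
        future.result(timeout=0.2)  # wait at most 0.2 s
        timed_out = False
    except FutureTimeout:
        timed_out = True
        print("gave up waiting; still running:", not future.done())
# Leaving the with-block waited for the sleep to finish anyway
print("done after shutdown:", future.done())
```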
3. Handle whichever finishes first: `as_completed()`
```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(x):
    time.sleep(random.random())  # random 0-1 s delay, so the finishing order varies
    return x * x


with ThreadPoolExecutor() as pool:
    futures = [pool.submit(work, i) for i in range(5)]
    for f in as_completed(futures):  # handle whichever finishes first
        print(f"done: {f.result()}")
```
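With `as_completed()` you usually also need to know which input a finished `Future` belongs to, and you don't want one failure to abort the whole loop. A common pattern (the official docs' examples use it too) keeps a `{future: input}` dict and wraps each `result()` call in `try`/`except`. The `work` below is a hypothetical variant of the one above that fails for `x == 3`.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(x):
    time.sleep(random.random() / 10)
    if x == 3:
        raise ValueError(f"bad input: {x}")  # simulated failure for one input
    return x * x


results, errors = {}, {}
with ThreadPoolExecutor() as pool:
    # Map each Future back to the input that produced it
    future_to_x = {pool.submit(work, x): x for x in range(5)}
    for fut in as_completed(future_to_x):
        x = future_to_x[fut]
        try:
            results[x] = fut.result()
        except ValueError as exc:  # one failure doesn't stop the loop
            errors[x] = exc

print(results, errors)
```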
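4. Collect all results in a fixed order: `executor.map()`
```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as pool:
    squares = list(pool.map(lambda x: x**2, range(5)))
print(squares)  # [0, 1, 4, 9, 16], same order as the input
```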
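`executor.map()` yields results in input order even when later inputs finish first, and, like built-in `map()`, it accepts several iterables. In this sketch `slow_pow` is a hypothetical helper that sleeps longest for the first input, yet the output still follows the input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_pow(base, exp):
    # Earlier inputs sleep longer, so they finish last
    time.sleep(0.05 * (3 - base))
    return base ** exp


with ThreadPoolExecutor() as pool:
    # Two iterables -> slow_pow(1, 2), slow_pow(2, 3), slow_pow(3, 2)
    results = list(pool.map(slow_pow, [1, 2, 3], [2, 3, 2]))

print(results)  # [1, 8, 9]
```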
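5. Run a callback automatically when a task finishes: `add_done_callback()`
```python
from concurrent.futures import ThreadPoolExecutor


def callback(fut):
    # Receives the finished Future as its only argument
    print("Task finished, result =", fut.result())


with ThreadPoolExecutor() as pool:
    f = pool.submit(lambda: 42)
    f.add_done_callback(callback)
```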
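A callback usually runs in the worker thread that finished the task (or immediately in the calling thread if the future is already done), so any shared state it touches needs a lock. Per the docs, an exception raised inside a callback is logged and ignored rather than propagated. A minimal sketch that gathers results through a callback; `collected` and `on_done` are illustrative names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

collected = []
lock = threading.Lock()


def on_done(fut):
    # May run in a worker thread, so guard the shared list with a lock
    with lock:
        collected.append(fut.result())


with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(8):
        pool.submit(pow, i, 2).add_done_callback(on_done)

# Completion order varies, so sort before printing
print(sorted(collected))  # [0, 1, 4, 9, 16, 25, 36, 49]
```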
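6. Cancel a task: `cancel()`
```python
import time
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as pool:
    pool.submit(time.sleep, 2)            # occupies the only worker
    future = pool.submit(time.sleep, 10)  # still queued, not started
    print("cancel success?", future.cancel())  # True: only not-yet-started tasks can be cancelled
```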
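To cancel everything that is still queued in one go, Python 3.9 added `shutdown(cancel_futures=True)`. A sketch assuming Python 3.9+: with a single worker, the first task is typically already running and finishes normally, while the queued ones are cancelled before they start.

```python
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)
futures = [pool.submit(time.sleep, 0.5) for _ in range(5)]

# Python 3.9+: cancel every task still in the queue, then wait for the running one
pool.shutdown(wait=True, cancel_futures=True)

cancelled = sum(f.cancelled() for f in futures)
print(f"{cancelled} of {len(futures)} tasks cancelled")
```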
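7. Switch to a process pool for CPU-bound work: `ProcessPoolExecutor`
```python
import math
from concurrent.futures import ProcessPoolExecutor


def is_prime(n):
    # Trial division up to sqrt(n): deliberately CPU-bound
    return n > 1 and all(n % i for i in range(2, int(math.sqrt(n)) + 1))


# The main-module guard is required where workers are spawned (Windows, macOS)
if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        primes = list(pool.map(is_prime, range(100)))
    print(sum(primes))  # 25
```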
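On a `ProcessPoolExecutor`, `map()` also accepts `chunksize`, which sends inputs to the workers in batches instead of one pickled message per item; with many tiny tasks like `is_prime`, that per-item overhead can dominate. A sketch under that assumption: `count_primes` is a hypothetical helper, and 1000 is an arbitrary starting value to tune, not a recommended setting. `chunksize` has no effect on `ThreadPoolExecutor`.

```python
import math
from concurrent.futures import ProcessPoolExecutor


def is_prime(n):
    # math.isqrt avoids float rounding issues for large n
    return n > 1 and all(n % i for i in range(2, math.isqrt(n) + 1))


def count_primes(limit, chunksize):
    # Hypothetical helper: each worker receives `chunksize` inputs per message
    with ProcessPoolExecutor() as pool:
        return sum(pool.map(is_prime, range(limit), chunksize=chunksize))


if __name__ == "__main__":
    print(count_primes(100_000, chunksize=1000))  # 9592
```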
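III. Common pitfalls (memorize them and skip the overtime)
Pitfall | Fix |
---|---|
Global variables are not shared between processes in a process pool | Pass data as arguments, or use a queue or `multiprocessing.Manager` |
A worker process dies abruptly (killed, crashed) and the whole pool becomes broken | Catch `BrokenProcessPool` (from `concurrent.futures.process`) and create a new pool; an ordinary exception in a task is simply re-raised by `future.result()` |
Calling `submit()` but forgetting `shutdown()` | Use `with ThreadPoolExecutor() as ex:` |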
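Ordinary exceptions raised inside a task do not break the pool: the exception is stored on the `Future`, returned by `future.exception()`, and re-raised by `future.result()`. `BrokenProcessPool` is reserved for a worker process that terminates abruptly. The sketch below uses a thread pool to stay short (the docs describe the same `result()`/`exception()` behavior for process pools); `risky` is a hypothetical function.

```python
from concurrent.futures import ThreadPoolExecutor


def risky(x):
    # Hypothetical task that fails on negative input
    if x < 0:
        raise ValueError("negative input")
    return x * 2


with ThreadPoolExecutor() as pool:
    bad = pool.submit(risky, -1)
    print("stored:", repr(bad.exception()))  # waits, then returns the exception without raising
    try:
        bad.result()  # re-raises the stored ValueError in the caller
    except ValueError as exc:
        print("caught:", exc)
    good = pool.submit(risky, 21)  # the same pool keeps working
    print(good.result())  # 42
```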
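IV. Summary: the memorization checklist
- Thread pool: `ThreadPoolExecutor(max_workers=n)`
- Process pool: `ProcessPoolExecutor(max_workers=n)`
- Batch map: `executor.map(func, iterable)` → results come back in input order
- Single submit: `future = executor.submit(func, *args)`
- Whichever finishes first: `for f in as_completed(futures): ...`
- Callback: `future.add_done_callback(fn)`
- Cancel: `future.cancel()` (only works for tasks that haven't started)
- Exceptions: `future.exception()` / `future.result()` re-raises; `try...except BrokenProcessPool` when a worker process dies
- Context manager: `with executor:` calls `shutdown()` automatically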