当前位置: 首页 > news >正文

《Effective Python》第九章 并发与并行——使用 concurrent.futures 实现真正的并行化

引言

本文基于 **《Effective Python: 125 Specific Ways to Write Better Python, 3rd Edition》**的 第9章 并发与并行 中的 **Item 79: Consider concurrent.futures for True Parallelism **,旨在总结书中关于利用 Python 的 concurrent.futures 模块实现并行计算的核心要点,结合个人实际开发中的经验与理解,深入探讨如何通过多进程、线程优化程序性能,并延伸讨论一些常见误区和进阶思考。

在现代计算机硬件日益强大的背景下,单核 CPU 已无法满足高性能计算的需求。Python 由于 GIL(全局解释器锁)的存在,使得多线程并不能真正实现 CPU 密集型任务的并行。因此,掌握如何有效利用多核 CPU 成为了提升 Python 程序性能的关键。concurrent.futures 提供了一个简洁而高效的接口,尤其适合处理可拆分、无状态的任务,是迈向高效并发编程的重要一步。


一、如何突破 GIL 的限制,实现真正的并行?

多线程受 GIL 限制,无法实现 CPU 并行;而 ProcessPoolExecutor 可以绕过 GIL,在多个 CPU 核心上真正并行执行任务。

背景与问题

在 Python 中,由于 GIL(Global Interpreter Lock) 的存在,即使我们启动多个线程,它们也只能在一个 CPU 核心上轮流执行。这意味着对于 CPU 密集型任务(如图像处理、数值计算等),使用 ThreadPoolExecutor 并不会带来速度上的提升,反而可能因为线程切换带来额外开销。

例如,以下是一个串行计算最大公约数(GCD)的例子:

def gcd(pair):a, b = pairlow = min(a, b)for i in range(low, 0, -1):if a % i == 0 and b % i == 0:return iraise RuntimeError("Not reachable")numbers = [(19633090, 22659730),(20306770, 38141720),# ...更多元组...
]results = list(map(gcd, numbers))

如果我们尝试用线程池来加速这段代码:

from concurrent.futures import ThreadPoolExecutorwith ThreadPoolExecutor(max_workers=8) as executor:results = list(executor.map(gcd, numbers))

结果并不会比串行快,甚至更慢,因为 GIL 会阻止多个线程同时运行 Python 字节码。

解决方案:使用 ProcessPoolExecutor

要真正利用多核 CPU,我们需要使用 多进程模型。Python 的 multiprocessing 模块允许我们创建子进程,每个进程都有独立的内存空间和 GIL,从而实现真正的并行。

concurrent.futures.ProcessPoolExecutor 是一个封装良好的高层接口,简化了多进程编程。我们可以简单地将上面的 ThreadPoolExecutor 替换为 ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutorwith ProcessPoolExecutor(max_workers=8) as executor:results = list(executor.map(gcd, numbers))

这样就能充分利用多核 CPU,显著提高计算效率。

原理简析

ProcessPoolExecutor 的底层依赖于 multiprocessing 模块,其工作流程大致如下:

  1. 主进程将数据序列化(通过 pickle)。
  2. 数据通过本地 socket 发送到子进程。
  3. 子进程反序列化数据后执行目标函数。
  4. 子进程将结果再次序列化返回主进程。
  5. 主进程合并所有结果。

虽然这个过程涉及多次序列化/反序列化操作,但对 CPU 密集型任务而言,这种开销是值得的,因为它能真正实现并行计算。

如果把每个 CPU 核心比作一个工人,那么 ThreadPoolExecutor 就像让一个工人反复切换任务,效率低;而 ProcessPoolExecutor 则是让多个工人各自负责一个任务,互不干扰,效率高得多。


二、什么样的任务适合用 ProcessPoolExecutor 加速?

最适合并行化的任务是那些——孤立、高杠杆的任务

  • 孤立性:任务之间无需共享状态或相互依赖;
  • 高杠杆性:输入输出数据小,计算量大。

典型适用场景

✅ 数值计算与算法模拟

比如文章中提到的最大公约数计算,就是一个典型的 CPU 密集型任务,且每个任务完全独立,非常适合并行化。

其他类似任务包括:

  • 矩阵运算
  • 图像滤波处理
  • 遗传算法模拟
  • 蒙特卡洛模拟
✅ 批量文件处理

如果你需要批量处理大量图片、日志文件、JSON 文件等,也可以使用 ProcessPoolExecutor 来并行处理这些文件。

例如:

def process_file(filename):with open(filename, 'r') as f:data = f.read()# 进行复杂解析或转换逻辑return processed_datafilenames = ['file1.txt', 'file2.txt', ...]
with ProcessPoolExecutor() as executor:results = list(executor.map(process_file, filenames))
❌ 不适合的情况

当然,并不是所有任务都适合用 ProcessPoolExecutor 来加速:

  • I/O 密集型任务:如网络请求、数据库查询等更适合使用 ThreadPoolExecutor,因为它们大部分时间在等待外部响应。
  • 频繁通信的任务:如果任务之间需要频繁交换状态或共享资源,那么进程间通信的开销会抵消并行带来的收益。
  • 小规模任务:如果每个任务的计算量很小,那么进程启动和数据传输的开销反而会导致整体性能下降。

⚠️ 常见误区提醒

有人认为“只要是多核 CPU 就应该用多进程”,这是错误的。只有当任务本身具有足够的计算量和独立性时,才适合使用 ProcessPoolExecutor


三、如何避免在多进程中误用共享状态?

多进程之间的内存是隔离的,不能直接共享变量。试图在多个进程中修改同一个全局变量会导致行为不可预测。

错误示例

下面是一个常见的错误写法:

shared_counter = 0def bad_task(x):global shared_countershared_counter += xreturn shared_counterwith ProcessPoolExecutor() as executor:results = list(executor.map(bad_task, [1, 2, 3, 4]))

在这个例子中,我们试图在多个进程中修改一个全局变量 shared_counter,但实际上每个进程都有自己的一份副本,修改的是局部变量,最终的结果也无法预期。

正确做法

如果你确实需要在多个进程之间共享状态,可以使用 multiprocessing 提供的一些高级机制:

1. 使用 ValueArray 实现共享内存
from multiprocessing import Value, Arraycounter = Value('i', 0)  # 整数类型共享变量
arr = Array('d', [0.0] * 10)  # 浮点数组共享变量def safe_task(x):with counter.get_lock():  # 获取锁counter.value += xreturn counter.value

这种方式需要显式加锁,防止数据竞争。

2. 使用 Manager 创建跨进程对象
from multiprocessing import Managerdef manager_task(d, key, value):d[key] = valuewith Manager() as manager:shared_dict = manager.dict()with ProcessPoolExecutor() as executor:futures = [executor.submit(manager_task, shared_dict, str(i), i*2) for i in range(5)]for future in futures:future.result()print(shared_dict)  # 输出:{'0': 0, '1': 2, '2': 4, '3': 6, '4': 8}

Manager 提供了一个服务器进程,用于管理共享对象,支持字典、列表等多种结构。

建议

  • 尽量避免共享状态:设计任务时应尽量做到无状态,减少跨进程通信需求。
  • 必须共享时谨慎处理:使用 Value, Array, Manager 等工具时,务必注意加锁和同步,否则容易引发数据竞争。

四、如何选择线程还是进程?从实践出发谈选型策略

CPU 密集用进程,I/O 密集用线程,混合任务灵活组合。

三种并发方式对比

类型模块特点适用场景
单线程简单易懂,但性能有限简单脚本、原型验证
多线程threading, ThreadPoolExecutor支持 I/O 并发,受限于 GIL网络请求、文件读写
多进程multiprocessing, ProcessPoolExecutor绕过 GIL,真正并行数值计算、图像处理

实际开发中的选型策略

✅ 场景一:纯 CPU 计算 → 优先使用 ProcessPoolExecutor

如前所述,最大公约数、矩阵乘法等任务适合用多进程加速。

✅ 场景二:大量 I/O 请求 → 优先使用 ThreadPoolExecutor

例如爬虫、API 接口调用、文件读写等:

import requests
from concurrent.futures import ThreadPoolExecutordef fetch_url(url):return requests.get(url).status_codeurls = ['https://example.com'] * 10
with ThreadPoolExecutor() as executor:results = list(executor.map(fetch_url, urls))
✅ 场景三:混合型任务 → 分阶段使用不同并发方式

比如先用 ThreadPoolExecutor 抓取一批数据,再用 ProcessPoolExecutor 对数据进行处理:

# 第一阶段:网络请求
def fetch_data(url):return requests.get(url).json()# 第二阶段:数据分析
def analyze(data):return sum(data.values())with ThreadPoolExecutor() as pool1:raw_data = list(pool1.map(fetch_data, urls))with ProcessPoolExecutor() as pool2:results = list(pool2.map(analyze, raw_data))
❌ 场景四:低效组合 → 避免滥用并发

不要盲目地给所有任务都加上并发。比如:

# 错误示例:对非常简单的任务强行并发
with ProcessPoolExecutor() as executor:result = list(executor.map(lambda x: x + 1, [1, 2, 3]))

这样的任务计算量极小,反而因进程创建和通信带来额外开销,得不偿失。


总结

本文围绕《Effective Python》第9章第79条内容,系统讲解了如何使用 concurrent.futures 模块实现真正的并行化,重点包括:

  • GIL 的限制导致多线程无法实现 CPU 并行;
  • ProcessPoolExecutor 是实现并行计算的有效手段;
  • 并行化最适合孤立、高杠杆任务;
  • 多进程中不能直接共享状态,需借助 multiprocessing 提供的工具;
  • 应根据任务类型选择线程或进程,混合任务可分阶段使用。

这些知识不仅适用于学术研究或理论学习,更是我们在日常开发中提升程序性能、应对大规模计算挑战的实用技巧。


结语

通过阅读本书这一章节并结合自己的实践经验,我深刻体会到并发编程在现代软件开发中的重要性。Python 虽然有 GIL 的限制,但只要合理使用 concurrent.futuresmultiprocessing,我们依然可以写出高效、稳定的并行程序。

如果你觉得这篇文章对你有所帮助,欢迎点赞、收藏、分享给你的朋友!后续我会继续分享更多关于《Effective Python》精读笔记系列,参考我的代码库 effective_python_3rd,一起交流成长!

相关文章:

  • HarmonyOS 5的分布式通信矩阵是如何工作的?
  • Docker 高级管理笔记
  • HTML5简介
  • 高云GW5AT-LV60 FPGA图像处理板
  • 升级到 .NET 9 分步指南
  • redis分布式锁 Redisson在电商平台开发中的实际应用
  • sqlsuger 获取表行数
  • 基于物联网的智能饮水机系统设计
  • 1.23Node.js 中操作 mongodb
  • Qt Library库系列----Serial串口
  • 智慧医院核心引擎:IBMS 系统守护医疗环境高效与安全​
  • clickhouse-server连不上clickhouse-keeper的问题记录
  • SQL Server 分区方案 VS 分表方案——区别与选型分析
  • 【unity】批量剔除图片四周空白像素的工具
  • UE5 游戏模板 —— FirstShootGame
  • 【论文笔记】【强化微调】T-GRPO:对视频数据进行强化微调
  • WPF调试三种工具介绍:Live Visual Tree、Live Property Explorer与Snoop
  • TMultiplexedProtocol 和 TMultiplexedProcessor
  • java 找出两个json文件的不同之处
  • Python Day55
  • 有哪些网站教做吃的/电商关键词查询工具
  • 深圳金融投资网站建设/seo建站公司
  • 邳州哪家做百度推广网站/软文写作范例大全
  • 网上购物商城网站建设/成都seo培
  • 用php做的网站模版/西安企业seo外包服务公司
  • 网站建设学的课程/含有友情链接的网页