Python中的异步与并行
文章目录
- 核心概念辨析
- 1. 异步编程 (Asynchronous Programming)
- 核心原理:事件循环 + 协程
- 解决什么问题?
- Python 实现
- 2. 并行编程 (Parallel Programming)
- 核心原理:多进程 + 操作系统调度
- 解决什么问题?
- Python 实现
- 3. 总结对比与如何选择
- 如何选择?
Python中的异步与并行,这二者是提升程序性能的利器,但原理和适用场景完全不同。
我会分三部分来介绍:
- 异步编程 (Asynchronous):以
asyncio为核心。 - 并行编程 (Parallel):以
multiprocessing为核心。 - 总结对比与选择:告诉你何时该用哪个。
核心概念辨析
在开始之前,我们先用一个生活中的例子来理解四个关键概念:同步、异步、并发、并行。
假设你在厨房做饭,任务有:① 烧水(10分钟)、② 切菜(5分钟)。
- 同步 (Synchronous):你先站在炉子前,盯着水壶直到水烧开(10分钟),然后再去切菜(5分钟)。总耗时:15分钟。 特点:一件一件事做,死等。
- 异步 (Asynchronous):你把水壶放上炉子,然后立刻去切菜(5分钟)。切完菜,你再等水烧开剩下的5分钟。总耗时:10分钟。 特点:不傻等,利用等待时间做别的事。这是“单人”完成的。
- 并发 (Concurrent):听起来和异步很像。核心在于 任务切换。你烧上水,切一会菜,看看水,再切一会菜… 你在多个任务间来回切换。从宏观上看,任务是“同时”推进的。异步是实现并发的一种方式。
- 并行 (Parallel):你叫来一个朋友。你烧水,同时 他在切菜。总耗时:10分钟(取决于最长的任务)。 特点:多个任务在同一时刻真正地同时执行。这需要“多人”(多核CPU)才能完成。
1. 异步编程 (Asynchronous Programming)
异步在Python中主要通过 asyncio 库实现,它是一种基于 事件循环(Event Loop) 的 协程(Coroutine) 并发模型。
核心原理:事件循环 + 协程
- 单线程模型:
asyncio本质上是在 一个线程 内工作的。 - 协程 (Coroutine):可以被认为是“可暂停的函数”。使用
async def定义的函数就是一个协程。当它执行到一个耗时操作(如网络请求、文件读写)时,它不会傻等。 await关键字:协程通过await关键字,告诉事件循环:“我要在这里等待一个结果(比如等待网站响应),但我现在没事干了,你可以先去执行别的任务。”- 事件循环 (Event Loop):这是
asyncio的心脏。它是一个任务调度器,维护着一个“待执行”和“已暂停”的任务列表。当一个任务通过await暂停自己时,事件循环就会从“待执行”列表中拿出另一个任务来执行。当之前暂停的任务所等待的操作完成时(比如网站返回了数据),事件循环会再次唤醒它,从它上次暂停的地方继续执行。
一句话总结原理:通过在单个线程内高效地切换任务,将等待I/O(输入/输出)的时间利用起来,从而实现并发,提升程序对I/O密集型任务的处理能力。
解决什么问题?
I/O密集型 (I/O-Bound) 任务。
这类任务的瓶颈在于等待外部资源,而不是CPU的计算能力。例如:
- 网络爬虫(等待网站服务器响应)
- Web服务器(等待客户端请求)
- 数据库查询(等待数据库返回结果)
- 文件读写(等待硬盘)
在这些场景下,CPU大部分时间是空闲的,用异步可以极大地提高效率。
Python 实现
你需要掌握以下几个关键点:
asyncio:Python的异步标准库。async def:用于定义一个协程函数。await:用于暂停协程,等待一个异步操作完成。asyncio.run(coro):运行顶层协程的入口。asyncio.gather(*aws):并发地运行多个异步任务。
示例1:基础异步
import asyncio
import timeasync def say_after(delay, what):"""一个简单的协程,等待指定秒数后打印信息"""await asyncio.sleep(delay) # asyncio.sleep是异步的sleep,会交出控制权print(what)async def main():start_time = time.time()print(f"started at {time.strftime('%X')}")# 使用asyncio.gather来并发执行多个任务await asyncio.gather(say_after(1, 'hello'),say_after(2, 'world'))print(f"finished at {time.strftime('%X')}")end_time = time.time()print(f"总耗时: {end_time - start_time:.2f} 秒")# 运行主协程
# 在Python 3.7+中,这是最简单的启动方式
asyncio.run(main())
输出分析:
程序总耗时约2秒,而不是3秒(1+2)。因为 say_after(1, ...) 启动后,在 await asyncio.sleep(1) 时,事件循环会切换去执行 say_after(2, ...)。这完美体现了异步的优势。
示例2:异步网络请求 (需要安装 aiohttp 库: pip install aiohttp)
import asyncio
import aiohttpasync def fetch(session, url):"""异步获取URL内容"""async with session.get(url) as response:# response.text() 也是一个异步操作return await response.text()async def main():urls = ['http://httpbin.org/delay/1','http://httpbin.org/delay/2','http://httpbin.org/delay/1']async with aiohttp.ClientSession() as session:# 创建所有任务tasks = [fetch(session, url) for url in urls]# 并发执行所有任务htmls = await asyncio.gather(*tasks)for i, html in enumerate(htmls):print(f"URL {i+1} 内容长度: {len(html)}")asyncio.run(main())
输出分析:
总耗时约2秒(取决于最长的那个请求),而不是4秒(1+2+1)。所有请求几乎是“同时”发出的。
2. 并行编程 (Parallel Programming)
并行在Python中主要通过 multiprocessing 库实现,它利用 多进程 来规避Python的 全局解释器锁 (GIL)。
核心原理:多进程 + 操作系统调度
- 全局解释器锁 (GIL):这是CPython解释器的一个历史遗留问题。它保证在任何时刻,一个Python进程中只有一个线程在执行Python字节码。这意味着即使在多核CPU上,Python的多线程也无法实现真正的并行计算,只能做并发。
- 多进程 (
multiprocessing):为了打破GIL的限制,multiprocessing模块通过创建全新的子进程来执行任务。每个子进程都有自己独立的Python解释器和内存空间,因此它们各自拥有自己的GIL,互不影响。 - 操作系统调度:这些独立的进程由操作系统(OS)负责调度,可以被分配到不同的CPU核心上,从而实现真正的并行执行。
一句话总结原理:通过创建多个拥有独立GIL的进程,并由操作系统将它们调度到不同CPU核心上,实现真正的并行计算,压榨CPU性能。
解决什么问题?
CPU密集型 (CPU-Bound) 任务。
这类任务的瓶颈在于CPU的计算能力,需要大量的计算。例如:
- 大规模数学运算、科学计算
- 视频编码、图像处理
- 数据分析和机器学习模型训练
- 文件压缩
在这些场景下,asyncio 毫无用处,因为它只有一个线程,一个高计算量的任务会“霸占”CPU,导致整个事件循环被阻塞。
Python 实现
multiprocessing 提供了与 threading 模块相似的API,但它使用进程而不是线程。
multiprocessing.Process:创建一个子进程来执行一个函数。multiprocessing.Pool:创建一个进程池,方便地将任务分发给多个进程并行处理。multiprocessing.Queue,Pipe:用于在进程间安全地通信。
示例:并行计算
import multiprocessing
import timedef cpu_bound_task(n):"""一个CPU密集型任务,计算从0到n的平方和"""total = 0for i in range(n):total += i * ireturn totaldef main():numbers = [10_000_000, 10_000_001, 10_000_002, 10_000_003]start_time = time.time()# 使用进程池# os.cpu_count() 获取CPU核心数,可以充分利用硬件with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:# map方法会将numbers列表中的每个元素作为参数传递给cpu_bound_task# 并且是并行执行的results = pool.map(cpu_bound_task, numbers)end_time = time.time()print(f"Results: {results}")print(f"总耗时: {end_time - start_time:.2f} 秒")if __name__ == '__main__':# 在Windows和macOS上,多进程代码必须放在 if __name__ == '__main__': 块中main()
输出分析:
在一台4核CPU的机器上,这4个任务会被分配到4个核心上几乎同时执行。总耗时约等于执行单个任务的时间,而不是4倍。如果你把 pool.map 改成普通的循环,你会发现耗时会增加接近4倍。
3. 总结对比与如何选择
| 特性 | 异步 (asyncio) | 并行 (multiprocessing) |
|---|---|---|
| 目标 | I/O密集型 (网络、磁盘读写) | CPU密集型 (大量计算) |
| CPU使用 | 单核(单个进程内) | 多核(多个进程) |
| 切换方式 | 协作式 (Cooperative) - 代码通过await主动让出 | 抢占式 (Preemptive) - 操作系统强制调度 |
| 内存空间 | 共享内存(单进程),数据交换成本低 | 独立内存,数据交换成本高(需IPC) |
| 开销 | 轻量级 (协程创建和切换快) | 重量级 (进程创建和销毁慢) |
| 核心障碍 | CPU密集型任务会阻塞整个事件循环 | GIL(已通过多进程规避) |
如何选择?
- 你的瓶颈是I/O吗?
- 是 (比如写爬虫、Web服务):首选
asyncio。它资源消耗小,并发能力强,非常适合处理成千上万的并发连接。
- 是 (比如写爬虫、Web服务):首选
- 你的瓶颈是CPU吗?
- 是 (比如做数据分析、科学计算):必须用
multiprocessing。这是在Python中利用多核CPU的唯一标准方法。
- 是 (比如做数据分析、科学计算):必须用
- 既有CPU密集型,又有I/O密集型?(混合型)
- 这是一个高级场景。最佳实践是
asyncio+multiprocessing结合。 - 策略:主程序使用
asyncio管理I/O密集型任务(如网络请求)。当遇到一个CPU密集型任务时,使用loop.run_in_executor()将这个耗时的计算任务扔到multiprocessing的进程池中去执行,从而避免阻塞事件循环。执行完毕后,再通过await拿回结果。
- 这是一个高级场景。最佳实践是
混合型示例:
import asyncio
from concurrent.futures import ProcessPoolExecutordef cpu_bound_task(n):# 模拟一个耗时的CPU计算total = 0for i in range(n):total += i * ireturn totalasync def main():loop = asyncio.get_running_loop()# 创建一个进程池执行器with ProcessPoolExecutor() as pool:print("开始一个耗时的CPU计算...")# run_in_executor将CPU任务提交到进程池,并返回一个future# await等待这个future完成,但事件循环不会被阻塞result = await loop.run_in_executor(pool, cpu_bound_task, 10_000_000)print(f"CPU计算完成,结果: {result}")print("CPU计算的同时,我们可以做点别的异步I/O操作...")await asyncio.sleep(2) # 模拟其他I/Oprint("其他I/O操作完成")asyncio.run(main())