当前位置: 首页 > news >正文

深入浅出 Python Asynchronous I/O:从 asyncio 入门到实战

在现代软件开发中,性能是一个永恒的话题。特别是在处理网络请求、文件读写等 I/O 密集型任务时,传统的同步编程模型可能会因为等待而浪费大量时间。为了解决这个问题,异步编程应运而生。Python 通过内置的 asyncio 库,为开发者提供了强大而优雅的异步编程能力。 [1][2]

本文将带你从零开始,逐步深入 asyncio 的世界,理解其核心概念,并最终通过实战案例掌握其用法。

1. 什么是异步编程?为什么要用它?

想象一下你在厨房做饭,需要同时烧水、切菜和炒菜。

  • 同步 (Synchronous):你先把水壶放到灶上,然后就一直盯着它,直到水烧开。之后,你再去切菜,切完所有菜后,最后才开始炒菜。在这个过程中,当你在等待水烧开时,你什么也做不了,时间被白白浪费。
  • 异步 (Asynchronous):你把水壶放到灶上后,就不管它了,直接去切菜。切菜的间隙,你抽空看一眼水开了没。水一开,你就去处理。这样,等待水烧开的时间被你用来切菜,整个做饭的效率大大提高。 [3]

代码的世界也是如此。同步编程就是一次只做一件事,必须等前一件事(比如一次网络请求)完成后才能做下一件。 [1] 而异步编程允许程序在等待一个耗时操作(通常是 I/O 操作)时,切换去执行其他任务,从而提高整体效率。 [1][3]

asyncio 正是 Python 用于实现这种高效工作模式的标准库,它特别适合 I/O 密集型和高层级的网络代码。 [3]

2. asyncio 的核心基石:async/await 与协程

要使用 asyncio,首先需要理解几个关键概念。

协程 (Coroutine)

在 Python 中,使用 async def 关键字定义的函数,我们称之为协程函数。调用它并不会立即执行函数体,而是会返回一个协程对象。 [4] 协程可以被看作是一种可以暂停和恢复执行的特殊函数。 [1]

await

这个关键字只能在 async def 函数内部使用。它的作用是“等待”一个可等待对象 (Awaitable) 执行完成。 [5] 可等待对象包括协程、任务 (Task) 和 Future 对象。 [5] 当程序执行到 await 时,它会告诉事件循环:“这个操作有点耗时,我先在这里暂停,你可以去忙别的,等我好了再回来继续。” [6]

事件循环 (Event Loop)

事件循环是 asyncio 的心脏。 [2][6] 你可以把它想象成一个大管家,负责调度和执行所有的异步任务。它会不断检查是否有任务已经准备好可以继续运行,或者是否有新的任务需要开始。 [2]

asyncio.run()

这是启动异步程序的入口。它会创建一个新的事件循环,运行你传入的顶级协程(通常是 main 函数),并在协程执行完毕后关闭事件循环。 [7][8]

3. 牛刀小试:你的第一个 asyncio 程序

让我们来看一个最简单的例子。

import asyncio
import time# 使用 async def 定义一个协程函数
async def say_hello(delay, message):"""一个简单的协程,会延迟指定秒数后打印消息。"""print(f"[{time.strftime('%X')}] 开始任务: {message}")# asyncio.sleep 是一个异步的 time.sleep()# 当遇到 await asyncio.sleep() 时,事件循环会切换到其他任务await asyncio.sleep(delay)print(f"[{time.strftime('%X')}] 完成任务: {message}")# 定义主入口协程
async def main():print(f"[{time.strftime('%X')}] 程序开始")# 直接 await 调用协程await say_hello(2, "你好")await say_hello(1, "世界")print(f"[{time.strftime('%X')}] 程序结束")# 使用 asyncio.run() 启动程序
if __name__ == "__main__":asyncio.run(main())

运行结果分析:

[13:30:00] 程序开始
[13:30:00] 开始任务: 你好
[13:30:02] 完成任务: 你好
[13:30:02] 开始任务: 世界
[13:30:03] 完成任务: 世界
[13:30:03] 程序结束

你会发现,这段代码虽然是异步的,但执行顺序和同步代码一样,总共耗时 3 秒。这是因为我们依次 await 了两个协程,必须等第一个完成后,第二个才会开始。

那么,如何让它们“同时”运行呢?

4. 并发执行:asyncio.gatherasyncio.create_task

为了真正实现并发,我们需要让多个任务在事件循环中同时被调度。 [3]

asyncio.gather

asyncio.gather() 可以接收一个或多个可等待对象,将它们并发执行,并按输入顺序返回所有结果。 [9]

修改上面的 main 函数:

async def main():print(f"[{time.strftime('%X')}] 程序开始")# 使用 asyncio.gather 并发运行两个协程await asyncio.gather(say_hello(2, "你好"),say_hello(1, "世界"))print(f"[{time.strftime('%X')}] 程序结束")# ... 其他代码不变 ...

新的运行结果:

[13:32:10] 程序开始
[13:32:10] 开始任务: 你好
[13:32:10] 开始任务: 世界
[13:32:11] 完成任务: 世界
[13:32:12] 完成任务: 你好
[13:32:12] 程序结束

观察时间戳,两个任务几乎是同时开始的。耗时1秒的任务先结束,耗时2秒的后结束。整个程序的总耗时取决于最长的那个任务,也就是 2 秒,而不是之前的 3 秒。这就是并发带来的效率提升! [3]

asyncio.create_task

asyncio.create_task() 用于将一个协程包装成一个任务 (Task),并提交给事件循环立即开始执行,而不需要马上 await 它。 [6][7] TaskFuture 的一个子类,专门用于管理协程。 [4]

这就像是“发射后不管”(fire-and-forget),你创建了一个任务让它在后台运行,然后可以继续做其他事情。 [6]

async def main():print(f"[{time.strftime('%X')}] 程序开始")# 创建任务,任务会立即开始在事件循环中被调度task1 = asyncio.create_task(say_hello(2, "你好"))task2 = asyncio.create_task(say_hello(1, "世界"))print(f"[{time.strftime('%X')}] 任务已创建")# 在这里可以做其他事情await asyncio.sleep(0.5)print(f"[{time.strftime('%X')}] 主程序做了一些其他工作")# 等待任务完成await task1await task2print(f"[{time.strftime('%X')}] 程序结束")

运行结果:

[13:35:20] 程序开始
[13:35:20] 任务已创建
[13:35:20] 开始任务: 你好
[13:35:20] 开始任务: 世界
[13:35:20] 主程序做了一些其他工作
[13:35:21] 完成任务: 世界
[13:35:22] 完成任务: 你好
[13:35:22] 程序结束

create_taskgather 的区别在于控制的粒度。gather 是一种更高级的抽象,适合一次性并发运行多个任务并收集结果的场景。 [9] create_task 则提供了更灵活的控制,允许你在任务运行期间执行其他逻辑。 [6][9]

5. 实战演练:使用 aiohttp 并发下载网页

理论讲了这么多,让我们来看一个最能体现 asyncio 价值的场景:并发网络请求。我们将使用流行的异步 HTTP 客户端库 aiohttp。 [10][11]

首先,你需要安装 aiohttp
pip install aiohttp

下面的例子将对比同步和异步方式获取多个网页标题所花费的时间。

import asyncio
import time
import aiohttp
import requests  # 用于同步对比urls = ['https://www.python.org','https://github.com','https://www.wikipedia.org','https://www.youtube.com','https://www.amazon.com',
]def get_title_sync(url):"""同步获取网页标题"""try:resp = requests.get(url, timeout=10)# 一个简单的解析,实际应用中建议使用 BeautifulSoupreturn resp.text.split('<title>')[1].split('</title>')[0].strip()except Exception as e:return f"Error: {e}"async def get_title_async(session, url):"""异步获取网页标题"""try:# aiohttp 使用 session.get() 发起请求async with session.get(url, timeout=10) as resp:# resp.text() 是一个协程,需要 awaithtml = await resp.text()return html.split('<title>')[1].split('</title>')[0].strip()except Exception as e:return f"Error: {e}"async def main_async():# aiohttp 建议使用一个 ClientSession 来执行所有请求async with aiohttp.ClientSession() as session:tasks = [get_title_async(session, url) for url in urls]# 使用 gather 并发执行所有任务titles = await asyncio.gather(*tasks)for url, title in zip(urls, titles):print(f"{url}: {title}")if __name__ == "__main__":# --- 同步版本 ---print("--- 开始同步请求 ---")start_time_sync = time.time()for url in urls:title = get_title_sync(url)print(f"{url}: {title}")end_time_sync = time.time()print(f"同步请求总耗时: {end_time_sync - start_time_sync:.2f} 秒\n")# --- 异步版本 ---print("--- 开始异步请求 ---")start_time_async = time.time()asyncio.run(main_async())end_time_async = time.time()print(f"异步请求总耗时: {end_time_async - start_time_async:.2f} 秒")

典型的运行结果:

--- 开始同步请求 ---
https://www.python.org: Welcome to Python.org
https://github.com: GitHub: Let’s build from here
https://www.wikipedia.org: Wikipedia
https://www.youtube.com: YouTube
https://www.amazon.com: Amazon.com. Spend less. Smile more.
同步请求总耗时: 4.58 秒--- 开始异步请求 ---
https://www.python.org: Welcome to Python.org
https://github.com: GitHub: Let’s build from here
https://www.wikipedia.org: Wikipedia
https://www.youtube.com: YouTube
https://www.amazon.com: Amazon.com. Spend less. Smile more.
异步请求总耗时: 0.95 秒

结果一目了然。异步版本的速度比同步版本快了数倍。 [5] 这是因为 asyncio 在等待一个网站响应时,没有闲着,而是立即去请求下一个网站,极大地利用了网络 I/O 的等待时间。 [11]

6. 进阶:协程间的同步与通信

当我们有多个协程并发运行时,有时它们需要访问同一个资源,或者需要相互传递工作任务。这时,为了避免数据混乱和协调工作流程,就需要用到同步和通信机制。asyncio 提供了与多线程编程中类似的工具,但它们是为协程专门设计的。

6.1 资源保护:asyncio.Lock

在并发环境中,如果多个任务同时尝试修改一个共享资源(例如一个变量或文件),就可能导致竞争条件 (Race Condition),使得最终结果不可预测。

虽然 asyncio 在单线程上运行,不会有真正的并行执行,但一个协程可以在 await 处被挂起,此时事件循环会运行另一个协程。如果这两个协程都在修改同一个数据,问题依然存在。

asyncio.Lock 就是用来解决这个问题的。它保证在任何时候,只有一个协程能够获得锁并执行“临界区”代码。

使用场景:保护对共享资源的访问,确保操作的原子性。

让我们看一个例子:多个协程同时增加一个共享计数器。

import asyncio# 一个共享的资源
shared_counter = 0async def unsafe_worker():"""一个没有锁保护的协程"""global shared_counter# 1. 读取当前值current_value = shared_counter# 在这里,协程可能会被挂起,切换到另一个 workerawait asyncio.sleep(0.01) # 2. 基于旧值计算新值new_value = current_value + 1# 3. 写入新值shared_counter = new_valueasync def safe_worker(lock):"""一个有锁保护的协程"""global shared_counter# 使用 async with lock 语法可以自动获取和释放锁async with lock:current_value = shared_counterawait asyncio.sleep(0.01)new_value = current_value + 1shared_counter = new_valueasync def main():global shared_counter# --- 演示不安全的情况 ---print("--- 演示不安全的情况 ---")shared_counter = 0tasks_unsafe = [unsafe_worker() for _ in range(100)]await asyncio.gather(*tasks_unsafe)print(f"没有锁保护,100个任务完成后的计数器值: {shared_counter}") # 结果通常远小于100# --- 演示安全的情况 ---print("\n--- 演示安全的情况 ---")shared_counter = 0lock = asyncio.Lock()tasks_safe = [safe_worker(lock) for _ in range(100)]await asyncio.gather(*tasks_safe)print(f"使用锁保护,100个任务完成后的计数器值: {shared_counter}") # 结果总是100if __name__ == "__main__":asyncio.run(main())

代码解读与结果分析:

  • unsafe_worker: 在读取 (current_value = ...) 和写入 (shared_counter = ...) 之间有一个 await。这给了事件循环切换到另一个 unsafe_worker 的机会。多个 worker 可能会基于同一个旧值进行计算,导致一些增加操作丢失。因此,最终结果会小于 100。
  • safe_worker: 使用了 async with lock:。当一个协程进入这个代码块时,它会获取锁。如果此时其他协程也想进入,它们必须 await,直到第一个协程执行完毕并自动释放锁。这确保了“读-改-写”这个操作的完整性,所以最终结果总是正确的 100。

6.2 任务分发:asyncio.Queue

asyncio.Queue 是一个为异步编程设计的队列,它非常适合经典的生产者-消费者 (Producer-Consumer) 模型。

  • 生产者 (Producer):创建任务或数据,并将其放入队列。
  • 消费者 (Consumer):从队列中取出任务或数据,并进行处理。

队列本身处理了所有的同步逻辑:

  • 如果消费者试图从空队列中获取 (get) 数据,它会自动 await,直到队列中有新数据。
  • 如果生产者试图向一个已满的队列(如果创建时指定了 maxsize)中放入 (put) 数据,它会自动 await,直到队列有空位。

使用场景:解耦任务的创建和执行,实现任务分发系统,控制并发处理任务的数量。

让我们构建一个简单的爬虫模型:一个生产者负责发现 URL 并放入队列,多个消费者负责从队列中取出 URL 并“下载”。

import asyncio
import randomasync def producer(queue, num_urls):"""生产者:生成一些模拟的URL并放入队列"""print("生产者启动...")for i in range(num_urls):url = f"https://example.com/page/{i}"# 模拟发现URL需要一些时间await asyncio.sleep(random.uniform(0.1, 0.5))# 将URL放入队列await queue.put(url)print(f"生产者放入: {url}")print("生产者完成任务。")async def consumer(name, queue):"""消费者:从队列中获取URL并处理"""print(f"消费者 {name} 启动...")# 持续从队列中获取任务while True:# 从队列中获取URL,如果队列为空,会在此处等待url = await queue.get()print(f"消费者 {name} 正在处理: {url}")# 模拟处理任务需要的时间await asyncio.sleep(random.uniform(0.5, 1.5))print(f"消费者 {name} 完成处理: {url}")# 必须调用 task_done() 来通知队列这个任务已经处理完毕queue.task_done()async def main():# 创建一个不限大小的队列task_queue = asyncio.Queue()num_urls_to_produce = 10num_consumers = 3# 启动生产者producer_task = asyncio.create_task(producer(task_queue, num_urls_to_produce))# 启动多个消费者consumer_tasks = []for i in range(num_consumers):task = asyncio.create_task(consumer(f"C{i+1}", task_queue))consumer_tasks.append(task)# 等待生产者完成所有URL的放入await producer_taskprint("所有URL已放入队列,等待消费者处理...")# 等待队列中的所有任务都被处理完毕# queue.join() 会阻塞,直到队列中每个项目的 task_done() 都被调用await task_queue.join()print("所有任务处理完毕!")# 所有任务都处理完了,消费者们还在 while True 循环里等待新任务# 为了让程序能正常退出,我们需要取消这些消费者任务for task in consumer_tasks:task.cancel()if __name__ == "__main__":asyncio.run(main())

代码解读与关键点:

  1. queue.put(item): 生产者使用它来异步地添加项目。
  2. queue.get(): 消费者使用它来异步地获取项目。这是主要的同步点。
  3. queue.task_done(): 这是至关重要的一步!消费者处理完一个项目后,必须调用此方法。它会减少队列的内部计数器。
  4. queue.join(): main 函数用它来等待所有项目都被处理。它会一直阻塞,直到队列的内部计数器归零。这确保了我们在程序结束前,所有工作都已完成。
  5. 任务取消: 因为消费者通常在一个无限循环中工作,当所有工作完成后,我们需要显式地取消它们,否则 asyncio.run(main()) 将永远不会退出。

7. 总结

asyncio 为 Python 带来了强大的并发能力,是构建高性能网络应用和服务的利器。

核心要点回顾:

  • 适用场景:I/O 密集型任务(如网络爬虫、Web 服务器、数据库连接等)。
  • 核心语法async def 定义协程,await 暂停协程并等待结果。
  • 启动方式asyncio.run() 是现代 Python 中启动异步程序的标准方式。
  • 并发执行:使用 asyncio.gather()asyncio.create_task() 来并发运行多个任务。
  • 同步与通信:使用 asyncio.Lock 保护共享资源,避免竞争条件;使用 asyncio.Queue 构建生产者-消费者模型,高效地分发和处理任务。
  • 生态系统:需要配合 aiohttp, aiodns, asyncpg 等异步库才能发挥最大威力。

从 Python 3.4 首次引入 asyncio 至今,它已经变得越来越成熟和易用。虽然异步编程的思维方式需要一些时间来适应,但一旦你掌握了它,它将成为你工具箱中应对高并发挑战的一把“瑞士军刀”。希望这篇博客能为你打开异步编程的大门。


参考文章

  1. asyncio 教程- 什么是异步? - Graia 官方文档
  2. Python asyncio 模块 - 菜鸟教程
  3. Asyncio in Python: A Comprehensive Guide with Examples. | by Obafemi - Medium
  4. 使用asyncio - python并发编程-中文版
  5. Python asyncio 從不會到上路 - MyApollo
  6. Solve Common Asynchronous Scenarios With Python’s “asyncio” - Better Programming
  7. Coroutines and Tasks — Python 3.13.5 documentation
  8. 使用asyncio - Python教程- 廖雪峰的官方网站
  9. Is it more efficient to use create_task(), or gather()? - Stack Overflow
  10. python asyncio 异步I/O - 实现并发http请求(asyncio + aiohttp) - yuminhu - 博客园
  11. python asyncio 异步I/O - 实现并发http请求(asyncio + aiohttp) - 上海-悠悠- 博客园
  12. asyncio教程原创 - CSDN博客
http://www.dtcms.com/a/272682.html

相关文章:

  • Arc Institute提出首个AIVC虚拟细胞模型STATE
  • 上海交大医学院张维拓老师赴同济医院做R语言训练营培训
  • 从Debug中学习MiniGPT4
  • 在Vue中如何对组件进行销毁在进行挂载
  • 模型训练之数据标注-Labelme的使用教程
  • 5款工具高效制作插图,PPT设计新选择!
  • 货车车架和悬架设计cad【7张】+设计说明书
  • leetcode 3440. 重新安排会议得到最多空余时间 II 中等
  • 《PyQt6-3D:开启Python 3D编程新世界 2》
  • 【TCP/IP】8. 传输层协议
  • hive小文件问题
  • 二层环路避免-STP技术
  • Linux【大数据运维】下制作Redis绿色免安装包(一)
  • 企业网络安全的“金字塔”策略:构建全方位防护体系的核心思路
  • upload-labs靶场通关详解:第20关 /.绕过
  • 以下哪种类型在Golang中不是内置类型?
  • zookeeper etcd区别
  • Keepalived+LVS实现LNMP网站的高可用部署
  • 登录为图片验证时,selenium通过token直接进入页面操作
  • Java 导出word 实现饼状图导出--可编辑数据
  • CIEDE2000 色差公式C++及MATLAB实现
  • 【零基础学AI】第35讲:策略梯度方法 - 连续控制任务实战
  • Swift 图论实战:DFS 算法解锁 LeetCode 323 连通分量个数
  • 快速搭建服务器,fetch请求从服务器获取数据
  • ReentrantLock 与 Synchronized 的区别
  • 给MySQL做定时备份,一天3次
  • method_name字段是什么
  • 单片机基础(STM32-DAY2(GPIO))
  • Linux驱动06 --- UDP
  • 飞书AI技术体系