深入理解Python协程:asyncio、异步并发、事件循环
概念解析
异步编程是一种
允许程序在等待 I/O 操作
(如网络请求、文件读写)时不被阻塞
,转而执行其他任务的编程范式。
在 Python 中,其核心实现概念如下:
协程(Coroutines)
- 定义:可暂停执行并在合适时机恢复的函数,通过
async def
关键字声明。- 特点:非抢占式调度,协程主动通过
await
让出执行权,适用于处理高并发I/O
场景。
事件循环(Event Loop)
- 定义:异步编程的 “调度器”,负责管理协程的执行顺序、监控 I/O 事件状态,并在事件就绪时恢复对应协程。
- 核心逻辑:循环检查可执行的协程任务,按事件触发顺序调度执行。
任务(Tasks)
- 定义:对协程的封装,代表一个独立的异步操作单元,通过
asyncio.create_task()
创建。- 功能:支持取消任务、等待任务完成(
await task
)或获取任务状态。
Future 对象
- 定义:表示一个尚未完成的异步操作结果,本质是协程间传递状态的载体。
- 作用:当协程需要等待某个异步操作的结果时,可通过
await Future
暂停执行,待操作完成后获取结果。
await 表达式
- 定义:协程中用于暂停执行的关键字,用于等待另一个协程、Task 或 Future 对象的完成。
- 示例:
result = await async_function()
,表示暂停当前协程,直到async_function
执行完毕并返回结果。
应用场景
异步编程尤其
适用于I/O 密集型任务
(如 HTTP 请求、数据库查询、文件操作等)。
通过减少线程切换开销和 CPU 闲置时间,显著提升程序的并发处理能力。
相比多线程编程,异步编程在高并发场景下更轻量、更高效。
asyncio库详解
Python 3.4
引入了asyncio
库,作为异步编程的核心组件,事件循环是asyncio
的核心。
在Windows
上使用ProactorEventLoop
,在Unix
上使用SelectorEventLoop
。
Unix系统(SelectorEventLoop)
- 基于
selectors 模块对底层 I/O 多路复用机制
(如select、poll、epoll、kqueue)的抽象。 - 采用 “就绪通知” 机制:监视文件描述符(如套接字)状态,当
I/O
操作就绪时(如可读 / 可写)通知应用程序。 - 优势场景1:擅长处理大量并发连接(如
Linux
的epoll机
制支持高效事件驱动)。 - 优势场景2:适用于网络服务器、高并发
I/O
场景。
Windows系统(ProactorEventLoop)
- 基于
Windows 专有 I/O 完成端口(IOCP)机制
,属于系统级异步I/O
框架。 - 采用 “完成通知” 机制:异步启动
I/O
操作,操作完成后由系统主动通知程序。 - 优势场景1:深度优化
Windows
平台特性,支持全类型异步I/O
操作(含文件I/O
)。 - 优势场景1:在
Windows
环境下性能通常优于SelectorEventLoop
。
两种事件循环的关键差异
维度 | SelectorEventLoop(Unix) | ProactorEventLoop(Windows) |
---|---|---|
通知机制 | 等待 I/O 就绪后执行操作(“询问式”:Can I read/write?) | 异步启动 I/O 操作,完成后被动接收通知(“回调式”:Notify when done) |
API 覆盖范围 | 部分平台可能不支持全类型文件 I/O 操作 | 原生支持 Windows 所有异步 I/O 操作(如管道、套接字、文件) |
性能特点 | Unix 系统下,依赖epoll/kqueue 等机制实现高效并发 | Windows 下利用 IOCP 机制实现低延迟、高吞吐量 |
事件循环管理
import asyncio# 获取事件循环
loop = asyncio.get_event_loop()# 运行协程直到完成
loop.run_until_complete(my_coroutine())# 运行事件循环直到stop()被调用
loop.run_forever()# 关闭事件循环
loop.close()
协程定义与执行
async def fetch_data(url):print(f"开始获取数据: {url}")await asyncio.sleep(2) # 模拟I/O操作print(f"数据获取完成: {url}")return f"来自 {url} 的数据"# Python 3.7+ 推荐方式
async def main():result = await fetch_data("example.com")print(result)asyncio.run(main()) # Python 3.7+引入,简化了事件循环管理
任务创建与管理
async def main():# 创建任务task1 = asyncio.create_task(fetch_data("site1.com"))task2 = asyncio.create_task(fetch_data("site2.com"))# 等待所有任务完成results = await asyncio.gather(task1, task2)print(results)# 并发运行多个协程results = await asyncio.gather(fetch_data("site3.com"),fetch_data("site4.com"))
超时管理
async def main():try:# 设置超时result = await asyncio.wait_for(fetch_data("example.com"), timeout=1.0)except asyncio.TimeoutError:print("操作超时")
同步与异步代码结合
import concurrent.futuresdef cpu_bound_task(x):# 计算密集型任务return x * xasync def main():# 使用线程池执行阻塞I/Oloop = asyncio.get_running_loop()with concurrent.futures.ThreadPoolExecutor() as pool:result = await loop.run_in_executor(pool, cpu_bound_task, 10)print(result)
高并发场景实战案例
案例1: 并发网络请求
import asyncio
import aiohttp
import timeasync def fetch(session, url):async with session.get(url) as response:return await response.text()async def fetch_all(urls):async with aiohttp.ClientSession() as session:tasks = [fetch(session, url) for url in urls]results = await asyncio.gather(*tasks)return results# 测试URLs
urls = ["https://www.google.com","https://www.github.com","https://www.python.org",# 可添加更多URL
] * 5 # 重复请求以增加数量async def main():start = time.time()results = await fetch_all(urls)end = time.time()print(f"获取了 {len(results)} 个页面,耗时: {end - start:.2f} 秒")# 运行
asyncio.run(main())
案例2: 异步数据库操作
使用
asyncpg
进行PostgreSQL
异步操作
import asyncio
import asyncpgasync def create_tables(conn):await conn.execute('''CREATE TABLE IF NOT EXISTS users(id SERIAL PRIMARY KEY,name TEXT,email TEXT)''')async def insert_users(conn, users):# 批量插入await conn.executemany('INSERT INTO users(name, email) VALUES($1, $2)',users)async def fetch_users(conn):return await conn.fetch('SELECT * FROM users')async def main():# 连接数据库conn = await asyncpg.connect(user='postgres',password='password',database='testdb',host='127.0.0.1')# 创建表await create_tables(conn)# 生成测试数据test_users = [('User1', 'user1@example.com'),('User2', 'user2@example.com'),('User3', 'user3@example.com'),]# 插入数据await insert_users(conn, test_users)# 查询数据users = await fetch_users(conn)for user in users:print(f"ID: {user['id']}, Name: {user['name']}, Email: {user['email']}")# 关闭连接await conn.close()# 运行
asyncio.run(main())
案例3: 异步Web爬虫
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import timeasync def fetch(session, url):async with session.get(url) as response:return await response.text()async def parse(html):# 使用BeautifulSoup解析HTMLsoup = BeautifulSoup(html, 'html.parser')# 获取所有链接links = [a.get('href') for a in soup.find_all('a') if a.get('href')]return linksasync def crawl(url, max_depth=2):visited = set()async def _crawl(current_url, depth):if depth > max_depth or current_url in visited:returnvisited.add(current_url)print(f"正在爬取: {current_url}")try:async with aiohttp.ClientSession() as session:html = await fetch(session, current_url)links = await parse(html)# 过滤出同域名链接base_url = '/'.join(current_url.split('/')[:3])same_domain_links = [link if link.startswith('http') else f"{base_url}{link}"for link in links if link and (link.startswith('http') or link.startswith('/'))]# 创建子任务继续爬取tasks = [_crawl(link, depth + 1) for link in same_domain_links[:5] # 限制每页最多爬5个链接]await asyncio.gather(*tasks)except Exception as e:print(f"爬取 {current_url} 出错: {e}")await _crawl(url, 0)return visitedasync def main():start = time.time()visited = await crawl("https://python.org", max_depth=1)end = time.time()print(f"爬取了 {len(visited)} 个页面,耗时: {end - start:.2f} 秒")# 运行
asyncio.run(main())
案例4: 异步API服务器处理大量并发请求
使用
FastAPI
构建高并发API
服务
from fastapi import FastAPI, BackgroundTasks
import asyncio
import uvicorn
import time
import randomapp = FastAPI()# 模拟数据库
db = {}# 模拟异步数据库操作
async def db_operation(key, delay=None):if delay is None:delay = random.uniform(0.1, 0.5) # 随机延迟模拟真实场景await asyncio.sleep(delay)return db.get(key)# 模拟耗时任务
async def process_item(item_id):print(f"开始处理项目 {item_id}")await asyncio.sleep(5) # 模拟耗时操作print(f"项目 {item_id} 处理完成")return {"item_id": item_id, "status": "processed"}# 常规端点
@app.get("/items/{item_id}")
async def read_item(item_id: str):result = await db_operation(item_id)return {"item_id": item_id, "value": result}# 批量操作端点
@app.get("/batch")
async def batch_operation(items: str):item_ids = items.split(",")tasks = [db_operation(item_id) for item_id in item_ids]results = await asyncio.gather(*tasks)return dict(zip(item_ids, results))# 后台任务
@app.post("/items/{item_id}/process")
async def process(item_id: str, background_tasks: BackgroundTasks):background_tasks.add_task(process_item, item_id)return {"message": f"Processing item {item_id} in the background"}# 负载测试端点
@app.get("/load-test/{count}")
async def load_test(count: int):start = time.time()# 创建多个并发任务tasks = []for i in range(count):# 随机延迟delay = random.uniform(0.1, 0.5)tasks.append(asyncio.sleep(delay))# 并发执行所有任务await asyncio.gather(*tasks)end = time.time()return {"tasks_completed": count,"time_taken": f"{end - start:.2f} 秒","tasks_per_second": f"{count / (end - start):.2f}"}# 初始化一些测试数据
@app.on_event("startup")
async def startup_event():for i in range(1000):db[str(i)] = f"value-{i}"if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=8000)