如何使用asyncio库
asyncio简介
asyncio是Python的标准库,用于编写单线程并发代码,基于协程(coroutines)、事件循环(event loop)和非阻塞I/O操作。它适用于I/O密集型任务(如网络请求、文件读写),而非CPU密集型任务。
核心组件
事件循环(Event Loop)
事件循环是asyncio的核心,负责调度和执行协程任务。通过asyncio.run()
或直接管理事件循环对象启动。
协程(Coroutines)
使用async def
定义的函数称为协程,可通过await
暂停执行,等待其他协程或I/O操作完成。
任务(Tasks)
协程通过asyncio.create_task()
包装为任务,由事件循环调度执行。任务可追踪协程状态。
Future对象
低级可等待对象,表示异步操作的最终结果,通常由任务封装。
基本用法
定义协程
import asyncioasync def hello():print("Hello")await asyncio.sleep(1)print("World")
运行协程
asyncio.run(hello()) # Python 3.7+
并发执行任务
async def task_one():await asyncio.sleep(2)return "Task 1 done"async def task_two():await asyncio.sleep(1)return "Task 2 done"async def main():task1 = asyncio.create_task(task_one())task2 = asyncio.create_task(task_two())results = await asyncio.gather(task1, task2)print(results) # ['Task 1 done', 'Task 2 done']
高级功能
超时控制
async def long_running_task():await asyncio.sleep(10)async def main():try:await asyncio.wait_for(long_running_task(), timeout=5)except asyncio.TimeoutError:print("Task timed out")
队列(Queue)
生产者-消费者模式:
async def producer(queue):for i in range(5):await queue.put(i)await asyncio.sleep(1)async def consumer(queue):while True:item = await queue.get()print(f"Consumed: {item}")queue.task_done()async def main():queue = asyncio.Queue()await asyncio.gather(producer(queue), consumer(queue))
同步原语
如锁(Lock)、信号量(Semaphore):
lock = asyncio.Lock()async def safe_update():async with lock:# 临界区代码pass
常见问题
阻塞代码破坏并发
避免在协程中调用同步阻塞函数(如time.sleep()
),需使用asyncio.sleep()
或loop.run_in_executor()
。
事件循环策略
在Jupyter等环境中可能需要手动管理事件循环:
import nest_asyncio
nest_asyncio.apply()
适用场景
- 高并发网络请求(HTTP API、WebSockets)。
- 异步数据库驱动(如asyncpg、aiomysql)。
- 微服务间通信。
性能优化
- 使用
uvloop
替代默认事件循环(非Windows平台)。 - 避免频繁创建/销毁任务,复用连接池。
通过合理设计协程和任务调度,asyncio可显著提升I/O密集型应用的吞吐量。