Python asyncio库:基本概念与使用方法
Python的asyncio库是Python 3.4及以后版本引入的标准库,用于编写单线程并发代码,特别是基于协程的异步IO操作。本文将详细介绍asyncio的基本概念和使用方法,帮助你快速掌握Python异步编程。
1.核心概念
1. 协程(Coroutine)
协程是asyncio的核心概念,它是一种特殊的函数,可以在执行过程中暂停并恢复。在Python中,协程使用async def语法定义:
async def my_coroutine():print("协程开始")await asyncio.sleep(1) # 暂停执行,让出控制权print("协程结束")
2. await表达式
await是一个特殊的表达式,只能在协程内部使用。它用于暂停协程的执行,直到等待的对象(通常是另一个协程或Future)完成:
async def fetch_data():print("开始获取数据")await asyncio.sleep(2) # 模拟IO操作print("数据获取完成")return "data"async def process_data():
data = await fetch_data() # 等待fetch_data协程完成print(f"处理数据: {data}")
3. 事件循环(Event Loop)
事件循环是asyncio的调度器,负责管理和执行所有的异步任务。它不断从任务队列中取出任务并执行,当任务遇到await时会暂停,事件循环会继续执行其他任务:
import asyncioasync def main():print("Hello")await asyncio.sleep(1)print("World")# 创建并运行事件循环
asyncio.run(main()) # Python 3.7+
4. Future和Task
- Future:表示一个异步操作的未来结果,是一个占位符,当操作完成时,结果会被设置到这个Future中。
- Task:是Future的子类,专门用于包装协程。当协程被包装成Task后,会被事件循环调度执行。
async def my_task():
await asyncio.sleep(1)
return "Task result"async def main():
# 创建任务
task = asyncio.create_task(my_task()) # 可以继续执行其他操作
print("Doing other things...") # 等待任务完成
result = await task
print(f"Task result: {result}")
2.基本使用方法
1. 运行单个协程
最基本的用法是使用asyncio.run()来运行一个协程:
import asyncioasync def say_hello():print("Hello")await asyncio.sleep(1)print("World")# 运行协程
asyncio.run(say_hello())
2. 并发运行多个协程
使用asyncio.gather()可以并发运行多个协程,并等待所有协程完成:
async def task(name, delay):print(f"Task {name} started")await asyncio.sleep(delay)print(f"Task {name} completed")return delayasync def main():# 并发运行三个协程
results = await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3))print(f"Results: {results}")asyncio.run(main())
3. 创建和管理任务
使用asyncio.create_task()可以显式地将协程包装成任务,并获得对任务的引用:
async def task(name, delay):print(f"Task {name} started")await asyncio.sleep(delay)print(f"Task {name} completed")return delayasync def main():# 创建任务
task1 = asyncio.create_task(task("A", 2))
task2 = asyncio.create_task(task("B", 1))# 可以在等待前执行其他操作print("Tasks created")# 等待任务完成
result1 = await task1
result2 = await task2print(f"Results: {result1}, {result2}")asyncio.run(main())
4. 异步IO操作
asyncio最常见的用途是执行异步IO操作,例如网络请求:
import asyncio
import aiohttpasync def fetch(url):async with aiohttp.ClientSession() as session:async with session.get(url) as response:print(f"Fetching {url}, status: {response.status}")return await response.text()async def main():
urls = ["https://www.example.com","https://www.python.org","https://www.github.com"]# 并发执行所有请求
tasks = [fetch(url) for url in urls]
htmls = await asyncio.gather(*tasks)for url, html in zip(urls, htmls):print(f"{url}: {len(html)} characters")asyncio.run(main())
5. 超时控制
使用asyncio.wait_for()可以为协程设置超时时间:
async def slow_operation():
await asyncio.sleep(10)
return "Done"async def main():
try:
# 设置超时时间为5秒
result = await asyncio.wait_for(slow_operation(), timeout=5)
print(f"Result: {result}")
except asyncio.TimeoutError:
print("操作超时")asyncio.run(main())
6. 异步迭代器和生成器
asyncio支持异步迭代器和生成器,允许在迭代过程中执行异步操作:
async def async_generator():
for i in range(3):
await asyncio.sleep(1)
yield iasync def main():
async for i in async_generator():
print(f"Received: {i}")asyncio.run(main())
7. 异步上下文管理器
异步上下文管理器允许在async with语句中使用,在进入和退出上下文时执行异步操作:
class AsyncContextManager:
async def __aenter__(self):
print("Entering async context")
await asyncio.sleep(1)
return self async def __aexit__(self, exc_type, exc, tb):
print("Exiting async context")
await asyncio.sleep(1) async def do_something(self):
print("Doing something async")async def main():
async with AsyncContextManager() as acm:
await acm.do_something()asyncio.run(main())
3.常见应用场景
1. 网络爬虫
异步编程非常适合网络爬虫,因为爬虫主要是IO密集型操作:
import asyncio
import aiohttpasync def fetch(url):async with aiohttp.ClientSession() as session:async with session.get(url) as response:return await response.text()async def main():
urls = ["https://www.example.com","https://www.python.org","https://www.github.com"] tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)for url, html in zip(urls, results):print(f"{url}: {len(html)} characters")asyncio.run(main())
2. 实时数据处理
在需要实时处理大量数据流的场景中,异步编程可以高效地处理多个数据源的输入:
async def data_source(name, queue):
for i in range(5):
await queue.put((name, i))
await asyncio.sleep(0.1) # 模拟数据源产生数据的间隔async def processor(queue):
while True:
source, data = await queue.get()
print(f"Processing {data} from {source}")
queue.task_done()async def main():
queue = asyncio.Queue() # 创建多个数据源
sources = [
data_source("Source1", queue),
data_source("Source2", queue),
data_source("Source3", queue)
] # 创建处理器
processor_task = asyncio.create_task(processor(queue)) # 运行所有数据源
await asyncio.gather(*sources) # 等待队列中的所有任务完成
await queue.join() # 取消处理器任务
processor_task.cancel()asyncio.run(main())
3. Web服务器
异步编程可以构建高性能的Web服务器,能够同时处理大量并发请求:
from aiohttp import webasync def handle(request):
name = request.match_info.get('name', "Anonymous")return web.Response(text=f"Hello, {name}!")app = web.Application()
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle)web.run_app(app)
4.最佳实践与注意事项
1. 避免阻塞操作:在异步代码中使用同步IO操作会阻塞整个事件循环
2. 正确处理异常:确保异步操作中的异常能够被捕获和处理
3. 控制并发数量:虽然异步可以处理大量并发,但也要避免无限制增加
4. 使用适当的工具:使用asyncio提供的工具,如Queue、Lock等进行同步
5. 测试异步代码:使用专门的异步测试框架,如pytest-asyncio
通过掌握asyncio的基本概念和使用方法,你可以编写出高效、并发的Python程序,特别适合处理IO密集型任务。