Python 中的异步编程:从基础到实战
Python 中的异步编程:从基础到实战
一、异步编程的核心概念
1. 同步 vs 异步
- 同步编程:代码按顺序逐行执行,前一个任务完成才能执行下一个(如传统HTTP请求)
- 异步编程:允许在等待I/O操作(如网络请求、文件读写)时切换执行其他任务,提高资源利用率
2. 核心组件
- 协程(Coroutine):可暂停/恢复的函数,使用
"async def"定义
- 事件循环(Event Loop):调度协程执行的中央控制器
- Future/Task:表示异步操作最终结果的对象
二、基础语法与实现
1. 协程定义与调用
import asyncio
# 定义协程函数
async def say_hello(name):
print(f"准备问候 {name}...")
await asyncio.sleep(1) # 模拟I/O操作
print(f"Hello, {name}!")
# 调用协程
async def main():
await say_hello("World") # 正确方式:在协程内await
# 运行事件循环
asyncio.run(main()) # Python 3.7+
2. 并发执行多个协程
async def fetch_data(id, delay):
print(f"任务 {id} 开始获取数据")
await asyncio.sleep(delay)
print(f"任务 {id} 完成,耗时 {delay}秒")
return f"数据-{id}"
async def main():
# 方式1:asyncio.gather (推荐)
results = await asyncio.gather(
fetch_data(1, 2),
fetch_data(2, 1),
fetch_data(3, 3)
)
print("所有结果:", results)
# 方式2:asyncio.create_task (更灵活)
task1 = asyncio.create_task(fetch_data(1, 2))
task2 = asyncio.create_task(fetch_data(2, 1))
await task1
await task2
asyncio.run(main())
三、高级应用场景
1. 超时控制
async def long_running_task():
try:
async with asyncio.timeout(2): # Python 3.11+
await asyncio.sleep(3)
print("任务完成")
except TimeoutError:
print("任务超时被取消")
asyncio.run(long_running_task())
2. 任务取消
async def cancellable_task():
try:
while True:
print("任务运行中...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("任务被取消")
raise
async def main():
task = asyncio.create_task(cancellable_task())
await asyncio.sleep(3)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("主程序捕获到任务取消")
asyncio.run(main())
3. 异步上下文管理器
class AsyncResource:
async def __aenter__(self):
print("获取资源")
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
print("释放资源")
await asyncio.sleep(0.5)
async def use_resource():
async with AsyncResource() as resource:
print("正在使用资源...")
await asyncio.sleep(1)
asyncio.run(use_resource())
四、实际项目案例
1. 异步Web爬虫
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def crawl_website(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, content in zip(urls, results):
print(f"{url} 获取到 {len(content)} 字节")
urls = [
"https://www.example.com",
"https://www.python.org",
"https://www.github.com"
]
asyncio.run(crawl_website(urls))
2. 异步数据库操作
import asyncpg
async def query_database():
conn = await asyncpg.connect(
user='user', password='pass',
database='db', host='localhost'
)
try:
# 并发执行多个查询
user_task = conn.fetch('SELECT * FROM users WHERE id = $1', 1)
product_task = conn.fetch('SELECT * FROM products LIMIT 5')
user, products = await asyncio.gather(user_task, product_task)
print(f"用户: {user}, 商品数: {len(products)}")
finally:
await conn.close()
asyncio.run(query_database())
五、调试与性能优化
1. 常见问题排查
- 忘记await:协程未被正确调度(检查所有协程调用是否都有await)
- 阻塞事件循环:在协程中使用同步I/O操作(如time.sleep())
- 任务泄漏:创建的任务未被正确await或取消
2. 性能优化技巧
1. 批量处理:使用
"asyncio.gather"并发执行独立任务
2. 限制并发数:使用信号量控制最大并发
semaphore = asyncio.Semaphore(5) # 最大5个并发
async def limited_task(id):
async with semaphore:
print(f"任务 {id} 开始")
await asyncio.sleep(1)
print(f"任务 {id} 完成")
3. 合理设置超时:避免单个任务阻塞整个系统
六、学习路径建议
1. 入门阶段:
- 掌握
"async/await"基本语法
- 理解事件循环工作原理
- 练习简单的并发任务
2. 进阶阶段:
- 学习
"asyncio"高级API(Queue、Lock等)
- 实现异步上下文管理器
- 处理异常和任务取消
3. 生产环境:
- 结合aiohttp、asyncpg等异步库
- 实现完整的异步服务架构
- 进行性能监控和优化
异步编程是现代Python开发的重要技能,特别适合I/O密集型应用。通过合理使用可以显著提升程序性能,但需要注意避免常见陷阱,建议从简单场景开始逐步深入。
