当前位置: 首页 > news >正文

Python 异步编程从基础到高级全面指南

Python异步编程从基础到高级全面指南

一、异步编程基础概念
1. 同步 vs 异步:为什么需要异步?
  • 同步编程:代码按顺序执行,前一个任务完成后才会执行下一个,IO操作(如网络请求、文件读写)会阻塞线程。
  • 异步编程:允许程序在等待IO操作时执行其他任务,通过事件循环(Event Loop)调度协程,提高并发效率。
  • 适用场景:高IO密集型任务(如网络爬虫、API服务、实时数据处理),但不适合CPU密集型任务(需用多进程)。
2. 核心概念:协程、事件循环、Future
  • 协程(Coroutine):用户态的轻量级线程,通过async/await定义和调用,比线程更轻量(无需系统调度)。
  • 事件循环(Event Loop):异步编程的核心,负责调度协程,监控IO事件,决定何时执行协程。
  • Future:表示一个异步操作的最终结果,与协程配合使用,可理解为“未完成的任务”。
  • Task:对Future的封装,用于管理协程的执行状态(运行、暂停、完成)。
二、Python异步编程基础:asyncio库
1. 基础语法:async/await
import asyncio# 定义异步函数(协程)
async def hello_world():print("Hello")# 模拟IO操作,await会暂停当前协程,让事件循环处理其他任务await asyncio.sleep(1)print("World")# 运行协程的方式1:通过事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(hello_world())  # 输出:Hello (1秒后) World# 运行协程的方式2:Python 3.7+新语法
asyncio.run(hello_world())  # 效果同上,更简洁
2. 并发执行多个协程:asyncio.gather
async def task(n):await asyncio.sleep(0.1)return f"Task {n} completed"async def main():# 并发执行3个任务results = await asyncio.gather(task(1),task(2),task(3))print(results)  # 输出:['Task 1 completed', 'Task 2 completed', 'Task 3 completed']asyncio.run(main())
3. 创建任务:asyncio.create_task(Python 3.7+)
async def task(n):await asyncio.sleep(0.1)return n * 2async def main():tasks = []for i in range(3):# 创建任务并添加到列表tasks.append(asyncio.create_task(task(i)))# 等待所有任务完成并获取结果for task in tasks:print(await task)  # 输出:0, 2, 4(顺序可能不同)asyncio.run(main())
三、异步编程核心组件详解
1. 事件循环(Event Loop)的底层操作
async def coro():print("Start")await asyncio.sleep(1)print("End")# 手动操作事件循环(了解原理用,实际开发推荐asyncio.run)
loop = asyncio.new_event_loop()
try:# 方式1:直接运行协程loop.run_until_complete(coro())# 方式2:先创建任务,再运行task = loop.create_task(coro())loop.run_until_complete(task)
finally:loop.close()  # 关闭事件循环
2. Future:异步操作的结果容器
async def set_future(future):await asyncio.sleep(1)future.set_result("Future is done!")async def main():future = asyncio.Future()# 启动协程设置Future的结果asyncio.create_task(set_future(future))# 等待Future完成并获取结果result = await futureprint(result)  # 输出:Future is done!asyncio.run(main())
3. 异步迭代器与异步生成器
# 异步迭代器:支持async for循环
class AsyncIterator:def __init__(self, max_num):self.num = 0self.max_num = max_numdef __aiter__(self):return selfasync def __anext__(self):if self.num >= self.max_num:raise StopAsyncIterationawait asyncio.sleep(0.1)  # 模拟异步操作self.num += 1return self.num# 异步生成器:用async def + yield
async def async_generator(max_num):for i in range(max_num):await asyncio.sleep(0.1)yield i * 2async def main():# 使用异步迭代器async for item in AsyncIterator(3):print(item)  # 输出:1, 2, 3(间隔0.1秒)# 使用异步生成器async for item in async_generator(3):print(item)  # 输出:0, 2, 4(间隔0.1秒)asyncio.run(main())
四、异步IO实战:网络与文件操作
1. 异步网络请求:aiohttp库
# 安装aiohttp
pip install aiohttp
import aiohttp
import asyncioasync def fetch(url):async with aiohttp.ClientSession() as session:async with session.get(url) as response:if response.status == 200:return await response.text()  # 异步读取响应内容async def main():urls = ["https://api.github.com","https://example.com","https://python.org"]tasks = [fetch(url) for url in urls]results = await asyncio.gather(*tasks)for url, result in zip(urls, results):print(f"{url} - Length: {len(result)}")asyncio.run(main())
2. 异步文件操作:asyncio.open
import asyncioasync def read_file(path):async with asyncio.open(path, 'r') as f:content = await f.read()return contentasync def write_file(path, content):async with asyncio.open(path, 'w') as f:await f.write(content)async def main():# 异步读取文件content = await read_file("data.txt")# 异步处理内容(假设是IO密集型操作)processed = content.upper()# 异步写入文件await write_file("data_processed.txt", processed)asyncio.run(main())
3. 异步数据库操作:asyncpg(以PostgreSQL为例)
pip install asyncpg
import asyncpgasync def main():# 连接数据库conn = await asyncpg.connect(host="localhost",port=5432,user="user",password="password",database="dbname")try:# 异步执行SQLrows = await conn.fetch("SELECT * FROM users WHERE age > $1", 30)for row in rows:print(row)finally:# 关闭连接await conn.close()asyncio.run(main())
五、高级主题:任务管理与性能优化
1. 任务异常处理
async def risky_task(n):if n == 2:raise ValueError(f"Task {n} failed")await asyncio.sleep(0.1)return n * 2async def main():tasks = [asyncio.create_task(risky_task(i)) for i in range(3)]# 方式1:等待所有任务,捕获异常try:results = await asyncio.gather(*tasks, return_exceptions=True)for i, result in enumerate(results):if isinstance(result, Exception):print(f"Task {i} error: {result}")else:print(f"Task {i} result: {result}")except Exception as e:print(f"Global error: {e}")asyncio.run(main())
2. 超时控制
async def slow_task():await asyncio.sleep(2)return "Done"async def main():task = asyncio.create_task(slow_task())try:# 等待1秒,超时则抛出异常result = await asyncio.wait_for(task, timeout=1)print(result)except asyncio.TimeoutError:print("Task timed out")# 取消任务task.cancel()try:# 等待任务取消(可选)await taskexcept asyncio.CancelledError:print("Task was cancelled")asyncio.run(main())
3. 同步与异步的转换:run_in_executor
import time
from concurrent.futures import ThreadPoolExecutor# 同步函数(模拟CPU密集型操作)
def sync_function(n):time.sleep(1)return n * 2async def main():# 创建线程池 executorwith ThreadPoolExecutor() as executor:loop = asyncio.get_running_loop()# 方式1:在默认 executor 中运行result1 = await loop.run_in_executor(None, lambda: sync_function(1))# 方式2:在线程池 executor 中运行result2 = await loop.run_in_executor(executor, lambda: sync_function(2))print(result1, result2)  # 输出:2 4(同时执行,总耗时约1秒)asyncio.run(main())
4. 自定义事件循环策略
import asyncio
import platformasync def main():# 根据系统选择事件循环策略if platform.system() == "Windows":# Windows 10+ 推荐使用 ProactorEventLoopasyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())else:# Linux/macOS 使用默认策略asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())# 创建并运行事件循环loop = asyncio.new_event_loop()try:asyncio.set_event_loop(loop)# 执行其他异步任务...finally:loop.close()# 运行主函数
asyncio.run(main())
六、异步编程框架与实战应用
1. Web框架:FastAPI(基于异步)
pip install fastapi uvicorn
from fastapi import FastAPI
import asyncioapp = FastAPI()# 异步API路由
@app.get("/async-task")
async def async_endpoint():# 模拟异步操作await asyncio.sleep(1)return {"message": "Async task completed"}# 同步API路由(建议避免,可能阻塞事件循环)
@app.get("/sync-task")
def sync_endpoint():import timetime.sleep(1)return {"message": "Sync task completed"}# 启动服务
# uvicorn main:app --host 0.0.0.0 --port 8000
2. 异步爬虫实战
import aiohttp
import asyncio
from bs4 import BeautifulSoupasync def fetch(url, session):async with session.get(url) as response:return await response.text()async def parse_html(html, url):soup = BeautifulSoup(html, 'html.parser')title = soup.title.text if soup.title else "No title"return f"{url} - {title}"async def crawl(urls):async with aiohttp.ClientSession() as session:tasks = [fetch(url, session) for url in urls]htmls = await asyncio.gather(*tasks)parse_tasks = [parse_html(html, url) for html, url in zip(htmls, urls)]results = await asyncio.gather(*parse_tasks)return resultsasync def main():urls = ["https://python.org","https://github.com","https://fastapi.tiangolo.com"]results = await crawl(urls)for result in results:print(result)asyncio.run(main())
七、异步编程陷阱与最佳实践
1. 常见陷阱
  • 在异步函数中调用同步阻塞函数:如time.sleep()会阻塞事件循环,需用asyncio.sleep()替代。
  • 忘记await:直接调用协程(如coro())不会执行,需用await coro()或包装为任务。
  • 全局事件循环关闭后复用loop.close()后不可再使用,需重新创建。
  • 大量任务创建导致内存问题:合理控制并发量,使用asyncio.Semaphore限制并发数。
2. 最佳实践
  • 使用asyncio.run():Python 3.7+推荐的运行方式,自动管理事件循环的生命周期。
  • 分离IO与CPU任务:IO密集型用异步,CPU密集型用多进程+run_in_executor
  • 错误处理:使用try/except捕获异常,或在asyncio.gather()中设置return_exceptions=True
  • 性能监控:使用asyncio.Profiler或第三方库(如dowser)分析异步程序性能。
  • 代码结构:将异步逻辑封装为函数,避免过长的协程,保持代码可读性。
八、进阶学习资源
  1. 官方文档

    • asyncio官方文档
    • PEP 492: Coroutines with async and await syntax
  2. 书籍推荐

    • 《Python异步编程实战》—— 详细讲解asyncio及实战案例。
    • 《流畅的Python》—— 第18章深入讲解协程与异步编程。
  3. 实战项目

    • aiohttp:异步HTTP客户端/服务器库。
    • FastAPI:高性能异步Web框架。
    • asyncpg:异步PostgreSQL驱动。
  4. 视频教程

    • Python Asyncio Tutorial - Complete Walkthrough
    • High Performance Python with Asyncio

通过以上从基础到高级的讲解,你可以系统掌握Python异步编程的核心概念、语法和实战技巧。异步编程是处理高并发IO任务的利器,熟练掌握后能显著提升程序性能和响应速度。建议从简单案例入手,逐步尝试复杂场景,在实践中加深理解。

http://www.dtcms.com/a/269082.html

相关文章:

  • 模拟数字电路基础-2
  • 初识Neo4j之Cypher(三)
  • leetcode1089.复写零
  • 代码审计-SQL注入
  • 简单的安卓ANR与卡顿分析
  • 要将本地分支强制更新为与远程分支完全一致(以远程为主
  • c++文字游戏_闯关打怪2.0(开源)
  • paimon.disk包:磁盘处理
  • 关于Novatek B/G-R/G白平衡色温坐标系再探究
  • 谢飞机的Java高级开发面试:从Spring Boot到分布式架构的蜕变之旅
  • 安卓10.0系统修改定制化____如何修改ROM 实现开机自动开启开发者选项与隐藏开发者选项
  • 基于区块链的电子签署系统的设计与实现(源码+文档+部署讲解)
  • da y54
  • LED 闪烁 LED 流水灯 蜂鸣器
  • IROS 2025|RL vs MPC性能对比:加州理工无人机实测,谁在「变形控制」中更胜一筹?
  • pg_class 系统表信息
  • React + Express 传输加密以及不可逆加密
  • OpenCV人脸分析------绘制面部关键点函数drawFacemarks()
  • day08-Elasticsearch
  • MinIO与SpringBoot集成完整指南
  • maven 发布到中央仓库常用脚本-02
  • 视频序列和射频信号多模态融合算法Fusion-Vital解读
  • 力扣 hot100 Day37
  • C++笔记之和的区别
  • Isaac Lab:让机器人学习更简单的开源框架
  • Go defer(二):从汇编的角度理解延迟调用的实现
  • RAG实战指南 Day 8:PDF、Word和HTML文档解析实战
  • Stirling-PDF 本地化部署,建立自己的专属PDF工具箱
  • 力扣_链表(前后指针)_python版本
  • 虚幻引擎UE5 GAS开发RPG游戏-02 设置英雄角色-18 改成网络多人游戏