Python异步编程:asyncio.create_task() 用法解析
asyncio.create_task()
是Python 3.7+引入的函数,用于创建一个Task对象。Task是对协程的封装,可以让协程在事件循环中并发执行。
1. 基本语法
task = asyncio.create_task(coro)
coro
: 要包装的协程对象- 返回值: Task对象
2. 基础用法示例
2.1 简单的任务创建
import asyncio
import timeasync def say_hello(name, delay):print(f"开始执行 {name}")await asyncio.sleep(delay)print(f"Hello, {name}!")return f"完成 {name}"async def main():# 创建任务task1 = asyncio.create_task(say_hello("Alice", 2))task2 = asyncio.create_task(say_hello("Bob", 1))# 等待任务完成result1 = await task1result2 = await task2print(result1)print(result2)# 运行主函数
asyncio.run(main())
2.2 多个任务并发执行
import asyncioasync def fetch_data(url, delay):print(f"开始获取 {url}")await asyncio.sleep(delay)return f"数据来自 {url}"async def main():# 创建多个任务tasks = [asyncio.create_task(fetch_data("https://api1.com", 1)),asyncio.create_task(fetch_data("https://api2.com", 2)),asyncio.create_task(fetch_data("https://api3.com", 1.5))]# 等待所有任务完成results = await asyncio.gather(*tasks)for result in results:print(result)asyncio.run(main())
3. 高级用法
3.1 任务的取消
import asyncioasync def long_running_task():try:for i in range(10):print(f"任务执行中... {i}")await asyncio.sleep(1)return "任务完成"except asyncio.CancelledError:print("任务被取消")raiseasync def main():task = asyncio.create_task(long_running_task())# 3秒后取消任务await asyncio.sleep(3)task.cancel()try:await taskexcept asyncio.CancelledError:print("捕获到取消异常")asyncio.run(main())
3.2 任务状态检查
import asyncioasync def sample_task():await asyncio.sleep(2)return "完成"async def main():task = asyncio.create_task(sample_task())print(f"任务创建后 - 完成: {task.done()}")await asyncio.sleep(1)print(f"等待1秒后 - 完成: {task.done()}")await taskprint(f"任务完成后 - 完成: {task.done()}")print(f"任务结果: {task.result()}")asyncio.run(main())
3.3 异常处理
import asyncioasync def task_with_exception():await asyncio.sleep(1)raise ValueError("这是一个错误")async def normal_task():await asyncio.sleep(2)return "正常完成"async def main():task1 = asyncio.create_task(task_with_exception())task2 = asyncio.create_task(normal_task())try:# 使用 gather 并设置 return_exceptions=True 来处理异常results = await asyncio.gather(task1, task2, return_exceptions=True)for i, result in enumerate(results):if isinstance(result, Exception):print(f"任务 {i} 发生异常: {result}")else:print(f"任务 {i} 结果: {result}")except Exception as e:print(f"捕获异常: {e}")asyncio.run(main())
4. 实际应用场景
4.1 并发HTTP请求
示例一:
import asyncio
import aiohttpasync def fetch_url(session, url):try:async with session.get(url) as response:return await response.text()except Exception as e:return f"错误: {e}"async def fetch_multiple_urls():urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/2','https://httpbin.org/delay/1']async with aiohttp.ClientSession() as session:# 创建任务列表tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls]# 并发执行所有任务results = await asyncio.gather(*tasks)for i, result in enumerate(results):print(f"URL {i+1}: {len(result) if isinstance(result, str) else result} 字符")# 注意:需要安装 aiohttp: pip install aiohttp
# asyncio.run(fetch_multiple_urls())
示例二:
import asyncio
import aiohttpasync def process_http_request():# 使用aiohttp进行异步HTTP请求timeout = aiohttp.ClientTimeout(total=1000)async with aiohttp.ClientSession(timeout=timeout) as session:headers = {}data = {}url = ""is_error = Falsetry:print(f"HTTP异步请求开始,data: {data} ")async with session.post(url=url,headers=headers,json=data,timeout=timeout # 为单个请求设置超时) as response:if response.status == 200:# 获取响应数据response_data = await response.json()if response_data['data']['status'] != 200:print(f"HTTP异步请求处理失败: {response_data} ")is_error = Trueelse:print(f"HTTP异步请求处理失败,状态码: {response.status}")is_error = Trueexcept asyncio.TimeoutError:print(f"HTTP异步请求超时!")is_error = Trueprint(f"HTTP异步请求完成")return is_error
4.2 数据库操作并发
import asyncio
# import asyncpg # 示例使用 asyncpgasync def query_database(query_id):# 模拟数据库查询await asyncio.sleep(1) # 模拟查询时间return f"查询结果 {query_id}"async def concurrent_queries():# 创建多个查询任务tasks = [asyncio.create_task(query_database(i)) for i in range(5)]# 并发执行查询results = await asyncio.gather(*tasks)for result in results:print(result)asyncio.run(concurrent_queries())
create_task() vs ensure_future()
在Python 3.7之前,我们使用 asyncio.ensure_future()
来创建任务:
# 旧方式(仍然可用)
task = asyncio.ensure_future(coro)# 新方式(推荐)
task = asyncio.create_task(coro)
两者的主要区别:
create_task()
更明确地表示创建一个任务ensure_future()
功能更广泛,可以处理多种类型的awaitable对象- 在现代Python中,推荐使用
create_task()
5. 最佳实践
5.1 合理使用任务
import asyncioasync def good_practice():# 好的做法:明确创建任务task1 = asyncio.create_task(some_coroutine())task2 = asyncio.create_task(another_coroutine())# 等待结果result1 = await task1result2 = await task2return result1, result2async def avoid_this():# 避免这样做:直接await协程不会并发执行result1 = await some_coroutine()result2 = await another_coroutine()return result1, result2
5.2 使用超时控制
import asyncioasync def slow_operation():await asyncio.sleep(5)return "完成"async def main():try:task = asyncio.create_task(slow_operation())# 设置3秒超时result = await asyncio.wait_for(task, timeout=3.0)print(result)except asyncio.TimeoutError:print("操作超时")# 取消任务task.cancel()asyncio.run(main())
6. 总结
asyncio.create_task()
是Python异步编程中的核心函数,学会使用能够:
- 并发执行协程:通过创建Task对象实现真正的并发
- 灵活控制任务:可以取消、检查状态、处理异常
- 提高程序性能:特别适合I/O密集型操作
- 简化异步代码:让异步编程更加直观和易用