当前位置: 首页 > news >正文

Python异步编程:asyncio.create_task() 用法解析

asyncio.create_task() 是Python 3.7+引入的函数,用于创建一个Task对象。Task是对协程的封装,可以让协程在事件循环中并发执行。

1. 基本语法

task = asyncio.create_task(coro)
  • coro: 要包装的协程对象
  • 返回值: Task对象

2. 基础用法示例

2.1 简单的任务创建

import asyncio
import timeasync def say_hello(name, delay):print(f"开始执行 {name}")await asyncio.sleep(delay)print(f"Hello, {name}!")return f"完成 {name}"async def main():# 创建任务task1 = asyncio.create_task(say_hello("Alice", 2))task2 = asyncio.create_task(say_hello("Bob", 1))# 等待任务完成result1 = await task1result2 = await task2print(result1)print(result2)# 运行主函数
asyncio.run(main())

2.2 多个任务并发执行

import asyncioasync def fetch_data(url, delay):print(f"开始获取 {url}")await asyncio.sleep(delay)return f"数据来自 {url}"async def main():# 创建多个任务tasks = [asyncio.create_task(fetch_data("https://api1.com", 1)),asyncio.create_task(fetch_data("https://api2.com", 2)),asyncio.create_task(fetch_data("https://api3.com", 1.5))]# 等待所有任务完成results = await asyncio.gather(*tasks)for result in results:print(result)asyncio.run(main())

3. 高级用法

3.1 任务的取消

import asyncioasync def long_running_task():try:for i in range(10):print(f"任务执行中... {i}")await asyncio.sleep(1)return "任务完成"except asyncio.CancelledError:print("任务被取消")raiseasync def main():task = asyncio.create_task(long_running_task())# 3秒后取消任务await asyncio.sleep(3)task.cancel()try:await taskexcept asyncio.CancelledError:print("捕获到取消异常")asyncio.run(main())

3.2 任务状态检查

import asyncioasync def sample_task():await asyncio.sleep(2)return "完成"async def main():task = asyncio.create_task(sample_task())print(f"任务创建后 - 完成: {task.done()}")await asyncio.sleep(1)print(f"等待1秒后 - 完成: {task.done()}")await taskprint(f"任务完成后 - 完成: {task.done()}")print(f"任务结果: {task.result()}")asyncio.run(main())

3.3 异常处理

import asyncioasync def task_with_exception():await asyncio.sleep(1)raise ValueError("这是一个错误")async def normal_task():await asyncio.sleep(2)return "正常完成"async def main():task1 = asyncio.create_task(task_with_exception())task2 = asyncio.create_task(normal_task())try:# 使用 gather 并设置 return_exceptions=True 来处理异常results = await asyncio.gather(task1, task2, return_exceptions=True)for i, result in enumerate(results):if isinstance(result, Exception):print(f"任务 {i} 发生异常: {result}")else:print(f"任务 {i} 结果: {result}")except Exception as e:print(f"捕获异常: {e}")asyncio.run(main())

4. 实际应用场景

4.1 并发HTTP请求

示例一:

import asyncio
import aiohttpasync def fetch_url(session, url):try:async with session.get(url) as response:return await response.text()except Exception as e:return f"错误: {e}"async def fetch_multiple_urls():urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/2','https://httpbin.org/delay/1']async with aiohttp.ClientSession() as session:# 创建任务列表tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls]# 并发执行所有任务results = await asyncio.gather(*tasks)for i, result in enumerate(results):print(f"URL {i+1}: {len(result) if isinstance(result, str) else result} 字符")# 注意:需要安装 aiohttp: pip install aiohttp
# asyncio.run(fetch_multiple_urls())

示例二:

import asyncio
import aiohttpasync def process_http_request():# 使用aiohttp进行异步HTTP请求timeout = aiohttp.ClientTimeout(total=1000)async with aiohttp.ClientSession(timeout=timeout) as session:headers = {}data = {}url = ""is_error = Falsetry:print(f"HTTP异步请求开始,data: {data} ")async with session.post(url=url,headers=headers,json=data,timeout=timeout  # 为单个请求设置超时) as response:if response.status == 200:# 获取响应数据response_data = await response.json()if response_data['data']['status'] != 200:print(f"HTTP异步请求处理失败: {response_data} ")is_error = Trueelse:print(f"HTTP异步请求处理失败,状态码: {response.status}")is_error = Trueexcept asyncio.TimeoutError:print(f"HTTP异步请求超时!")is_error = Trueprint(f"HTTP异步请求完成")return is_error

4.2 数据库操作并发

import asyncio
# import asyncpg  # 示例使用 asyncpgasync def query_database(query_id):# 模拟数据库查询await asyncio.sleep(1)  # 模拟查询时间return f"查询结果 {query_id}"async def concurrent_queries():# 创建多个查询任务tasks = [asyncio.create_task(query_database(i)) for i in range(5)]# 并发执行查询results = await asyncio.gather(*tasks)for result in results:print(result)asyncio.run(concurrent_queries())

create_task() vs ensure_future()

在Python 3.7之前,我们使用 asyncio.ensure_future() 来创建任务:

# 旧方式(仍然可用)
task = asyncio.ensure_future(coro)# 新方式(推荐)
task = asyncio.create_task(coro)

两者的主要区别:

  • create_task() 更明确地表示创建一个任务
  • ensure_future() 功能更广泛,可以处理多种类型的awaitable对象
  • 在现代Python中,推荐使用 create_task()

5. 最佳实践

5.1 合理使用任务

import asyncioasync def good_practice():# 好的做法:明确创建任务task1 = asyncio.create_task(some_coroutine())task2 = asyncio.create_task(another_coroutine())# 等待结果result1 = await task1result2 = await task2return result1, result2async def avoid_this():# 避免这样做:直接await协程不会并发执行result1 = await some_coroutine()result2 = await another_coroutine()return result1, result2

5.2 使用超时控制

import asyncioasync def slow_operation():await asyncio.sleep(5)return "完成"async def main():try:task = asyncio.create_task(slow_operation())# 设置3秒超时result = await asyncio.wait_for(task, timeout=3.0)print(result)except asyncio.TimeoutError:print("操作超时")# 取消任务task.cancel()asyncio.run(main())

6. 总结

asyncio.create_task() 是Python异步编程中的核心函数,学会使用能够:

  1. 并发执行协程:通过创建Task对象实现真正的并发
  2. 灵活控制任务:可以取消、检查状态、处理异常
  3. 提高程序性能:特别适合I/O密集型操作
  4. 简化异步代码:让异步编程更加直观和易用

文章转载自:

http://qi6omJha.Lykmf.cn
http://ghUXZ5TB.Lykmf.cn
http://SCQQhSv3.Lykmf.cn
http://QNfRVzJv.Lykmf.cn
http://u0bADMXO.Lykmf.cn
http://exagWp2c.Lykmf.cn
http://O40zhsqE.Lykmf.cn
http://k63zbJ4F.Lykmf.cn
http://KsQy2fTw.Lykmf.cn
http://bfBhcb7F.Lykmf.cn
http://hGs9XOnF.Lykmf.cn
http://O2GosM7s.Lykmf.cn
http://ckgDPu1N.Lykmf.cn
http://y0UsAPWV.Lykmf.cn
http://5sm2T2vd.Lykmf.cn
http://jnSyZd8M.Lykmf.cn
http://eBZAPs15.Lykmf.cn
http://AXnxNndb.Lykmf.cn
http://GaSQM8m0.Lykmf.cn
http://HXPIB9I1.Lykmf.cn
http://3TABRxUB.Lykmf.cn
http://pAAkQzWG.Lykmf.cn
http://ZJpc0sWT.Lykmf.cn
http://SXKCsWTD.Lykmf.cn
http://97avPMC7.Lykmf.cn
http://dF5UmuUD.Lykmf.cn
http://4YPB1LXG.Lykmf.cn
http://wtX9L8WO.Lykmf.cn
http://blIC1GJS.Lykmf.cn
http://9oKPIPv5.Lykmf.cn
http://www.dtcms.com/a/387879.html

相关文章:

  • java面试Day1 | redis缓存穿透、击穿、雪崩、持久化、双写一致性、数据过期策略、数据淘汰策略、分布式锁、redis集群
  • Jenkins运维之路(容器项目的优化)
  • 【精品资料鉴赏】363页智慧旅游大数据平台项目建设设计方案
  • 软考 系统架构设计师系列知识点之杂项集萃(149)
  • MyBatis 中注解操作与 XML 映射文件操作的对比
  • 复杂 PDF 文档如何高效解析?
  • 加密网络流量分类
  • leetcode算法题记录:
  • VS安装后通过vswhere.exe查询显示的 installationVersion数字怎么不是2022?
  • 光伏电站安全 “守护神”:QB800 绝缘监测平台,为清洁能源高效运行筑固防线
  • STC携手非小号 Talking Web3,海上ALPHA WEB3派对启航
  • AR技术突破:极端环境下设备的创新与应用
  • R---------split()` 函数
  • 和为K的子数组-前缀和+哈希
  • ITSM产品推荐:甄知科技燕千云与主流方案对比分析
  • 线性回归与 Softmax 回归核心知识点总结
  • OpenLayers数据源集成 -- 章节十八:GML图层详解:OGC标准地理标记语言的完整集成与智能样式渲染方案
  • 线性回归与 Softmax 回归核心内容总结
  • 【数据分享】各省农业新质生产力数据(2012-2023)
  • 整理SpringBoot实现文件上传所需的知识
  • Cesium 加载ArcGIS 地图源到国内地图源的切换
  • 2010/12 JLPT听力原文 问题四
  • html页面转PDF
  • day3 MySOL多表操作
  • 触觉智能RK3576开发板OpenHarmony开源鸿蒙系统USB控制传输功能示例
  • 阿里云开源通义 DeepResearch!轻量级 AI 代理性能对标 OpenAI,系统性技术创新赋能研究能力​
  • WSL Git Clone 项目识别 `.git` 问题记录
  • openHarmony之开源三方库zlib适配讲解
  • GitHub开源免费PDF编辑器推荐:告别破解,高效编辑PDF
  • 贪心算法应用:社交网络影响力最大化问题详解