
Core Components of Python's asyncio

asyncio Core Components

  1. The Future object
    A Future is the basic building block of asynchronous programming, representing the result of a computation that has not yet completed.
  2. The Task object in depth
    A Task is a specialized Future that wraps a coroutine and is responsible for scheduling and driving its execution.
  3. The Executor mechanism in detail
    Executors provide the bridge between synchronous code and the asynchronous environment.

1. The Future Object

A Future is the basic building block of asynchronous programming, representing the result of a computation that has not yet completed.
Code:

import asyncio


def callback_example(future):
    """Callback invoked when the Future completes."""
    try:
        result = future.result()
        print(f"回调函数收到结果: {result}")
    except asyncio.CancelledError:
        print("回调函数: Future已被取消")
    except Exception as e:
        print(f"回调函数捕获到异常: {e}")


async def basic_future_example():
    """Basic Future usage."""
    print("=== 基本Future使用示例 ===")
    loop = asyncio.get_running_loop()

    # Create a Future and register a done callback
    future = loop.create_future()
    future.add_done_callback(callback_example)

    def on_data_received(data):
        future.set_result(f"接收到数据: {data}")

    # Simulate asynchronous data arrival
    loop.call_later(0.1, on_data_received, "Hello World")

    result = await future
    print(f"主程序收到结果: {result}")
    print()


async def concurrent_operations_example():
    """Waiting on several Futures concurrently."""
    print("=== 并发操作示例 ===")
    loop = asyncio.get_running_loop()

    # Create several Futures
    futures = [loop.create_future() for _ in range(3)]

    # Attach a callback to each Future (the factory binds the index correctly)
    for i, future in enumerate(futures):
        def make_callback(index):
            def callback(f):
                print(f"任务 {index} 完成: {f.result()}")
            return callback
        future.add_done_callback(make_callback(i))

    # Simulate asynchronous completion
    def complete_task(index, value):
        futures[index].set_result(f"任务{index}的结果: {value}")

    # Schedule the completions (short delays)
    loop.call_later(0.1, complete_task, 0, "A")
    loop.call_later(0.2, complete_task, 1, "B")
    loop.call_later(0.15, complete_task, 2, "C")

    # Wait for all Futures to finish
    results = await asyncio.gather(*futures)
    print(f"所有任务完成,结果: {results}")
    print()


async def main():
    """Run all Future examples."""
    await basic_future_example()
    await concurrent_operations_example()


if __name__ == "__main__":
    asyncio.run(main())

Output:

=== 基本Future使用示例 ===
回调函数收到结果: 接收到数据: Hello World
主程序收到结果: 接收到数据: Hello World

=== 并发操作示例 ===
任务 0 完成: 任务0的结果: A
任务 2 完成: 任务2的结果: C
任务 1 完成: 任务1的结果: B
所有任务完成,结果: ['任务0的结果: A', '任务1的结果: B', '任务2的结果: C']
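
The example above only completes Futures with set_result(). A Future can also finish with an exception via set_exception(), or be cancelled, and awaiting it then re-raises accordingly. A minimal sketch of both paths (the function name failing_future_example is illustrative, not part of the original example):

import asyncio


async def failing_future_example():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Complete the Future with an exception instead of a result
    loop.call_later(0.1, future.set_exception, ValueError("something went wrong"))

    try:
        await future  # re-raises the stored exception
    except ValueError as e:
        print(f"捕获到异常: {e}")

    # A cancelled Future raises CancelledError when awaited
    cancelled = loop.create_future()
    cancelled.cancel()
    try:
        await cancelled
    except asyncio.CancelledError:
        print("Future已被取消")


if __name__ == "__main__":
    asyncio.run(failing_future_example())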

2. The Task Object in Depth

A Task is a specialized Future that wraps a coroutine and is responsible for scheduling and driving its execution.
Code:

import asyncio
import time


async def simple_coroutine():
    """A simple coroutine."""
    print(" 协程开始执行")
    await asyncio.sleep(2)  # Simulate a time-consuming operation
    print(" 协程执行完成")
    return "Hello from Task!"


async def demonstrate_create_task():
    """Demonstrate the basics of asyncio.create_task()."""
    # Method 1: create a single task
    print(" 方法1: 直接创建任务")
    task1 = asyncio.create_task(simple_coroutine())
    print(f"任务已创建: {task1}")
    print(f"任务是否完成: {task1.done()}")

    # Await the task and get its result
    result1 = await task1
    print(f"任务结果: {result1}")
    print(f"任务是否完成: {task1.done()}\n")

    # Method 2: create several tasks and run them concurrently
    print(" 方法2: 创建多个任务并发执行")

    async def quick_task(name, delay):
        print(f"任务 {name} 开始执行")
        await asyncio.sleep(delay)
        print(f"任务 {name} 完成")
        return f"结果-{name}"

    # Create all tasks up front so they run concurrently
    task_a = asyncio.create_task(quick_task("A", 1))
    task_b = asyncio.create_task(quick_task("B", 2))
    task_c = asyncio.create_task(quick_task("C", 1.5))
    print("所有任务已创建,开始并发执行...")

    # Wait for all of them to finish
    results = await asyncio.gather(task_a, task_b, task_c)
    print(f"所有任务结果: {results}\n")


async def main():
    await demonstrate_create_task()


if __name__ == "__main__":
    # Run the async entry point and measure total wall-clock time
    start_time = time.time()
    asyncio.run(main())
    end_time = time.time()
    print(f"\n总执行时间: {end_time - start_time:.2f} 秒")

Output:

方法1: 直接创建任务
任务已创建: <Task pending name='Task-2' coro=<simple_coroutine() running at F:\AI 工程化项目实战营\week09\p12_2Task.py:4>>
任务是否完成: False
 协程开始执行
 协程执行完成
任务结果: Hello from Task!
任务是否完成: True

 方法2: 创建多个任务并发执行
所有任务已创建,开始并发执行...
任务 A 开始执行
任务 B 开始执行
任务 C 开始执行
任务 A 完成
任务 C 完成
任务 B 完成
所有任务结果: ['结果-A', '结果-B', '结果-C']

总执行时间: 4.01 秒
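
Because a Task is a Future, it also supports cancellation, and asyncio.wait_for() builds a timeout on top of that. A small sketch of both, using a placeholder slow_job() coroutine (not part of the original example):

import asyncio


async def slow_job():
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        print("slow_job 被取消,执行清理")
        raise  # re-raise so the Task ends up in the cancelled state


async def main():
    # Explicit cancellation
    task = asyncio.create_task(slow_job())
    await asyncio.sleep(0.1)  # give the task a chance to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print(f"任务已取消: cancelled={task.cancelled()}")

    # Timeout-based cancellation: wait_for cancels the task when the timeout expires
    try:
        await asyncio.wait_for(slow_job(), timeout=0.5)
    except asyncio.TimeoutError:
        print("任务超时,已被 wait_for 自动取消")


if __name__ == "__main__":
    asyncio.run(main())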

3. The Executor Mechanism in Detail

Executors provide the bridge between synchronous code and the asynchronous environment: blocking functions are handed off to a thread or process pool and awaited from the event loop via run_in_executor().
Code:

import asyncio
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor


# I/O-bound work: asyncio's native async sleep
async def io_task(task_id):
    await asyncio.sleep(1)  # Simulate waiting on I/O
    return f"I/O任务{task_id}完成"


# CPU-bound work: runs in a separate process
def cpu_task(task_id):
    total = sum(i * i for i in range(1000000))  # Pure CPU computation
    return f"CPU任务{task_id}完成"


async def main():
    print("=== Executor核心机制演示 ===")

    # I/O-bound: concurrency via asyncio
    print("\n1. I/O密集型 (asyncio):")
    start = time.time()
    io_results = await asyncio.gather(*[io_task(i) for i in range(5)])
    io_time = time.time() - start
    print(f"5个I/O任务并发执行: {io_time:.2f}秒")

    # CPU-bound: parallelism via a process pool
    print("\n2. CPU密集型 (ProcessPoolExecutor):")
    start = time.time()
    with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as pool:
        loop = asyncio.get_running_loop()
        cpu_results = await asyncio.gather(
            *[loop.run_in_executor(pool, cpu_task, i) for i in range(5)]
        )
    cpu_time = time.time() - start
    print(f"5个CPU任务并行执行: {cpu_time:.2f}秒")

    print("\n核心原理:")
    print("- I/O密集型用asyncio: 单线程并发,避免阻塞")
    print("- CPU密集型用ProcessPool: 多进程并行,绕过GIL")


if __name__ == "__main__":
    asyncio.run(main())

Output:

=== Executor核心机制演示 ===

1. I/O密集型 (asyncio):
5个I/O任务并发执行: 1.01秒

2. CPU密集型 (ProcessPoolExecutor):
5个CPU任务并行执行: 0.65秒

核心原理:
- I/O密集型用asyncio: 单线程并发,避免阻塞
- CPU密集型用ProcessPool: 多进程并行,绕过GIL
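
ProcessPoolExecutor is the right tool for CPU-bound work, but for blocking I/O inside synchronous libraries (a requests call, a database driver without async support) a ThreadPoolExecutor, or the asyncio.to_thread() shortcut added in Python 3.9, is usually enough. A minimal sketch, with blocking_io() standing in for any synchronous call:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(task_id):
    time.sleep(1)  # stands in for a blocking call (requests, DB driver, file I/O, ...)
    return f"阻塞I/O任务{task_id}完成"


async def main():
    loop = asyncio.get_running_loop()

    # Option 1: explicit thread pool via run_in_executor
    with ThreadPoolExecutor(max_workers=5) as pool:
        results = await asyncio.gather(
            *[loop.run_in_executor(pool, blocking_io, i) for i in range(5)]
        )
    print(results)

    # Option 2: asyncio.to_thread (Python 3.9+), which uses the default executor
    results = await asyncio.gather(*[asyncio.to_thread(blocking_io, i) for i in range(5)])
    print(results)


if __name__ == "__main__":
    asyncio.run(main())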

4. Building a High-Performance HTTP Client with aiohttp

Native asynchronous I/O: a single thread can serve thousands of concurrent connections
Supports HTTP/1.1 and the WebSocket protocol
Production-grade connection pool management (TCP connection reuse)
Integrates directly with asyncio, with essentially no extra overhead
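
The core usage pattern aiohttp is built around is a shared ClientSession used as an async context manager; pooling, timeouts and retries all layer on top of it. A minimal sketch (the httpbin.org URL is just a placeholder):

import asyncio
import aiohttp


async def main():
    # One session per application: it owns the connection pool
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/get") as response:
            print("状态码:", response.status)
            data = await response.json()  # the body is read asynchronously
            print("来源IP:", data.get("origin"))


if __name__ == "__main__":
    asyncio.run(main())

The HighPerformanceHttpClient example below layers connection pooling, timeouts and a retry mechanism on top of this pattern: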

import aiohttp
import asyncio
import time
from typing import Dict, Any


class HighPerformanceHttpClient:
    """High-performance HTTP client demonstrating aiohttp's advanced features."""

    def __init__(self, max_connections=100, max_keepalive_connections=30, timeout=30):
        # TCP connector configuration - connection pool management
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,        # total connection limit
            limit_per_host=30,            # per-host connection limit
            enable_cleanup_closed=True,   # clean up closed connections
            keepalive_timeout=30,         # keep-alive timeout
            force_close=False,            # reuse connections
            ttl_dns_cache=300,            # DNS cache TTL (seconds)
            use_dns_cache=True            # enable DNS caching
        )
        # Timeout configuration
        timeout_config = aiohttp.ClientTimeout(
            total=timeout,                # overall timeout
            connect=10,                   # connect timeout
            sock_read=timeout             # read timeout
        )
        # Create the session
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=timeout_config,
            raise_for_status=False,       # handle status codes manually
            headers={
                'User-Agent': 'HighPerformanceClient/1.0',
                'Accept': 'application/json, text/plain, */*',
                'Accept-Encoding': 'gzip, deflate',
                'Connection': 'keep-alive'
            }
        )

    async def close(self):
        """Close the session gracefully."""
        try:
            if self.session and not self.session.closed:
                await self.session.close()
        except Exception:
            pass
        try:
            if self.connector and not self.connector.closed:
                await self.connector.close()
        except Exception:
            pass

    async def get(self, url: str, **kwargs) -> Dict[str, Any]:
        """GET wrapper with error handling."""
        try:
            async with self.session.get(url, **kwargs) as response:
                result = {
                    'status': response.status,
                    'headers': dict(response.headers),
                    'url': str(response.url),
                    'content_type': response.content_type
                }
                # Decode the body according to its content type
                if response.content_type == 'application/json':
                    result['data'] = await response.json()
                elif response.content_type == 'text/html':
                    result['data'] = await response.text()
                else:
                    result['data'] = await response.read()
                return result
        except aiohttp.ClientError as e:
            return {'error': f'Client error: {str(e)}', 'status': 0}
        except asyncio.TimeoutError:
            return {'error': 'Request timeout', 'status': 0}
        except Exception as e:
            return {'error': f'Unexpected error: {str(e)}', 'status': 0}

    async def post(self, url: str, data=None, json_data=None, **kwargs) -> Dict[str, Any]:
        """POST wrapper supporting both form data and JSON bodies."""
        try:
            async with self.session.post(url, data=data, json=json_data, **kwargs) as response:
                result = {
                    'status': response.status,
                    'headers': dict(response.headers),
                    'url': str(response.url)
                }
                if response.content_type == 'application/json':
                    result['data'] = await response.json()
                else:
                    result['data'] = await response.text()
                return result
        except Exception as e:
            return {'error': str(e), 'status': 0}

    async def get_with_retry(self, url: str, max_retries=3, **kwargs) -> Dict[str, Any]:
        """GET with a retry mechanism (exponential backoff)."""
        for attempt in range(max_retries):
            result = await self.get(url, **kwargs)
            if 'error' not in result and result['status'] < 500:
                return result
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # exponential backoff
        return result


# Demo 1: connection pool performance comparison
async def connection_pool_performance_test():
    """Compare the performance of different connection pool configurations."""
    print("=== 连接池性能对比实验 ===")
    urls = ["https://httpbin.org/delay/1"] * 50

    # Test 1: no connection reuse
    print("测试1: 无连接复用...")
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        await asyncio.gather(*tasks, return_exceptions=True)
    no_pool_time = time.time() - start

    # Test 2: connection reuse enabled
    print("测试2: 启用连接复用...")
    connector = aiohttp.TCPConnector(limit=50, keepalive_timeout=30)
    async with aiohttp.ClientSession(connector=connector) as session:
        start = time.time()
        tasks = [session.get(url) for url in urls]
        await asyncio.gather(*tasks, return_exceptions=True)
    pooled_time = time.time() - start

    print(f"无连接复用: {no_pool_time:.2f}s")
    print(f"启用连接复用: {pooled_time:.2f}s")
    print(f"性能提升: {no_pool_time/pooled_time:.2f}x")
    print()


# Demo 2: concurrent requests
async def concurrent_requests_demo():
    """Demonstrate concurrent requests."""
    print("=== 并发请求演示 ===")
    client = HighPerformanceHttpClient()

    # Several different API endpoints
    urls = [
        'https://httpbin.org/json',
        'https://httpbin.org/html',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/200',
        'https://httpbin.org/headers'
    ]

    print(f"并发请求 {len(urls)} 个不同的端点...")
    start = time.time()

    # Run all requests concurrently
    tasks = [client.get(url) for url in urls]
    results = await asyncio.gather(*tasks)

    elapsed = time.time() - start
    print(f"总耗时: {elapsed:.2f}s")

    # Show results
    for i, (url, result) in enumerate(zip(urls, results)):
        status = result.get('status', 'ERROR')
        if 'error' in result:
            print(f"  {i+1}. {url} - 错误: {result['error']}")
        else:
            print(f"  {i+1}. {url} - 状态码: {status}")

    await client.close()
    print()


# Demo 3: POST requests and JSON handling
async def post_request_demo():
    """Demonstrate POST requests and JSON data handling."""
    print("=== POST请求演示 ===")
    client = HighPerformanceHttpClient()

    # JSON payload to POST
    json_data = {
        'name': '张三',
        'age': 25,
        'city': '北京',
        'skills': ['Python', 'asyncio', 'aiohttp']
    }

    print("POST JSON数据到 httpbin.org...")
    result = await client.post(
        'https://httpbin.org/post',
        json_data=json_data,
        headers={'Content-Type': 'application/json'}
    )

    if 'error' not in result and result['status'] == 200:
        response_data = result['data']
        print(f"  状态码: {result['status']}")
        print(f"  接收到的JSON数据: {response_data.get('json', {})}")
        print(f"  请求头: {list(response_data.get('headers', {}).keys())}")
    else:
        print(f"  请求失败: {result.get('error', 'Unknown error')}")

    await client.close()
    print()


# Demo 4: error handling and retries
async def error_handling_demo():
    """Demonstrate error handling and the retry mechanism."""
    print("=== 错误处理和重试机制演示 ===")
    client = HighPerformanceHttpClient(timeout=5)

    # Various failure scenarios
    test_urls = [
        'https://httpbin.org/status/404',   # 404 error
        'https://httpbin.org/status/500',   # 500 error
        'https://httpbin.org/delay/10',     # timeout
        'https://invalid-domain-12345.com'  # invalid domain
    ]

    for url in test_urls:
        print(f"\n测试URL: {url}")

        # Plain request
        result = await client.get(url)
        if 'error' in result:
            print(f"  普通请求错误: {result['error']}")
        else:
            print(f"  普通请求状态码: {result['status']}")

        # Request with retries
        retry_result = await client.get_with_retry(url, max_retries=2)
        if 'error' not in retry_result:
            print(f"  重试请求成功: 状态码 {retry_result['status']}")
        else:
            print(f"  重试后仍然失败: {retry_result['error']}")

    await client.close()
    print()


# Demo 5: session management and connection pool statistics
async def session_management_demo():
    """Demonstrate session management and connection pool statistics."""
    print("=== 会话管理和连接池统计演示 ===")
    client = HighPerformanceHttpClient(max_connections=10)

    print("连接池配置:")
    print(f"  总连接数限制: {client.connector.limit}")
    print(f"  每主机连接数限制: {client.connector.limit_per_host}")
    # print(f"  Keep-alive超时: {client.connector.keepalive_timeout}s")  # no public attribute

    # Run a few requests
    urls = ['https://httpbin.org/json'] * 5
    tasks = [client.get(url) for url in urls]
    results = await asyncio.gather(*tasks)

    successful = sum(1 for r in results if 'error' not in r and r['status'] == 200)
    print(f"\n成功请求数: {successful}/{len(urls)}")

    # Show connection pool state
    print(f"连接池已关闭: {client.connector.closed}")
    print(f"会话已关闭: {client.session.closed}")

    await client.close()
    print("会话已优雅关闭")
    print()


async def main():
    """Run all demos."""
    try:
        await connection_pool_performance_test()
        await concurrent_requests_demo()
        await post_request_demo()
        await error_handling_demo()
        await session_management_demo()
    except KeyboardInterrupt:
        print("\n用户中断程序")
    except Exception as e:
        print(f"程序执行错误: {e}")


if __name__ == "__main__":
    # Recommended way to run an asyncio program
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n程序被用户中断")

Output:

=== 连接池性能对比实验 ===
测试1: 无连接复用...
测试2: 启用连接复用...
无连接复用: 8.49s
启用连接复用: 10.83s
性能提升: 0.78x

=== 并发请求演示 ===
并发请求 5 个不同的端点...
总耗时: 4.94s
  1. https://httpbin.org/json - 状态码: 200
  2. https://httpbin.org/html - 状态码: 200
  3. https://httpbin.org/delay/1 - 状态码: 200
  4. https://httpbin.org/status/200 - 状态码: 200
  5. https://httpbin.org/headers - 状态码: 200

=== POST请求演示 ===
POST JSON数据到 httpbin.org...
  状态码: 200
  接收到的JSON数据: {'age': 25, 'city': '北京', 'name': '张三', 'skills': ['Python', 'asyncio', 'aiohttp']}
  请求头: ['Accept', 'Accept-Encoding', 'Content-Length', 'Content-Type', 'Host', 'User-Agent', 'X-Amzn-Trace-Id']

=== 错误处理和重试机制演示 ===

测试URL: https://httpbin.org/status/404
  普通请求状态码: 404
  重试请求成功: 状态码 404

测试URL: https://httpbin.org/status/500
  普通请求状态码: 500
  重试请求成功: 状态码 500

测试URL: https://httpbin.org/delay/10
  普通请求错误: Client error: Timeout on reading data from socket
  重试请求成功: 状态码 502

测试URL: https://invalid-domain-12345.com
  普通请求错误: Client error: Cannot connect to host invalid-domain-12345.com:443 ssl:default [getaddrinfo failed]
  重试后仍然失败: Client error: Cannot connect to host invalid-domain-12345.com:443 ssl:default [getaddrinfo failed]

=== 会话管理和连接池统计演示 ===
连接池配置:
  总连接数限制: 10
  每主机连接数限制: 30

成功请求数: 5/5
连接池已关闭: False
会话已关闭: False
会话已优雅关闭
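
The connector's limit caps how many TCP connections stay open, but when firing thousands of requests it is also common to bound how many coroutines are in flight at once with asyncio.Semaphore. A minimal sketch of that pattern, independent of the HighPerformanceHttpClient class above (the URL and counts are placeholders):

import asyncio
import aiohttp


async def fetch(session, semaphore, url):
    # The semaphore bounds how many requests run concurrently
    async with semaphore:
        async with session.get(url) as response:
            return response.status


async def main():
    semaphore = asyncio.Semaphore(20)           # at most 20 requests in flight
    connector = aiohttp.TCPConnector(limit=20)  # and at most 20 open connections
    urls = ["https://httpbin.org/get"] * 200    # placeholder workload

    async with aiohttp.ClientSession(connector=connector) as session:
        statuses = await asyncio.gather(
            *[fetch(session, semaphore, url) for url in urls],
            return_exceptions=True,
        )
    ok = sum(1 for s in statuses if s == 200)
    print(f"成功: {ok}/{len(urls)}")


if __name__ == "__main__":
    asyncio.run(main())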
