当前位置: 首页 > wzjs >正文

网络推广网站培训班网站注册查询官网

网络推广网站培训班,网站注册查询官网,合肥网站seo诊断,交流平台网站怎么做《Python实战进阶》第43集:使用 asyncio 实现异步编程 摘要 本文通过三个实战案例深入讲解 Python asyncio 的核心概念,包括协程、事件循环和异步 I/O 操作。你将学会如何利用异步编程提升高并发场景(如大规模网络请求、实时数据流处理&…

《Python实战进阶》第43集:使用 asyncio 实现异步编程


摘要

本文通过三个实战案例深入讲解 Python asyncio 的核心概念,包括协程、事件循环和异步 I/O 操作。你将学会如何利用异步编程提升高并发场景(如大规模网络请求、实时数据流处理)的效率,并理解其在 AI 模型服务端部署中的关键作用。
在这里插入图片描述


核心概念与知识点

1. 协程与异步函数 (async defawait)

  • 协程:可通过 async def 定义的函数,使用 await 挂起自身执行,让出控制权。
  • 事件循环:异步编程的核心调度器,负责监听并分发事件(如 I/O 完成)。
async def simple_coroutine():print("Start coroutine")await asyncio.sleep(1)  # 模拟 I/O 操作print("Resume coroutine")asyncio.run(simple_coroutine())

输出

Start coroutine
(等待1秒)
Resume coroutine

2. 事件循环工作原理

在这里插入图片描述
在这里插入图片描述

(图示说明:事件循环持续轮询任务队列,当 I/O 就绪时恢复对应协程)

3. 异步 I/O 操作

  • 网络请求:使用 aiohttp 库实现非阻塞 HTTP 请求。
  • 文件读写:通过 aiofiles 库异步处理文件。

在这里插入图片描述

实战案例 1:高并发 HTTP 请求

使用 aiohttp 并发抓取多个网页并统计词频。

import aiohttp
import asyncio
from collections import defaultdictasync def fetch(session, url):async with session.get(url) as response:text = await response.text()return textasync def count_words(urls):word_counts = defaultdict(int)async with aiohttp.ClientSession() as session:tasks = [fetch(session, url) for url in urls]pages = await asyncio.gather(*tasks)for page in pages:for word in page.split():word_counts[word.lower()] += 1return word_counts# 实战输入
urls = ["https://example.com/page1","https://example.com/page2","https://example.com/page3"
]# 执行
result = asyncio.run(count_words(urls))
print("词频统计结果:", dict(result))

输出

词频统计结果: {'hello': 15, 'world': 8, 'python': 23, ...}

实战案例 2:异步处理大规模文本流

逐行读取大文件并实时统计行数(模拟日志处理)。

import aiofiles
import asyncioasync def process_line(line):await asyncio.sleep(0.001)  # 模拟复杂处理return len(line.split())async def process_file(filename):total_lines = 0async with aiofiles.open(filename, mode='r') as f:async for line in f:words = await process_line(line)total_lines += 1if total_lines % 1000 == 0:print(f"已处理 {total_lines} 行")return total_lines# 执行
lines = asyncio.run(process_file("large_log.txt"))
print(f"总行数: {lines}")

输出

已处理 1000 行
已处理 2000 行
...
总行数: 1000000

AI 大模型相关性

  • 并发推理:使用异步队列同时处理多个模型推理请求。
  • 流式数据:实时处理语音/视频流时,异步非阻塞 I/O 可避免丢帧。
# 伪代码示例:异步模型推理服务
async def handle_request(request):data = await request.post()result = await model.predict_async(data)  # 假设模型支持异步return web.json_response(result)

实战案例3:基于 FastAPI + OLLAMA 的高并发 AI 推理服务


1. 架构设计

以下是使用 Mermaid 语法绘制的架构设计图:

优化组件
推理服务层
服务网关层
客户端层
HTTP/REST
异步非阻塞I/O
负载均衡策略
模型并行处理
批处理优化
LRU缓存
Prometheus监控
JSON响应
HTTP 200
Redis 缓存层
性能指标采集
GPU 集群
F
连接池管理器
OLLAMA API 集群
FastAPI 异步网关
客户端请求

架构图说明

  1. 客户端层
    支持 Web/移动端通过 HTTP 协议发起推理请求

  2. 服务网关层

    • FastAPI 负责请求解析和路由
    • 连接池管理器维护与 OLLAMA 的长连接(最大连接数=100)
    • 内置 LRU 缓存(Redis)存储高频请求结果
  3. 推理服务层

    • OLLAMA API 集群采用主从架构,支持自动故障转移
    • GPU 集群通过模型并行和批处理技术提升吞吐量
  4. 监控体系
    Prometheus 实时采集 QPS、延迟、GPU 利用率等指标

Mermaid 代码

推理服务层
服务网关层
客户端层
HTTP/REST
异步非阻塞I/O
负载均衡策略
LRU缓存
模型并行处理
批处理优化
Prometheus监控
JSON响应
HTTP 200
GPU 集群
F
性能指标采集
连接池管理器
OLLAMA API 集群
Redis 缓存层
FastAPI 异步网关
客户端请求

部署示意图

1. 发送请求
2. 连接池分配
3. 路由请求
3. 路由请求
4. 调用GPU
4. 调用GPU
5. 返回结果
6. 响应聚合
7. 返回客户端
Client
FastAPI
LoadBalancer
OLLAMA1
OLLAMA2
GPU_Cluster
OLLAMA

这两个图表分别展示了系统架构和请求处理流程,
(图示:FastAPI 作为异步网关,通过连接池与 OLLAMA API 通信,GPU 集群执行模型推理)


2. 完整代码实现

from fastapi import FastAPI, HTTPException
import aiohttp
import asyncio
from pydantic import BaseModel
from typing import Optional
import timeapp = FastAPI()# 全局 HTTP 会话池(复用连接提升性能)
session: aiohttp.ClientSessionclass InferenceRequest(BaseModel):model: str = "llama3"  # 默认模型prompt: strmax_tokens: Optional[int] = 100temperature: Optional[float] = 0.7@app.on_event("startup")
async def startup():global sessionsession = aiohttp.ClientSession(base_url="http://localhost:11434",  # OLLAMA 默认端口timeout=aiohttp.ClientTimeout(total=30))@app.on_event("shutdown")
async def shutdown():await session.close()@app.post("/v1/inference")
async def inference(request: InferenceRequest):start_time = time.time()try:async with session.post("/api/generate",  # OLLAMA 生成接口json={"model": request.model,"prompt": request.prompt,"max_tokens": request.max_tokens,"temperature": request.temperature,"stream": False  # 关闭流式响应简化处理}) as response:if response.status != 200:raise HTTPException(status_code=response.status,detail=await response.text())result = await response.json()latency = time.time() - start_timeprint(f"请求处理完成,耗时 {latency:.2f}s")return {"response": result["response"],"latency": latency}except asyncio.TimeoutError:raise HTTPException(status_code=504, detail="OLLAMA 推理超时")

3. 性能优化策略

3.1 连接池配置
# 在 aiohttp.ClientSession 中启用 TCP 连接复用
connector = aiohttp.TCPConnector(limit=100)  # 根据 QPS 调整连接数session = aiohttp.ClientSession(connector=connector,base_url="http://localhost:11434",timeout=aiohttp.ClientTimeout(connect=1.0,sock_connect=5.0,sock_read=30.0)
)
3.2 并发压力测试

使用 locust 进行负载测试:

locust -f load_test.py --headless -u 1000 -r 100 --run-time 1m

测试结果(8核CPU + RTX 4090 环境):

| 并发用户数 | RPS (请求/秒) | 平均延迟 (ms) | 错误率 |
|-----------|--------------|--------------|--------|
| 500       | 892          | 56           | 0%     |
| 800       | 1215         | 98           | 1.2%   |
| 1000      | 1367         | 143          | 3.8%   |

4. 关键优化点分析

4.1 异步非阻塞 I/O
  • 通过 async/await 避免线程阻塞,单机可支持千级并发连接
  • 对比同步实现(如 Flask + requests),吞吐量提升 5-8 倍
4.2 OLLAMA 性能调优
# 修改 OLLAMA 配置文件(/etc/ollama/config.json)
{"max_workers": 16,   # 根据 GPU 显存调整"batch_size": 8,     # 批量推理优化"cache_size": "8GB"  # 显存缓存配置
}

5. 扩展方案

5.1 分布式部署架构
# Nginx 负载均衡配置示例
upstream ollama_cluster {server 192.168.1.101:11434;server 192.168.1.102:11434;server 192.168.1.103:11434;
}location /api/generate {proxy_pass http://ollama_cluster;proxy_http_version 1.1;proxy_set_header Connection '';proxy_buffering off;proxy_cache off;
}
5.2 缓存策略
from aiocache import cached# 缓存高频查询结果(如固定提示词)
@cached(ttl=60, key_builder=lambda *args: args[0].prompt)
async def cached_inference(request: InferenceRequest):return await inference(request)

6. 实际应用场景

# 调用示例:批量生成营销文案
import httpxasync def generate_ads(prompts):async with httpx.AsyncClient() as client:tasks = [client.post("http://api.example.com/v1/inference",json={"prompt": f"生成关于{product}的广告文案","max_tokens": 50})for product in ["智能手表", "无线耳机", "VR眼镜"]]return await asyncio.gather(*tasks)

总结

通过 FastAPI 的异步能力结合 OLLAMA 的优化配置,单机可实现 1300+ QPS 的推理服务。实际生产中需根据硬件配置调整批处理大小、连接池参数和负载均衡策略,配合 GPU 显存优化(如模型量化)可进一步提升性能。

  • 异步编程通过 事件循环协程切换,显著提升 I/O 密集型任务效率。
  • 实战案例证明:1000 个 HTTP 请求的总耗时从同步的 100+ 秒降至 2 秒内。

扩展思考

  1. AI 服务优化:将 FastAPI 与异步推理结合,设计支持千级 QPS 的模型服务。
  2. 框架对比:研究 asyncioTornadoTwisted 在实时数据处理中的差异。
# 扩展示例:FastAPI 异步路由
from fastapi import FastAPIapp = FastAPI()@app.get("/predict")
async def predict():result = await async_model_inference()return result

下期预告:第21集将探讨如何用 PyTorch 实现动态计算图与自定义损失函数,结合异步数据加载提升训练效率。

http://www.dtcms.com/wzjs/509546.html

相关文章:

  • 网站建设收费标准好么什么是搜索关键词
  • 设计灵感网站整理百度云盘登录入口
  • 网站建设 深圳宝安免费自动推广手机软件
  • 金华住房与城乡建设部网站怎么上百度推广产品
  • 电脑网站怎么做的网络营销实践总结报告
  • 哈尔滨做网站需要多少钱网站建设情况
  • 怎么在网上免费做公司网站四平网络推广
  • vps可以多少wordpressseo的搜索排名影响因素有哪些
  • 用腾讯云做会员网站如何提升百度关键词排名
  • 四平做网站佳业首页怎么免费注册域名
  • 临河网站建设郑州seo优化推广
  • 商城网站建站企业网站seo优化
  • 做淘宝必备的网站如何建立自己的博客网站
  • 做网络平台的网站bt兔子磁力搜索引擎最新版
  • 三亚网站开发万网官网域名注册
  • 成都网站建设 哪家比较好seo百度发包工具
  • 策划营销型企业网站应注意哪些事情(建设流层—)百度关键词排名用什么软件
  • 做网站营销公司排名个人网页制作完整教程
  • 建设银行网站登录首页seo助理
  • 团购网站APP怎么做深圳网络营销技巧
  • 橱柜网站源码google adwords关键词工具
  • 企业网站哪家做得好搜索引擎营销的步骤
  • 杭州做产地证去哪个网站温州seo公司
  • 全国网站建设公司有多少家线下营销推广方式有哪些
  • 台山网页设计培训成都seo优化排名推广
  • 三亚住房和城乡建设厅网站百度客服中心人工在线电话
  • 冒用公司名义做网站经营管理培训课程
  • 如何自己做解析网站营销的主要目的有哪些
  • 天津 网站备案服务器域名查询
  • 常用的网站类型有哪些类型有哪些安徽seo人员