当前位置: 首页 > news >正文

fastapi -- 耗时任务处理

文章目录

  • 耗时任务处理方法
      • fastapi自带BackgroundTasks
      • 基于Celery任务队列

耗时任务处理方法

fastapi自带BackgroundTasks

BackgroundTasks 属于:轻量级的内存任务队列管理器
主要弊端:

  • 无持久化,任务可靠性差
  • 无任务状态追踪
  • 服务器重启丢失任务
  • 内存限制,不适合大数据
  • 缺乏重试、超时控制
  • 异步CPU密集型会阻塞事件循环,fastapi无法继续处理其他的请求;

建议:仅用于开发环境或非关键的轻量级任务,生产环境建议使用专业的任务队列系统 。
案例代码:

# __author__ = "laufing"
# class_based_qt
import time
import uvicorn
import asyncio
from fastapi import FastAPI, BackgroundTasks, Form # Form依赖python-multipartapp = FastAPI()# BackgroundTasks 是一个任务队列管理器
def task_handler(msg):  # 类型注解# 模拟发送邮件print(f"Sending email to {msg}")for i in range(10000000):print(i)print("task run end....")async def async_task_handler(msg):  # 类型注解,自动识别并调用email-validator# 模拟发送邮件print(f"async sending email to {msg}")# for i in range(10000000): # CPU密集型的异步任务#     print(i)await asyncio.sleep(100) # IO密集型print("async task run end....")# 添加路由  (同步终点函数 + 同步的耗时) + requests 同步的请求
@app.post("/task", status_code=202) # 成功则返回202
def submit_task( # 同步端点函数,在线程池中执行;内部的逻辑必须的同步的,不能使用async def/awaitbackground_tasks: BackgroundTasks, # 后台任务管理message: str=Form(..., description="从请求体中解析", alias="message")  # 必须通过类型注释标识从哪里解析数据,否则可能报错422
):print("parsed message:", message)# 添加到后台,并开始执行# - 在同步的终点函数中,同步的耗时任务会在子线程中异步执行# IO密集型、CPU密集型均不会阻塞fastapi处理其他请求background_tasks.add_task(task_handler, message)return {"message": "Notification sent in background"}# 添加路由  (异步端点函数 + 异步的耗时任务) + requests 同步的请求
@app.post("/task2", status_code=202) # 成功则返回202
async def submit_task( # 异步的端点函数,在事件循环执行;内部的逻辑最好是异步的,使用async def/awaitbackground_tasks: BackgroundTasks, # 后台任务管理message: str=Form(..., description="从请求体中解析", alias="message")  # 必须通过类型注释标识从哪里解析数据,否则可能报错422
):print("parsed message:", message)# 异步任务添加到后台,并开始执行# 异步的CPU密集型会阻塞fastapi处理其他请求(阻塞事件循环)# 异步的IO密集型则不会阻塞background_tasks.add_task(async_task_handler, message)return {"message": "Notification sent in background"}if __name__ == '__main__':uvicorn.run(app, host="0.0.0.0", port=5050)

 
对于以上CPU密集型任务阻塞事件循环场景,总结如下:

  • 不要在 asyncio 事件循环中直接运行 CPU 密集型任务;
  • 使用 asyncio.run_in_executor 将任务转移到进程池,选择 ProcessPoolExecutor 而不是 ThreadPoolExecutor;
  • 合理配置工作进程数量(通常等于 CPU 核心数)
  • 对于生产环境,考虑使用 Celery 或 RQ 等专业任务队列
    这样就能在保持 FastAPI 高性能的同时,处理 CPU 密集型任务而不会阻塞事件循环。
import os
from concurrent.futures import ProcessPoolExecutordef sync_task_handler(msg):   # 同步任务处理器print(f"sync sending email to {msg}")for i in range(1000000): # CPU密集型的任务print(i)print("sync task run end....")async def async_task_wrapper(msg):  # 获取一个事件循环loop = asyncio.get_event_loop()# 事件循环运行在进程池中await loop.run_in_executor(process_pool, sync_task_handler, msg) # cannot pickle 'coroutine' object,所有只能用同步的处理器print("进程池中的任务执行完成...")# 获取CPU的核心数
cpu_count = os.cpu_count()
process_pool = ProcessPoolExecutor(max_workers=cpu_count)# 添加路由  (异步端点函数 + 异步的耗时任务) + requests 同步的请求
@app.post("/task2", status_code=202) # 成功则返回202
async def submit_task( # 异步的端点函数,在事件循环执行;内部的逻辑最好是异步的,使用async def/awaitbackground_tasks: BackgroundTasks, # 后台任务管理message: str=Form(..., description="从请求体中解析", alias="message")  # 必须通过类型注释标识从哪里解析数据,否则可能报错422
):#添加任务background_tasks.add_task(async_task_wrapper, message)return {"message": "Notification sent in background"}# 优化进程池的创建
from contextlib import asynccontextmanager# 生命周期管理
@asynccontextmanager
async def lifespan(app: FastAPI):# 启动时创建进程池process_pool = ProcessPoolExecutor(max_workers=os.cpu_count())app.state.process_pool = process_poolyield# 关闭服务时清理process_pool.shutdown(wait=True)app = FastAPI(lifespan=lifespan)

 
将异步CPU密集型任务放入进程池的完成方案:

# __author__ = "laufing"
# class_based_qt
import time
import os
import uvicorn
import asyncio
import uuid
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager
from fastapi import FastAPI, BackgroundTasks, Form # Form依赖python-multipart
from fastapi import Query@asynccontextmanager
async def lifespan(app: FastAPI):# 启动服务时,创建进程池# 获取CPU的核心数cpu_count = os.cpu_count()process_pool = ProcessPoolExecutor(max_workers=cpu_count)app.state.process_pool = process_poolapp.state.task_data = {}#yield# 关闭服务时,关闭进程池process_pool.shutdown(wait=True)app = FastAPI(lifespan=lifespan)  # 指定生命周期data = {"state": "ready"
}async def async_task_handler(msg):   # 异步任务处理器print(f"async sending email to {msg}")for i in range(10): # CPU密集型的异步任务print(i)print("async task run end....")def run_task(msg):# 子进程中无法更新父进程的数据# data["state"] = "running"asyncio.run(async_task_handler(msg))# data["state"] = "success"# 添加路由  (异步端点函数 + 异步的耗时任务) + requests 同步的请求
@app.post("/task2", status_code=202) # 成功则返回202
async def submit_task( # 异步的端点函数,在事件循环执行;内部的逻辑最好是异步的,使用async def/awaitbackground_tasks: BackgroundTasks, # 后台任务管理message: str=Form(..., description="从请求体中解析", alias="message")  # 必须通过类型注释标识从哪里解析数据,否则可能报错422
):# 可以直接访问app对象process_pool = app.state.process_pool# 进程池提交任务future = process_pool.submit(run_task, message)print("future?", future)# future是可await对象# future.done() 是否完成# future.cancled() 是否取消# future.result()  同步获取结果# future.running()/exception# await futuretask_id = str(uuid.uuid4())app.state.task_data[task_id] = futurereturn {"message": "任务以提交", "task_id": task_id}@app.get("/task2", status_code=200)
async def task_result(task_id: str=Query(..., description="task id")):task_data = app.state.task_datafuture = task_data.get(task_id)print(app.state.task_data)print(future, future.done())return {"task_status": future.done()}if __name__ == '__main__':uvicorn.run(app, host="0.0.0.0", port=5050)

 

基于Celery任务队列

# celery_tasks.py
from celery import Celery
import oscelery_app = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/0'
)@celery_app.task
def heavy_cpu_task(n: int):"""Celery 任务 - 在单独的进程中运行"""result = 0for i in range(n):result += i * ireturn result# FastAPI 集成
@app.post("/cpu-intensive-celery")
async def cpu_intensive_celery(n: int):from celery_tasks import heavy_cpu_task# 发送任务到 Celery workertask = heavy_cpu_task.delay(n)return {"task_id": task.id, "status": "queued"}@app.get("/task-result/{task_id}")
async def get_task_result(task_id: str):from celery_tasks import heavy_cpu_taskresult = heavy_cpu_task.AsyncResult(task_id)if result.ready():return {"status": "completed", "result": result.result}else:return {"status": "processing"}
http://www.dtcms.com/a/561961.html

相关文章:

  • 网站建设咨询服务企业级服务器配置
  • 做阅读任务挣钱的网站北京网站建设公司哪家最好
  • 零基础学Python_自动补全符号
  • C++14 新特性:更加简洁和高效的编程体验
  • 邹城网站设计百中搜
  • 青海省住房和城乡建设厅官方网站wordpress s.w.org
  • Apollo Planning 模块技术深度解析
  • 哪个网站可以帮助做数学题百度推送
  • 企业网站和信息化建设哪里有网站制作服务
  • 【Linux】深入理解进程(三)(环境变量)
  • 【C学生序号姓名学号年龄降序排序】2022-12-9
  • 平衡二叉树解题思路
  • 电子商务网站应该如何建设四川教育公共信息服务平台
  • 响应式官方网站便宜自适应网站建设厂家
  • 实例016 百元买百鸡问题
  • 硬件-射频学习DAY3——高频电流的“恐深症”:趋肤效应解密
  • Hudi安装部署
  • 宠物之家网站开发九江网站建设优化公司
  • 网站的360快照怎么做锦州网站seo
  • 【Android】结合View的事件分发机制解决滑动冲突
  • python 异步编程 -- 理解asyncio里的Future 对象
  • zoho crmvue做网站对seo
  • Java---System 类
  • 31.使用等待队列实现阻塞访问
  • Tyme 技术赋能:节气与季节的高效求解实战攻略
  • 【C++】2025CSP-J第二轮真题及解析
  • 网站建设教程流程更改wordpress主题语言
  • 朝阳区网站建设蒙特网设计公司
  • 济南网站优化厂家做同城服务网站比较成功的网站
  • 老鼠目标检测数据集(3000张)