fastapi -- 耗时任务处理
文章目录
- 耗时任务处理方法
- fastapi自带BackgroundTasks
- 基于Celery任务队列
耗时任务处理方法
fastapi自带BackgroundTasks
BackgroundTasks 属于:轻量级的内存任务队列管理器
主要弊端:
- 无持久化,任务可靠性差
- 无任务状态追踪
- 服务器重启丢失任务
- 内存限制,不适合大数据
- 缺乏重试、超时控制
异步CPU密集型会阻塞事件循环,fastapi无法继续处理其他的请求;
建议:仅用于开发环境或非关键的轻量级任务,生产环境建议使用专业的任务队列系统 。
案例代码:
# __author__ = "laufing"
# class_based_qt
import time
import uvicorn
import asyncio
from fastapi import FastAPI, BackgroundTasks, Form # Form依赖python-multipartapp = FastAPI()# BackgroundTasks 是一个任务队列管理器
def task_handler(msg): # 类型注解# 模拟发送邮件print(f"Sending email to {msg}")for i in range(10000000):print(i)print("task run end....")async def async_task_handler(msg): # 类型注解,自动识别并调用email-validator# 模拟发送邮件print(f"async sending email to {msg}")# for i in range(10000000): # CPU密集型的异步任务# print(i)await asyncio.sleep(100) # IO密集型print("async task run end....")# 添加路由 (同步终点函数 + 同步的耗时) + requests 同步的请求
@app.post("/task", status_code=202) # 成功则返回202
def submit_task( # 同步端点函数,在线程池中执行;内部的逻辑必须的同步的,不能使用async def/awaitbackground_tasks: BackgroundTasks, # 后台任务管理message: str=Form(..., description="从请求体中解析", alias="message") # 必须通过类型注释标识从哪里解析数据,否则可能报错422
):print("parsed message:", message)# 添加到后台,并开始执行# - 在同步的终点函数中,同步的耗时任务会在子线程中异步执行# IO密集型、CPU密集型均不会阻塞fastapi处理其他请求background_tasks.add_task(task_handler, message)return {"message": "Notification sent in background"}# 添加路由 (异步端点函数 + 异步的耗时任务) + requests 同步的请求
@app.post("/task2", status_code=202) # 成功则返回202
async def submit_task( # 异步的端点函数,在事件循环执行;内部的逻辑最好是异步的,使用async def/awaitbackground_tasks: BackgroundTasks, # 后台任务管理message: str=Form(..., description="从请求体中解析", alias="message") # 必须通过类型注释标识从哪里解析数据,否则可能报错422
):print("parsed message:", message)# 异步任务添加到后台,并开始执行# 异步的CPU密集型会阻塞fastapi处理其他请求(阻塞事件循环)# 异步的IO密集型则不会阻塞background_tasks.add_task(async_task_handler, message)return {"message": "Notification sent in background"}if __name__ == '__main__':uvicorn.run(app, host="0.0.0.0", port=5050)
对于以上CPU密集型任务阻塞事件循环场景,总结如下:
- 不要在 asyncio 事件循环中直接运行 CPU 密集型任务;
- 使用 asyncio.run_in_executor 将任务转移到进程池,选择 ProcessPoolExecutor 而不是 ThreadPoolExecutor;
- 合理配置工作进程数量(通常等于 CPU 核心数)
- 对于生产环境,考虑使用 Celery 或 RQ 等专业任务队列
这样就能在保持 FastAPI 高性能的同时,处理 CPU 密集型任务而不会阻塞事件循环。
import os
from concurrent.futures import ProcessPoolExecutordef sync_task_handler(msg): # 同步任务处理器print(f"sync sending email to {msg}")for i in range(1000000): # CPU密集型的任务print(i)print("sync task run end....")async def async_task_wrapper(msg): # 获取一个事件循环loop = asyncio.get_event_loop()# 事件循环运行在进程池中await loop.run_in_executor(process_pool, sync_task_handler, msg) # cannot pickle 'coroutine' object,所有只能用同步的处理器print("进程池中的任务执行完成...")# 获取CPU的核心数
cpu_count = os.cpu_count()
process_pool = ProcessPoolExecutor(max_workers=cpu_count)# 添加路由 (异步端点函数 + 异步的耗时任务) + requests 同步的请求
@app.post("/task2", status_code=202) # 成功则返回202
async def submit_task( # 异步的端点函数,在事件循环执行;内部的逻辑最好是异步的,使用async def/awaitbackground_tasks: BackgroundTasks, # 后台任务管理message: str=Form(..., description="从请求体中解析", alias="message") # 必须通过类型注释标识从哪里解析数据,否则可能报错422
):#添加任务background_tasks.add_task(async_task_wrapper, message)return {"message": "Notification sent in background"}# 优化进程池的创建
from contextlib import asynccontextmanager# 生命周期管理
@asynccontextmanager
async def lifespan(app: FastAPI):# 启动时创建进程池process_pool = ProcessPoolExecutor(max_workers=os.cpu_count())app.state.process_pool = process_poolyield# 关闭服务时清理process_pool.shutdown(wait=True)app = FastAPI(lifespan=lifespan)
将异步CPU密集型任务放入进程池的完成方案:
# __author__ = "laufing"
# class_based_qt
import time
import os
import uvicorn
import asyncio
import uuid
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager
from fastapi import FastAPI, BackgroundTasks, Form # Form依赖python-multipart
from fastapi import Query@asynccontextmanager
async def lifespan(app: FastAPI):# 启动服务时,创建进程池# 获取CPU的核心数cpu_count = os.cpu_count()process_pool = ProcessPoolExecutor(max_workers=cpu_count)app.state.process_pool = process_poolapp.state.task_data = {}#yield# 关闭服务时,关闭进程池process_pool.shutdown(wait=True)app = FastAPI(lifespan=lifespan) # 指定生命周期data = {"state": "ready"
}async def async_task_handler(msg): # 异步任务处理器print(f"async sending email to {msg}")for i in range(10): # CPU密集型的异步任务print(i)print("async task run end....")def run_task(msg):# 子进程中无法更新父进程的数据# data["state"] = "running"asyncio.run(async_task_handler(msg))# data["state"] = "success"# 添加路由 (异步端点函数 + 异步的耗时任务) + requests 同步的请求
@app.post("/task2", status_code=202) # 成功则返回202
async def submit_task( # 异步的端点函数,在事件循环执行;内部的逻辑最好是异步的,使用async def/awaitbackground_tasks: BackgroundTasks, # 后台任务管理message: str=Form(..., description="从请求体中解析", alias="message") # 必须通过类型注释标识从哪里解析数据,否则可能报错422
):# 可以直接访问app对象process_pool = app.state.process_pool# 进程池提交任务future = process_pool.submit(run_task, message)print("future?", future)# future是可await对象# future.done() 是否完成# future.cancled() 是否取消# future.result() 同步获取结果# future.running()/exception# await futuretask_id = str(uuid.uuid4())app.state.task_data[task_id] = futurereturn {"message": "任务以提交", "task_id": task_id}@app.get("/task2", status_code=200)
async def task_result(task_id: str=Query(..., description="task id")):task_data = app.state.task_datafuture = task_data.get(task_id)print(app.state.task_data)print(future, future.done())return {"task_status": future.done()}if __name__ == '__main__':uvicorn.run(app, host="0.0.0.0", port=5050)
基于Celery任务队列
# celery_tasks.py
from celery import Celery
import oscelery_app = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/0'
)@celery_app.task
def heavy_cpu_task(n: int):"""Celery 任务 - 在单独的进程中运行"""result = 0for i in range(n):result += i * ireturn result# FastAPI 集成
@app.post("/cpu-intensive-celery")
async def cpu_intensive_celery(n: int):from celery_tasks import heavy_cpu_task# 发送任务到 Celery workertask = heavy_cpu_task.delay(n)return {"task_id": task.id, "status": "queued"}@app.get("/task-result/{task_id}")
async def get_task_result(task_id: str):from celery_tasks import heavy_cpu_taskresult = heavy_cpu_task.AsyncResult(task_id)if result.ready():return {"status": "completed", "result": result.result}else:return {"status": "processing"}