FastAPI 异步处理模板:高效处理耗时任务,解决使用async api调用时返回顺序不对的问题
出现过几个问题:
1、接收到两个请求,但是一个请求运行结束后不会直接返回,只有第二个运行完毕才一起返回
2、有的方式,两个服务的计算时间会被加起来,比如一共运行了20,最后的log两个都写的20s
下面是一个精简的 FastAPI 异步处理模板,适合处理需要长时间运行的任务(如图像生成、视频处理等)。这个模板采用后台工作线程模式,使 API 能够立即响应请求,同时在后台处理耗时任务。
核心异步处理模式说明
这个模板展示了基于 FastAPI 的异步处理模式,主要包含以下几个关键部分:
1. 核心组件
- 任务队列 (
task_queue
): 存储待处理的任务 - 结果字典 (
results
): 存储已处理任务的结果 - 处理线程: 从队列获取任务,处理后将结果存入字典
- 异步等待机制: 在 API 端点中等待任务处理完成
2. 工作流程
-
请求接收:
- API 接收到请求,包含任务数据和唯一标识符 (
uuid
) - 进行基本验证
- API 接收到请求,包含任务数据和唯一标识符 (
-
任务提交:
- 将任务数据放入队列
- 后台线程不断从队列取出任务处理
-
异步等待:
- API 端点使用
wait_for_result
函数等待结果 - 通过轮询
results
字典检查结果是否可用 - 设置超时机制防止无限等待
- API 端点使用
-
结果返回:
- 一旦结果可用,从
results
中获取并返回 - 同时从字典中移除结果,避免内存泄漏
- 一旦结果可用,从
3. 优势
- 简单直观: 通过队列和字典实现异步处理,容易理解和扩展
- 资源控制: 通过队列限制并发处理任务数量
- 直接响应: 等待任务完成并直接返回结果,无需客户端轮询
4. 使用场景
- 适合耗时操作: 如图像处理、视频转码、复杂计算等
- 单一响应模式: 适合客户端期望一次请求得到完整结果的场景
- 中等处理时间: 最适合处理时间在几秒到几十秒之间的任务
5. 扩展方向
- 优先级队列: 可以扩展为支持任务优先级的队列
- 并行处理: 可以增加多个处理线程,提高吞吐量
- 持久化存储: 对于重要任务,可以添加任务和结果的持久化
- 进度报告: 可以扩展为支持任务进度报告
import argparse
import traceback
import time
import asyncio
import threading
import queue
from typing import Dict, Any
import uvicorn
from fastapi import FastAPI, Body
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
# 导入日志工具(根据实际情况可以替换或移除)
from log_utils import logger
# 创建 FastAPI 应用
app = FastAPI()
# 创建请求队列和结果存储
task_queue = queue.Queue()
results = {}
# 处理线程停止控制
stop_processor = threading.Event()
# 基本参数验证函数
def validate_request(data: dict) -> tuple[bool, str]:
"""基本请求验证,实际应用中根据需要扩展"""
if not data or "uuid" not in data:
return False, "Missing required field: uuid"
return True, ""
# 任务处理函数(替换为实际业务逻辑)
def process_task(task_data: Dict[str, Any]) -> tuple[bool, str, Any]:
"""
处理任务的函数,返回处理结果
Args:
task_data: 包含任务数据的字典
Returns:
tuple[bool, str, Any]: (成功标志, 消息, 结果数据)
"""
# 模拟处理过程
task_id = task_data.get("uuid", "unknown")
time.sleep(2) # 模拟耗时操作
# 这里替换为实际的处理逻辑
success = True
message = "处理成功"
result = f"result_for_{task_id}" # 模拟的结果数据
return success, message, result
# 后台处理线程
def task_processor():
while not stop_processor.is_set():
try:
# 从队列获取任务
try:
task_data = task_queue.get(block=False)
except queue.Empty:
time.sleep(0.1)
continue
task_id = task_data.get("uuid", "unknown")
try:
# 处理任务
start_time = time.time()
success, message, result_data = process_task(task_data)
processing_time = time.time() - start_time
# 构建响应
result = {
"success": success,
"code": 0 if success else 1,
"msg": message,
"data": {
"uuid": task_id,
"result": result_data if success else ""
}
}
# 记录处理结果
log_func = logger.info if success else logger.error
log_func(f"Task {task_id}: {message}, Time: {processing_time:.2f}s")
# 存储结果
results[task_id] = result
except Exception as e:
# 处理异常
logger.error(f"Task processing error: {str(e)}")
logger.error(traceback.format_exc())
# 存储错误结果
results[task_id] = {
"success": False,
"code": 1,
"msg": f"Processing error: {str(e)}",
"data": {
"uuid": task_id,
"result": ""
}
}
finally:
# 标记任务完成
task_queue.task_done()
except Exception as e:
logger.error(f"Processor thread error: {str(e)}")
time.sleep(1) # 避免高速循环
# 启动处理线程
processor_thread = threading.Thread(target=task_processor, daemon=True)
processor_thread.start()
# 提交任务
def submit_task(data):
"""提交任务到队列"""
task_queue.put(data)
# 等待结果
async def wait_for_result(task_id, timeout=60):
"""等待指定任务ID的结果,带超时机制"""
start_time = time.time()
while True:
# 检查结果是否可用
if task_id in results:
result = results[task_id]
# 从结果字典中移除结果
del results[task_id]
return result
# 检查是否超时
if time.time() - start_time > timeout:
return {
"success": False,
"code": 1,
"msg": f"Request timeout ({timeout}s)",
"data": {
"uuid": task_id,
"result": ""
}
}
# 非阻塞等待
await asyncio.sleep(0.1)
# API 端点
@app.get("/health")
async def health_check():
"""健康检查接口"""
return {
"status": "ok",
"queue_size": task_queue.qsize(),
"results_pending": len(results)
}
@app.post("/process")
async def process_api(data: Dict[str, Any] = Body(...)):
"""处理请求的主API"""
# 获取任务ID
task_id = str(data.get("uuid", ""))
if not task_id:
task_id = str(time.time()) # 如果未提供ID,生成一个时间戳ID
data["uuid"] = task_id
try:
# 基本验证
is_valid, error_msg = validate_request(data)
if not is_valid:
return JSONResponse(
status_code=400,
content={
"success": False,
"code": 1,
"msg": error_msg,
"data": {
"uuid": task_id,
"result": ""
}
}
)
logger.info(f"Received task: {task_id}")
# 提交任务到处理队列
submit_task(data)
# 等待处理完成
result = await wait_for_result(task_id)
# 返回结果
return JSONResponse(status_code=200, content=result)
except Exception as e:
# 异常处理
logger.error(f"API error for task {task_id}: {str(e)}")
logger.error(traceback.format_exc())
result = {
"success": False,
"code": 1,
"msg": f"System error: {str(e)}",
"data": {
"uuid": task_id,
"result": ""
}
}
return JSONResponse(status_code=500, content=result)
if __name__=="__main__":
parser = argparse.ArgumentParser(description='Async Task Processor')
parser.add_argument('--host', type=str, default='0.0.0.0')
parser.add_argument('--port', type=int, default=8080)
args = parser.parse_args()
logger.info(f"Starting server with: {args}")
uvicorn.run(app, host=args.host, port=args.port, log_level="info")
尝试过了,多个副本的话不会只有一个副本接收请求。