当前位置: 首页 > news >正文

FastAPI 异步处理模板:高效处理耗时任务,解决使用async api调用时返回顺序不对的问题

出现过几个问题:

1、接收到两个请求,但是一个请求运行结束后不会直接返回,只有第二个运行完毕才一起返回

2、有的方式,两个服务的计算时间会被加起来,比如一共运行了20,最后的log两个都写的20s

下面是一个精简的 FastAPI 异步处理模板,适合处理需要长时间运行的任务(如图像生成、视频处理等)。这个模板采用后台工作线程模式,使 API 能够立即响应请求,同时在后台处理耗时任务。

核心异步处理模式说明

这个模板展示了基于 FastAPI 的异步处理模式,主要包含以下几个关键部分:

1. 核心组件

  • 任务队列 (task_queue): 存储待处理的任务
  • 结果字典 (results): 存储已处理任务的结果
  • 处理线程: 从队列获取任务,处理后将结果存入字典
  • 异步等待机制: 在 API 端点中等待任务处理完成

2. 工作流程

  1. 请求接收:

    • API 接收到请求,包含任务数据和唯一标识符 (uuid)
    • 进行基本验证
  2. 任务提交:

    • 将任务数据放入队列
    • 后台线程不断从队列取出任务处理
  3. 异步等待:

    • API 端点使用 wait_for_result 函数等待结果
    • 通过轮询 results 字典检查结果是否可用
    • 设置超时机制防止无限等待
  4. 结果返回:

    • 一旦结果可用,从 results 中获取并返回
    • 同时从字典中移除结果,避免内存泄漏

3. 优势

  • 简单直观: 通过队列和字典实现异步处理,容易理解和扩展
  • 资源控制: 通过队列限制并发处理任务数量
  • 直接响应: 等待任务完成并直接返回结果,无需客户端轮询

4. 使用场景

  • 适合耗时操作: 如图像处理、视频转码、复杂计算等
  • 单一响应模式: 适合客户端期望一次请求得到完整结果的场景
  • 中等处理时间: 最适合处理时间在几秒到几十秒之间的任务

5. 扩展方向

  • 优先级队列: 可以扩展为支持任务优先级的队列
  • 并行处理: 可以增加多个处理线程,提高吞吐量
  • 持久化存储: 对于重要任务,可以添加任务和结果的持久化
  • 进度报告: 可以扩展为支持任务进度报告

import argparse  
import traceback  
import time  
import asyncio  
import threading  
import queue  
from typing import Dict, Any  

import uvicorn  
from fastapi import FastAPI, Body  
from fastapi.middleware.cors import CORSMiddleware  
from fastapi.responses import JSONResponse  
from fastapi.exceptions import RequestValidationError  

# 导入日志工具(根据实际情况可以替换或移除)  
from log_utils import logger  

# 创建 FastAPI 应用  
app = FastAPI()  

# 创建请求队列和结果存储  
task_queue = queue.Queue()  
results = {}  

# 处理线程停止控制  
stop_processor = threading.Event()  

# 基本参数验证函数  
def validate_request(data: dict) -> tuple[bool, str]:  
    """基本请求验证,实际应用中根据需要扩展"""  
    if not data or "uuid" not in data:  
        return False, "Missing required field: uuid"  
    return True, ""  

# 任务处理函数(替换为实际业务逻辑)  
def process_task(task_data: Dict[str, Any]) -> tuple[bool, str, Any]:  
    """  
    处理任务的函数,返回处理结果  
    
    Args:  
        task_data: 包含任务数据的字典  
        
    Returns:  
        tuple[bool, str, Any]: (成功标志, 消息, 结果数据)  
    """  
    # 模拟处理过程  
    task_id = task_data.get("uuid", "unknown")  
    time.sleep(2)  # 模拟耗时操作  
    
    # 这里替换为实际的处理逻辑  
    success = True  
    message = "处理成功"  
    result = f"result_for_{task_id}"  # 模拟的结果数据  
    
    return success, message, result  

# 后台处理线程  
def task_processor():  
    while not stop_processor.is_set():  
        try:  
            # 从队列获取任务  
            try:  
                task_data = task_queue.get(block=False)  
            except queue.Empty:  
                time.sleep(0.1)  
                continue  
            
            task_id = task_data.get("uuid", "unknown")  
            
            try:  
                # 处理任务  
                start_time = time.time()  
                success, message, result_data = process_task(task_data)  
                processing_time = time.time() - start_time  
                
                # 构建响应  
                result = {  
                    "success": success,  
                    "code": 0 if success else 1,  
                    "msg": message,  
                    "data": {  
                        "uuid": task_id,  
                        "result": result_data if success else ""  
                    }  
                }  
                
                # 记录处理结果  
                log_func = logger.info if success else logger.error  
                log_func(f"Task {task_id}: {message}, Time: {processing_time:.2f}s")  
                
                # 存储结果  
                results[task_id] = result  
                
            except Exception as e:  
                # 处理异常  
                logger.error(f"Task processing error: {str(e)}")  
                logger.error(traceback.format_exc())  
                
                # 存储错误结果  
                results[task_id] = {  
                    "success": False,  
                    "code": 1,  
                    "msg": f"Processing error: {str(e)}",  
                    "data": {  
                        "uuid": task_id,  
                        "result": ""  
                    }  
                }  
            
            finally:  
                # 标记任务完成  
                task_queue.task_done()   

        except Exception as e:  
            logger.error(f"Processor thread error: {str(e)}")  
            time.sleep(1)  # 避免高速循环  

# 启动处理线程  
processor_thread = threading.Thread(target=task_processor, daemon=True)  
processor_thread.start()  

# 提交任务  
def submit_task(data):  
    """提交任务到队列"""  
    task_queue.put(data)  

# 等待结果  
async def wait_for_result(task_id, timeout=60):  
    """等待指定任务ID的结果,带超时机制"""  
    start_time = time.time()  
    while True:  
        # 检查结果是否可用  
        if task_id in results:  
            result = results[task_id]  
            # 从结果字典中移除结果  
            del results[task_id]  
            return result  
        
        # 检查是否超时  
        if time.time() - start_time > timeout:  
            return {  
                "success": False,  
                "code": 1,  
                "msg": f"Request timeout ({timeout}s)",  
                "data": {  
                    "uuid": task_id,  
                    "result": ""  
                }  
            }  
        
        # 非阻塞等待  
        await asyncio.sleep(0.1)  

# API 端点  
@app.get("/health")  
async def health_check():  
    """健康检查接口"""  
    return {  
        "status": "ok",  
        "queue_size": task_queue.qsize(),  
        "results_pending": len(results)  
    }  

@app.post("/process")    
async def process_api(data: Dict[str, Any] = Body(...)):    
    """处理请求的主API"""  
    # 获取任务ID  
    task_id = str(data.get("uuid", ""))  
    if not task_id:  
        task_id = str(time.time())  # 如果未提供ID,生成一个时间戳ID  
        data["uuid"] = task_id  
    
    try:  
        # 基本验证  
        is_valid, error_msg = validate_request(data)  
        if not is_valid:  
            return JSONResponse(  
                status_code=400,  
                content={  
                    "success": False,  
                    "code": 1,  
                    "msg": error_msg,  
                    "data": {  
                        "uuid": task_id,  
                        "result": ""  
                    }  
                }  
            )  

        logger.info(f"Received task: {task_id}")  

        # 提交任务到处理队列  
        submit_task(data)  
        
        # 等待处理完成  
        result = await wait_for_result(task_id)   

        # 返回结果  
        return JSONResponse(status_code=200, content=result)  
    
    except Exception as e:  
        # 异常处理  
        logger.error(f"API error for task {task_id}: {str(e)}")  
        logger.error(traceback.format_exc())  
        
        result = {  
            "success": False,  
            "code": 1,  
            "msg": f"System error: {str(e)}",  
            "data": {  
                "uuid": task_id,  
                "result": ""  
            }  
        }  
        
        return JSONResponse(status_code=500, content=result)  


if __name__=="__main__":  
    parser = argparse.ArgumentParser(description='Async Task Processor')  
    parser.add_argument('--host', type=str, default='0.0.0.0')  
    parser.add_argument('--port', type=int, default=8080)   
    
    args = parser.parse_args()  
    logger.info(f"Starting server with: {args}")  
    
    uvicorn.run(app, host=args.host, port=args.port, log_level="info")  

尝试过了,多个副本的话不会只有一个副本接收请求。

相关文章:

  • 深度优先搜索(DFS)完全解析:从原理到 Java 实战
  • 1000BASE-T的磁性模块和1000BASE-TX的磁性模块的区别
  • Ubuntu16.04网卡ens33找不到异常修复
  • 二分图判定算法
  • CFD计算中如何应对cell aspect ratio比例严重失调情况
  • 第一章,网络发展史
  • LangChain组件Tools/Toolkits详解(7)——工具调用与Toolkits
  • Java线程池深度解析:从使用到调优
  • QT笔记---JSON
  • 高并发库存系统是否适合使用 ORM(Hibernate / MyBatis)
  • kafka压缩
  • 从0到1在windows上用flutter开发android app(环境准备、创建项目、加速构建)
  • Linux环境变量:深入解析与实用指南
  • 软件上线倒计时,测试团队如何量化风险优先级?
  • 本地基于Ollama部署的DeepSeek详细接口文档说明
  • 【dify】 dify环境变量配置说明
  • AI智能问答“胡说八道“-RAG探索之路
  • 微信小程序使用状态管理 - mobx-miniprogram
  • 打破同源策略:前端跨域的全面解析与应对策略
  • MIPI 详解:XAPP894 D-PHY Solutions
  • 广西钦州:坚决拥护自治区党委对钟恒钦进行审查调查的决定
  • 云南一餐馆收购长江野生鱼加工为菜品,被查处罚款
  • 雇来的“妈妈”:为入狱雇主无偿带娃4年,没做好准备说再见
  • 吉林市马拉松5月18日开赛,奖牌、参赛服公布
  • 经济日报整版聚焦“妈妈岗”:就业路越走越宽,有温度重实效
  • 2025江西跨境电子商务发展交流会召开,探索行业发展新趋势