当前位置: 首页 > news >正文

基于Celery+Supervisord的异步任务管理方案

一、架构设计背景

1.1 需求场景分析

在Web应用中,当遇到以下场景时需要异步任务处理方案:

  • 高延迟操作(大文件解析/邮件发送/复杂计算)
  • 请求响应解耦(客户端快速响应)
  • 任务队列管理(任务优先级/失败重试)
  • 分布式任务调度(多Worker节点)

1.2 技术选型说明

组件作用版本要求
FastAPI构建高性能API接口>=0.68
Redis消息中间件+结果存储>=5.0
Celery分布式任务队列>=5.2
Supervisord进程监控与管理>=4.2

二、核心实现逻辑

2.1 异步任务处理流程

  1. 客户端上传文件到FastAPI
  2. API生成唯一任务ID并持久化任务信息
  3. 任务进入Redis队列
  4. Celery Worker消费队列任务
  5. 状态更新与结果存储
  6. 客户端轮询获取任务状态

2.2 代码实现优化

2.2.1 增强型FastAPI服务
# 文件校验中间件
def validate_file(file: UploadFile):
    if not file.filename.lower().endswith(('.csv', '.xlsx')):
        raise HTTPException(400, "仅支持CSV/XLSX格式")
    if file.size > 1024*1024*100:  # 100MB限制
        raise HTTPException(413, "文件超过大小限制")
    return file

# 上传接口
@app.post("/upload")
async def upload(file: UploadFile = File(...)):
    validated_file = validate_file(file)
    task_id = f"{uuid.uuid4().hex}_{secure_filename(file.filename)}"
    
    # 异步存储文件
    await file.seek(0)
    content = await file.read()
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, save_upload_file, content, task_id)
    
    task_data = {
        "task_id": task_id,
        "file_path": file_path,
    }         
    r.lpush("task_queue", json.dumps(task_data))
    r.hset(name="task_status", 
           key=task_id, 
           value="pending")
    
    return JSONResponse({
        "code": 200,
        "data": {"task_id": task_id},
        "msg": "任务创建成功"
    })
2.2.2 健壮型Celery Worker
@app.task(
    bind=True,
    max_retries=3,
    soft_time_limit=300,
    autoretry_for=(Exception,),
    retry_backoff=True
)
def process_file_task(self, task_data):
    try:
        logger.info(f"Processing {task_data['task_id']}")
        # 实际业务逻辑
        time.sleep(10)
        r.hset("task_status", task_data["task_id"], "completed")
    except Exception as exc:
        self.retry(exc=exc, countdown=2 ** self.request.retries) 
2.2.3 轮询任务队列
r = redis.Redis(host="localhost", port=6379, db=0,password="123456")

def main():
    logger.info("任务轮询启动,正在轮询 Redis...")
    while True:
        task_data = r.lpop("task_queue")
        if task_data:
            data = json.loads(task_data)  
            logger.info("轮询到任务ID:"+data["task_id"])
            r.hset("task_status", data["task_id"], "processing")               
            save_file_to_disk.delay(data)
            logger.info("已在后台执行,继续轮询")
        time.sleep(3)  

if __name__ == "__main__":
    main()

三、生产级Supervisord配置

3.1 配置文件

[supervisord]
logfile=/var/log/supervisord.log
logfile_maxbytes=50MB
logfile_backups=10
loglevel=info
pidfile=/tmp/supervisord.pid
nodaemon=false

[program:fastapi]
command=uvicorn main:app --host 0.0.0.0 --port 8000
directory=/opt/app
autostart=true
autorestart=unexpected
startsecs=5
stopwaitsecs=30
user=www-data
environment=PYTHONPATH="/opt/app"

[program:celery_worker]
command=celery -A worker.celery_app worker --concurrency=4 -O fair
directory=/opt/app
autostart=true
autorestart=true
stdout_logfile=/var/log/celery_worker.log
redirect_stderr=true
killasgroup=true
stopasgroup=true

3.2 关键配置说明

  • 进程分组管理:killasgroup/stopasgroup确保子进程被正确回收
  • 日志轮转:logfile_maxbytes和logfile_backups防止日志膨胀
  • 资源限制:通过concurrency参数控制Worker并发数
  • 环境隔离:指定运行用户和Python路径

一、总结

在Web应用开发中,为了应对诸如处理大文件上传、发送邮件、执行复杂计算等耗时操作,以及实现请求响应解耦和分布式任务调度的需求,我们通常需要采用异步任务处理方案。本文介绍了一种基于FastAPI、Redis、Celery和Supervisord构建的高效异步任务处理架构。

相关文章:

  • HCIE云计算学什么?怎么学?未来职业发展如何?
  • 01 SQl注入基础步骤(数字、字符、布尔盲注、报错)
  • R2S的网络丢包率高问题小记
  • C0复习——课堂笔记<1>
  • 【C++】: STL详解 —— set和map类
  • 《MySQL三大核心日志解析:Undo Log/Redo Log/Bin Log对比与实践指南》
  • 私有云基础架构与运维(二)
  • Kylin麒麟操作系统服务部署 | NFS服务部署
  • 【音视频】ffplay常用命令
  • 【玩转正则表达式】正则表达式常用语法汇总
  • DevOps全流程
  • Redis高频面试题10个
  • 机器视觉运动控制一体机在天地盖同步跟随贴合解决方案
  • 系统架构设计师—数据库基础篇—数据库设计
  • C++编程:进阶阶段—4.1封装
  • Fork/Join 框架详解:分支合并的高性能并发编程
  • NoClassDefFoundError:UnsynchronizedByteArrayOutputStream
  • MySQL复合查询——通过案例讲解每个指令
  • MR的环形缓冲区(底层)
  • MyBatis-Plus开发流程:Spring Boot + MyBatis-Plus 实现对 book_tab 表的增删改查及Redis缓存
  • 国家统计局:2024年城镇单位就业人员工资平稳增长
  • 悬疑剧背后的女编剧:创作的差异不在性别,而在经验
  • 美国将与阿联酋合作建立海外最大的人工智能数据中心
  • 中办、国办关于持续推进城市更新行动的意见
  • 押井守在30年前创造的虚拟世界何以比当下更超前?
  • 订婚不等于性同意!山西订婚强奸案入选最高法案例