当前位置: 首页 > news >正文

3 celery任务与队列

一、任务定义与使用

1.1 @task 装饰器

from celery import Celeryapp = Celery('demo', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y# 调用任务(立即执行)
add.delay(4, 5)# 定时任务(5分钟后执行)
add.apply_async(args=(4, 5), countdown=300)

特性说明:

  • 支持命名任务:@app.task(name='math_operations.add')
  • 绑定任务实例:@app.task(bind=True) 可访问任务上下文
  • 新版语法:Celery 5+ 支持 @shared_task 装饰器

二、任务参数与返回值

2.1 参数传递规范

@app.task
def process_data(data, threshold=0.5, *, format='json'):# 处理逻辑return processed_data

最佳实践:

  • 避免传递不可序列化对象(如数据库连接)
  • 复杂参数建议使用字典打包
  • 使用关键字参数提高可读性

2.2 返回值处理

result = add.delay(4, 5)# 获取结果(阻塞方式)
print(result.get(timeout=10))  # 输出 9# 检查状态
if result.ready():print("任务已完成")

返回值特性:

  • 默认存储于配置的结果后端(Redis/RabbitMQ等)
  • 支持自定义序列化方式
  • 大结果建议使用外部存储(如S3、数据库)

三、队列配置与管理

3.1 基础队列配置

# celeryconfig.py# 定义多个队列
CELERY_CREATE_MISSING_QUEUES = False  # 关闭自动创建
CELERY_QUEUES = (Queue('high_priority', routing_key='hp.#'),Queue('default', routing_key='default'),Queue('reports', routing_key='reports.#')
)# 默认路由配置
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'

3.2 任务路由策略

# 方法1:装饰器指定
@app.task(queue='high_priority')
def generate_report():# 生成报表逻辑pass# 方法2:全局路由配置
CELERY_ROUTES = {'tasks.generate_pdf': {'queue': 'reports','routing_key': 'reports.pdf'},'tasks.*': {'queue': 'default'}
}

3.3 Worker 启动配置

# 启动专用Worker
celery -A proj worker -Q high_priority,reports -c 4# 常用参数:
# -Q 指定监听队列(逗号分隔)
# -c 并发worker数量
# --prefetch-multiplier 预取任务数量控制

四、实战应用场景

场景1:优先级队列系统

# 紧急任务路由
CELERY_ROUTES = {'payment.process': {'queue': 'critical'},'emails.send': {'queue': 'medium'},'analytics.*': {'queue': 'low'}
}# 启动不同优先级的Worker
# 高优先级:celery worker -Q critical -c 2
# 中优先级:celery worker -Q medium -c 4
# 低优先级:celery worker -Q low -c 8

场景2:任务类型隔离

# 分离I/O密集和CPU密集任务
CELERY_QUEUES = (Queue('io_intensive', routing_key='io.#'),Queue('cpu_intensive', routing_key='cpu.#')
)# 配置不同Worker资源
# I/O Worker:celery worker -Q io_intensive -P gevent -c 100
# CPU Worker:celery worker -Q cpu_intensive -c $(nproc)

五、调试与监控技巧

  1. 实时查看队列状态
celery -A proj inspect active_queues
  1. 任务追踪配置
@app.task(track_started=True)
def long_running_task():# 任务将记录开始时间
  1. 异常处理策略
@app.task(autoretry_for=(TimeoutError,), max_retries=3)
def unreliable_api_call():# 自动重试逻辑

相关文章:

  • linux FTP服务器搭建
  • 【Python零基础入门系列】第1篇:Python 是什么?怎么装环境?推荐哪些 IDE?
  • 系统的环境变量
  • flink cdc 配置
  • 客户案例分享|运营商数智化经典案例 — XX运营商
  • Apache Flink的架构设计与运行流程说明
  • 电子电器架构 --- 人工智能、固态电池和先进自动驾驶功能等新兴技术的影响
  • IntelliJ IDEA 2024.3.1 for Mac 中文 Java开发工具
  • 织梦dedecms发布文章时取消自动生成关键字
  • 数据挖掘专栏介绍:用 Python + 大语言模型 (LLM) 重塑电商数据价值
  • 【Hive入门】Hive高级特性:视图与物化视图
  • C++——调用OpenCV和NVIDIA Video Codec SDK库实现使用GPU硬解码MP4视频文件
  • Go 1.25为什么要废除核心类型
  • 后验概率最大化(MAP)估计算法原理以及相具体的应用实例附C++代码示例
  • 设计模式 | 详解常用设计模式(六大设计原则,单例模式,工厂模式,建造者模式,代理模式)
  • 最新的30个Android Kotlin面试题
  • Python程序开发,麒麟系统模拟电脑打开文件实现
  • <c++>使用detectMultiScale的时候出现opencv.dll冲突
  • EtherCAT 分布式时钟(DC)补偿技术解析
  • 【今日半导体行业分析】2025年4月29日
  • 力箭二号火箭成功进行满载起竖试验,计划今年首飞发射轻舟飞船
  • 昂立教育:去年减亏1.39亿元,今年以“利润持续增长”为核心目标
  • 美财长称关税战升级的责任在中方,外交部:关税战、贸易战没有赢家
  • 言短意长|政府食堂、停车场开放的示范效应
  • 大学男生被捉奸后将女生推下高楼?桂林理工大学辟谣
  • 病人有头发,照护者不发疯:《黑镜》中的身体缺席与虚伪关怀