3 celery任务与队列
一、任务定义与使用
1.1 @task 装饰器
from celery import Celeryapp = Celery('demo', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y# 调用任务(立即执行)
add.delay(4, 5)# 定时任务(5分钟后执行)
add.apply_async(args=(4, 5), countdown=300)
特性说明:
- 支持命名任务:
@app.task(name='math_operations.add')
- 绑定任务实例:
@app.task(bind=True)
可访问任务上下文 - 新版语法:Celery 5+ 支持
@shared_task
装饰器
二、任务参数与返回值
2.1 参数传递规范
@app.task
def process_data(data, threshold=0.5, *, format='json'):# 处理逻辑return processed_data
最佳实践:
- 避免传递不可序列化对象(如数据库连接)
- 复杂参数建议使用字典打包
- 使用关键字参数提高可读性
2.2 返回值处理
result = add.delay(4, 5)# 获取结果(阻塞方式)
print(result.get(timeout=10)) # 输出 9# 检查状态
if result.ready():print("任务已完成")
返回值特性:
- 默认存储于配置的结果后端(Redis/RabbitMQ等)
- 支持自定义序列化方式
- 大结果建议使用外部存储(如S3、数据库)
三、队列配置与管理
3.1 基础队列配置
# celeryconfig.py# 定义多个队列
CELERY_CREATE_MISSING_QUEUES = False # 关闭自动创建
CELERY_QUEUES = (Queue('high_priority', routing_key='hp.#'),Queue('default', routing_key='default'),Queue('reports', routing_key='reports.#')
)# 默认路由配置
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
3.2 任务路由策略
# 方法1:装饰器指定
@app.task(queue='high_priority')
def generate_report():# 生成报表逻辑pass# 方法2:全局路由配置
CELERY_ROUTES = {'tasks.generate_pdf': {'queue': 'reports','routing_key': 'reports.pdf'},'tasks.*': {'queue': 'default'}
}
3.3 Worker 启动配置
# 启动专用Worker
celery -A proj worker -Q high_priority,reports -c 4# 常用参数:
# -Q 指定监听队列(逗号分隔)
# -c 并发worker数量
# --prefetch-multiplier 预取任务数量控制
四、实战应用场景
场景1:优先级队列系统
# 紧急任务路由
CELERY_ROUTES = {'payment.process': {'queue': 'critical'},'emails.send': {'queue': 'medium'},'analytics.*': {'queue': 'low'}
}# 启动不同优先级的Worker
# 高优先级:celery worker -Q critical -c 2
# 中优先级:celery worker -Q medium -c 4
# 低优先级:celery worker -Q low -c 8
场景2:任务类型隔离
# 分离I/O密集和CPU密集任务
CELERY_QUEUES = (Queue('io_intensive', routing_key='io.#'),Queue('cpu_intensive', routing_key='cpu.#')
)# 配置不同Worker资源
# I/O Worker:celery worker -Q io_intensive -P gevent -c 100
# CPU Worker:celery worker -Q cpu_intensive -c $(nproc)
五、调试与监控技巧
- 实时查看队列状态
celery -A proj inspect active_queues
- 任务追踪配置
@app.task(track_started=True)
def long_running_task():# 任务将记录开始时间
- 异常处理策略
@app.task(autoretry_for=(TimeoutError,), max_retries=3)
def unreliable_api_call():# 自动重试逻辑