Python基础(⑨Celery 分布式任务队列)
普通异步解决 “同一进程内不等待” 的问题,适合快速的 IO 操作。
Celery 解决 “把任务扔出去,让别人(独立进程 / 机器)干” 的问题,适合重活、累活,还能多 “人” 一起干。
对比维度 | 普通异步(如 async/await ) | Celery 分布式任务队列 |
---|---|---|
任务执行位置 | 和主程序在同一个进程内 | 在独立的 Worker 进程 / 机器上 |
适用场景 | 短时间的等待操作(如网络请求) | 长时间任务(如数据处理、文件转换) |
分布式能力 | 不能(单进程内) | 能(多机器部署 Worker 分工) |
任务管理 | 简单(无专门管理) | 强大(任务重试、优先级、定时等) |
安装 Celery 和依赖
pip install celery redis
确保 Redis 已安装并启动
tasks.py
from celery import Celery
import time# 初始化 Celery:指定 Broker 和 Backend(都用 Redis)
app = Celery('my_tasks',broker='redis://localhost:6379/0', # 任务队列backend='redis://localhost:6379/1' # 结果存储
)# 定义任务(用 @app.task 装饰)
@app.task
def add(a, b):print("开始计算...")time.sleep(3) # 模拟耗时操作return a + b@app.task
def send_email(to):print(f"发送邮件给 {to}...")time.sleep(2)return f"邮件已发送至 {to}"
启动 Worker
在终端切换到tasks.py的文件夹,输入
celery -A tasks worker --loglevel=info
main.py
from tasks import add, send_email# 异步发送任务(不会阻塞,立即返回任务ID)
result1 = add.delay(2, 3) # delay() 是发送任务的快捷方式
result2 = send_email.delay("user@example.com")print("任务1 ID:", result1.id)
print("任务2 ID:", result2.id)# 检查任务状态和结果(实际中可能在另一处代码中检查)
import time
time.sleep(5) # 等待任务完成print("任务1是否完成:", result1.ready())
print("任务1结果:", result1.result)
print("任务2结果:", result2.result)
运行 main.py