我的全栈学习之旅:Celery(持续更新!!!)
一、什么是 Celery?它为谁解决什么问题?
Celery 是一个用 Python 写的分布式任务队列/异步任务处理框架。它让你把需要耗时或者异步执行的工作(发送邮件、视频转码、调用第三方 API、批处理)从主应用脱离,交给后台 worker 去做,从而不阻塞主线程/请求。它也原生支持周期性任务调度(celery beat)和任务重试、结果存储、路由/优先级/限速等功能。
用处概括:
- 把耗时任务异步化,提升响应速度与吞吐。
- 可水平扩展(多 worker),实现高可用与并发处理。
- 支持任务重试、过期、结果后端、任务路由与限速。
- 内置调度器(celery beat)支持周期性任务(cron / interval / solar 等)。
二、核心架构(简明)
- Producer(生产者):你的应用,调用
task.delay()或task.apply_async()把任务发到队列。 - Broker(消息中间件):保存任务消息的组件(常见:Redis、RabbitMQ)。Worker 从这里取任务。
- Worker(消费者):后台进程,运行任务函数。可以横向扩容。
- Result backend(可选):保存任务执行结果(Redis、数据库等)。
- Beat(scheduler):调度器,按配置定期把任务放入队列(类似 cron,但可以和 Celery 协同)。
三、Celery 如何解决这些问题(原理简述)
- 异步执行:
delay()将任务序列化并发到 broker,立即返回,worker 异步消费执行 -> 不阻塞主流程。 - 可靠性:broker 与重试机制保证失败可重试(可配置重试次数、重试间隔、指数退避等)。
- 定时/周期:beat 负责按 schedule(interval/crontab/solar)把任务放入队列,由 worker 实际执行。可把 schedule 存在配置里或数据库(如 django-celery-beat)。
一个Demo:
下面这个Demo是实现,使用celery框架定时(以北京时间为准)的向一个后端服务发送请求。后端服务使用FastAPI,随便写一个简单的。然后,需要每10s发送一个请求,celery的broker使用redis来实现。使用redis作为Result Backend。
具体代码如下所示(推荐使用3.8+版本的Python):
文件结构:
--app--__init__.py--celery_app.py--requirements.txt--service.py--tasks.py
app/init.py:
"""Celery demo package."""# 从同目录下的 celery_app.py 文件导入 celery_app 实例。使用相对导入(.表示当前包)。这样外部代码可以直接通过 from app import celery_app 来获取 Celery 应用实例。
from .celery_app import celery_app# 定义模块的公开接口。当使用 from app import * 时,只会导入 celery_app。这是一个最佳实践,明确告诉使用者哪些是公开 API。
__all__ = ("celery_app",)
app/celery_app.py:
"""Celery application setup for the demo project."""
from __future__ import annotationsimport os
from celery import Celery# 定义默认的 Redis 连接 URL。格式是 redis://主机:端口/数据库编号。这里连接本地 Redis 的 0 号数据库。
DEFAULT_REDIS_URL = "redis://127.0.0.1:6379/0"# "learning.celery": 设置 Celery 应用的名称(main name)。这个名称会出现在日志中,帮助识别不同的 Celery 应用。通常使用项目的模块路径。
# include=["app.tasks"]: 指定要自动导入的任务模块列表。Celery 启动时会自动导入 app.tasks 模块,发现其中用 @celery_app.task 装饰的函数,并注册为可调用的任务。
celery_app = Celery("learning.celery",broker=os.getenv("CELERY_BROKER_URL", DEFAULT_REDIS_URL),backend=os.getenv("CELERY_RESULT_BACKEND", DEFAULT_REDIS_URL),include=["app.tasks"],
)# 更新Celery应用的配置
# task_default_queue="learning.celery.default": 作用: 设置默认的任务队列名称。
# 如果任务没有指定队列,会被发送到这个队列。
# 队列名称有助于:
### 任务分类管理
### 不同 worker 监听不同队列
### 任务优先级控制
# beat_schedule: 配置 Celery Beat(定时任务调度器)的调度表。Beat 是 Celery 的定时任务组件,类似于 Linux 的 cron。
# "ping-fastapi-daily-11am": { 定义一个定时任务的唯一标识符(可以是任意字符串)。这个名字会出现在 Beat 的日志中。
# "task": "app.tasks.ping_backend"。作用: 定义一个定时任务的唯一标识符(可以是任意字符串)。这个名字会出现在 Beat 的日志中。
celery_app.conf.update(timezone="Asia/Shanghai",enable_utc=False,task_default_queue="learning.celery.default",beat_schedule={"ping-fastapi-every-10-seconds": {"task": "app.tasks.ping_backend","schedule": 10.0,}},
)app/requirements.txt:
celery>=5.3
fastapi>=0.111
uvicorn[standard]>=0.30
redis>=5.0
httpx>=0.27
pytz>=2024.1app/service.py:
"""FastAPI application that receives requests from celery tasks."""
from __future__ import annotationsfrom datetime import datetime
from typing import Any, Dictfrom fastapi import FastAPI
from pydantic import BaseModel
import pytzapp = FastAPI(title="Celery Demo Service", version="0.1.0")
BEIJING_TZ = pytz.timezone("Asia/Shanghai")class PingPayload(BaseModel):"""Schema describing the request body sent by the celery task."""triggered_at: strtask_id: str@app.post("/ping")
def handle_ping(payload: PingPayload) -> Dict[str, Any]:"""Return a confirmation response including the server-side timestamp."""received_at = datetime.now(BEIJING_TZ).isoformat(timespec="seconds")return {"message": "pong","received_at": received_at,"payload": payload.dict(),}@app.get("/health")
def healthcheck() -> Dict[str, str]:"""Lightweight endpoint used by smoke tests."""return {"status": "ok"}if __name__ == "__main__":import uvicornuvicorn.run("app.service:app", host="127.0.0.1", port=8000, reload=True)app/tasks.py:
"""Celery tasks responsible for interacting with the FastAPI service."""
from __future__ import annotationsimport os
from datetime import datetime
from typing import Any, Dictimport httpx
import pytz
from httpx import HTTPStatusError, RequestError
from celery.utils.log import get_task_loggerfrom .celery_app import celery_applogger = get_task_logger(__name__)FASTAPI_BASE_URL = os.getenv("FASTAPI_BASE_URL", "<http://127.0.0.1:8000>")
FASTAPI_PING_PATH = os.getenv("FASTAPI_PING_PATH", "/ping")
BEIJING_TZ = pytz.timezone("Asia/Shanghai")def _beijing_now() -> datetime:"""Return the current time in Beijing."""return datetime.now(BEIJING_TZ)# 作用: 使用装饰器将函数注册为 Celery 任务。装饰器模式是 Celery 注册任务的标准方式。
# name="app.tasks.ping_backend"。作用: 显式指定任务名称。
@celery_app.task(name="app.tasks.ping_backend",bind=True,autoretry_for=(RequestError,),retry_backoff=True,retry_kwargs={"max_retries": 3},
)
def ping_backend(self) -> Dict[str, Any]:"""Send a POST request to the FastAPI service and return the response payload."""payload = {"triggered_at": _beijing_now().isoformat(timespec="seconds"),"task_id": self.request.id,}url = f"{FASTAPI_BASE_URL.rstrip('/')}{FASTAPI_PING_PATH}"try:with httpx.Client(timeout=5.0) as client:response = client.post(url, json=payload)response.raise_for_status()except HTTPStatusError as exc:logger.warning("FastAPI service responded with %s for %s | payload=%s",exc.response.status_code,url,payload,)raise self.retry(exc=exc, countdown=5)except RequestError as exc:logger.error("Network error when reaching %s | payload=%s | error=%s", url, payload, exc)raiselogger.info("Successfully pinged backend | url=%s status=%s",url,response.status_code,)return {"request": payload,"response": response.json(),}具体执行步骤:
(1)创建一个Python虚拟环境,并激活这个虚拟环境:
python -m venv celery_venv ./celery_venv/Scripts/Activate
(2)安装Python的依赖模块: pip install -r requirements.txt
(3)启动redis:
sudo systemctl start redis-server
(4)启动FastAPI服务(一个窗口): uvicorn app.service:app --reload
(5)启动celery worker(一个窗口): celery -A app.tasks worker --loglevel=info
(6)启动调度器(一个窗口): celery -A app.tasks beat --loglevel=info
