当前位置: 首页 > news >正文

我的全栈学习之旅:Celery(持续更新!!!)

一、什么是 Celery?它为谁解决什么问题?

Celery 是一个用 Python 写的分布式任务队列/异步任务处理框架。它让你把需要耗时或者异步执行的工作(发送邮件、视频转码、调用第三方 API、批处理)从主应用脱离,交给后台 worker 去做,从而不阻塞主线程/请求。它也原生支持周期性任务调度(celery beat)和任务重试、结果存储、路由/优先级/限速等功能。

用处概括:

  • 把耗时任务异步化,提升响应速度与吞吐。
  • 可水平扩展(多 worker),实现高可用与并发处理。
  • 支持任务重试、过期、结果后端、任务路由与限速。
  • 内置调度器(celery beat)支持周期性任务(cron / interval / solar 等)。

二、核心架构(简明)

  • Producer(生产者):你的应用,调用 task.delay()task.apply_async() 把任务发到队列。
  • Broker(消息中间件):保存任务消息的组件(常见:Redis、RabbitMQ)。Worker 从这里取任务。
  • Worker(消费者):后台进程,运行任务函数。可以横向扩容。
  • Result backend(可选):保存任务执行结果(Redis、数据库等)。
  • Beat(scheduler):调度器,按配置定期把任务放入队列(类似 cron,但可以和 Celery 协同)。

三、Celery 如何解决这些问题(原理简述)

  • 异步执行delay() 将任务序列化并发到 broker,立即返回,worker 异步消费执行 -> 不阻塞主流程。
  • 可靠性:broker 与重试机制保证失败可重试(可配置重试次数、重试间隔、指数退避等)。
  • 定时/周期:beat 负责按 schedule(interval/crontab/solar)把任务放入队列,由 worker 实际执行。可把 schedule 存在配置里或数据库(如 django-celery-beat)。

一个Demo:

下面这个Demo是实现,使用celery框架定时(以北京时间为准)的向一个后端服务发送请求。后端服务使用FastAPI,随便写一个简单的。然后,需要每10s发送一个请求,celery的broker使用redis来实现。使用redis作为Result Backend。

具体代码如下所示(推荐使用3.8+版本的Python):

文件结构:

--app--__init__.py--celery_app.py--requirements.txt--service.py--tasks.py

app/init.py:

"""Celery demo package."""# 从同目录下的 celery_app.py 文件导入 celery_app 实例。使用相对导入(.表示当前包)。这样外部代码可以直接通过 from app import celery_app 来获取 Celery 应用实例。
from .celery_app import celery_app# 定义模块的公开接口。当使用 from app import * 时,只会导入 celery_app。这是一个最佳实践,明确告诉使用者哪些是公开 API。
__all__ = ("celery_app",)

app/celery_app.py:

"""Celery application setup for the demo project."""
from __future__ import annotationsimport os
from celery import Celery# 定义默认的 Redis 连接 URL。格式是 redis://主机:端口/数据库编号。这里连接本地 Redis 的 0 号数据库。
DEFAULT_REDIS_URL = "redis://127.0.0.1:6379/0"# "learning.celery": 设置 Celery 应用的名称(main name)。这个名称会出现在日志中,帮助识别不同的 Celery 应用。通常使用项目的模块路径。
# include=["app.tasks"]: 指定要自动导入的任务模块列表。Celery 启动时会自动导入 app.tasks 模块,发现其中用 @celery_app.task 装饰的函数,并注册为可调用的任务。
celery_app = Celery("learning.celery",broker=os.getenv("CELERY_BROKER_URL", DEFAULT_REDIS_URL),backend=os.getenv("CELERY_RESULT_BACKEND", DEFAULT_REDIS_URL),include=["app.tasks"],
)# 更新Celery应用的配置
# task_default_queue="learning.celery.default": 作用: 设置默认的任务队列名称。
# 如果任务没有指定队列,会被发送到这个队列。
# 队列名称有助于:
### 任务分类管理
### 不同 worker 监听不同队列
### 任务优先级控制
# beat_schedule: 配置 Celery Beat(定时任务调度器)的调度表。Beat 是 Celery 的定时任务组件,类似于 Linux 的 cron。
# "ping-fastapi-daily-11am": { 定义一个定时任务的唯一标识符(可以是任意字符串)。这个名字会出现在 Beat 的日志中。
# "task": "app.tasks.ping_backend"。作用: 定义一个定时任务的唯一标识符(可以是任意字符串)。这个名字会出现在 Beat 的日志中。
celery_app.conf.update(timezone="Asia/Shanghai",enable_utc=False,task_default_queue="learning.celery.default",beat_schedule={"ping-fastapi-every-10-seconds": {"task": "app.tasks.ping_backend","schedule": 10.0,}},
)

app/requirements.txt:

celery>=5.3
fastapi>=0.111
uvicorn[standard]>=0.30
redis>=5.0
httpx>=0.27
pytz>=2024.1

app/service.py:

"""FastAPI application that receives requests from celery tasks."""
from __future__ import annotationsfrom datetime import datetime
from typing import Any, Dictfrom fastapi import FastAPI
from pydantic import BaseModel
import pytzapp = FastAPI(title="Celery Demo Service", version="0.1.0")
BEIJING_TZ = pytz.timezone("Asia/Shanghai")class PingPayload(BaseModel):"""Schema describing the request body sent by the celery task."""triggered_at: strtask_id: str@app.post("/ping")
def handle_ping(payload: PingPayload) -> Dict[str, Any]:"""Return a confirmation response including the server-side timestamp."""received_at = datetime.now(BEIJING_TZ).isoformat(timespec="seconds")return {"message": "pong","received_at": received_at,"payload": payload.dict(),}@app.get("/health")
def healthcheck() -> Dict[str, str]:"""Lightweight endpoint used by smoke tests."""return {"status": "ok"}if __name__ == "__main__":import uvicornuvicorn.run("app.service:app", host="127.0.0.1", port=8000, reload=True)

app/tasks.py:

"""Celery tasks responsible for interacting with the FastAPI service."""
from __future__ import annotationsimport os
from datetime import datetime
from typing import Any, Dictimport httpx
import pytz
from httpx import HTTPStatusError, RequestError
from celery.utils.log import get_task_loggerfrom .celery_app import celery_applogger = get_task_logger(__name__)FASTAPI_BASE_URL = os.getenv("FASTAPI_BASE_URL", "<http://127.0.0.1:8000>")
FASTAPI_PING_PATH = os.getenv("FASTAPI_PING_PATH", "/ping")
BEIJING_TZ = pytz.timezone("Asia/Shanghai")def _beijing_now() -> datetime:"""Return the current time in Beijing."""return datetime.now(BEIJING_TZ)# 作用: 使用装饰器将函数注册为 Celery 任务。装饰器模式是 Celery 注册任务的标准方式。
# name="app.tasks.ping_backend"。作用: 显式指定任务名称。
@celery_app.task(name="app.tasks.ping_backend",bind=True,autoretry_for=(RequestError,),retry_backoff=True,retry_kwargs={"max_retries": 3},
)
def ping_backend(self) -> Dict[str, Any]:"""Send a POST request to the FastAPI service and return the response payload."""payload = {"triggered_at": _beijing_now().isoformat(timespec="seconds"),"task_id": self.request.id,}url = f"{FASTAPI_BASE_URL.rstrip('/')}{FASTAPI_PING_PATH}"try:with httpx.Client(timeout=5.0) as client:response = client.post(url, json=payload)response.raise_for_status()except HTTPStatusError as exc:logger.warning("FastAPI service responded with %s for %s | payload=%s",exc.response.status_code,url,payload,)raise self.retry(exc=exc, countdown=5)except RequestError as exc:logger.error("Network error when reaching %s | payload=%s | error=%s", url, payload, exc)raiselogger.info("Successfully pinged backend | url=%s status=%s",url,response.status_code,)return {"request": payload,"response": response.json(),}

具体执行步骤:

(1)创建一个Python虚拟环境,并激活这个虚拟环境:

python -m venv celery_venv ./celery_venv/Scripts/Activate

(2)安装Python的依赖模块: pip install -r requirements.txt

(3)启动redis:

sudo systemctl start redis-server

(4)启动FastAPI服务(一个窗口): uvicorn app.service:app --reload

(5)启动celery worker(一个窗口): celery -A app.tasks worker --loglevel=info

(6)启动调度器(一个窗口): celery -A app.tasks beat --loglevel=info

http://www.dtcms.com/a/537344.html

相关文章:

  • 【Linux】xargs命令
  • CCUT应用OJ题解——贪吃的松鼠
  • [已解决]Python将COCO格式实例分割数据集转换为YOLO格式
  • CSS Backgrounds (背景)
  • Blender入门学习08 - 骨骼绑定
  • 家装设计网站开发企业做网站大概多少钱
  • TCP/UDP端口、IP协议号与路由协议 强行记忆点总结
  • (一)React面试()
  • 配置文件加载顺序与优先级规则
  • 数字化转型迫在眉睫,企业应该如何面对?
  • 做网站百度云网站登录不了
  • HF4054H-B 50V耐压 集成6.1V过压保护和1.3A过流保护 42V热拔插性能的500mA锂电池线性充电芯片
  • 网站可以做音频线吗网站如何安装源码
  • 小学校园文化建设网站网站打不开显示asp
  • 142.DDR报错bank32,33,34
  • Android性能优化深度解析与实际案例
  • 网站素材网站建设的目标和需求
  • 与您探讨电子元器件结构陶瓷(陶瓷基板)的分类及结构陶瓷的应用
  • 模板建站自适应互联网网站分了
  • 苹果ios安卓apk应用APP文件怎么修改手机APP显示的名称
  • 网站界面用什么做的网站创建方法
  • 《自动控制原理》第 3 章 线性控制系统的运动分析:3.6、3.7
  • 特征选择中的统计思维:哪些变量真的重要?
  • 项目七 使用ODL Yang UI操作流表
  • 电子商务网站怎么建料远若近网站建设
  • [CSP-S 2024] 超速检测
  • 基于MT5的K线处理逻辑
  • 河南郑州网站建设哪家公司好免费wordpress主题下载地址
  • 低空经济网络安全的政策体系构建
  • 网页设计网站规划深圳设计网站公司哪家好