当前位置: 首页 > news >正文

Django+Celery 进阶:动态定时任务的添加、修改与智能调度实战

文章目录

    • 一、Celery定时任务
      • Celery Beat 介绍
      • Celery Beat 配置持久化
    • 二、Celery Beat与Django集成
      • 安装配置
      • 数据库迁移
    • 三、Celery Beat项目实战
      • 定义Celery任务
      • 通过视图集动态管理定时任务
      • 启动Celery Beat
    • 四、实战效果


一、Celery定时任务

Celery Beat 介绍

Celery Beat 是 Celery 框架的一个内置组件,专门用于定时任务调度。它可以按照预设的时间规则(如固定间隔、特定时间点、CRON 表达式等)自动触发 Celery 任务,广泛应用于需要周期性执行的场景(如定时数据备份、日志清理、报表生成等)。

工作原理

  • Beat 进程:独立运行的调度进程,负责解析定时规则并生成任务消息。
  • 任务发送:当到达预设时间,Beat 进程将任务发送到 Celery 的消息队列(如 Redis)。
  • 任务执行:Celery Worker 进程从队列中获取任务并执行。

简单来说,Celery Beat 是 “定时发令枪”,而 Worker 是 “执行者”。

Celery Beat 配置持久化

默认情况下,任务配置存储在内存中,重启后会丢失。需要通过后端存储(如数据库)实现持久化,确保任务配置不丢失。

项目名称说明
django-celery-beat通过数据库实现任务配置持久化
django-celery-results通过数据库实现任务结果持久化
django-celery只支持Celery 3.X版本(不推荐)

二、Celery Beat与Django集成

安装配置

安装

pip install django-celery-beat django-celery-results

在Django项目settings.py中添加

INSTALLED_APPS = [...'django_celery_beat','django_celery_results'
]### Celery 配置
CELERY_BROKER_URL = f"{REDIS_URL}/{REDIS_DB}"  # 使用Redis作为消息代理
CELERY_RESULT_BACKEND = "django-db"  # 使用数据库存储结果
CELERY_BEAT_SCHEDULER = ("django_celery_beat.schedulers:DatabaseScheduler"  # 使用数据库保存定时任务
)
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = True
CELERY_RESULT_EXTENDED = True  # 启用后才会记录 task_name、date_started 等字段
CELERY_TASK_TRACK_STARTED = True  # 记录任务开始时间

定义 Celery 实例:创建文件mysite\mysite\celery.py

"""定义和配置 Celery 实例"""import os
from celery import Celery
from django.conf import settingsos.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
# 创建 Celery 实例
app = Celery("mysite")
# 加载配置文件中的 Celery 配置
app.config_from_object("django.conf:settings", namespace="CELERY")
# 自动发现并加载任务
app.autodiscover_tasks(["myapp_infra", "myapp_system"] + settings.MY_APPS, force=True)

配置 Django 启动时会加载应用:修改文件mysite\mysite\__init__.py

"""Django 启动时加载Celery实例"""from .celery import app as celery_app__all__ = ("celery_app",)

数据库迁移

执行数据库迁移,创建相关数据库表。其中:

  • django_celery_beat_periodictask:用于存储任务名称、任务路径、任务参数等元数据。
  • django_celery_beat_crontabschedule:用于存储CRON表达式。
# 在Django项目根目录(包括manage.py的目录)执行
python manage.py migrate django_celery_beat
python manage.py migrate django_celery_results

在这里插入图片描述

三、Celery Beat项目实战

定义Celery任务

发现任务:Celery 将自动从所有已安装的应用APP中发现任务,需要遵守以下目录结构

- myapp_system/- tasks.py- models.py
- myapp_infra/- tasks.py- models.py

定义任务:创建文件myapp_infra/tasks.py,使用@shared_task装饰器定义 Celery 任务

"""定义 Celery 任务"""from time import sleep
from celery import shared_task
from django.utils import timezone@shared_task
def send_daily_report():# 示例:发送日报print(f"开始发送日报,现在时间:{timezone.now()}")sleep(30)print("发送成功")return "发送成功"@shared_task
def cleanup_expired_data():# 示例:清理过期数据print("清理过期数据")sleep(15)print("清理完成")return "清理完成"

通过视图集动态管理定时任务

下面是通过 DRF 视图集,动态管理定时任务示例,实现对定时任务的增删改查、手动触发、开启暂停等操作

  • 定义视图:myapp_infra\job\views.py
import json
from celery import current_app
from django_celery_beat.models import PeriodicTask
from drf_spectacular.utils import extend_schema
from rest_framework.decorators import action
from rest_framework.generics import get_object_or_404from mars_framework.viewsets.base import CustomModelViewSetNoSimple
from mars_framework.permissions.base import HasPermission
from mars_framework.response.base import CommonResponse
from .serializers import JobSaveSerializer, JobSerializer
from .filters import JobFilter
from .services import infra_job_service@extend_schema(tags=["管理后台-infra-定时任务"])
class JobViewSet(CustomModelViewSetNoSimple):queryset = PeriodicTask.objects.all()serializer_class = JobSerializerfilterset_class = JobFilteraction_serializers = {"create": JobSaveSerializer,"update": JobSaveSerializer,}action_permissions = {"create": [HasPermission("infra:job:create")],"destroy": [HasPermission("infra:job:delete")],  # TODO 是否需要删除对应shedule"update": [HasPermission("infra:job:update")],"retrieve": [HasPermission("infra:job:query")],"list": [HasPermission("infra:job:query")],"export": [HasPermission("infra:job:export")],"update_status": [HasPermission("infra:job:update")],"trigger": [HasPermission("infra:job:trigger")],"sync": [HasPermission("infra:job:create")],"get_next_times": [HasPermission("infra:job:query")],}action_querysets = {# 排除name=celery.backend_cleanup"list": PeriodicTask.objects.exclude(name="celery.backend_cleanup"),"export": PeriodicTask.objects.exclude(name="celery.backend_cleanup"),}export_name = "定时任务列表"export_fields_labels = {"id": "任务编号","name": "任务名称","task": "处理器名字","kwargs": "处理器参数","cron_expression": "CRON表达式","status": "任务状态",}export_data_map = {"status": {1: "开启", 2: "暂停"},}@extend_schema(summary="新增")def create(self, request, *args, **kwargs):"""创建定时任务"""serializer = self.get_serializer(data=request.data)serializer.is_valid(raise_exception=True)# CRON表达式cron_expression = serializer.validated_data.pop("cron_expression")schedule = infra_job_service.get_or_create_crontab_schedule(cron_expression)# 创建任务task_data = {"name": serializer.validated_data.get("name"),"task": serializer.validated_data.get("task"),"kwargs": serializer.validated_data.get("kwargs"),"crontab": schedule,"enabled": False,  # 默认禁用}PeriodicTask.objects.create(**task_data)return CommonResponse.success()@extend_schema(summary="更新")def update(self, request, *args, **kwargs):"""更新定时任务"""instance = self.get_object()serializer = self.get_serializer(instance, data=request.data)serializer.is_valid(raise_exception=True)# 任务CRON表达式cron_expression = serializer.validated_data.pop("cron_expression")schedule = infra_job_service.get_or_create_crontab_schedule(cron_expression)# 更新任务task_data = {"name": serializer.validated_data.get("name"),"task": serializer.validated_data.get("task"),"kwargs": serializer.validated_data.get("kwargs"),"crontab": schedule,}PeriodicTask.objects.filter(id=instance.id).update(**task_data)return CommonResponse.success()@extend_schema(summary="触发定时任务")@action(methods=["put"],detail=True,url_path="trigger",)def trigger(self, request, *args, **kwargs):"""触发定时任务"""instance = self.get_object()# 获取任务函数并手动触发task_name = instance.task  # 任务路径如 "myapp_infra.tasks.send_daily_report"task_kwargs = json.loads(instance.kwargs or "{}")try:# 动态加载任务函数task = current_app.tasks[task_name]task.delay(**task_kwargs)return CommonResponse.success()except KeyError:return CommonResponse.error(code=121101,msg=f"找不到 {task_name}  任务,或该任务未注册",)except Exception as e:return CommonResponse.error(code=121102,msg=f"触发任务 {task_name} 失败,错误信息:{e}",)@extend_schema(summary="更新定时任务状态")@action(methods=["put"],detail=True,url_path="status",)def update_status(self, request, *args, **kwargs):"""更新定时任务状态"""status = request.query_params.get("status")if status is None or status not in ["1", "2"]:  # 1:开启 2:暂停return CommonResponse.error(code=121104, msg="任务状态值错误")instance = get_object_or_404(PeriodicTask, pk=kwargs.get("pk"))instance.enabled = status == "1"instance.save()return CommonResponse.success()@extend_schema(summary="获取定时任务的下 n 次执行时间")@action(methods=["get"],detail=True,url_path="next-times",)def get_next_times(self, request, *args, **kwargs):"""获取定时任务的下 n 次执行时间"""count = int(request.query_params.get("count", 5))task = self.get_object()# 生成CORN 表达式crontab = task.crontabcron_expression = f"{crontab.minute} {crontab.hour} {crontab.day_of_month} {crontab.month_of_year} {crontab.day_of_week}"try:data = infra_job_service.get_next_times(cron_expression, count)except Exception as e:return CommonResponse.error(code=121102, msg=str(e))return CommonResponse.success(data=data)
  • 配置路由:myapp_infra\urls.py
from .job.views import JobViewSet# 管理后台 - 定时任务
router.register(r"job", JobViewSet, basename="job")

启动Celery Beat

  • 启动Celery Worker和Celery Beat调度器
# 在项目目录(与manage.py同级),启动Celery Worker
celery -A mysite worker -l info -P solo# 新建另一个终端窗口,在项目目录(与manage.py同级),启动Celery Beat
celery -A mysite beat -l info -S django_celery_beat.schedulers:DatabaseScheduler# 新建另一个终端窗口,在项目目录(与manage.py同级),启动Django
python manage.py runserver

四、实战效果

通过上面定义的DRF视图集API,配合 Vue3 前端界面实现效果

  • 定时任务的增、删、开启暂停功能

在这里插入图片描述

  • 定时任务的修改功能
    • 处理器名字:填写定义任务的全路径名称
    • CRON表达式:填写标准的CRON表达式

在这里插入图片描述

  • 定时任务执行结果查询功能:能看到定时任务的执行时间、状态、返回结果等信息

在这里插入图片描述

点击查看完整代码


您正在阅读的是《Django从入门到实战》专栏!关注不迷路~


文章转载自:
http://barbarize.dmyyro.cn
http://aplasia.dmyyro.cn
http://benthos.dmyyro.cn
http://aphicide.dmyyro.cn
http://carpsucker.dmyyro.cn
http://areological.dmyyro.cn
http://caky.dmyyro.cn
http://aerophagia.dmyyro.cn
http://aristocratic.dmyyro.cn
http://bludgeon.dmyyro.cn
http://carolina.dmyyro.cn
http://cholecyst.dmyyro.cn
http://authentification.dmyyro.cn
http://chemotactic.dmyyro.cn
http://angor.dmyyro.cn
http://airwaves.dmyyro.cn
http://carboy.dmyyro.cn
http://chartbuster.dmyyro.cn
http://baniyas.dmyyro.cn
http://channels.dmyyro.cn
http://bilocular.dmyyro.cn
http://asciferous.dmyyro.cn
http://bookteller.dmyyro.cn
http://aerotrack.dmyyro.cn
http://anaerophyte.dmyyro.cn
http://barytone.dmyyro.cn
http://biometry.dmyyro.cn
http://auditoria.dmyyro.cn
http://airstop.dmyyro.cn
http://astroarchaeology.dmyyro.cn
http://www.dtcms.com/a/281199.html

相关文章:

  • Android target34升级到35中的edge-to-edge适配
  • Nestjs框架: 数据库架构设计与 NestJS 多 ORM 动态数据库应用与连接池的配置
  • 利用android studio,对图片资源进行二次压缩
  • 基于Ruoyi和PostgreSQL的统一POI分类后台管理实战
  • 三步把餐饮回访差评变口碑
  • java+vue+SpringBoot在线租房和招聘平台(程序+数据库+报告+部署教程+答辩指导)
  • Ajax原理、用法与经典代码实例
  • TCP协议可靠性设计的核心机制与底层逻辑
  • 基于YOLOv8的水稻叶片病害检测系统的设计与实现【近6W条数据集+多病害特征+高准确率】
  • 修改系统配置后,如何编写 Python 脚本以适应 SSL 证书验证的变化
  • Axios 和 Promise 区别对比
  • C语言---自定义类型(下)(枚举和联合类型)
  • 利用DeepSeek为chdb命令行客户端添加输出重定向和执行SQL脚本功能
  • nlp论文:分本分类:《Bag of Tricks for Efficient Text Classification》
  • VirtualBox网络配置全指南:桥接、Host-Only与双网卡实战
  • 2025华为ODB卷-士兵过河-三语言题解
  • 限制apk使用时长第二篇-限制/拦截Apk启动-应用锁功能
  • 创客匠人:创始人 IP 变现,从 “单点尝试” 到 “生态赋能” 的跨越
  • S7-200 SMART PLC:不同CPU及数字量 IO 接线全解析
  • ESLint 配置错误:ReferenceError: prettier is not defined 解决方案
  • 运维技术教程之Jenkins上的known_hosts文件
  • 高频高密度趋势下磁芯材料评价指标探讨
  • UVM(1)—配置环境
  • 算法学习day16----Python数据结构--模拟队列
  • 传统三层网络架构和现代数据中心网络架构(如思科 ACI 的 Spine-Leaf 架构)的对比和分析(Grok3 回答)
  • Mac电脑上无需卸载即可切换使用多个EasyConnect客户端的解决方案
  • Docker安装升级redis,并设置持久化
  • 使用 sudo iftop -i 分析服务器带宽使用情况
  • 一文读懂语义解析技术:从规则到神经网络的演进与挑战
  • PPP 链路及 MP 捆绑与 CHAP 验证实验