当前位置: 首页 > news >正文

flask使用celery通过数据库定时

celery基本配置

from .celery_signals import task_failure_handler,task_postrun_handlerbroker_connection_retry_on_startup=Truetask_serializer='json'
accept_content=['json']
result_serializer='json'
timezone='Asia/Shanghai'
enable_utc=Trueinclude=['app.tasks.test_tasks']
beat_schedule= {} # 动态获取
# 使用自定义调度器
beat_scheduler = 'app.scheduler.database_scheduler.FlaskDBScheduler'
beat_scheduler_reload_interval = 30  # 可覆盖默认值

celery信号机制:

import datetimefrom celery.signals import task_postrun, task_failure
from app.models import TaskExecutionLog, db@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **other):from app import appif state == 'SUCCESS':duration = other.get('runtime', 0)last_run_at = datetime.datetime.now() - datetime.timedelta(seconds=duration)with app.app_context():db.session.add(TaskExecutionLog(task_id=task_id,task_name=task.name,status='SUCCESS',result=retval,args=args,kwargs=kwargs,start_time=last_run_at,end_time=datetime.datetime.now(datetime.timezone.utc),duration=duration))db.session.commit()@task_failure.connect
def task_failure_handler(task_id, exception, traceback, task, args, kwargs, einfo, **other):# 记录返回异常的任务from app import appduration = other.get('runtime', 0)last_run_at = datetime.datetime.now() - datetime.timedelta(seconds=duration)with app.app_context():db.session.add(TaskExecutionLog(task_id=task_id,task_name=task.name,status='FAILURE',result=exception,args=args,kwargs=kwargs,start_time=last_run_at,end_time=datetime.datetime.now(datetime.timezone.utc),duration=duration))db.session.commit()

flask基本配置:

from flask import Flask
from celery import Celery
from flask_sqlalchemy import SQLAlchemyfrom config import Configcelery = Celery()
db = SQLAlchemy()def create_app(config_class=Config):app = Flask(__name__)app.config.from_object(config_class)# 注册celeryinit_celery(app)# 注册dbinit_db(app)# 注册路由from app.views import bpapp.register_blueprint(bp)return appdef init_celery(app: Flask):from config import celery_configcelery.config_from_object(celery_config)celery.conf.update(broker_url=app.config['CELERY_BROKER_URL'],result_backend=app.config['CELERY_RESULT_BACKEND'])celery.flask_app = app #scheduler类可直接调用class ContextTask(celery.Task):def __call__(self, *args, **kwargs):with app.app_context():return self.run(*args, **kwargs)celery.Task = ContextTaskdef init_db(app: Flask):db.init_app(app)app=create_app()

scheduler类重写:

import time
from typing import Dictfrom celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontabfrom app.models import ScheduledTask,dbclass FlaskDBScheduler(Scheduler):def __init__(self, *args, **kwargs):self._schedule = {}self._last_reload = 0self.reload_interval = kwargs.get('reload_interval', 30)  # 默认30秒重载super().__init__(*args, **kwargs)# 确保在调度器初始化时创建表with self.app.flask_app.app_context():db.create_all()def setup_schedule(self):self._schedule = self.load_schedule()def load_schedule(self) -> Dict[str, ScheduleEntry]:"""从Flask数据库加载调度配置"""schedule = {}with self.app.flask_app.app_context():for task in ScheduledTask.query.filter_by(enabled=1).all():schedule[task.name] = self.create_entry(task)return scheduledef create_entry(self, db_task) -> ScheduleEntry:"""将数据库记录转换为ScheduleEntry"""try:db_schedule=float(db_task.schedule)except:minute, hour, day, month, week_day = db_task.schedule.strip().split(' ')db_schedule = crontab(minute=minute, hour=hour, day_of_week=week_day, day_of_month=day,month_of_year=month)return ScheduleEntry(name=db_task.name,task=db_task.task,schedule=db_schedule,args=db_task.args,kwargs=db_task.kwargs,options={'enabled': db_task.enabled})def tick(self, event_t=None, min=None, max=None):"""重载tick方法实现定期检查"""now = time.time()if now - self._last_reload > self.reload_interval:self._schedule = self.load_schedule()self._last_reload = nowself.logger.debug('Reloaded schedule from Flask DB')return super().tick()@propertydef schedule(self) -> Dict[str, ScheduleEntry]:return self._schedule

表基本配置:

from app import db
from datetime import datetime,timezoneclass ScheduledTask(db.Model):id = db.Column(db.Integer, primary_key=True,comment='id')name = db.Column(db.String(128), unique=True, nullable=False) # 别名,望文生义task = db.Column(db.String(256), nullable=False)  # e.g. 'app.tasks.sample_task'schedule = db.Column(db.String(128), nullable=False)  # e.g. '10.0' or '0 8 * * *'args = db.Column(db.JSON, default=list)kwargs = db.Column(db.JSON, default=dict)enabled = db.Column(db.Boolean, default=True)last_updated = db.Column(db.DateTime, default=lambda :datetime.now(timezone.utc))def __repr__(self):return '<ScheduledTask %r %s>' % self.name,self.taskclass TaskExecutionLog(db.Model):id = db.Column(db.Integer, primary_key=True)task_id = db.Column(db.String(128), index=True)task_name = db.Column(db.String(256))status = db.Column(db.String(50))  # SUCCESS, FAILURE, RETRYresult = db.Column(db.Text)traceback = db.Column(db.Text)args = db.Column(db.JSON)kwargs = db.Column(db.JSON)start_time = db.Column(db.DateTime)end_time = db.Column(db.DateTime)duration = db.Column(db.Float)  # in secondsdef __repr__(self):return f'<TaskExecution {self.task_name} {self.status}>'

接口展示及添加任务:

import importlibfrom flask import Blueprint, jsonify, request
from app.models import ScheduledTask, TaskExecutionLog
from app import dbbp = Blueprint('api', __name__, url_prefix='/api')@bp.route('/tasks', methods=['GET'])
def list_tasks():tasks = ScheduledTask.query.all()return jsonify([{'id': t.id,'name': t.name,'task': t.task,'schedule': t.schedule,'enabled': t.enabled} for t in tasks])@bp.route('/tasks', methods=['POST'])
def create_task():data = request.jsonprint(data)task = ScheduledTask(name=data['name'],task=data['task'],schedule=data['schedule'],args=data.get('args') or [],kwargs=data.get('kwargs') or {},enabled=data.get('enabled', True))db.session.add(task)db.session.commit()return jsonify({'message': 'Task created'}), 201@bp.route('/run', methods=['POST'])
def run_task():data = ScheduledTask.query.filter(ScheduledTask.id == request.json['id'], ScheduledTask.enabled == True).first()if data and data.task:module, func = data.task.rsplit('.', 1)task = getattr(importlib.import_module(module), func)result = task.delay(*data.args, **data.kwargs)return jsonify({'task_id': result.task_id})return jsonify({'message': 'Task not found'}), 404@bp.route('/task-logs', methods=['GET'])
def list_task_logs():logs = TaskExecutionLog.query.order_by(TaskExecutionLog.start_time.desc()).limit(50).all()return jsonify([{'task_name': log.task_name,'status': log.status,'start_time': log.start_time.isoformat(),'duration': log.duration} for log in logs])

项目根目录导出celery便于celery命令执行:

from app import celery

tasks任务编写:

from app import celery
from app.models import ScheduledTask@celery.task
def add(x, y):return x + y@celery.task
def multiply(x, y):return x * y@celery.task
def hello_world():return "Hello World!"@celery.task
def monitor_task_list():tasks = ScheduledTask.query.all()return [{'id': t.id,'name': t.name,'task': t.task,'schedule': t.schedule,'enabled': t.enabled} for t in tasks]

版本依赖:

celery==5.2.7
Flask==3.1.1
flask_sqlalchemy==3.1.1

tasks简单编写方便举例配置:

from app import celery
from app.models import ScheduledTask@celery.task
def add(x, y):return x + y@celery.task
def multiply(x, y):return x * y@celery.task
def hello_world():return "Hello World!"@celery.task
def monitor_task_list():tasks = ScheduledTask.query.all()return [{'id': t.id,'name': t.name,'task': t.task,'schedule': t.schedule,'enabled': t.enabled} for t in tasks]

celery启动:

celery -A make_celery worker --pool=solo --loglevel=info #worker启动命令
celery -A make_celery beat --loglevel=info # beat启动命令

数据库配置:

按照上述步骤配置即可生效

http://www.dtcms.com/a/305385.html

相关文章:

  • 【专题十六】BFS 解决最短路径
  • Qt制作一个简单通信程序
  • C语言---万能指针(void *)、查找子串(strncmp函数的应用)多维数组(一维数组指针、二维数组指针)、返回指针值函数、关键字(const)
  • MongoDB系列教程-第一章:MongoDB简介、安装 、概念解析、用户管理、连接、实际应用示例
  • 数据结构-图的相关定义
  • 猎豹移动宣布控股UFACTORY,合计持股超80%
  • Oracle优化学习十六
  • Java高级技术知识点
  • 书籍推荐算法研究
  • 分布式链路追踪的实现原理
  • 系统学习算法:专题十五 哈希表
  • 第十一天:不定方程求解
  • windows下Docker安装路径、存储路径修改
  • LeetCode 刷题【19. 删除链表的倒数第 N 个结点、20. 有效的括号、21. 合并两个有序链表】
  • Ragflow 文档处理深度解析:从解析到存储的完整流程
  • 2025年06月 C/C++(三级)真题解析#中国电子学会#全国青少年软件编程等级考试
  • 删除不了文件(文件夹)需更改文件夹(文件)权限
  • nodejs 实现Excel数据导入数据库,以及数据库数据导出excel接口(核心使用了multer和node-xlsx库)
  • Java 队列
  • 【密码学】4. 分组密码
  • Coze:Window操作系统部署Coze Studio
  • 5.1 动⼿实现⼀个 LLaMA2 ⼤模型
  • Kun_Tools(全能文档工具)V0.4.6 便携版
  • 正运动控制器Zbasic回零详细教程(带Z信号)
  • 智能图书馆管理系统开发实战系列(一):项目架构设计与技术选型
  • 【Android】三种弹窗 Fragment弹窗管理
  • CTF Misc入门篇
  • 携全双工语音通话大模型亮相WAIC,Soul重塑人机互动新范式
  • Linux学习篇12——Shell编程入门与Shell编程变量详解大全
  • C++ 枚举enum的使用详细总结