
A Python Scheduled Task Manager

At work I need to write many data collectors that run at different intervals. My first thought was to manage them with tools like cron or Jenkins, but for various reasons those tools were not convenient to use here.

I asked DeepSeek and found that croniter can provide cron-like scheduling, which fits my needs perfectly.

Usage

1. First, install croniter

pip install croniter
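As a quick check that croniter does what we need, it can compute upcoming fire times from a standard cron expression. A minimal sketch (the expression here is just an example):

from datetime import datetime
from croniter import croniter

# "*/10 * * * *" means "every 10 minutes"
it = croniter("*/10 * * * *", datetime.now())
print(it.get_next(datetime))   # next fire time as a datetime
print(it.get_next(datetime))   # the one after that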

2. Create a task configuration file

tasks:
  - name: "Task name"
    schedule: "*/10 * * * *"
    command: "..."
    workdir: "..."

The configuration file is written in YAML. The name field is the task's name; schedule specifies when the task runs, using the same syntax as cron; command is the command to execute (preferably with a full path); and workdir is the directory the command runs in.

Multiple tasks can be defined in a single configuration file; see the example below.
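For instance, a configuration with two tasks might look like the following (the task names, commands, and paths are placeholders for illustration, not values from my actual setup):

tasks:
  - name: "collect-metrics"
    schedule: "*/10 * * * *"
    command: "/usr/bin/python3 /opt/collectors/metrics.py"
    workdir: "/opt/collectors"

  - name: "daily-report"
    schedule: "0 6 * * *"
    command: "/usr/bin/python3 /opt/collectors/report.py"
    workdir: "/opt/collectors"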

3. The scheduler program

I made some optimizations on top of the code DeepSeek produced.

import os
import sys
import time
import signal
import logging
import subprocess
from datetime import datetime
from croniter import croniter
import pickle
import yaml
import threading
import json
from pathlib import Path
from logging.handlers import RotatingFileHandler

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        RotatingFileHandler(
            'taskscheduler.log',
            maxBytes=50*1024*1024,  # 50 MB size limit
            backupCount=2,          # keep 2 backup files
            encoding='utf-8'        # explicitly specify the encoding
        ),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('taskscheduler')

def readConf(cfname):
    cfpath = os.path.join(os.path.dirname(__file__), cfname)
    if not os.path.exists(cfpath):
        logger.warning(f"Missing configuration file: {cfpath}")
        sys.exit(1)

    with open(cfpath, 'r') as fd:
        return yaml.load(fd, Loader=yaml.FullLoader)

def writeDataFile(path, data, type='json'):
    fname = os.path.join(os.path.dirname(__file__), path)
    # pickle needs a binary file handle; json and yaml use text mode
    mode = 'wb' if type == 'pickle' else 'w'
    with open(fname, mode) as fd:
        if type == 'json':
            return json.dump(data, fd)
        elif type == 'pickle':
            return pickle.dump(data, fd)
        elif type == 'yaml':
            return yaml.dump(data, fd)

def readDataFile(path, type='json'):
    fname = os.path.join(os.path.dirname(__file__), path)
    if not os.path.exists(fname):
        logger.warning(f"Missing data file: {fname}")
        raise FileNotFoundError(fname)

    mode = 'rb' if type == 'pickle' else 'r'
    with open(fname, mode) as fd:
        if type == 'json':
            return json.load(fd)
        elif type == 'pickle':
            return pickle.load(fd)
        elif type == 'yaml':
            return yaml.load(fd, Loader=yaml.FullLoader)
        
class TaskScheduler:
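    """Load tasks from a YAML config file, run them on cron schedules, and persist each task's last run time."""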
    def __init__(self, config_file="taskscheduler.yaml", state_file='.taskscheduler.states.json'):
        self.config_file = config_file
        self.state_file = state_file
        self.lock = threading.Lock()
        self.tasks = {}
        self.shutdown_flag = False
        self._load_config()
        self._load_states()
        self._setup_signal_handlers()

    def _setup_signal_handlers(self):
        """Register signal handlers."""
        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

    def _handle_signal(self, signum, frame):
        """Handle termination signals."""
        logger.warning(f"Received signal {signum}, starting graceful shutdown...")
        self.shutdown_flag = True

    def _load_config(self):
        """加载YAML配置文件"""
        try:
            config = readConf(self.config_file)
            
            for task_cfg in config['tasks']:
                self._validate_task_config(task_cfg)
                task_id = task_cfg['name']
                
                def make_task_closure(cmd, workdir, task_id):
                    def task_function():
                        try:
                            Path(workdir).mkdir(parents=True, exist_ok=True)
                            logger.info(f"开始执行命令: {cmd}", extra={'task_id': task_id})
                            
                            start_time = time.time()
                            process = subprocess.run(
                                cmd,
                                cwd=workdir,
                                shell=True,
                                check=True,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True
                            )
                            
                            duration = time.time() - start_time
                            logger.info(
                                f"命令执行成功\n"
                                f"• 耗时: {duration:.2f}s\n"
                                f"• 输出: {process.stdout.strip() or '无输出'}",
                                extra={'task_id': task_id}
                            )
                        except subprocess.CalledProcessError as e:
                            logger.error(
                                f"命令执行失败(退出码 {e.returncode})\n"
                                f"• 错误输出: {e.stdout.strip()}",
                                extra={'task_id': task_id}
                            )
                        except Exception as e:
                            logger.error(
                                f"执行异常: {str(e)}",
                                exc_info=True,
                                extra={'task_id': task_id}
                            )
                    return task_function

                self.tasks[task_id] = {
                    'cron': task_cfg['schedule'],
                    'func': make_task_closure(
                        task_cfg['command'],
                        task_cfg.get('workdir', '.'),
                        task_id
                    ),
                    'next_run': None
                }
            logger.info("配置文件加载完成")
            
        except Exception as e:
            logger.critical(f"配置文件加载失败: {str(e)}", exc_info=True)
            raise

    def _validate_task_config(self, config):
        """Validate a single task configuration."""
        required_fields = ['name', 'schedule', 'command']
        for field in required_fields:
            if field not in config:
                raise ValueError(f"Task configuration is missing required field: {field}")

        # Check that the cron expression is valid
        try:
            croniter(config['schedule'])
        except Exception as e:
            raise ValueError(f"Invalid cron expression: {config['schedule']}") from e

    def _load_states(self):
        """Load persisted task states."""
        try:
            states = readDataFile(self.state_file)
            self.states = {}
            if states:
                for task_id, timestamp in states.items():
                    # keep only states for tasks that are still configured
                    if timestamp and task_id in self.tasks:
                        self.states[task_id] = datetime.fromisoformat(timestamp)
        except Exception as e:
            logger.warning(f"Failed to load task states: {str(e)}", exc_info=True)
            self.states = {}

        # Initialise tasks that have no recorded state yet
        for task_id in self.tasks:
            if task_id not in self.states:
                now = datetime.now()
                prev_time = croniter(self.tasks[task_id]['cron'], now).get_prev(datetime)
                self.states[task_id] = prev_time

    def _save_states(self):
        """保存任务状态"""
        logger.info(f"states: {self.states}")
        serialized = {k: v.isoformat() if v else None 
                        for k, v in self.states.items()}
        writeDataFile(self.state_file, serialized)

    def _update_next_run(self, task_id):
        """更新任务的下次执行时间"""
        task = self.tasks[task_id]
        cron_gen = croniter(task['cron'], self.states[task_id])
        task['next_run'] = cron_gen.get_next(datetime)

    def run(self):
        """Main scheduler loop."""
        logger.info("Scheduler started")
        with self.lock:
            for task_id in self.tasks:
                self._update_next_run(task_id)

        logger.info(f"Loaded {len(self.tasks)} task(s)")
        for task_id, task in self.tasks.items():
            logger.info(
                f"Task registered - ID: {task_id}",
                extra={'task_id': task_id}
            )
            logger.info(
                f"Next run at: {task['next_run']}",
                extra={'task_id': task_id}
            )

        try:
            while not self.shutdown_flag:
                now = datetime.now()
                earliest_run = None

                with self.lock:
                    # Run tasks that are due
                    for task_id, task in self.tasks.items():
                        # logger.info(f"Task {task_id} next run: {task['next_run']}, now: {now}")
                        # Skip tasks whose next run time has not been set
                        if task['next_run'] is None:
                            continue
                        if task['next_run'] <= now:
                            self._execute_task(task_id)
                            self._update_next_run(task_id)

                    # Find the earliest upcoming run time
                    for task in self.tasks.values():
                        if task['next_run'] is None:
                            continue
                        if (earliest_run is None) or (task['next_run'] < earliest_run):
                            earliest_run = task['next_run']

                if earliest_run:
                    sleep_time = (earliest_run - datetime.now()).total_seconds()
                    sleep_time = max(sleep_time, 0)
                    timeout = min(sleep_time, 60)
                    
                    # Sleep in short slices so a shutdown signal is handled promptly
                    start = time.time()
                    while (time.time() - start < timeout) and not self.shutdown_flag:
                        time.sleep(1)
                else:
                    time.sleep(1)

        finally:
            self._shutdown()

    def _execute_task(self, task_id):
        """Run a single task."""
        task = self.tasks[task_id]
        logger.info(
            "Triggering execution",
            extra={'task_id': task_id}
        )
        try:
            task['func']()
            # Record the run and persist the state
            self.states[task_id] = task['next_run']
            self._save_states()
        except Exception as e:
            logger.error(
                f"Task execution error: {str(e)}",
                exc_info=True,
                extra={'task_id': task_id}
            )

    def _shutdown(self):
        """Clean up on shutdown."""
        logger.info("Shutting down scheduler...")
        # Make sure the final states are saved
        with self.lock:
            self._save_states()
        logger.info("All task states saved")
        logging.shutdown()

if __name__ == "__main__":
    scheduler = TaskScheduler()
    try:
        scheduler.run()
    except Exception as e:
        logger.critical(f"未捕获的异常: {str(e)}", exc_info=True)
        raise
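
To run the scheduler, save the script next to its configuration file; with the defaults above it reads taskscheduler.yaml, writes its state to .taskscheduler.states.json, and logs to taskscheduler.log, all in the script's own directory. Besides croniter, the script also needs PyYAML for the yaml import. A minimal way to start it (the file name taskscheduler.py and the use of nohup are just one option):

pip install croniter pyyaml
python taskscheduler.py
# one way to keep it running after logging out:
nohup python taskscheduler.py > /dev/null 2>&1 &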
