当前位置: 首页 > wzjs >正文

做文学类网站后期花费优化网站找哪家

做文学类网站后期花费,优化网站找哪家,网易做相册旅游网站,专门做化妆品平台的网站有哪些工作中需要写很多数据采集器以不同的时间间隔运行。首先想到的是通过cron或jenkins等工具来进行管理,由于各种原因不太方便使用这些工具。 问了一下 deepseek,发现可以使用 croniter 实现类似 cron 的功能,完美满足了我的需求。 使用方法 …

工作中需要写很多数据采集器以不同的时间间隔运行。首先想到的是通过cron或jenkins等工具来进行管理,由于各种原因不太方便使用这些工具。

问了一下 deepseek,发现可以使用 croniter 实现类似 cron 的功能,完美满足了我的需求。

使用方法

1. 首先安装  croniter

pip install croniter

2. 创建任务配置文件

tasks:- name: "Task name"schedule: "*/10 * * * *"command: "..."workdir: "..."

配置文件可以采用yaml格式,其中name参数表示任务的名称, schedule 参数指定任务的运行时间,格式与 cron 保持一致; command 参数指定要运行的命令,最好使用全路径;workdir 参数命令运行目录。

在配置文件中可以定义多个任务。

3. 定时任务管理程序如下

在 deekseek 给出的代码基础上做了部分优化。

import os
import sys
import time
import signal
import logging
import subprocess
from datetime import datetime
from croniter import croniter
import pickle
import yaml
import threading
import json
from pathlib import Path
from logging.handlers import RotatingFileHandler# 日志配置
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',handlers=[RotatingFileHandler('taskscheduler.log',maxBytes=50*1024*1024,  # 50MB大小限制backupCount=2,          # 保留2个备份encoding='utf-8'        # 明确指定编码),logging.StreamHandler()]
)
logger = logging.getLogger('taskscheduler')def readConf(cfname):cfpath = os.path.join(os.path.dirname(__file__), f"{cfname}")if not os.path.exists(cfpath):logging.warning(f"Missing configuration file : {cfpath}")sys.exit(1)with open(cfpath, 'r') as fd:return yaml.load(fd, Loader=yaml.FullLoader)def writeDataFile(path, data, type='json'):fname = os.path.join(os.path.dirname(__file__), f"{path}")with open(fname, 'w') as fd:if type == 'json':return json.dump(data, fd)elif type == 'pickle':return pickle.dump(data, fd)elif type == 'yaml':return yaml.dump(data, fd)def readDataFile(path, type='json'):fname = os.path.join(os.path.dirname(__file__), f"{path}")if not os.path.exists(fname):logging.warning(f"Missing data file : {fname}")raisewith open(fname, 'r') as fd:if type == 'json':return json.load(fd)elif type == 'pickle':return pickle.load(fd)elif type == 'yaml':return yaml.load(fd, Loader=yaml.FullLoader)class TaskScheduler:def __init__(self, config_file="taskscheduler.yaml", state_file='.taskscheduler.states.json'):self.config_file = config_fileself.state_file = state_fileself.lock = threading.Lock()self.tasks = {}self.shutdown_flag = Falseself._load_config()self._load_states()self._setup_signal_handlers()def _setup_signal_handlers(self):"""注册信号处理"""signal.signal(signal.SIGINT, self._handle_signal)signal.signal(signal.SIGTERM, self._handle_signal)def _handle_signal(self, signum, frame):"""处理中断信号"""logger.warning(f"接收到终止信号 {signum},开始优雅关闭...")self.shutdown_flag = Truedef _load_config(self):"""加载YAML配置文件"""try:config = readConf(self.config_file)for task_cfg in config['tasks']:self._validate_task_config(task_cfg)task_id = task_cfg['name']def make_task_closure(cmd, workdir, task_id):def task_function():try:Path(workdir).mkdir(parents=True, exist_ok=True)logger.info(f"开始执行命令: {cmd}", extra={'task_id': task_id})start_time = time.time()process = subprocess.run(cmd,cwd=workdir,shell=True,check=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,text=True)duration = time.time() - start_timelogger.info(f"命令执行成功\n"f"• 耗时: {duration:.2f}s\n"f"• 输出: {process.stdout.strip() or '无输出'}",extra={'task_id': task_id})except subprocess.CalledProcessError as e:logger.error(f"命令执行失败(退出码 {e.returncode})\n"f"• 错误输出: {e.stdout.strip()}",extra={'task_id': task_id})except Exception as e:logger.error(f"执行异常: {str(e)}",exc_info=True,extra={'task_id': task_id})return task_functionself.tasks[task_id] = {'cron': task_cfg['schedule'],'func': make_task_closure(task_cfg['command'],task_cfg.get('workdir', '.'),task_id),'next_run': None}logger.info("配置文件加载完成")except Exception as e:logger.critical(f"配置文件加载失败: {str(e)}", exc_info=True)raisedef _validate_task_config(self, config):"""验证任务配置"""required_fields = ['name', 'schedule', 'command']for field in required_fields:if field not in config:raise ValueError(f"任务配置缺少必要字段: {field}")# 验证cron表达式有效性try:croniter(config['schedule'])except Exception as e:raise ValueError(f"无效的cron表达式: {config['schedule']}") from edef _load_states(self):"""加载任务状态"""try:states = readDataFile(self.state_file)if states:for task_id, timestamp in states.items():if timestamp and task_id in self.tasks:states[task_id] = datetime.fromisoformat(timestamp)self.states = stateselse:self.states = {}except Exception as e:logger.warning(f"加载任务状态失败: {str(e)}", exc_info=True)self.states = {}# 初始化未记录的任务for task_id in self.tasks:if task_id not in self.states:now = datetime.now()prev_time = croniter(self.tasks[task_id]['cron'], now).get_prev(datetime)self.states[task_id] = prev_timedef _save_states(self):"""保存任务状态"""logger.info(f"states: {self.states}")serialized = {k: v.isoformat() if v else None for k, v in self.states.items()}writeDataFile(self.state_file, serialized)def _update_next_run(self, task_id):"""更新任务的下次执行时间"""task = self.tasks[task_id]cron_gen = croniter(task['cron'], self.states[task_id])task['next_run'] = cron_gen.get_next(datetime)def run(self):"""启动调度器主循环"""logger.info("调度器启动")with self.lock:for task_id in self.tasks:self._update_next_run(task_id)logger.info(f"已加载 {len(self.tasks)} 个任务")for task_id, task in self.tasks.items():logger.info(f"任务注册 - ID: {task_id}",extra={'task_id': task_id})logger.info(f"下次执行时间: {task['next_run']}",extra={'task_id': task_id})try:while not self.shutdown_flag:now = datetime.now()earliest_run = Nonewith self.lock:# 执行到期任务for task_id, task in self.tasks.items():# logger.info(f"任务 {task_id} 下次执行时间: {task['next_run']} 当前时间: {now}")# 跳过未设置下次执行时间的任务if task['next_run'] is None:continueif task['next_run'] <= now:self._execute_task(task_id)self._update_next_run(task_id)# 查找最近的下次执行时间for task in self.tasks.values():if (earliest_run is None) or (task['next_run'] < earliest_run):earliest_run = task['next_run']if earliest_run:sleep_time = (earliest_run - datetime.now()).total_seconds()sleep_time = max(sleep_time, 0)timeout = min(sleep_time, 60)# 分段休眠以快速响应关闭信号start = time.time()while (time.time() - start < timeout) and not self.shutdown_flag:time.sleep(1)else:time.sleep(1)finally:self._shutdown()def _execute_task(self, task_id):"""执行单个任务"""task = self.tasks[task_id]logger.info("触发执行",extra={'task_id': task_id})try:task['func']()# 更新状态self.states[task_id] = task['next_run']self._save_states()except Exception as e:logger.error(f"任务执行异常: {str(e)}",exc_info=True,extra={'task_id': task_id})def _shutdown(self):"""关闭时的清理操作"""logger.info("正在关闭调度器...")# 确保保存最终状态with self.lock:self._save_states()logger.info("所有任务状态已保存")logging.shutdown()if __name__ == "__main__":scheduler = TaskScheduler()try:scheduler.run()except Exception as e:logger.critical(f"未捕获的异常: {str(e)}", exc_info=True)raise

http://www.dtcms.com/wzjs/357151.html

相关文章:

  • 苏州网站建设开发seo关键词排名优化矩阵系统
  • 洛阳哪里做网站武汉seo结算
  • 做一个企业的官网可以做静态网站深圳网站关键词排名优化
  • wordpress用户注册之后不显示密码win10系统优化软件
  • 商贸有限公司起名怀来网站seo
  • 如何重建网站搜索引擎是什么
  • wordpress404无法加载免费seo网站自动推广软件
  • 鱿鱼网站男女做愛免費视頻搜索引擎优化排名seo
  • 哈尔滨服务专业的建站百度快速排名系统查询
  • 前端网站大全广州新闻热点事件
  • 国际网站制作高端网站定制开发
  • logo设计说明模板100字上海百度搜索优化
  • 怎么样用dw做网站太原网站建设开发
  • 大型网站快速排名seo短视频网页入口引流网站
  • 网盘搜索网站如何做的沈阳网站优化
  • 电影网站怎么做的百度指数官方版
  • 做网站需要学什么语言万网域名查询工具
  • 制作公司网站的规划教育培训网
  • 做网站大图片传统营销与网络营销的整合方法
  • wordpress 私信插件seo网络营销技巧
  • 免费网站建设糕点烘焙专业培训学校
  • 网站建设订单模板seoaoo
  • 做网站首选科远网络信息流广告投放渠道
  • 桂林北京网站建设seo推广一年要多少钱
  • 网站建设的可用性优化关键词的方法
  • php做网站弊端百度推广费用一天多少钱
  • vs2013做网站教程2024年重大政治时事汇总
  • 成都建设网站报价搜索词分析
  • 做暧嗳xo小视频免费网站百度一下网页版搜索引擎
  • 建立网站平台搜索引擎营销题库和答案