任务均衡,即将100条任务的执行时间,均匀的分布在1小时内,可以降低运行负载的峰值,也可以用于控制多线程任务
import threading
import time
from datetime import datetime
from typing import Callable, List
import concurrent.futuresclass TaskScheduler:def __init__(self, task_list: List[str], cycle_seconds: int, reserve_seconds: int, func: Callable, single_cycle: bool = False, max_threads: int = 10):"""构造说明:- 必须满足 reserve_seconds < cycle_seconds- func 应实现异常处理逻辑- 单次模式时自动在任务完成后停止- max_threads 限制同时运行的线程数"""if reserve_seconds >= cycle_seconds:raise ValueError("reserve_seconds 必须小于 cycle_seconds")if max_threads <= 0:raise ValueError("max_threads 必须大于 0")self.task_list = task_listself.cycle_seconds = cycle_secondsself.reserve_seconds = reserve_secondsself.func = funcself.single_cycle = single_cycleself.max_threads = max_threadsself.lock = threading.Lock()self.running = Falsedef _execute_task(self, task: str, target_time: float):"""执行单个任务,确保在指定时间点执行"""current_time = time.time()sleep_time = max(0, target_time - current_time)time.sleep(sleep_time)try:self.func(task)except Exception as e:print(f"任务 {task} 执行出错: {e}")def _schedule_tasks(self):"""安排任务执行"""cycle_start_time = time.time()available_time = self.cycle_seconds - self.reserve_secondsinterval = available_time / len(self.task_list)current_time = cycle_start_timewith concurrent.futures.ThreadPoolExecutor(max_workers=self.max_threads) as executor:for task in self.task_list:target_time = current_time + intervalexecutor.submit(self._execute_task, task, target_time)current_time = target_timedef start(self):"""启动任务调度"""with self.lock:if self.running:print("任务调度器已经在运行")returnself.running = Truewhile self.running:self._schedule_tasks()if self.single_cycle:self.running = Falseelse:time.sleep(self.cycle_seconds)print("[系统时间] 所有任务执行完成")def stop(self):"""停止任务调度"""with self.lock:self.running = Falsedef runner_test(x):print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] 处理 {x} start")if x == 3:raise ValueError("333333333333333333333333")time.sleep(2)print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] 处理 {x} end")
if __name__ == "__main__":scheduler = TaskScheduler(task_list=[i for i in range(90)],cycle_seconds=10, reserve_seconds=1, func=runner_test,single_cycle=True,max_threads=20 )scheduler.start()
代码说明
- 时间控制:
- 任务均匀分布在
(周期时间 - 预留时间)
的时间窗内。 - 每个任务的间隔时间通过
available_time / len(self.task_list)
计算得出。 - 使用
time.sleep
确保任务在指定时间点执行,误差控制在毫秒级别。
- 执行控制:
- 支持自定义任务处理函数
func
,每个任务元素独立调用该函数。 - 使用多线程并发执行任务,确保任务的独立性。
- 模式选择:
- 支持单次执行模式和持续周期模式。
- 单次模式下,任务执行完成后自动停止。
- 异常处理:
- 在
_execute_task
方法中捕获异常,确保任务执行出错时不会影响其他任务。
- 线程安全:
- 使用
threading.Lock
确保多线程并发调用时的线程安全。
- 资源控制:
- 单实例最多同时管理 1000 个任务元素(可通过限制
task_list
的长度实现)。
- 执行保障:
- 确保最后一个任务完成时间不超过周期开始时间 +
(cycle_seconds - reserve_seconds)
。 - 跳过已过期的历史任务时间点。
- 误差容忍: