任务均衡 ,即将100条任务的执行时间,均匀的分布在1小时内,可以降低运行负载的峰值,也可以用于控制多线程任务
import threading
import time
from datetime import datetime
from typing import Callable, List
import concurrent. futuresclass TaskScheduler : def __init__ ( self, task_list: List[ str ] , cycle_seconds: int , reserve_seconds: int , func: Callable, single_cycle: bool = False , max_threads: int = 10 ) : """构造说明:- 必须满足 reserve_seconds < cycle_seconds- func 应实现异常处理逻辑- 单次模式时自动在任务完成后停止- max_threads 限制同时运行的线程数""" if reserve_seconds >= cycle_seconds: raise ValueError( "reserve_seconds 必须小于 cycle_seconds" ) if max_threads <= 0 : raise ValueError( "max_threads 必须大于 0" ) self. task_list = task_listself. cycle_seconds = cycle_secondsself. reserve_seconds = reserve_secondsself. func = funcself. single_cycle = single_cycleself. max_threads = max_threadsself. lock = threading. Lock( ) self. running = False def _execute_task ( self, task: str , target_time: float ) : """执行单个任务,确保在指定时间点执行""" current_time = time. time( ) sleep_time = max ( 0 , target_time - current_time) time. sleep( sleep_time) try : self. func( task) except Exception as e: print ( f"任务 { task} 执行出错: { e} " ) def _schedule_tasks ( self) : """安排任务执行""" cycle_start_time = time. time( ) available_time = self. cycle_seconds - self. reserve_secondsinterval = available_time / len ( self. task_list) current_time = cycle_start_timewith concurrent. futures. ThreadPoolExecutor( max_workers= self. max_threads) as executor: for task in self. task_list: target_time = current_time + intervalexecutor. submit( self. _execute_task, task, target_time) current_time = target_timedef start ( self) : """启动任务调度""" with self. lock: if self. running: print ( "任务调度器已经在运行" ) return self. running = True while self. running: self. _schedule_tasks( ) if self. single_cycle: self. running = False else : time. sleep( self. cycle_seconds) print ( "[系统时间] 所有任务执行完成" ) def stop ( self) : """停止任务调度""" with self. lock: self. running = False def runner_test ( x) : print ( f"[ { datetime. now( ) . strftime( '%H:%M:%S.%f' ) } ] 处理 { x} start" ) if x == 3 : raise ValueError( "333333333333333333333333" ) time. sleep( 2 ) print ( f"[ { datetime. now( ) . strftime( '%H:%M:%S.%f' ) } ] 处理 { x} end" )
if __name__ == "__main__" : scheduler = TaskScheduler( task_list= [ i for i in range ( 90 ) ] , cycle_seconds= 10 , reserve_seconds= 1 , func= runner_test, single_cycle= True , max_threads= 20 ) scheduler. start( )
代码说明
时间控制 : 任务均匀分布在 (周期时间 - 预留时间)
的时间窗内。 每个任务的间隔时间通过 available_time / len(self.task_list)
计算得出。 使用 time.sleep
确保任务在指定时间点执行,误差控制在毫秒级别。 执行控制 : 支持自定义任务处理函数 func
,每个任务元素独立调用该函数。 使用多线程并发执行任务,确保任务的独立性。 模式选择 : 支持单次执行模式和持续周期模式。 单次模式下,任务执行完成后自动停止。 异常处理 : 在 _execute_task
方法中捕获异常,确保任务执行出错时不会影响其他任务。 线程安全 : 使用 threading.Lock
确保多线程并发调用时的线程安全。 资源控制 : 单实例最多同时管理 1000 个任务元素(可通过限制 task_list
的长度实现)。 执行保障 : 确保最后一个任务完成时间不超过周期开始时间 + (cycle_seconds - reserve_seconds)
。 跳过已过期的历史任务时间点。 误差容忍 :