Python基础(⑧APScheduler任务调度框架)
APScheduler 是一个功能强大的定时任务调度框架,主要用来自动在指定时间或按照一定规律重复执行代码。它的核心价值是帮你 “自动化” 那些需要定时执行的操作,无需人工干预。
具体能做什么?举几个常见场景:
1. 周期性任务
定时备份数据:比如每天凌晨 2 点自动备份数据库
定时清理文件:比如每小时删除临时文件夹中的过期文件
定时同步数据:比如每 10 分钟从远程接口同步一次数据
2. 一次性定时任务
延迟执行操作:比如用户下单后,30 分钟内未支付就自动取消订单
未来某时执行:比如预约明天早上 8 点发送提醒消息
3. 复杂时间规则任务
按工作日执行:比如每周一至周五下午 5 点生成日报
按月执行:比如每月 1 号生成上月财务报表
特定日期执行:比如每年 12 月 31 日执行年度统计
4. 与其他框架结合
Web 应用中:在 Django/Flask 里,用它定时更新缓存、发送邮件通知
爬虫程序中:定时爬取目标网站的最新数据
监控系统中:每隔一段时间检查服务器状态,异常时报警
APScheduler 有 4 个核心组件:
触发器(Trigger):决定任务何时执行(日期、间隔、 cron 表达式)
任务(Job):要执行的函数或方法
执行器(Executor):负责执行任务(如线程池、进程池)
调度器(Scheduler):协调上述组件,是核心控制器
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime# 定义要执行的任务
def job1():print(f"任务1执行:{datetime.datetime.now()}")def job2(name):print(f"任务2({name})执行:{datetime.datetime.now()}")# 创建调度器(BlockingScheduler 会阻塞当前线程,适合脚本运行)
scheduler = BlockingScheduler()# 1. 间隔触发器(interval):每隔一段时间执行
# 每隔 5 秒执行 job1
scheduler.add_job(job1,'interval',seconds=5,id='job1' # 给任务设置唯一ID,方便后续操作
)# 2. 日期触发器(date):在特定时间点执行一次
# 在 3 秒后执行 job2(传递参数)
run_time = datetime.datetime.now() + datetime.timedelta(seconds=3)
scheduler.add_job(job2,'date',run_date=run_time,args=['一次性任务'], # 传递给 job2 的参数id='job2'
)# 3. Cron 触发器(cron):类似 Linux 的 cron 表达式,适合复杂定时
# 每分钟的第 0 秒执行 job1(即每分钟执行一次)
scheduler.add_job(job1,'cron',minute='*', # 每分钟second=0, # 第 0 秒id='job3'
)print("调度器开始运行...")
print("当前时间:", datetime.datetime.now())
print("按 Ctrl+C 停止")# 启动调度器(程序会阻塞在这里)
try:scheduler.start()
except (KeyboardInterrupt, SystemExit):pass
任务操作
# 暂停任务
scheduler.pause_job('job1')# 恢复任务
scheduler.resume_job('job1')# 修改任务(如修改间隔为 10 秒)
scheduler.modify_job('job1', seconds=10)# 删除任务
scheduler.remove_job('job1')
123