当前位置: 首页 > news >正文

APScheduler定时

异步IO 定时(协程)

import asyncio
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ScheduleService:
    def __init__(self):
        self.fixed_times = ["08:00", "12:00", "18:00"]  # 固定调度时间列表
        self.scheduler = AsyncIOScheduler()

    async def fixed_task(self):
        logger.info("[定时任务] ✅ 执行固定时间任务逻辑")
        # TODO: 这里是你的异步业务逻辑
        await asyncio.sleep(1)  # 模拟异步操作
        logger.info("[定时任务] ✅ 执行完成")

    def add_jobs(self):
        for time_str in self.fixed_times:
            hour, minute = map(int, time_str.split(":"))
            self.scheduler.add_job(self._wrap_async(self.fixed_task), CronTrigger(hour=hour, minute=minute))
            logger.info(f"[调度器] 已添加定时任务: 每天 {hour:02d}:{minute:02d}")

    def _wrap_async(self, coro_func):
        """封装异步函数为 APScheduler 可调度的同步函数"""
        def wrapper():
            asyncio.create_task(coro_func())
        return wrapper

    def start(self):
        self.add_jobs()
        self.scheduler.start()

if __name__ == "__main__":
    service = ScheduleService()
    service.start()
    logger.info("[服务启动] Scheduler 已启动")
    asyncio.get_event_loop().run_forever()

🔍 背后原因:apscheduler 的 job 函数是同步调用的

虽然我们使用的是 AsyncIOScheduler(支持 asyncio 的调度器),但是它 内部的 add_job() 方法要求传入的是一个 普通函数(sync function),而不是 async def 的协程函数。

这个 wrapper() 是普通函数,apscheduler 就可以调度它,而它内部通过 asyncio.create_task() 启动了异步任务。

你的调度器Job函数要求你传的是
AsyncIOScheduler仍然是同步函数所以要把 async def 包成 def

🔧 APScheduler 本质机制:

“先注册任务 ➜ 后续内部通过调度器的机制定时触发这些任务。”

🔁 它内部怎么工作的?

  1. 注册任务:

• scheduler.add_job(…) 会把任务注册到调度器的任务队列中。

• 你提供的时间、频率(如 interval、cron、date)会被转成内部的调度计划。

  1. 内部调度机制:

• AsyncIOScheduler / BlockingScheduler 等调度器都有自己的 后台线程 / 协程循环

• 它会不断 检查当前时间是否满足某个任务的触发条件(也就是你设的 cron, interval 等触发器)。

  1. 一旦时间匹配:

• 就 执行注册的函数(或通过 asyncio.create_task() 启动异步任务);

• 支持并发调度多个任务。

🔁 它内部怎么工作的?

  1. 注册任务:

• scheduler.add_job(…) 会把任务注册到调度器的任务队列中。

• 你提供的时间、频率(如 interval、cron、date)会被转成内部的调度计划。

  1. 内部调度机制:

• AsyncIOScheduler / BlockingScheduler 等调度器都有自己的 后台线程 / 协程循环

• 它会不断 检查当前时间是否满足某个任务的触发条件(也就是你设的 cron, interval 等触发器)。

  1. 一旦时间匹配:

• 就 执行注册的函数(或通过 asyncio.create_task() 启动异步任务);

• 支持并发调度多个任务。

🧠 类比你可以理解为:

就像一个高级版的 while True + sleep(1) 的守护线程,维护着一堆计划任务,每秒看一下:

“诶?有任务要执行了吗?有的话,马上执行它。”

🚦 为什么你看不到它的轮询代码?

•	因为 APScheduler 的调度循环封装在它自己的调度器类里了,比如:
•	AsyncIOScheduler 会将轮询 loop 挂到你自己的 asyncio 事件循环上;
•	BlockingScheduler 会单独跑个线程自己轮询

阶段描述
任务注册通过 .add_job() 把任务及触发条件存入调度器
内部机制调度器内部 loop 每秒轮询判断:是否到时间
到点执行匹配到执行时间后立即触发对应任务(同步或异步)

常用的ASP 异步写法

✅ 1. 每天 10:30 执行任务

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import asyncio

async def job():
    print("每天 10:30 执行任务")

scheduler = AsyncIOScheduler()
scheduler.add_job(lambda: asyncio.create_task(job()), CronTrigger(hour=10, minute=30))
scheduler.start()

✅ 2. 每周一 9:00 执行任务

async def weekly_job():
    print("每周一上午 9:00 执行任务")

scheduler.add_job(lambda: asyncio.create_task(weekly_job()), CronTrigger(day_of_week='mon', hour=9, minute=0))

✅ 3. 每小时的第 5 分钟执行

async def hourly_job():
    print("每小时的第 5 分钟执行")

scheduler.add_job(lambda: asyncio.create_task(hourly_job()), CronTrigger(minute=5))

✅ 4. 每隔 5 分钟执行一次(interval 模式)

from apscheduler.triggers.interval import IntervalTrigger

async def interval_job():
    print("每 5 分钟执行一次")

scheduler.add_job(lambda: asyncio.create_task(interval_job()), IntervalTrigger(minutes=5))

(使用 lambda 表达式包装了一个同步函数)

• asyncio.create_task(…) 是一个 同步函数,用来在当前事件循环中安排一个 协程异步运行

• lambda 是快速生成这个同步函数的方式。

✅ 补充:APScheduler 的 CronTrigger 类似 crontab 语法

参数说明示例
second0-59
minute分钟0-59
hour小时0-23
day1-31
month1-12
day_of_week星期几(0-6 或 mon-sun)‘mon’, 0
year2025

相关文章:

  • root账号修改密码
  • 【WRF工具】GIS4WRF详细介绍:配置 WPS/WRF
  • Metabase:一个免费开源的BI平台
  • Vue Transition组件类名+TailwindCSS
  • 程序化广告行业(50/89):Cookie映射技术深度剖析
  • 中级:MyBatis面试题深度剖析
  • Qt - findChild
  • LVGL Chart和Spinner详解
  • 决策树(DecisionTree)
  • My first day in QT programming
  • Ansys Zemax | 如何使用物理光学传播(POP)工具描述空间电场传播(二)
  • Java实现websocket
  • sourceinsight 4.0 任意配置主题颜色风格的方法
  • 用spring-webmvc包实现AI(Deepseek)事件流(SSE)推送
  • esp32 idf中的外部组件
  • OpenAI最近放出大新闻,准备在接下来的几个月内推出一款“开放”的语言模型
  • 基于HUTOOL实现RSA工具类
  • Vue3+Vite+TypeScript+Element Plus开发-02.Element Plus安装与配置
  • deepseek使用记录26——思维混乱背后的理论泡沫与骗局
  • LeetCode 热题 100_打家劫舍(83_198_中等_C++)(动态规划)
  • 调查:“网约摩的”上线起步价五六元,合规性及安全性引质疑
  • 中国旅游日|上天当个“显眼包”!低空经济的“飞”凡魅力
  • 中科院合肥物质院迎来新一届领导班子:刘建国继续担任院长
  • 福建、广西等地有大暴雨,国家防总启动防汛四级应急响应
  • 纽约市长称墨西哥海军帆船撞桥事故已致2人死亡
  • 大学2025丨北大教授陈平原:当卷不过AI时,何处是归途