当前位置: 首页 > wzjs >正文

青岛做网站公司电话WordPress的电影播放器代码

青岛做网站公司电话,WordPress的电影播放器代码,平面设计免费网站,莆田做网站排名介绍 🧩 什么是“定时任务”? 定时任务,就是按照设定的时间间隔或时间点自动执行某些操作。比如: • 每天早上8点发通知 • 每隔10秒采集一次数据 • 每小时清理一次缓存相关使用 ✅ 最简单的方式:while True tim…

介绍

🧩 什么是“定时任务”?

定时任务,就是按照设定的时间间隔或时间点自动执行某些操作。比如:

•	每天早上8点发通知
•	每隔10秒采集一次数据
•	每小时清理一次缓存

相关使用

✅ 最简单的方式:while True + time.sleep()

import timedef job():print("执行任务")while True:job()time.sleep(10)  # 每10秒执行一次

✅ 优点:

• 写法简单,不需要任何依赖

• 控制力强

❌ 缺点:

会阻塞当前线程

• 精度差(任务执行时间会影响间隔)

• 没有“精确到几点几分”的调度能力

✅ 进阶方案:使用 schedule 库

pip install schedule
import schedule
import timedef job():print("每隔5秒执行一次")schedule.every(5).seconds.do(job)while True:schedule.run_pending()time.sleep(1)>>>>>>>>>
schedule.every().day.at("10:30").do(job) 每天10:30
schedule.every().monday.at("09:00").do(job) 每周一上午9

• schedule.run_pending() 会检查:是否有任务应该执行

• 如果是,就执行对应的 job()

• 然后 time.sleep(1) 让主线程休息 1 秒,再循环检查

所以:任务是每5秒一次,不是“休息6秒”。sleep(1) 是用于“轮询检测任务是否该执行”,并不会影响任务周期本身。

✅ 优点:

• 语法优雅、简单

• 支持 every().minutes, every().day.at(“10:00”) 这种写法

• 适合小型任务管理

✅ 更强大的方案:使用 APScheduler

from apscheduler.schedulers.blocking import BlockingSchedulerdef job():print("每5秒执行一次")scheduler = BlockingScheduler() # 创建一个阻塞型调度器实例 
scheduler.add_job(job, 'interval', seconds=5) # 每隔5秒调度一次 job 函数
scheduler.start()  # 启动调度器(这个会阻塞主线程)

内部原理确实 本质上就是一个“定时任务调度器 + 任务轮询器”

APScheduler 提供了多种调度器,比如:

• BlockingScheduler: 启动后会阻塞主线程,适合简单脚本

• BackgroundScheduler: 在后台启动,不阻塞主线程

• AsyncIOScheduler, TornadoScheduler, TwistedScheduler: 分别适配不同异步框架

类型含义
interval固定时间间隔
cron类似 crontab 表达式,支持精确到秒
date只运行一次,指定时间点

调用 start() 后,它会启动一个内部的 循环调度线程(或事件循环):

• 持续维护一个 “任务执行计划表”(内部是优先队列)

• 每一轮 tick(可能是 0.5~1 秒级别),检查是否有任务到了执行时间

• 有就执行(用线程池或进程池执行)

特性说明
支持多种调度类型interval / cron / date
有任务注册中心管理所有待运行的任务
有时间轮询机制类似事件循环,周期性检查是否“触发”
可以并发执行默认使用线程池
可持久化任务支持 SQLite、Redis 等存储后恢复

基于asyncio + 自定义时间

优点:

• 自由度高:可以灵活计算下一次执行时间。

• 无需额外依赖,原生 asyncio 就能跑。

• 跟 Redis 结合很好,可以做跨进程/跨机器任务协调。

缺点:

• 手动管理定时逻辑(get_next_run_time + asyncio.sleep())。

• 多任务可能不好管理,比如暂停/重启某个 job。

• 不支持 cron 表达式等复杂调度。

import asyncio
import logging
import os
from datetime import datetime, timedeltaimport redis.asyncio as redislogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)class ScheduleService:def __init__(self):self.redis_subscribe_key = "demo:subscription"self.exist_subscribe_key = "demo:exist_push"self.push_interval_seconds = 600  # 推送间隔时间self.fixed_times = ["08:00", "12:00", "18:00"]  # 固定调度时间列表def get_next_run_time(self):now_time = datetime.now()today_str = now_time.strftime("%Y-%m-%d")# 今日的所有调度时间点run_times = [datetime.strptime(f"{today_str} {t}", "%Y-%m-%d %H:%M") for t in self.fixed_times]future_times = [t for t in run_times if t > now_time]if future_times:return min(future_times)else:# 如果今天已经过了所有调度时间,则返回明天的第一个时间点next_day = (now_time + timedelta(days=1)).strftime('%Y-%m-%d')return datetime.strptime(f"{next_day} {self.fixed_times[0]}", "%Y-%m-%d %H:%M")async def redis_client(self):return await redis.from_url("redis://localhost:6379", decode_responses=True)async def fixed_time_task(self):while True:next_run_time = self.get_next_run_time()sleep_seconds = max(1, (next_run_time - datetime.now()).total_seconds())logger.info(f"[定时任务] 下次执行时间: {next_run_time}, sleep {sleep_seconds:.0f} 秒")await asyncio.sleep(sleep_seconds)logger.info("[定时任务] 执行具体逻辑...✅")# TODO: 添加你自己的定时任务逻辑async def check_redis_and_push(self):while True:try:redis = await self.redis_client()now_score = int(datetime.now().strftime("%H%M"))trigger_score = now_score + 4subscriptions = await redis.zrangebyscore(self.redis_subscribe_key, now_score, trigger_score, withscores=True)if subscriptions:logger.info(f"[推送检查] 检测到 {len(subscriptions)} 条订阅")for sub_key, _ in subscriptions:user_id, tag = self.parse_key(sub_key)push_key = f"{self.exist_subscribe_key}:{user_id}|{tag}"if not await redis.exists(push_key):logger.info(f"[推送中] 推送消息给用户 {user_id},标签:{tag}")# TODO: 实际的推送逻辑await redis.setex(push_key, self.push_interval_seconds, "1")await redis.aclose()except Exception as e:logger.error(f"[推送异常] {str(e)}")await asyncio.sleep(60)def parse_key(self, key):"""解析订阅键 user:xxx|tag:xxx"""try:parts = key.split("|")user_id = parts[0].split(":")[1]tag = parts[1].split(":")[1]return user_id, tagexcept Exception as e:logger.error(f"解析 key 失败: {key} -> {e}")return "", ""if __name__ == "__main__":async def main():service = ScheduleService()await asyncio.gather(service.fixed_time_task(),service.check_redis_and_push(),)asyncio.run(main())
方案是否异步优点缺点推荐场景
上述自定义写法灵活,Redis 任务配合好需手动维护时间逻辑任务量少 + redis调度系统
schedule简单、易用不支持异步、不适合服务部署脚本类、一次性任务
APScheduler支持异步 + cron/interval,易维护初学者需要学习下语法推荐服务常驻型场景

支持异步”,就是指在任务调度器中,能直接运行这种:

async def job():# 这里可以有 await,例如操作数据库、访问 Redis、发 HTTP 请求等await some_async_operation()print("异步任务完成")

而 APScheduler 的 AsyncIOScheduler 会让你 不需要 while True,你只要注册一次 job 函数,它会自动在对应时间点调度、支持并发、支持异步、支持 cron 等复杂逻辑,例如:

from apscheduler.schedulers.asyncio import AsyncIOSchedulerasync def job():await do_something_async()scheduler = AsyncIOScheduler()
scheduler.add_job(job, 'interval', seconds=10)
scheduler.start()

这个 wrapper() 是普通函数,apscheduler 就可以调度它,而它内部通过 asyncio.create_task() 启动了异步任务。


文章转载自:

http://Tlro4utw.yxnfd.cn
http://0qX2Pr6b.yxnfd.cn
http://g9rxtpKC.yxnfd.cn
http://Vfujafdm.yxnfd.cn
http://tORolBDL.yxnfd.cn
http://Fm2LiQjB.yxnfd.cn
http://eouDE9nT.yxnfd.cn
http://HQFQm4Uc.yxnfd.cn
http://MCKesFcG.yxnfd.cn
http://l6Um32hk.yxnfd.cn
http://Rg2Qn8dB.yxnfd.cn
http://gY47QpnT.yxnfd.cn
http://I4rrmcFm.yxnfd.cn
http://DNRdLcjl.yxnfd.cn
http://vX4PnhiD.yxnfd.cn
http://KKXdWurp.yxnfd.cn
http://8nHPkKWg.yxnfd.cn
http://2v1oq1yA.yxnfd.cn
http://dG8NmbaA.yxnfd.cn
http://WrL6MJrH.yxnfd.cn
http://15Z1ngea.yxnfd.cn
http://gNLAUZDU.yxnfd.cn
http://CoRoY0Ez.yxnfd.cn
http://psyBnxzb.yxnfd.cn
http://F1xZYwdq.yxnfd.cn
http://yN0IQtUO.yxnfd.cn
http://KVPbEov3.yxnfd.cn
http://E684Ib9W.yxnfd.cn
http://zBpa1Kqe.yxnfd.cn
http://dlMRuEVe.yxnfd.cn
http://www.dtcms.com/wzjs/739466.html

相关文章:

  • 手机版网站怎么做的网站建设误区图
  • 来个网站好人有好报单一产品销售网站建设模板
  • 中山古镇做网站苏州公众号开发公司
  • 网页制作与网站建设实战大全 豆瓣网站开发汇报ppt模板
  • 做网站不推广平台设计图片
  • 如何做网站卖产品广州17做网站
  • 程序开发 网站开发成都旅游住哪里
  • 南海网站建设公司网站建设云服务器
  • 阿里云建站教程视频域名查询ip网站
  • 南京定制网站建设怎么收费wordpress 图片域名
  • 中国常用网站十大网站建设排名
  • 搭建网站平台举三个成功的新媒体营销案例
  • 网页设计模板图片什么软件好用快速排名优化
  • 深圳网站设计专家乐云seo品牌北京社保网上服务平台
  • 网站开发的目的意义特色创新wordpress完整搬家
  • 法制教育网站谷歌浏览器官网手机版
  • 打开网页出现网站建设中场所码小程序怎么做
  • 制作个人网站主页网站开发公司怎么接单
  • 域名买完后如何做网站mil后缀网站
  • 中国企业公司网站建设跨境购网站建设
  • 常州网站制作计划wordpress主机配置
  • 郑州企业网站推广外包高德地图怎么没有菲律宾位置
  • 如何用dw制作网页框架重庆seo综合优化
  • 网站建设开发报价方案模板网站开发的著作权和版权
  • 邹平建设局官方网站微网站开发视频
  • 企业解决方案提供商清理优化大师
  • 长尾关键词有哪些东莞公司网站做优化
  • 石家庄医院网站建设爬虫 做资讯网站
  • 用数据库做学校网站论文北京文化传媒有限公司网站建设
  • 做网站需不需要云数据库app页面模板简单制作