当前位置: 首页 > news >正文

Python代码片段-断点任务

使用Python处理一堆长耗时任务的时候,为了防止异常退出程序或者手动退出程序后丢失任务进度,可用使用断点的方式记录任务进度,下次重载任务后,继续运行上次未完成的任务即可。

这里用json文件作为数据持久化的方式,免去了使用数据库来作为持久的依赖问题,为了一个小的任务来搭建一套数据库,耗时耗力,得不偿失,不如选择本地文件来做简单的持久化,就算丢失部分任务进度,少数任务重新执行所消耗的时间完全处于容忍范围内。另外使用json文件记录任务进度,可用修改json文件来人为干预任务的执行情况,也是不错的一个优点

import json
import time
import os


class Task:
    def __init__(self, task_file='task_progress.json'):
        self.task_file = task_file
        self.progress = []
        if os.path.exists(self.task_file):
            self.load_progress()

    def load_progress(self):
        """加载任务进度"""
        with open(self.task_file, 'r') as f:
            data = json.load(f)
            self.progress = data.get('progress', 0)

    def save_progress(self):
        """保存任务进度"""
        data = {'progress': self.progress}
        with open(self.task_file, 'w') as f:
            json.dump(data, f,indent=4, separators=(',', ': '))
            f.flush()
            os.fsync(f.fileno())  # 确保数据已写入磁盘

    def execute_task(self):

        legacy_tasks = [t  for t in self.progress if t['finish'] ==0 ]

        for i, v in enumerate(legacy_tasks):
            print(f"执行任务 {v['key']} ...")
            time.sleep(3)  # 模拟任务执行的时间
            self.mark_finished( v['key']) # 更行进度
            self.save_progress()  # 每次执行后保存进度
            print(f"任务 {v['key']} 完成。")

        print("任务完成!")

    def flush_progresses(self):
        tasks = [
            {"key":"xxx1"},
            {"key":"xxx2"},
            {"key":"xxx3"}
        ]

        self.progress = [ {"key" : t['key'], 'finish':0} for t in tasks]
        self.save_progress()

    def mark_finished(self, key):
        """标记完成"""
        print(key)
        for i, t in enumerate(self.progress) :
            if t ['key'] ==  key :
                self.progress[i]['finish'] = 1
                break
        self.save_progress()
    
    def count(self):
        """查询未完成"""
        return len( [t for t in self.progress if t['finish'] == 0])



if __name__ == "__main__":
    task = Task()
    # 第一次生成任务列表,下次任务时候注释掉此行
    task.flush_progresses()
    try:
        task.execute_task()
    except KeyboardInterrupt:
        print("\n任务被中断,进度已保存。")

相关文章:

  • Linux常见问题
  • 算法日记27:完全背包(DFS->记忆化搜索->倒叙DP->顺序DP->空间优化)
  • Unity Android SDK 升级、安装 build-tools、platform-tools
  • 【HeadFirst系列之HeadFirstJava】第5天之超强力方法 —— 从战舰游戏到循环控制
  • 【C语言】指针(6)
  • 通俗理解什么是云原生?
  • Spring Boot 3 集成 RabbitMQ 实践指南
  • 《操作系统 - 清华大学》 8 -6:进程管理:进程状态变化模型
  • 3、优先级翻转问题
  • Ubuntu中部署deepseek
  • 【漫话机器学习系列】101.特征选择法之Lasso(Lasso For Feature Selection)
  • 离子阱量子计算机的原理与应用:开辟量子计算的新天地
  • 代码随想录|62.不同路径,63.不同路径Ⅱ,343.整数拆分
  • 论文笔记(七十二)Reward Centering(四)
  • Linux系统移植之对NXP的Uboot修改后移植
  • 给SQL server数据库表字段添加注释SQL,附修改、删除注释SQL及演示
  • Comfyui Windows Desktop桌面版便携版安装教程
  • 深入了解 MySQL 中的 JSON_CONTAINS
  • com库原理使用
  • Python 环境管理介绍
  • 纪念苏联伟大卫国战争胜利80周年阅兵彩排,解放军仪仗队亮相
  • 现场|万米云端,遇见上博
  • 王耀庆化身“罗朱”说书人,一人挑战15个角色
  • 【社论】三个“靠谱”为市场注入确定性
  • 宁合两大都市圈交汇之城含山:要想身体好,常往含山跑
  • 从黄土高原到黄浦江畔,澄城樱桃品牌推介会明日在上海举办