当前位置: 首页 > news >正文

【最后203篇系列】034 使用SQLite构建简单的任务管理

表数据同步的断点续传

有时候需要将一个表的数据复制到另一个表,循环是常用的方式。当表比较大,执行的时间很长,会有很多因素引起失败。我希望可以比较简单的跑数,所以做一个简单的任务系统。

SQLitre是嵌入式数据库,这样脚本可以不必考虑太多依赖,又可以用到数据库的持久化功能。

1 数据库初始化

给到一个文件路径参数,确定持久化的文件

import sqlite3
import time
import randomdef init_db(DB_FILE = "tasks.db"):"""初始化数据库和表"""conn = sqlite3.connect(DB_FILE)conn.row_factory = sqlite3.Row cur = conn.cursor()cur.execute("""CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY AUTOINCREMENT,name TEXT NOT NULL,content TEXT, status INT DEFAULT 0  -- 0=未开始, 1=进行中, 2=完成, 3=失败)""")# 可选:给 status 建索引cur.execute("CREATE INDEX IF NOT EXISTS idx_status ON tasks(status)")cur.execute("CREATE INDEX IF NOT EXISTS idx_name ON tasks(name)")conn.commit()return conn, cur

实际中可以以脚本的file path作为参数(结尾改为.db)

# 建立数据库
conn, cur = init_db(DB_FILE = './the_script.db')

2 任务初始化

同步任务,可以按照id分为若干区间,每个区间任务额可以称为lot。

tuple_list = slice_list_by_batch1(min_id,max_id, 10000)
task_df = pd.DataFrame()
task_df['name'] = ['lot_%s' % i for i in range(len(tuple_list)) ]  
task_df['content'] = tuple_list
task_df['content'] = task_df['content'].apply(lambda x: json.dumps(x))
task_lod = df2lod(task_df)
# 写入任务
cur.executemany("INSERT INTO tasks (name, content) VALUES (?,?)", [(t['name'],t['content']) for t in task_lod])
conn.commit()res = cur.execute('select count(*) from tasks').fetchone()
dict(res)

sqlite也可以提供类似 Row Dict的格式,需要

    conn.row_factory = sqlite3.Row cur = conn.cursor()row = cur.execute(new_task_sql).fetchone()

3 处理一次

每次启动时,读出未处理的任务。任务里有tuple参数,根据tuple的起止区间获取数据后进行处理,如果成功就根据任务名将状态更新为2,异常则更新为3。这样每次重复执行这个就可以了,因为数据持久化在文件中,所以即使和服务器断开连接也没关系。

def process_one():new_task_sql  = 'select name, content, status from tasks where status = 0 limit 1'# new_task = cur.execute(new_task_sql).fetchall()conn.row_factory = sqlite3.Row cur = conn.cursor()row = cur.execute(new_task_sql).fetchone()process_code = 1if row:row_dict = dict(row)tem_tuple = json.loads(row_dict['content'])print('>>>> ',row_dict['name'])try:some_tuple = tem_tuple---- DO LOGICcur.execute("UPDATE tasks SET status = 2 WHERE name = ?", (row_dict['name'],))conn.commit()except:cur.execute("UPDATE tasks SET status = 3 WHERE name = ?", (row_dict['name'],))conn.commit()else:process_code = 0return process_code

4 处理直到结束

由于 process_one在没有获取到待处理任务行时会返回0,这个作为结束信号。所以可以给到一个略大的循环次数,当收到结束信号时停止。

for i in range(30000):process_code = process_one()if not process_code:print('无待处理任务')break 
http://www.dtcms.com/a/340607.html

相关文章:

  • Qt5.9.9 + Windows API 开发系统监控工具 - 教学级项目实战
  • Obsidian 1.9.10升级
  • 19.web api 10
  • SQL-leetcode— 2356. 每位教师所教授的科目种类的数量
  • 有关SWD 仿真和PA.15, PB3, PB4的冲突问题
  • 深入Linux内核:架构设计与核心功能解析
  • CSS3DRenderer+ CSS3DObject实现在 Three.js 中添加文本内容
  • 算法230. 二叉搜索树中第 K 小的元素
  • 10M25DCF484C8G Altera FPGA MAX10
  • 云原生俱乐部-RH294知识点归纳(1)
  • RK-Android11-PackageInstaller安装器自动安装功能实现
  • iOS App 混淆工具实战 医疗健康类 App 的安全与合规保护
  • 电脑驱动免费更新? 这款驱动管理工具:一键扫更新,还能备份恢复,小白也会用~
  • 【知识杂记】方差、标准差、均方误差、均方根误差与平均绝对误差,概念、计算公式、物理意义
  • 微型导轨的快速调平技术如何提升激光加工效率?
  • Python默认参数
  • CPTS--Administrator
  • 【clion】调试脚本并cmake构建Fargo项目win32版本
  • Spring Boot 配置
  • C++---向下取整(>>)与向零取整(/)
  • Vue2封装Axios
  • PyTorch - Developer Notes
  • 《录井工程与管理》——第二章井位勘测技术
  • 精品方案 | GCKontrol与OMNeT++联合仿真在机载网络性能分析中的应用
  • 基于单片机环境火灾安全检测
  • 驾驭复杂表单:用 RxJava 实现响应式表单处理
  • mysql-8.0.37-linux-glibc2.12-x86_64安装
  • 数据结构与算法系列(大白话模式)小学生起点(一)
  • 【Kafka】常见简单八股总结
  • 【39】OpenCV C++实战篇——直线拟合、直线测距、平行线段测距;(边缘检测,剔除噪点,轮廓检测,渐进概率霍夫直线)