【最后203篇系列】034 使用SQLite构建简单的任务管理
表数据同步的断点续传
有时候需要将一个表的数据复制到另一个表,循环是常用的方式。当表比较大,执行的时间很长,会有很多因素引起失败。我希望可以比较简单的跑数,所以做一个简单的任务系统。
SQLitre是嵌入式数据库,这样脚本可以不必考虑太多依赖,又可以用到数据库的持久化功能。
1 数据库初始化
给到一个文件路径参数,确定持久化的文件
import sqlite3
import time
import randomdef init_db(DB_FILE = "tasks.db"):"""初始化数据库和表"""conn = sqlite3.connect(DB_FILE)conn.row_factory = sqlite3.Row cur = conn.cursor()cur.execute("""CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY AUTOINCREMENT,name TEXT NOT NULL,content TEXT, status INT DEFAULT 0 -- 0=未开始, 1=进行中, 2=完成, 3=失败)""")# 可选:给 status 建索引cur.execute("CREATE INDEX IF NOT EXISTS idx_status ON tasks(status)")cur.execute("CREATE INDEX IF NOT EXISTS idx_name ON tasks(name)")conn.commit()return conn, cur
实际中可以以脚本的file path作为参数(结尾改为.db)
# 建立数据库
conn, cur = init_db(DB_FILE = './the_script.db')
2 任务初始化
同步任务,可以按照id分为若干区间,每个区间任务额可以称为lot。
tuple_list = slice_list_by_batch1(min_id,max_id, 10000)
task_df = pd.DataFrame()
task_df['name'] = ['lot_%s' % i for i in range(len(tuple_list)) ]
task_df['content'] = tuple_list
task_df['content'] = task_df['content'].apply(lambda x: json.dumps(x))
task_lod = df2lod(task_df)
# 写入任务
cur.executemany("INSERT INTO tasks (name, content) VALUES (?,?)", [(t['name'],t['content']) for t in task_lod])
conn.commit()res = cur.execute('select count(*) from tasks').fetchone()
dict(res)
sqlite也可以提供类似 Row Dict的格式,需要
conn.row_factory = sqlite3.Row cur = conn.cursor()row = cur.execute(new_task_sql).fetchone()
3 处理一次
每次启动时,读出未处理的任务。任务里有tuple参数,根据tuple的起止区间获取数据后进行处理,如果成功就根据任务名将状态更新为2,异常则更新为3。这样每次重复执行这个就可以了,因为数据持久化在文件中,所以即使和服务器断开连接也没关系。
def process_one():new_task_sql = 'select name, content, status from tasks where status = 0 limit 1'# new_task = cur.execute(new_task_sql).fetchall()conn.row_factory = sqlite3.Row cur = conn.cursor()row = cur.execute(new_task_sql).fetchone()process_code = 1if row:row_dict = dict(row)tem_tuple = json.loads(row_dict['content'])print('>>>> ',row_dict['name'])try:some_tuple = tem_tuple---- DO LOGICcur.execute("UPDATE tasks SET status = 2 WHERE name = ?", (row_dict['name'],))conn.commit()except:cur.execute("UPDATE tasks SET status = 3 WHERE name = ?", (row_dict['name'],))conn.commit()else:process_code = 0return process_code
4 处理直到结束
由于 process_one
在没有获取到待处理任务行时会返回0,这个作为结束信号。所以可以给到一个略大的循环次数,当收到结束信号时停止。
for i in range(30000):process_code = process_one()if not process_code:print('无待处理任务')break