【最后203篇系列】035 数据表同步简单方案
有一个常见的任务:将数据从事务表搬到分析表。
整个过程比较长,所以是可能中断的,那就会比较麻烦:不知道在哪里断了,怎么衔接。
在034里,我试着用sqlite进行简单的任务持久化,每次根据持久化结果执行任务。原理没问题,逻辑没问题。但实际执行还是出了幺蛾子:我发现有些重服务的任务执行。一方面是terminal中断导致的问题,另一方面sqlite的读写怎么会有影响???照理说,每次处理完我都commit状态更新,最坏的情况应该也是差一个区块。
后来我看了下,涉及到的表操作很简单,数据规模也很小,完全没必要用sqlite。pandas+pickle是更好的方案。
我稍微梳理一下:
在建立了jupyter环境之后
1 源信息映射
将数据源信息,主要是表结构用SQLAlchemy来映射
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, func,and_ , BigInteger
from sqlalchemy.orm import sessionmaker,declarative_base
from datetime import datetime
from clickhouse_sqlalchemy import make_session, engines
import time
# 创建数据库引擎
mysql_engine = create_engine(mysql_url)
# 创建基类
Base1 = declarative_base()
# 定义数据模型
class THEMODEL(Base1):__tablename__ = 'xxx'...def dict(self):data_dict = {}for col in cols:data_dict[col] = getattr(self, col)return data_dict
# 创建表
Base1.metadata.create_all(mysql_engine)
# 创建会话
MysqlSession = sessionmaker(bind=mysql_engine)
2 创建任务信息
根据id的范围进行同步,将对应的任务表按dataframe的方式整合
from Basefuncs import *
# to run links
tuple_list = slice_list_by_batch1(min_id,max_id, 10000)
task_df = pd.DataFrame()
task_df['task_name'] = ['lot_%s' % i for i in range(len(tuple_list))]
task_df['content'] = tuple_list
task_df['status'] = 0
task_df['input_recs'] = 0
to_pickle(task_df, persistent_name)
3 创建目标表
使用clickhouse作为目标分析表,可以不用特别建立索引,额外加上一个lot_name字段,这样可以在结束时进行校验。
CREATE TABLE t_ds_company_basic_copy
(id Float64,
...lot_name String
)
ENGINE = MergeTree
ORDER BY id
SETTINGS index_granularity = 8192;
4 创建转换逻辑
主要是处理时间格式,另外就是额外加入lot_name字段。
转换逻辑
import datetimeMIN_DATE = datetime.datetime(1970, 1, 1)
MAX_DATE = datetime.datetime(2106, 2, 7)def fix_dates(record: dict, lot_name = 'x'):for k, v in record.items():if isinstance(v, datetime.date) and not isinstance(v, datetime.datetime):v = datetime.datetime.combine(v, datetime.time())if isinstance(v, datetime.datetime):if v < MIN_DATE:v = MIN_DATEelif v > MAX_DATE:v = MAX_DATErecord[k] = vrecord['lot_name']=lot_namereturn recordmclick = MClickHouse(**chcfg.model_dump())
mclick._exe_sql('show databases;')
获取逻辑,使用SQLAlchemy
# 查询 id 范围的数据
def query_postgres_by_id_range(the_session, some_tuple):start_id, end_id = some_tuplewith the_session() as session:results = session.query(THE_SQLALCHEMY).filter(and_(THE_SQLALCHEMY.id >= start_id, THE_SQLALCHEMY.id < end_id)).all()return [x.dict() for x in results]
核心逻辑:读取task_df, 选择一个没有执行的区块,执行后更新区块状态,保存task_df。
def process_one(target_table='xxx', persistent_name = persistent_name):task_df = from_pickle(persistent_name)sel = task_df['status'] == 0process_code = 1if sel.sum():row_dict = dict(task_df[sel].iloc[0])task_name = row_dict['task_name']print('>>>> ',task_name)try:some_tuple = row_dict['content']input_lod = query_postgres_by_id_range(MysqlSession, some_tuple)fixed_lod = [fix_dates(r, lot_name=task_name) for r in input_lod]task_df.loc[task_df['task_name'] == task_name, 'input_recs'] = len(input_lod)tem_df = pd.DataFrame(input_lod)_cols1 = list(tem_df.columns)table_name = target_tableinsert_cols = ','.join(_cols1)mclick.client.execute(f"insert into {table_name} ({insert_cols}) values",fixed_lod,types_check=True, # 可以确保数据类型正确settings={"insert_deduplicate": True} )task_df.loc[task_df['task_name'] == task_name, 'status'] = 2except:print('error', task_name)task_df.loc[task_df['task_name'] == task_name, 'status'] = 3finally:# 重新覆盖to_pickle(task_df, persistent_name)else:process_code = 0return process_code
5 批量执行
可以给到一个稍大的循环,然后读到终止信号时停止。
for i in range(30000):process_code = process_one()if not process_code:print('无待处理任务')break
这样就可以比较流程化的做这件事了,各部分还可以继续优化来简化工作。比如从源表进行映射,可以读取建表语句,然后自动创建ORM对象。