当前位置: 首页 > news >正文

【最后203篇系列】035 数据表同步简单方案

有一个常见的任务:将数据从事务表搬到分析表。

整个过程比较长,所以是可能中断的,那就会比较麻烦:不知道在哪里断了,怎么衔接。

在034里,我试着用sqlite进行简单的任务持久化,每次根据持久化结果执行任务。原理没问题,逻辑没问题。但实际执行还是出了幺蛾子:我发现有些重服务的任务执行。一方面是terminal中断导致的问题,另一方面sqlite的读写怎么会有影响???照理说,每次处理完我都commit状态更新,最坏的情况应该也是差一个区块。

后来我看了下,涉及到的表操作很简单,数据规模也很小,完全没必要用sqlite。pandas+pickle是更好的方案。

我稍微梳理一下:

在建立了jupyter环境之后

1 源信息映射

将数据源信息,主要是表结构用SQLAlchemy来映射

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, func,and_ , BigInteger
from sqlalchemy.orm import sessionmaker,declarative_base
from datetime import datetime
from clickhouse_sqlalchemy import make_session, engines
import time 
# 创建数据库引擎
mysql_engine = create_engine(mysql_url)
# 创建基类
Base1 = declarative_base()
# 定义数据模型
class THEMODEL(Base1):__tablename__ = 'xxx'...def dict(self):data_dict = {}for col in cols:data_dict[col] = getattr(self, col)return data_dict 
# 创建表
Base1.metadata.create_all(mysql_engine)
# 创建会话
MysqlSession = sessionmaker(bind=mysql_engine)

2 创建任务信息

根据id的范围进行同步,将对应的任务表按dataframe的方式整合

from Basefuncs import *
# to run links 
tuple_list = slice_list_by_batch1(min_id,max_id, 10000)
task_df = pd.DataFrame()
task_df['task_name'] = ['lot_%s' % i  for i in range(len(tuple_list))]
task_df['content'] = tuple_list
task_df['status'] = 0
task_df['input_recs'] = 0
to_pickle(task_df, persistent_name)

3 创建目标表

使用clickhouse作为目标分析表,可以不用特别建立索引,额外加上一个lot_name字段,这样可以在结束时进行校验。

CREATE TABLE t_ds_company_basic_copy
(id Float64,                      
...lot_name String
)
ENGINE = MergeTree
ORDER BY  id
SETTINGS index_granularity = 8192;

4 创建转换逻辑

主要是处理时间格式,另外就是额外加入lot_name字段。

转换逻辑

import datetimeMIN_DATE = datetime.datetime(1970, 1, 1)
MAX_DATE = datetime.datetime(2106, 2, 7)def fix_dates(record: dict, lot_name = 'x'):for k, v in record.items():if isinstance(v, datetime.date) and not isinstance(v, datetime.datetime):v = datetime.datetime.combine(v, datetime.time())if isinstance(v, datetime.datetime):if v < MIN_DATE:v = MIN_DATEelif v > MAX_DATE:v = MAX_DATErecord[k] = vrecord['lot_name']=lot_namereturn recordmclick = MClickHouse(**chcfg.model_dump())
mclick._exe_sql('show databases;')

获取逻辑,使用SQLAlchemy

# 查询 id 范围的数据
def query_postgres_by_id_range(the_session, some_tuple):start_id, end_id = some_tuplewith the_session() as session:results = session.query(THE_SQLALCHEMY).filter(and_(THE_SQLALCHEMY.id >= start_id, THE_SQLALCHEMY.id < end_id)).all()return [x.dict() for x in results]

核心逻辑:读取task_df, 选择一个没有执行的区块,执行后更新区块状态,保存task_df。

def process_one(target_table='xxx', persistent_name = persistent_name):task_df = from_pickle(persistent_name)sel =  task_df['status'] == 0process_code = 1if sel.sum():row_dict = dict(task_df[sel].iloc[0])task_name = row_dict['task_name']print('>>>> ',task_name)try:some_tuple = row_dict['content']input_lod = query_postgres_by_id_range(MysqlSession, some_tuple)fixed_lod = [fix_dates(r, lot_name=task_name) for r in input_lod]task_df.loc[task_df['task_name'] == task_name, 'input_recs'] = len(input_lod)tem_df = pd.DataFrame(input_lod)_cols1 = list(tem_df.columns)table_name = target_tableinsert_cols = ','.join(_cols1)mclick.client.execute(f"insert into  {table_name}  ({insert_cols}) values",fixed_lod,types_check=True,  # 可以确保数据类型正确settings={"insert_deduplicate": True} )task_df.loc[task_df['task_name'] == task_name, 'status'] = 2except:print('error', task_name)task_df.loc[task_df['task_name'] == task_name, 'status'] = 3finally:# 重新覆盖to_pickle(task_df, persistent_name)else:process_code = 0return process_code

5 批量执行

可以给到一个稍大的循环,然后读到终止信号时停止。

for i in range(30000):process_code = process_one()if not process_code:print('无待处理任务')break 

这样就可以比较流程化的做这件事了,各部分还可以继续优化来简化工作。比如从源表进行映射,可以读取建表语句,然后自动创建ORM对象。

http://www.dtcms.com/a/347680.html

相关文章:

  • 深入理解 React useEffect
  • 语义普遍性与形式化:构建深层语义理解的统一框架
  • 串与数组:从字符处理到多维存储的数据结构详解
  • 【python】min(key=...)用法
  • 【Kubernetes知识点】资源配额与访问控制
  • 小白向:Obsidian(Markdown语法学习)快速入门完全指南:从零开始构建你的第二大脑(免费好用的笔记软件的知识管理系统)、黑曜石笔记
  • Redis学习笔记 ---- 基于token实现登录功能
  • 多媒体内容生成 - 超越文本的生产力
  • 使用自制的NTC测量模块测试Plecs的热仿真效果
  • python如何下载库——0基础教程
  • 【使用Unsloth 微调】数据集的种类
  • Linux|数据库|2025最新数据库管理工具cloudbeaver-25.0.1的docker方式部署和二进制方式部署
  • leetcode刷题记录03——top100题里的6道简单+1道中等题
  • 单例模式介绍
  • 企业视频库管理高效策略
  • Java和数据库的关系
  • 如何利用 DeepSeek 提升工作效率
  • C++的struct里面可以放函数,讨论一下C++和C关于struct的使用区别
  • 基于TimeMixer现有脚本扩展的思路分析
  • 网络参考模型操作指南
  • 大数据接口 - 企业风险报告(专业版)API接口文档
  • 【Vue✨】Vue 中的 diff 算法详解
  • Compose笔记(四十七)--SnackbarHost
  • 14.Shell脚本修炼手册--玩转循环结构(While 与 Until 的应用技巧与案例)
  • 使用sys数据库分析 MySQL
  • 2015-2018年咸海流域1km归一化植被指数8天合成数据集
  • 【大模型应用开发 4.RAG高级技术与实践】
  • LeetCode算法日记 - Day 20: 两整数之和、只出现一次的数字II
  • 《P3623 [APIO2008] 免费道路》
  • Java22 stream 新特性 窗口算子 与 虚拟线程map操作:Gatherer 和 Gatherers工具类