当前位置: 首页 > news >正文

自动识别手机和电脑版本网站阿里网站建设视频教程

自动识别手机和电脑版本网站,阿里网站建设视频教程,常熟专业做网站,wordpress 模板 源码有一个常见的任务:将数据从事务表搬到分析表。 整个过程比较长,所以是可能中断的,那就会比较麻烦:不知道在哪里断了,怎么衔接。 在034里,我试着用sqlite进行简单的任务持久化,每次根据持久化结果…

有一个常见的任务:将数据从事务表搬到分析表。

整个过程比较长,所以是可能中断的,那就会比较麻烦:不知道在哪里断了,怎么衔接。

在034里,我试着用sqlite进行简单的任务持久化,每次根据持久化结果执行任务。原理没问题,逻辑没问题。但实际执行还是出了幺蛾子:我发现有些重服务的任务执行。一方面是terminal中断导致的问题,另一方面sqlite的读写怎么会有影响???照理说,每次处理完我都commit状态更新,最坏的情况应该也是差一个区块。

后来我看了下,涉及到的表操作很简单,数据规模也很小,完全没必要用sqlite。pandas+pickle是更好的方案。

我稍微梳理一下:

在建立了jupyter环境之后

1 源信息映射

将数据源信息,主要是表结构用SQLAlchemy来映射

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, func,and_ , BigInteger
from sqlalchemy.orm import sessionmaker,declarative_base
from datetime import datetime
from clickhouse_sqlalchemy import make_session, engines
import time 
# 创建数据库引擎
mysql_engine = create_engine(mysql_url)
# 创建基类
Base1 = declarative_base()
# 定义数据模型
class THEMODEL(Base1):__tablename__ = 'xxx'...def dict(self):data_dict = {}for col in cols:data_dict[col] = getattr(self, col)return data_dict 
# 创建表
Base1.metadata.create_all(mysql_engine)
# 创建会话
MysqlSession = sessionmaker(bind=mysql_engine)

2 创建任务信息

根据id的范围进行同步,将对应的任务表按dataframe的方式整合

from Basefuncs import *
# to run links 
tuple_list = slice_list_by_batch1(min_id,max_id, 10000)
task_df = pd.DataFrame()
task_df['task_name'] = ['lot_%s' % i  for i in range(len(tuple_list))]
task_df['content'] = tuple_list
task_df['status'] = 0
task_df['input_recs'] = 0
to_pickle(task_df, persistent_name)

3 创建目标表

使用clickhouse作为目标分析表,可以不用特别建立索引,额外加上一个lot_name字段,这样可以在结束时进行校验。

CREATE TABLE t_ds_company_basic_copy
(id Float64,                      
...lot_name String
)
ENGINE = MergeTree
ORDER BY  id
SETTINGS index_granularity = 8192;

4 创建转换逻辑

主要是处理时间格式,另外就是额外加入lot_name字段。

转换逻辑

import datetimeMIN_DATE = datetime.datetime(1970, 1, 1)
MAX_DATE = datetime.datetime(2106, 2, 7)def fix_dates(record: dict, lot_name = 'x'):for k, v in record.items():if isinstance(v, datetime.date) and not isinstance(v, datetime.datetime):v = datetime.datetime.combine(v, datetime.time())if isinstance(v, datetime.datetime):if v < MIN_DATE:v = MIN_DATEelif v > MAX_DATE:v = MAX_DATErecord[k] = vrecord['lot_name']=lot_namereturn recordmclick = MClickHouse(**chcfg.model_dump())
mclick._exe_sql('show databases;')

获取逻辑,使用SQLAlchemy

# 查询 id 范围的数据
def query_postgres_by_id_range(the_session, some_tuple):start_id, end_id = some_tuplewith the_session() as session:results = session.query(THE_SQLALCHEMY).filter(and_(THE_SQLALCHEMY.id >= start_id, THE_SQLALCHEMY.id < end_id)).all()return [x.dict() for x in results]

核心逻辑:读取task_df, 选择一个没有执行的区块,执行后更新区块状态,保存task_df。

def process_one(target_table='xxx', persistent_name = persistent_name):task_df = from_pickle(persistent_name)sel =  task_df['status'] == 0process_code = 1if sel.sum():row_dict = dict(task_df[sel].iloc[0])task_name = row_dict['task_name']print('>>>> ',task_name)try:some_tuple = row_dict['content']input_lod = query_postgres_by_id_range(MysqlSession, some_tuple)fixed_lod = [fix_dates(r, lot_name=task_name) for r in input_lod]task_df.loc[task_df['task_name'] == task_name, 'input_recs'] = len(input_lod)tem_df = pd.DataFrame(input_lod)_cols1 = list(tem_df.columns)table_name = target_tableinsert_cols = ','.join(_cols1)mclick.client.execute(f"insert into  {table_name}  ({insert_cols}) values",fixed_lod,types_check=True,  # 可以确保数据类型正确settings={"insert_deduplicate": True} )task_df.loc[task_df['task_name'] == task_name, 'status'] = 2except:print('error', task_name)task_df.loc[task_df['task_name'] == task_name, 'status'] = 3finally:# 重新覆盖to_pickle(task_df, persistent_name)else:process_code = 0return process_code

5 批量执行

可以给到一个稍大的循环,然后读到终止信号时停止。

for i in range(30000):process_code = process_one()if not process_code:print('无待处理任务')break 

这样就可以比较流程化的做这件事了,各部分还可以继续优化来简化工作。比如从源表进行映射,可以读取建表语句,然后自动创建ORM对象。

http://www.dtcms.com/a/513377.html

相关文章:

  • 银行网站建设中优化大师下载电脑版
  • 网站建设的通知网站维护分工关键词爱站网关键词挖掘工具
  • 盘锦市住房和城乡建设厅网站济南的企业网站建设
  • Aspose.Total for .NET Crack
  • 企业建设银行网站登录不了网络推广方案有哪些
  • 苏州网站建设免费网站制作平台
  • 做外贸的网站有何用处阜阳企业网站推广
  • Orleans 与 Kubernetes 结合的价值分析
  • 长春免费网站制作做网站用什么cms 知乎
  • 国外网站域名怎么做网站8uftp
  • 自媒体可做外链网站广州做网站建设的公司排名
  • 建设网站哪家便宜百度排名优化
  • MySQL(四) - 数据查询操作
  • Linux相关概念和易错知识点(47)(五种IO模型、多路转接、select、poll、epoll)
  • jq动画效果网站wordpress 两个数据库 互通
  • 网站如何做淘宝推广食品经营许可证
  • wordpress 架站 电子书三水区网站建设
  • opendds初入门之monitor监控的简单练习
  • 清新区住房和城乡建设局网站企业建设H5响应式网站的5大好处
  • 做营利网站的风险太原网站建设制作报价
  • 在Jetson 上使用Intel RealSense D435运行RTAB-Map就行建图
  • 怎么建立自己的网站域名咸阳软件开发公司
  • 织梦做的网站怎样做小程序的平台
  • 律师在哪个网站做推广好深圳哪个网站发布做网站
  • 三字型布局的网站中国肩章大全图解
  • 响应式网站成本品牌官网建设内容
  • 提供网站备案建设服务大连甘井子区社区工作者招聘
  • 公司网站开发费用大概多少商品交易平台
  • 徐州网页设计seo前景
  • 【Linux】基本指令(入门篇)(下)