当前位置: 首页 > news >正文

社保、医保、个税、公积金纵向横向合并 python3

import pandas as pd
import os
import re
from functools import reduce
import openpyxl
from openpyxl.styles import PatternFillDEBUG = False  # 设置为 True 可以输出调试信息# 配置
directory = 'D:/Documents/2025/社保费管理客户端/08/combind'
key_column = '证件号码'
name_column = '姓名'
amount_column = '应补(退)费额'output_dir = os.path.join(directory, 'output')
os.makedirs(output_dir, exist_ok=True)final_output_file = os.path.join(output_dir, '合并员工保险数据.xlsx')# 处理个税Excel文件(要把导出的 xls 另存为 xlsx 格式)
def read_tax_excel(file_path, key_column, type_name, name_column):wb = openpyxl.load_workbook(file_path, data_only=True)ws = wb.active# 数据从第9行开始data = []for row in ws.iter_rows(min_row=9, values_only=True):id_val = row[3]   # D列amount_val = row[40]  # AO列if id_val is None:continuedata.append({key_column: str(id_val), type_name: amount_val, name_column: ""})return pd.DataFrame(data)# 处理住房公积金申报表 Excel文件(要把 xlsm 另存为 xlsx 格式)
def read_fund_excel(file_path, key_column, type_name, name_column):wb = openpyxl.load_workbook(file_path, data_only=True)ws = wb.active# 数据从第7行开始data = []for row in ws.iter_rows(min_row=7, values_only=True):id_val = row[4]   # E列 5amount_val = row[18]  # S列 19if id_val is None:continuedata.append({key_column: str(id_val), type_name: amount_val, name_column: ""})return pd.DataFrame(data)files = [f for f in os.listdir(directory) if f.endswith('.xlsx')]group_patterns = [(r'失业保险\(个人缴纳\)', '失业保险(个人)'),(r'失业保险\(单位缴纳\)', '失业保险(单位)'),(r'工伤保险', '工伤保险'),(r'机关事业单位基本养老保险费(个人缴纳)', '养老保险(个人)'),(r'机关事业单位基本养老保险费(单位缴纳)', '养老保险(单位)'),(r'职工基本养老保险\(个人缴纳\)', '养老保险(个人)'),(r'职工基本医疗保险\(单位缴纳\)', '医疗保险(单位)'),(r'职工基本医疗保险\(个人缴纳\)', '医疗保险(个人)'),(r'职工大额医疗互助保险\(单位缴纳\)', '医保大额(个人)'),(r'公务员医疗补助', '公务员医补(单位)'),(r'职工基本养老保险\(单位缴纳\)', '养老保险(单位)'),(r'机关事业单位职业年金(个人缴费)','职业年金(个人)'),(r'.*_综合所得申报.*', '个税'),(r'.*住房公积金申报表.*', '公积金(单位)'),
]# 1. 分组
similar_groups = {}
for f in files:for pat, group in group_patterns:if re.search(pat, f):similar_groups.setdefault(group, []).append(f)break
if DEBUG:print("分组结果:", similar_groups)# 2. 纵向合并并保存每个险种的合并结果
vertical_files = []
for type_name, group_files in similar_groups.items():vertical_dfs = []for f in group_files:file_path = os.path.join(directory, f)print("====",file_path)if os.path.exists(file_path):try:if type_name == '个税':# 个税文件特殊处理df_v = read_tax_excel(file_path, key_column, type_name, name_column)df_v[key_column] = df_v[key_column].astype(str).str.replace('x', 'X')if DEBUG:print(df_v.head())elif type_name== '公积金(单位)':# 住房公积金申报表特殊处理df_v = read_fund_excel(file_path, key_column, type_name, name_column)df_v[key_column] = df_v[key_column].astype(str).str.replace('x', 'X')if DEBUG:print(df_v.head())else:usecols = [key_column, name_column, amount_column]df_v = pd.read_excel(file_path,usecols=lambda col: col in usecols,dtype={key_column: str},converters={key_column: str})df_v[key_column] = df_v[key_column].astype(str).str.replace('x', 'X')cols = [c for c in [key_column, name_column, amount_column] if c in df_v.columns]if len(cols) < 2:print(f"警告: 文件 {f} 缺少必要列,跳过")continuedf_v = df_v[cols]df_v = df_v.rename(columns={amount_column: type_name})vertical_dfs.append(df_v)except Exception as e:print(f"错误: 读取文件 {f} 失败 - {e}")if vertical_dfs:merged = pd.concat(vertical_dfs, ignore_index=True)# 对同一证件号码,姓名取第一个非空,金额求和agg_dict = {type_name: 'sum'}if name_column in merged.columns:agg_dict[name_column] = 'first'merged = merged.groupby(key_column, as_index=False).agg(agg_dict)# 保存每个险种的合并结果out_path = os.path.join(output_dir, f"{type_name}_合并.xlsx")merged.to_excel(out_path, index=False)print(f"{type_name} 合并完成,输出文件: {out_path}")vertical_files.append(out_path)# 3. 横向合并所有险种的合并结果
dfs = []
for f in vertical_files:df = pd.read_excel(f, dtype={key_column: str}, converters={key_column: str})dfs.append(df)if dfs:def merge_func(left, right):# 合并前处理姓名列,优先保留左侧姓名if name_column in left.columns and name_column in right.columns:merged = pd.merge(left, right, on=[key_column], how='outer', suffixes=('', '_dup'))merged[name_column] = merged[name_column].combine_first(merged.get(f"{name_column}_dup"))if f"{name_column}_dup" in merged.columns:merged.drop(columns=[f"{name_column}_dup"], inplace=True)return mergedelse:return pd.merge(left, right, on=[key_column], how='outer')merged_df = reduce(merge_func, dfs)merged_df[key_column] = merged_df[key_column].astype(str)# 只统计险种列amount_cols = [col for col in merged_df.columns if col not in [key_column, name_column, '总金额']]for col in amount_cols:merged_df[col] = pd.to_numeric(merged_df[col], errors='coerce').fillna(0)# 新增一列“公积金个人”,金额与“公积金(单位)”相同if '公积金(单位)' in merged_df.columns:merged_df['公积金(个人)'] = merged_df['公积金(单位)']# 汇总列unit_types = ['养老保险(单位)', '失业保险(单位)', '工伤保险', '医疗保险(单位)', '公务员医补(单位)', '公积金(单位)']personal_types = ['公积金(个人)','养老保险(个人)', '职业年金(个人)', '失业保险(个人)', '个税', '医疗保险(个人)', '医保大额(个人)']unit_cols = [col for col in unit_types if col in merged_df.columns]personal_cols = [col for col in personal_types if col in merged_df.columns]merged_df['单位汇总'] = merged_df[unit_cols].sum(axis=1) if unit_cols else 0merged_df['个人汇总'] = merged_df[personal_cols].sum(axis=1) if personal_cols else 0# 列顺序调整final_cols = ([name_column, key_column] +unit_cols +['公积金个人'] +         # 新增公积金个人列['单位汇总'] +personal_cols +['个人汇总'])# 保证所有列都在表中final_cols = [col for col in final_cols if col in merged_df.columns]merged_df = merged_df.reindex(columns=final_cols)merged_df = merged_df.sort_values(by=key_column).reset_index(drop=True)merged_df.to_excel(final_output_file, index=False)print(f"最终合并完成,输出文件: {final_output_file}")# 设置所有列名单元格自动换行wb = openpyxl.load_workbook(final_output_file)ws = wb.activefor cell in ws[1]:  # 第一行是列名cell.alignment = openpyxl.styles.Alignment(wrap_text=True)# 设置单位汇总和个人汇总列背景色为浅蓝色fill = PatternFill(fill_type="solid", fgColor="B7DEE8")  # 浅蓝色# 获取第一行所有列名header = [cell.value for cell in ws[1]]for col_name in ['单位汇总', '个人汇总']:if col_name in header:col_idx = header.index(col_name) + 1  # openpyxl列索引从1开始for row in ws.iter_rows(min_row=2, min_col=col_idx, max_col=col_idx):for cell in row:cell.fill = fillwb.save(final_output_file)
else:print("无数据可合并")
http://www.dtcms.com/a/334463.html

相关文章:

  • 深入理解 Vue Router
  • Centos7.9安装Dante
  • 04时间复杂度计算方法
  • Python 桌面应用形态后台管理系统的技术选型与方案报告
  • Linux系统之lslogins 命令详解
  • vector 手动实现 及遇到的各种细节问题
  • 深入剖析 TOTP 算法:基于时间的一次性密码生成机制
  • Golang分布式事务处理方案
  • 如何在win服务器中部署若依项目
  • JVM垃圾回收器
  • 深度解析Java synchronized关键字及其底层实现原理
  • python学习DAY43打卡
  • C++实战
  • 如果构建企业本地的ERP智能ai系统,让先进的大模型数据处理ERP的各类数据,更加轻松智能,准确?从企业资源计划ERP变成企业资源智能EPA的升级
  • CUDA 编程笔记:CUDA内存模型概述
  • 【数据库】Oracle学习笔记整理之五:ORACLE体系结构 - 参数文件与控制文件(Parameter Files Control Files)
  • 虚拟专用网技术
  • Gradle#构建生命周期三个阶段
  • PyTorch神经网络工具箱(如何构建神经网络?)
  • 基于几何平面的寻路算法:SPEV1Auxiliary全面解析
  • 数据库Microsoft Access、SQL Server和SQLite三者对比及数据库的选型建议
  • Win11家庭版docker安装Minio
  • HTTP 1.0, 2.0 和 3.0 有什么区别?
  • Day11 栈与队列part2
  • 图论Day4学习心得
  • Git Revert 特定文件/路径的方法
  • 使用openssl创建自签名CA并用它签发服务器证书
  • 技术赋能与深度洞察:北京国标政务窗口第三方调查
  • 【攻防实战】红队攻防之Goby反杀
  • day34-LNMP详解