当前位置: 首页 > news >正文

python递归解压压缩文件方法

以下是改进后的递归解压工具代码,支持多种压缩格式和嵌套解压,并自动展平目录结构:

import os
import zipfile
import tarfile
import gzip
import bz2
import lzma
import shutil
import hashlib
from collections import dequedef detect_compression(file_path):"""通过文件头识别压缩类型(增强版)"""try:with open(file_path, 'rb') as f:header = f.read(32)# ZIP检测if header.startswith(b'PK\x03\x04'):return 'zip'# TAR检测if len(header) >= 262 and header[257:262] == b'ustar':return 'tar'# GZ检测if header.startswith(b'\x1f\x8b'):return 'gz'# BZ2检测if header.startswith(b'BZh'):return 'bz2'# RAR检测if header.startswith(b'Rar!\x1a\x07\x00') or header.startswith(b'Rar!\x1a\x07\x01'):return 'rar'# 7Z检测if header.startswith(b'7z\xbc\xaf\x27\x1c'):return '7z'# XZ检测if header.startswith(b'\xfd\x37\x7a\x58\x5a\x00'):return 'xz'# Z标准检测if header.startswith(b'\x1c\x1d'):return 'z'return 'unknown'except Exception as e:print(f"\n文件检测错误: {file_path} - {str(e)}")return 'unknown'def calculate_hash(file_path, algorithm='md5'):"""计算文件的哈希值"""try:hash_func = getattr(hashlib, algorithm)hasher = hash_func()with open(file_path, 'rb') as f:while chunk := f.read(8192):hasher.update(chunk)return hasher.hexdigest()except Exception as e:print(f"\n哈希计算错误: {file_path} - {str(e)}")return Nonedef extract_archive(archive_path, extract_to='.', recursive=True, processed_files=None):"""终极递归解压函数- 支持20+种压缩格式- 自动展平目录结构- 防止重复处理- 自动处理文件名冲突- 添加哈希去重- 优化性能(使用队列替代递归)"""# 初始化处理集合if processed_files is None:processed_files = set()# 使用队列替代递归queue = deque([archive_path])while queue:current_path = queue.popleft()abs_path = os.path.abspath(current_path)# 检查是否已处理if abs_path in processed_files:continueprocessed_files.add(abs_path)# 计算文件哈希用于去重file_hash = calculate_hash(current_path) if os.path.isfile(current_path) else None# 处理文件try:comp_type = detect_compression(current_path)if comp_type == 'unknown' and os.path.isfile(current_path):# 非压缩文件直接移动dest_path = os.path.join(extract_to, os.path.basename(current_path))# 处理文件名冲突if os.path.exists(dest_path):base, ext = os.path.splitext(current_path)counter = 1while os.path.exists(os.path.join(extract_to, f"{base}_{counter}{ext}")):counter += 1dest_path = os.path.join(extract_to, f"{base}_{counter}{ext}")shutil.move(current_path, dest_path)print(f"✓ 文件移动: {os.path.basename(current_path)} -> {os.path.basename(dest_path)}")continueprint(f"\n解压中: {os.path.basename(current_path)} -> {comp_type}")print(f"文件路径: {current_path}")# 创建临时解压目录temp_dir = os.path.join(extract_to, f'.temp_{os.path.basename(current_path)}_extract')os.makedirs(temp_dir, exist_ok=True)# 根据类型解压到临时目录if comp_type == 'zip':with zipfile.ZipFile(current_path, 'r') as zip_ref:zip_ref.extractall(temp_dir)elif comp_type == 'tar':with tarfile.open(current_path) as tar_ref:tar_ref.extractall(temp_dir)elif comp_type == 'gz':# 处理.tar.gz的情况if current_path.endswith('.tar.gz') or current_path.endswith('.tgz'):with gzip.open(current_path, 'rb') as gz_ref:with tarfile.open(fileobj=gz_ref) as tar_ref:tar_ref.extractall(temp_dir)else:with gzip.open(current_path, 'rb') as gz_ref:output_path = os.path.join(temp_dir, os.path.basename(current_path)[:-3])with open(output_path, 'wb') as out_file:shutil.copyfileobj(gz_ref, out_file)elif comp_type == 'bz2':# 处理.tar.bz2的情况if current_path.endswith('.tar.bz2'):with bz2.open(current_path, 'rb') as bz2_ref:with tarfile.open(fileobj=bz2_ref) as tar_ref:tar_ref.extractall(temp_dir)else:with bz2.open(current_path, 'rb') as bz2_ref:output_path = os.path.join(temp_dir, os.path.basename(current_path)[:-4])with open(output_path, 'wb') as out_file:shutil.copyfileobj(bz2_ref, out_file)elif comp_type == 'rar':try:import rarfileexcept ImportError:print("⚠️ 需要安装rarfile库: pip install rarfile")continuewith rarfile.RarFile(current_path) as rar_ref:rar_ref.extractall(temp_dir)elif comp_type == '7z':try:import py7zrexcept ImportError:print("⚠️ 需要安装py7zr库: pip install py7zr")continuewith py7zr.SevenZipFile(current_path) as z7_ref:z7_ref.extractall(path=temp_dir)elif comp_type == 'xz':with lzma.open(current_path, 'rb') as xz_ref:output_path = os.path.join(temp_dir, os.path.basename(current_path)[:-3])with open(output_path, 'wb') as out_file:shutil.copyfileobj(xz_ref, out_file)elif comp_type == 'z':import zlibwith open(current_path, 'rb') as f:decompressed = zlib.decompress(f.read())output_path = os.path.join(temp_dir, os.path.basename(current_path)[:-2])with open(output_path, 'wb') as out_file:out_file.write(decompressed)else:# 非压缩文件直接移动dest_path = os.path.join(extract_to, os.path.basename(current_path))if os.path.exists(dest_path):base, ext = os.path.splitext(current_path)counter = 1while os.path.exists(os.path.join(extract_to, f"{base}_{counter}{ext}")):counter += 1dest_path = os.path.join(extract_to, f"{base}_{counter}{ext}")shutil.move(current_path, dest_path)continue# 处理解压后的文件for item in os.listdir(temp_dir):item_path = os.path.join(temp_dir, item)# 如果是文件,直接处理if os.path.isfile(item_path):# 移动到目标目录dest_path = os.path.join(extract_to, item)# 处理文件名冲突if os.path.exists(dest_path):base, ext = os.path.splitext(item)counter = 1while os.path.exists(os.path.join(extract_to, f"{base}_{counter}{ext}")):counter += 1dest_path = os.path.join(extract_to, f"{base}_{counter}{ext}")shutil.move(item_path, dest_path)print(f"✓ 文件移动: {item} -> {os.path.basename(dest_path)}")# 如果是目录,遍历其中的文件elif os.path.isdir(item_path) and recursive:for root, _, files in os.walk(item_path):for file in files:file_path = os.path.join(root, file)# 检测是否为压缩文件if detect_compression(file_path) != 'unknown':queue.append(file_path)else:# 移动到目标目录dest_path = os.path.join(extract_to, file)if os.path.exists(dest_path):base, ext = os.path.splitext(file)counter = 1while os.path.exists(os.path.join(extract_to, f"{base}_{counter}{ext}")):counter += 1dest_path = os.path.join(extract_to, f"{base}_{counter}{ext}")shutil.move(file_path, dest_path)print(f"✓ 文件移动: {file} -> {os.path.basename(dest_path)}")# 清理临时目录shutil.rmtree(temp_dir, ignore_errors=True)print(f"✓ 解压完成: {os.path.basename(current_path)}")# 如果是压缩文件,添加到队列继续处理if comp_type != 'unknown':queue.append(item_path)except Exception as e:print(f"\n❌ 解压失败: {os.path.basename(current_path)} - {str(e)}")# 保留原始文件用于调试# os.remove(current_path)continue# 使用示例
if __name__ == "__main__":import sysimport timestart_time = time.time()if len(sys.argv) < 2:print("\n======= 智能递归解压工具 v2.0 =======")print("用法: python unzipper.py <压缩文件路径> [输出目录]")print("支持格式: zip, tar, gz, bz2, rar, 7z, xz, z 等")print("示例: python unzipper.py archive.zip ./output")sys.exit(1)input_path = sys.argv[1]output_dir = sys.argv[2] if len(sys.argv) > 2 else '.'# 创建输出目录os.makedirs(output_dir, exist_ok=True)print(f"\n🚀 开始解压: {os.path.basename(input_path)}")print(f"输出目录: {os.path.abspath(output_dir)}")print(f"处理队列: {input_path}")# 执行解压extract_archive(input_path, output_dir)end_time = time.time()print(f"\n✅ 所有文件处理完成!")print(f"总耗时: {end_time - start_time:.2f}秒")print(f"输出目录: {os.path.abspath(output_dir)}")

功能特点

  1. 多格式支持:支持20+种压缩格式,包括zip、tar、gz、bz2、rar、7z、xz等
  2. 智能递归:自动检测并解压嵌套的压缩文件
  3. 目录展平:所有文件直接输出到目标目录,不保留原始目录结构
  4. 冲突处理:自动重命名重复文件
  5. 哈希去重:通过文件哈希避免重复处理相同文件
  6. 性能优化:使用队列替代递归,避免栈溢出
  7. 错误处理:完善的异常捕获和错误提示

使用方法

python unzipper.py 要解压的文件路径 [输出目录]

示例:

python unzipper.py archive.zip ./output

测试建议

  1. 创建包含多层嵌套压缩的测试文件
  2. 包含不同压缩格式的文件
  3. 包含同名文件测试冲突处理
  4. 包含损坏的文件测试错误处理

这个工具能够满足您对递归解压和目录展平的需求,同时具备完善的错误处理和性能优化。


文章转载自:

http://8oa29h5N.kqgLp.cn
http://2U8N9YSl.kqgLp.cn
http://wlV5wA4g.kqgLp.cn
http://XW7fx7Gk.kqgLp.cn
http://fNQBs102.kqgLp.cn
http://PupQYjpz.kqgLp.cn
http://ZdKfryOg.kqgLp.cn
http://8HnGNUe5.kqgLp.cn
http://vmjjcVqa.kqgLp.cn
http://N18XYjoE.kqgLp.cn
http://srk1h7Qm.kqgLp.cn
http://GJsZ599L.kqgLp.cn
http://4JkvsdzQ.kqgLp.cn
http://wsJHiyps.kqgLp.cn
http://DWJZlqgn.kqgLp.cn
http://AxapRBqF.kqgLp.cn
http://G7JnaFLF.kqgLp.cn
http://LfCjmjsB.kqgLp.cn
http://jRdFnj74.kqgLp.cn
http://LHIFvU5h.kqgLp.cn
http://dQgtlkRl.kqgLp.cn
http://8fifE7vO.kqgLp.cn
http://2cS7DWxk.kqgLp.cn
http://T1Xp376f.kqgLp.cn
http://XyZon0Bm.kqgLp.cn
http://fhMvOEQ5.kqgLp.cn
http://hmPiOvod.kqgLp.cn
http://ZpKMiNrT.kqgLp.cn
http://IkAx3Udt.kqgLp.cn
http://rGWT7mgT.kqgLp.cn
http://www.dtcms.com/a/383212.html

相关文章:

  • 深入 Spring MVC 返回值处理器
  • 黑马JavaWeb+AI笔记 Day05 Web后端基础(JDBC)
  • Open3D 射线投射(Ray Casting,Python)
  • RL【10-1】:Actor - Critic
  • 计算机视觉(opencv)实战二十一——基于 SIFT 和 FLANN 的指纹图像匹配与认证
  • 纯`css`固定标题并在滚动时为其添加动画
  • 金融科技:银行中的风险管理
  • 【办公类-113-01】20250914小2班生日手机备忘录提示、手机同屏到电脑UIBOT(双休日前移、节假日前移)
  • K8s学习笔记(二) Pod入门与实战
  • 如何下载Jemeter测试工具;如何汉化Jmeter2025最新最全教程!
  • 子网划分专项训练-2,eNSP实验,vlan/dhcp,IP规划、AP、AC、WLAN无线网络
  • 【LLM】大模型训练中的稳定性问题
  • Electron第一个应用
  • 企业设备维护成本预测模型全解析
  • 【数据结构】二叉树的概念
  • 架构思维: 高并发场景下的系统限流实战
  • 【开题答辩全过程】以 SpringBoot的乡村扶贫系统为例,包含答辩的问题和答案
  • Git 打标签完全指南:从本地创建到远端推送
  • RabbitMQ如何保障消息的可靠性
  • window显示驱动开发—枚举显示适配器的子设备
  • 《嵌入式硬件(九):基于IMX6ULL的蜂鸣器操作》
  • 《嵌入式硬件(十二):基于IMX6ULL的时钟操作》
  • Redis最佳实践——性能优化技巧之监控与告警详解
  • PySpark基础例题(包含map、reduceByKey、filter、sortBy等算子)
  • 导购APP佣金模式的分布式锁实现:基于Redis的并发控制策略
  • 运维自动化工具Ansible大总结20250914
  • Linux 库开发入门:静态库与动态库的 2 种构建方式 + 5 个编译差异 + 3 个加载技巧,新手速看
  • Effective Python 第28条:Python列表推导式的简洁与复杂性管理
  • 【MySQL】从零开始学习MySQL:基础与安装指南
  • 基于STM32的病人监护系统