# Python script: extract the non-zero data sectors of a file and save each one to its own file, named after its offset address.
import os
import sys
from datetime import datetime
def ensure_directory(directory):
    """Ensure that *directory* exists, creating it (and any parents) if not.

    Args:
        directory: Path of the directory that must exist.

    Returns:
        The ``directory`` argument unchanged, so the call can be used inline
        in an assignment.
    """
    if not os.path.exists(directory):
        # exist_ok=True keeps this safe if another process creates the same
        # directory between the existence check above and this call.
        os.makedirs(directory, exist_ok=True)
        print(f"crate directory: {directory}")
    return directory
def find_and_save_nonzero_sectors(img_path, output_dir, handle_unit_size=512):
    """Scan a file in fixed-size units and save every non-zero unit to its own file.

    The file is read unit by unit. Each unit that contains at least one
    non-zero byte is written to ``output_dir`` as ``0xXXXXXXXX.bin``, where the
    hex number is the unit's byte offset in the source file. A trailing
    incomplete unit is zero-padded to the full unit size before it is saved.

    Args:
        img_path: Path of the file to scan.
        output_dir: Directory that receives the extracted unit files; it is
            created if it does not exist.
        handle_unit_size: Size in bytes of one scan unit.

    Returns:
        A tuple ``(nonzero_sectors, total_size)``. ``nonzero_sectors`` is a
        list of ``(unit_index, offset, bytes_read)`` tuples, one per saved
        unit, where ``bytes_read`` is the unpadded byte count taken from the
        source file. ``total_size`` is the size of the source file in bytes.

    Raises:
        ValueError: If ``handle_unit_size`` is not a positive number.
    """
    if handle_unit_size <= 0:
        raise ValueError("handle_unit_size must be a positive number of bytes")

    total_size = os.path.getsize(img_path)
    print(f"file size: {total_size} bytes, unit size: {handle_unit_size} bytes")
    output_dir = ensure_directory(output_dir)

    nonzero_sectors = []
    processed_count = 0
    # Round up so that a trailing incomplete unit is counted as well.
    sector_count = (total_size + handle_unit_size - 1) // handle_unit_size

    with open(img_path, 'rb') as f:
        for sector_num in range(sector_count):
            offset = sector_num * handle_unit_size
            # The last unit may be shorter than a full unit.
            bytes_to_read = min(handle_unit_size, total_size - offset)

            # Locate and read the unit data.
            f.seek(offset)
            data = f.read(bytes_to_read)

            # Only units containing at least one non-zero byte are saved.
            if any(data):
                # Zero-pad a short unit up to the full unit size. Padding
                # cannot change the outcome of the non-zero test above, so it
                # is only done for units that are actually written.
                if len(data) < handle_unit_size:
                    data += b'\x00' * (handle_unit_size - len(data))

                # File name pattern: 0xXXXXXXXX.bin (offset in hex).
                filename = f"0x{offset:08X}.bin"
                filepath = os.path.join(output_dir, filename)
                with open(filepath, 'wb') as out_file:
                    out_file.write(data)

                nonzero_sectors.append((sector_num, offset, bytes_to_read))

            processed_count += 1
            # Report progress once every 100 units scanned.
            if processed_count % 100 == 0:
                print(f"proccessed {processed_count} units...")

    return nonzero_sectors, total_size
def generate_summary_report(output_dir, sectors, sector_size, original_size):
    """Write a plain-text summary of the extracted units to ``sector_report.txt``.

    The report lists the generation time, the source size, the unit size, the
    number of non-zero units, how many of them are complete versus incomplete,
    and one detail row per unit.

    Args:
        output_dir: Directory in which ``sector_report.txt`` is written.
        sectors: Iterable-with-length of ``(unit_index, offset, byte_count)``
            tuples describing the non-zero units.
        sector_size: Size in bytes of a complete unit; a unit whose
            ``byte_count`` differs from it is reported as incomplete.
        original_size: Total size in bytes of the analysed file.

    Returns:
        The path of the report file that was written.
    """
    report_path = os.path.join(output_dir, "sector_report.txt")

    # Count the complete and incomplete units.
    full_sectors = sum(1 for _, _, size in sectors if size == sector_size)
    partial_sectors = len(sectors) - full_sectors

    with open(report_path, 'w', encoding='utf-8') as report:
        report.write("Disk image analysis report\n")
        report.write(f"generate time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        report.write(f"raw image total size: {original_size} bytes\n")
        report.write(f"unit size: {sector_size} bytes\n")
        report.write(f"non-zero unit count: {len(sectors)}\n")

        report.write("\nSector integrity statistics:\n")
        report.write(f"Complete sector: {full_sectors}\n")
        report.write(f"Incomplete sector: {partial_sectors}\n")

        report.write("\nDetailed sector list:\n")
        report.write("unit number \t offset address (hex)\t offset address (dec)\t actual byte count \n")
        for sector_num, offset, actual_size in sectors:
            # Flag units that hold fewer bytes than a full unit.
            size_note = f"{actual_size}" if actual_size == sector_size else f"{actual_size} (incomplete)"
            report.write(f"{sector_num}\t0x{offset:08X}\t{offset}\t{size_note}\n")

    return report_path
if __name__ == "__main__":
    # Usage:   <script> [disk image file] [output directory] [unit size in bytes]
    # Example: <script> tb.img extracted_sectors 5120
    # Every argument is optional; the fallbacks below are the values that were
    # previously hard-coded, so running without arguments behaves as before.
    img_file = sys.argv[1] if len(sys.argv) > 1 else 'virtual_disk.img'
    output_dir = sys.argv[2] if len(sys.argv) > 2 else 'extracted_sectors'

    try:
        # int() raises ValueError for a non-numeric argument, which is
        # reported by the same handler as an out-of-range value.
        sector_size = int(sys.argv[3]) if len(sys.argv) > 3 else 524288
        # The scan unit must be a positive multiple of one 512-byte sector.
        if sector_size % 512 != 0 or sector_size <= 0:
            raise ValueError("The detection unit must be a positive integer of sector size (512 bytes)")
    except ValueError as e:
        print(f"Error: Invalid sector size - {e}")
        sys.exit(1)

    if not os.path.exists(img_file):
        print(f"Error : file not found - {img_file}")
        sys.exit(1)

    print(f"Start analyzing disk image: {img_file}")
    print(f"output directory: {output_dir}")
    print(f"checking unit size: {sector_size} bytes,{sector_size//512} sectors")

    try:
        sectors_unit, original_size = find_and_save_nonzero_sectors(img_file, output_dir, sector_size)
        report_path = generate_summary_report(output_dir, sectors_unit, sector_size, original_size)
        print(f"checked {len(sectors_unit)} no-zero units:")
        print(f"\nprocess completed! total found {len(sectors_unit)} non-zero units.")
        print(f"sector report saved to: {output_dir}")
        print(f"analysis report saved to: {report_path}")
    except Exception as e:
        # Top-level boundary: report any failure and exit with a non-zero status.
        print(f"process failed: {str(e)}")
        sys.exit(1)