【图像超分】python实现制作超分辨率数据集
【图像超分】python实现制作超分辨率数据集
前言
此博客针对于.264视频,其他视频文件逻辑相同
1、帧截取
将视频文件按照每x帧截取
import cv2
import os
import argparse
from pathlib import Pathdef extract_h264_frames(file_path, output_dir, interval=30, fps=25):"""从H.264视频文件中按指定间隔提取帧参数:file_path (str): .264文件路径output_dir (str): 输出图片目录interval (int): 帧间隔,默认30帧fps (int): 视频帧率,用于进度估算"""# 创建输出目录output_dir = Path(output_dir)output_dir.mkdir(parents=True, exist_ok=True)# 使用FFmpeg后端打开H.264文件cap = cv2.VideoCapture(file_path, cv2.CAP_FFMPEG)# 检查文件是否成功打开if not cap.isOpened():print(f"无法打开H.264文件: {file_path}")print("请确保已安装FFmpeg并正确配置OpenCV")returnframe_number = 0extracted_count = 0error_count = 0print(f"开始从 {file_path} 提取帧...")print(f"帧间隔: {interval},输出目录: {output_dir}")while True:# 读取一帧ret, frame = cap.read()# 检查是否读取到帧if not ret:break# 每interval帧提取一次if frame_number % interval == 0:# 生成文件名filename = f"frame_{extracted_count:06d}.jpg"output_path = output_dir / filename# 保存帧if cv2.imwrite(str(output_path), frame):extracted_count += 1# 每提取10张图片显示一次进度if extracted_count % 10 == 0:print(f"已提取 {extracted_count} 张图片")frame_number += 1# 错误处理if frame is None:error_count += 1if error_count % 10 == 0:print(f"已遇到 {error_count} 个错误帧")# 释放资源cap.release()# 输出统计信息print(f"提取完成!")print(f"总处理帧数: {frame_number}")print(f"成功提取图片: {extracted_count} 张")print(f"错误帧数量: {error_count}")print(f"所有图片已保存至: {output_dir}")if __name__ == "__main__":# 解析命令行参数parser = argparse.ArgumentParser(description='从H.264视频中按间隔提取帧并保存为图片')parser.add_argument('file_path', help='.264文件的路径')parser.add_argument('output_dir', help='保存图片的目录')parser.add_argument('--interval', type=int, default=30, help='帧间隔,默认30帧')parser.add_argument('--fps', type=int, default=25, help='视频帧率,默认25fps')args = parser.parse_args()# 调用提取函数extract_h264_frames(args.file_path, args.output_dir, args.interval, args.fps)
多线程实现双三次插值缩小图像分辨率
缩小二倍,三倍,四倍,代码中有变量控制
import numpy as np
from PIL import Image
import math
import time
import os
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completeddef cubic_kernel(x, a=-0.5):"""三次卷积核函数(线程安全)"""x = abs(x)if x <= 1:return (a + 2) * math.pow(x, 3) - (a + 3) * math.pow(x, 2) + 1elif x < 2:return a * math.pow(x, 3) - 5 * a * math.pow(x, 2) + 8 * a * x - 4 * aelse:return 0def process_single_row(row_idx, img_array, scale_factor, height, width, channels, is_gray):"""单线程处理一行像素(双三次插值核心)"""inv_scale = 1.0 / scale_factornew_width = int(width * scale_factor)row_pixels = np.zeros((new_width, channels), dtype=np.uint8) if not is_gray else np.zeros(new_width, dtype=np.uint8)# 计算垂直方向位置与权重y = row_idx * inv_scaley0 = math.floor(y)dy = y - y0v_weights = [cubic_kernel(1 + dy), cubic_kernel(dy), cubic_kernel(1 - dy), cubic_kernel(2 - dy)]# 逐列计算像素值for j in range(new_width):x = j * inv_scalex0 = math.floor(x)dx = x - x0h_weights = [cubic_kernel(1 + dx), cubic_kernel(dx), cubic_kernel(1 - dx), cubic_kernel(2 - dx)]if is_gray:pixel_value = 0.0for k in range(4):y_coord = max(0, min(y0 - 1 + k, height - 1))for l in range(4):x_coord = max(0, min(x0 - 1 + l, width - 1))pixel_value += img_array[y_coord, x_coord] * v_weights[k] * h_weights[l]row_pixels[j] = np.clip(pixel_value, 0, 255).astype(np.uint8)else:for c in range(channels):pixel_value = 0.0for k in range(4):y_coord = max(0, min(y0 - 1 + k, height - 1))for l in range(4):x_coord = max(0, min(x0 - 1 + l, width - 1))pixel_value += img_array[y_coord, x_coord, c] * v_weights[k] * h_weights[l]row_pixels[j, c] = np.clip(pixel_value, 0, 255).astype(np.uint8)return row_idx, row_pixelsdef bicubic_interpolation_multi_thread(image, scale_factor, max_workers=None):"""多线程双三次插值(单图处理核心)"""img_array = np.array(image)if len(img_array.shape) == 3:height, width, channels = img_array.shapeis_gray = Falseelse:height, width = img_array.shapechannels = 1is_gray = Truenew_height = int(height * scale_factor)new_width = int(width * scale_factor)output = np.zeros((new_height, new_width), dtype=np.uint8) if is_gray else np.zeros((new_height, new_width, channels), dtype=np.uint8)# 线程池配置(默认CPU核心数)max_workers = max_workers or os.cpu_count() or 4# 并行处理所有行with ThreadPoolExecutor(max_workers=max_workers) as executor:futures = {executor.submit(process_single_row, idx, img_array, scale_factor, height, width, channels, is_gray): idx for idx in range(new_height)}for future in as_completed(futures):row_idx, row_pixels = future.result()output[row_idx, :] = row_pixels if is_gray else row_pixels[:, :]return Image.fromarray(output)def process_single_image(input_img_path, output_dir, scale_factor, max_workers_per_img=None):"""单图处理(核心保障:输出文件名 = 输入文件名)返回:(处理结果, 输入文件名, 输出文件名, 错误信息)"""try:# 1. 提取输入图像的原始文件名(含后缀,如"frame_001.jpg")input_filename = os.path.basename(input_img_path)if not input_filename:raise ValueError("无法提取输入图像的文件名")# 2. 读取输入图像with Image.open(input_img_path) as img:img_copy = img.copy() # 避免文件句柄占用img_format = img.format # 保留原格式(确保输出格式与输入一致)# 3. 执行双三次插值缩放scaled_img = bicubic_interpolation_multi_thread(img_copy, scale_factor, max_workers_per_img)# 4. 构建输出路径(输出目录 + 原始文件名,保障名字完全一致)output_img_path = os.path.join(output_dir, input_filename)output_filename = os.path.basename(output_img_path)# 5. 验证:输出文件名必须与输入文件名完全一致(强化保障)if output_filename != input_filename:raise RuntimeError(f"文件名不一致错误!输入:{input_filename},输出:{output_filename}")# 6. 保存图像(保留原格式,避免格式转换导致的文件名后缀变化)scaled_img.save(output_img_path, format=img_format)return (True, input_filename, output_filename, "")except Exception as e:input_filename = os.path.basename(input_img_path) or "未知文件名"return (False, input_filename, "", str(e)[:150]) # 截取错误信息,避免日志过长def batch_process_images(input_dir, output_dir, scale_factor, max_workers_per_img=None, max_batch_workers=4):"""批量处理文件夹图像(核心规则:输入输出文件名完全一致)"""# 1. 基础合法性检查if not os.path.isdir(input_dir):print(f"❌ 输入路径不存在或不是文件夹:{input_dir}")return# 2. 创建输出目录(确保目录存在,避免保存失败)os.makedirs(output_dir, exist_ok=True)print(f"✅ 输出目录已准备:{os.path.abspath(output_dir)}")print(f"📌 核心规则:输出图像文件名 = 输入图像文件名(含后缀)")# 3. 筛选支持的图像文件(避免处理非图像文件)supported_formats = (".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".gif") # 可扩展img_files = []for root, _, files in os.walk(input_dir): # 遍历所有子文件夹for file in files:if file.lower().endswith(supported_formats):img_files.append(os.path.join(root, file))# 4. 检查是否有可处理的图像if len(img_files) == 0:print(f"❌ 输入文件夹中未找到支持的图像文件(支持格式:{supported_formats})")returnprint(f"📊 批量任务信息:共{len(img_files)}张图像 | 缩放因子:{scale_factor}x")print(f"🔧 线程配置:单图线程数={max_workers_per_img or os.cpu_count() or 4} | 并行处理图数={max_batch_workers}")# 5. 批量并行处理(外层控制同时处理的图像数,避免内存过载)start_total = time.time()success_count = 0fail_count = 0fail_details = []print(f"\n🚀 开始批量处理(输入输出文件名严格一致)...")with ThreadPoolExecutor(max_workers=max_batch_workers) as batch_executor:# 提交所有图像处理任务futures = [batch_executor.submit(process_single_image,input_img_path=img_path,output_dir=output_dir,scale_factor=scale_factor,max_workers_per_img=max_workers_per_img) for img_path in img_files]# 实时跟踪进度with tqdm(total=len(futures), desc="批量处理进度", unit="张", ncols=100) as pbar:for future in as_completed(futures):success, input_name, output_name, error_msg = future.result()if success:success_count += 1pbar.set_postfix({"当前成功": input_name}) # 显示当前成功的文件名else:fail_count += 1fail_details.append((input_name, error_msg))pbar.update(1)# 6. 输出最终报告(明确显示文件名一致性情况)total_time = time.time() - start_totalprint(f"\n📋 批量处理完成!")print(f"⏱️ 总耗时:{total_time:.2f}秒")print(f"✅ 成功处理:{success_count}张(所有输出文件名与输入完全一致)")print(f"❌ 处理失败:{fail_count}张")# 显示失败详情(便于排查问题)if fail_count > 0:print(f"\n❌ 失败详情:")for idx, (img_name, err) in enumerate(fail_details, 1):print(f" {idx}. 文件名:{img_name} | 错误:{err}")if idx >= 10: # 最多显示10条,避免日志过长print(f" ... 还有{len(fail_details)-10}条失败记录未显示")break# -------------------------- 批量任务配置(根据需求修改) --------------------------
if __name__ == "__main__":INPUT_DIR = "dataset/hongwai_image" # 输入图像文件夹(例:之前提取的.264帧文件夹)OUTPUT_DIR = "dataset/scaled_hongwai" # 输出图像文件夹(自动创建,无需提前准备)SCALE_FACTOR = 0.25 # 缩放因子(0.25=缩小到1/4,2=放大2倍,3=放大3倍)MAX_WORKERS_PER_IMG = 8 # 单张图像处理的线程数(建议设为CPU核心数,如8/16)MAX_BATCH_WORKERS = 4 # 同时处理的图像数量(避免内存过载,建议4-8)# 执行批量处理(严格保障输入输出文件名一致)batch_process_images(input_dir=INPUT_DIR,output_dir=OUTPUT_DIR,scale_factor=SCALE_FACTOR,max_workers_per_img=MAX_WORKERS_PER_IMG,max_batch_workers=MAX_BATCH_WORKERS)
批量RGB转单通道脚本
按需使用吧
import os
from PIL import Imagedef convert_to_gray(input_dir, output_dir):"""将输入目录中的所有图片转换为单通道灰度图,并保存到输出目录参数:input_dir: 输入图片所在的文件夹路径output_dir: 处理后图片的保存路径"""# 确保输出目录存在,如果不存在则创建if not os.path.exists(output_dir):os.makedirs(output_dir)# 获取输入目录中所有的文件file_list = os.listdir(input_dir)# 支持的图片格式supported_formats = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff')# 遍历所有文件for filename in file_list:# 检查文件是否为图片if filename.lower().endswith(supported_formats):try:# 构建完整的文件路径input_path = os.path.join(input_dir, filename)# 打开图像img = Image.open(input_path)# 转换为单通道灰度图gray_img = img.convert('L') # 'L'表示8位灰度,单通道# 构建输出文件路径output_filename = f"{filename}" # 在原文件名前加上gray_前缀output_path = os.path.join(output_dir, output_filename)# 保存单通道图像gray_img.save(output_path)print(f"已处理: {filename} -> {output_filename}")except Exception as e:print(f"处理 {filename} 时出错: {str(e)}")# 示例用法
if __name__ == "__main__":# 输入文件夹路径 - 请根据实际情况修改input_directory = "C:\\Users\\27879\\jiangguolong\\dataset\\DIV2K\\DIV2K\\DIV2K_train_LR_bicubic\\X3"# 输出文件夹路径 - 请根据实际情况修改output_directory = "C:\\Users\\27879\\jiangguolong\\dataset\\DIV2K_single\\DIV2K_train_LR_bicubic\\X3"# 执行转换convert_to_gray(input_directory, output_directory)print("所有图片处理完成!")
PS:打开.264视频的脚本
import cv2
import argparse
import timedef play_h264_file(file_path, fps=25):"""播放H.264格式视频文件参数:file_path (str): .264文件路径fps (int): 视频帧率,默认25fps"""# 使用FFmpeg后端打开H.264文件# 需要系统中安装FFmpeg并确保OpenCV编译时支持FFmpegcap = cv2.VideoCapture(file_path, cv2.CAP_FFMPEG)# 检查是否成功打开文件if not cap.isOpened():print(f"无法打开文件: {file_path}")print("请确保已安装FFmpeg并正确配置OpenCV支持FFmpeg")return# 计算每帧之间的延迟时间(毫秒)frame_delay = int(1000 / fps)print("正在播放H.264视频...")print("按 'q' 键退出播放")while True:# 读取一帧ret, frame = cap.read()# 检查是否读取成功if not ret:print("视频播放完毕或读取失败")break# 显示帧cv2.imshow('H.264 Player', frame)# 控制播放速度并检查按键if cv2.waitKey(frame_delay) & 0xFF == ord('q'):print("用户退出播放")break# 释放资源cap.release()cv2.destroyAllWindows()if __name__ == "__main__":# 设置命令行参数parser = argparse.ArgumentParser(description='播放H.264格式视频文件')parser.add_argument('file_path', help='.264文件的路径')parser.add_argument('--fps', type=int, default=25, help='视频帧率,默认25fps')# 解析参数args = parser.parse_args()# 播放视频play_h264_file(args.file_path, args.fps)