一些笔记在代码的注释中
因为使用的数据集比较简单,所以没有使用模型可视化和调试的内容,只是简单的数据集预处理和模型的搭建以及训练。
# 1. PyTorch 基础模块
import torch # 张量操作
import torch.nn as nn # 构建神经网络模块(如Conv2d, Module等)
import torch.nn.functional as F # 函数式API(如激活函数、卷积等)
import torch.optim as optim # 优化器(如Adam, SGD)
from torch.utils.data import Dataset, DataLoader # 数据加载
# 2. 图像处理和增强
import torchvision.transforms as transforms # 常用图像变换方法(ToTensor, Normalize等)
import torchvision.transforms.functional as TF # 具体变换函数(hflip, vflip等)
# 3. 读取医学图像文件(TIFF格式)
import tifffile # tif文件读取函数 tifffile.imread()
# 4. 其他辅助模块
import numpy as np # 数组和矩阵操作
import random # 随机数生成
import os
import zipfile
import requests
import matplotlib.pyplot as plt# 数据路径
data_dir = "D:\python\Pytorch"# 读取训练图像和标签
train_image_stack = tifffile.imread(os.path.join(data_dir, "train-volume.tif"))
train_label_stack = tifffile.imread(os.path.join(data_dir, "train-labels.tif"))print("图像 shape:", train_image_stack.shape) # (30, 512, 512)
print("标签 shape:", train_label_stack.shape) # (30, 512, 512)#数据集加载
class ISBIDataset(Dataset):def __init__(self,images,masks,patch_size=128,augment=True):##augment=True 这个参数通常用于控制**数据增强(data augmentation)**功能是否开启。self.images = imagesself.masks = masksself.patch_size = patch_sizeself.augment = True#作用:初始化 Dataset 对象,完成数据路径、预处理操作等的准备工作。你可以在这里加载数据列表、设置变换(transform)等。def __len__(self):return len(self.images)*10 #将一张图片分为10个patch 输出的数据的总数#作用:返回整个数据集的样本数量。DataLoader 会调用它来知道数据集有多大,从而决定迭代次数。def __getitem__(self, idx):img_idx = idx % len(self.images) #上面把一张图片分为10个patch 这里是把每10个patch的样本编码表示为同一图片image = self.images[img_idx]mask = self.masks[img_idx]#作用:根据给定的索引,返回对应的数据样本(例如图像和标签)。DataLoader 通过索引调用它,取出单个样本进行训练或推理。#数据预处理
#随即裁剪patch# 随机裁剪 patchi = random.randint(0, image.shape[0] - self.patch_size)j = random.randint(0, image.shape[1] - self.patch_size)image_patch = image[i:i + self.patch_size, j:j + self.patch_size]mask_patch = mask[i:i + self.patch_size, j:j + self.patch_size]# 转为 tensor 并归一化image_patch = torch.from_numpy(image_patch).float().unsqueeze(0) / 255.0 # (1, H, W)mask_patch = torch.from_numpy(mask_patch).float().unsqueeze(0) / 255.0mask_patch = (mask_patch > 0.5).float() # 转为0/1标签# 数据增强(水平/垂直翻转)if self.augment:if random.random() > 0.5:image_patch = TF.hflip(image_patch)mask_patch = TF.hflip(mask_patch)if random.random() > 0.5:image_patch = TF.vflip(image_patch)mask_patch = TF.vflip(mask_patch)return image_patch, mask_patch
#在面对不同的数据集时,都要根据数据集的文档或者特征使用不同的数据预处理的方法
# 创建训练数据集和 DataLoader
train_dataset = ISBIDataset(train_image_stack, train_label_stack, patch_size=128, augment=True)#这里的augment参数 是指数据增强又没有开启
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)# 测试一下 DataLoader
for img, mask in train_loader:print("图像 shape:", img.shape) # [8, 1, 128, 128]print("掩码 shape:", mask.shape) # [8, 1, 128, 128]break#卷积模块
class DoubleConv(nn.Module):def __init__(self, in_channels, out_channels):super(DoubleConv, self).__init__()self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)self.relu = nn.ReLU(inplace=True) # 一般设置inplace=True节省内存def forward(self, x):x = self.relu(self.conv1(x))x = self.relu(self.conv2(x))return x
#每个卷积模块都是由 两层(conv+relu)组成的 方便后面使用#UNet模型核心模块
class UNet(nn.Module):def __init__(self,in_channels=1,out_channels=1):super(UNet,self).__init__()#编码器部分#连续的卷积块(Conv + ReLU + Conv + ReLU)#每个卷积块后接池化层(MaxPool)进行下采样,提取特征并降低分辨率self.down1 = DoubleConv(in_channels=1, out_channels=64)self.pool1 = nn.MaxPool2d(kernel_size=2)#池化层self.down2 = DoubleConv(64, 128)self.pool2 = nn.MaxPool2d(2)self.down3 = DoubleConv(128, 256)self.pool3 = nn.MaxPool2d(2)self.down4 = DoubleConv(256, 512)self.pool4 = nn.MaxPool2d(2)# 瓶颈层# 起到信息压缩 + 表征增强的作用# 这个位置的特征图最小(尺寸最小,语义最强)# 为解码器提供最深的上下文信息,提升分割准确性self.bottleneck = DoubleConv(512,1024)#解码器部分#使用反卷积(TransposedConv)或插值上采样#拼接对应编码器层的特征图(skipconnection)#卷积块提取融合特征self.up1 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)self.dec1 = DoubleConv(1024, 512) # 拼接后通道是 512 + 512self.up2 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)self.dec2 = DoubleConv(512, 256)self.up3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)self.dec3 = DoubleConv(256, 128)self.up4 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)self.dec4 = DoubleConv(128, 64)#最终卷积层1x1卷积输出预测掩码self.final = nn.Conv2d(64, out_channels, kernel_size=1)#模型的前向传播部分def forward(self,x):x1 = self.down1(x)x2 = self.down2(self.pool1(x1))x3 = self.down3(self.pool1(x2))x4 = self.down4(self.pool1(x3))x5 = self.bottleneck(self.pool1(x4))#实现跳跃连接的部分d1 = self.up1(x5) # 上采样 1024 -> 512d1 = torch.cat([d1, x4], dim=1) # 拼接编码器对应层(跳跃连接)d1 = self.dec1(d1) # -> 输出 512d2 = self.up2(d1) # 512 -> 256d2 = torch.cat([d2, x3], dim=1) #将对应的卷积层进行跳跃连接d2 = self.dec2(d2)d3 = self.up3(d2) # 256 -> 128d3 = torch.cat([d3, x2], dim=1)d3 = self.dec3(d3)d4 = self.up4(d3) # 128 -> 64d4 = torch.cat([d4, x1], dim=1)d4 = self.dec4(d4)out = self.final(d4)return out
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNet(in_channels=1,out_channels=1).to(device) #定义模型criterion = nn.BCEWithLogitsLoss() #损失函数
optimizer = torch.optim.Adam(model.parameters(),lr = 1e-4) #优化器#对单个训练计划的定义
def train_one_epoch(model,dataloader,optimizer,criterion,device):model.train()total_loss = 0.0for images,masks in dataloader:images = images.to(device)masks = masks.to(device)outputs = model(images)loss = criterion(outputs,masks)optimizer.zero_grad()loss.backward()optimizer.step()total_loss +=loss.item()*images.size(0)avg_loss = total_loss / len(dataloader.dataset)return avg_lossnum_epochs = 20 # 你可以根据数据大小调整for epoch in range(num_epochs):train_loss = train_one_epoch(model, train_loader, optimizer, criterion, device)print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}")