机器学习周报十四
文章目录
- 摘要
- Abstract
- 1 模型训练
- 2 pytorch入门
- 2.1 数据加载
- 2.2 数据预处理
- 2.3 模型训练
- 总结
摘要
本周对pytorch和扩散模型进行了学习,接着上一周的工作对扩散模型进行了训练,在此之前深入学习了pytorch,然后基于此对扩散模型进行实现。
Abstract
This week, I studied PyTorch and diffusion models, following last week’s work on training the diffusion model. Prior to that, I delved deeply into PyTorch, and based on this, I implemented the diffusion models.
1 模型训练
$$x_t=\sqrt{\bar\alpha_t}\,x_0+\sqrt{1-\bar\alpha_t}\,\epsilon$$
$$P(x_{t-1}\mid x_t)=\mathcal{N}\!\left(\frac{1}{\sqrt{\alpha_t}}\left(x_t-\frac{1-\alpha_t}{\sqrt{1-\bar\alpha_t}}\,\epsilon\right),\;\frac{(1-\alpha_t)(1-\bar\alpha_{t-1})}{1-\bar\alpha_t}\right)$$
上式中不确定的变量是 $\epsilon$。
对于这个变量,通过神经网络从 $x_t$ 中提取,$x_t$ 本身就是一张图片。
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.transforms import transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from tqdm import tqdm
import numpy as np# 根据时间t来取对应的系数
def extract(v, t, x_shape):out = torch.gather(v, index=t, dim=0).float()return out.view([t.shape[0]] + [1] * (len(x_shape) - 1))# 简化的UNet块
class UNetBlock(nn.Module):def __init__(self, in_channels, out_channels, time_emb_dim=None):super(UNetBlock, self).__init__()self.conv1 = nn.Conv2d(in_channels, out_channels, 3, padding=1)self.norm1 = nn.GroupNorm(8, out_channels)self.conv2 = nn.Conv2d(out_channels, out_channels, 3, padding=1)self.norm2 = nn.GroupNorm(8, out_channels)self.act = nn.SiLU()self.dropout = nn.Dropout(0.1)if time_emb_dim is not None:self.time_mlp = nn.Linear(time_emb_dim, out_channels)def forward(self, x, t_emb=None):h = self.conv1(x)h = self.norm1(h)h = self.act(h)h = self.dropout(h)h = self.conv2(h)h = self.norm2(h)if t_emb is not None and hasattr(self, 'time_mlp'):t_emb = self.time_mlp(self.act(t_emb))t_emb = t_emb.unsqueeze(-1).unsqueeze(-1)h = h + t_embh = self.act(h)return h# 简化的UNet网络
class UNet(nn.Module):
    """Simplified UNet noise predictor for DDPM on 3-channel images.

    FIX: the original constructed every UNetBlock WITHOUT ``time_emb_dim``,
    so no block ever created ``time_mlp`` and the computed time embedding was
    silently ignored — the network was not conditioned on the timestep at all.
    Each block now receives ``time_emb_dim=base_channels * 4`` to match the
    embedding table.
    """

    def __init__(self, T=1000, base_channels=64):
        super(UNet, self).__init__()
        self.T = T
        time_dim = base_channels * 4
        self.time_embedding = nn.Embedding(T, time_dim)
        # Encoder (downsampling path)
        self.down1 = UNetBlock(3, base_channels, time_emb_dim=time_dim)
        self.down2 = UNetBlock(base_channels, base_channels * 2, time_emb_dim=time_dim)
        self.down3 = UNetBlock(base_channels * 2, base_channels * 4, time_emb_dim=time_dim)
        self.down4 = UNetBlock(base_channels * 4, base_channels * 8, time_emb_dim=time_dim)
        # Bottleneck
        self.bottleneck = UNetBlock(base_channels * 8, base_channels * 8, time_emb_dim=time_dim)
        # Decoder; input channels = skip connection + upsampled feature
        self.up4 = UNetBlock(base_channels * 16, base_channels * 4, time_emb_dim=time_dim)
        self.up3 = UNetBlock(base_channels * 8, base_channels * 2, time_emb_dim=time_dim)
        self.up2 = UNetBlock(base_channels * 4, base_channels, time_emb_dim=time_dim)
        self.up1 = UNetBlock(base_channels * 2, base_channels, time_emb_dim=time_dim)
        # Project back to 3 image channels (predicted noise)
        self.final_conv = nn.Conv2d(base_channels, 3, 1)

    def forward(self, x, t):
        """Predict the noise in ``x`` at integer timesteps ``t`` (shape (B,))."""
        t_emb = self.time_embedding(t)
        # Encoder: halve spatial resolution before each deeper block
        d1 = self.down1(x, t_emb)
        d2 = self.down2(F.avg_pool2d(d1, 2), t_emb)
        d3 = self.down3(F.avg_pool2d(d2, 2), t_emb)
        d4 = self.down4(F.avg_pool2d(d3, 2), t_emb)
        bottleneck = self.bottleneck(F.avg_pool2d(d4, 2), t_emb)
        # Decoder: upsample and concatenate the matching skip connection
        u4 = self.up4(torch.cat([F.interpolate(bottleneck, scale_factor=2), d4], dim=1), t_emb)
        u3 = self.up3(torch.cat([F.interpolate(u4, scale_factor=2), d3], dim=1), t_emb)
        u2 = self.up2(torch.cat([F.interpolate(u3, scale_factor=2), d2], dim=1), t_emb)
        u1 = self.up1(torch.cat([F.interpolate(u2, scale_factor=2), d1], dim=1), t_emb)
        return self.final_conv(u1)


class GaussianDiffusionTrainer(nn.Module):
    """Forward-process trainer: sample t, noise x_0 to x_t, regress the noise."""

    def __init__(self, model, beta_1, beta_T, T):
        super(GaussianDiffusionTrainer, self).__init__()
        self.model = model
        self.T = T
        self.register_buffer('betas', torch.linspace(beta_1, beta_T, T).double())
        alphas = 1. - self.betas
        alphas_bar = torch.cumprod(alphas, dim=0)
        # Coefficients of x_0 and epsilon in the closed form of q(x_t | x_0)
        self.register_buffer('sqrt_alphas_bar', torch.sqrt(alphas_bar))
        self.register_buffer('sqrt_one_minus_alphas_bar', torch.sqrt(1. - alphas_bar))

    def forward(self, x_0):
        """Return the per-element MSE between true and predicted noise.

        ``reduction='none'`` so the caller decides how to aggregate.
        """
        t = torch.randint(self.T, size=(x_0.shape[0],), device=x_0.device)
        noise = torch.randn_like(x_0)
        # x_t = sqrt(alpha_bar_t) x_0 + sqrt(1 - alpha_bar_t) eps
        x_t = (extract(self.sqrt_alphas_bar, t, x_0.shape) * x_0 +
               extract(self.sqrt_one_minus_alphas_bar, t, x_0.shape) * noise)
        predicted_noise = self.model(x_t, t)
        return F.mse_loss(predicted_noise, noise, reduction='none')


class GaussianDiffusionSampler(nn.Module):
    """Ancestral (reverse-process) DDPM sampler.

    FIX: the original mean used ``sqrt(1/alpha_bar_t)`` and
    ``sqrt(1/alpha_bar_t - 1)`` — that expression is the predicted x_0, not
    the posterior mean. Per DDPM (Ho et al. 2020, Eq. 11) and the formula in
    this report, the mean is
    ``(1/sqrt(alpha_t)) * (x_t - (1-alpha_t)/sqrt(1-alpha_bar_t) * eps)``.
    """

    def __init__(self, model, beta_1, beta_T, T):
        super(GaussianDiffusionSampler, self).__init__()
        self.model = model
        self.T = T
        self.register_buffer('betas', torch.linspace(beta_1, beta_T, T).double())
        alphas = 1. - self.betas
        alphas_bar = torch.cumprod(alphas, dim=0)
        # Per-step coefficients of the corrected posterior mean
        self.register_buffer('recip_sqrt_alphas', torch.sqrt(1. / alphas))
        self.register_buffer('eps_coef', (1. - alphas) / torch.sqrt(1. - alphas_bar))
        # Simplified reverse-process variance: use beta_t directly
        self.register_buffer('posterior_var', self.betas)

    def p_mean_variance(self, x_t, t):
        """Return the mean and log-variance of p(x_{t-1} | x_t)."""
        eps = self.model(x_t, t)
        mean = extract(self.recip_sqrt_alphas, t, x_t.shape) * (
            x_t - extract(self.eps_coef, t, x_t.shape) * eps)
        var = extract(self.posterior_var, t, x_t.shape)
        return mean, torch.log(var)

    def forward(self, x_T):
        """Run the full reverse chain from pure noise x_T down to x_0."""
        x_t = x_T
        for time_step in tqdm(reversed(range(self.T)), desc="Inference"):
            t = x_t.new_ones([x_T.shape[0], ], dtype=torch.long) * time_step
            mean, log_var = self.p_mean_variance(x_t, t)
            # No noise is added at the final (t == 0) step
            if time_step > 0:
                noise = torch.randn_like(x_t)
            else:
                noise = 0
            x_t = mean + torch.exp(0.5 * log_var) * noise
        return torch.clip(x_t, -1, 1)
# Hyperparameters
T = 1000            # number of diffusion steps
beta_1 = 1e-4       # noise-schedule start value
beta_T = 0.02       # noise-schedule end value
lr = 1e-4
batch_size = 32
num_workers = 4
total_epochs = 10
sample_size = 16    # number of images to generate after training
img_size = 32


def main():
    """Train the DDPM on CIFAR-10, then sample a batch of images.

    FIX: the original ran everything at module top level; with
    ``num_workers=4`` the DataLoader spawns worker processes, which crashes on
    spawn-based platforms (Windows/macOS) without an ``if __name__ ==
    "__main__"`` guard.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # CIFAR-10 normalized to [-1, 1] to match the diffusion model's range
    dataset = CIFAR10(
        root='./data', train=True, download=True,
        transform=transforms.Compose([
            transforms.Resize((img_size, img_size)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ]))
    dataloader = DataLoader(
        dataset, batch_size, shuffle=True, drop_last=True,
        num_workers=num_workers)

    net_model = UNet(T=T).to(device)
    trainer = GaussianDiffusionTrainer(net_model, beta_1, beta_T, T).to(device)
    optim = torch.optim.Adam(net_model.parameters(), lr=lr)

    # Training loop
    net_model.train()
    for epoch in range(total_epochs):
        total_loss = 0
        pbar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{total_epochs}")
        for batch_idx, (images, labels) in enumerate(pbar):
            optim.zero_grad()
            x_0 = images.to(device)
            loss = trainer(x_0).mean()
            loss.backward()
            # Gradient clipping stabilizes early diffusion training
            torch.nn.utils.clip_grad_norm_(net_model.parameters(), 1.0)
            optim.step()
            total_loss += loss.item()
            pbar.set_postfix(loss=loss.item())
        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}, Average Loss: {avg_loss:.4f}")

    # Create the sampler from the trained model and generate samples
    sampler = GaussianDiffusionSampler(net_model, beta_1, beta_T, T).to(device)
    net_model.eval()
    with torch.no_grad():
        x_T = torch.randn(sample_size, 3, img_size, img_size).to(device)
        x_0 = sampler(x_T)
        # Map from [-1, 1] back to [0, 1]
        x_0 = (x_0 + 1) / 2
        print(f"Generated images with shape: {x_0.shape}")


if __name__ == "__main__":
    main()
2 pytorch入门
在学习了之前的内容之后,觉得pytorch的学习是理论实践中必须熟练的一环,所以在学习完扩散模型的理论后,转入对pytorch的深入学习。
2.1 数据加载
通过 torch 的 `Dataset` 和 `DataLoader` 来加载数据。
import torch
from torch.utils.data import Dataset,DataLoader
from PIL import Image
import os#数据加载
class MyData(Dataset):
    """Image-folder dataset serving one class subdirectory.

    Expected layout: ``root_dir/label_dir/<image files>``; every sample's
    label is the subdirectory name itself (e.g. "ants").
    """

    def __init__(self, root_dir, label_dir):
        self.root_dir = root_dir
        self.label_dir = label_dir
        self.path = os.path.join(self.root_dir, self.label_dir)
        # BUG FIX: list the class subdirectory, not the root directory.
        # The original listed root_dir, which returned the class-folder
        # names ("ants", "bees"), so __len__ was wrong and __getitem__
        # tried to open directories as images.
        self.img_path = os.listdir(self.path)

    def __getitem__(self, index):
        """Return (PIL image, label string) for the file at ``index``."""
        img_name = self.img_path[index]
        img_item_path = os.path.join(self.root_dir, self.label_dir, img_name)
        img = Image.open(img_item_path)
        label = self.label_dir
        return img, label

    def __len__(self):
        return len(self.img_path)


root_dir="D:/code/dataset/hymenoptera_data/train"
ants_label_dir="ants"
bees_label_dir="bees"
ants=MyData(root_dir,ants_label_dir)
bees=MyData(root_dir,bees_label_dir)
# Concatenating Datasets with + yields a ConcatDataset of both classes
train_dataset=ants+bees
2.2 数据预处理
对数据进行处理,控制相同的大小和类型转换等操作可以方便模型的训练。
# Demo: torchvision transforms applied to a single image.
# NOTE(review): assumes `img` is a PIL.Image loaded earlier and that
# `from torchvision import transforms` was executed — confirm against the
# original notebook, since neither appears in this snippet.
tensor=transforms.ToTensor()
#Normalize
tensor_img=tensor(img)
print(tensor_img[0][0][0])
tensor_norm=transforms.Normalize([0.5,0.5,0.5],[0.5,0.5,0.5])
img_norm=tensor_norm(tensor_img)
print(img_norm[0][0][0])
'''
Normalize:input[channel]=(input[channel]-mean[channel])/std[channel]
(input-0.5)/0.5=2*input - 1
input [0,1]
result [-1,1]
'''
# Resize
# FIX: the original had a bare `Resize` token here (a mangled section
# marker), which raises NameError at runtime; it is now a comment.
print(img.size)
trans_resize=transforms.Resize((512,512))
img_resize=trans_resize(img)
img_resize=tensor(img_resize)
print(img_resize)
# Compose chains Resize and ToTensor into a single callable
tensor_compose=transforms.Compose([
    transforms.Resize(512),
    transforms.ToTensor()
])
compose_img=tensor_compose(img)
print(compose_img)
2.3 模型训练
# train.py — CIFAR-10 classifier training loop for the Model in learn_nn.py
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from learn_nn import *
from torch import optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
compose = transforms.Compose([transforms.ToTensor()])

train_set = datasets.CIFAR10("./dataset", train=True, transform=compose, download=True)
test_set = datasets.CIFAR10("./dataset", train=False, transform=compose, download=True)
train_loader = DataLoader(dataset=train_set, batch_size=64, shuffle=True, drop_last=False)
# FIX: shuffling the test set serves no purpose for evaluation
test_loader = DataLoader(dataset=test_set, batch_size=64, shuffle=False, drop_last=False)

# Build the network (Model comes from learn_nn via the wildcard import)
model = Model()
model.to(device)
loss = nn.CrossEntropyLoss()
loss.to(device)
learning_rate = 1e-2
optimizer = optim.SGD(model.parameters(), lr=learning_rate)

epochs = 10
for epoch in range(epochs):
    # FIX: explicitly switch modes so Dropout/BatchNorm-style layers (if any
    # are added later) behave correctly during training vs. evaluation.
    model.train()
    for data in train_loader:
        imgs, targets = data
        imgs, targets = imgs.to(device), targets.to(device)
        outputs = model(imgs)
        train_loss = loss(outputs, targets)
        optimizer.zero_grad()
        train_loss.backward()
        optimizer.step()

    # FIX: test_loader was created but never used — evaluate every epoch.
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for imgs, targets in test_loader:
            imgs, targets = imgs.to(device), targets.to(device)
            correct += (model(imgs).argmax(dim=1) == targets).sum().item()
            total += targets.size(0)
    print(f"epoch {epoch + 1}: test accuracy {correct / total:.4f}")
# learn_nn.py
# A small CNN classifier for CIFAR-10 (32x32 RGB input, 10 classes).
from torch import nn


class Model(nn.Module):
    """Three conv/max-pool stages followed by two linear layers.

    Returns raw logits of shape (batch, 10); pair with CrossEntropyLoss.
    """

    def __init__(self):
        super(Model, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3, 32, 5, 1, 2),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 32, 5, 1, 2),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 5, 1, 2),
            nn.MaxPool2d(2),
            nn.Flatten(),
            # After three 2x poolings a 32x32 input is 4x4 with 64 channels:
            # 64 * 4 * 4 = 1024 flattened features.
            nn.Linear(in_features=1024, out_features=64),
            nn.Linear(in_features=64, out_features=10),
        )

    def forward(self, x):
        return self.model(x)
总结
本周对扩散模型的训练过程进行了学习,学习中发现对torch的学习不够深入,于是重新对pytorch进行更多的学习。