当前位置: 首页 > news >正文

深度集成学习不均衡样本图像分类

分别训练五个不同的网络,再对它们输出的分类概率取平均,得到最终的分类结果。基本上分类精度可以提升10%。

1.导入基本库

import torch
import copy
import torch.nn as nn
import torchvision.models as models
from torchvision import datasets
from torchvision import transforms
from tqdm import tqdm
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from transformers import AutoModelForImageClassification,AutoConfig

2.数据集准备

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Preprocessing: resize every image to 224x224, convert it to a tensor and
# normalize each channel with the given mean / std.
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# One sub-folder per class under ./aug_datasets1 (ImageFolder layout).
train_dataset = datasets.ImageFolder(root='./aug_datasets1', transform=transform)

# Random 80 % / 20 % split into training and validation subsets.
dataset_size = len(train_dataset)
train_size = int(0.8 * dataset_size)
val_size = dataset_size - train_size
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

# Only the training loader is shuffled.
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)

3.定义不同模型与对应的训练策略

模型1 ResNet

class ResNet(nn.Module):
    """ResNet-50 backbone with a custom two-layer classification head.

    Args:
        num_classes: Size of the final linear layer (number of classes).
        train: When true the backbone is created with the ImageNet
            ``IMAGENET1K_V1`` weights; when false it is created without
            pretrained weights (presumably so trained weights can be loaded
            into it afterwards -- confirm against the caller).
    """

    def __init__(self, num_classes=21, train=True):
        super(ResNet, self).__init__()
        if train:
            # Use the imported `models` module: the visible imports never bind
            # the bare name `torchvision`, so `torchvision.models...` raised
            # NameError here.
            self.resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        else:
            self.resnet = models.resnet50(weights=None)
        in_features = self.resnet.fc.in_features
        # Replace the stock classifier with Linear -> ReLU -> Dropout -> Linear.
        self.resnet.fc = nn.Sequential(
            nn.Linear(in_features, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )
        self.resnet.to(device)

    def forward(self, x):
        """Return the raw class logits for the image batch ``x``."""
        return self.resnet(x)

    def startTrain(self, train_loader, val_loader, num_epochs=10):
        """Train with AdamW + cosine annealing and checkpoint the best epoch.

        After every epoch the model is evaluated on ``val_loader``; whenever
        the validation accuracy improves, the state dict is written to
        ``./saved/resnet/model_weights_<accuracy>.pth``.

        Args:
            train_loader: Iterable of ``(images, labels)`` training batches.
            val_loader: Iterable of ``(images, labels)`` validation batches.
            num_epochs: Number of epochs to run (default 10, as before).
        """
        import os  # local import: only needed to create the checkpoint folder

        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.AdamW(self.parameters(), lr=1e-4, weight_decay=1e-4)
        # NOTE(review): T_max=50 is longer than the default 10 epochs, so only
        # the first part of the cosine curve is traversed -- confirm intent.
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)

        checkpoint_pattern = './saved/resnet/model_weights_{}.pth'
        # torch.save does not create missing parent directories.
        os.makedirs(os.path.dirname(checkpoint_pattern), exist_ok=True)

        best_acc = 0.0
        print("Training ResNet.....")
        for epoch in range(num_epochs):
            # ---- training pass ----
            self.train()
            train_loss = 0
            for images, labels in tqdm(train_loader):
                images, labels = images.to(device), labels.to(device)

                optimizer.zero_grad()
                logits = self(images)
                loss = criterion(logits, labels)
                loss.backward()
                optimizer.step()

                train_loss += loss.item()
            print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss/len(train_loader)}")
            scheduler.step()

            # ---- validation pass ----
            self.eval()
            val_loss = 0
            correct = 0
            total = 0
            with torch.no_grad():
                for images, labels in tqdm(val_loader):
                    images, labels = images.to(device), labels.to(device)

                    logits = self(images)
                    val_loss += criterion(logits, labels).item()

                    # Predicted class = index of the largest logit.
                    _, predicted = torch.max(logits, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()

            accuracy = 100 * correct / total
            print(f"Validation Loss: {val_loss/len(val_loader)}")
            print(f"Accuracy: {accuracy}%")
            if accuracy > best_acc:
                best_acc = accuracy
                torch.save(self.state_dict(), checkpoint_pattern.format(best_acc))

模型2 EfficientNet

class EfficientNet(nn.Module):
    """EfficientNet-B2 backbone with a custom two-layer classification head.

    Args:
        num_classes: Size of the final linear layer (number of classes).
        train: When true the backbone is created with the ImageNet
            ``IMAGENET1K_V1`` weights; when false it is created without
            pretrained weights (presumably so trained weights can be loaded
            into it afterwards -- confirm against the caller).
    """

    def __init__(self, num_classes=21, train=True):
        super(EfficientNet, self).__init__()
        if train:
            # Use the imported `models` module: the visible imports never bind
            # the bare name `torchvision`, so `torchvision.models...` raised
            # NameError here.
            self.effnet = models.efficientnet_b2(weights=models.EfficientNet_B2_Weights.IMAGENET1K_V1)
        else:
            self.effnet = models.efficientnet_b2(weights=None)

        # The stock classifier is indexed at [1] to read the input width of
        # its linear layer before the whole classifier is replaced.
        in_features = self.effnet.classifier[1].in_features
        self.effnet.classifier = nn.Sequential(
            nn.Linear(in_features, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )
        self.effnet.to(device)

    def forward(self, x):
        """Return the raw class logits for the image batch ``x``."""
        return self.effnet(x)

    def startTrain(self, train_loader, val_loader, num_epochs=10):
        """Train with AdamW + ReduceLROnPlateau and checkpoint the best epoch.

        After every epoch the model is evaluated on ``val_loader``; whenever
        the validation accuracy improves, the state dict is written to
        ``./saved/efficientnet/model_weights_<accuracy>.pth``.

        Args:
            train_loader: Iterable of ``(images, labels)`` training batches.
            val_loader: Iterable of ``(images, labels)`` validation batches.
            num_epochs: Number of epochs to run (default 10, as before).
        """
        import os  # local import: only needed to create the checkpoint folder

        # Plain cross-entropy. (The earlier comment mentioned focal loss, but
        # no focal loss / gamma parameter is used by this code.)
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.AdamW(self.parameters(), lr=1e-4, weight_decay=1e-4)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5)

        checkpoint_pattern = './saved/efficientnet/model_weights_{}.pth'
        # torch.save does not create missing parent directories.
        os.makedirs(os.path.dirname(checkpoint_pattern), exist_ok=True)

        best_acc = 0.0
        print("Training EfficientNet.....")
        for epoch in range(num_epochs):
            # ---- training pass ----
            self.train()
            train_loss = 0
            for images, labels in tqdm(train_loader):
                images, labels = images.to(device), labels.to(device)

                optimizer.zero_grad()
                logits = self(images)
                loss = criterion(logits, labels)
                loss.backward()
                optimizer.step()

                train_loss += loss.item()
            mean_train_loss = train_loss / len(train_loader)
            print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {mean_train_loss}")
            # The plateau scheduler monitors the mean *training* loss.
            scheduler.step(mean_train_loss)

            # ---- validation pass ----
            self.eval()
            val_loss = 0
            correct = 0
            total = 0
            with torch.no_grad():
                for images, labels in tqdm(val_loader):
                    images, labels = images.to(device), labels.to(device)

                    logits = self(images)
                    val_loss += criterion(logits, labels).item()

                    # Predicted class = index of the largest logit.
                    _, predicted = torch.max(logits, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()

            accuracy = 100 * correct / total
            print(f"Validation Loss: {val_loss/len(val_loader)}")
            print(f"Accuracy: {accuracy}%")
            if accuracy > best_acc:
                best_acc = accuracy
                torch.save(self.state_dict(), checkpoint_pattern.format(best_acc))

模型3 DenseNet

class DenseNet(nn.Module):
    """DenseNet-121 backbone with a BatchNorm + two-layer classification head.

    Args:
        num_classes: Size of the final linear layer (number of classes);
            also stored on the instance as ``self.num_classes``.
        train: When true the backbone is created with the ImageNet
            ``IMAGENET1K_V1`` weights; when false it is created without
            pretrained weights (presumably so trained weights can be loaded
            into it afterwards -- confirm against the caller).
    """

    def __init__(self, num_classes=21, train=True):
        super(DenseNet, self).__init__()
        self.num_classes = num_classes
        if train:
            # Use the imported `models` module: the visible imports never bind
            # the bare name `torchvision`, so `torchvision.models...` raised
            # NameError here.
            self.densenet = models.densenet121(weights=models.DenseNet121_Weights.IMAGENET1K_V1)
        else:
            self.densenet = models.densenet121(weights=None)

        in_features = self.densenet.classifier.in_features
        # Replace the stock classifier with
        # BatchNorm -> Linear -> ReLU -> Dropout -> Linear.
        self.densenet.classifier = nn.Sequential(
            nn.BatchNorm1d(in_features),
            nn.Linear(in_features, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )
        self.densenet.to(device)

    def forward(self, x):
        """Return the raw class logits for the image batch ``x``."""
        return self.densenet(x)

    def startTrain(self, train_loader, val_loader, num_epochs=10):
        """Train with Adam + cosine annealing and checkpoint the best epoch.

        After every epoch the model is evaluated on ``val_loader``; whenever
        the validation accuracy improves, the state dict is written to
        ``./saved/densenet/model_weights_<accuracy>.pth``.

        Args:
            train_loader: Iterable of ``(images, labels)`` training batches.
            val_loader: Iterable of ``(images, labels)`` validation batches.
            num_epochs: Number of epochs to run (default 10, as before).
        """
        import os  # local import: only needed to create the checkpoint folder

        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-4)
        # NOTE(review): T_max=50 is longer than the default 10 epochs, so only
        # the first part of the cosine curve is traversed -- confirm intent.
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)

        checkpoint_pattern = './saved/densenet/model_weights_{}.pth'
        # torch.save does not create missing parent directories.
        os.makedirs(os.path.dirname(checkpoint_pattern), exist_ok=True)

        best_acc = 0.0
        print("Training DenseNet.....")
        for epoch in range(num_epochs):
            # ---- training pass ----
            self.train()
            train_loss = 0
            for images, labels in tqdm(train_loader):
                images, labels = images.to(device), labels.to(device)

                optimizer.zero_grad()
                logits = self(images)
                loss = criterion(logits, labels)
                loss.backward()
                optimizer.step()

                train_loss += loss.item()
            print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss/len(train_loader)}")
            scheduler.step()

            # ---- validation pass ----
            self.eval()
            val_loss = 0
            correct = 0
            total = 0
            with torch.no_grad():
                for images, labels in tqdm(val_loader):
                    images, labels = images.to(device), labels.to(device)

                    logits = self(images)
                    val_loss += criterion(logits, labels).item()

                    # Predicted class = index of the largest logit.
                    _, predicted = torch.max(logits, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()

            accuracy = 100 * correct / total
            print(f"Validation Loss: {val_loss/len(val_loader)}")
            print(f"Accuracy: {accuracy}%")
            if accuracy > best_acc:
                best_acc = accuracy
                torch.save(self.state_dict(), checkpoint_pattern.format(best_acc))

 模型4 ResNeXt

class ResNeXt(nn.Module):
    """ResNeXt-50 (32x4d) backbone with a BatchNorm + two-layer head.

    Args:
        num_classes: Size of the final linear layer (number of classes).
        train: When true the backbone is created with the ImageNet
            ``IMAGENET1K_V1`` weights; when false it is created without
            pretrained weights (presumably so trained weights can be loaded
            into it afterwards -- confirm against the caller).
    """

    def __init__(self, num_classes=21, train=True):
        super(ResNeXt, self).__init__()
        if train:
            # Use the imported `models` module: the visible imports never bind
            # the bare name `torchvision`, so `torchvision.models...` raised
            # NameError here.
            self.resnext50 = models.resnext50_32x4d(weights=models.ResNeXt50_32X4D_Weights.IMAGENET1K_V1)
        else:
            self.resnext50 = models.resnext50_32x4d(weights=None)

        in_features = self.resnext50.fc.in_features
        # Replace the stock classifier with
        # BatchNorm -> Linear -> ReLU -> Dropout -> Linear.
        self.resnext50.fc = nn.Sequential(
            nn.BatchNorm1d(in_features),
            nn.Linear(in_features, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )
        # Moving the wrapper moves the only sub-module (the backbone) as well.
        self.to(device)

    def forward(self, x):
        """Return the raw class logits for the image batch ``x``."""
        return self.resnext50(x)

    def startTrain(self, train_loader, val_loader, num_epochs=10):
        """Train with AdamW + one-cycle LR and checkpoint the best epoch.

        After every epoch the model is evaluated on ``val_loader``; whenever
        the validation accuracy improves, the state dict is written to
        ``./saved/se-resnext/model_weights_<accuracy>.pth``.

        Args:
            train_loader: Sized iterable of ``(images, labels)`` training
                batches (its length defines the one-cycle schedule).
            val_loader: Iterable of ``(images, labels)`` validation batches.
            num_epochs: Number of epochs to run (default 10, as before).
        """
        import os  # local import: only needed to create the checkpoint folder

        optimizer = torch.optim.AdamW(self.parameters(), lr=1e-4)
        # OneCycleLR is a per-batch schedule: its length must match the number
        # of optimizer steps actually taken (num_epochs * batches per epoch),
        # and step() takes no metric. Previously it was built for 30 epochs
        # and stepped once per epoch with the train loss passed as the
        # deprecated `epoch` argument, so the cycle never progressed.
        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer, max_lr=5e-4, epochs=num_epochs, steps_per_epoch=len(train_loader)
        )
        criterion = nn.CrossEntropyLoss()

        checkpoint_pattern = './saved/se-resnext/model_weights_{}.pth'
        # torch.save does not create missing parent directories.
        os.makedirs(os.path.dirname(checkpoint_pattern), exist_ok=True)

        best_acc = 0.0
        print("Training ResNeXt.....")
        for epoch in range(num_epochs):
            # ---- training pass ----
            self.train()
            train_loss = 0
            for images, labels in tqdm(train_loader):
                images, labels = images.to(device), labels.to(device)

                optimizer.zero_grad()
                logits = self(images)
                loss = criterion(logits, labels)
                loss.backward()
                optimizer.step()
                scheduler.step()  # advance the one-cycle schedule every batch

                train_loss += loss.item()
            print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss/len(train_loader)}")

            # ---- validation pass ----
            self.eval()
            val_loss = 0
            correct = 0
            total = 0
            with torch.no_grad():
                for images, labels in tqdm(val_loader):
                    images, labels = images.to(device), labels.to(device)

                    logits = self(images)
                    val_loss += criterion(logits, labels).item()

                    # Predicted class = index of the largest logit.
                    _, predicted = torch.max(logits, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()

            accuracy = 100 * correct / total
            print(f"Validation Loss: {val_loss/len(val_loader)}")
            print(f"Accuracy: {accuracy}%")
            if accuracy > best_acc:
                best_acc = accuracy
                torch.save(self.state_dict(), checkpoint_pattern.format(best_acc))

模型5 SwinTransformer

class SwinTransformer(nn.Module):
    """Swin Transformer V2 (tiny) classifier with a Dropout + Linear head.

    Args:
        num_classes: Size of the final linear layer (number of classes).
        train: When true, load the pretrained checkpoint from the local
            ``./swinv2-tiny-patch4-window16-256/...`` snapshot, swap in a new
            head and freeze everything except the selected encoder layers and
            the head. When false, rebuild the same architecture and load the
            fine-tuned weights written by :meth:`startTrain` to
            ``./saved/swin-transformer/``.
    """

    def __init__(self, num_classes=21, train=True):
        super(SwinTransformer, self).__init__()
        if train:
            self.vit = AutoModelForImageClassification.from_pretrained('./swinv2-tiny-patch4-window16-256/models--microsoft--swinv2-tiny-patch4-window16-256/snapshots/f4d3075206f2ad5eda586c30d6b4d0500f312421/')
            # Replace the pretrained head with Dropout -> Linear(num_classes).
            self.vit.classifier = nn.Sequential(
                nn.Dropout(0.5),
                nn.Linear(self.vit.classifier.in_features, num_classes)
            )
            # Freeze every parameter first ...
            for param in self.vit.parameters():
                param.requires_grad = False

            # ... then unfreeze the last four entries of the encoder's layer
            # list and the new head.
            # NOTE(review): the original comment said "last two blocks" while
            # the slice is [-4:]; the code is kept as-is -- confirm intent.
            for param in self.vit.swinv2.encoder.layers[-4:].parameters():
                param.requires_grad = True
            for param in self.vit.classifier.parameters():
                param.requires_grad = True
        else:
            import os  # local import: only needed to build the weights path

            save_dir = './saved/swin-transformer/'
            # Build the architecture from the saved config with the requested
            # number of labels (previously hard-coded to 21).
            config = AutoConfig.from_pretrained(save_dir)
            config.num_labels = num_classes
            self.vit = AutoModelForImageClassification.from_config(config)
            # The checkpoint was saved with the Sequential head, i.e. its keys
            # are `classifier.1.*`. Loading it into the stock single-Linear
            # head would silently leave the head randomly initialised, so the
            # same head is rebuilt before the weights are loaded strictly.
            self.vit.classifier = nn.Sequential(
                nn.Dropout(0.5),
                nn.Linear(self.vit.classifier.in_features, num_classes)
            )
            # `save_pretrained(..., safe_serialization=False)` writes the
            # state dict as `pytorch_model.bin`.
            weights_path = os.path.join(save_dir, 'pytorch_model.bin')
            self.vit.load_state_dict(torch.load(weights_path, map_location=device))
        self.vit.to(device)

    def forward(self, x):
        """Return the Hugging Face model output; logits are in ``.logits``."""
        return self.vit(x)

    def _run_stage(self, num_epochs, optimizer, scheduler, criterion,
                   train_loader, val_loader, best_acc, keep_best_weights):
        """Run one training stage and checkpoint on validation improvement.

        Args:
            num_epochs: Number of epochs in this stage.
            optimizer: Optimizer stepped after every batch.
            scheduler: Per-batch LR scheduler, stepped after every batch.
            criterion: Loss applied to ``(logits, labels)``.
            train_loader: Iterable of ``(images, labels)`` training batches.
            val_loader: Iterable of ``(images, labels)`` validation batches.
            best_acc: Best validation accuracy (percent) seen so far; only a
                strictly better epoch overwrites the on-disk checkpoint.
            keep_best_weights: When true, also keep an in-memory copy of the
                best state dict and return it.

        Returns:
            ``(best_acc, best_weights)``; ``best_weights`` is ``None`` when
            ``keep_best_weights`` is false.
        """
        best_weights = copy.deepcopy(self.state_dict()) if keep_best_weights else None
        for epoch in range(num_epochs):
            # ---- training pass ----
            self.train()
            train_loss = 0
            for images, labels in tqdm(train_loader):
                images, labels = images.to(device), labels.to(device)

                optimizer.zero_grad()
                logits = self(images).logits
                loss = criterion(logits, labels)
                loss.backward()
                optimizer.step()
                # OneCycleLR is sized as epochs * steps_per_epoch, so it has
                # to advance once per batch, not once per epoch.
                scheduler.step()

                train_loss += loss.item()
            print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss/len(train_loader)}")

            # ---- validation pass ----
            self.eval()
            val_loss = 0
            correct = 0
            total = 0
            with torch.no_grad():
                for images, labels in tqdm(val_loader):
                    images, labels = images.to(device), labels.to(device)

                    logits = self(images).logits
                    val_loss += criterion(logits, labels).item()

                    # Predicted class = index of the largest logit.
                    _, predicted = torch.max(logits, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()

            accuracy = 100 * correct / total
            print(f"Validation Loss: {val_loss/len(val_loader)}")
            print(f"Accuracy: {accuracy}%")
            if accuracy > best_acc:
                best_acc = accuracy
                if keep_best_weights:
                    best_weights = copy.deepcopy(self.state_dict())
                self.vit.save_pretrained('./saved/swin-transformer/', safe_serialization=False)
        return best_acc, best_weights

    def startTrain(self, train_loader, val_loader, num_epochs_stage1=10, num_epochs_stage2=10):
        """Two-stage fine-tuning; the best checkpoint goes to ./saved/swin-transformer/.

        Stage 1 trains only the parameters that currently require gradients
        (AdamW, one-cycle LR up to 1e-3) and then restores the best stage-1
        weights. Stage 2 unfreezes every parameter and fine-tunes the whole
        network with a much smaller learning rate (Adam, one-cycle LR up to
        5e-6).

        Args:
            train_loader: Sized iterable of ``(images, labels)`` training
                batches (its length defines the one-cycle schedules).
            val_loader: Iterable of ``(images, labels)`` validation batches.
            num_epochs_stage1: Epochs for stage 1 (default 10, as before).
            num_epochs_stage2: Epochs for stage 2 (default 10, as before).
        """
        # Plain cross-entropy. (The earlier comment mentioned label smoothing,
        # but no label_smoothing argument is passed.)
        criterion = nn.CrossEntropyLoss()

        # ---- Stage 1: train only the unfrozen layers ----
        optimizer_stage1 = torch.optim.AdamW([p for p in self.parameters() if p.requires_grad], lr=1e-3)
        scheduler_stage1 = torch.optim.lr_scheduler.OneCycleLR(
            optimizer_stage1, max_lr=1e-3, epochs=num_epochs_stage1, steps_per_epoch=len(train_loader)
        )
        print("Training SwinTransformer.....")
        print("===== Stage 1 Training =====")
        best_acc, best_model_wts = self._run_stage(
            num_epochs_stage1, optimizer_stage1, scheduler_stage1, criterion,
            train_loader, val_loader, best_acc=0.0, keep_best_weights=True,
        )

        # Continue stage 2 from the best stage-1 weights.
        self.load_state_dict(best_model_wts)

        # ---- Stage 2: fine-tune the whole network ----
        print("===== Stage 2 Training =====")
        for param in self.parameters():
            param.requires_grad = True
        optimizer_stage2 = torch.optim.Adam(self.parameters(), lr=1e-6)
        scheduler_stage2 = torch.optim.lr_scheduler.OneCycleLR(
            optimizer_stage2, max_lr=5e-6, epochs=num_epochs_stage2, steps_per_epoch=len(train_loader)
        )
        # The best accuracy carries over from stage 1: both stages write to
        # the same directory, and resetting it to 0 let the first stage-2
        # epoch overwrite a better stage-1 checkpoint.
        self._run_stage(
            num_epochs_stage2, optimizer_stage2, scheduler_stage2, criterion,
            train_loader, val_loader, best_acc=best_acc, keep_best_weights=False,
        )

4.分别训练,然后得到权重

    # Train every ensemble member on the same train / validation loaders.
    # Each startTrain call writes its checkpoints under ./saved/.
    swinTransformer = SwinTransformer()
    swinTransformer.startTrain(train_dataloader, val_dataloader)

    efficientNet = EfficientNet()
    efficientNet.startTrain(train_dataloader, val_dataloader)

    resNet = ResNet()
    resNet.startTrain(train_dataloader, val_dataloader)

    resNeXt = ResNeXt()
    resNeXt.startTrain(train_dataloader, val_dataloader)

    denseNet = DenseNet()
    denseNet.startTrain(train_dataloader, val_dataloader)

5.构建集成分类模型

import torch
import torchvision.transforms as transforms
import torch.nn as nn
from torchvision import datasets
from torchvision import transforms
from tqdm import tqdm
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from tqdm import tqdm
from PIL import Image

def remove_prefix_from_state_dict(state_dict, prefix='resnext.', new_prefix='resnext50.'):
    """Return a copy of ``state_dict`` with a leading key prefix renamed.

    Despite its name, the function does not simply drop ``prefix``: every key
    that starts with ``prefix`` has it replaced by ``new_prefix``. Keys that
    do not start with ``prefix`` are copied unchanged.

    Args:
        state_dict: Mapping of parameter names to values.
        prefix: Leading key text to replace.
        new_prefix: Replacement text; pass ``''`` to strip ``prefix``
            entirely. Defaults to ``'resnext50.'``, the previous hard-coded
            value.

    Returns:
        A new dict; the input mapping is not modified.
    """
    cut = len(prefix)
    return {
        (new_prefix + key[cut:]) if key.startswith(prefix) else key: value
        for key, value in state_dict.items()
    }


# 定义集成模型
class EnsembleModel():
    """Average the softmax probabilities of five trained classifiers.

    All five models are switched to evaluation mode on construction.

    Args:
        efficientNet, resNet, resNeXt, denseNet: Models whose forward pass
            returns class logits directly.
        swinTransformer: Model whose forward pass returns an object exposing
            the logits as ``.logits``.
    """

    def __init__(self, efficientNet, resNet, resNeXt, denseNet, swinTransformer):
        super(EnsembleModel, self).__init__()

        self.efficientNet = efficientNet.eval()
        self.resNet = resNet.eval()
        self.resNeXt = resNeXt.eval()
        self.denseNet = denseNet.eval()
        self.swinTransformer = swinTransformer.eval()

    def predict(self, x):
        """Return the mean class-probability tensor for the batch ``x``.

        Each model's logits are turned into probabilities with a softmax over
        dim 1 and the five probability tensors are averaged. Inference runs
        under ``torch.no_grad()`` so that no autograd graph is built; the
        result therefore does not require gradients.
        """
        with torch.no_grad():
            member_probs = [
                torch.softmax(self.efficientNet(x), dim=1),
                torch.softmax(self.resNet(x), dim=1),
                torch.softmax(self.resNeXt(x), dim=1),
                torch.softmax(self.denseNet(x), dim=1),
                # The Swin wrapper returns an output object, not a tensor.
                torch.softmax(self.swinTransformer(x).logits, dim=1),
            ]
            avg_pred = torch.stack(member_probs).mean(dim=0)
        return avg_pred

这样,对五个模型的预测概率取平均,就可以提升分类性能。

相关文章:

  • Java Functional Interface 函数式接口
  • 大文件断点续传
  • 011_异常、泛型和集合框架
  • 大数据(5)(基础概念)Spark从入门到实战:核心原理与大数据处理实战案例
  • 【算法】前缀和(下)
  • 【Django】教程-12-柱状图
  • 5.JVM-G1垃圾回收器
  • 顺序栈简记
  • 为什么选择Redis?解析核心使用场景与性能优化技巧
  • QML面试笔记--UI设计篇02布局控件
  • 山东大学计算机网络第五章习题解析
  • 虚拟表、TDgpt、JDBC 异步写入…TDengine 3.3.6.0 版本 8 大升级亮点
  • 数字电子技术基础(四十)——使用Digital软件和Multisim软件模拟显示译码器
  • PyTorch 生态迎来新成员:SGLang 高效推理引擎解析
  • JMeterPlugins-Standard-1.4.0 插件详解:安装、功能与使用指南
  • “拈彩”测试报告
  • 【力扣刷题实战】全排列II
  • JavaScript惰性加载优化实例
  • day22 学习笔记
  • 算法卷一:起行
  • 陕西网站开发价格/优帮云排名优化
  • 汽车之家网站是谁做的/网站怎样优化关键词好
  • 专业网站设计专家/站长工具seo综合查询可以访问
  • 网站建设座谈会/百度网址大全怎么设为主页
  • 邢台 网站建设/关键词优化公司排名榜
  • 如何做幸运28网站代理/百度云网盘资源