YOLO Beginner Tutorial (Bonus): Machine Vision in Practice with the Kaggle CIFAR-10 Image Classification Competition
A Complete Guide to the Kaggle CIFAR-10 Image Classification Competition: From Data Preparation to Model Submission
A hands-on walkthrough of solving a real-world image classification problem with deep learning
In deep learning, theoretical knowledge and hands-on ability matter equally. Entering Kaggle competitions is an excellent way to build practical skills, and the CIFAR-10 image classification competition is an ideal entry point into computer vision. This article walks through how to tackle this classic image classification problem from scratch.
Competition Background and the Dataset
The CIFAR-10 Dataset
CIFAR-10 is a widely used image classification dataset containing 60,000 32x32-pixel color images across 10 classes:
- airplane
- automobile
- bird
- cat
- deer
- dog
- frog
- horse
- ship
- truck
The dataset is split into 50,000 training images and 10,000 test images. In the Kaggle competition version, the test set is padded out to 300,000 images, of which only 10,000 are actually scored; the extra images are there to discourage hand labeling.
Competition Goal
The goal is to build a deep learning model that classifies CIFAR-10 images accurately and achieves high accuracy on the test set. Submissions must be a CSV file in a specific format, listing the predicted class for every test image.
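For reference, a valid submission is a two-column CSV with an id column and a label column, one row per test image. The rows below are illustrative only; the actual labels come from your model:

id,label
1,cat
2,ship
3,airplane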
Environment Setup and Data Preparation
Importing the Required Libraries
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import pandas as pd
import os
from PIL import Image
import matplotlib.pyplot as plt
Downloading and Organizing the Data
Kaggle provides the full dataset, but for initial experiments we can work with a small sample (in the real competition, download the full data from the competition page):
# Data download (small sample)
def download_data():
    # A small sample is used here for demonstration;
    # in the real competition, download the full data from Kaggle
    data_dir = '../data/cifar-10/'
    # Create the directory structure
    os.makedirs(os.path.join(data_dir, 'train'), exist_ok=True)
    os.makedirs(os.path.join(data_dir, 'test'), exist_ok=True)
    return data_dir

data_dir = download_data()
Exploring the Data
Before modeling, get a feel for the dataset:
def explore_dataset(data_dir):
    # Read the training labels
    train_labels = pd.read_csv(os.path.join(data_dir, 'trainLabels.csv'))
    print(f"Number of training samples: {len(train_labels)}")
    print("Class distribution:")
    print(train_labels['label'].value_counts())

    # Show a few sample images
    fig, axes = plt.subplots(2, 5, figsize=(15, 6))
    for i, (idx, row) in enumerate(train_labels.head(10).iterrows()):
        img_path = os.path.join(data_dir, 'train', f"{row['id']}.png")
        img = Image.open(img_path)
        axes[i // 5, i % 5].imshow(img)
        axes[i // 5, i % 5].set_title(f"ID: {row['id']}\nLabel: {row['label']}")
        axes[i // 5, i % 5].axis('off')
    plt.tight_layout()
    plt.show()

explore_dataset(data_dir)
Data Preprocessing and Augmentation
Defining the Transforms
Proper preprocessing and augmentation are critical to model performance:
def get_transforms():
    # Training-time augmentation
    train_transform = torchvision.transforms.Compose([
        torchvision.transforms.RandomResizedCrop(32, scale=(0.64, 1.0), ratio=(1.0, 1.0)),
        torchvision.transforms.RandomHorizontalFlip(),
        torchvision.transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(mean=[0.4914, 0.4822, 0.4465],
                                         std=[0.2023, 0.1994, 0.2010])
    ])
    # Test/validation transform (normalization only)
    test_transform = torchvision.transforms.Compose([
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(mean=[0.4914, 0.4822, 0.4465],
                                         std=[0.2023, 0.1994, 0.2010])
    ])
    return train_transform, test_transform

train_transform, test_transform = get_transforms()
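The mean and std used above are commonly quoted CIFAR-10 training-set statistics. If you want to verify them, here is a minimal sketch using torchvision's built-in copy of CIFAR-10 (an assumption made for convenience; the Kaggle PNGs would work the same way). Note that a per-pixel computation gives a std closer to [0.247, 0.243, 0.262]; the values around 0.20 above are another widely circulated set:

import torch
import torchvision

# Load the raw training set (no normalization) to compute per-channel statistics
raw_train = torchvision.datasets.CIFAR10(
    root='../data', train=True, download=True,
    transform=torchvision.transforms.ToTensor())
loader = torch.utils.data.DataLoader(raw_train, batch_size=1000)

n = 0
mean = torch.zeros(3)
sq_mean = torch.zeros(3)
for images, _ in loader:
    # images: (B, 3, 32, 32); accumulate per-channel first and second moments
    b = images.size(0)
    mean += images.mean(dim=[0, 2, 3]) * b
    sq_mean += (images ** 2).mean(dim=[0, 2, 3]) * b
    n += b
mean /= n
std = (sq_mean / n - mean ** 2).sqrt()
print(mean, std)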
Splitting the Dataset
Further split the training set into training and validation subsets:
def prepare_datasets(data_dir, train_transform, test_transform, valid_ratio=0.1):
    # Read all labels
    labels_df = pd.read_csv(os.path.join(data_dir, 'trainLabels.csv'))

    # Split into training and validation sets
    from sklearn.model_selection import train_test_split
    train_ids, valid_ids = train_test_split(
        labels_df['id'],
        test_size=valid_ratio,
        stratify=labels_df['label'],
        random_state=42)

    # Custom dataset class
    class CIFAR10Dataset(torch.utils.data.Dataset):
        def __init__(self, ids, labels_df, data_dir, transform=None):
            self.ids = ids
            self.labels_df = labels_df.set_index('id')
            self.data_dir = data_dir
            self.transform = transform
            self.class_to_idx = {cls: idx for idx, cls in enumerate(sorted(labels_df['label'].unique()))}

        def __len__(self):
            return len(self.ids)

        def __getitem__(self, idx):
            img_id = self.ids.iloc[idx]
            img_path = os.path.join(self.data_dir, 'train', f"{img_id}.png")
            image = Image.open(img_path).convert('RGB')
            label = self.class_to_idx[self.labels_df.loc[img_id, 'label']]
            if self.transform:
                image = self.transform(image)
            return image, label

    # Instantiate the datasets
    train_dataset = CIFAR10Dataset(train_ids, labels_df, data_dir, train_transform)
    valid_dataset = CIFAR10Dataset(valid_ids, labels_df, data_dir, test_transform)
    return train_dataset, valid_dataset

train_dataset, valid_dataset = prepare_datasets(data_dir, train_transform, test_transform)
Creating the Data Loaders
def create_dataloaders(train_dataset, valid_dataset, batch_size=128):
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=4,
        pin_memory=True)
    valid_loader = DataLoader(
        valid_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=4,
        pin_memory=True)
    return train_loader, valid_loader

batch_size = 128
train_loader, valid_loader = create_dataloaders(train_dataset, valid_dataset, batch_size)
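Before training, it is worth pulling a single batch to confirm shapes and label ranges; a quick sanity check using the loaders defined above:

# Fetch one batch and verify tensor shapes and the label range
images, labels = next(iter(train_loader))
print(images.shape)   # expected: torch.Size([128, 3, 32, 32])
print(labels.shape)   # expected: torch.Size([128])
print(labels.min().item(), labels.max().item())  # labels should lie within [0, 9]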
Building the Model
Using a Pretrained ResNet
def create_model(num_classes=10):
    # Start from an ImageNet-pretrained ResNet18
    # (on torchvision >= 0.13, prefer weights=torchvision.models.ResNet18_Weights.DEFAULT
    # over the deprecated pretrained=True)
    model = torchvision.models.resnet18(pretrained=True)
    # Replace the final layer to match CIFAR-10's class count
    in_features = model.fc.in_features
    model.fc = nn.Linear(in_features, num_classes)
    return model

model = create_model()
print(f"模型参数量: {sum(p.numel() for p in model.parameters()):,}")
A Lighter Custom Model
For a relatively simple dataset like CIFAR-10, a custom lightweight model also works well:
class CIFAR10Model(nn.Module):
    def __init__(self, num_classes=10):
        super(CIFAR10Model, self).__init__()
        self.features = nn.Sequential(
            # First convolutional block
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2),
            nn.Dropout2d(0.2),
            # Second convolutional block
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2),
            nn.Dropout2d(0.3),
            # Third convolutional block
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2),
            nn.Dropout2d(0.4))
        self.classifier = nn.Sequential(
            nn.Linear(128 * 4 * 4, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes))

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

# Choose which model to use
use_pretrained = True  # set to False to use the custom model
if use_pretrained:
    model = create_model()
else:
    model = CIFAR10Model()
Training Strategy
Defining the Training Function
def train_model(model, train_loader, valid_loader, num_epochs=50, lr=0.01):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)
    # Learning-rate scheduler
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

    # Track training progress
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        train_loss = running_loss / len(train_loader)
        train_acc = 100. * correct / total

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            for inputs, labels in valid_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                val_loss += loss.item()
                _, predicted = outputs.max(1)
                val_total += labels.size(0)
                val_correct += predicted.eq(labels).sum().item()

        val_loss = val_loss / len(valid_loader)
        val_acc = 100. * val_correct / val_total

        # Step the learning-rate scheduler
        scheduler.step()

        # Record history
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        print(f'Epoch [{epoch+1}/{num_epochs}]')
        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')
        print('-' * 50)

    return model, history

# Start training
trained_model, history = train_model(model, train_loader, valid_loader)
Visualizing the Training Process
def plot_training_history(history):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

    # Loss curves
    ax1.plot(history['train_loss'], label='Training Loss')
    ax1.plot(history['val_loss'], label='Validation Loss')
    ax1.set_title('Loss Curve')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.legend()

    # Accuracy curves
    ax2.plot(history['train_acc'], label='Training Accuracy')
    ax2.plot(history['val_acc'], label='Validation Accuracy')
    ax2.set_title('Accuracy Curve')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Accuracy (%)')
    ax2.legend()

    plt.tight_layout()
    plt.show()

plot_training_history(history)
Model Evaluation and Optimization
Computing Detailed Evaluation Metrics
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

def evaluate_model(model, dataloader, class_names):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.numpy())

    # Classification report
    print("Classification report:")
    print(classification_report(all_labels, all_preds, target_names=class_names))

    # Confusion matrix
    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()

# Class names
class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer',
               'dog', 'frog', 'horse', 'ship', 'truck']

evaluate_model(trained_model, valid_loader, class_names)
Model Optimization Tricks
def advanced_training_strategy(model, train_loader, valid_loader):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    # A more modern optimizer
    optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
    # Cosine annealing with warm restarts
    scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2)
    # Label smoothing
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

    # Early-stopping state
    best_acc = 0
    patience = 5
    counter = 0
    history = {'train_loss': [], 'val_loss': [], 'val_acc': []}

    for epoch in range(50):
        # ... training pass (as in train_model, also appending to history) ...
        # ... validation pass producing val_acc (as in train_model) ...

        # Early-stopping check
        if val_acc > best_acc:
            best_acc = val_acc
            counter = 0
            # Save the best model so far
            torch.save(model.state_dict(), 'best_model.pth')
        else:
            counter += 1
            if counter >= patience:
                print(f"Early stopping at epoch {epoch + 1}")
                break

    return model, history
Test-Set Prediction and Submission
Generating Predictions
def predict_test_set(model, test_dir, transform):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.eval()

    # List the test image files, sorted by numeric ID
    test_files = sorted(
        os.listdir(os.path.join(test_dir, 'test')),
        key=lambda x: int(x.split('.')[0]))

    predictions = []
    with torch.no_grad():
        for file_name in test_files:
            img_path = os.path.join(test_dir, 'test', file_name)
            image = Image.open(img_path).convert('RGB')
            image = transform(image).unsqueeze(0).to(device)
            output = model(image)
            _, pred = torch.max(output, 1)
            predictions.append(pred.item())
    return predictions, test_files

# Generate predictions
test_predictions, test_files = predict_test_set(trained_model, data_dir, test_transform)
Creating the Submission File
def create_submission_file(predictions, file_names, class_names, output_path='submission.csv'):
    # Map numeric predictions back to class names
    predicted_labels = [class_names[pred] for pred in predictions]
    # Extract image IDs (strip the .png extension)
    image_ids = [int(fname.split('.')[0]) for fname in file_names]

    # Build the submission DataFrame
    submission_df = pd.DataFrame({
        'id': image_ids,
        'label': predicted_labels
    })
    # Sort by ID
    submission_df = submission_df.sort_values('id')

    # Save as CSV
    submission_df.to_csv(output_path, index=False)
    print(f"Submission file saved to: {output_path}")
    print(f"Contains {len(submission_df)} predictions")
    return submission_df

submission_df = create_submission_file(test_predictions, test_files, class_names)
Validating the Submission File
def validate_submission(submission_path):
    submission = pd.read_csv(submission_path)
    print("Submission preview:")
    print(submission.head())
    print(f"\nTotal predictions: {len(submission)}")
    print("Class distribution:")
    print(submission['label'].value_counts())

    # Check that IDs are complete and contiguous
    expected_ids = set(range(1, len(submission) + 1))
    actual_ids = set(submission['id'])
    if expected_ids == actual_ids:
        print("✓ ID check passed: all IDs present and contiguous")
    else:
        print("✗ ID check failed: missing or duplicate IDs")

    # Check that all predicted classes are valid
    valid_classes = set(class_names)
    predicted_classes = set(submission['label'])
    if predicted_classes.issubset(valid_classes):
        print("✓ Class check passed: all predicted classes are valid")
    else:
        invalid = predicted_classes - valid_classes
        print(f"✗ Class check failed: invalid classes found {invalid}")

validate_submission('submission.csv')
Advanced Tips and Improvement Ideas
Ensembling
def create_ensemble(models, dataloader):
    """Ensemble predictions from several models by majority vote."""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    all_predictions = []

    for model in models:
        model.eval()
        model_predictions = []
        with torch.no_grad():
            for inputs, _ in dataloader:
                inputs = inputs.to(device)
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                model_predictions.extend(preds.cpu().numpy())
        all_predictions.append(model_predictions)

    # Majority-vote ensembling
    ensemble_preds = []
    for i in range(len(all_predictions[0])):
        votes = [preds[i] for preds in all_predictions]
        ensemble_preds.append(max(set(votes), key=votes.count))
    return ensemble_preds
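Majority voting discards each model's confidence. A common alternative is soft voting: average the softmax probabilities across models and then take the argmax. A minimal sketch, assuming each model in models is already on the right device and in eval mode:

import torch
import torch.nn.functional as F

def soft_vote_ensemble(models, dataloader, device):
    # Sum per-model class probabilities; dividing by len(models) would not change the argmax
    ensemble_preds = []
    with torch.no_grad():
        for inputs, _ in dataloader:
            inputs = inputs.to(device)
            probs = sum(F.softmax(m(inputs), dim=1) for m in models)
            ensemble_preds.extend(probs.argmax(dim=1).cpu().numpy())
    return ensemble_preds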
Advanced Data Augmentation
def get_advanced_transforms():
    """A more aggressive data augmentation strategy."""
    train_transform = torchvision.transforms.Compose([
        torchvision.transforms.RandomResizedCrop(32, scale=(0.8, 1.0)),
        torchvision.transforms.RandomHorizontalFlip(),
        torchvision.transforms.RandomRotation(15),
        torchvision.transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),
        torchvision.transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(mean=[0.4914, 0.4822, 0.4465],
                                         std=[0.2023, 0.1994, 0.2010]),
        torchvision.transforms.RandomErasing(p=0.5, scale=(0.02, 0.1))
    ])
    return train_transform
Summary and Next Steps
Key Takeaways
Through this CIFAR-10 competition exercise, we covered:
- Data preprocessing: proper data organization, augmentation, and normalization
- Model selection: the trade-off between pretrained and custom models
- Training strategy: learning-rate scheduling, regularization, and early stopping
- Evaluation: assessing model performance from multiple angles
- Competition workflow: the complete pipeline from data preparation to submission
Suggestions for Further Improving Performance
- Use a larger model, such as ResNet-50 or EfficientNet
- Try different optimizers, such as AdamW or RAdam
- Apply more sophisticated augmentation, such as MixUp or CutMix (a MixUp sketch follows this list)
- Use model ensembling: fuse the predictions of several models
- Try self-supervised pretraining on unlabeled data
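As a concrete example of the MixUp item above, here is a minimal sketch following the standard formulation (the alpha value and the exact integration into train_model are assumptions, not part of the original recipe):

import numpy as np
import torch

def mixup_batch(inputs, labels, alpha=0.2):
    # Draw a mixing coefficient and blend each image with a randomly paired one
    lam = float(np.random.beta(alpha, alpha))
    index = torch.randperm(inputs.size(0), device=inputs.device)
    mixed_inputs = lam * inputs + (1 - lam) * inputs[index]
    return mixed_inputs, labels, labels[index], lam

# Inside the training loop, the loss becomes a convex combination of two targets:
# mixed, y_a, y_b, lam = mixup_batch(inputs, labels)
# outputs = model(mixed)
# loss = lam * criterion(outputs, y_a) + (1 - lam) * criterion(outputs, y_b)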
Where to Go Next
- Enter more Kaggle competitions, such as Dogs vs Cats or Plant Seedlings
- Explore other computer vision tasks, such as object detection and semantic segmentation
- Learn model deployment: putting a trained model into production
- Dig into model interpretability: understanding how the model makes its decisions
Through this hands-on exercise, you have not only learned how to solve a concrete image classification problem but, more importantly, worked through the complete deep learning workflow. These skills lay a solid foundation for tackling more complex computer vision problems.
Remember: in machine learning, practice is the best teacher. Keep trying new methods and learning new techniques, and you will go further and further down the deep learning road!