当前位置: 首页 > news >正文

第J7周:ResNeXt解析

  • 🍨 本文为🔗365天深度学习训练营 中的学习记录博客
  • 🍖 原作者:K同学啊

目标

具体实现

(一)环境

语言环境:Python 3.10
编 译 器: PyCharm
框 架: Tensorflow

(二)具体步骤
1. 代码
import os  
import numpy as np  
import tensorflow as tf  
from tensorflow.keras import backend as K  
from tensorflow.keras.models import Model  
from tensorflow.keras.layers import (  Input, Conv2D, BatchNormalization, ReLU, Add, MaxPooling2D,  GlobalAveragePooling2D, Dense, Concatenate, Lambda  
)  
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, TensorBoard  
from tensorflow.keras.preprocessing.image import ImageDataGenerator  
import matplotlib.pyplot as plt  
from datetime import datetime  
import time  # 设置GPU内存增长  
gpus = tf.config.experimental.list_physical_devices('GPU')  
if gpus:  try:  for gpu in gpus:  tf.config.experimental.set_memory_growth(gpu, True)  print(f"找到 {len(gpus)} 个GPU,已设置内存增长")  except RuntimeError as e:  print(f"设置GPU内存增长时出错: {e}")  # 设置中文字体支持  
def set_chinese_font():  """配置Matplotlib中文字体支持"""  import platform  if platform.system() == 'Windows':  plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun']  else:  # Linux/Mac  plt.rcParams['font.sans-serif'] = ['WenQuanYi Micro Hei', 'Arial Unicode MS', 'Heiti TC']  plt.rcParams['axes.unicode_minus'] = False  # 分组卷积块实现  
def grouped_convolution_block(inputs, filters, strides, groups, prefix=None):  """  实现分组卷积  参数:  - inputs: 输入张量  - filters: 过滤器数量  - strides: 步长  - groups: 分组数量  - prefix: 层名称前缀,用于避免命名冲突  返回:  - 输出张量  """    # 确保过滤器数量可以被分组数整除  assert filters % groups == 0, "过滤器数量必须能被分组数整除"  # 计算每组的过滤器数量  group_filters = filters // groups  # 初始化保存分组卷积结果的列表  group_convs = []  # 对每个组执行卷积  for group_idx in range(groups):  name = f'{prefix}_group_conv_{group_idx}' if prefix else None  group_conv = Conv2D(  group_filters,  kernel_size=(3, 3),  strides=strides,  padding='same',  use_bias=False,  name=name  )(inputs)  group_convs.append(group_conv)  # 合并所有组的卷积结果  if len(group_convs) > 1:  name = f'{prefix}_concat' if prefix else None  output = Concatenate(name=name)(group_convs)  else:  output = group_convs[0]  return output  # ResNeXt残差块  
def block(x, filters, strides=1, groups=32, conv_shortcut=True, block_id=None):  """  ResNeXt残差单元  参数:  - x: 输入张量  - filters: 过滤器数量(最终输出将是filters*2)  - strides: 步长  - groups: 分组数量  - conv_shortcut: 是否使用卷积快捷连接  - block_id: 块ID,用于唯一命名  返回:  - 输出张量  """    prefix = f'block{block_id}' if block_id is not None else None  # 快捷连接  if conv_shortcut:  shortcut_name = f'{prefix}_shortcut_conv' if prefix else None  shortcut = Conv2D(filters * 2, kernel_size=(1, 1), strides=strides,  padding='same', use_bias=False, name=shortcut_name)(x)  shortcut_bn_name = f'{prefix}_shortcut_bn' if prefix else None  shortcut = BatchNormalization(epsilon=1.001e-5, name=shortcut_bn_name)(shortcut)  else:  shortcut = x  # 三层卷积  # 第一层: 1x1卷积降维  conv1_name = f'{prefix}_conv1' if prefix else None  x = Conv2D(filters=filters, kernel_size=(1, 1), strides=1,  padding='same', use_bias=False, name=conv1_name)(x)  bn1_name = f'{prefix}_bn1' if prefix else None  x = BatchNormalization(epsilon=1.001e-5, name=bn1_name)(x)  relu1_name = f'{prefix}_relu1' if prefix else None  x = ReLU(name=relu1_name)(x)  # 第二层: 分组3x3卷积  x = grouped_convolution_block(x, filters, strides, groups, prefix=prefix)  bn2_name = f'{prefix}_bn2' if prefix else None  x = BatchNormalization(epsilon=1.001e-5, name=bn2_name)(x)  relu2_name = f'{prefix}_relu2' if prefix else None  x = ReLU(name=relu2_name)(x)  # 第三层: 1x1卷积升维  conv3_name = f'{prefix}_conv3' if prefix else None  x = Conv2D(filters=filters * 2, kernel_size=(1, 1), strides=1,  padding='same', use_bias=False, name=conv3_name)(x)  bn3_name = f'{prefix}_bn3' if prefix else None  x = BatchNormalization(epsilon=1.001e-5, name=bn3_name)(x)  # 添加残差连接  add_name = f'{prefix}_add' if prefix else None  x = Add(name=add_name)([x, shortcut])  relu3_name = f'{prefix}_relu3' if prefix else None  x = ReLU(name=relu3_name)(x)  return x  # 堆叠残差块  
def stack(x, filters, blocks, strides=1, groups=32, stack_id=None):  """  堆叠多个残差单元  参数:  - x: 输入张量  - filters: 过滤器数量  - blocks: 残差单元数量  - strides: 第一个残差单元的步长  - groups: 分组数量  - stack_id: 堆栈ID,用于唯一命名  返回:  - 输出张量  """    # 第一个残差单元可能会改变通道数和特征图大小  block_prefix = f'{stack_id}_0' if stack_id is not None else None  x = block(x, filters, strides=strides, groups=groups, block_id=block_prefix)  # 堆叠剩余的残差单元  for i in range(1, blocks):  block_prefix = f'{stack_id}_{i}' if stack_id is not None else None  x = block(x, filters, groups=groups, conv_shortcut=False, block_id=block_prefix)  return x  # 构建ResNeXt50模型  
def ResNeXt50(input_shape=(224, 224, 3), num_classes=1000, groups=32):  """  构建ResNeXt-50模型  参数:  - input_shape: 输入图像形状  - num_classes: 分类数量  - groups: 基数(分组数量)  返回:  - Keras模型  """    # 定义输入  input_tensor = Input(shape=input_shape)  # 初始卷积层  x = Conv2D(64, kernel_size=(7, 7), strides=2, padding='same',  use_bias=False, name='conv1')(input_tensor)  x = BatchNormalization(epsilon=1.001e-5, name='bn1')(x)  x = ReLU(name='relu1')(x)  # 最大池化  x = MaxPooling2D(pool_size=(3, 3), strides=2, padding='same', name='max_pool')(x)  # 四个阶段的残差块堆叠  # Stage 1  x = stack(x, 128, 3, strides=1, groups=groups, stack_id='stage1')  # Stage 2  x = stack(x, 256, 4, strides=2, groups=groups, stack_id='stage2')  # Stage 3  x = stack(x, 512, 6, strides=2, groups=groups, stack_id='stage3')  # Stage 4  x = stack(x, 1024, 3, strides=2, groups=groups, stack_id='stage4')  # 全局平均池化  x = GlobalAveragePooling2D(name='avg_pool')(x)  # 全连接分类层  x = Dense(num_classes, activation='softmax', name='fc')(x)  # 创建模型  model = Model(inputs=input_tensor, outputs=x, name='resnext50')  return model  # 创建数据生成器  
def create_data_generators(data_dir, img_size=(224, 224), batch_size=32):  """  创建训练、验证和测试数据生成器  参数:  - data_dir: 数据集根目录  - img_size: 图像大小  - batch_size: 批次大小  返回:  - train_generator: 训练数据生成器  - validation_generator: 验证数据生成器  - test_generator: 测试数据生成器  - num_classes: 类别数量  """    # 数据增强设置 - 训练集  train_datagen = ImageDataGenerator(  rescale=1. / 255,  rotation_range=20,  width_shift_range=0.2,  height_shift_range=0.2,  shear_range=0.2,  zoom_range=0.2,  horizontal_flip=True,  fill_mode='nearest'  )  # 仅进行缩放 - 验证集和测试集  valid_datagen = ImageDataGenerator(  rescale=1. / 255  )  # 路径设置  train_dir = os.path.join(data_dir, 'train')  valid_dir = os.path.join(data_dir, 'val')  test_dir = os.path.join(data_dir, 'test')  # 检查目录是否存在  if not os.path.exists(train_dir):  raise FileNotFoundError(f"训练集目录不存在: {train_dir}")  if not os.path.exists(valid_dir):  raise FileNotFoundError(f"验证集目录不存在: {valid_dir}")  # 创建生成器  train_generator = train_datagen.flow_from_directory(  train_dir,  target_size=img_size,  batch_size=batch_size,  class_mode='categorical',  shuffle=True  )  validation_generator = valid_datagen.flow_from_directory(  valid_dir,  target_size=img_size,  batch_size=batch_size,  class_mode='categorical',  shuffle=False  )  # 检查测试集  test_generator = None  if os.path.exists(test_dir):  test_generator = valid_datagen.flow_from_directory(  test_dir,  target_size=img_size,  batch_size=batch_size,  class_mode='categorical',  shuffle=False  )  print(f"测试集已加载: {test_generator.samples} 张图像")  num_classes = len(train_generator.class_indices)  print(f"类别数量: {num_classes}")  print(f"类别映射: {train_generator.class_indices}")  return train_generator, validation_generator, test_generator, num_classes  # 训练模型  
def train_model(model, train_generator, validation_generator, epochs=20, initial_epoch=0):  """  训练模型  参数:  - model: Keras模型  - train_generator: 训练数据生成器  - validation_generator: 验证数据生成器  - epochs: 总训练轮数  - initial_epoch: 初始轮数(用于断点续训)  返回:  - history: 训练历史  """    # 创建保存目录  os.makedirs('models', exist_ok=True)  os.makedirs('logs', exist_ok=True)  # 设置回调函数  callbacks = [  # 保存最佳模型  ModelCheckpoint(  filepath='models/resnext50_best.h5',  monitor='val_accuracy',  save_best_only=True,  verbose=1  ),  # 学习率调度器  ReduceLROnPlateau(  monitor='val_loss',  factor=0.5,  patience=3,  verbose=1,  min_delta=0.0001,  min_lr=1e-6  ),  # 早停  EarlyStopping(  monitor='val_loss',  patience=8,  verbose=1,  restore_best_weights=True  ),  # TensorBoard日志  TensorBoard(  log_dir=f'logs/resnext50_{datetime.now().strftime("%Y%m%d-%H%M%S")}',  histogram_freq=1  )  ]  # 编译模型  model.compile(  optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),  loss='categorical_crossentropy',  metrics=['accuracy']  )  # 设置训练步数  steps_per_epoch = train_generator.samples // train_generator.batch_size  validation_steps = validation_generator.samples // validation_generator.batch_size  # 确保至少有一个步骤  steps_per_epoch = max(1, steps_per_epoch)  validation_steps = max(1, validation_steps)  print(f"开始训练模型,共 {epochs} 轮...")  print(f"训练步数: {steps_per_epoch}, 验证步数: {validation_steps}")  # 训练模型  history = model.fit(  train_generator,  steps_per_epoch=steps_per_epoch,  epochs=epochs,  initial_epoch=initial_epoch,  validation_data=validation_generator,  validation_steps=validation_steps,  callbacks=callbacks,  verbose=1  )  # 保存最终模型  model.save('models/resnext50_final.h5')  print("训练完成,模型已保存为 'models/resnext50_final.h5'")  return history  # 评估模型  
def evaluate_model(model, generator, set_name="测试集"):  """  评估模型  参数:  - model: Keras模型  - generator: 数据生成器  - set_name: 数据集名称(用于打印)  返回:  - results: 评估结果  """    if generator is None:  print(f"{set_name}不存在,跳过评估")  return None  print(f"评估模型在{set_name}上的性能...")  steps = generator.samples // generator.batch_size  steps = max(1, steps)  # 确保至少有一个步骤  results = model.evaluate(generator, steps=steps, verbose=1)  print(f"{set_name}损失: {results[0]:.4f}")  print(f"{set_name}准确率: {results[1]:.4f}")  return results  # 绘制训练历史  
def plot_training_history(history):  """  绘制训练历史曲线  参数:  - history: 训练历史  """    set_chinese_font()  # 创建图表  fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))  # 绘制准确率曲线  ax1.plot(history.history['accuracy'], label='训练准确率', linewidth=2)  ax1.plot(history.history['val_accuracy'], label='验证准确率', linewidth=2)  ax1.set_title('模型准确率', fontsize=14)  ax1.set_ylabel('准确率', fontsize=12)  ax1.set_xlabel('轮次', fontsize=12)  ax1.grid(True, linestyle='--', alpha=0.7)  ax1.legend(loc='lower right', fontsize=10)  # 绘制损失曲线  ax2.plot(history.history['loss'], label='训练损失', linewidth=2)  ax2.plot(history.history['val_loss'], label='验证损失', linewidth=2)  ax2.set_title('模型损失', fontsize=14)  ax2.set_ylabel('损失', fontsize=12)  ax2.set_xlabel('轮次', fontsize=12)  ax2.grid(True, linestyle='--', alpha=0.7)  ax2.legend(loc='upper right', fontsize=10)  plt.tight_layout()  plt.savefig('training_history.png', dpi=120)  plt.show()  # 可视化预测结果  
def visualize_predictions(model, generator, num_images=5):  """  可视化模型预测结果  参数:  - model: Keras模型  - generator: 数据生成器  - num_images: 要显示的图像数量  """    set_chinese_font()  # 获取类别标签  class_indices = generator.class_indices  class_names = {v: k for k, v in class_indices.items()}  # 获取一批图像  x, y_true = next(generator)  # 仅使用前num_images张图像  x = x[:num_images]  y_true = y_true[:num_images]  # 预测  y_pred = model.predict(x)  # 创建图表  fig = plt.figure(figsize=(15, 10))  for i in range(num_images):  # 获取图像  img = x[i]  # 获取真实标签和预测标签  true_label = np.argmax(y_true[i])  pred_label = np.argmax(y_pred[i])  pred_prob = y_pred[i][pred_label]  # 获取类别名称  true_class_name = class_names[true_label]  pred_class_name = class_names[pred_label]  # 创建子图  plt.subplot(1, num_images, i + 1)  # 显示图像  plt.imshow(img)  # 设置标题  title_color = 'green' if true_label == pred_label else 'red'  plt.title(f"真实: {true_class_name}\n预测: {pred_class_name}\n概率: {pred_prob:.2f}",  color=title_color, fontsize=10)  plt.axis('off')  plt.tight_layout()  plt.savefig('prediction_results.png', dpi=120)  plt.show()  # 测试单张图像  
def predict_image(model, image_path, class_names, img_size=(224, 224)):  """  预测单张图像  参数:  - model: Keras模型  - image_path: 图像路径  - class_names: 类别名称字典  - img_size: 图像大小  返回:  - pred_class: 预测的类别  - confidence: 置信度  """    from tensorflow.keras.preprocessing import image  # 加载图像  img = image.load_img(image_path, target_size=img_size)  # 转换为数组  x = image.img_to_array(img)  x = np.expand_dims(x, axis=0)  x = x / 255.0  # 归一化  # 预测  preds = model.predict(x)  # 获取最高置信度的类别  pred_class_idx = np.argmax(preds[0])  confidence = preds[0][pred_class_idx]  # 获取类别名称  pred_class = class_names[pred_class_idx]  return pred_class, confidence  # 打印模型架构并显示中间特征图尺寸  
def print_model_architecture(model):  """  打印模型架构信息,包括每层输出形状  参数:  - model: Keras模型  """    # 打印模型摘要  model.summary()  # 显示每个块的输出形状  layer_outputs = []  layer_names = []  # 选择要显示的关键层  target_layers = [  'conv1', 'max_pool',  'stage1_0_add', 'stage1_2_add',  'stage2_0_add', 'stage2_3_add',  'stage3_0_add', 'stage3_5_add',  'stage4_0_add', 'stage4_2_add',  'avg_pool'  ]  print("\n关键层的输出形状:")  print("-" * 50)  print(f"{'层名称':<30} {'输出形状':<20}")  print("-" * 50)  for layer in model.layers:  if any(target_name in layer.name for target_name in target_layers):  print(f"{layer.name:<30} {str(layer.output_shape):<20}")  # 主函数  
def main():  """主函数"""  # 设置参数  DATA_DIR = './data'  IMG_SIZE = (224, 224)  BATCH_SIZE = 32  EPOCHS = 20  CARDINALITY = 32  # 获取当前设备信息  print(f"TensorFlow版本: {tf.__version__}")  print(f"使用设备: {'GPU' if tf.config.list_physical_devices('GPU') else 'CPU'}")  try:  # 创建数据生成器  print("加载数据集...")  train_generator, validation_generator, test_generator, num_classes = create_data_generators(  DATA_DIR, IMG_SIZE, BATCH_SIZE  )  # 创建模型  print(f"创建ResNeXt-50模型 (基数={CARDINALITY})...")  model = ResNeXt50(  input_shape=(IMG_SIZE[0], IMG_SIZE[1], 3),  num_classes=num_classes,  groups=CARDINALITY  )  # 显示模型架构  print_model_architecture(model)  # 计算模型参数量  trainable_params = np.sum([np.prod(v.get_shape()) for v in model.trainable_weights])  non_trainable_params = np.sum([np.prod(v.get_shape()) for v in model.non_trainable_weights])  total_params = trainable_params + non_trainable_params  print(f"模型参数数量: {total_params:,}")  print(f"可训练参数: {trainable_params:,}")  print(f"不可训练参数: {non_trainable_params:,}")  # 检查是否有已保存的模型,实现断点续训  initial_epoch = 0  if os.path.exists('models/resnext50_final.h5'):  print("找到已保存的模型,询问是否继续训练...")  choice = input("是否继续训练已保存的模型?(y/n): ")  if choice.lower() == 'y':  print("加载已保存的模型...")  model = tf.keras.models.load_model('models/resnext50_final.h5')  initial_epoch = int(input("请输入起始轮数: "))  else:  print("从头开始训练新模型...")  # 训练模型  print("开始训练模型...")  start_time = time.time()  history = train_model(model, train_generator, validation_generator, EPOCHS, initial_epoch)  training_time = time.time() - start_time  print(f"训练完成,耗时: {training_time:.2f} 秒")  # 绘制训练历史  plot_training_history(history)  # 评估验证集  evaluate_model(model, validation_generator, "验证集")  # 评估测试集  evaluate_model(model, test_generator, "测试集")  # 可视化预测结果  print("可视化预测结果...")  visualize_predictions(model, validation_generator)  # 找一张测试图像进行单独预测  if test_generator:  print("查找测试图像进行单独预测...")  # 获取测试集中的一张图像路径  test_dir = os.path.join(DATA_DIR, 'test')  class_dirs = [d for d in os.listdir(test_dir) if os.path.isdir(os.path.join(test_dir, d))]  if class_dirs:  # 选择第一个类别目录  class_dir = class_dirs[0]  class_path = os.path.join(test_dir, class_dir)  # 获取目录中的图像  images = [f for f in os.listdir(class_path) if f.endswith(('.jpg', '.jpeg', '.png'))]  if images:  # 选择第一张图像  image_path = os.path.join(class_path, images[0])  # 获取类别名称  class_indices = test_generator.class_indices  class_names = {v: k for k, v in class_indices.items()}  # 预测图像  pred_class, confidence = predict_image(model, image_path, class_names, IMG_SIZE)  print(f"测试图像路径: {image_path}")  print(f"真实类别: {class_dir}")  print(f"预测类别: {pred_class}")  print(f"预测置信度: {confidence:.4f}")  print("所有操作完成!")  except Exception as e:  print(f"发生错误: {e}")  import traceback  traceback.print_exc()  if __name__ == "__main__":  main()
2. 关于快捷链接

残差连接是ResNet和ResNeXt架构的核心创新之一,它允许信息直接从一层"跳过"到另一层,绕过中间的卷积操作。这解决了深层网络中的梯度消失问题,使得非常深的网络也能有效训练。

# 快捷连接  
if conv_shortcut:  shortcut_name = f'{prefix}_shortcut_conv' if prefix else None  shortcut = Conv2D(filters * 2, kernel_size=(1, 1), strides=strides,  padding='same', use_bias=False, name=shortcut_name)(x)  shortcut_bn_name = f'{prefix}_shortcut_bn' if prefix else None  shortcut = BatchNormalization(epsilon=1.001e-5, name=shortcut_bn_name)(shortcut)  
else:  shortcut = x

当conv_shortcut=True:当输入和输出的尺寸或通道数不匹配时,需要使用卷积型快捷连接,使用kernel_size=(1, 1)的卷积进行通道转换,将输入通道数转换为filters*2,然后使用批量归一批标准化卷积输出。
当conv_shortcut=False: 当输入和输出的尺寸和通道数完全匹配时,使用恒等型快捷连接,也就是不做任何的变换。确实这里可能会出现一个问题那就是通道数不匹配的问题,但是我们的代码是可以正常执行的,为什么呢?按我的理解:通道数不一致肯定不行。看一下残差堆叠的代码:

# 堆叠残差块  
def stack(x, filters, blocks, strides=1, groups=32, stack_id=None):  """  堆叠多个残差单元  参数:  - x: 输入张量  - filters: 过滤器数量  - blocks: 残差单元数量  - strides: 第一个残差单元的步长  - groups: 分组数量  - stack_id: 堆栈ID,用于唯一命名  返回:  - 输出张量  """    # 第一个残差单元可能会改变通道数和特征图大小  block_prefix = f'{stack_id}_0' if stack_id is not None else None  x = block(x, filters, strides=strides, groups=groups, block_id=block_prefix)  # 堆叠剩余的残差单元  for i in range(1, blocks):  block_prefix = f'{stack_id}_{i}' if stack_id is not None else None  x = block(x, filters, groups=groups, conv_shortcut=False, block_id=block_prefix)  return x

在for之前,第一个残差单元是确定的:x = block(x, filters, strides=strides, groups=groups, block_id=block_prefix) 这是通道数已经完成了转换,而后续的残差单元是通过for在生成的,它并没有改变通道数,而是使用了第一个残差单元的通道数。那么最后输出肯定也是一致的通道数。
因此我们总结:

  • 第一个残差单元总是默认conv_shortcut=True,完成了通道数的转换。
  • 前一个block的返回值成为下一个block的输入,这样保证了通道数一致。
    image.png

相关文章:

  • 【“星睿O6”AI PC开发套件评测】+ MTCNN 开源模型部署和测试对比
  • 重构 cluster-db 选择器,新增限制字段 showDb 不影响原功能前提实现查询功能增量拓展
  • CSS display: none
  • linux0.11内核源码修仙传第十六章——获取硬盘信息
  • 5G让媒体传播更快更智能——技术赋能内容新时代
  • PostgreSQL可见性映射VM
  • 高性能编程相关
  • PHP会话技术
  • 运用fmpeg写一个背英文单词的demo带翻译
  • UE5.3 C++ 房屋管理系统(一)
  • 在线caj转换word
  • 机器人运动控制技术简介
  • Rust 官方文档:人话版翻译指南
  • 动态规划-62.不同路径-力扣(LeetCode)
  • YOLO目标检测算法
  • 大模型——Trae IDE 指南:轻松配置自定义 AI 规则 (Trae Rules)
  • OR算法+ML模型混合推理框架架构演进
  • 智慧农业运维平台养殖—传感器管理监控设计—仙盟创梦IDE
  • 英伟达Blackwell架构重构未来:AI算力革命背后的技术逻辑与产业变革
  • Typora+PicGo+Gitee图床配置教程 自动图片上传
  • 要更加冷静地看待“东升西降”的判断
  • 罕见沙尘再度入川,官方:沙尘传输高度达到平流层,远超以往
  • 越怕出错越会出错,“墨菲定律”的魔咒该怎么破?
  • 浙江一民企拍地后遭政府两次违约,“民告官”三年又提起民事诉讼
  • 马上评|让“贾宝玉是长子长孙”争议回归理性讨论
  • 最快3天开通一条定制公交线路!上海推出服务平台更快响应市民需求