Quantum transfer learning学习笔记
https://pennylane.ai/qml/demos/tutorial_quantum_transfer_learning
量子迁移代码解读
1.导入的库
import time # 计时功能
import os # 操作系统接口
import copy # 对象复制
import urllib.request # 网络请求
import shutil # 文件操作import torch
import torch.nn as nn # 神经网络模块
import torch.optim as optim # 优化器
from torch.optim import lr_scheduler # 学习率调度器
import torchvision # 计算机视觉库
from torchvision import datasets, transforms # 数据集和数据预处理import pennylane as qml
from pennylane import numpy as np # PennyLane 的 NumPy(支持自动微分)torch.manual_seed(42)
np.random.seed(42)import matplotlib.pyplot as plt # 数据可视化os.environ["OMP_NUM_THREADS"] = "1"
# OpenMP 线程数设置为 1,这通常是为了:
# 避免在多线程环境中的性能问题
# 确保结果的一致性
# 在有些系统中可以防止内存溢出
2.超参数设置
n_qubits = 4 # 量子比特数量
step = 0.0004 # 学习率 (非常小,适合精细调优)
batch_size = 4 # 批大小 (每次训练步使用的样本数)
num_epochs = 3 # 训练周期数 (原文建议30,这里设为3用于测试)
q_depth = 6 # 量子电路深度 (变分层的数量)
gamma_lr_scheduler = 0.1 # 学习率调度器参数 (每10个周期学习率乘以0.1)
q_delta = 0.01 # 量子权重的初始随机分布范围
start_time = time.time() # 开始计时器
量子设备:
dev = qml.device("default.qubit", wires=n_qubits)
-
使用 PennyLane 的 默认量子比特模拟器
-
配置了 4个量子比特(对应 n_qubits=4)
-
这是一个纯经典模拟器,不是真实量子硬件
经典计算设备:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
-
自动检测可用硬件:优先使用 GPU (CUDA),否则回退到 CPU
-
这是给 PyTorch 神经网络 使用的设备
-
量子电路部分仍在 PennyLane 模拟器上运行
3.数据加载
# 这是一个包含蚂蚁和蜜蜂图像的数据集
# 数据集很小(约200张图像),对于从头训练经典或量子模型来说太小
# 但对于迁移学习方法是足够的
data_transforms = {'train': transforms.Compose([transforms.Resize(256),transforms.CenterCrop(224),transforms.ToTensor(),# 使用ImageNet的均值和标准差进行标准化transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),'val': transforms.Compose([transforms.Resize(256),transforms.CenterCrop(224),transforms.ToTensor(),transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])
}
1. transforms.Resize(256)
-
作用:将图像的较短边缩放到256像素
-
保持宽高比:较长边按比例缩放
-
目的:统一图像大小,便于后续处理
2. transforms.CenterCrop(224)
-
作用:从图像中心裁剪出224×224像素的区域
-
目的:得到固定尺寸的输入,符合预训练模型的要求
3. transforms.ToTensor()
-
作用:
-
将 PIL Image 或 numpy.ndarray 转换为 PyTorch Tensor
-
将像素值从 [0, 255] 缩放到 [0.0, 1.0]
-
调整维度顺序从 (H, W, C) 到 (C, H, W)
-
4. transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
-
作用:对图像进行标准化
-
计算公式:
output = (input - mean) / std
-
参数含义:
-
均值:[0.485, 0.456, 0.406] 对应 RGB 三个通道
-
标准差:[0.229, 0.224, 0.225] 对应 RGB 三个通道
-
为什么使用这些特定的标准化参数?
这些数值是 ImageNet 数据集的统计值:
-
在数百万张 ImageNet 图像上计算得出
-
大多数预训练模型(如 ResNet、VGG)都使用这些参数
-
保持输入分布一致,确保预训练权重有效
3.1数据集下载和解压
data_dir = "hymenoptera_data"
if not os.path.exists(data_dir):urllib.request.urlretrieve("https://download.pytorch.org/tutorial/hymenoptera_data.zip", f"{data_dir}.zip")shutil.unpack_archive(f"{data_dir}.zip")
作用:
-
检查
hymenoptera_data
目录是否存在 -
如果不存在,从 PyTorch 官网下载数据集zip文件
-
解压zip文件到当前目录
3.2 创建数据集对象
image_datasets = {x if x == "train" else "validation": datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x])for x in ["train", "val"]
}
这个字典推导式想象成一个"工厂生产线":
# 这是一个完整的表达式,一次性产生结果
image_datasets = {键: 值for x in ["train", "val"] # 原材料清单
}
第1步:Python 看到完整的字典推导式
第2步:开始循环处理每个 x 值
循环1:
x = "train" # 从列表中取出第一个值
键 = "train" if "train" == "train" else "validation" # → "train"
值 = datasets.ImageFolder("hymenoptera_data/train", data_transforms["train"])
临时存储:("train", 训练集对象)
循环2:
x = "val" # 从列表中取出第二个值
键 = "val" if "val" == "train" else "validation" # → "validation"
值 = datasets.ImageFolder("hymenoptera_data/val", data_transforms["val"])
临时存储:("validation", 验证集对象)
第3步:循环结束后,用所有临时存储的键值对构建字典
x的值没改,改的是字典的键名。
image_datasets = {"train": 训练集对象, "validation": 验证集对象}
值生成部分流程:
datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x])
第一次循环:
datasets.ImageFolder(os.path.join("hymenoptera_data", "train"), data_transforms["train"])
第二次循环:
datasets.ImageFolder(os.path.join("hymenoptera_data", "val"), data_transforms["val"])
atasets.ImageFolder
的作用
这是 PyTorch 的自动数据集加载器:
-
os.path.join(data_dir, x)
:数据集路径 -
data_transforms[x]
:对应的数据预处理流程
3.3获取数据集大小
dataset_sizes = {x: len(image_datasets[x]) for x in ["train", "validation"]}
循环1:x = "train"
"train": len(image_datasets["train"])
循环2:x = "validation"
"validation": len(image_datasets["validation"])
dataset_sizes = {"train": 244, # 假设训练集有244张图片"validation": 153 # 验证集有153张图片
}
3.4获取类别名称
class_names = image_datasets["train"].classes
假设目录结构:
hymenoptera_data/
├── train/
│ ├── ants/ ← 类别1
│ └── bees/ ← 类别2
└── val/├── ants/└── bees/
结果:
class_names = ['ants', 'bees']
3.5数据加载器
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True)for x in ["train", "validation"]
}
执行结果:
dataloaders = {"train": DataLoader(训练集, batch_size=4, shuffle=True),"validation": DataLoader(验证集, batch_size=4, shuffle=True)
}
参数说明:
-
image_datasets[x]
:之前创建的数据集对象 -
batch_size=batch_size
:使用之前定义的batch_size=4
-
shuffle=True
:每个epoch随机打乱数据顺序
数据加载器作用:
# 使用示例
for inputs, labels in dataloaders["train"]:# inputs: 形状为 [4, 3, 224, 224] 的张量 (4张图片)# labels: 形状为 [4] 的张量 (4个标签)print(f"批次图像形状: {inputs.shape}")print(f"批次标签: {labels}")break # 只看第一个批次
3.6图像显示函数
def imshow(inp, title=None):"""Display image from tensor."""# 1. 张量转numpy并调整维度顺序inp = inp.numpy().transpose((1, 2, 0))# 从 [C, H, W] 变为 [H, W, C] (matplotlib需要的格式)# 2. 反标准化:恢复原始像素值mean = np.array([0.485, 0.456, 0.406])std = np.array([0.229, 0.224, 0.225])inp = std * inp + mean # 逆向操作:inp = (inp - mean) / std 的逆运算# 3. 限制像素值在 [0, 1] 范围内inp = np.clip(inp, 0, 1)# 4. 显示图像plt.imshow(inp)if title is not None:plt.title(title)
为什么需要反标准化?
因为原始图像经过了这个变换:
# 之前的标准化
normalized = (original - mean) / std# 现在的反标准化
original_like = normalized * std + mean
3.7展示测试组图像
# 使用数据加载器
inputs, classes = next(iter(dataloaders["validation"]))# 创建图像网格
out = torchvision.utils.make_grid(inputs)# 显示图像
imshow(out, title=[class_names[x] for x in classes])# 最后才定义数据加载器(这看起来顺序不对)这段应该是没有意义吧,上面定义过了
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True)for x in ["train", "validation"]
}
4.变分量子电路
4.1量子层定义
哈达玛层:
def H_layer(nqubits):for idx in range(nqubits):qml.Hadamard(wires=idx)
作用:在所有量子比特上施加哈达玛门,创建叠加态
旋转层:
def RY_layer(w):for idx, element in enumerate(w):qml.RY(element, wires=idx)
作用:每个量子比特绕Y轴旋转特定角度(参数w可训练)
纠缠层:
def entangling_layer(nqubits):for i in range(0, nqubits - 1, 2): # 偶数索引qml.CNOT(wires=[i, i + 1])for i in range(1, nqubits - 1, 2): # 奇数索引 qml.CNOT(wires=[i, i + 1])
假设我们有 6个量子比特(nqubits = 6),索引为:0, 1, 2, 3, 4, 5
第一个循环:偶数索引纠缠:
-
i = 0
:qml.CNOT(wires=[0, 1])
→ 量子比特0控制量子比特1 -
i = 2
:qml.CNOT(wires=[2, 3])
→ 量子比特2控制量子比特3 -
i = 4
:qml.CNOT(wires=[4, 5])
→ 量子比特4控制量子比特5
比特: 0 --- 1 2 --- 3 4 --- 5
第二个循环:奇数索引纠缠:
-
i = 1
:qml.CNOT(wires=[1, 2])
→ 量子比特1控制量子比特2 -
i = 3
:qml.CNOT(wires=[3, 4])
→ 量子比特3控制量子比特4
0 --- 1 --- 2 --- 3 --- 4 --- 5
4.2电路整体结构
嵌入层:
# Start from state |+>, unbiased w.r.t. |0> and |1>
H_layer(n_qubits) # 创建均匀叠加态# Embed features in the quantum node
RY_layer(q_input_features) # 根据输入数据旋转
作用:将经典数据编码到量子态中
-
H_layer
:哈达玛门,创建|+⟩ = (|0⟩ + |1⟩)/√2
状态 -
RY_layer
:用输入特征数据作为旋转角度,实现数据嵌入
变分层:
for k in range(q_depth):entangling_layer(n_qubits) # 创建量子纠缠RY_layer(q_weights[k]) # 可训练的参数化旋转
作用:通过参数化量子门学习数据特征
-
q_depth
:变分层的重复次数(控制模型复杂度) -
entangling_layer
:固定的CNOT纠缠结构 -
RY_layer(q_weights[k])
:可训练的参数,通过优化这些参数来学习
测量层:
exp_vals = [qml.expval(qml.PauliZ(position)) for position in range(n_qubits)]
return tuple(exp_vals)
作用:将量子信息转换回经典数据
-
测量每个量子比特在Z基下的期望值
-
输出长度为
n_qubits
的经典向量
5.修饰量子电路
这是一个经典-量子-经典的混合神经网络:
输入(512维) → 经典预处理 → 量子电路 → 经典后处理 → 输出(2维)
5.1初始化方法 __init__
def __init__(self):super().__init__()self.pre_net = nn.Linear(512, n_qubits) # 512 → n_qubits (如4)self.q_params = nn.Parameter(q_delta * torch.randn(q_depth * n_qubits))self.post_net = nn.Linear(n_qubits, 2) # n_qubits → 2 (蚂蚁/蜜蜂)
-
pre_net
:经典预处理层,将ResNet18的512维特征压缩到量子比特数 -
q_params
:量子电路的可训练参数 -
post_net
:经典后处理层,将量子输出映射到最终分类
-
ResNet18的最后一个卷积层有512个输出通道
5.2前向传播 forward
pre_out = self.pre_net(input_features) # 512 → n_qubits
q_in = torch.tanh(pre_out) * np.pi / 2.0
-
nn.Linear
:线性降维 -
torch.tanh
:非线性激活,将输出限制在[-1, 1] -
* np.pi / 2.0
:缩放到[-π/2, π/2]范围,适合量子旋转门
q_out = torch.Tensor(0, n_qubits)
for elem in q_in:q_out_elem = torch.hstack(quantum_net(elem, self.q_params)).float().unsqueeze(0)q_out = torch.cat((q_out, q_out_elem))
初始化: q_out = [0, 4] (空)第1个样本: elem1 → 量子电路 → [0.1, 0.2, 0.3, 0.4] → unsqueeze → [[0.1, 0.2, 0.3, 0.4]]
q_out = [[0.1, 0.2, 0.3, 0.4]]第2个样本: elem2 → 量子电路 → [0.5, 0.6, 0.7, 0.8] → unsqueeze → [[0.5, 0.6, 0.7, 0.8]]
q_out = [[0.1, 0.2, 0.3, 0.4],[0.5, 0.6, 0.7, 0.8]]第3个样本: elem3 → 量子电路 → [0.9, 1.0, 1.1, 1.2] → unsqueeze → [[0.9, 1.0, 1.1, 1.2]]
q_out = [[0.1, 0.2, 0.3, 0.4],[0.5, 0.6, 0.7, 0.8],[0.9, 1.0, 1.1, 1.2]] # 形状: [3, 4]
关键点:
-
逐样本处理:由于量子电路目前不能批量处理,需要循环处理每个样本
-
quantum_net(elem, self.q_params)
:调用之前定义的量子电路 -
torch.hstack()
:将量子电路输出组合成张量 -
unsqueeze(0)
:添加批次维度
5.3经典后处理
return self.post_net(q_out) # n_qubits → 2
5.4加载预训练的ResNet18
weights = torchvision.models.ResNet18_Weights.IMAGENET1K_V1
model_hybrid = torchvision.models.resnet18(weights=weights)
作用:
-
下载在ImageNet数据集上预训练好的ResNet18模型
-
IMAGENET1K_V1
指定使用ImageNet-1K数据集的权重 -
模型会自动下载到本地(第一次运行需要时间)
5.5冻结所有参数
for param in model_hybrid.parameters():param.requires_grad = False
关键作用:
-
requires_grad = False
表示这些参数在训练时不会计算梯度 -
不更新权重,保持ResNet18学到的通用特征提取能力
-
大幅减少训练参数数量,加速训练过程
5.6替换最后一层
model_hybrid.fc = DressedQuantumNet()
-
model_hybrid.fc
是ResNet18的最后一个全连接层 -
原始:
nn.Linear(512, 1000)
(1000个ImageNet类别) -
替换为:我们自定义的
DressedQuantumNet()
-
新的输出:2个节点(蚂蚁/蜜蜂二分类)
5.7移动到设备
model_hybrid = model_hybrid.to(device)
作用:将整个模型移动到GPU或CPU上进行计算
6.训练和结果
6.1交叉熵损失函数
criterion = nn.CrossEntropyLoss()
作用:创建交叉熵损失函数的实例,用于衡量模型预测与真实标签之间的差异。
6.2优化器定义
optimizer_hybrid = optim.Adam(model_hybrid.fc.parameters(), lr=step)
优化器选择:Adam
-
自适应学习率:为每个参数单独调整学习率
-
动量机制:结合了动量法和RMSProp的优点
-
训练稳定:适合处理噪声数据和非平稳目标
-
收敛快速:在很多深度学习任务中表现优秀
优化参数范围
关键点:只优化fc
层的参数,也就是我们的DressedQuantumNet
这意味着:
-
✅ 优化:量子修饰电路的所有参数
-
pre_net
的权重和偏置 -
q_params
量子电路参数 -
post_net
的权重和偏置
-
-
❌ 不优化:ResNet18主干网络的参数(已被冻结)
学习率设置
-
step
是一个预设的学习率值 -
在量子机器学习中,学习率通常设置得较小(如0.001、0.0001)
-
因为量子参数的梯度可能比较敏感
6.3学习率调度器定义
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_hybrid, step_size=10, gamma=gamma_lr_scheduler
)
StepLR的特点:
-
阶梯式下降:在特定epoch将学习率乘以一个系数
-
简单有效:是最常用的学习率调度策略之一
-
可控性强:下降时机和幅度都可以精确控制
step_size=10
-
每10个epoch调整一次学习率
-
在epoch 10, 20, 30, ... 时触发学习率下降
gamma=gamma_lr_scheduler
-
学习率下降的乘数因子
-
假设
gamma_lr_scheduler = 0.5
,则学习率每次减半 -
通常设置在0.1到0.5之间
6.4训练
def train_model(model, criterion, optimizer, scheduler, num_epochs):"""训练模型的完整流程Args:model: 要训练的模型criterion: 损失函数optimizer: 优化器scheduler: 学习率调度器num_epochs: 训练轮数"""since = time.time()# 初始化最佳模型权重和指标best_model_wts = copy.deepcopy(model.state_dict()) # 保存最佳模型权重best_acc = 0.0 # 最佳验证准确率best_loss = 10000.0 # 最佳验证损失(初始化为一个大数)best_acc_train = 0.0 # 最佳训练准确率best_loss_train = 10000.0 # 最佳训练损失print("Training started:")# 开始训练循环for epoch in range(num_epochs):# 每个epoch包含训练和验证两个阶段for phase in ["train", "validation"]:if phase == "train":# 设置模型为训练模式(启用dropout、batchnorm更新)model.train()else:# 设置模型为评估模式(固定dropout、batchnorm)model.eval()# 初始化统计变量running_loss = 0.0 # 累计损失running_corrects = 0 # 累计正确预测数# 迭代处理数据批次n_batches = dataset_sizes[phase] // batch_size # 计算总批次数量it = 0 # 批次计数器for inputs, labels in dataloaders[phase]:since_batch = time.time() # 记录批次开始时间batch_size_ = len(inputs) # 当前批次的实际大小inputs = inputs.to(device) # 将数据移动到设备(GPU/CPU)labels = labels.to(device)optimizer.zero_grad() # 清空梯度# 只在训练阶段计算梯度和进行优化with torch.set_grad_enabled(phase == "train"):# 前向传播outputs = model(inputs)# 获取预测结果(最大概率的类别)_, preds = torch.max(outputs, 1)# 计算损失loss = criterion(outputs, labels)# 如果是训练阶段,进行反向传播和参数更新if phase == "train":loss.backward() # 反向传播计算梯度optimizer.step() # 更新模型参数# 统计和打印迭代结果running_loss += loss.item() * batch_size_ # 累计损失(乘以批次大小)batch_corrects = torch.sum(preds == labels.data).item() # 当前批次正确数running_corrects += batch_corrects # 累计正确数# 打印当前批次进度print("Phase: {} Epoch: {}/{} Iter: {}/{} Batch time: {:.4f}".format(phase,epoch + 1,num_epochs,it + 1,n_batches + 1,time.time() - since_batch, # 批次处理时间),end="\r", # 回车不换行,实现进度条效果flush=True,)it += 1 # 更新批次计数器# 计算并打印epoch结果epoch_loss = running_loss / dataset_sizes[phase] # 平均损失epoch_acc = running_corrects / dataset_sizes[phase] # 准确率print("Phase: {} Epoch: {}/{} Loss: {:.4f} Acc: {:.4f} ".format("train" if phase == "train" else "validation ",epoch + 1,num_epochs,epoch_loss,epoch_acc,))# 检查是否是最佳模型(基于验证集性能)if phase == "validation" and epoch_acc > best_acc:best_acc = epoch_acc # 更新最佳准确率best_model_wts = copy.deepcopy(model.state_dict()) # 保存最佳权重if phase == "validation" and epoch_loss < best_loss:best_loss = epoch_loss # 更新最佳损失# 记录训练集的最佳表现(用于监控)if phase == "train" and epoch_acc > best_acc_train:best_acc_train = epoch_accif phase == "train" and epoch_loss < best_loss_train:best_loss_train = epoch_loss# 更新学习率(只在训练阶段结束时)if phase == "train":scheduler.step() # 调用学习率调度器# 训练完成,打印最终结果model.load_state_dict(best_model_wts) # 加载最佳模型权重time_elapsed = time.time() - since # 计算总训练时间print("Training completed in {:.0f}m {:.0f}s".format(time_elapsed // 60, time_elapsed % 60))print("Best test loss: {:.4f} | Best test accuracy: {:.4f}".format(best_loss, best_acc))return model # 返回训练好的最佳模型
model_hybrid = train_model(model_hybrid, criterion, optimizer_hybrid, exp_lr_scheduler, num_epochs=num_epochs
)