pytorch 数据加载加速
默认情况下,从 CPU 到 GPU 的 tensor.to(device)
操作是阻塞的(blocking/synchronous)。**
1. 核心概念:阻塞 vs. 非阻塞
在 CPU-GPU 交互的语境下:
-
阻塞(Blocking)/ 同步(Synchronous)操作:当 CPU 发出一个指令给 GPU 后(例如,传输数据),CPU 会暂停执行后续代码,直到 GPU 完成该指令并返回确认信号。CPU 在此期间处于等待状态。
- 优点:代码逻辑简单,易于调试,能保证在下一行代码执行时,数据已经准备就绪。
- 缺点:浪费 CPU 时间,无法实现计算与数据传输的重叠,限制了程序整体性能。
-
非阻塞(Non-blocking)/ 异步(Asynchronous)操作:当 CPU 发出一个指令给 GPU 后,该指令被放入一个 CUDA 流(Stream)的队列中,然后 CPU 立即继续执行后续代码,无需等待 GPU 完成。
- 优点:极大地提升效率。CPU 可以继续准备下一批数据或执行其他计算,而 GPU 则在后台并行地执行数据传输或核函数计算。这是实现高性能深度学习训练的关键。
- 缺点:需要更小心地管理同步,确保在使用数据之前,相关的异步操作已经完成。
2. tensor.to()
的行为详解
tensor.to()
的行为取决于数据的流向以及您是否使用了额外的参数。
场景一:CPU -> GPU (最常见的场景)
这是您问题最核心的场景,例如 cpu_tensor.to('cuda')
。
-
tensor.to('cuda')
或tensor.to('cuda', non_blocking=False)
(默认行为)- 行为:阻塞(Blocking)。
- 原因:默认情况下,在 CPU 上创建的
torch.Tensor
存储在**可分页内存(Pageable Memory)**中。正如在前一个关于cudaMallocHost
的回答中提到的,GPU 的 DMA 引擎无法安全地从可分页内存中进行异步传输,因为操作系统可能随时移动这块内存的物理地址。为了保证数据传输的正确性,CUDA 驱动程序必须:- 在内部创建一个临时的**锁页内存(Pinned Memory)**缓冲区。
- 将您的数据从可分页内存拷贝到这个临时缓冲区。
- 从临时缓冲区异步地将数据传输到 GPU。
- 等待传输完成,以确保整个操作的原子性和安全性。
- 这个“等待传输完成”的步骤使得整个
to()
调用在 CPU 看来是阻塞的。
-
tensor.to('cuda', non_blocking=True)
- 行为:可能非阻塞(Potentially Non-blocking)。
- 前提条件:要使此操作真正成为非阻塞的,源张量(CPU tensor)必须位于**锁页内存(Pinned Memory)**中。
- 如何实现:您需要先调用
tensor.pin_memory()
。# 1. 创建一个在可分页内存中的普通张量 pageable_tensor = torch.randn(1000, 1000)# 2. 将其转换为锁页内存版本 pinned_tensor = pageable_tensor.pin_memory()# 3. 现在,这个 to() 调用就是非阻塞的了 gpu_tensor = pinned_tensor.to('cuda', non_blocking=True)# CPU 可以立即执行下一行代码,而数据正在后台传输 print("This line executes immediately!")
- 如果源张量不在锁页内存中:即使您指定了
non_blocking=True
,PyTorch(或底层的 CUDA 驱动)通常也会忽略这个标志,操作仍然会表现为阻塞行为,因为它无法安全地执行异步拷贝。
场景二:GPU -> CPU
例如 gpu_tensor.to('cpu')
。
- 行为:隐式阻塞(Implicitly Blocking)。
- 原因:当您将数据从 GPU 移回 CPU 时,通常是为了在 CPU 上立即使用它(例如,打印、保存、转换为 NumPy 数组)。为了确保您访问数据时它已经从 GPU 完全拷贝回来,PyTorch 必须在此处进行同步。
gpu_tensor.to('cpu', non_blocking=True)
这个用法虽然语法上允许,但实际意义不大。它只会让发起拷贝的 CPU 线程不被阻塞,但任何试图访问这个新创建的 CPU 张量数据的代码,都会触发一个同步,被迫等待数据传输完成。
场景三:GPU -> GPU
例如 gpu_tensor_on_cuda0.to('cuda:1')
。
- 行为:非阻塞(Non-blocking)。
- 原因:设备之间的传输由 GPU 直接处理,并且默认是异步的。CPU 只需要向 CUDA 驱动发出指令,然后就可以继续工作了。GPU 会在适当的时候处理设备间的拷贝。
3. 代码示例与性能对比
下面的代码清晰地展示了阻塞与非阻塞操作的区别,并测量了它们的性能。
import torch
import time# 确保有可用的 CUDA 设备
if not torch.cuda.is_available():print("CUDA is not available. Exiting.")exit()device = torch.device("cuda:0")
data_size = (4096, 4096) # 创建一个较大的张量以观察时间差异# --- 1. 默认的阻塞式传输 ---
print("--- 1. Testing Default Blocking Transfer ---")
cpu_tensor = torch.randn(data_size, dtype=torch.float32)# 预热:第一次CUDA操作通常较慢,先执行一次以获得稳定计时
_ = cpu_tensor.to(device)
torch.cuda.synchronize() # 等待预热完成start_time = time.time()
gpu_tensor_blocking = cpu_tensor.to(device)
# 在这里,CPU会等待数据完全到达GPU
# 为了准确测量,我们仍然需要同步,但这只是为了确保计时终点正确
torch.cuda.synchronize()
end_time = time.time()
print(f"Blocking transfer time: {(end_time - start_time) * 1000:.4f} ms")# --- 2. 优化的非阻塞式传输 ---
print("\n--- 2. Testing Non-blocking Transfer with Pinned Memory ---")
# 关键步骤1: 将CPU张量放入锁页内存
pinned_cpu_tensor = cpu_tensor.pin_memory()# 预热
_ = pinned_cpu_tensor.to(device, non_blocking=True)
torch.cuda.synchronize()start_time = time.time()
# 关键步骤2: 使用 non_blocking=True
gpu_tensor_non_blocking = pinned_cpu_tensor.to(device, non_blocking=True)
# CPU 在这里不会等待!它可以继续执行其他任务。
# 我们可以模拟一些CPU工作
cpu_work_start = time.time()
# 模拟一个轻量级的CPU密集型任务
for _ in range(1000000):_ = 1 + 1
cpu_work_end = time.time()
print(f"CPU was able to do other work for: {(cpu_work_end - cpu_work_start) * 1000:.4f} ms while data was transferring.")# 关键步骤3: 在需要使用GPU数据之前,必须进行同步
torch.cuda.synchronize()
end_time = time.time()
print(f"Total time for non-blocking transfer (including synchronization): {(end_time - start_time) * 1000:.4f} ms")
print("Note: The total time is similar, but the key benefit is that the CPU was free to work in parallel.")# --- 3. 在 DataLoader 中的应用 ---
print("\n--- 3. Practical Application in DataLoader ---")
print("The most common and effective use of this pattern is in the DataLoader.")
print("Use `pin_memory=True` when creating a DataLoader instance.")
print("This tells the loader's worker processes to place fetched data batches directly into pinned memory.")
print("Then, in your training loop, you can use `batch.to(device, non_blocking=True)` to overlap data loading/transfer with model computation.")
# 示例:
# dataloader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=True, num_workers=4, pin_memory=True)
# for batch, labels in dataloader:
# batch = batch.to(device, non_blocking=True)
# labels = labels.to(device, non_blocking=True)
# # Model computation can start here while the next batch is being fetched and transferred
# ...
总结与最佳实践
- 默认是阻塞的:
tensor.to('cuda')
默认是阻塞操作,因为它需要处理可分页内存,以保证安全性。 - 实现非阻塞:要实现真正的非阻塞 CPU->GPU 传输,必须满足两个条件:
- 源 CPU 张量位于锁页内存中(通过
tensor.pin_memory()
)。 - 在
.to()
调用中指定non_blocking=True
。
- 源 CPU 张量位于锁页内存中(通过
DataLoader
是关键:在实际应用中,手动调用pin_memory()
比较繁琐。最佳实践是在创建torch.utils.data.DataLoader
时设置pin_memory=True
。这会自动将每个数据批次加载到锁页内存中,使您可以在训练循环中无缝地使用非阻塞传输,从而极大地提升 GPU 的利用率和训练速度。- 同步点:请记住,异步操作并非没有代价。您必须在代码的某个点进行同步(例如,在需要使用计算结果时,或在计时结束前调用
torch.cuda.synchronize()
),以确保所有排队的 GPU 操作都已完成。PyTorch 会在需要时自动插入同步点(如tensor.item()
或tensor.cpu()
),但手动管理可以实现更精细的性能控制。