CUDA Programming - How to Use CUDA Streams to Execute Multiple Kernels Concurrently on a GPU - Applying It to Your Own Projects - concurrentKernels
How to Use CUDA Streams to Execute Multiple Kernels Concurrently on a GPU Device
- 1. Full Code and Purpose of the Example
- 1.1 Understanding Multi-Task Cooperation Through a Real-World Scenario
- 1.2 Full Code
- 2. Code Breakdown and Reuse
- 2.1 Programming Template
1. Full Code and Purpose of the Example
Project: https://github.com/NVIDIA/cuda-samples/tree/v11.8/Samples/0_Introduction/concurrentKernels
This sample demonstrates concurrent execution of multiple kernels and inter-stream dependency management. Its core uses CUDA streams and events to achieve the following (a distilled sketch follows this list):
- Concurrent kernel execution: create multiple streams (nstreams = nkernels + 1), each independently running the clock_block kernel.
- Inter-stream synchronization: cudaStreamWaitEvent makes the last stream wait on the completion events of all other streams, enforcing the correct dependencies.
- Performance measurement: cudaEventElapsedTime reports the total execution time, contrasting serial and concurrent efficiency.
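Distilled from the sample, the following is a minimal sketch of this fork-join pattern (the worker kernel is a trivial placeholder, and error checking is omitted for brevity; the full listing in 1.2 wraps every call in checkCudaErrors):

#include <cuda_runtime.h>

__global__ void worker(int *out, int i) { out[i] = i; }  // placeholder task

int main() {
  const int nkernels = 8;
  const int nstreams = nkernels + 1;  // one extra stream gathers the results

  int *d_out;
  cudaMalloc(&d_out, nkernels * sizeof(int));

  cudaStream_t streams[nstreams];
  cudaEvent_t events[nkernels];
  for (int i = 0; i < nstreams; ++i) cudaStreamCreate(&streams[i]);
  for (int i = 0; i < nkernels; ++i)
    cudaEventCreateWithFlags(&events[i], cudaEventDisableTiming);

  for (int i = 0; i < nkernels; ++i) {
    worker<<<1, 1, 0, streams[i]>>>(d_out, i);  // fork: one kernel per stream
    cudaEventRecord(events[i], streams[i]);     // signal this stream is done
    cudaStreamWaitEvent(streams[nstreams - 1], events[i], 0);  // join
  }
  // anything queued on streams[nstreams - 1] now runs only after all workers

  cudaStreamSynchronize(streams[nstreams - 1]);

  for (int i = 0; i < nkernels; ++i) cudaEventDestroy(events[i]);
  for (int i = 0; i < nstreams; ++i) cudaStreamDestroy(streams[i]);
  cudaFree(d_out);
  return 0;
}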
1.1 Understanding Multi-Task Cooperation Through a Real-World Scenario
Scenario: concurrent order handling in a fast-food kitchen.
Suppose a fast-food restaurant must handle several orders at once, each consisting of a burger, fries, and a drink.
The kitchen has different workstations (streams), and the staff must cooperate efficiently.
- Parallel handling of multiple orders (multiple CUDA streams). Scenario: eight orders arrive at once (nkernels = 8), and each order's burger is made by a different cook (CUDA stream) in parallel. Code mapping: each clock_block kernel simulates one burger-making task and is dispatched to its own stream for concurrent execution:
clock_block<<<1, 1, 0, streams[i]>>>(&d_a[i], time_clocks);
Each stream works independently, like cooks making burgers side by side without interfering.
- Task-completion notification (event recording). Scenario: each cook rings a bell (records an event) on finishing a burger, notifying the counter. Code mapping: each stream records an event when its task completes:
cudaEventRecord(kernelEvent[i], streams[i]);
This is the cook's bell announcing that the burger is ready.
- Waiting for all orders (event waiting). Scenario: the packer (the last stream) must wait until every burger is done before packing. Code mapping: the last stream waits on all the events via cudaStreamWaitEvent:
cudaStreamWaitEvent(streams[nstreams - 1], kernelEvent[i], 0);
The packer starts only once every bell has rung (all events have completed).
- Aggregation and delivery (reduction and memory copy). Scenario: the packer gathers all the items, checks them, and hands them to the customer. Code mapping: run the sum kernel to reduce the results, then copy them back to the host:
sum<<<1, 32, 0, streams[nstreams - 1]>>>(d_a, nkernels);
cudaMemcpyAsync(a, d_a, …, streams[nstreams - 1]);
The reduction is like checking that every burger is done; the copy is the hand-off of the finished order.
- Performance comparison (timing). Scenario: handling eight orders serially takes 8x the time; handled concurrently, it ideally takes only 1x. Code check: measure the actual elapsed time and compare it with the expected values:
printf("Measured time for sample = %.3fs\n", elapsed_time / 1000.0f);
Just as the kitchen shortens total time by streamlining its workflow, the GPU raises throughput through concurrency.
1.2 Full Code:
#include <cooperative_groups.h>
#include <stdio.h>

namespace cg = cooperative_groups;

#include <helper_cuda.h>
#include <helper_functions.h>

// This is a kernel that does no real work but runs at least for a specified
// number of clocks
__global__ void clock_block(clock_t *d_o, clock_t clock_count) {
  unsigned int start_clock = (unsigned int)clock();
  clock_t clock_offset = 0;

  while (clock_offset < clock_count) {
    unsigned int end_clock = (unsigned int)clock();

    // The code below should work like
    // this (thanks to modular arithmetics):
    //
    // clock_offset = (clock_t) (end_clock > start_clock ?
    //                           end_clock - start_clock :
    //                           end_clock + (0xffffffffu - start_clock));
    //
    // Indeed, let m = 2^32 then
    // end - start = end + m - start (mod m).
    clock_offset = (clock_t)(end_clock - start_clock);
  }

  d_o[0] = clock_offset;
}

// Single warp reduction kernel
__global__ void sum(clock_t *d_clocks, int N) {
  // Handle to thread block group
  cg::thread_block cta = cg::this_thread_block();
  __shared__ clock_t s_clocks[32];

  clock_t my_sum = 0;

  for (int i = threadIdx.x; i < N; i += blockDim.x) {
    my_sum += d_clocks[i];
  }

  s_clocks[threadIdx.x] = my_sum;
  cg::sync(cta);

  for (int i = 16; i > 0; i /= 2) {
    if (threadIdx.x < i) {
      s_clocks[threadIdx.x] += s_clocks[threadIdx.x + i];
    }
    cg::sync(cta);
  }

  d_clocks[0] = s_clocks[0];
}

int main(int argc, char **argv) {
  int nkernels = 8;             // number of concurrent kernels
  int nstreams = nkernels + 1;  // use one more stream than concurrent kernels
  int nbytes = nkernels * sizeof(clock_t);  // number of data bytes
  float kernel_time = 10;                   // time the kernel should run in ms
  float elapsed_time;                       // timing variables
  int cuda_device = 0;

  printf("[%s] - Starting...\n", argv[0]);

  // get number of kernels if overridden on the command line
  if (checkCmdLineFlag(argc, (const char **)argv, "nkernels")) {
    nkernels = getCmdLineArgumentInt(argc, (const char **)argv, "nkernels");
    nstreams = nkernels + 1;
  }

  // use command-line specified CUDA device, otherwise use device with highest
  // Gflops/s
  cuda_device = findCudaDevice(argc, (const char **)argv);

  cudaDeviceProp deviceProp;
  checkCudaErrors(cudaGetDevice(&cuda_device));
  checkCudaErrors(cudaGetDeviceProperties(&deviceProp, cuda_device));

  if ((deviceProp.concurrentKernels == 0)) {
    printf("> GPU does not support concurrent kernel execution\n");
    printf("  CUDA kernel runs will be serialized\n");
  }

  printf("> Detected Compute SM %d.%d hardware with %d multi-processors\n",
         deviceProp.major, deviceProp.minor, deviceProp.multiProcessorCount);

  // allocate host memory
  clock_t *a = 0;  // pointer to the array data in host memory
  checkCudaErrors(cudaMallocHost((void **)&a, nbytes));

  // allocate device memory
  clock_t *d_a = 0;  // pointers to data and init value in the device memory
  checkCudaErrors(cudaMalloc((void **)&d_a, nbytes));

  // allocate and initialize an array of stream handles
  cudaStream_t *streams =
      (cudaStream_t *)malloc(nstreams * sizeof(cudaStream_t));

  for (int i = 0; i < nstreams; i++) {
    checkCudaErrors(cudaStreamCreate(&(streams[i])));
  }

  // create CUDA event handles
  cudaEvent_t start_event, stop_event;
  checkCudaErrors(cudaEventCreate(&start_event));
  checkCudaErrors(cudaEventCreate(&stop_event));

  // the events are used for synchronization only and hence do not need to
  // record timings; this also makes events not introduce global sync points
  // when recorded, which is critical to get overlap
  cudaEvent_t *kernelEvent;
  kernelEvent = (cudaEvent_t *)malloc(nkernels * sizeof(cudaEvent_t));

  for (int i = 0; i < nkernels; i++) {
    checkCudaErrors(
        cudaEventCreateWithFlags(&(kernelEvent[i]), cudaEventDisableTiming));
  }

  //////////////////////////////////////////////////////////////////////
  // time execution with nkernels streams
  clock_t total_clocks = 0;
#if defined(__arm__) || defined(__aarch64__)
  // the kernel takes more time than the channel reset time on arm archs, so to
  // prevent hangs reduce time_clocks.
  clock_t time_clocks = (clock_t)(kernel_time * (deviceProp.clockRate / 100));
#else
  clock_t time_clocks = (clock_t)(kernel_time * deviceProp.clockRate);
#endif

  cudaEventRecord(start_event, 0);

  // queue nkernels in separate streams and record when they are done
  for (int i = 0; i < nkernels; ++i) {
    clock_block<<<1, 1, 0, streams[i]>>>(&d_a[i], time_clocks);
    total_clocks += time_clocks;
    checkCudaErrors(cudaEventRecord(kernelEvent[i], streams[i]));

    // make the last stream wait for the kernel event to be recorded
    checkCudaErrors(
        cudaStreamWaitEvent(streams[nstreams - 1], kernelEvent[i], 0));
  }

  // queue a sum kernel and a copy back to host in the last stream.
  // the commands in this stream get dispatched as soon as all the kernel
  // events have been recorded
  sum<<<1, 32, 0, streams[nstreams - 1]>>>(d_a, nkernels);
  checkCudaErrors(cudaMemcpyAsync(
      a, d_a, sizeof(clock_t), cudaMemcpyDeviceToHost, streams[nstreams - 1]));

  // at this point the CPU has dispatched all work for the GPU and can continue
  // processing other tasks in parallel

  // in this sample we just wait until the GPU is done
  checkCudaErrors(cudaEventRecord(stop_event, 0));
  checkCudaErrors(cudaEventSynchronize(stop_event));
  checkCudaErrors(cudaEventElapsedTime(&elapsed_time, start_event, stop_event));

  printf("Expected time for serial execution of %d kernels = %.3fs\n",
         nkernels, nkernels * kernel_time / 1000.0f);
  printf("Expected time for concurrent execution of %d kernels = %.3fs\n",
         nkernels, kernel_time / 1000.0f);
  printf("Measured time for sample = %.3fs\n", elapsed_time / 1000.0f);

  bool bTestResult = (a[0] > total_clocks);

  // release resources
  for (int i = 0; i < nkernels; i++) {
    cudaStreamDestroy(streams[i]);
    cudaEventDestroy(kernelEvent[i]);
  }

  free(streams);
  free(kernelEvent);

  cudaEventDestroy(start_event);
  cudaEventDestroy(stop_event);
  cudaFreeHost(a);
  cudaFree(d_a);

  if (!bTestResult) {
    printf("Test failed!\n");
    exit(EXIT_FAILURE);
  }

  printf("Test passed\n");
  exit(EXIT_SUCCESS);
}
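A quick sanity check on the numbers this program prints: with the defaults nkernels = 8 and kernel_time = 10 ms, the expected serial time is 8 × 10 ms = 0.080 s, while the expected concurrent time is that of a single kernel, 0.010 s. On a GPU that supports concurrent kernels, the measured time should land near the concurrent figure. The pass condition a[0] > total_clocks follows from the kernel logic: each clock_block spins until its clock_offset reaches time_clocks, so the reduced sum of the eight per-kernel counts must reach, and in practice exceeds, nkernels × time_clocks.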
2. Code Breakdown and Reuse
2.1 Programming Template
You can fill in your own tasks using the template below; detailed comments are in the code:
#include <cstdio>
#include <cuda_runtime.h>
#include <cstdlib>               // for exit / EXIT_FAILURE
#include <cooperative_groups.h>  // only needed if you use cooperative groups
namespace cg = cooperative_groups;

// 1. Define the task parameters
const int NUM_STREAMS = 4;      // number of streams (tune for your hardware)
const int NUM_KERNELS = 3;      // tasks per stream (example: 3 tasks each)
const size_t DATA_SIZE = 1024;  // data size

// 2. Error-checking macro (always include one)
#define CHECK_CUDA(call)                                           \
  {                                                                \
    cudaError_t err = call;                                        \
    if (err != cudaSuccess) {                                      \
      printf("CUDA Error at %s:%d - %s\n", __FILE__, __LINE__,     \
             cudaGetErrorString(err));                             \
      exit(EXIT_FAILURE);                                          \
    }                                                              \
  }

// 3. Example kernel (replace with your actual task)
__global__ void exampleKernel(float *d_data, int size, int step) {
  int idx = blockIdx.x * blockDim.x + threadIdx.x;
  if (idx < size) {
    d_data[idx] += step;  // example operation: increment the data
  }
}

// 4. Main-function template
int main() {
  // ===================== Initialization =====================
  // 4.1 Allocate host and device memory; each stream gets its own host
  // slice so the async copies below never race with each other
  float *h_data, *d_data[NUM_STREAMS];
  CHECK_CUDA(cudaMallocHost(
      &h_data, NUM_STREAMS * DATA_SIZE * sizeof(float)));  // pinned memory
  for (size_t k = 0; k < NUM_STREAMS * DATA_SIZE; ++k) {
    h_data[k] = 0.0f;  // initialize the input
  }
  for (int i = 0; i < NUM_STREAMS; ++i) {
    CHECK_CUDA(cudaMalloc(&d_data[i], DATA_SIZE * sizeof(float)));
  }

  // 4.2 Create streams and events
  cudaStream_t streams[NUM_STREAMS];
  cudaEvent_t events[NUM_STREAMS][NUM_KERNELS];  // one event per task
  for (int i = 0; i < NUM_STREAMS; ++i) {
    CHECK_CUDA(cudaStreamCreate(&streams[i]));
    for (int j = 0; j < NUM_KERNELS; ++j) {
      // the events are for synchronization only, so disable timing
      CHECK_CUDA(
          cudaEventCreateWithFlags(&events[i][j], cudaEventDisableTiming));
    }
  }

  // ===================== Work dispatch =====================
  for (int i = 0; i < NUM_STREAMS; ++i) {
    // 4.3 Async copy (host to device)
    CHECK_CUDA(cudaMemcpyAsync(d_data[i], &h_data[i * DATA_SIZE],
                               DATA_SIZE * sizeof(float),
                               cudaMemcpyHostToDevice, streams[i]));

    // 4.4 Submit several tasks to the current stream (example: 3 in a row)
    for (int j = 0; j < NUM_KERNELS; ++j) {
      // kernel launch configuration (customize)
      dim3 block(256);
      dim3 grid((DATA_SIZE + block.x - 1) / block.x);

      // launch the kernel (example: each task increments the data)
      exampleKernel<<<grid, block, 0, streams[i]>>>(d_data[i], DATA_SIZE,
                                                    j + 1);

      // record an event marking this task as done
      CHECK_CUDA(cudaEventRecord(events[i][j], streams[i]));

      // Note: operations in a single stream already execute in issue order,
      // so no cudaStreamWaitEvent is needed between them; the events matter
      // when ANOTHER stream must wait on this task, e.g.:
      //   CHECK_CUDA(cudaStreamWaitEvent(otherStream, events[i][j], 0));
    }

    // 4.5 Async copy back (device to host), into this stream's own slice
    CHECK_CUDA(cudaMemcpyAsync(&h_data[i * DATA_SIZE], d_data[i],
                               DATA_SIZE * sizeof(float),
                               cudaMemcpyDeviceToHost, streams[i]));
  }

  // ===================== Synchronization and cleanup =====================
  // 5.1 Synchronize all streams
  for (int i = 0; i < NUM_STREAMS; ++i) {
    CHECK_CUDA(cudaStreamSynchronize(streams[i]));
  }

  // 5.2 Release resources
  CHECK_CUDA(cudaFreeHost(h_data));
  for (int i = 0; i < NUM_STREAMS; ++i) {
    CHECK_CUDA(cudaFree(d_data[i]));
    CHECK_CUDA(cudaStreamDestroy(streams[i]));
    for (int j = 0; j < NUM_KERNELS; ++j) {
      CHECK_CUDA(cudaEventDestroy(events[i][j]));
    }
  }

  printf("Execution completed successfully.\n");
  return 0;
}
Key points of the template:
- 1. Parameter tuning:
- NUM_STREAMS: set according to the GPU's concurrency capability (bounded by the number of SMs and per-kernel resource usage); more streams than can actually overlap brings no benefit.
- NUM_KERNELS: number of tasks per stream, used to build a chain of work.
- DATA_SIZE: set according to your actual data size.
- 2. Task customization:
- Kernel: replace exampleKernel with your real computation.
- Launch configuration: tune the block and grid dimensions to optimize execution.
- 3. Dependency management:
- Event waits: cudaStreamWaitEvent defines explicit cross-stream dependencies (a standalone example follows this list).
- Pipelining: tasks within one stream already execute in issue order; use events only when a task in another stream must wait on them.
- 4. Asynchronous operations:
- cudaMemcpyAsync overlaps data transfers with computation (it requires pinned host memory, hence cudaMallocHost).
- Each stream's operations are independent, maximizing parallelism.
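Since the template above only chains tasks within a single stream, here is a complementary minimal sketch of a true cross-stream dependency, as referenced in point 3 (produce and consume are hypothetical placeholder kernels; error checking is omitted): the consume kernel in stream B must not start before the produce kernel in stream A finishes.

#include <cuda_runtime.h>

__global__ void produce(float *buf, int n) {
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) buf[i] = (float)i;  // placeholder: fill the buffer
}

__global__ void consume(float *buf, int n) {
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) buf[i] *= 2.0f;  // placeholder: use the produced data
}

int main() {
  const int n = 1024;
  float *d_buf;
  cudaMalloc(&d_buf, n * sizeof(float));

  cudaStream_t streamA, streamB;
  cudaEvent_t produced;
  cudaStreamCreate(&streamA);
  cudaStreamCreate(&streamB);
  cudaEventCreateWithFlags(&produced, cudaEventDisableTiming);

  produce<<<(n + 255) / 256, 256, 0, streamA>>>(d_buf, n);
  cudaEventRecord(produced, streamA);         // mark the producer as done
  cudaStreamWaitEvent(streamB, produced, 0);  // streamB blocks until then
  consume<<<(n + 255) / 256, 256, 0, streamB>>>(d_buf, n);

  cudaStreamSynchronize(streamB);

  cudaEventDestroy(produced);
  cudaStreamDestroy(streamA);
  cudaStreamDestroy(streamB);
  cudaFree(d_buf);
  return 0;
}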
Technical summary:
- CUDA streams: like the kitchen's workstations, they allow multiple tasks to run in parallel.
- Event synchronization: like the order-ready bell, it guarantees that dependent tasks execute in the correct order.
- Performance gain: on a GPU that supports concurrent kernels, the 8 clock_block launches can overlap, so the total time approaches that of a single kernel rather than 8 times as long.
This pattern suits applications with independent parallel tasks whose results a later step must aggregate (a sketch of the inference mapping follows the list):
- Video-processing pipelines: several frames processed simultaneously (each frame's decode → filter → encode in its own stream).
- Numerical simulation: concurrent computation over independent parameter sets, with a final aggregation of the results.
- Machine-learning inference: batches split across streams, each running preprocess → inference → postprocess in parallel.
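To illustrate the last item, here is a minimal sketch of mapping batched inference onto streams (preprocess, infer, and postprocess are hypothetical placeholders standing in for the real stages; error checking is omitted, and the inputs are left uninitialized). Each batch gets its own stream, so the stages of one batch run in order while independent batches overlap:

#include <cuda_runtime.h>

__global__ void preprocess(float *d, int n) {
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) d[i] *= 0.5f;  // placeholder normalization
}
__global__ void infer(float *d, int n) {
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) d[i] = d[i] * d[i];  // placeholder model
}
__global__ void postprocess(float *d, int n) {
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) d[i] += 1.0f;  // placeholder thresholding
}

int main() {
  const int NUM_BATCHES = 4;
  const int N = 1 << 20;

  float *h_batch[NUM_BATCHES], *d_batch[NUM_BATCHES];
  cudaStream_t streams[NUM_BATCHES];
  for (int i = 0; i < NUM_BATCHES; ++i) {
    cudaMallocHost(&h_batch[i], N * sizeof(float));  // pinned, for async copies
    cudaMalloc(&d_batch[i], N * sizeof(float));
    cudaStreamCreate(&streams[i]);
  }

  dim3 block(256), grid((N + 255) / 256);
  for (int i = 0; i < NUM_BATCHES; ++i) {
    // within one stream the stages run in issue order;
    // across streams the batches overlap
    cudaMemcpyAsync(d_batch[i], h_batch[i], N * sizeof(float),
                    cudaMemcpyHostToDevice, streams[i]);
    preprocess<<<grid, block, 0, streams[i]>>>(d_batch[i], N);
    infer<<<grid, block, 0, streams[i]>>>(d_batch[i], N);
    postprocess<<<grid, block, 0, streams[i]>>>(d_batch[i], N);
    cudaMemcpyAsync(h_batch[i], d_batch[i], N * sizeof(float),
                    cudaMemcpyDeviceToHost, streams[i]);
  }

  for (int i = 0; i < NUM_BATCHES; ++i) {
    cudaStreamSynchronize(streams[i]);
    cudaStreamDestroy(streams[i]);
    cudaFree(d_batch[i]);
    cudaFreeHost(h_batch[i]);
  }
  return 0;
}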