当前位置: 首页 > news >正文

CUDA Stream 回调函数示例代码

文章目录

  • CUDA Stream 回调函数示例代码
    • 基本概念
    • 示例代码
    • 代码解释
    • 回调函数的特点
    • 更复杂的示例:多个回调
    • 注意事项
  • CUDA Stream 回调函数中使用 MPI 或 NCCL
    • 示例程序
    • 注意事项

CUDA Stream 回调函数示例代码

CUDA 中的流回调函数(stream callback)是一种在 CUDA 流中插入异步回调的机制,它允许你在流的特定位置插入一个主机端函数调用。回调函数会在流中所有前面的操作都完成后被调用。

基本概念

  • 回调函数: 一个在主机上执行的函数,当流中前面的所有操作都完成后被调用
  • 异步执行: 回调不会阻塞主机线程
  • 执行顺序: 回调函数在流中按照插入顺序执行

示例代码

#include <stdio.h>
#include <cuda_runtime.h>// CUDA核函数
__global__ void kernel(int *data, int value, int N) {int idx = blockIdx.x * blockDim.x + threadIdx.x;if (idx < N) {data[idx] = value;}
}// 回调函数
void CUDART_CB myCallback(cudaStream_t stream, cudaError_t status, void *userData) {printf("Callback executed! Status: %s, User data: %d\n",cudaGetErrorString(status), *(int*)userData);
}int main() {const int N = 1024;const int value = 42;int *d_data = nullptr;int userData = 123; // 用户自定义数据// 分配设备内存cudaMalloc(&d_data, N * sizeof(int));// 创建流cudaStream_t stream;cudaStreamCreate(&stream);// 启动核函数dim3 block(256);dim3 grid((N + block.x - 1) / block.x);kernel<<<grid, block, 0, stream>>>(d_data, value, N);// 添加回调函数到流cudaStreamAddCallback(stream, myCallback, &userData, 0);// 可以继续添加其他操作到流kernel<<<grid, block, 0, stream>>>(d_data, value + 1, N);// 等待流完成cudaStreamSynchronize(stream);// 清理资源cudaFree(d_data);cudaStreamDestroy(stream);return 0;
}

代码解释

  1. 核函数: 简单的核函数,将数组元素设置为指定值。

  2. 回调函数:

    • 必须具有 void CUDART_CB func(cudaStream_t stream, cudaError_t status, void *userData) 的签名
    • status 参数表示流中前面操作的状态
    • userData 是用户提供的自定义数据
  3. 主程序流程:

    • 分配设备内存
    • 创建CUDA流
    • 启动第一个核函数
    • 添加回调函数到流中
    • 启动第二个核函数
    • 同步流以确保所有操作完成
    • 释放资源

回调函数的特点

  1. 执行时机: 回调函数会在流中所有前面的操作完成后执行,但在后续操作开始前执行。

  2. 线程安全: 回调函数在独立的线程中执行,不是主线程。

  3. 限制:

    • 回调函数中不应调用CUDA API函数
    • 不应执行耗时的操作
    • 不应抛出异常
  4. 用户数据: 可以通过 userData 参数传递数据给回调函数,但需要确保在回调执行时数据仍然有效。

更复杂的示例:多个回调

#include <stdio.h>
#include <cuda_runtime.h>__global__ void kernel(int *data, int value, int N) {int idx = blockIdx.x * blockDim.x + threadIdx.x;if (idx < N) {data[idx] += value;}
}void CUDART_CB callback1(cudaStream_t stream, cudaError_t status, void *userData) {printf("Callback 1: Step %d completed\n", *(int*)userData);
}void CUDART_CB callback2(cudaStream_t stream, cudaError_t status, void *userData) {printf("Callback 2: Step %d completed\n", *(int*)userData);
}int main() {const int N = 1024;int *d_data = nullptr;int step1 = 1, step2 = 2;cudaMalloc(&d_data, N * sizeof(int));cudaMemset(d_data, 0, N * sizeof(int));cudaStream_t stream;cudaStreamCreate(&stream);dim3 block(256);dim3 grid((N + block.x - 1) / block.x);// 执行序列: 核函数 -> 回调1 -> 核函数 -> 回调2kernel<<<grid, block, 0, stream>>>(d_data, 10, N);cudaStreamAddCallback(stream, callback1, &step1, 0);kernel<<<grid, block, 0, stream>>>(d_data, 20, N);cudaStreamAddCallback(stream, callback2, &step2, 0);cudaStreamSynchronize(stream);// 验证结果int h_data[N];cudaMemcpy(h_data, d_data, N * sizeof(int), cudaMemcpyDeviceToHost);printf("First element: %d (should be 30)\n", h_data[0]);cudaFree(d_data);cudaStreamDestroy(stream);return 0;
}

这个示例展示了如何在流中插入多个回调函数,每个回调在不同的执行阶段被触发。

注意事项

  1. 回调函数执行时间应尽量短,避免影响整体性能
  2. 确保回调函数中使用的用户数据在回调执行时仍然有效
  3. 回调函数中不应进行CUDA API调用
  4. 回调函数执行顺序与它们在流中的添加顺序一致

回调函数是CUDA流中实现主机-设备交互和任务协调的有力工具,特别适合用于构建复杂的异步执行流水线。


CUDA Stream 回调函数中使用 MPI 或 NCCL

在 CUDA 中,stream 回调函数是在主机端执行的函数,当 stream 中所有前面的操作都完成后会被调用。关于在回调函数中使用 MPI 或 NCCL 的问题:

  1. MPI: 可以在回调函数中使用 MPI 函数,但需要注意 MPI 的线程安全性。MPI 需要初始化为 MPI_THREAD_SERIALIZEDMPI_THREAD_MULTIPLE 级别才能安全地在回调中使用。

  2. NCCL: 也可以在回调函数中使用 NCCL 函数,但需要注意 NCCL 通信可能会与 CUDA 操作交错,需要确保正确的同步。

示例程序

下面是一个展示如何在 CUDA stream 回调函数中使用 MPI 和 NCCL 的示例程序:

#include <stdio.h>
#include <mpi.h>
#include <cuda_runtime.h>
#include <nccl.h>#define CUDACHECK(cmd) do {                         \cudaError_t e = cmd;                              \if( e != cudaSuccess ) {                          \printf("CUDA error %s:%d '%s'\n",             \__FILE__,__LINE__,cudaGetErrorString(e)); \exit(EXIT_FAILURE);                           \}                                                 \
} while(0)#define NCCLCHECK(cmd) do {                         \ncclResult_t r = cmd;                             \if( r != ncclSuccess ) {                          \printf("NCCL error %s:%d '%s'\n",             \__FILE__,__LINE__,ncclGetErrorString(r)); \exit(EXIT_FAILURE);                           \}                                                 \
} while(0)void CUDART_CB myStreamCallback(cudaStream_t stream, cudaError_t status, void *userData) {int *data = (int*)userData;int rank, size;// 获取MPI信息MPI_Comm_rank(MPI_COMM_WORLD, &rank);MPI_Comm_size(MPI_COMM_WORLD, &size);printf("Rank %d: Stream callback executed. Data value: %d\n", rank, *data);// 在这里可以使用MPI函数MPI_Barrier(MPI_COMM_WORLD);// 也可以使用NCCL函数(需要先初始化NCCL)ncclComm_t comm = *(ncclComm_t*)((void**)userData + 1);float *sendbuff, *recvbuff;// 假设这些缓冲区已经在其他地方分配和初始化// NCCLCHECK(ncclAllReduce(sendbuff, recvbuff, count, ncclFloat, ncclSum, comm, stream));printf("Rank %d: Finished MPI/NCCL operations in callback\n", rank);
}int main(int argc, char* argv[]) {int rank, size;// 初始化MPI,要求线程支持MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &provided);if (provided < MPI_THREAD_SERIALIZED) {printf("MPI thread support insufficient\n");MPI_Abort(MPI_COMM_WORLD, 1);}MPI_Comm_rank(MPI_COMM_WORLD, &rank);MPI_Comm_size(MPI_COMM_WORLD, &size);// 初始化CUDAint dev = rank % 8;  // 假设每个进程使用不同的GPUCUDACHECK(cudaSetDevice(dev));// 初始化NCCLncclComm_t comm;ncclUniqueId id;if (rank == 0) ncclGetUniqueId(&id);MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);NCCLCHECK(ncclCommInitRank(&comm, size, id, rank));// 创建CUDA streamcudaStream_t stream;CUDACHECK(cudaStreamCreate(&stream));// 准备一些数据传递给回调函数int *h_data, *d_data;h_data = (int*)malloc(sizeof(int));*h_data = rank * 100;CUDACHECK(cudaMalloc(&d_data, sizeof(int)));CUDACHECK(cudaMemcpyAsync(d_data, h_data, sizeof(int), cudaMemcpyHostToDevice, stream));// 准备用户数据(包含普通数据和NCCL通信器)void *userData[2];userData[0] = h_data;userData[1] = &comm;// 添加回调函数CUDACHECK(cudaStreamAddCallback(stream, myStreamCallback, userData, 0));// 等待stream完成CUDACHECK(cudaStreamSynchronize(stream));// 清理资源NCCLCHECK(ncclCommDestroy(comm));CUDACHECK(cudaStreamDestroy(stream));CUDACHECK(cudaFree(d_data));free(h_data);MPI_Finalize();return 0;
}

注意事项

  1. MPI 线程安全: 必须使用 MPI_Init_thread 并确保提供的线程支持级别足够(至少 MPI_THREAD_SERIALIZED)。

  2. NCCL 使用: 在回调中使用 NCCL 时需要确保:

    • NCCL 通信器已经初始化
    • 使用的 CUDA stream 与 NCCL 操作兼容
    • 缓冲区已经正确分配和初始化
  3. 死锁风险: 在回调中进行集体通信操作(如 MPI_Barrier 或 ncclAllReduce)时要小心,确保所有进程都能到达该点。

  4. 性能考虑: 在回调中进行通信操作可能会影响整体性能,需要仔细评估。

这个示例展示了基本用法,实际应用中需要根据具体需求进行调整。

相关文章:

  • Pacman-search
  • 【RabbitMQ】保证消息不丢失
  • PaddleX的安装
  • “八股训练营”学习总结
  • C++STL(九) :bitset的介绍与使用
  • 特征工程四:数据特征提取TfidfVectorizer的使用
  • re题(48)BUUCTF-[网鼎杯 2020 青龙组]singal
  • 对日开发 秀丸文本编辑器 宏的基本使用
  • 计算属性 vs methods方法
  • Java大厂面试突击:从Spring Boot自动配置到Kafka分区策略实战解析
  • SVT-AV1源码分析-函数svt_aom_motion_estimation_kernel
  • linux:进程的替换
  • 深入解读:2025 数字化转型管理 参考架构
  • 【算法】回溯法
  • 杭电oj(1010、1015、1241)题解
  • 【沉浸式求职学习day27】
  • 【视频生成模型】通义万相Wan2.1模型本地部署和LoRA微调
  • Python----深度学习(基于DNN的吃鸡预测)
  • 动手学深度学习11.11. 学习率调度器-笔记练习(PyTorch)
  • arcpy列表函数的应用(4)
  • 阿根廷发生5.6级地震,震源深度30公里
  • 中国固体火箭发动机领域杰出专家赵殿礼逝世,享年92岁
  • 看见“看得见的手”,看见住房与土地——读《央地之间》
  • 上海市十六届人大常委会第二十一次会议表决通过有关人事任免事项
  • 美国通过《删除法案》:打击未经同意发布他人私密图像,包括“深度伪造”
  • 发挥全国劳模示范引领作用,加速汽车产业电智化转型