当前位置: 首页 > news >正文

cuda编程笔记(26)-- 核函数使用任务队列

任务队列是常见的解耦方式。在CPU的多核编程里,经常有这种生产者消费者的场景:生产者线程往任务队列丢任务,消费者线程在任务队列取任务去干活。

在cuda的核函数里,似乎也可以这么设计,每个线程都去取任务干活。但是为了利用GPU的多线程的特性,可以将取任务的单位归属到block,一个block认领一个任务后,由block内的线程共同处理。这样即做到了任务队列解耦,还利用了GPU的多线程特性。

至于生产者消费者的异步模型,则可以通过cuda里的流来进行实现。

先看看自产自销的模式,熟悉任务队列的写法:

#ifndef __CUDACC__
#define __CUDACC__
#endif
#include <cuda_runtime.h>
#include <cstdio>struct Task {//任务的数据结构由自己定义int id;int depth;
};// ------------------- 全局任务队列定义 -------------------
struct TaskQueue {Task* tasks;int* head;int* tail;int  capacity;//现在的设计是固定容量,最多处理capacity个任务,而不是随时变化
};__device__ TaskQueue gQueue;// ------------------- 初始化队列 -------------------
__global__ void initQueue(Task* buf, int* head, int* tail, int capacity, int initTasks) {if (threadIdx.x == 0 && blockIdx.x == 0) {gQueue.tasks = buf;gQueue.head = head;gQueue.tail = tail;gQueue.capacity = capacity;*gQueue.head = 0;*gQueue.tail = initTasks;}
}// ------------------- 设备函数 -------------------
__device__ bool fetchTask(Task& out) {int idx = atomicAdd(gQueue.head, 1);if (idx >= *gQueue.tail) return false;out = gQueue.tasks[idx];return true;
}__device__ void pushTask(const Task& t) {int idx = atomicAdd(gQueue.tail, 1);if (idx < gQueue.capacity)gQueue.tasks[idx] = t;
}// ------------------- 每个Block一个任务 -------------------
__global__ void workerKernel() {__shared__ Task task;   // 每个Block共享一个任务__shared__ bool hasTask;while (true) {if (threadIdx.x == 0) {hasTask = fetchTask(task); // 只由block内线程0取任务}__syncthreads(); // 等待任务取完if (!hasTask) break;// 这里所有线程协作执行同一个任务printf("Block %d Thread %d 处理任务 %d depth=%d\n",blockIdx.x, threadIdx.x, task.id, task.depth);// 模拟子任务生成(仅block内thread 0执行)if (threadIdx.x == 0 && task.depth < 2) {Task newTask{ task.id * 10 + blockIdx.x, task.depth + 1 };pushTask(newTask);}__syncthreads(); // 确保任务完成}
}// ------------------- Host 端 -------------------
int main() {const int CAPACITY = 1024;const int INIT_TASKS = 4;Task* d_tasks;int* d_head, * d_tail;cudaMalloc(&d_tasks, CAPACITY * sizeof(Task));cudaMalloc(&d_head, sizeof(int));cudaMalloc(&d_tail, sizeof(int));Task h_init[INIT_TASKS];for (int i = 0; i < INIT_TASKS; ++i) h_init[i] = { i, 0 };cudaMemcpy(d_tasks, h_init, sizeof(h_init), cudaMemcpyHostToDevice);initQueue << <1, 1 >> > (d_tasks, d_head, d_tail, CAPACITY, INIT_TASKS);cudaDeviceSynchronize();workerKernel << <4, 32 >> > (); // 每个block对应一个任务cudaDeviceSynchronize();cudaFree(d_tasks);cudaFree(d_head);cudaFree(d_tail);return 0;
}

然后看一下生产者消费者的情况:

  • 全局任务队列(在 GPU 全局内存中)

  • 生产者(Producer Kernel) 负责不断往队列里 push 任务;

  • 消费者(Consumer Kernel) 各个 Block 协作,从队列里取任务(每个 Block 消费一个任务);

  • 队列容量可以临时去掉(即假设任务永远不会太多而溢出);

  • 队列判空由 head == tail 决定;

  • 不考虑环形结构(线性增长即可);

#ifndef __CUDACC__
#define __CUDACC__
#endif
#include <cuda_runtime.h>
#include <cstdio>struct Task {int id;int depth;
};// ------------------- 全局任务队列定义 -------------------
struct TaskQueue {Task* tasks;int* head; // 消费位置int* tail; // 生产位置
};__device__ TaskQueue gQueue;// ------------------- 初始化队列 -------------------
__global__ void initQueue(Task* buf, int* head, int* tail) {if (threadIdx.x == 0 && blockIdx.x == 0) {gQueue.tasks = buf;gQueue.head = head;gQueue.tail = tail;*gQueue.head = 0;*gQueue.tail = 0;}
}// ------------------- 设备函数:入队 / 出队 -------------------
__device__ bool fetchTask(Task& out) {int idx = atomicAdd(gQueue.head, 1);if (idx >= *gQueue.tail) {// 没有任务可取,撤销本次取atomicSub(gQueue.head, 1);return false;}out = gQueue.tasks[idx];return true;
}__device__ void pushTask(const Task& t) {int idx = atomicAdd(gQueue.tail, 1);gQueue.tasks[idx] = t;
}// ------------------- 生产者核函数 -------------------
__global__ void producerKernel(int numTasks) {int tid = blockIdx.x * blockDim.x + threadIdx.x;if (tid < numTasks) {Task t{ tid, 0 };pushTask(t);printf("Producer %d 生产任务 id=%d\n", tid, t.id);}
}// ------------------- 消费者核函数(Block级消费) -------------------
__global__ void consumerKernel() {__shared__ Task task;__shared__ bool hasTask;while (true) {if (threadIdx.x == 0)hasTask = fetchTask(task);__syncthreads();if (!hasTask) break; // 队列空,结束printf("Consumer Block %d Thread %d 消费任务 id=%d\n",blockIdx.x, threadIdx.x, task.id);// 模拟执行任务for (int i = 0; i < 1000; ++i) __threadfence(); // 或简单的空操作,防止编译器优化掉__syncthreads();}
}// ------------------- Host 端 -------------------
int main() {const int CAPACITY = 128;const int NUM_TASKS = 16;Task* d_tasks;int* d_head, * d_tail;cudaMalloc(&d_tasks, CAPACITY * sizeof(Task));cudaMalloc(&d_head, sizeof(int));cudaMalloc(&d_tail, sizeof(int));// 初始化队列initQueue << <1, 1 >> > (d_tasks, d_head, d_tail);cudaDeviceSynchronize();cudaStream_t streamProd, streamCons;cudaStreamCreate(&streamProd);cudaStreamCreate(&streamCons);// 异步并发:生产者 + 消费者producerKernel << <2, 8, 0, streamProd >> > (NUM_TASKS);consumerKernel << <4, 32, 0, streamCons >> > ();cudaDeviceSynchronize();cudaFree(d_tasks);cudaFree(d_head);cudaFree(d_tail);cudaStreamDestroy(streamProd);cudaStreamDestroy(streamCons);return 0;
}

http://www.dtcms.com/a/461475.html

相关文章:

  • 存储芯片核心产业链研发实力:兆易创新、北京君正、澜起科技、江波龙、长电科技、佰维存储,6家龙头公司研发实力深度数据
  • 《Seq2Time: Sequential Knowledge Transfer for Video LLMTemporal Grounding》
  • 山东省建设部网站官网网站备案审核通过后
  • 浏览器兼容性问题处理
  • Day 09(下) B2a实例解说----exampleB2a.cc+ActionInitialization+PrimaryGeneratorAction
  • 分布式锁:Redisson的可重入锁
  • 计算机硬件相关(AI回答)
  • 网站设计中的用户体验大型网站需要什么样的团队
  • 淘宝网站开发方式网站托管 济南
  • 重庆网站seo案例网站推广用什么方法最好
  • sql报错:java.sql.SQLSyntaxErrorException: Unknown column ‘as0‘ in ‘where clause‘
  • 做网站是什么公司做陶瓷公司网站
  • CentOS 7上安装SonarQube8.9
  • 遗留系统微服务改造(二):数据迁移实战攻略与一致性保证
  • IO操作(Num22)
  • 领码方案|微服务与SOA的世纪对话(6):组织跃迁——智能架构下的团队与文化变革
  • 怎么什么软件可以吧做网站网站被百度收录很重要
  • C++ 单例模式(Singleton)详解
  • 面向未来的数据平台
  • C++5d
  • Transformer实战(21)——文本表示(Text Representation)
  • 网站空间商 权限梵克雅宝
  • 【Vue 3 】——setup、ref、watch
  • 做期货网站违法的吗淄博市住房和城乡建设局网站
  • 使用feign进行远程调用出现的问题(文件服务参数接收为null)
  • 国自然·医工交叉热点|通用医学影像分割基础模型与数据库
  • React Native:关于react自定义css属性的位置
  • 对于el-table中自定义表头中添加el-popover会弹出两个的解决方案,分别针对固定列和非固定列来隐藏最后一个浮框。
  • 电子商务公司简介系统清理优化工具
  • 内网渗透实战:红队作战全解析