cuda编程笔记(26)-- 核函数使用任务队列
任务队列是常见的解耦方式。在CPU的多核编程里,经常有这种生产者消费者的场景:生产者线程往任务队列丢任务,消费者线程在任务队列取任务去干活。
在cuda的核函数里,似乎也可以这么设计,每个线程都去取任务干活。但是为了利用GPU的多线程的特性,可以将取任务的单位归属到block,一个block认领一个任务后,由block内的线程共同处理。这样即做到了任务队列解耦,还利用了GPU的多线程特性。
至于生产者消费者的异步模型,则可以通过cuda里的流来进行实现。
先看看自产自销的模式,熟悉任务队列的写法:
#ifndef __CUDACC__
#define __CUDACC__
#endif
#include <cuda_runtime.h>
#include <cstdio>struct Task {//任务的数据结构由自己定义int id;int depth;
};// ------------------- 全局任务队列定义 -------------------
struct TaskQueue {Task* tasks;int* head;int* tail;int capacity;//现在的设计是固定容量,最多处理capacity个任务,而不是随时变化
};__device__ TaskQueue gQueue;// ------------------- 初始化队列 -------------------
__global__ void initQueue(Task* buf, int* head, int* tail, int capacity, int initTasks) {if (threadIdx.x == 0 && blockIdx.x == 0) {gQueue.tasks = buf;gQueue.head = head;gQueue.tail = tail;gQueue.capacity = capacity;*gQueue.head = 0;*gQueue.tail = initTasks;}
}// ------------------- 设备函数 -------------------
__device__ bool fetchTask(Task& out) {int idx = atomicAdd(gQueue.head, 1);if (idx >= *gQueue.tail) return false;out = gQueue.tasks[idx];return true;
}__device__ void pushTask(const Task& t) {int idx = atomicAdd(gQueue.tail, 1);if (idx < gQueue.capacity)gQueue.tasks[idx] = t;
}// ------------------- 每个Block一个任务 -------------------
__global__ void workerKernel() {__shared__ Task task; // 每个Block共享一个任务__shared__ bool hasTask;while (true) {if (threadIdx.x == 0) {hasTask = fetchTask(task); // 只由block内线程0取任务}__syncthreads(); // 等待任务取完if (!hasTask) break;// 这里所有线程协作执行同一个任务printf("Block %d Thread %d 处理任务 %d depth=%d\n",blockIdx.x, threadIdx.x, task.id, task.depth);// 模拟子任务生成(仅block内thread 0执行)if (threadIdx.x == 0 && task.depth < 2) {Task newTask{ task.id * 10 + blockIdx.x, task.depth + 1 };pushTask(newTask);}__syncthreads(); // 确保任务完成}
}// ------------------- Host 端 -------------------
int main() {const int CAPACITY = 1024;const int INIT_TASKS = 4;Task* d_tasks;int* d_head, * d_tail;cudaMalloc(&d_tasks, CAPACITY * sizeof(Task));cudaMalloc(&d_head, sizeof(int));cudaMalloc(&d_tail, sizeof(int));Task h_init[INIT_TASKS];for (int i = 0; i < INIT_TASKS; ++i) h_init[i] = { i, 0 };cudaMemcpy(d_tasks, h_init, sizeof(h_init), cudaMemcpyHostToDevice);initQueue << <1, 1 >> > (d_tasks, d_head, d_tail, CAPACITY, INIT_TASKS);cudaDeviceSynchronize();workerKernel << <4, 32 >> > (); // 每个block对应一个任务cudaDeviceSynchronize();cudaFree(d_tasks);cudaFree(d_head);cudaFree(d_tail);return 0;
}
然后看一下生产者消费者的情况:
全局任务队列(在 GPU 全局内存中)
生产者(Producer Kernel) 负责不断往队列里 push 任务;
消费者(Consumer Kernel) 各个 Block 协作,从队列里取任务(每个 Block 消费一个任务);
队列容量可以临时去掉(即假设任务永远不会太多而溢出);
队列判空由
head == tail
决定;不考虑环形结构(线性增长即可);
#ifndef __CUDACC__
#define __CUDACC__
#endif
#include <cuda_runtime.h>
#include <cstdio>struct Task {int id;int depth;
};// ------------------- 全局任务队列定义 -------------------
struct TaskQueue {Task* tasks;int* head; // 消费位置int* tail; // 生产位置
};__device__ TaskQueue gQueue;// ------------------- 初始化队列 -------------------
__global__ void initQueue(Task* buf, int* head, int* tail) {if (threadIdx.x == 0 && blockIdx.x == 0) {gQueue.tasks = buf;gQueue.head = head;gQueue.tail = tail;*gQueue.head = 0;*gQueue.tail = 0;}
}// ------------------- 设备函数:入队 / 出队 -------------------
__device__ bool fetchTask(Task& out) {int idx = atomicAdd(gQueue.head, 1);if (idx >= *gQueue.tail) {// 没有任务可取,撤销本次取atomicSub(gQueue.head, 1);return false;}out = gQueue.tasks[idx];return true;
}__device__ void pushTask(const Task& t) {int idx = atomicAdd(gQueue.tail, 1);gQueue.tasks[idx] = t;
}// ------------------- 生产者核函数 -------------------
__global__ void producerKernel(int numTasks) {int tid = blockIdx.x * blockDim.x + threadIdx.x;if (tid < numTasks) {Task t{ tid, 0 };pushTask(t);printf("Producer %d 生产任务 id=%d\n", tid, t.id);}
}// ------------------- 消费者核函数(Block级消费) -------------------
__global__ void consumerKernel() {__shared__ Task task;__shared__ bool hasTask;while (true) {if (threadIdx.x == 0)hasTask = fetchTask(task);__syncthreads();if (!hasTask) break; // 队列空,结束printf("Consumer Block %d Thread %d 消费任务 id=%d\n",blockIdx.x, threadIdx.x, task.id);// 模拟执行任务for (int i = 0; i < 1000; ++i) __threadfence(); // 或简单的空操作,防止编译器优化掉__syncthreads();}
}// ------------------- Host 端 -------------------
int main() {const int CAPACITY = 128;const int NUM_TASKS = 16;Task* d_tasks;int* d_head, * d_tail;cudaMalloc(&d_tasks, CAPACITY * sizeof(Task));cudaMalloc(&d_head, sizeof(int));cudaMalloc(&d_tail, sizeof(int));// 初始化队列initQueue << <1, 1 >> > (d_tasks, d_head, d_tail);cudaDeviceSynchronize();cudaStream_t streamProd, streamCons;cudaStreamCreate(&streamProd);cudaStreamCreate(&streamCons);// 异步并发:生产者 + 消费者producerKernel << <2, 8, 0, streamProd >> > (NUM_TASKS);consumerKernel << <4, 32, 0, streamCons >> > ();cudaDeviceSynchronize();cudaFree(d_tasks);cudaFree(d_head);cudaFree(d_tail);cudaStreamDestroy(streamProd);cudaStreamDestroy(streamCons);return 0;
}