CUDA-NCCL Notes (3): Distributed Training of LeNet

With NCCL in place we can try distributed training. Since a LeNet class was already written in an earlier note, we combine the two here and build a distributed training example.

This example requires Linux and a host with two GPUs.

First, the training data: LeNet is trained on MNIST, i.e. the files train-images-idx3-ubyte and train-labels-idx1-ubyte. Archives of both are easy to find online; download and extract them to get the two binary files.

PyTorch ships with an API for reading MNIST; in C++ we have to implement the data loading ourselves.

Dataloaders.hpp

This header defines MNISTDataset, the class that reads the dataset.

#include <iostream>
#include <fstream>
#include <vector>
#include <string>
#include <stdexcept>
#include <random>
#include <algorithm>
#include <cstdint>
struct MNISTSample
{
    MNISTSample(const std::vector<float>& image_, const unsigned char& label_)
        : image(image_), label(label_) {}
    std::vector<float> image;   // 28*28 pixels, normalized to [0,1]
    unsigned char label;
};

class MNISTDataset {
public:
    MNISTDataset(const std::string& image_file, const std::string& label_file) {
        load_images(image_file);
        load_labels(label_file);
        if (images.size() != labels.size()) {
            throw std::runtime_error("Number of images and labels mismatch");
        }
        indices.resize(images.size());
        for (size_t i = 0; i < indices.size(); i++) indices[i] = i;
    }
    // Randomly shuffle the sample order
    void shuffle(unsigned seed = 42) {
        std::shuffle(indices.begin(), indices.end(), std::default_random_engine(seed));
        current_idx = 0;
    }
    // Get the next batch of samples (wraps around at the end of the dataset)
    std::vector<MNISTSample> next_batch(size_t batch_size) {
        std::vector<MNISTSample> batch;
        batch.reserve(batch_size);
        for (size_t i = 0; i < batch_size; i++) {
            if (current_idx >= indices.size()) current_idx = 0;
            size_t idx = indices[current_idx++];
            batch.emplace_back(MNISTSample{images[idx], labels[idx]});
        }
        return batch;
    }
    size_t size() const { return images.size(); }

private:
    std::vector<std::vector<float>> images;
    std::vector<unsigned char> labels;
    std::vector<size_t> indices;
    size_t current_idx = 0;

    void load_images(const std::string& path) {
        std::ifstream file(path, std::ios::binary);
        if (!file) throw std::runtime_error("Cannot open image file");
        uint32_t magic, num, rows, cols;
        file.read((char*)&magic, 4);
        file.read((char*)&num, 4);
        file.read((char*)&rows, 4);
        file.read((char*)&cols, 4);
        // IDX headers are big-endian; convert to host byte order
        magic = __builtin_bswap32(magic);
        num   = __builtin_bswap32(num);
        rows  = __builtin_bswap32(rows);
        cols  = __builtin_bswap32(cols);
        if (magic != 2051) throw std::runtime_error("Invalid MNIST image file");
        images.resize(num, std::vector<float>(rows * cols));
        for (uint32_t i = 0; i < num; i++) {
            for (uint32_t j = 0; j < rows * cols; j++) {
                unsigned char pixel;
                file.read((char*)&pixel, 1);
                images[i][j] = pixel / 255.0f;
            }
        }
    }
    void load_labels(const std::string& path) {
        std::ifstream file(path, std::ios::binary);
        if (!file) throw std::runtime_error("Cannot open label file");
        uint32_t magic, num;
        file.read((char*)&magic, 4);
        file.read((char*)&num, 4);
        magic = __builtin_bswap32(magic);
        num   = __builtin_bswap32(num);
        if (magic != 2049) throw std::runtime_error("Invalid MNIST label file");
        labels.resize(num);
        for (uint32_t i = 0; i < num; i++) {
            unsigned char lbl;
            file.read((char*)&lbl, 1);
            labels[i] = lbl;
        }
    }
};
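A minimal host-side usage sketch of the class (the file paths are placeholders; point them at the extracted MNIST files):

#include "Dataloaders.hpp"
#include <iostream>

int main() {
    MNISTDataset ds("train-images-idx3-ubyte", "train-labels-idx1-ubyte");
    ds.shuffle(123);                  // deterministic shuffle
    auto batch = ds.next_batch(8);    // 8 samples, each 28*28 floats in [0,1]
    std::cout << "dataset size: " << ds.size()
              << ", first label in batch: " << int(batch[0].label) << std::endl;
    return 0;
}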

kernels.hpp

Some kernels still have to be written by hand; the ones this example mainly relies on are softmax_forward_loss_batch, softmax_ce_backward_batch and scale_kernel.

#pragma once
#include <cuda_runtime.h>
#include <device_launch_parameters.h>

// Fill array p with uniform random numbers in [a, b].
__global__ void init_uniform(float* p, int n, unsigned seed, float a = -0.05f, float b = 0.05f);

// Zero out array p. Typically used to reset gradients.
__global__ void zero_kernel(float* p, int n);

// Add the per-channel bias to a convolution / fully connected output.
// The channel c is recovered from the flat index and b[c] is added
// (the bias is per channel, so all positions in a channel share one bias).
__global__ void add_bias_nchw(float* y, const float* b, int N, int C, int H, int W);

// Softmax forward + cross-entropy loss (single sample): logits -> prob, writes loss.
// Standard softmax + CE, with the max subtracted to avoid overflow in expf.
__global__ void softmax_forward_loss(const float* logits, const int* label, float* prob, float* loss, int num_classes);

// Backward of softmax + CE: dlogits (dL/dlogits) = prob - onehot (derivation left to the reader).
__global__ void softmax_ce_backward(const float* prob, const int* label, float* dlogits, int num_classes);

// Batched versions of the two kernels above.
__global__ void softmax_forward_loss_batch(const float* logits, const unsigned* labels, float* prob, float* loss, int batch_size, int num_classes);
__global__ void softmax_ce_backward_batch(const float* prob, const unsigned* labels, float* dlogits, int batch_size, int num_classes);

// SGD parameter update: W -= lr * dW.
__global__ void sgd_update(float* W, const float* dW, float lr, int n);

// Bias gradient of a convolution layer: sum the output gradient dy (N, C, H, W)
// over N, H and W to get one value per channel, i.e. db[c] = sum of dy over N*H*W.
// One thread per channel.
__global__ void reduce_bias_grad(const float* dy, float* db, int N, int C, int H, int W);

// Flatten: copy a feature map (N, C, H, W) into an (N, C*H*W) row-major buffer.
// Simplified for N = 1, so it is just a copy.
__global__ void nchw_to_nxk(const float* x, float* y, int N, int C, int H, int W);

// Un-flatten; mainly used for debugging or in the backward pass.
__global__ void nxk_to_nchw(const float* x, float* y, int N, int C, int H, int W);

// Initialize a vector of ones on the GPU.
__global__ void init_ones_kernel(float* data, int n);

// Scale an array by a constant factor.
__global__ void scale_kernel(float* data, int n, float factor);

kernels.cu

#include "kernels.hpp"
#include <cuda_runtime.h>
#include <device_launch_parameters.h>

// Fill array p with uniform random numbers in [a, b].
__global__ void init_uniform(float* p, int n, unsigned seed, float a, float b) {
    int i = blockIdx.x * blockDim.x + threadIdx.x;
    if (i < n) {
        // cheap integer-hash pseudo-random numbers (good enough for initialization)
        unsigned s = seed ^ (i * 747796405u + 2891336453u);
        s ^= s >> 17; s *= 0xed5ad4bbU; s ^= s >> 11; s *= 0xac4c1b51U; s ^= s >> 15; s *= 0x31848babU; s ^= s >> 14;
        float r = (s & 0x00FFFFFF) / float(0x01000000); // [0,1)
        p[i] = a + (b - a) * r;
    }
}

// Zero out array p. Typically used to reset gradients.
__global__ void zero_kernel(float* p, int n) {
    int i = blockIdx.x * blockDim.x + threadIdx.x;
    if (i < n) {
        p[i] = 0.0f;
    }
}

// Add the per-channel bias to a convolution / fully connected output.
// The channel c is recovered from the flat index; b has C elements.
__global__ void add_bias_nchw(float* y, const float* b, int N, int C, int H, int W) {
    int i = blockIdx.x * blockDim.x + threadIdx.x;
    int total = N * C * H * W;
    if (i < total) {
        int c = (i / (H * W)) % C;   // which channel this element belongs to
        y[i] += b[c];
    }
}

// Softmax forward + cross-entropy loss (single sample): logits -> prob, writes loss.
// Standard softmax + CE, with the max subtracted to avoid overflow.
__global__ void softmax_forward_loss(const float* logits, const int* label, float* prob, float* loss, int num_classes) {
    // single-sample simplification
    // 1) subtract the max to avoid overflow
    float mx = logits[0];
    for (int i = 1; i < num_classes; i++)
        mx = fmaxf(mx, logits[i]);
    float sum = 0.f;
    for (int i = 0; i < num_classes; i++) {
        float e = expf(logits[i] - mx);
        prob[i] = e;
        sum += e;
    }
    for (int i = 0; i < num_classes; i++)
        prob[i] /= sum;
    int y = *label;
    float l = -logf(fmaxf(prob[y], 1e-12f));   // the loss
    *loss = l;
}

// Backward of softmax + CE: dlogits (dL/dlogits) = prob - onehot (derivation left to the reader).
__global__ void softmax_ce_backward(const float* prob, const int* label, float* dlogits, int num_classes) {
    int i = blockIdx.x * blockDim.x + threadIdx.x;
    if (i < num_classes) {
        int y = *label;
        dlogits[i] = prob[i] - (i == y ? 1.f : 0.f);   // see the softmax gradient formula
    }
}

// softmax + cross entropy loss (batch)
// logits: [batch_size, num_classes]
// labels: [batch_size]
// prob:   [batch_size, num_classes]
// loss:   [batch_size]  (per-sample loss; the caller can average it)
__global__ void softmax_forward_loss_batch(const float* logits, const unsigned* labels, float* prob, float* loss, int batch_size, int num_classes)
{
    int b = blockIdx.x;   // one block per sample
    if (b >= batch_size) return;
    const float* logit_row = logits + b * num_classes;
    float* prob_row = prob + b * num_classes;
    // 1) find the max to avoid overflow
    float mx = logit_row[0];
    for (int i = 1; i < num_classes; i++)
        mx = fmaxf(mx, logit_row[i]);
    // 2) exp & sum
    float sum = 0.f;
    for (int i = 0; i < num_classes; i++) {
        float e = expf(logit_row[i] - mx);
        prob_row[i] = e;
        sum += e;
    }
    // 3) normalize to get the softmax
    for (int i = 0; i < num_classes; i++)
        prob_row[i] /= sum;
    // 4) cross-entropy loss
    unsigned y = labels[b];
    float l = -logf(fmaxf(prob_row[y], 1e-12f));
    loss[b] = l;
}

// backward: dlogits = prob - onehot
// prob:    [batch_size, num_classes]
// labels:  [batch_size]
// dlogits: [batch_size, num_classes]
__global__ void softmax_ce_backward_batch(const float* prob, const unsigned* labels, float* dlogits, int batch_size, int num_classes)
{
    int b = blockIdx.x;    // sample index
    int i = threadIdx.x;   // class index
    if (b >= batch_size || i >= num_classes) return;
    int y = labels[b];
    const float* prob_row = prob + b * num_classes;
    float* dlogit_row = dlogits + b * num_classes;
    dlogit_row[i] = prob_row[i] - (i == y ? 1.f : 0.f);
}

// SGD parameter update: W -= lr * dW
__global__ void sgd_update(float* W, const float* dW, float lr, int n) {
    int i = blockIdx.x * blockDim.x + threadIdx.x;
    if (i < n) {
        W[i] -= lr * dW[i];
    }
}

// Bias gradient of a convolution layer.
// Sum the output gradient dy (shape N, C, H, W) over N, H and W to get one value
// per channel: db[c] = sum of dy over the N*H*W positions. One thread per channel.
__global__ void reduce_bias_grad(const float* dy, float* db, int N, int C, int H, int W) {
    int c = blockIdx.x * blockDim.x + threadIdx.x;
    if (c >= C) return;
    float s = 0.f;
    for (int n = 0; n < N; n++) {                    // every sample in the batch
        const float* p = dy + (n * C + c) * H * W;   // start of sample n, channel c
        for (int i = 0; i < H * W; i++)              // every spatial position
            s += p[i];                               // accumulate into s
    }
    db[c] = s;
}

// Flatten: copy a feature map (N, C, H, W) into an (N, C*H*W) row-major buffer.
// Simplified for N = 1, so it is just a copy.
__global__ void nchw_to_nxk(const float* x, float* y, int N, int C, int H, int W) {
    int k = C * H * W;   // N = 1 simplification
    int i = blockIdx.x * blockDim.x + threadIdx.x;
    if (i < k) y[i] = x[i];
}

// Un-flatten; mainly used for debugging or in the backward pass.
__global__ void nxk_to_nchw(const float* x, float* y, int N, int C, int H, int W) {
    int k = C * H * W;
    int i = blockIdx.x * blockDim.x + threadIdx.x;
    if (i < k) y[i] = x[i];
}

// Initialize a vector of ones on the GPU.
__global__ void init_ones_kernel(float* data, int n) {
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx < n) data[idx] = 1.0f;
}

// Scale an array by a constant factor.
__global__ void scale_kernel(float* data, int n, float factor) {
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx < n) {
        data[idx] *= factor;
    }
}
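As a quick sanity check of the batched softmax kernel, a small test program (hypothetical, not part of the original post; compile it as a .cu file together with kernels.cu) can feed it all-zero logits. Every class then gets probability 1/10, so each per-sample loss should come out as log(10) ≈ 2.3026, which is also roughly where the training losses below start.

#include <cstdio>
#include <cmath>
#include <cuda_runtime.h>
#include "kernels.hpp"

int main() {
    const int batch_size = 4, num_classes = 10;
    float *d_logits, *d_prob, *d_loss;
    unsigned *d_labels;
    cudaMalloc(&d_logits, batch_size * num_classes * sizeof(float));
    cudaMalloc(&d_prob,   batch_size * num_classes * sizeof(float));
    cudaMalloc(&d_loss,   batch_size * sizeof(float));
    cudaMalloc(&d_labels, batch_size * sizeof(unsigned));

    // all-zero logits and arbitrary labels
    cudaMemset(d_logits, 0, batch_size * num_classes * sizeof(float));
    unsigned h_labels[batch_size] = {0, 3, 7, 9};
    cudaMemcpy(d_labels, h_labels, sizeof(h_labels), cudaMemcpyHostToDevice);

    // one block per sample, exactly as in the training loop
    softmax_forward_loss_batch<<<batch_size, 1>>>(d_logits, d_labels, d_prob, d_loss,
                                                  batch_size, num_classes);
    float h_loss[batch_size];
    cudaMemcpy(h_loss, d_loss, sizeof(h_loss), cudaMemcpyDeviceToHost);

    for (int b = 0; b < batch_size; b++)
        printf("sample %d: loss = %f (expected %f)\n", b, h_loss[b], logf(10.0f));

    cudaFree(d_logits); cudaFree(d_prob); cudaFree(d_loss); cudaFree(d_labels);
    return 0;
}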

Layers.hpp

In the LeNet class defined here, the place where NCCL comes in is the gradients: the gradients computed on the two GPUs have to be averaged, and the average sent back to every GPU, so that the network parameters stay identical on each GPU. ncclAllReduce is exactly the operation that does this.

We add an extra allreduce_grads function that averages, across the two GPUs, the gradients of every layer that has trainable parameters.
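The pattern used inside allreduce_grads is sketched below on a single gradient buffer, in a hypothetical single-process setup with one communicator per GPU (buffer size and file layout are made up for illustration): every rank contributes its buffer to an in-place ncclAllReduce with ncclSum, and the sum is then scaled by 1/nDev to turn it into an average. Build the sketch together with kernels.cu for scale_kernel.

#include <cuda_runtime.h>
#include <nccl.h>
#include "kernels.hpp"   // for scale_kernel

int main() {
    const int nDev = 2, count = 1024;        // two GPUs, 1024 floats per gradient buffer
    int devs[nDev] = {0, 1};
    ncclComm_t comms[nDev];
    ncclCommInitAll(comms, nDev, devs);      // one communicator per local GPU

    float* grads[nDev];
    cudaStream_t streams[nDev];
    for (int i = 0; i < nDev; i++) {
        cudaSetDevice(i);
        cudaMalloc(&grads[i], count * sizeof(float));
        cudaStreamCreate(&streams[i]);
        // ... each GPU would fill grads[i] with its locally computed gradient ...
    }

    // Sum the buffers across GPUs, in place. Group calls are required when one
    // thread issues NCCL operations for several devices.
    ncclGroupStart();
    for (int i = 0; i < nDev; i++)
        ncclAllReduce(grads[i], grads[i], count, ncclFloat, ncclSum, comms[i], streams[i]);
    ncclGroupEnd();

    // Turn the sum into an average: multiply by 1/nDev on every GPU.
    for (int i = 0; i < nDev; i++) {
        cudaSetDevice(i);
        scale_kernel<<<(count + 255) / 256, 256, 0, streams[i]>>>(grads[i], count, 1.0f / nDev);
        cudaStreamSynchronize(streams[i]);
    }

    for (int i = 0; i < nDev; i++) {
        cudaSetDevice(i);
        cudaFree(grads[i]);
        cudaStreamDestroy(streams[i]);
        ncclCommDestroy(comms[i]);
    }
    return 0;
}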

#pragma once
#include <cuda_runtime.h>
#include <device_launch_parameters.h>
#include <cudnn.h>
#include <cublas_v2.h>
#include <iostream>
#include<cstdio>
#include <cmath>
#include <cstdlib>
#include <vector>
#include<memory>
#include <nccl.h>
#include"kernels.hpp"
class Layer {//抽象的层
public://输入的逻辑:调用者传入分配内存空间的输入,输出存在类里virtual void forward(float* input) = 0;//反向传播的逻辑亦是如此,只是名字反了一下,参数名字是outputvirtual void backward(float* grad_output) = 0;virtual float* get_output() = 0;virtual float* get_grad_input() = 0;virtual float* get_w_grad()=0;virtual float* get_b_grad()=0;virtual int get_w_grad_size()=0;virtual int get_b_grad_size()=0;virtual void update(float lr) = 0;virtual~Layer() {}
};
class Conv2D :public Layer {
public:Conv2D(cudnnHandle_t &handle,int batch,int in_channels,int out_channels,int in_h,int in_w,int kernel_size,int stride=1,int padding=0);void forward(float* input)override {_input = input;const float alpha = 1.0f, beta = 0.0f;//向前cudnnConvolutionForward(_handle, &alpha, _input_desc,_input,_filter_desc,_weight,_conv_desc,_fwd_algo,_fwd_ws,_fwd_ws_size,&beta,_output_desc,_output);//加偏置cudnnAddTensor(_handle, &alpha, _bias_desc, _bias, &alpha, _output_desc, _output);//注意这里两个都是alpha,详情查看函数接口}void backward(float* grad_output)override {const float alpha = 1.0f, beta = 0.0f;// grad_biascudnnConvolutionBackwardBias(_handle, &alpha, _output_desc, grad_output, &beta, _bias_desc, _grad_bias);// grad_weightcudnnConvolutionBackwardFilter(_handle, &alpha, _input_desc, _input, _output_desc, grad_output,_conv_desc, _bwd_filter_algo, _bwd_filter_ws, _bwd_filter_ws_size,&beta,_filter_desc, _grad_weight);// grad_input 这个输出 是要传到外面去给下一层用的cudnnConvolutionBackwardData(_handle, &alpha, _filter_desc, _weight,_output_desc, grad_output,_conv_desc, _bwd_data_algo, _bwd_data_ws, _bwd_data_ws_size, &beta, _input_desc, _grad_input);}float* get_output() override { return _output; }float* get_grad_input() override { return _grad_input; }float* get_w_grad(){return _grad_weight;}float* get_b_grad(){return _grad_bias;}int get_w_grad_size(){return _out_channels * _in_channels * _kernel_size * _kernel_size;}int get_b_grad_size(){return _out_channels;}void update(float lr) override;~Conv2D() {cudaFree(_weight);cudaFree(_bias);cudaFree(_grad_weight);cudaFree(_grad_bias);cudaFree(_output);cudaFree(_grad_input);if (_fwd_ws) cudaFree(_fwd_ws);if (_bwd_filter_ws) cudaFree(_bwd_filter_ws);if (_bwd_data_ws) cudaFree(_bwd_data_ws);cudnnDestroyTensorDescriptor(_input_desc);cudnnDestroyTensorDescriptor(_output_desc);cudnnDestroyTensorDescriptor(_bias_desc);cudnnDestroyFilterDescriptor(_filter_desc);cudnnDestroyConvolutionDescriptor(_conv_desc);}
private:int _in_channels, _out_channels, _kernel_size, _stride, _padding, _batch;//卷积核的参数(kernel暂时是正方形的)int _in_h, _in_w, _out_h, _out_w;//输入输出的形状float* _weight, * _bias;//卷积核的参数和偏置float* _input, * _output, * _grad_input;//x的输入、输出,反向传播时向外输出的梯度float* _grad_weight, * _grad_bias;//参数的梯度,偏置的梯度cudnnHandle_t& _handle;//引用外部的handle,这样一个程序只用创建一个handlecudnnTensorDescriptor_t _input_desc, _output_desc, _bias_desc;//三个张量描述子:输入张量,输出张量,偏置张量cudnnFilterDescriptor_t _filter_desc;//卷积核描述子cudnnConvolutionDescriptor_t _conv_desc;//卷积描述子cudnnConvolutionFwdAlgo_t _fwd_algo;//前向操作的算法cudnnConvolutionBwdFilterAlgo_t _bwd_filter_algo;//反向求导中,对参数求导的算法cudnnConvolutionBwdDataAlgo_t _bwd_data_algo;//反向求导中,对输入求导的算法size_t _fwd_ws_size, _bwd_filter_ws_size, _bwd_data_ws_size;//各个工作空间的大小void* _fwd_ws=nullptr, * _bwd_filter_ws= nullptr, * _bwd_data_ws= nullptr;//各个工作空间的指针
};
class ReLU :public Layer {
public:ReLU(cudnnHandle_t& handle, int n, int c, int h, int w) :_handle(handle),_n(n),_c(c),_h(h),_w(w){//描述子初始化cudnnCreateTensorDescriptor(&_input_desc);cudnnCreateActivationDescriptor(&_act_desc);//描述子设置cudnnSetTensor4dDescriptor(_input_desc, CUDNN_TENSOR_NCHW, CUDNN_DATA_FLOAT, _n, _c, _h, _w);cudnnSetActivationDescriptor(_act_desc,CUDNN_ACTIVATION_RELU ,CUDNN_PROPAGATE_NAN,0.0f);//分配GPU内存cudaMalloc(&_output, sizeof(float) * _n * _c * _h * _w);cudaMalloc(&_grad_input, sizeof(float) * _n * _c * _h * _w);}void forward(float* input)override {_input = input;const float alpha = 1.0f, beta = 0.0f;//执行激活操作,结果保存在_outputcudnnActivationForward(_handle, _act_desc, &alpha, _input_desc, _input, &beta, _input_desc, _output);}void backward(float* grad_output)override {//执行反向梯度计算,结果存在_grad_inputconst float alpha = 1.0f, beta = 0.0f;cudnnActivationBackward(_handle, _act_desc, &alpha,_input_desc, _output,        // yDesc, y (forward 输出)_input_desc, grad_output,    // dyDesc, dy (来自上层的梯度)_input_desc, _input,         // xDesc, x (forward 输入)&beta,_input_desc, _grad_input);   // dxDesc, dx}float* get_output()override { return _output; }float* get_grad_input()override { return _grad_input; }float* get_w_grad(){return nullptr;}float* get_b_grad(){return nullptr;}int get_w_grad_size(){return 0;}int get_b_grad_size(){return 0;}void update(float lr) override{}~ReLU() {cudaFree(_output);cudaFree(_grad_input);cudnnDestroyTensorDescriptor(_input_desc);cudnnDestroyActivationDescriptor(_act_desc);}
private:float* _input;float* _output;float* _grad_input;int _n, _c, _h, _w;cudnnHandle_t& _handle;cudnnTensorDescriptor_t _input_desc;//输入(同时也是输出)张量的描述子cudnnActivationDescriptor_t _act_desc;//激活描述子};
class MaxPool2D:public Layer{
public:MaxPool2D(cudnnHandle_t &handle,int n,int c,int h,int w,int ph,int pw,int padding,int stride):_handle(handle),_n(n),_c(c),_h(h),_w(w),_ph(ph),_pw(pw),_padding(padding),_stride(stride){//描述子初始化cudnnCreateTensorDescriptor(&_input_desc);cudnnCreateTensorDescriptor(&_output_desc);cudnnCreatePoolingDescriptor(&_pool_desc);//描述子设置cudnnSetPooling2dDescriptor(_pool_desc,CUDNN_POOLING_MAX, CUDNN_PROPAGATE_NAN, _ph,_pw,_padding,_padding,_stride,_stride);cudnnSetTensor4dDescriptor(_input_desc,CUDNN_TENSOR_NCHW,CUDNN_DATA_FLOAT,_n,_c,_h,_w);cudnnGetPooling2dForwardOutputDim(_pool_desc,_input_desc,&_n,&_c,&_out_h,&_out_w);cudnnSetTensor4dDescriptor(_output_desc,CUDNN_TENSOR_NCHW,CUDNN_DATA_FLOAT,_n,_c,_out_h,_out_w);//分配内存cudaMalloc(&_output,sizeof(float)*_n*_c*_out_h*_out_w);cudaMalloc(&_grad_input,sizeof(float)*_n*_c*_h*_w);}void forward(float *input) override{_input=input;const float alpha=1.0f,beta=0.0f;cudnnPoolingForward(_handle,_pool_desc,&alpha,_input_desc,_input,&beta,_output_desc,_output);}void backward(float *grad_output) override{const float alpha=1.0f,beta=0.0f;cudnnPoolingBackward(_handle,_pool_desc,&alpha,_output_desc,_output,_output_desc,grad_output,_input_desc,_input,&beta,_input_desc,_grad_input);}float *get_output()override{return _output;}float *get_grad_input()override {return _grad_input;}float* get_w_grad(){return nullptr;}float* get_b_grad(){return nullptr;}int get_w_grad_size(){return 0;}int get_b_grad_size(){return 0;}void update(float lr)override{}~MaxPool2D(){cudaFree(_output);cudaFree(_grad_input);cudnnDestroyTensorDescriptor(_input_desc);cudnnDestroyTensorDescriptor(_output_desc);cudnnDestroyPoolingDescriptor(_pool_desc);}
private:int _n,_c,_h,_w,_ph,_pw,_padding,_stride;//输入维度,池化的padding和strideint _out_h,_out_w;//输出维度float *_input,*_output,*_grad_input;cudnnHandle_t &_handle;cudnnTensorDescriptor_t _input_desc,_output_desc;//输入输出张量描述子cudnnPoolingDescriptor_t _pool_desc;//池化描述子
};
class Linear : public Layer {
public:Linear(cublasHandle_t &handle_, int batch_, int in_f, int out_f);void forward(float* input_) override {//input: [batch, in_features]// weight: [in_features, out_features]// bias: [out_features]// output: [batch, out_features]input = input_;const float alpha = 1.0f, beta = 0.0f;// output = input * weightcublasSgemm(handle, CUBLAS_OP_N, CUBLAS_OP_N,out_features, batch, in_features,&alpha,weight, out_features,input, in_features,&beta,output, out_features);// add bias:const float beta2 = 1.0f;for(int i = 0; i < batch; i++)cublasSaxpy(handle, out_features, &alpha, bias, 1, output + i*out_features, 1);}void backward(float* grad_output) override {//input [batch, in_features]//grad_output [batch, out_features]//grad_weight [in_features, out_features]const float alpha = 1.0f, beta = 0.0f;// grad_weight = input^T * grad_outputcublasSgemm(handle, CUBLAS_OP_N, CUBLAS_OP_T,out_features,in_features, batch,&alpha,grad_output, out_features,input, in_features,&beta,grad_weight, out_features);// weight: [in_features, out_features]//grad_output [batch, out_features]//grad_input [batch, in_features]// grad_input = grad_output * weight^TcublasSgemm(handle, CUBLAS_OP_T, CUBLAS_OP_N,in_features, batch, out_features,&alpha,weight, out_features,grad_output, out_features,&beta,grad_input, in_features);// grad_bias = grad_output^T * onescublasSgemv(handle, CUBLAS_OP_N,batch, out_features,&alpha,grad_output, out_features,ones, 1,&beta,grad_bias, 1);}float* get_output() override { return output; }float* get_grad_input() override { return grad_input; }float* get_w_grad(){return grad_weight;}float* get_b_grad(){return grad_bias;}int get_w_grad_size(){return in_features * out_features;}int get_b_grad_size(){return out_features;}void update(float lr) override {const float alpha = -lr;// weight -= lr * grad_weightcublasSaxpy(handle, in_features*out_features, &alpha, grad_weight, 1, weight, 1);// bias -= lr * grad_biascublasSaxpy(handle, out_features, &alpha, grad_bias, 1, bias, 1);}~Linear() {cudaFree(weight);cudaFree(bias);cudaFree(grad_weight);cudaFree(grad_bias);cudaFree(output);cudaFree(grad_input);cudaFree(ones);}private:cublasHandle_t &handle;int in_features, out_features, batch;float *weight, *bias;float *grad_weight, *grad_bias;float *input, *output, *grad_input;float *ones;};
class LeNet:public Layer{
public:LeNet(cublasHandle_t &cublas_,cudnnHandle_t &cudnn_,int batch_):cublas(cublas_),cudnn(cudnn_),batch(batch_){layers.emplace_back(std::make_shared<Conv2D>(cudnn,batch,1,6,28,28,5));layers.emplace_back(std::make_shared<ReLU>(cudnn,batch,6,24,24));layers.emplace_back(std::make_shared<MaxPool2D>(cudnn,batch,6,24,24,2,2,0,2));layers.emplace_back(std::make_shared<Conv2D>(cudnn,batch,6,16,12,12,5));layers.emplace_back(std::make_shared<ReLU>(cudnn,batch,16,8,8));layers.emplace_back(std::make_shared<MaxPool2D>(cudnn,batch,16,8,8,2,2,0,2));layers.emplace_back(std::make_shared<Linear>(cublas,batch,16*4*4,120));layers.emplace_back(std::make_shared<ReLU>(cudnn,batch,120,1,1));layers.emplace_back(std::make_shared<Linear>(cublas,batch,120,84));layers.emplace_back(std::make_shared<ReLU>(cudnn,batch,84,1,1));layers.emplace_back(std::make_shared<Linear>(cublas,batch,84,10));cudaMalloc(&output,batch*10*sizeof(float));cudaMalloc(&grad_input,batch*1*28*28*sizeof(float));}void forward(float *input_)override{input=input_;for(const auto &l:layers){l->forward(input);input=l->get_output();}cudaMemcpy(output,input,sizeof(float)*batch*10,cudaMemcpyDeviceToDevice);}void backward(float *grad_output)override{float* grad = grad_output;for(int i=layers.size()-1;i>=0;i--){layers[i]->backward(grad);grad = layers[i]->get_grad_input();}cudaMemcpy(grad_input,grad,sizeof(float)*batch*1*28*28,cudaMemcpyDeviceToDevice);}float* get_output() override { return output; }float* get_grad_input() override { return grad_input; }float* get_w_grad(){return nullptr;}float* get_b_grad(){return nullptr;}int get_w_grad_size(){return 0;}int get_b_grad_size(){return 0;}void allreduce_grads(ncclComm_t comm,int num_gpus,cudaStream_t stream){//专门用来同步分布训练梯度的函数for(const auto &layer:layers){int w_size=layer->get_w_grad_size();if(w_size){//有参数的梯度float *w_grad=layer->get_w_grad();//这是获取已经算好的梯度ncclAllReduce(w_grad,w_grad,w_size,ncclFloat,ncclSum,comm,stream);//归一化scale_kernel<<<(w_size+255)/256,256>>>(w_grad,w_size,1.0/num_gpus);}int b_size=layer->get_b_grad_size();if(b_size){float *b_grad=layer->get_b_grad();//这是获取已经算好的梯度ncclAllReduce(b_grad,b_grad,b_size,ncclFloat,ncclSum,comm,stream);//归一化scale_kernel<<<(b_size+255)/256,256>>>(b_grad,b_size,1.0/num_gpus);}}cudaStreamSynchronize(stream);}void update(float lr){for(const auto &l:layers){l->update(lr);}}~LeNet(){cudaFree(output);cudaFree(grad_input);}
private:cublasHandle_t &cublas;cudnnHandle_t &cudnn;int batch;float *input,*output,*grad_input;std::vector<std::shared_ptr<Layer>> layers;
};
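One remark on the Linear layer above: cuBLAS assumes column-major storage while all buffers here are row-major, and a row-major matrix reinterpreted as column-major is its transpose. The forward GEMM therefore computes

$$Y^\top = W^\top X^\top \quad\Longleftrightarrow\quad Y = XW,$$

which is why cublasSgemm is called with m = out_features, n = batch, k = in_features, no transposes, and leading dimensions out_features (weight), in_features (input) and out_features (output); the result lands in output already in row-major [batch, out_features] layout.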

Layers.cu

#include"Layers.hpp"
Conv2D::Conv2D(cudnnHandle_t &handle,int batch,int in_channels,int out_channels,int in_h,int in_w,int kernel_size,int stride,int padding):_handle(handle),_batch(batch), _in_channels(in_channels), _out_channels(out_channels), _in_h(in_h), _in_w(in_w), _kernel_size(kernel_size), _stride(stride), _padding(padding){//初始化描述符(cudnnCreateTensorDescriptor(&_input_desc));(cudnnCreateTensorDescriptor(&_output_desc));(cudnnCreateTensorDescriptor(&_bias_desc));(cudnnCreateFilterDescriptor(&_filter_desc));(cudnnCreateConvolutionDescriptor(&_conv_desc));//设置描述符(cudnnSetTensor4dDescriptor(_input_desc, CUDNN_TENSOR_NCHW, CUDNN_DATA_FLOAT, _batch, _in_channels, _in_h, _in_w));(cudnnSetTensor4dDescriptor(_bias_desc, CUDNN_TENSOR_NCHW, CUDNN_DATA_FLOAT, 1, out_channels, 1, 1));(cudnnSetFilter4dDescriptor(_filter_desc,CUDNN_DATA_FLOAT,CUDNN_TENSOR_NCHW,_out_channels,_in_channels,_kernel_size,_kernel_size));(cudnnSetConvolution2dDescriptor(_conv_desc, padding, padding, stride, stride, 1, 1, CUDNN_CROSS_CORRELATION, CUDNN_DATA_FLOAT));(cudnnGetConvolution2dForwardOutputDim(_conv_desc,_input_desc,_filter_desc,&_batch,&_out_channels,&_out_h,&_out_w));//获得输出维度(cudnnSetTensor4dDescriptor(_output_desc, CUDNN_TENSOR_NCHW, CUDNN_DATA_FLOAT, _batch, _out_channels, _out_h, _out_w));//分配GPU内存(cudaMalloc(&_weight,sizeof(float)*_out_channels* _in_channels*_kernel_size*_kernel_size));(cudaMalloc(&_bias, sizeof(float) * _out_channels));(cudaMalloc(&_grad_weight, sizeof(float) * _out_channels * _in_channels * _kernel_size * _kernel_size));//参数的梯度维度和参数是一样的(cudaMalloc(&_grad_bias, sizeof(float) * _out_channels));(cudaMalloc(&_output, sizeof(float) * _batch * _out_channels * _out_h * _out_w));(cudaMalloc(&_grad_input, sizeof(float) * _batch * _in_channels * _in_h * _in_w));//初始化参数int w_size = _out_channels * _in_channels * _kernel_size * _kernel_size;int b_size = _out_channels;init_uniform << <(w_size + 255) / 256, 256 >> > (_weight, w_size,1, -0.05f, 0.05f);//先同意设置为1init_uniform << <(b_size + 255) / 256, 256 >> > (_bias, b_size,1, -0.05f, 0.05f);zero_kernel << <(w_size + 255) / 256, 256 >> > (_grad_weight, w_size);zero_kernel << <(_batch * _out_channels * _out_h * _out_w + 255) / 256, 256 >> > (_output, _batch * _out_channels * _out_h * _out_w);zero_kernel << <(_batch * _in_channels * _in_h * _in_w + 255) / 256, 256 >> > (_grad_input, _batch * _in_channels * _in_h * _in_w);cudaDeviceSynchronize();//TODO:检测结果的事情后面再说吧//获取算法cudnnConvolutionFwdAlgoPerf_t fwdPerf[8]; int retFwd = 0;cudnnGetConvolutionForwardAlgorithm_v7(_handle,_input_desc, _filter_desc,_conv_desc,_output_desc,8,&retFwd,fwdPerf);_fwd_algo = fwdPerf[0].algo;cudnnConvolutionBwdFilterAlgoPerf_t bwdFiltPerf[8]; int retBF = 0;cudnnConvolutionBwdDataAlgoPerf_t   bwdDataPerf[8]; int retBD = 0;cudnnGetConvolutionBackwardFilterAlgorithm_v7(_handle, _input_desc, _output_desc, _conv_desc, _filter_desc, 8, &retBF, bwdFiltPerf);_bwd_filter_algo = bwdFiltPerf[0].algo;cudnnGetConvolutionBackwardDataAlgorithm_v7(_handle, _filter_desc, _output_desc, _conv_desc, _input_desc, 8, &retBD, bwdDataPerf);_bwd_data_algo = bwdDataPerf[0].algo;//工作空间的内存分配cudnnGetConvolutionForwardWorkspaceSize(_handle, _input_desc, _filter_desc, _conv_desc, _output_desc, _fwd_algo, &_fwd_ws_size);cudnnGetConvolutionBackwardFilterWorkspaceSize(_handle, _input_desc, _output_desc, _conv_desc, _filter_desc, _bwd_filter_algo, &_bwd_filter_ws_size);cudnnGetConvolutionBackwardDataWorkspaceSize(_handle, _filter_desc, _output_desc, _conv_desc, _input_desc, _bwd_data_algo, &_bwd_data_ws_size);if (_fwd_ws_size > 0) 
cudaMalloc(&_fwd_ws, _fwd_ws_size); else _fwd_ws = nullptr;if (_bwd_filter_ws_size > 0) cudaMalloc(&_bwd_filter_ws, _bwd_filter_ws_size); else _bwd_filter_ws = nullptr;if (_bwd_data_ws_size > 0) cudaMalloc(&_bwd_data_ws, _bwd_data_ws_size); else _bwd_data_ws = nullptr;}void Conv2D::update(float lr)  {int w_size = _out_channels * _in_channels * _kernel_size * _kernel_size;int b_size = _out_channels;// 简单 SGDsgd_update << <(w_size + 255) / 256, 256 >> > (_weight,_grad_weight,lr,w_size);sgd_update << <(b_size + 255) / 256, 256 >> > (_bias, _grad_bias, lr, b_size);cudaDeviceSynchronize();
}
Linear::Linear(cublasHandle_t &handle_, int batch_, int in_f, int out_f): handle(handle_), batch(batch_), in_features(in_f), out_features(out_f) {// 参数和输出内存cudaMalloc(&weight, sizeof(float) * in_features * out_features);cudaMalloc(&bias, sizeof(float) * out_features);cudaMalloc(&grad_weight, sizeof(float) * in_features * out_features);cudaMalloc(&grad_bias, sizeof(float) * out_features);cudaMalloc(&output, sizeof(float) * batch * out_features);cudaMalloc(&grad_input, sizeof(float) * batch * in_features);cudaMalloc(&ones, sizeof(float) * batch);//初始化参数init_uniform << <(in_features*out_features + 255) / 256, 256 >> > (weight, in_features*out_features,1, -0.05f, 0.05f);//先同意设置为1init_uniform << <(out_features + 255) / 256, 256 >> > (bias, out_features,1, -0.05f, 0.05f);// 初始化 ones 向量init_ones_kernel<<<(batch + 255)/256, 256>>>(ones, batch);cudaDeviceSynchronize();}

main.cu

#include <cuda_runtime.h>
#include <cublas_v2.h>
#include <iostream>
#include<vector>
#include<thread>
#include<nccl.h>
#include"Layers.hpp"
#include"kernels.hpp"
#include"Dataloaders.hpp"
void checkCuda(cudaError_t res) {if (res != cudaSuccess) {std::cerr << "CUDA Error: " << cudaGetErrorString(res) << std::endl;exit(EXIT_FAILURE);}
}void checkNCCL(ncclResult_t res) {if (res != ncclSuccess) {std::cerr << "NCCL Error: " << ncclGetErrorString(res) << std::endl;exit(EXIT_FAILURE);}
}
const int nDev=2;void train(int epochs,int batch_size,float lr,LeNet &net,ncclComm_t comm,int rank,cudaStream_t stream,MNISTDataset &Dataset){float *d_inputs;unsigned *d_labels;//输出的预测概率,softmax后的结果,损失,预测概率的梯度(作为LeNet反向传播的输入)float *d_logits,* d_prob,*loss,*dlogits;int num_classes=10;cudaMalloc(&d_inputs,batch_size*28*28*sizeof(float));cudaMalloc(&d_labels,batch_size*sizeof(unsigned));cudaMalloc(&d_logits,batch_size*num_classes*sizeof(float));cudaMalloc(&d_prob,batch_size*num_classes*sizeof(float));cudaMalloc(&loss,batch_size*sizeof(float));cudaMalloc(&dlogits,batch_size*num_classes*sizeof(float));std::vector<float> h_loss(batch_size);for(int epoch=0;epoch<epochs;epoch++){auto batch=Dataset.next_batch(batch_size);std::vector<unsigned> h_labels(batch_size);for(int batch_idx=0;batch_idx<batch_size;batch_idx++){cudaMemcpy(d_inputs+batch_idx*28*28,batch[batch_idx].image.data(),28*28*sizeof(float),cudaMemcpyHostToDevice);h_labels[batch_idx] = batch[batch_idx].label;}cudaMemcpy(d_labels, h_labels.data(),batch_size*sizeof(unsigned), cudaMemcpyHostToDevice);net.forward(d_inputs);cudaMemcpy(d_logits,net.get_output(),batch_size*num_classes*sizeof(float),cudaMemcpyDeviceToDevice);softmax_forward_loss_batch<<<batch_size,1>>>(d_logits,d_labels,d_prob,loss,batch_size,num_classes);softmax_ce_backward_batch<<<batch_size,num_classes>>>(d_prob,d_labels,dlogits,batch_size,num_classes);net.backward(dlogits);net.allreduce_grads(comm,nDev,stream);cudaStreamSynchronize(stream);net.update(lr);// ---- 打印中间输出 ----cudaMemcpy(h_loss.data(), loss, batch_size*sizeof(float), cudaMemcpyDeviceToHost);float avg_loss = 0.f;for (float l : h_loss) avg_loss += l;avg_loss /= batch_size;if (epoch % 1 == 0) { // 每个 epoch 打印一次std::cout << "[Rank " << rank << "] Epoch " << epoch << " Avg Loss = " << avg_loss << " (first label=" << h_labels[0] << ")" << std::endl;}}cudaFree(d_inputs);cudaFree(d_labels);cudaFree(d_logits);cudaFree(d_prob);cudaFree(loss);cudaFree(dlogits);
}void thread_func(int rank,const ncclUniqueId &id,MNISTDataset &Dataset){int dev=rank;cudaSetDevice(dev);// 创建流cudaStream_t stream;checkCuda(cudaStreamCreate(&stream));ncclComm_t comm;checkNCCL(ncclCommInitRank(&comm,nDev,id,rank));cublasHandle_t cublas;cudnnHandle_t cudnn;cublasCreate(&cublas);//句柄是与当前上下文绑定的,不能一个句柄执行在不同GPUcudnnCreate(&cudnn);int batch_size=8;float lr=0.1f;LeNet LeNet_net(cublas,cudnn,batch_size);train(10,batch_size,lr,LeNet_net,comm,rank,stream,Dataset);cublasDestroy(cublas);cudnnDestroy(cudnn);ncclCommDestroy(comm);cudaStreamDestroy(stream);
}
int main() {// 获取 UniqueId (多进程时 rank0 生成,广播给其他进程)ncclUniqueId id;checkNCCL(ncclGetUniqueId(&id));MNISTDataset Datasets[2]={MNISTDataset{"/home/huangxy/Projects/cudaLearning/train-images-idx3-ubyte", "/home/huangxy/Projects/cudaLearning/train-labels-idx1-ubyte"},MNISTDataset{"/home/huangxy/Projects/cudaLearning/train-images-idx3-ubyte", "/home/huangxy/Projects/cudaLearning/train-labels-idx1-ubyte"}};Datasets[0].shuffle(123);Datasets[1].shuffle(456);// 启动两个线程,模拟两个进程std::vector<std::thread> threads;for (int rank = 0; rank < nDev; rank++) {threads.emplace_back(thread_func, rank, std::ref(id),std::ref(Datasets[rank]));}for (auto& t : threads) t.join();
}
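To build everything (assuming the CUDA, cuDNN and NCCL development packages are installed in their default locations; the output name is arbitrary), a single nvcc invocation along the lines of `nvcc main.cu Layers.cu kernels.cu -o lenet_nccl -lcudnn -lcublas -lnccl` should be enough. The dataset paths hard-coded in main() of course have to point at the extracted MNIST files.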

Execution results

Some output lines are interleaved or overwritten; this is normal when two threads print to stdout at the same time.

[Rank 0] Epoch 0 Avg Loss = 2.31589 (first label=3)
[Rank 1] Epoch 0 Avg Loss = 2.30583 (first label=4)
[Rank 0] Epoch 1 Avg Loss = 2.31319 (first label=6)
[Rank 1] Epoch 1 Avg Loss = 2.31365 (first label=0)
[Rank 0] Epoch 2 Avg Loss = 2.32392 (first label=3)
[Rank 1] Epoch 2 Avg Loss = 2.27455 (first label=6)
[Rank [Rank 0] Epoch 13] Epoch  Avg Loss = 3 Avg Loss = 2.32893 (first label=3)
2.32235 (first label=3)
[Rank 1] Epoch 4 Avg Loss = 2.31996 (first label=3)
[Rank 0] Epoch 4 Avg Loss = 2.35508 (first label=8)
[Rank 1] Epoch 5 Avg Loss = [Rank 0] Epoch 5 Avg Loss = 2.30198 (first label=5)
2.29931 (first label=1)
[Rank 1] Epoch 6 Avg Loss = 2.27989 (first label=6)
[Rank 0] Epoch 6 Avg Loss = 2.33464 (first label=5)
[Rank 1] Epoch 7 Avg Loss = 2.30244 (first label=2)
[Rank 0] Epoch 7 Avg Loss = 2.2739 (first label=9)
[Rank 1] Epoch 8 Avg Loss = [Rank 0] Epoch 8 Avg Loss = 2.22131 (first label=3)
2.26422 (first label=1)
[Rank [Rank 0] Epoch 9 Avg Loss = 2.27249 (first label=1)
1] Epoch 9 Avg Loss = 2.38588 (first label=7)
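The interleaving seen above is harmless, but if cleaner logs are wanted, the per-epoch print in train() could be wrapped in a lock_guard on a mutex shared by the two threads. A minimal standalone illustration of the idea (hypothetical, not from the original code):

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

std::mutex print_mutex;   // shared by both ranks

void worker(int rank) {
    for (int epoch = 0; epoch < 3; epoch++) {
        std::lock_guard<std::mutex> lock(print_mutex);   // serialize the whole line
        std::cout << "[Rank " << rank << "] Epoch " << epoch << " ..." << std::endl;
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int rank = 0; rank < 2; rank++) threads.emplace_back(worker, rank);
    for (auto& t : threads) t.join();
}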

