C++线程池----基于生产者消费者模式队列实现
目录
1 引言
2 大体代码流程
2.1 自己实现的类用于线程同步的条件变量和互斥锁
2.2 初始化并且创建线程池
2.3 detect推理时流程
1 引言
简单记录下一个基于生产者消费者模式队列实现的C++线程池。
2 大体代码流程
2.1 自己实现的类用于线程同步的条件变量和互斥锁
// Go-style WaitGroup built from a mutex, a condition variable and an atomic
// counter. Used to block the main thread until every module initializer has
// reported completion.
//
// Fix vs. original: done() decremented the counter and called notify_one()
// WITHOUT holding m_mtx. The notification could therefore fire in the gap
// between a waiter's predicate check (under the lock) and its actual block
// inside cv.wait(), losing the wakeup forever. Taking the mutex before
// notifying closes that window; notify_all() covers multiple waiters.
class WaitGroup : no_copy {
    std::condition_variable m_cv;   // signalled when the counter reaches zero
    std::mutex m_mtx;               // pairs with m_cv for the wait/notify handshake
    std::atomic<int> m_ticker;      // number of outstanding tasks

public:
    WaitGroup() : m_ticker(0) {}
    explicit WaitGroup(int init_val) : m_ticker(init_val) {}

    // Register v additional tasks. Call before the matching done() calls start.
    void add(int v) { m_ticker += v; }

    // Mark one task finished; wakes every waiter when the count hits zero.
    void done() {
        if (--m_ticker == 0) {
            // Hold the mutex so the notification cannot slip between a
            // waiter's predicate evaluation and its block inside wait().
            std::lock_guard<std::mutex> lck(m_mtx);
            m_cv.notify_all();  // there may be more than one waiter
        }
    }

    // Block until the counter drops to (or below) zero; returns immediately
    // if it already has. The predicate re-check under the lock makes spurious
    // wakeups harmless.
    void wait() {
        std::unique_lock<std::mutex> lck(m_mtx);
        m_cv.wait(lck, [&] { return m_ticker.load() <= 0; });
    }
};
这个类封装了条件变量、互斥锁和一个原子计数器,初始化的时候会用这个类进行同步,从而保证所有的初始化都完成之后主流程才继续往下执行。
2.2 初始化并且创建线程池
在mainc.cpp中调用的时候有
vsscfldwp::load_mod_cfg_json(mod_json_template.data());
vsscfldwp::wait_init_done();
在load_modules_cfg_async函数中,每个模块都会先调用 g_wg.add(static_cast<int>(vgpus.size()));,也就是把该模块一共用到的gpu数累加进计数器,然后这个load函数
// load_modules_cfg_async
// Parses the module-configuration JSON and, for every module entry, creates a
// per-backend task pool (cafl / cafl-mlu / cnrt / magicmind / bm / acl /
// trt-caffe) whose worker threads load the network on their assigned device.
// Synchronization: for each module g_wg.add(vgpus.size()) runs before the pool
// is created, and every worker's init lambda ends with g_wg.done(), so
// wait_modules_init_done() blocks until every device finished loading.
// In each backend the first worker to finish (m_initor.fetch_add(1) == 0)
// records the input/output layer dims (and, where applicable, dtypes and max
// batch size) into the shared task-pool object.
// NOTE(review): in this paste several preprocessor directives are fused with
// the code that followed them (e.g. "#ifdef USE_CAFLstring ...",
// "#elseelse if ...", and an "#endif //..." whose trailing comment swallows
// the next "else if"); in the real file each directive sits on its own line.
void load_modules_cfg_async(const string& json) {using namespace Rjson;Doc doc = Doc::loads(json);string base_dir = doc.rv()["base_dir"].get_str();auto arr = doc.rv()["mods"];int n = arr.size();for (int i = 0; i < n; i++) {auto obj = arr[i];string mod = obj["name"].get_str();string type = obj["type"].get_str();vector<int> vgpus;obj["gpus"].get_arr(vgpus);g_wg.add(static_cast<int>(vgpus.size()));if (type == "cafl") {
// cafl (caffe on GPU) branch: loads prototxt + caffemodel under g_mtx; the
// first worker also captures the input/output blob dims.
#ifdef USE_CAFLstring caffmodp = obj["modpath"].get_str();string protpath = obj["protpath"].get_str();auto tp = make_shared<CaffeTaskPool>();tp->p.init_pool(4, vgpus, [base_dir, caffmodp, protpath, tp](CnnNetRefp& net, int gpuid){cafl::set_mode_device(gpuid);{lock_guard<mutex> lck(g_mtx);net = cafl::Net::load(base_dir + protpath, base_dir + caffmodp);if (tp->m_initor.fetch_add(1) == 0) {int ninp = net->num_inputs();tp->m_inpDims.resize(ninp);for (int i = 0; i < ninp; i++) {auto input_layer = net->get_input_blob(i);LayerDims& ld = tp->m_inpDims[i];ld.num = input_layer->num();ld.channels = input_layer->channels();ld.width = input_layer->width();ld.height = input_layer->height();}int noup = net->num_outputs();tp->m_outDims.resize(noup);for (int i = 0; i < noup; i++) {auto outputlayer = net->get_output_blob(i);LayerDims& ld = tp->m_outDims[i];ld.num = outputlayer->num();ld.channels = outputlayer->channels();ld.width = outputlayer->width();ld.height = outputlayer->height();}}}g_wg.done();});bool inst_ok = g_mpools.insert(make_pair(mod, tp)).second;assert(inst_ok);
#endif}
#ifdef USE_MLU
// cafl-mlu branch: caffe running on an MLU device.
#ifdef USE_CAFLelse if (type == "cafl-mlu") {
// NOTE(review): the printf string below is mojibake — GBK bytes displayed in
// the wrong encoding; presumably "MLU_ARM does not support caffe; exiting —
// check the model type in the config file". Confirm against the original file.
#ifdef __aarch64__printf("MLU_ARM��֧��caffe���ܣ����˳��������������ļ����ͣ�\n");exit(0);
#endifstring caffmodp = obj["modpath"].get_str();string protpath = obj["protpath"].get_str();string mlu_type = obj["mlu_type"].get_str();string mlu_mode = obj["mlu_mode"].get_str();auto tp = make_shared<CaffeTaskPool>();tp->p.init_pool(4, vgpus, [base_dir, caffmodp, protpath, tp, mlu_type, mlu_mode](CnnNetRefp& net, int mluid){cafl::set_mlu_type_mode(mlu_type.data(), mlu_mode.data());cafl::set_mode_device(mluid);{lock_guard<mutex> lck(g_mtx);net = cafl::Net::load(base_dir + protpath, base_dir + caffmodp);if (tp->m_initor.fetch_add(1) == 0) {int ninp = net->num_inputs();tp->m_inpDims.resize(ninp);for (int i = 0; i < ninp; i++) {auto input_layer = net->get_input_blob(i);LayerDims& ld = tp->m_inpDims[i];ld.num = input_layer->num();ld.channels = input_layer->channels();ld.width = input_layer->width();ld.height = input_layer->height();}int noup = net->num_outputs();tp->m_outDims.resize(noup);for (int i = 0; i < noup; i++) {auto outputlayer = net->get_output_blob(i);LayerDims& ld = tp->m_outDims[i];ld.num = outputlayer->num();ld.channels = outputlayer->channels();ld.width = outputlayer->width();ld.height = outputlayer->height();}}net->net_forward();}g_wg.done();});bool inst_ok = g_mpools.insert(make_pair(mod, tp)).second;assert(inst_ok);}
// cnrt branch: loads an offline model per MLU device; when "batchstreams" is
// configured a batching pool (bp) is used instead of the plain pool (p).
// NOTE(review): fmt::print("cnrt::set_device error with device id {} : {}\n",
// device_id, device_id, ei.errmsg) passes device_id twice for only two
// placeholders, so ei.errmsg is never printed — drop the duplicate argument.
#endif //end mlu caffeelse if (type == "cnrt") {string cnpath = obj["cam2path"].get_str();int batch_streams{};if (obj.has_val("batchstreams")) {batch_streams = obj["batchstreams"].get_int();}auto tp = make_shared<CnrtTaskPool>();auto init_func = [base_dir, cnpath, tp](CnrtNetRef& net_ref, int device_id){cnrt::ErrInfo ei{};cnrt::set_device(device_id, &ei);if (ei.code) {fmt::print("cnrt::set_device error with device id {} : {}\n", device_id, device_id, ei.errmsg);abort();}string cnfullpath = base_dir + cnpath;cnrt::cnrtnet_t* net = cnrt::load_net_from_file(cnfullpath.data(), device_id, &ei);if (!net) {fmt::print("cnrt::load_net_from_file({}) failed: {}\n", cnfullpath, ei.errmsg);abort();}net_ref = CnrtNetRef(net, [](cnrt::cnrtnet_t* n) { cnrt::release_net(n); });if (tp->m_initor.fetch_add(1) == 0) {int ninp = net_num_inputs(net);tp->m_inpDims.resize(ninp);tp->m_inpDtypes.resize(ninp);for (int i = 0; i < ninp; i++) {cnrt::LayerDims ldim{};net_input_layer_dims(net, i, &ldim);auto& ld = tp->m_inpDims[i];ld.num = ldim.n;ld.channels = ldim.c;ld.width = ldim.w;ld.height = ldim.h;int dtype = net_input_layer_data_type(net, i);tp->m_inpDtypes[i] = dtype;}int noup = net_num_outputs(net);tp->m_outDims.resize(noup);tp->m_outDtypes.resize(noup);for (int i = 0; i < noup; i++) {cnrt::LayerDims ldim{};net_output_layer_dims(net, i, &ldim);auto& ld = tp->m_outDims[i];ld.num = ldim.n;ld.channels = ldim.c;ld.width = ldim.w;ld.height = ldim.h;int dtype = net_output_layer_data_type(net, i);tp->m_outDtypes[i] = dtype;}tp->m_max_batch_size = net_max_batch_size(net);}g_wg.done();};if (batch_streams > 0) {using std::placeholders::_1;using std::placeholders::_2;using std::placeholders::_3;tp->bp.init_pool(batch_streams * 2, batch_streams, vgpus, init_func, std::bind(&CnrtTaskPool::batch_task_cb, tp, _1, _2, _3));tp->set_batch_streams(batch_streams);}else {tp->p.init_pool(4, vgpus, init_func);}bool inst_ok = g_mpools.insert(make_pair(mod, tp)).second;assert(inst_ok);}
// magicmind (MLU370) branch — same pattern as cnrt, without per-layer dtypes.
#elif USE_MLU370else if (type == "magicmind") {string mm2path = obj["mm2path"].get_str();int batch_streams{};if (obj.has_val("batchstreams")) {batch_streams = obj["batchstreams"].get_int();}auto tp = make_shared<MmTaskPool>();auto init_func = [base_dir, mm2path, tp](MmNetRef& net_ref, int gpuid){using namespace mm;mm::ErrInfo ei{};mm::set_device(gpuid, &ei);if (ei.code) {fmt::print("mm::set_device error with gpuid {} : {}\n", gpuid, ei.errmsg);abort();}string te2fullpath = base_dir + mm2path;mm::mmnet_t* net = mm::load_net_from_file(te2fullpath.data(), &ei);if (!net) {fmt::print("mm::load_net_from_file({}) failed: {}\n", te2fullpath, ei.errmsg);abort();}net_ref = MmNetRef(net, [](mm::mmnet_t* n) { mm::release_net(n); });if (tp->m_initor.fetch_add(1) == 0) {int ninp = net_num_inputs(net);tp->m_inpDims.resize(ninp);for (int i = 0; i < ninp; i++) {mm::LayerDims ldim{};net_input_layer_dims(net, i, &ldim);auto& ld = tp->m_inpDims[i];ld.num = ldim.n;ld.channels = ldim.c;ld.width = ldim.w;ld.height = ldim.h;}int noup = net_num_outputs(net);tp->m_outDims.resize(noup);for (int i = 0; i < noup; i++) {mm::LayerDims ldim{};net_output_layer_dims(net, i, &ldim);auto& ld = tp->m_outDims[i];ld.num = ldim.n;ld.channels = ldim.c;ld.width = ldim.w;ld.height = ldim.h;}tp->m_max_batch_size = net_max_batch_size(net);}g_wg.done();};if (batch_streams > 0) {using std::placeholders::_1;using std::placeholders::_2;using std::placeholders::_3;tp->bp.init_pool(batch_streams * 2, batch_streams, vgpus, init_func, std::bind(&MmTaskPool::batch_task_cb, tp, _1, _2, _3));tp->set_batch_streams(batch_streams);}else {tp->p.init_pool(4, vgpus, init_func);}bool inst_ok = g_mpools.insert(make_pair(mod, tp)).second;assert(inst_ok);}
// bm (Bitmain) branch — same pattern; also records m_vgpus and m_modName.
#elif USE_BMelse if (type == "bm") {string bmpath = obj["bm2path"].get_str();int batch_streams{};if (obj.has_val("batchstreams")) {batch_streams = obj["batchstreams"].get_int();}auto tp = make_shared<BmrtTaskPool>();auto init_func = [base_dir, bmpath, tp](BmrtNetRef& net_ref, int device_id){bmrt::ErrInfo ei{};string bmfullpath = base_dir + bmpath;bmrt::bmrtnet_t* net = bmrt::load_net_from_file(bmfullpath.data(), device_id, &ei);if (!net) {fmt::print("Bmrt::load_net_from_file({}) failed: {}\n", bmfullpath, ei.errmsg);abort();}net_ref = BmrtNetRef(net, [](bmrt::bmrtnet_t* n) { bmrt::release_net(n); });if (tp->m_initor.fetch_add(1) == 0) {int ninp = net_num_inputs(net);tp->m_inpDims.resize(ninp);tp->m_inpDtypes.resize(ninp);for (int i = 0; i < ninp; i++) {bmrt::LayerDims ldim{};net_input_layer_dims(net, i, &ldim);auto& ld = tp->m_inpDims[i];ld.num = ldim.n;ld.channels = ldim.c;ld.width = ldim.w;ld.height = ldim.h;int dtype = net_input_layer_data_type(net, i);tp->m_inpDtypes[i] = dtype;}int noup = net_num_outputs(net);tp->m_outDims.resize(noup);tp->m_outDtypes.resize(noup);for (int i = 0; i < noup; i++) {bmrt::LayerDims ldim{};net_output_layer_dims(net, i, &ldim);auto& ld = tp->m_outDims[i];ld.num = ldim.n;ld.channels = ldim.c;ld.width = ldim.w;ld.height = ldim.h;int dtype = net_output_layer_data_type(net, i);tp->m_outDtypes[i] = dtype;}tp->m_max_batch_size = net_max_batch_size(net);}g_wg.done();};if (batch_streams > 0) {using std::placeholders::_1;using std::placeholders::_2;using std::placeholders::_3;tp->bp.init_pool(batch_streams * 2, batch_streams, vgpus, init_func, std::bind(&BmrtTaskPool::batch_task_cb, tp, _1, _2, _3));tp->set_batch_streams(batch_streams);}else {tp->p.init_pool(4, vgpus, init_func);}tp->m_vgpus = vgpus;tp->m_modName = mod;bool inst_ok = g_mpools.insert(make_pair(mod, tp)).second;assert(inst_ok);}
// acl (Ascend) branch — same pattern; queue capacity scales with GPU count.
// NOTE(review): same duplicated-argument bug as the cnrt branch — the second
// {} prints device_id again instead of ei.errmsg.
#elif USE_ACLelse if (type == "acl") {string aclpath = obj["acl2path"].get_str();int batch_streams{};if (obj.has_val("batchstreams")) {batch_streams = obj["batchstreams"].get_int();}auto tp = make_shared<AclTaskPool>();auto init_func = [base_dir, aclpath, tp](AclNetRef& net_ref, int device_id){acl::ErrInfo ei{};acl::set_device(device_id, &ei);if (ei.code) {fmt::print("acl::set_device error with device id {} : {}\n", device_id, device_id, ei.errmsg);abort();}string aclfullpath = base_dir + aclpath;acl::aclnet_t* net = acl::load_net_from_file(aclfullpath.data(), device_id, &ei);if (!net) {fmt::print("acl::load_net_from_file({}) failed: {}\n", aclfullpath, ei.errmsg);abort();}net_ref = AclNetRef(net, [](acl::aclnet_t* n) { acl::release_net(n); });if (tp->m_initor.fetch_add(1) == 0) {int ninp = net_num_inputs(net);tp->m_inpDims.resize(ninp);tp->m_inpDtypes.resize(ninp);for (int i = 0; i < ninp; i++) {acl::LayerDims ldim{};net_input_layer_dims(net, i, &ldim);auto& ld = tp->m_inpDims[i];ld.num = ldim.n;ld.channels = ldim.c;ld.width = ldim.w;ld.height = ldim.h;int dtype = net_input_layer_data_type(net, i);tp->m_inpDtypes[i] = dtype;}int noup = net_num_outputs(net);tp->m_outDims.resize(noup);tp->m_outDtypes.resize(noup);for (int i = 0; i < noup; i++) {acl::LayerDims ldim{};net_output_layer_dims(net, i, &ldim);auto& ld = tp->m_outDims[i];ld.num = ldim.n;ld.channels = ldim.c;ld.width = ldim.w;ld.height = ldim.h;int dtype = net_output_layer_data_type(net, i);tp->m_outDtypes[i] = dtype;}tp->m_max_batch_size = net_max_batch_size(net);}g_wg.done();};if (batch_streams > 0) {using std::placeholders::_1;using std::placeholders::_2;using std::placeholders::_3;int que_cap = max(4*(int)vgpus.size(), batch_streams*4);tp->bp.init_pool(que_cap, batch_streams, vgpus, init_func, std::bind(&AclTaskPool::batch_task_cb, tp, _1, _2, _3));tp->set_batch_streams(batch_streams);}else {tp->p.init_pool(4*vgpus.size(), vgpus, init_func);}tp->m_vgpus = vgpus;tp->m_modName = mod;bool inst_ok = 
g_mpools.insert(make_pair(mod, tp)).second;assert(inst_ok);}
// default branch: TensorRT engine converted from caffe.
#elseelse if (type == "trt-caffe") {string te2path = obj["te2path"].get_str();int batch_streams{};if (obj.has_val("batchstreams")) {batch_streams = obj["batchstreams"].get_int();}auto tp = make_shared<TrtTaskPool>();auto init_func = [base_dir, te2path, tp](TrtNetRef& net_ref, int gpuid){using namespace trt;trt::ErrInfo ei{};trt::set_device(gpuid, &ei);if (ei.code) {fmt::print("trt::set_device error with gpuid {} : {}\n", gpuid, ei.errmsg);abort();}string te2fullpath = base_dir + te2path;trt::trtnet_t* net = trt::load_net_from_file(te2fullpath.data(), &ei);if (!net) {fmt::print("trt::load_net_from_file({}) failed: {}\n", te2fullpath, ei.errmsg);abort();}net_ref = TrtNetRef(net, [](trt::trtnet_t* n) { trt::release_net(n); });if (tp->m_initor.fetch_add(1) == 0) {int ninp = net_num_inputs(net);tp->m_inpDims.resize(ninp);for (int i = 0; i < ninp; i++) {trt::LayerDims ldim{};net_input_layer_dims(net, i, &ldim);auto& ld = tp->m_inpDims[i];ld.num = ldim.n;ld.channels = ldim.c;ld.width = ldim.w;ld.height = ldim.h;}int noup = net_num_outputs(net);tp->m_outDims.resize(noup);for (int i = 0; i < noup; i++) {trt::LayerDims ldim{};net_output_layer_dims(net, i, &ldim);auto& ld = tp->m_outDims[i];ld.num = ldim.n;ld.channels = ldim.c;ld.width = ldim.w;ld.height = ldim.h;}tp->m_max_batch_size = net_max_batch_size(net);}g_wg.done();};if (batch_streams > 0) {using std::placeholders::_1;using std::placeholders::_2;using std::placeholders::_3;tp->bp.init_pool(batch_streams * 2, batch_streams, vgpus, init_func, std::bind(&TrtTaskPool::batch_task_cb, tp, _1, _2, _3));tp->set_batch_streams(batch_streams);}else {tp->p.init_pool(4, vgpus, init_func);}bool inst_ok = g_mpools.insert(make_pair(mod, tp)).second;assert(inst_ok);}
// Unknown module type: fail fast rather than run a half-configured system.
#endifelse {fmt::print("load_modules_cfg_async failed: type {} not exist \n", type);abort();}}}
每个线程在完成初始化之后都会调用一次g_wg.done(); 它其实就是执行
void done() {if (--m_ticker == 0) {m_cv.notify_one();}}
每次调用都将计数减一,当减到0之后就唤醒等待者。
然后wait_modules_init_done就是等待这个唤醒的:
void wait_modules_init_done(){g_wg.wait();}
然后还可以看到调用了init_pool函数。
int init_pool(uint32_t cap, const std::vector<int>& gpu_lst, const InitType& inst_init_cb) {if (m_que) return 1;size_t ng = gpu_lst.size();if (ng == 0) return 10;auto q = std::make_shared<tbb::concurrent_bounded_queue<TaskType> >();m_que = q;m_que->set_capacity(cap);for (size_t i = 0; i < ng; i++) {int gpuid = gpu_lst[i];std::thread([q, gpuid, i, inst_init_cb]() {GpuInstType _init_obj;inst_init_cb(_init_obj, gpuid);for (TaskType t;;) {q->pop(t);t(_init_obj, (uint32_t)i);}}).detach();}return 0;}
然后它使用的是线程安全队列std::make_shared<tbb::concurrent_bounded_queue<TaskType> >();,支持多生产者多消费者模式。相当于内部内置了条件变量和互斥锁,它的push和pop是线程安全的,用户可以直接用push和pop,如果队列空那么pop自动等待,如果队列满那么push自动等待。
然后在这里每个线程是先执行初始化函数 inst_init_cb(_init_obj, gpuid),然后就是for循环,一直在这里等待pop然后处理任务,有任务就pop出来处理,没任务就等待。
2.3 detect推理时流程
然后detect处理函数一层层进去看,会发现
// sync_batch_process_rawC: synchronous inference entry point.
// Batched path (m_batchStreams > 0): the request is wrapped in a TrtBatchTask
// and pushed onto the batching pool (bp); the caller blocks on the task's
// notifier (t->nt_.wait()), then rethrows any exception the worker recorded
// in t->e_, first freeing any output buffers that were already allocated.
// Plain path: input layer dims are validated against the dims captured at
// load time (this TRT setup rejects runtime shape changes), a lambda doing
// the actual trt::net_do_inference is queued on pool p, and the caller parks
// on a SimpleNotifier until the worker calls nt.wake(). Exceptions are moved
// across the thread boundary through the captured _e and rethrown here after
// releasing the per-output buffers.
// NOTE(review): `for (int i = 0; i < nol; ++i)` compares signed int with the
// size_t parameter nol — benign for realistic layer counts, but worth fixing.
// NOTE(review): outputs are released with `delete[](float*)(...)`, which
// assumes every output buffer came from new float[] — confirm against
// trt_net_get_outputs2's allocation contract.
virtual void sync_batch_process_rawC(const LayerDataRaw* inplayers, size_t nLayers, const char* outlayer_str, size_t nol,LayerDataC* outdata) {if (m_batchStreams > 0) {auto t = std::make_shared<TrtBatchTask>();t->input_layers_ = inplayers;t->input_layers_num_ = nLayers;t->output_layer_str_ = outlayer_str;t->output_layers_num_ = nol;t->out_data_ = outdata;bp.push_task(t);t->nt_.wait();if (t->e_.what()[0]) {for (int i = 0; i < nol; ++i) {if (outdata[i].data != nullptr)delete[](float*)(outdata[i].data);}throw t->e_;}}else {SimpleNotifier nt;Exception _e;vector<trt::NetInoutLayerData> trtInputLayers(nLayers);int nBatch = inplayers[0].dims.num;for (size_t i = 0; i < nLayers; i++) {auto& il = inplayers[i];auto iid = initial_input_dim(il.idx);if (iid.channels != il.dims.channels || iid.width != il.dims.width || iid.height != il.dims.height) {throw Exception(fmt::format("parse {} input-layer failed! Trt cannot support layer shape change during runtime!", il.idx));}trtInputLayers[i].data = il.data;trtInputLayers[i].layer_idx = il.idx;trtInputLayers[i].size = il.dims.num * il.dims.channels * il.dims.width * il.dims.height * sizeof(float);}vector<trt::NetInoutLayerData> vOutData(nol, trt::NetInoutLayerData{});p.push_task([&nt, &_e, &trtInputLayers, outlayer_str, nBatch, nol, &vOutData, outdata](TrtNetRef& _netp, uint32_t) {//set input datastrt::trtnet_t* net = _netp.get();try{trt_net_get_outputs2(net, outlayer_str, nol, nBatch, vOutData.data(), outdata);trt::ErrInfo ei{};std::chrono::high_resolution_clock::time_point tnow = std::chrono::high_resolution_clock::now();if (trt::net_do_inference(net, nBatch, trtInputLayers.data(), trtInputLayers.size(), vOutData.data(), vOutData.size(), &ei)){throw Exception(fmt::format("trt::net_do_inference failed: {}", ei.errmsg));}std::chrono::high_resolution_clock::time_point tpost = std::chrono::high_resolution_clock::now();// std::cout << "************* trt::net_do_inference is : " << 
// NOTE: the line above ends inside a `//`-commented-out timing printout that
// this paste split across two lines; the fragment below up to `std::endl;`
// belongs to that commented-out statement, not to live code.
std::chrono::duration_cast<std::chrono::duration<double>>(tpost - tnow).count() * 1000 << " ms" << std::endl;}catch (Exception& e) {_e = e;}nt.wake();});nt.wait();if (_e.what()[0]) {for (auto& i : vOutData) {delete[](float*)(i.data);}throw _e;}}}
这里面重点看这个lambda表达式以及后面的内容
p.push_task([&nt, &_e, &trtInputLayers, outlayer_str, nBatch, nol, &vOutData, outdata](TrtNetRef& _netp, uint32_t) {//set input datastrt::trtnet_t* net = _netp.get();try{trt_net_get_outputs2(net, outlayer_str, nol, nBatch, vOutData.data(), outdata);trt::ErrInfo ei{};std::chrono::high_resolution_clock::time_point tnow = std::chrono::high_resolution_clock::now();if (trt::net_do_inference(net, nBatch, trtInputLayers.data(), trtInputLayers.size(), vOutData.data(), vOutData.size(), &ei)){throw Exception(fmt::format("trt::net_do_inference failed: {}", ei.errmsg));}std::chrono::high_resolution_clock::time_point tpost = std::chrono::high_resolution_clock::now();// std::cout << "************* trt::net_do_inference is : " << std::chrono::duration_cast<std::chrono::duration<double>>(tpost - tnow).count() * 1000 << " ms" << std::endl;}catch (Exception& e) {_e = e;}nt.wake();});nt.wait();if (_e.what()[0]) {for (auto& i : vOutData) {delete[](float*)(i.data);}throw _e;}
这里的nt变量在前面有定义SimpleNotifier nt; 这个类的定义是
// SimpleNotifier: one-shot completion signal used to block a caller until a
// queued task finishes.
// NOTE(review): two versions of the class appear here back to back — an older
// declaration (long _wk member, wait/wake defined elsewhere) immediately
// followed by the current self-contained definition; only the second one is
// the implementation discussed in the surrounding text.
class SimpleNotifier :no_copy{long _wk;
public:SimpleNotifier() :_wk(0) {}void wait();void wake();
// Current definition: wake() sets flag_ while holding the mutex, then calls
// notify_one() after the lock is released, so the store happens-before the
// waiter's predicate re-check and no wakeup is lost. The notifier is
// one-shot: flag_ is never reset, so a later wait() returns immediately.
};class SimpleNotifier : private NonCopyable {std::atomic_bool flag_{};std::mutex mtx_;std::condition_variable cv_;public:void wait() {std::unique_lock<std::mutex>lk(mtx_);cv_.wait(lk, [&](){return flag_.load(); });}void wake() {{std::lock_guard<std::mutex>lk(mtx_);flag_.store(true);}cv_.notify_one();}};
那么在sync_batch_process_rawC函数会调用nt.wait();,而在lambda那个表达式也就是task任务中有一个nt.wake();那么也就是当线程池中的这个任务被pop出来并且执行完之后,就会wake,那么这里等待条件也就满足了,可以往下执行了。