当前位置: 首页 > news >正文

C++并发编程-23. 线程间切分任务的方法

按数量划分

在这里插入图片描述

按递归划分

在这里插入图片描述

在这里插入图片描述

#include <thread>
#include <list>
#include "thread_safe_stack.h"
#include <future>
#include <memory>
template<typename T>
struct sorter  //1  
{struct chunk_to_sort{std::list<T> data;std::promise<std::list<T> > promise;};thread_safe_stack<chunk_to_sort> chunks;    //⇽-- - 2std::vector<std::thread> threads;   // ⇽-- - 3unsigned const max_thread_count;std::atomic<bool> end_of_data;sorter() :max_thread_count(std::thread::hardware_concurrency() - 1),end_of_data(false){}~sorter()    //⇽-- - 4{end_of_data = true;     //⇽-- - 5for (unsigned i = 0; i < threads.size(); ++i){threads[i].join();    //⇽-- - 6}}void try_sort_chunk(){std::shared_ptr<chunk_to_sort> chunk = chunks.try_pop();    //⇽-- - 7if (chunk){sort_chunk(chunk);    //⇽-- - 8}}std::list<T> do_sort(std::list<T>& chunk_data)    //⇽-- - 9{if (chunk_data.empty()){return chunk_data;}std::list<T> result;result.splice(result.begin(),chunk_data,chunk_data.begin());T const& partition_val = *result.begin();typename std::list<T>::iterator divide_point =  //⇽-- - 10std::partition(chunk_data.begin(),chunk_data.end(),[&](T const& val) {return val < partition_val; });chunk_to_sort new_lower_chunk;new_lower_chunk.data.splice(new_lower_chunk.data.end(),chunk_data,chunk_data.begin(),divide_point);std::future<std::list<T> > new_lower =new_lower_chunk.promise.get_future();chunks.push(std::move(new_lower_chunk));   // ⇽-- - 11if (threads.size() < max_thread_count)    // ⇽-- - 12{threads.push_back(std::thread(&sorter<T>::sort_thread,this));}std::list<T> new_higher(do_sort(chunk_data));result.splice(result.end(),new_higher);while (new_lower.wait_for(std::chrono::seconds(0)) !=std::future_status::ready)    //⇽-- - 13{try_sort_chunk();   // ⇽-- - 14}result.splice(result.begin(),new_lower.get());return result;}void sort_chunk(std::shared_ptr<chunk_to_sort > const& chunk){chunk->promise.set_value(do_sort(chunk->data));    //⇽-- - 15}void sort_thread(){while (!end_of_data)    //⇽-- - 16{try_sort_chunk();    // ⇽-- - 17//交出时间片std::this_thread::yield();    //⇽-- - 18}}
};

我们实现一个函数调用上面的封装快速排序

template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)    //⇽-- - 19
{if (input.empty()){return input;}sorter<T> s;return s.do_sort(input);    //⇽-- - 20
}

本例中,parallel_quick_sort()函数(19处)把绝大部分功能委托给sorter类(1处),后者通过栈容器管理待排序的数据段(2处),并集中管控多个线程以并发执行任务(3处),从而以便捷的操作方式给出了代码实现。

本例中,主要工作由成员函数do_sort()负责(9处),它借标准库的std::partition()函数完成数据分段(10处)。

do_sort()将新划分出来的数据段压入栈容器(11处),但没有为每个数据段都专门生成新线程,而仅当仍存在空闲的处理器时(12处)才生成新线程。

因为划分出的前半部分数据可能会由别的线程处理,所以我们需要等待它完成排序而进入就绪状态(13处)。

如果当前线程是整个程序中仅有的线程,或者其他线程都正忙于别的任务,那么这一等待行为则需妥善处理,在当前线程的等待期间,我们让它试着从栈容器取出数据进行处理(14处)。

try_sort_chunk()先从栈容器弹出一段数据(7处)并对其进行排序(8处),再把结果存入附属该段的promise中(15处),使之准备就绪,以待提取。

向栈容器压入数据段与取出相关结果相互对应,两项操作均由同一个线程先后执行(11和12处)。

只要标志end_of_data没有成立(16处),各线程便反复循环,尝试对栈内数据段进行排序17。

每个线程在两次检测标志之间进行让步(18处),好让别的线程有机会向栈容器添加数据段。这段代码由sorter类的析构函数汇合各个线程(4处)。

do_sort()将在全部数据段都完成排序后返回(即便许多工作线程仍在运行),主线程进而从parallel_quick_sort()的调用返回20,并销毁sorter对象。其析构函数将设置标志end_of_data成立(5处),然后等待全部线程结束(6处)。标志的成立使得线程函数内的循环终止(16处)。

按工作类别划分

在这里插入图片描述
CRTP(Curiously Recurring Template Pattern,奇特递归模板模式)
在这里插入图片描述


template <typename Derived>
class Shape {
public:void draw() {// 将 this 转换为派生类指针,调用具体实现static_cast<Derived*>(this)->draw();}double area() {return static_cast<Derived*>(this)->area();}
};class Circle : public Shape<Circle> {
public:void draw() {std::cout << "Drawing a circle\n";}double area() {return 3.14159 * 10 * 10;}
};class Rectangle : public Shape<Rectangle> {
public:void draw() {std::cout << "Drawing a rectangle\n";}double area() {return 10 * 20;}
};

模板单例类包含了原子变量_bstop控制线程是否停止

包含了_que用来存储要处理的信息,这是一个线程安全的队列。

_thread是要处理任务的线程。

线程安全队列我们之前有实现过,但是还需要稍微改进下以满足接受外部停止的通知。

我们给ThreadSafeQue添加一个原子变量_bstop表示线程停止的标记

在需要停止等待的时候我们调用如下通知函数
在这里插入图片描述
在这里插入图片描述
比如我们要实现一个ClassA 处理A类任务,可以这么做


#include "ActorSingle.h"
#include "ClassB.h"
struct MsgClassA {std::string name;friend std::ostream& operator << (std::ostream& os, const MsgClassA& ca) {os << ca.name;return os;}
};
class ClassA : public ActorSingle<ClassA, MsgClassA> {friend class ActorSingle<ClassA, MsgClassA>;
public:~ClassA() {_bstop = true;_que.NotifyStop();_thread.join();std::cout << "ClassA destruct " << std::endl;}void DealMsg(std::shared_ptr<MsgClassA> data) {std::cout << "class A deal msg is " << *data << std::endl;MsgClassB msga;msga.name = "llfc";ClassB::Inst().PostMsg(msga);}
private:ClassA(){_thread = std::thread([this]() {for (; (_bstop.load() == false);) {std::shared_ptr<MsgClassA> data = _que.WaitAndPop();if (data == nullptr) {continue;}DealMsg(data);}std::cout << "ClassA thread exit " << std::endl;});}
};

在这里插入图片描述
在这里插入图片描述

http://www.dtcms.com/a/366764.html

相关文章:

  • uniapp vue页面传参到webview.nvue页面的html或者另一vue中
  • Web应用:返回图片URL
  • Python快速入门专业版(一):Windows/macOS/Linux 系统环境搭建(附常见报错解决)
  • 【连接器专题】案例:带屏蔽膜FPC出现概率性短路,真是供应商的锅?
  • EasyVoice与cpolar:构建私域有声平台的本地化方案
  • Spring线程池ThreadPoolTaskExecutor‌详解
  • 隔空盗刷、AI钓鱼、代理劫持…金融黑产竟进化至此?
  • Elasticsearch 8 中 Nested 数据类型的使用方法
  • 【iOS】 懒加载
  • 一文吃透 CSS 伪类:从「鼠标悬停」到「斑马纹表格」的 30 个实战场景
  • 中值滤波、方框滤波、高斯滤波、均值滤波、膨胀、腐蚀、开运算、闭运算
  • HTML图片标签及路径详解
  • Python开篇撬动未来的万能钥匙 从入门到架构的全链路指南
  • 工厂模式总结
  • C++知识
  • C 盘清理技巧分享:释放磁盘空间,提升系统性能
  • 将 PDF 转换为 TIFF 图片:简单有效的 Java 教程
  • 数据传输,数据解析与写数据库
  • django全国小米su7的行情查询系统(代码+数据库+LW)
  • 阿瓦隆 A15 Pro 221TH/S:SHA-256 算力与高效能耗
  • 大模型部署全攻略:Docker+FastAPI+Nginx搭建高可用AI服务
  • Linux 编译 Android 版 QGroundControl 软件并运行到手机上
  • 一天涨幅2000倍的期权有吗?
  • (JVM)四种垃圾回收算法
  • ArcGIS学习-15 实战-建设用地适宜性评价
  • Node.js轻松生成动态二维码
  • Windows+Docker一键部署CozeStudio私有化,保姆级
  • 【Docker】P1 前言:容器化技术发展之路
  • LangChain4J-(4)-多模态视觉理解
  • 少儿编程C++快速教程之——2. 字符串处理