当前位置: 首页 > news >正文

Linux:多线程(三.POSIX信号量、生产消费模型、线程池)

目录

1. 生产者消费者模型

1.1 阻塞队列(BlockingQueue)

1.2 一个实际应用的例子

2. POSIX信号量

2.1 引入

2.2 回顾加深理解信号量

2.3 信号量的操作接口

3. 基于循环队列的生产消费模型

3.1 循环队列

3.2 整个项目

4. 线程池

4.1 概念

4.2 线程池实现

1. 生产者消费者模型

超市(交易场所):

  • 定义:超市是数据“交易”的场所,即共享资源或临界资源的存储空间(也可以叫缓冲区)。在多线程编程中,这通常是一个数据结构(如队列、缓冲区等),用于临时存储数据,供生产者和消费者线程进行访问。

一般我们使用阻塞队列作为缓冲区

  • 功能:作为生产者和消费者之间数据传递的桥梁。生产者线程在此处添加(生产)数据,消费者线程在此处取走(消费)数据。

生产者(Producer):

  • 定义:生产者线程负责生成数据并将其放入超市(共享资源)中。

  • 并发度:生产者线程可以并发地运行,以提高数据的生成速度。但需要注意同步和互斥问题,以避免多个生产者同时写入数据导致的冲突。

  • 生产者之间都是互斥的:不能多个生产者同时都在往共享资源里面写

消费者(Consumer):

  • 定义:消费者线程负责从超市(共享资源)中取出数据并进行处理。

  • 并发度:消费者线程也可以并发地运行,以提高数据的处理速度。同样需要注意同步和互斥问题

  • 消费者之间都是互斥的:不能多个消费者同时都在从共享资源里面拿数据

3种关系:

生产者 vs 生产者 — 互斥

多个生产者线程可能同时试图向共享缓冲区(如队列或数组)中写入数据。为了防止数据竞争和不一致,我们需要使用互斥机制来确保同一时间只有一个生产者线程能够访问共享资源。

互斥通常通过互斥锁(Mutex)来实现。当一个生产者线程获得互斥锁时,其他生产者线程将被阻塞,直到锁被释放。这样,每个生产者线程在写入缓冲区时都能独占资源,从而避免了数据竞争。

消费者 vs 消费者 — 互斥

多个消费者线程可能同时试图从共享缓冲区中读取数据。为了确保数据的正确性和一致性,我们同样需要使用互斥机制来防止多个消费者线程同时访问缓冲区。

互斥锁在这里同样起到关键作用。当一个消费者线程获得互斥锁时,其他消费者线程将被阻塞,直到锁被释放。这样,每个消费者线程在读取缓冲区时都能独占资源,避免了潜在的冲突和不一致。

生产者 vs 消费者 — 互斥 && 同步

生产者线程和消费者线程需要共享一个缓冲区。这要求我们使用互斥机制来确保同一时间只有一个线程(生产者或消费者)能够访问缓冲区,以避免数据竞争和不一致。

但是,仅仅互斥是不够的。我们还需要使用同步机制来确保生产者和消费者之间的协调。例如,当缓冲区为空时,消费者线程应该被阻塞,直到生产者线程向其中添加了数据。同样地,当缓冲区满时,生产者线程也应该被阻塞,直到消费者线程从中取走了数据。

同步通常通过条件变量(Condition Variables)来实现。生产者线程在添加数据到缓冲区后,会向条件变量发送信号(signal),以唤醒等待的消费者线程。类似地,消费者线程在取走数据后,也会向条件变量发送信号,以唤醒等待的生产者线程。通过这种方式,生产者和消费者线程能够协调地工作,确保缓冲区的有效使用和数据的一致性。

优点:

  • 解耦:由于引入了一个缓冲区作为中介,生产者和消费者之间并不直接相互调用,从而降低了它们之间的耦合度。这使得生产者和消费者的代码发生变化时,不会对对方产生直接影响,提高了系统的灵活性和可维护性。

  • 支持并发:生产者和消费者是两个独立的并发体,它们之间通过缓冲区进行通信。生产者只需将数据放入缓冲区,就可以继续生产下一个数据;消费者只需从缓冲区中取出数据,就可以继续处理。这种并发处理的方式可以避免因生产者和消费者速度不匹配而导致的阻塞问题

  • 支持忙闲不均:在生产者和消费者模型中,生产者和消费者的速度可以不相同。当生产者生产数据的速度过快,而消费者处理数据的速度较慢时,未处理的数据可以暂时存储在缓冲区中,等待消费者处理。这种机制可以平衡生产者和消费者之间的速度差异,避免资源的浪费和瓶颈的产生。

1.1 阻塞队列(BlockingQueue)

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

BlockQueue.hpp 

#include<iostream>
#include<pthread.h>
#include<queue>

using namespace std;

template<class T>
class BlockQueue
{
public:
    //初始化
	BlockQueue(int max=20)
	:_max(max)
	{
		
		pthread_cond_init(&_cond1,nullptr);
		pthread_cond_init(&_cond2,nullptr);
		pthread_mutex_init(&_mutex,nullptr);
	}
    
	//头删,并且返回这个数,生产者使用的接口
	T pop()
	{
		pthread_mutex_lock(&_mutex);//加锁
		while(_q.size()==0)//如果队列中没有数据,则让线程等待
		{
			pthread_cond_wait(&_cond1,&_mutex);
		}
		T data=_q.front();
        _q.pop();
        
		pthread_cond_signal(&_cond2);//唤醒正在等待的尾插线程,可以插入了
		pthread_mutex_unlock(&_mutex);//解锁
		return data;
	}
   
	//尾插,消费者使用的接口
	void push(T& data)
	{
		pthread_mutex_lock(&_mutex);//加锁
		while(_q.size()==_max)//如果队列中数据满了,则让线程等待
		{
				pthread_cond_wait(&_cond2,&_mutex);
		}
		_q.push(data);

        pthread_cond_signal(&_cond1);//唤醒正在等待的头删线程,可以删除了
        pthread_mutex_unlock(&_mutex);//解锁
	}


    //销毁
	~BlockQueue()
	{
		pthread_cond_destroy(&_cond1);
		pthread_cond_destroy(&_cond2);
		pthread_mutex_destroy(&_mutex);
	}

private:
	pthread_cond_t _cond1;//头删条件变量
	pthread_cond_t _cond2;//尾插条件变量
	pthread_mutex_t _mutex;//定义锁
    queue<T> _q;//定义队列
	int _max;//队列的空间大小
};

1.2 一个实际应用的例子

  • BlockQueue.hpp:封装的阻塞队列

  • text.cc:测试代码

  • Task.hpp:任务类(这里只是进行一个加法)

Task.hpp

#include<string>

std::string oper="+-*/%";

class Task
{
public:
    //初始化
	Task(int x,int y,char oper)
	:_x(x),_y(y),_oper(oper),_result(0),_correct(0)
	{
	}
    
	//运行
	int run()
	{
		switch (_oper)
		{
		case '+':
		    _result=_x+_y;
			break;
		case '-':
			_result=_x-_y;
			break;
		case '*':
			_result=_x*_y;
			break;
		case '/':
			if(_y==0) _correct=1;
			else _result=_x/_y;
			break;
		case '%':
			if(_y==0) _correct=1;
			else _result=_x%_y;
			break;
		default:
			_correct=2;
			break;
		}
	}
    
	//消费者拿的任务
	std::string GetResult()
	{
		std::string rs=std::to_string(_x);
		rs+=_oper;
		rs+=std::to_string(_y);
		rs+="=";
		rs+=std::to_string(_result);
		rs+="[";
		rs+=std::to_string(_correct);
		rs+="]";

        return rs;
	}
   
	//生产者生产的任务
	std::string GetTask()
    {
        std::string r = std::to_string(_x);
        r += _oper;
        r += std::to_string(_y);
        r += "=?";
        return r;
	}
    
	~Task()
	{}

private:
	int _x;        
	int _y;
	int _result;      //结果
	char _oper;       //运算符
	int _correct;     //值是否正确
};

text.cc

#include<iostream>
#include"BlockQueue.hpp"
#include<pthread.h>
#include<ctime>
#include<unistd.h>
#include"Task.hpp"

//消费者
void* conmuser(void* args)
{
    BlockQueue<Task>* td=static_cast<BlockQueue<Task>*>(args);
    while(true)
    {
        Task t=td->pop();
    
        t.run();
        cout<<"conmuser get a task:"<<t.GetResult()<<endl;
        sleep(1);
    }
}

//生产者
void* producer(void* args)
{
    int len=oper.size();
    srand(time(nullptr));
    BlockQueue<Task>* td=static_cast<BlockQueue<Task>*>(args);
    while(true)
    {
        int data1=rand()%5;
        usleep(100);
        int data2=rand()%10;
        char op=oper[rand()%len];

        Task t(data1,data2,op);

        td->push(t);
        cout<<"producer a task:"<<t.GetTask()<<endl;
        sleep(1);
    }
}

int main()
{
    BlockQueue<Task>* q=new BlockQueue<Task>;//创建队列
    pthread_t c,p;//定义生产者和消费者

    pthread_create(&c,nullptr,producer,(void*)q);//创建生产者的线程
    pthread_create(&c,nullptr,conmuser,(void*)q);//创建消费者的线程
 
    //回收线程
    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    
    //销毁队列
    delete q;

    return 0;
}

2. POSIX信号量

2.1 引入

上次我们使用了阻塞队列的生产消费模型,在先前的生产者-消费者模型代码中,当一个线程想要操作临界资源时,必须确保临界资源处于满足条件的状态才能进行修改;否则无法修改。例如,在push接口中,当队列已满时,临界资源处于条件不可用的状态,无法继续进行push操作。此时,线程应该进入条件变量队列cond中等待。如果队列未满,即临界资源条件已准备好,那么可以继续push,调用队列_qpush接口。

观察代码可以看到,在判断临界资源是否就绪之前,必须先获取锁,因为判断临界资源实质上就是对临界资源的访问,而访问临界资源自然需要加锁以保护。因此,代码通常会先获取锁,然后手动检查临界资源的就绪状态,根据状态判断是等待还是直接操作临界资源。

但是如果事先知道临界资源的状态是否就绪,则无需一上来就加锁。一旦提前知道临界资源的就绪状态,便不再需要手动检查资源状态。在这种情况下,若有一个计数器来表示临界资源中小块资源的数量(如队列中每个空间),线程在访问临界资源前会先请求该计数器。若计数器大于0,则表明队列中有空余位置,可以直接向队列push数据;若计数器等于0,则说明队列已满,不能继续push数据,应该阻塞等待,直至计数器再次大于0,方可继续向队列push数据。

void push(T& data)
	{
		pthread_mutex_lock(&_mutex);//加锁
		while(_q.size()==_max)//如果队列中数据满了,则生产者等待
		{
				pthread_cond_wait(&_cond2,&_mutex);
		}
		_q.push(data);

        if(_q.size()>0)
        {
                pthread_cond_signal(&_cond1);//通知消费者可以消费了
        }

        pthread_mutex_unlock(&_mutex);//解锁
	}

2.2 回顾加深理解信号量

信号量是一种用于进程间通信和同步的机制。它本质上是一个计数器,用于衡量系统中的资源可用数量。通过信号量,可以实现对临界资源的访问控制,确保多个进程或线程能够安全地共享资源而不发生冲突。

在访问临界资源之前,程序可以通过申请信号量来获取对资源的访问权限。如果信号量的值大于0,表示资源可用,程序可以继续访问资源;如果信号量的值等于0,表示资源已被占用,程序需要等待,直到资源可用为止。

信号量并不仅仅是简单的计数器,它是通过原子操作实现的,确保信号量的操作是线程安全的。常用的信号量操作包括P操作(等待操作)和V操作(释放操作),也称为PV操作。P操作会将信号量的值减1,用于占用资源;V操作会将信号量的值加1,用于释放资源。

通过合理地使用信号量和PV操作,可以实现多线程或多进程之间的同步和互斥,避免资源竞争和死锁等并发问题。信号量是操作系统中重要的同步工具,广泛应用于进程间通信、资源管理、线程同步等场景。

system信号量和POSIX信号量都是用于进程间通信和同步的机制,但它们之间存在一些区别。

1. 系统信号量:

  • 系统信号量是Linux中的一种系统调用,用于进程间通信和同步。

  • 系统信号量是以系统级资源的形式存在,可以跨越进程边界,不仅可以用于线程之间的同步,也可以用于进程之间的同步。

  • 系统信号量是一个全局的计数器,可以通过系统调用函数来创建、初始化、P操作(等待操作)和V操作(释放操作)等。

  • 系统信号量的操作是通过系统调用函数来实现的,如semget、semop等。

2. POSIX信号量:

  • POSIX信号量是基于POSIX标准的一种同步机制

  • POSIX信号量与系统信号量类似,但是在接口和使用上有些许差异。

  • POSIX信号量允许用于进程间通信和线程间同步。

  • POSIX信号量通过调用相关的POSIX函数来创建、初始化、等待和释放,如sem_open、sem_wait、sem_post等。

系统信号量是Linux系统提供的一种进程间通信和同步机制,而POSIX信号量是基于POSIX标准的一种同步机制,二者都可以实现进程或线程间的同步和互斥操作

2.3 信号量的操作接口

初始化信号量:

使用sem_init函数可以初始化信号量,给定的value值会成为信号量的初始值。如果信号量是线程间共享的,可以被多个线程同时使用;如果是进程间共享的,可以被多个进程使用

#include <semaphore.h>//下面的函数都这此头文件

int sem_init(sem_t *sem, int pshared, unsigned int value);
  • sem: 指向要初始化的信号量的指针(我们使用sem_t 类型直接定义)

  • pshared: 0 表示该信号量为线程间共享;非零值表示信号量为进程间共享

  • value: 信号量的初始值

  • 若成功,返回值为0,表示初始化信号量成功。

  • 若出现错误,返回值为-1,表示初始化失败,并设置errno来指示具体错误。(下面都是一样的)

销毁信号量:

使用sem_destroy函数可以销毁之前初始化的信号量。在销毁信号量之前,要确保所有线程或进程都已经停止使用该信号量。

int sem_trywait(sem_t *sem);
  • sem: 要销毁的信号量的指针

等待信号量:(P操作- -)

使用sem_wait函数可以等待信号量,即执行P操作。如果信号量的值大于0,则将其减1并立即返回,否则线程(或进程)会阻塞等待信号量变为大于0。

int sem_wait(sem_t *sem);
  • sem: 要等待的信号量的指针

发布信号量:(V操作++)

使用sem_post函数可以发布(释放)信号量,即执行V操作。对信号量执行V操作会将其值加1,并唤醒可能正在等待该信号量的线程(或进程)。

int sem_post(sem_t *sem);
  • sem: 要发布的信号量的指针

3. 基于循环队列的生产消费模型

3.1 循环队列

之前在阻塞队列里,我们不能实现出队列与入队列的同时进行。现在因为是循环队列我们使用了两个索引,而两个索引不同时可以同时进行出和入

当为空时或者满时,二者只能有一个开始执行。然后就不再相等了,也是能分开进行了

#include<iostream>
#include<string>
#include<vector>
#include<pthread.h>
#include<semaphore.h>

using namespace std;

template<class T>
class RoundQwueue
{
private:
    //信号量--
	void P(sem_t& sub)
	{
		sem_wait(&sub);
	}
    //信号量++
	void V(sem_t& add)
	{
		sem_post(&add);
	}
    //加锁
	void Lock(pthread_mutex_t& mutex)
	{
		pthread_mutex_lock(&mutex);
	}
    //解锁
	void Unlock(pthread_mutex_t& mutex)
	{
		pthread_mutex_unlock(&mutex);
	}
public:
    //初始化
	RoundQwueue(int max=20)
	:_q(max),_max(max),_con(0),_pro(0)
	{
		pthread_mutex_init(&_mutex1,nullptr);
		pthread_mutex_init(&_mutex2,nullptr);
        
		sem_init(&_sem1,0,max);
		sem_init(&_sem2,0,0);
	}
   
	//生产者生产数据
	void push(T& data)
	{
		P(_sem1);           //生产者的信号量--,也就是可放数据的空间减一
		Lock(_mutex1);

		_q[_pro++]=data;      //将数据添加到队列中
		//_pro++;

        _pro%=_q.size();    //保证队列的循环,使_pro的下标不会超过队列的最大空间值

		Unlock(_mutex1);
		V(_sem2);           //消费者的信号量++,也就是可拿数据的空间加一
	}
    
	//消费者拿数据
	void pop(T& data)
	{
		P(_sem2);           //消费者的信号量--,也就是可拿数据的空间减一
		Lock(_mutex2);

		data=_q[_con];      //将队列中的数据往外拿
		_con++;
		_con%=_q.size();    //保证队列的循环,使_con的下标不会超过队列的最大空间值

		Unlock(_mutex2);
		V(_sem1);           //生产者的信号量++,也就是可放数据的空间加一
	}
    
	//回收
	~RoundQwueue()
	{
		pthread_mutex_destroy(&_mutex1);
		pthread_mutex_destroy(&_mutex2);

		sem_destroy(&_sem1);
		sem_destroy(&_sem2);
	}

private:
	vector<T> _q;              //这个一定要初始化
	int _max;                  //队列空间的最大值
	int _con;                  //消费者的下标
	int _pro;                  //生产者的下标

	pthread_mutex_t _mutex1;   //生产者的锁
	pthread_mutex_t _mutex2;   //消费者的锁

	sem_t _sem1;               //生产者信号量,一开始为队列的最大值
	sem_t _sem2;               //消费者信号量,一开始为0
};

3.2 整个项目

  • RingQueue.hpp:封装的循环队列

  • text.cc:程序的主体

  • Task.hpp:任务类(这里只是一个function包装器)

Task.hpp

#include<string>

std::string oper="+-*/%";

class Task
{
public:
    //初始化
	Task()
	{}
	Task(int x,int y,char oper)
	:_x(x),_y(y),_oper(oper),_result(0),_correct(0)
	{
	}
    
	//运行
	void run()
	{
		switch (_oper)
		{
		case '+':
		    _result=_x+_y;
			break;
		case '-':
			_result=_x-_y;
			break;
		case '*':
			_result=_x*_y;
			break;
		case '/':
			if(_y==0) _correct=1;
			else _result=_x/_y;
			break;
		case '%':
			if(_y==0) _correct=1;
			else _result=_x%_y;
			break;
		default:
			_correct=2;
			break;
		}
	}
    
	//消费者拿的任务
	std::string GetResult()
	{
		std::string rs=std::to_string(_x);
		rs+=_oper;
		rs+=std::to_string(_y);
		rs+="=";
		rs+=std::to_string(_result);
		rs+="[";
		rs+=std::to_string(_correct);
		rs+="]";

        return rs;
	}
   
	//生产者生产的任务
	std::string GetTask()
    {
        std::string r = std::to_string(_x);
        r += _oper;
        r += std::to_string(_y);
        r += "=?";
        return r;
	}
    
	~Task()
	{}

private:
	int _x;        
	int _y;
	int _result;      //结果
	char _oper;       //运算符
	int _correct;     //值是否正确
};

text.cc

#include<iostream>
#include"RoundQueue.hpp"
#include"Task.hpp"
#include<pthread.h>
#include<ctime>
#include<unistd.h>

using namespace  std;

//生产者
void* producer(void* args)
{
	
	srand(time(nullptr));
	int len=oper.size();
	RoundQwueue<Task>* td=static_cast<RoundQwueue<Task>*>(args);
	
    while(true)
	{
		
		int data1=rand()%10;
		usleep(100);
		int data2=rand()%5;
		char op=oper[rand()%len];
		
		Task t(data1,data2,op);
		td->push(t);

        cout<<"producer give a task:"<<t.GetTask()<<endl;
		sleep(1);
	}
}

//消费者
void* consumer(void* args)
{
	RoundQwueue<Task>* td=static_cast<RoundQwueue<Task>*>(args);
	while(true)
	{
		Task t;
		td->pop(t);
		t.run();
    	
		cout<<"consumer get a task:"<<t.GetResult()<<endl;
		
	}
}

int main()
{
	RoundQwueue<Task>* tr=new RoundQwueue<Task>;//创造队列
	
	pthread_t tid1;
	pthread_t tid2;
	
	pthread_create(&tid1,nullptr,producer,(void*)tr);
	pthread_create(&tid2,nullptr,consumer,(void*)tr);
	
	pthread_join(tid1,nullptr);
    pthread_join(tid2,nullptr);
	
	delete tr;
	return 0;
}

4. 线程池

4.1 概念

  线程池:见名知义,就是多个线程构成的集合。其中线程的个数是确定的,并不是固定的

  为什么要有线程池?

  • 如果每次都只创建一个线程,首先当用户请求过多时,每次都需要创建一个线程,创建线程需要时间和调度开销,这样会影响缓存的局部性和整体的性能。其次,如果无上限一直创建线程,还会导致CPU的过分调度。

  • 线程池已经创建好了一定数量的线程,等待着分配任务,这样避免了处理任务时的线程创建和销毁。线程池里线程个数确定,能够保证内核的充分利用,还能防止过分调度。

  • 线程池中可用线程数量取决于可用额并发处理器,处理器内核,内存,网络socket等的数量。

线程池的应用场景:

  • 需要大量的线程来完成任务,且完成人物的时间比较短。比如:WEB服务器完成网页请求这样的任务,因为当个任务小,并且任务量巨大,你可以想象一个热门网站的请求次数。但是对于长时间的任务,线程池的优先就不明显了。比如:一个Telnet连接请求,因为Telnet会话时间比线程创建时间大多了。

  • 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。

  • 接收突发性的大量请求,但是不至于使服务器因此产生大量线程应用。突发性大量客户请求,在没有线程池的情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量的线程可能使内存到达极限,出现错误。

4.2 线程池实现

线程池实际也是一个生产者消费者模型,接收任务,往任务队列中放任务的是生产者,从任务队列中拿任务并执行的是消费者。

 主线程是生产者,用来接收任务和放任务。

Task.hpp

#include<string>

std::string oper="+-*/%";

class Task
{
public:
    //初始化
	Task(int x,int y,char oper)
	:_x(x),_y(y),_oper(oper),_result(0),_correct(0)
	{
	}
    
	//运行
	int run()
	{
		switch (_oper)
		{
		case '+':
		    _result=_x+_y;
			break;
		case '-':
			_result=_x-_y;
			break;
		case '*':
			_result=_x*_y;
			break;
		case '/':
			if(_y==0) _correct=1;
			else _result=_x/_y;
			break;
		case '%':
			if(_y==0) _correct=1;
			else _result=_x%_y;
			break;
		default:
			_correct=2;
			break;
		}
	}
    
	//消费者拿的任务
	std::string GetResult()
	{
		std::string rs=std::to_string(_x);
		rs+=_oper;
		rs+=std::to_string(_y);
		rs+="=";
		rs+=std::to_string(_result);
		rs+="[";
		rs+=std::to_string(_correct);
		rs+="]";

        return rs;
	}
   
	//生产者生产的任务
	std::string GetTask()
    {
        std::string r = std::to_string(_x);
        r += _oper;
        r += std::to_string(_y);
        r += "=?";
        return r;
	}
    
	~Task()
	{}

private:
	int _x;        
	int _y;
	int _result;      //结果
	char _oper;       //运算符
	int _correct;     //值是否正确
};

threadpool.hpp

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <queue>

using namespace std;

class threadname
{
public:
	string _name;       //线程的名字
	pthread_t tid;      //线程的tid
};

template <class T>
class Threadpool
{
private:
    //加锁
	void Lock()
	{
		pthread_mutex_lock(&_mutex);
	}
    
	//解锁
	void Unlock()
	{
		pthread_mutex_unlock(&_mutex);
	}
    
	//线程等待
	void Wait()
	{
		pthread_cond_wait(&_cond, &_mutex);
	}
    
	//唤醒线程
	void signal()
	{
		pthread_cond_signal(&_cond);
	}
    
	//输出线程的名字
	string Getthreadname(pthread_t tid)
	{
		for (auto ti : _v)
		{
			if (ti.tid == tid)
			{
				return ti._name;
			}
		}
		return nullptr;
	}

public:
    //构造函数
	Threadpool(int max = 10)
		: _max(max), _v(max)                        //初始化数组和_max
	{
		pthread_mutex_init(&_mutex, nullptr);       //初始化锁
		pthread_cond_init(&_cond, nullptr);         //初始化条件变量
	}

	static void *hander(void *args)
	{
		Threadpool<T> *td = static_cast<Threadpool<T> *>(args);
		string name = td->Getthreadname(pthread_self());     //通过tid,将该线程的名字从数组中拿出

		while (true)
		{
			td->Lock();
			while (td->_q.empty())       //队列如果为空,则进行线程等待
			{
				td->Wait();
			}

			T t = td->pop();             //将任务从队列中拿出
			td->Unlock();
			t.run();

			cout << name << " a result:" << t.GetResult() << endl;
		}
	}
    
	//创造线程
	void Create()
	{

		int num = _v.size();
		for (int i = 0; i < num; i++)
		{
			_v[i]._name = "thread_" + to_string(i + 1);   //将线程的名字存入数组中
			pthread_create(&(_v[i].tid), nullptr, hander, (void *)this);
			cout<<_v[i]._name<<endl;                      //打印创造的线程名字
		}
		
	}
    
	//将任务添加到队列中
	void push(T &data)
	{
		Lock();
		_q.push(data);
		cout << "thread produser a task:" << data.GetTask() << endl;
		signal();
		Unlock();
	}
    
	//从队列中拿出任务
	T pop()
	{
		T t = _q.front();
		_q.pop();
		return t;
	}
    
	//析构函数
	~Threadpool()
	{
		pthread_mutex_destroy(&_mutex);
		pthread_cond_destroy(&_cond);
	}

private:
	vector<threadname> _v;    //定义一个数组,用来存放线程的名字和tid
	queue<T> _q;              //队列,用来存放任务
	int _max;	              // 数组的大小

	pthread_mutex_t _mutex;   // 锁
	pthread_cond_t _cond;     
};

text.cc

#include "threadpool.hpp"
#include "Task.hpp"
#include <ctime>

using namespace std;

int main()
{
    Threadpool<Task> *tp = new Threadpool<Task>;
    tp->Create();
    srand(time(nullptr));
    int len = oper.size();

    while (true)
    {

        int x = rand() % 10 + 1;
        usleep(100);
        int y = rand() % 20 + 1;
        char op = oper[rand() % len];

        Task t(x, y, op);

        tp->push(t);
        sleep(1);
    }
    
    delete tp;

    return 0;
}

结果:

线程池是一开始,我们就创建好n个线程,然后将任务添加到线程池中,由之前创建好的线程去挣抢完成任务

相关文章:

  • 计算机网络——IP、MAC、ARP
  • 常见的交换机端口类型
  • golang从入门到做牛马:第十七篇-Go语言Map:键值对的“魔法袋”
  • 【前端】【组件】【vue2】封装一个vue2的ECharts组件,不用借助vue-echarts
  • ctf-web: php原生类利用 -- GHCTF Popppppp
  • 深度学习笔记——残差网络和模型选择
  • 【python-uiautomator2】手机上的ATX应用界面报错问题处理:无法提供服务,非am instrument启动
  • 图像处理篇---opencv中的图像特征
  • JavaScript基本知识
  • 【C++模板】:开启泛型编程之门(函数模版,类模板)
  • 大模型在甲状腺良性肿瘤诊疗全流程中的应用研究报告
  • 【Golang】第一弹-----初步认识GO语言
  • docker 小记
  • 使用 OpenSSL 和 Python 实现 AES-256-CBC 加密与解密(安全密钥管理)
  • Node 使用 SSE 结合redis 推送数据(echarts 图表实时更新)
  • Javascript基础语法详解
  • 深入探索Matter协议:开发Matter智能家居设备的基本步骤
  • 《Java三剑客:JDK、JRE、JVM的“塑料友情”》
  • wireshark 如何关闭混杂模式 wireshark操作
  • redis在ubuntu更新至最新版本-官方提供方法-查看版本和状态-查看数据库中数据
  • 济源网站制作/大连seo外包平台
  • 动画形式的h5在哪个网站做/百度seo推广怎么收费
  • 怎么学习网站开发/微信营销成功案例8个
  • 苏州做网站品牌公司/北京网站优化公司
  • 企业网站的首页设计/seo整站怎么优化
  • 做外贸学英语的网站/百度云盘官网