模拟实现线程池
目录
简述线程池
线程池核心逻辑
准备阶段
makefile编写
threadpool.hpp编写
Task.hpp编写
threadpool.cc编写
测试运行
线程池停止运行函数stop编写
线程退出条件编写
我们今天来模拟实现线程池。
简述线程池
线程池是⼀种线程使⽤模式。线程过多会带来调度开销,进⽽影响缓存局部性和整体性能。⽽线程池维护着多 个线程,等待着监督管理者分配可并发执⾏的任务。这避免了在处理短时间任务时创建与销毁线程的 代价。线程池不仅能够保证内核的充分利⽤,还能防⽌过分调度。可⽤线程数量应该取决于可⽤的并 发处理器、处理器内核、内存、⽹络sockets等的数量。
线程池核心逻辑
我们今天就用threadpool类模拟线程池,线程池内部会有很多的线程,然后这个线程需要实现我们用户手动传入事务,所以内部肯定有一个事务队列,然后我们可以通过equeue函数传入各种不同的事务到队列中,这个队列是没有长度的,实际长度取决于我们传入多少事务,然后线程池内部的线程不可能杂乱无章,我们可以将其提前创建限定个数然后装进一个数组里面,我们称为线程数组,然后里面的线程依次竞争队列的资源,所以可以看出线程池就是一个生产者-消费者模型。
可以有很多用户通过一个主函数输入,但是我们这次只实现单个生产者的模型,比较简单嘛。
准备阶段
看上图可知,在threadpool类里面就需要一个数组存储线程,如果线程交给一个智能指针管理,我们等一下就是要交给一个只能指针管理,那数组内存储的就是一个一个指向线程的智能指针,然后需要一个任务队列,由于多个消费者竞争了临界资源(queue),所以就需要一把锁了,线程之间要完成同步还需要一个条件变量,条件变量需要唤醒机制,怎么唤醒,所以依据之前就需要一个计数等待队列中的等待线程个数,我们还可以再加入一个线程数组的容量num,方便我们一次创建num个线程装在队列里面。
我们希望在主函数创建线程,接着调用start方法启动线程池,这时里面的线程立马的创建好了,直接指向我们定义的处理事务的方法,然后这时队列不是空的吗,所有的线程都会直接进入等待队列,然后接着调用equeue方法加入事务,这里的事务我们使用回调函数,另起一个事务.hpp,里面编写事务函数,最后终止线程池,然后调用join方法阻塞式回收。所以threadpool里面就需要包含equeue方法,stop方法,线程处理函数(私有),判断队列是否为空方法,join回收方法,我们线程没完成一个方法就进行日志的打印,同时锁,添加变量,线程库那些创建什么的都可以使用我们自己封装好的(不知道的翻我之前的博客好吧,不解释),由于代码比较简单我非必要不解释。
makefile编写
直接拷贝之前的。
BIN=pool
CC=g++
SRC = $(wildcard *.cc)
OBJ = $(SRC:.cc=.o)
$(BIN):$(OBJ)
$(CC) -o $@ $^ -std=c++17 -lpthread
%.o:%.cc
$(CC) -c $<
.PTHONY:clean
clean:
rm -f $(BIN) $(OBJ)
threadpool.hpp编写
#pragma once
#include<iostream>
using namespace std;
#include<memory>
#include<unistd.h>
#include<vector>
#include<queue>
#include<functional> //
#include"pthread.hpp"
#include"mutex.hpp"
#include"cond.hpp"
#include"log.hpp"
namespace threadpoolmodule
{
using namespace lockmodule;
using namespace threadmodule;
using namespace condmodule;
using namespace logmodule;
using thread_t = shared_ptr<thread>;
//用来做测试的线程方法
void defaulttest()
{
while (true)
{
LOG(loglevel::DEBUG) << "我是一个测试方法";
sleep(1);
}
}
const static int defaultnum = 5;
template<class T>
class threadpool
{
private:
bool isempty()
{
return _taskq.empty();
}
void HandleTask()
{
LOG(loglevel::INFO) << "线程进入HandleTask的逻辑\n";
while (true)
{
T t;
{
lockguard lockguard(mut);
// 1.拿任务
while (isempty())
{
_wait_num++;
_cond.wait(mut);
_wait_num--;
}
t = _taskq.front();
_taskq.pop();
}
//2 处理任务
t(); //规定,未来所有的任务处理都是必须提供()直接调用的方法
}
}
public:
threadpool()
: _num(defaultnum)
, _wait_num(0)
{
for (int i = 0; i < _num; i++)
{
//
_threads.push_back(make_shared<thread>(bind(&threadpool::HandleTask, this)));
LOG(loglevel::INFO) << "构键线程" << _threads.back()->getname() << "对象 。。。成功";
}
}
void start()
{
for (auto& thread_ptr : _threads)
{
thread_ptr->start();
LOG(loglevel::INFO) << "启动线程" << thread_ptr->getname() << "。。。成功";
}
}
void equeue(T in)
{
lockguard lockguard(mut);
_taskq.push(move(in));
if ( _wait_num)
{
_cond.notify();
}
}
void stop()
{}
void join()
{
for (auto& thread_ptr : _threads)
{
thread_ptr->join();
LOG(loglevel::INFO) << "停止线程" << thread_ptr->getname() << "。。。成功";
}
}
~threadpool()
{}
private:
vector<thread_t> _threads; //线程池中的线程初始个数在数组里面
int _num; //线程池的容量
queue<T> _taskq; //线程池中的任务队列
mutex mut;
cond _cond;
int _wait_num; //等待队列线程
};
};
stop方法等下实现,私有成员就这些,不够再添。
这里,由于我们私有实现的成员函数线程调用方法会多出一个this指针,但是我们在自己实现的thread库里面调用的是无参的,所以要支持P()直接括号访问处理函数可以使用lambda,也可以绑定封装,头文件functional,bind的第一个参数是指向这个要绑定的函数的地址,如果这个函数不是成员函数那肯定是函数名直接传进去,但是成员函数比较特殊,成员函数的地址的提取方式为&类名::函数名,记住就好。
接着访问队列事务,需要加锁的,所以需要另加域,然后日志的打印换行只能使用\n,不能使用endl,因为我们这个<<调用的是自己重载的,endl是不认识的。如果使用endl等下报错会有一堆,不好看的,不必要在这里浪费时间。处理解锁处理,那什么时候唤醒线程呢?
根据生产者-消费者模型每次用户主程序输入任务的时候,表示肯定有任务,所以这时判断如果等待队列有线程,这个计数不是0,就唤醒一个线程,push事件入队列时防止拷贝,让move进行移动构造,然后传参的时候不建议使用T&,因为外部传入的一定是T,可能会类型不兼容。
Task.hpp编写
#pragma once
#include"threadpool.hpp"
#include"log.hpp"
using namespace logmodule;
void Push()
{
LOG(loglevel::DEBUG) << "我是一个推送数据到服务器的任务,我正在被执行";
}
我们的任务文件不多就先只有一个Push好吧。
threadpool.cc编写
#include"threadpool.hpp"
#include"Task.hpp"
using namespace threadpoolmodule;
using task_t = function<void()>;
int main()
{
//线程池的创建
shared_ptr<threadpool<task_t>> tp = make_shared<threadpool<task_t>>();
//线程池的开始处理事务
tp->start();
int cnt = 10;
while (cnt)
{
tp->equeue(Push);
cnt--;
}
//停止线程池, 等下实现
//tp->stop();
//回收线程池
tp->join();
return 0;
}
使用回调函数构造线程池,然后进行我们预想的操作,先传入10个任务,但是我们的线程池内线程只有5个,所以肯定会有线程处于等待队列的。
测试运行
咦,为什么唤醒线程,创建线程,执行方法的打印日志都是正常打印的,结果join退不出了呢,我们再来看一下HandleTask方法
这个方法是不是没有做线程退出的操作呀,线程永远在死循环,永远处在等待队列里面,当然退不出啦,此时卡住不是很正常的事,主线程在等待,这也证明了我们到这里写的代码都没有什么问题的。
Task当前在执行被哪个线程执行还可以再表示一下,所以就需要传入string name。
bind此时由于绑定函数多了一个参数string,所以就需要传入一个占位符。this不算绑定占位参数。
就剩stop方法和设计线程不满足pop条件退出的代码了。
线程池停止运行函数stop编写
关于判断线程池是不是处于运行状态的,我们可以增加一个bool类型的成员表示。表示运行状态的bool初始值是false比较合适,因为构造时肯定还没有调用start。stop不能贸然的随便就停止,一个线程池要停止肯定要保证里面的所有的线程都执行完了所有任务,并且没有线程处于等待队列。
void stop()
{
//让里面的线程都重新自己退出
//退出之前需要将所有的任务都处理完,所以需要一次性唤醒所有在等待的线程
if (_isrunning)
{
_isrunning = false;
if (_wait_num)
{
_cond.notifyall();
}
}
}
基于这个_isrunning我们还可以对其他函数加上判断,start的时候可以判断,只有当_isrunning处于false才需要start,equeue插入任务时,当_isrunning处于true时才有插入任务的必要。
由于会有多个线程同时访问equeue,加上if判断不是原子性的所以一定要放在加锁的里面进行,先加锁再判断。
start的改变_isrunning的值应该放在启动线程循环前面,因为启动线程可能出错的,而且这个启动也是多线程的,如果放在后面,当启动时如果一个线程被start比_isrunning置为true还早的访问queue就会出错,你放在循环的后面出错的概率也越高呀。
线程退出条件编写
在HandleTask中,当线程被唤醒时是不是只有当事务队列为空并且线程池处于关闭状态(_isrunning == false)时线程就可以退出了呀,进入循环的条件则需要再加上当前线程池处于运行状态,最后打印退出信息。
void HandleTask(string name)
{
LOG(loglevel::INFO) << "线程进入HandleTask的逻辑\n";
while (true)
{
T t;
{
lockguard lockguard(mut);
// 1.拿任务
while (isempty() && _isrunning)
{
_wait_num++;
_cond.wait(mut);
_wait_num--;
}
if (isempty() && !_isrunning)
{
break;
}
t = _taskq.front();
_taskq.pop();
}
//2 处理任务
t(name); //规定,未来所有的任务处理都是必须提供()直接调用的方法
}
LOG(loglevel::INFO) << "线程:" << name << "退出";
}