Linux—进程池实现
进程池的定义与核心概念
进程池(Process Pool)是一种用于管理和复用进程资源的编程模型,属于并发编程领域的重要概念。其核心思想是预先创建一组固定数量的进程(称为 “池”),当需要执行任务时,直接从池中获取空闲进程处理任务,任务完成后进程返回池中等待下一次调用,而非频繁创建和销毁进程。
管道就是一个文件,父进程通过管道文件向子进程传递信息,从而控制子进程
进程池的实现
进程结构
1.processpool.hpp
1.channel
class Channel
{
public:Channel(int fd,int id):_wfd(fd),_id(id){_name="channel-" + to_string(_wfd) + "-" + to_string(_id);}void Send(int code){int n=write(_wfd,&code,sizeof(code));}void CloseProcess(){pid_t rid=waitpid(_id,nullptr,0);}void CloseId(){close(_wfd);}int FD() {return _wfd;}pid_t ID() {return _id;}string NAME() {return _name;}private:int _wfd;pid_t _id;string _name;
};
Channel用来表示管道文件,_wfd是管道所对应的文件描述符,_id是管道对应的子进程id,_name是进程名字
Send用来向子进程发送信息,子进程可以根据信息来做出回应
两个close函数用来关闭管道和等待子进程
2.channelmanager
class ChannelManager
{
public:ChannelManager():_next(0){}void Insert(int fd,pid_t id){_channels.emplace_back(fd,id);}Channel& Select(){auto &c=_channels[_next];_next++;_next%=_channels.size();return c;}void StopProcess(){for(auto &c:_channels){c.CloseProcess();cout << "关闭: " << c.NAME() << std::endl;}}void StopId(){for(auto& c:_channels){c.CloseId();cout << "回收: " << c.NAME() << std::endl;}}void Printf(){for(auto &c:_channels){cout<<"子进程的名字是:"<<c.NAME()<<endl;}}private:vector<Channel> _channels;size_t _next;
};
channelmanager用来管理channel的结构体,_channels是一个数组,用来存储channel对象。_next用来处理子进程负载的轮询数据
Insert函数用来插入Channel对象,方便进行管理
Select函数用来处理子进程的负载问题,避免子进程工作量不均衡,这里使用轮询方法,而_next用来处理正确的轮询对象
另外两个Stop函数用来关闭所有的子进程和各自的管道文件
3.processpool
class ProcessPool
{
public:ProcessPool(int num) : _process_num(num){_tm.Register(PrintLog);_tm.Register(Download);_tm.Register(Upload);}void Work(int fd)
{while(true){int code=0;int n=read(fd,&code,sizeof(code));if(n>0){if(n!=sizeof(code)) {continue;}cout<<"子进程"<<getpid()<<' '<<"收到任务码"<<code<<endl;_tm.Execute(code);}else if(n==0){cout<<"子进程退出"<<endl;break;}else{cout<<"读取失败"<<endl;break;}}
}bool Strate()
{for(int i=0;i<_process_num;i++){// 1. 创建管道int fd[2]={0};int n=pipe(fd);if(n<0) return false;// 2. 创建子进程pid_t id=fork();//子读,父写if(id==0){//子进程close(fd[1]);Work(fd[0]);close(fd[0]);exit(0);}else if(id>0){//父进程close(fd[0]);_cm.Insert(fd[1],id);}else{//创建失败return false;}}return true;
}void Run()
{int taskcode=_tm.Code();auto& c=_cm.Select();cout<<"选择了一个子进程为"<<c.NAME()<<endl;c.Send(taskcode);cout<<"发送了一个任务码"<<taskcode<<endl;
}void Stop()
{// 关闭父进程所有的wfd即可_cm.StopId();// 回收所有子进程_cm.StopProcess();
}private:ChannelManager _cm;int _process_num;TaskManager _tm;
};
processpool用来给用户进行管理操作,_cm是管理的对象,_process_num表示进程的数量,_tm用来保存任务的内容,它的函数在另一个.hpp文件中。
构造函数初始化子进程的数量,用来给strate函数创建子进程的数量,而_tm用来插入任务内容
Strate函数可以创建子进程,这里使用for循环进行创建子进程的操作,在创建子进程的时候使用pipe创建管道方便父子进程进行通信,在创建完进程之后,关闭掉不需要的接口,然后子进程等待任务码的写入。
Work函数使用while循环来不断读取管道文件的内容,分为成功读取,读取到文件结尾以及读取失败,当成功读取的时候,它会根据任务码的内容来进行不同的任务,其他两种情况都可以退出程序
2.task.hpp
1.函数指针
typedef void (*task_t)();void PrintLog()
{std::cout << "我是一个打印日志的任务" << std::endl;
}void Download()
{std::cout << "我是一个下载的任务" << std::endl;
}void Upload()
{std::cout << "我是一个上传的任务" << std::endl;
}
这里使用函数指针方便传递任务函数
2.taskmanager
class TaskManager
{
public:TaskManager(){srand(time(nullptr));}void Register(task_t t){_tasks.push_back(t);}int Code(){return rand()%_tasks.size();}void Execute(int code){if(code>=0&&code<_tasks.size()){_tasks[code]();}}
private:vector<task_t> _tasks;
};
TaskManager用来管理任务内容,_task用来插入任务数据。
构造函数与code函数进行联动,使得Code函数每次的任务码都是不一样的。
Register函数用来插入任务内容。
Execute函数根据任务码来进行不同的任务
代码的问题
问题
由于子进程的文件描述符是拷贝父进程的文件描述符,所以之后的子进程会和父进程一样指向之前创建的管道文件
如果关闭管道和等待子进程的步骤错误,就会出现问题
for(int i=0;i<channels.size();i++)
{channels[i].closeid();channels[i].closeprocess();
}
如果以这种形式实现的Stop操作,就会出现阻塞情况,由于关闭的只是当前子进程的管道文件,后面的子进程依旧指向当前的管道文件,此时子进程读取不到文件结尾,也就关闭不了子进程,就发生了阻塞
解决方法
1.在关闭文件的时候从后往前关闭
2.关闭其他子进程的写端