Linux系统(项目)之----进程池
一、前置知识
注:本节内容中的知识在写一篇博客中会做讲解,这里需要先了解熟悉一下
进程池的概念
进程池是一种用于管理和调度进程的机制。它预先创建了一组进程,这些进程处于等待状态,当有任务需要执行时,可以从进程池中取出一个进程来处理任务。在多任务处理的场景下,进程池可以有效地提高系统的性能和资源利用率。
比如在一些服务器程序中,当有客户端请求到达时,服务器可以利用进程池中的进程来处理请求。如果没有进程池,每次有请求都要创建新的进程,这会带来较大的开销。而进程池中的进程已经创建好了,只需要分配任务给它们即可。
进程间通信功能
进程池本身并不直接提供进程间通信的功能,但它可以和进程间通信机制一起使用。在多进程编程中,进程间通信是非常重要的,因为不同进程之间需要交换数据或者协调工作。
常见的进程间通信方式有:
管道(Pipe):
管道是一种半双工的通信方式,数据只能在一个方向上流动。它分为匿名管道和命名管道。匿名管道通常用于父子进程之间的通信,例如在 Linux 系统中,使用 pipe 系统调用可以创建一个匿名管道。创建后,父子进程可以通过管道的文件描述符进行通信,一个进程向管道写入数据,另一个进程从管道读取数据。
命名管道(也叫 FIFO)允许不相关的进程进行通信。它在文件系统中有对应的文件名,进程可以通过文件名来访问管道。比如,一个进程可以创建一个命名管道,然后其他进程可以通过这个管道的文件名来打开管道进行读写操作。
消息队列(Message Queue):
消息队列允许一个或多个进程向队列中写入消息,同时允许一个或多个进程读取队列中的消息。它是一种比较灵活的进程间通信方式,消息队列可以存储多个消息,发送方和接收方不需要同时运行。例如,在一个分布式系统中,不同的进程可以将消息发送到同一个消息队列,然后由专门的进程来处理这些消息。
共享内存(Shared Memory):
共享内存允许两个或多个进程共享一个给定的存储区。这是最快的进程间通信方式,因为进程可以直接访问共享内存中的数据。不过,使用共享内存需要解决同步问题,例如多个进程同时对共享内存进行写操作可能会导致数据混乱。在 Linux 系统中,可以使用系统调用如 shmget、shmat 等来创建和管理共享内存。
信号(Signal):
信号是一种比较简单的进程间通信方式,它是一种软件中断。一个进程可以向另一个进程发送信号,接收信号的进程可以根据信号的类型来执行相应的处理程序。例如,当一个进程收到 SIGINT 信号时(通常是用户按下 Ctrl + C 产生的),可以执行清理工作然后退出。信号主要用于进程之间的同步和简单的通知。
当使用进程池时,如果需要进程间通信,可以结合上述的通信方式来实现。例如,在一个使用进程池的多任务应用程序中,主进程可以通过消息队列向进程池中的工作进程发送任务数据,工作进程处理完任务后,也可以通过消息队列或者管道等方式将结果发送回主进程。
二、设计进程池
进程池的设计我们今天以C++语言,因此我们需要大体上要用到两个类,一个类用于构造进程池本身,另一个用于设计进程间的通信
在设计时,如果要用某个库函数但是不知道头文件时,可以去linux里面man一下就可以了~
本进程池的设计是综合前面所有所学知识,包括lambda表达式,进程控制,回调函数,function函数等等,有一定的综合性
在本篇文章中,我们将采用.hpp代替.h文件,原因如下:
.hpp
是C++中用于头文件的扩展名,它与 .h
类似,但更倾向于现代C++的编程风格。它通常用于存放类的声明、函数声明、模板定义等,有助于提高代码的可读性和可维护性。在现代C++项目中,使用 .hpp
文件是一种常见的做法。
2.1 通信渠道的设计
根据上述前置知识,大体逻辑是这样的:
初始化:通过构造函数初始化通道对象,设置文件描述符、通道名称和目标子进程ID。
调试输出:
DebugPrint()
函数用于打印通道的基本信息,方便调试。获取信息:提供
Fd()
、Name()
和Target()
函数来获取通道的文件描述符、名称和目标子进程ID。资源管理:
Close()
函数用于关闭文件描述符,释放资源;Wait()
函数用于等待目标子进程结束。析构:析构函数目前为空,但通常用于清理资源。
参考代码:
class Channel
{
public://基本构成:构造函数、析构函数等等Channel() {}~Channel(){}Channel(int fd,const string& name,pid_t id):_wfd(fd),_name(name),_sub_target(id){}//不同功能函数void DebugPrint(){printf("channel name: %s, wfd: %d, target pid: %d\n",_name.c_str(),_wfd, _sub_target);}int Fd(){return _wfd;}string Name(){return _name;}pid_t Target(){return _sub_target;}void Close(){close(_wfd);} void Wait(){pid_t rid=waitpid(_sub_target,nullptr,0);(void)rid; //这里记得强转一下,要不函数返回值类型会报错~}
private:int _wfd;string _name;pid_t _sub_target;
};
2.2 进程池本体的设计
大体步骤如下:
1.初始化进程池,又可以具体细分为如下步骤:
循环创建子进程:通过一个
for
循环,根据_processnum
(进程池中进程的数量)来创建相应数量的子进程。创建管道:在每次循环中,使用
pipe()
系统调用创建一个管道(pipefd
),用于父子进程间的通信。管道由两个文件描述符组成,pipefd[0]
用于读,pipefd[1]
用于写。检查管道创建:如果
pipe()
调用失败(返回值小于0),函数返回false
表示初始化失败。创建子进程:使用
fork()
系统调用创建一个子进程。fork()
调用成功后,返回两次:在父进程中返回子进程的PID,在子进程中返回0。错误检查:如果
fork()
返回一个负值,表示创建子进程失败,函数返回false
。子进程逻辑:
关闭不需要的管道写端(
pipefd[1]
),因为子进程只需要从管道读取数据。调用回调函数
cb
,传入管道读端(pipefd[0]
),让子进程执行特定的任务。子进程完成任务后调用
exit(0)
退出。
父进程逻辑:
关闭管道读端(
pipefd[0]
),因为父进程只需要向管道写入数据。创建一个通道名称,格式为
"channel-" + 子进程索引
。将管道写端(
pipefd[1]
)、通道名称和子进程PID添加到_channels
容器中,用于后续的进程管理和通信。
返回成功:如果所有子进程都成功创建并初始化,函数返回
true
表示进程池初始化成功。
这里在补充说明一些知识点:
cb
代表的是一个回调函数(callback function)。回调函数是一种在软件或程序库中常用的技术,它允许在某个特定的时间点或事件发生时执行一段预定义的代码。这种技术在异步编程、事件处理和任务调度中尤其常见。在这段代码的上下文中,回调函数 cb
被用来指定子进程应该执行的任务。当子进程被创建后,它会调用这个回调函数,传入一个参数,通常是与子进程通信的管道的读端文件描述符(pipefd[0]
)。这样,子进程就可以通过这个文件描述符接收来自父进程的数据或命令,并执行相应的操作。而callback_t
是一个类型别名,它代表了一个回调函数的类型。这个类型使用 C++ 标准库中的 std::function
模板来定义,具体来说,它是一个可以接受一个 int
类型的参数(在这个上下文中通常是一个文件描述符 fd
)并且不返回任何值(void
)的函数。
// 1.进程池的初始化bool InitProcessPool(callback_t cb){for (int i = 0; i < _processnum; i++){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd); // pipe()函数,管道创建成功就返回0if (n < 0){return false;}// 2.创建子进程pid_t id = fork();if (id < 0)return false;if (id == 0){// 3.子进程读,关闭写,形成信道close(pipefd[1]);cb(pipefd[0]);exit(0);}// 外面父进程写,关闭读close(pipefd[0]);string name = "channel-" + to_string(i);_channels.emplace_back(pipefd[1], name, id); // 将当前子进程的通信信息(管道写端、通道名称和子进程ID)添加到 _channels 容器中,以便后续管理和通信。}return true;}
2. 控制唤醒指定的一个子进程,让该子进程完成指定任务
这里我们还要写一下各个任务的代码,这里直接给出:
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <functional>using namespace std;// 4种任务
// task_t[4];using task_t = function<void()>;void Download()
{std::cout << "我是一个downlowd任务" << std::endl;
}
void MySql()
{std::cout << "我是一个 MySQL 任务" << std::endl;
}void Sync()
{std::cout << "我是一个数据刷新同步的任务" << std::endl;
}void Log()
{std::cout << "我是一个日志保存任务" << std::endl;
}vector<task_t> tasks;class Init
{
public:Init(){tasks.push_back(Download); // 在类的定义中直接初始化成员变量时,可以省略函数括号tasks.push_back(MySql);tasks.push_back(Sync);tasks.push_back(Log);}
};
Init ginit;
3.进程控制,这里我们采用轮询控制
// 2. 控制唤醒指定的一个子进程,让该子进程完成指定任务// 2.1 轮询选择一个子进程(选择一个信道) -- 负载均衡// 这里可以可以用函数重载来实现多种情况:1.有限次2.无限次void PollingCtrlSubProcess(){int index = 0;while (1){CtrlSubProcessHelper(index);}}void PollingCtrlSubProcess(int count){if (count < 0)return;int index = 0;while (count){CtrlSubProcessHelper(index);count--;}}
2.3 主程序的设计
主程序设计比较简单,就是进行一下初始化和控制就好了~
代码放在汇总里了!
2.4 代码汇总:
这是Main.cc文件
#include "ProcessPool.hpp"
int main()
{// 1.初始化进程池ProcessPool pp(5);// 2.初始化进程池pp.InitProcessPool([](int fd){while(1){int code=0;ssize_t n=read(fd,&code,sizeof(code));if(n==sizeof(code)) //任务码{cout<<"子进程被唤醒:"<<getpid()<<endl;if(code>=0&&code<tasks.size()){// cout << "子进程开始执行任务了" << endl;tasks[code]();}else{cerr<< "父进程给我的任务码是不对的: " << code << endl;}}else if(n==0){cout << "子进程应该退出了: " << getpid() << endl;break;}else{cerr << "read fd: " << fd << ", error" << endl;break;}} });// 3.控制进程池pp.PollingCtrlSubProcess(5);// 4. 结束进程池pp.WaitSubProcess();std::cout << "父进程控制子进程完成,父进程结束" << std::endl;return 0;
}
这是process.hpp文件
#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__#include <iostream>
#include<cstdlib>
#include<string>
#include <vector>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <ctime>
#include "Task.hpp"using namespace std;const int gdefault_process_num = 5;
using callback_t = function<void(int fd)>;class Channel
{
public:// 基本构成:构造函数、析构函数等等Channel() {}~Channel() {}Channel(int fd, const string &name, pid_t id): _wfd(fd), _name(name), _sub_target(id){}// 不同功能函数void DebugPrint(){printf("channel name: %s, wfd: %d, target pid: %d\n", _name.c_str(), _wfd, _sub_target);}int Fd() { return _wfd; }string Name() { return _name; }pid_t Target() { return _sub_target; }void Close() { close(_wfd); }void Wait(){pid_t rid = waitpid(_sub_target, nullptr, 0);(void)rid; // 这里记得强转一下,要不函数返回值类型会报错~}private:int _wfd;string _name;pid_t _sub_target;
};class ProcessPool
{
public:// 构造函数与析构函数ProcessPool(int num = gdefault_process_num): _processnum(num){srand(time(nullptr) ^ getpid() ^ 0x777); // 0x777是一个常数,与时间戳和进程ID进行异或操作,进一步增加种子的随机性。}~ProcessPool(){}// 进程池的相关函数// 1.进程池的初始化bool InitProcessPool(callback_t cb){for (int i = 0; i < _processnum; i++){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd); // pipe()函数,管道创建成功就返回0if (n < 0){return false;}// 2.创建子进程pid_t id = fork();if (id < 0)return false;if (id == 0){// 3.子进程读,关闭写,形成信道close(pipefd[1]);cb(pipefd[0]);exit(0);}// 外面父进程写,关闭读close(pipefd[0]);string name = "channel-" + to_string(i);_channels.emplace_back(pipefd[1], name, id); // 将当前子进程的通信信息(管道写端、通道名称和子进程ID)添加到 _channels 容器中,以便后续管理和通信。}return true;}// 2. 控制唤醒指定的一个子进程,让该子进程完成指定任务// 2.1 轮询选择一个子进程(选择一个信道) -- 负载均衡// 这里可以可以用函数重载来实现多种情况:1.有限次2.无限次void PollingCtrlSubProcess(){int index = 0;while (1){CtrlSubProcessHelper(index);}}void PollingCtrlSubProcess(int count){if (count < 0)return;int index = 0;while (count){CtrlSubProcessHelper(index);count--;}}void WaitSubProcess(){for(auto& c:_channels){c.Close();c.Wait();}}private:vector<Channel> _channels; // 所有信道int _processnum; // 有多少个子进程void CtrlSubProcessHelper(int &index){// 1.选择一个通道(进程)int who = index;index++;index %= _channels.size();// 2.选择一个任务,随机int x = rand() % tasks.size(); //[0,3]// 3. 任务推送给子进程cout << "选择信道:" << _channels[who].Name() << ", subtarget : " << _channels[who].Target() << endl;write(_channels[who].Fd(), &x, sizeof(x));sleep(1);}
};
#endif
这是Task.hpp文件
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <functional>using namespace std;// 4种任务
// task_t[4];using task_t = function<void()>;void Download()
{std::cout << "我是一个downlowd任务" << std::endl;
}
void MySql()
{std::cout << "我是一个 MySQL 任务" << std::endl;
}void Sync()
{std::cout << "我是一个数据刷新同步的任务" << std::endl;
}void Log()
{std::cout << "我是一个日志保存任务" << std::endl;
}vector<task_t> tasks;class Init
{
public:Init(){tasks.push_back(Download); // 在类的定义中直接初始化成员变量时,可以省略函数括号tasks.push_back(MySql);tasks.push_back(Sync);tasks.push_back(Log);}
};
Init ginit;
这是Makefile文件
process pool:Main.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f process pool
运行结果: