当前位置: 首页 > news >正文

Linux系统(项目)之----进程池

一、前置知识

注:本节内容中的知识在写一篇博客中会做讲解,这里需要先了解熟悉一下

  1. 进程池的概念

    • 进程池是一种用于管理和调度进程的机制。它预先创建了一组进程,这些进程处于等待状态,当有任务需要执行时,可以从进程池中取出一个进程来处理任务。在多任务处理的场景下,进程池可以有效地提高系统的性能和资源利用率。

    • 比如在一些服务器程序中,当有客户端请求到达时,服务器可以利用进程池中的进程来处理请求。如果没有进程池,每次有请求都要创建新的进程,这会带来较大的开销。而进程池中的进程已经创建好了,只需要分配任务给它们即可。

  2. 进程间通信功能

    • 进程池本身并不直接提供进程间通信的功能,但它可以和进程间通信机制一起使用。在多进程编程中,进程间通信是非常重要的,因为不同进程之间需要交换数据或者协调工作。

    • 常见的进程间通信方式有:

      • 管道(Pipe)

        • 管道是一种半双工的通信方式,数据只能在一个方向上流动。它分为匿名管道和命名管道。匿名管道通常用于父子进程之间的通信,例如在 Linux 系统中,使用 pipe 系统调用可以创建一个匿名管道。创建后,父子进程可以通过管道的文件描述符进行通信,一个进程向管道写入数据,另一个进程从管道读取数据。

        • 命名管道(也叫 FIFO)允许不相关的进程进行通信。它在文件系统中有对应的文件名,进程可以通过文件名来访问管道。比如,一个进程可以创建一个命名管道,然后其他进程可以通过这个管道的文件名来打开管道进行读写操作。

      • 消息队列(Message Queue)

        • 消息队列允许一个或多个进程向队列中写入消息,同时允许一个或多个进程读取队列中的消息。它是一种比较灵活的进程间通信方式,消息队列可以存储多个消息,发送方和接收方不需要同时运行。例如,在一个分布式系统中,不同的进程可以将消息发送到同一个消息队列,然后由专门的进程来处理这些消息。

      • 共享内存(Shared Memory)

        • 共享内存允许两个或多个进程共享一个给定的存储区。这是最快的进程间通信方式,因为进程可以直接访问共享内存中的数据。不过,使用共享内存需要解决同步问题,例如多个进程同时对共享内存进行写操作可能会导致数据混乱。在 Linux 系统中,可以使用系统调用如 shmget、shmat 等来创建和管理共享内存。

      • 信号(Signal)

        • 信号是一种比较简单的进程间通信方式,它是一种软件中断。一个进程可以向另一个进程发送信号,接收信号的进程可以根据信号的类型来执行相应的处理程序。例如,当一个进程收到 SIGINT 信号时(通常是用户按下 Ctrl + C 产生的),可以执行清理工作然后退出。信号主要用于进程之间的同步和简单的通知。

    • 当使用进程池时,如果需要进程间通信,可以结合上述的通信方式来实现。例如,在一个使用进程池的多任务应用程序中,主进程可以通过消息队列向进程池中的工作进程发送任务数据,工作进程处理完任务后,也可以通过消息队列或者管道等方式将结果发送回主进程。

二、设计进程池

进程池的设计我们今天以C++语言,因此我们需要大体上要用到两个类,一个类用于构造进程池本身,另一个用于设计进程间的通信

在设计时,如果要用某个库函数但是不知道头文件时,可以去linux里面man一下就可以了~

本进程池的设计是综合前面所有所学知识,包括lambda表达式,进程控制,回调函数,function函数等等,有一定的综合性

在本篇文章中,我们将采用.hpp代替.h文件,原因如下:

.hpp 是C++中用于头文件的扩展名,它与 .h 类似,但更倾向于现代C++的编程风格。它通常用于存放类的声明、函数声明、模板定义等,有助于提高代码的可读性和可维护性。在现代C++项目中,使用 .hpp 文件是一种常见的做法。

2.1 通信渠道的设计

根据上述前置知识,大体逻辑是这样的:

  1. 初始化:通过构造函数初始化通道对象,设置文件描述符、通道名称和目标子进程ID。

  2. 调试输出DebugPrint() 函数用于打印通道的基本信息,方便调试。

  3. 获取信息:提供 Fd()Name()Target() 函数来获取通道的文件描述符、名称和目标子进程ID。

  4. 资源管理Close() 函数用于关闭文件描述符,释放资源;Wait() 函数用于等待目标子进程结束。

  5. 析构:析构函数目前为空,但通常用于清理资源。

参考代码:

class Channel
{
public://基本构成:构造函数、析构函数等等Channel() {}~Channel(){}Channel(int fd,const string& name,pid_t id):_wfd(fd),_name(name),_sub_target(id){}//不同功能函数void DebugPrint(){printf("channel name: %s, wfd: %d, target pid: %d\n",_name.c_str(),_wfd, _sub_target);}int Fd(){return _wfd;}string Name(){return _name;}pid_t Target(){return _sub_target;}void Close(){close(_wfd);} void Wait(){pid_t rid=waitpid(_sub_target,nullptr,0);(void)rid;      //这里记得强转一下,要不函数返回值类型会报错~}
private:int _wfd;string _name;pid_t _sub_target;
};

2.2 进程池本体的设计

大体步骤如下:

1.初始化进程池,又可以具体细分为如下步骤:

  1. 循环创建子进程:通过一个 for 循环,根据 _processnum(进程池中进程的数量)来创建相应数量的子进程。

  2. 创建管道:在每次循环中,使用 pipe() 系统调用创建一个管道(pipefd),用于父子进程间的通信。管道由两个文件描述符组成,pipefd[0] 用于读,pipefd[1] 用于写。

  3. 检查管道创建:如果 pipe() 调用失败(返回值小于0),函数返回 false 表示初始化失败。

  4. 创建子进程:使用 fork() 系统调用创建一个子进程。fork() 调用成功后,返回两次:在父进程中返回子进程的PID,在子进程中返回0。

  5. 错误检查:如果 fork() 返回一个负值,表示创建子进程失败,函数返回 false

  6. 子进程逻辑

    • 关闭不需要的管道写端(pipefd[1]),因为子进程只需要从管道读取数据。

    • 调用回调函数 cb,传入管道读端(pipefd[0]),让子进程执行特定的任务。

    • 子进程完成任务后调用 exit(0) 退出。

  7. 父进程逻辑

    • 关闭管道读端(pipefd[0]),因为父进程只需要向管道写入数据。

    • 创建一个通道名称,格式为 "channel-" + 子进程索引

    • 将管道写端(pipefd[1])、通道名称和子进程PID添加到 _channels 容器中,用于后续的进程管理和通信。

  8. 返回成功:如果所有子进程都成功创建并初始化,函数返回 true 表示进程池初始化成功。

这里在补充说明一些知识点:

cb 代表的是一个回调函数(callback function)。回调函数是一种在软件或程序库中常用的技术,它允许在某个特定的时间点或事件发生时执行一段预定义的代码。这种技术在异步编程、事件处理和任务调度中尤其常见。在这段代码的上下文中,回调函数 cb 被用来指定子进程应该执行的任务。当子进程被创建后,它会调用这个回调函数,传入一个参数,通常是与子进程通信的管道的读端文件描述符(pipefd[0])。这样,子进程就可以通过这个文件描述符接收来自父进程的数据或命令,并执行相应的操作。而callback_t 是一个类型别名,它代表了一个回调函数的类型。这个类型使用 C++ 标准库中的 std::function 模板来定义,具体来说,它是一个可以接受一个 int 类型的参数(在这个上下文中通常是一个文件描述符 fd)并且不返回任何值(void)的函数。

    // 1.进程池的初始化bool InitProcessPool(callback_t cb){for (int i = 0; i < _processnum; i++){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd); // pipe()函数,管道创建成功就返回0if (n < 0){return false;}// 2.创建子进程pid_t id = fork();if (id < 0)return false;if (id == 0){// 3.子进程读,关闭写,形成信道close(pipefd[1]);cb(pipefd[0]);exit(0);}// 外面父进程写,关闭读close(pipefd[0]);string name = "channel-" + to_string(i);_channels.emplace_back(pipefd[1], name, id); // 将当前子进程的通信信息(管道写端、通道名称和子进程ID)添加到 _channels 容器中,以便后续管理和通信。}return true;}

2. 控制唤醒指定的一个子进程,让该子进程完成指定任务

这里我们还要写一下各个任务的代码,这里直接给出:

#pragma once#include <iostream>
#include <string>
#include <vector>
#include <functional>using namespace std;// 4种任务
// task_t[4];using task_t = function<void()>;void Download()
{std::cout << "我是一个downlowd任务" << std::endl;
}
void MySql()
{std::cout << "我是一个 MySQL 任务" << std::endl;
}void Sync()
{std::cout << "我是一个数据刷新同步的任务" << std::endl;
}void Log()
{std::cout << "我是一个日志保存任务" << std::endl;
}vector<task_t> tasks;class Init
{
public:Init(){tasks.push_back(Download); // 在类的定义中直接初始化成员变量时,可以省略函数括号tasks.push_back(MySql);tasks.push_back(Sync);tasks.push_back(Log);}
};
Init ginit;

3.进程控制,这里我们采用轮询控制

    // 2. 控制唤醒指定的一个子进程,让该子进程完成指定任务//  2.1 轮询选择一个子进程(选择一个信道) -- 负载均衡// 这里可以可以用函数重载来实现多种情况:1.有限次2.无限次void PollingCtrlSubProcess(){int index = 0;while (1){CtrlSubProcessHelper(index);}}void PollingCtrlSubProcess(int count){if (count < 0)return;int index = 0;while (count){CtrlSubProcessHelper(index);count--;}}

2.3 主程序的设计

主程序设计比较简单,就是进行一下初始化和控制就好了~

代码放在汇总里了!

2.4 代码汇总:

这是Main.cc文件

#include "ProcessPool.hpp"
int main()
{// 1.初始化进程池ProcessPool pp(5);// 2.初始化进程池pp.InitProcessPool([](int fd){while(1){int code=0;ssize_t n=read(fd,&code,sizeof(code));if(n==sizeof(code))  //任务码{cout<<"子进程被唤醒:"<<getpid()<<endl;if(code>=0&&code<tasks.size()){// cout << "子进程开始执行任务了" << endl;tasks[code]();}else{cerr<< "父进程给我的任务码是不对的: " << code << endl;}}else if(n==0){cout << "子进程应该退出了: " << getpid() << endl;break;}else{cerr << "read fd: " << fd << ", error" << endl;break;}} });// 3.控制进程池pp.PollingCtrlSubProcess(5);// 4. 结束进程池pp.WaitSubProcess();std::cout << "父进程控制子进程完成,父进程结束" << std::endl;return 0;
}

这是process.hpp文件

#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__#include <iostream>
#include<cstdlib>
#include<string>
#include <vector>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <ctime>
#include "Task.hpp"using namespace std;const int gdefault_process_num = 5;
using callback_t = function<void(int fd)>;class Channel
{
public:// 基本构成:构造函数、析构函数等等Channel() {}~Channel() {}Channel(int fd, const string &name, pid_t id): _wfd(fd), _name(name), _sub_target(id){}// 不同功能函数void DebugPrint(){printf("channel name: %s, wfd: %d, target pid: %d\n", _name.c_str(), _wfd, _sub_target);}int Fd() { return _wfd; }string Name() { return _name; }pid_t Target() { return _sub_target; }void Close() { close(_wfd); }void Wait(){pid_t rid = waitpid(_sub_target, nullptr, 0);(void)rid; // 这里记得强转一下,要不函数返回值类型会报错~}private:int _wfd;string _name;pid_t _sub_target;
};class ProcessPool
{
public:// 构造函数与析构函数ProcessPool(int num = gdefault_process_num): _processnum(num){srand(time(nullptr) ^ getpid() ^ 0x777); // 0x777是一个常数,与时间戳和进程ID进行异或操作,进一步增加种子的随机性。}~ProcessPool(){}// 进程池的相关函数// 1.进程池的初始化bool InitProcessPool(callback_t cb){for (int i = 0; i < _processnum; i++){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd); // pipe()函数,管道创建成功就返回0if (n < 0){return false;}// 2.创建子进程pid_t id = fork();if (id < 0)return false;if (id == 0){// 3.子进程读,关闭写,形成信道close(pipefd[1]);cb(pipefd[0]);exit(0);}// 外面父进程写,关闭读close(pipefd[0]);string name = "channel-" + to_string(i);_channels.emplace_back(pipefd[1], name, id); // 将当前子进程的通信信息(管道写端、通道名称和子进程ID)添加到 _channels 容器中,以便后续管理和通信。}return true;}// 2. 控制唤醒指定的一个子进程,让该子进程完成指定任务//  2.1 轮询选择一个子进程(选择一个信道) -- 负载均衡// 这里可以可以用函数重载来实现多种情况:1.有限次2.无限次void PollingCtrlSubProcess(){int index = 0;while (1){CtrlSubProcessHelper(index);}}void PollingCtrlSubProcess(int count){if (count < 0)return;int index = 0;while (count){CtrlSubProcessHelper(index);count--;}}void WaitSubProcess(){for(auto& c:_channels){c.Close();c.Wait();}}private:vector<Channel> _channels; // 所有信道int _processnum;           // 有多少个子进程void CtrlSubProcessHelper(int &index){// 1.选择一个通道(进程)int who = index;index++;index %= _channels.size();// 2.选择一个任务,随机int x = rand() % tasks.size(); //[0,3]// 3. 任务推送给子进程cout << "选择信道:" << _channels[who].Name() << ", subtarget : " << _channels[who].Target() << endl;write(_channels[who].Fd(), &x, sizeof(x));sleep(1);}
};
#endif

这是Task.hpp文件

#pragma once#include <iostream>
#include <string>
#include <vector>
#include <functional>using namespace std;// 4种任务
// task_t[4];using task_t = function<void()>;void Download()
{std::cout << "我是一个downlowd任务" << std::endl;
}
void MySql()
{std::cout << "我是一个 MySQL 任务" << std::endl;
}void Sync()
{std::cout << "我是一个数据刷新同步的任务" << std::endl;
}void Log()
{std::cout << "我是一个日志保存任务" << std::endl;
}vector<task_t> tasks;class Init
{
public:Init(){tasks.push_back(Download); // 在类的定义中直接初始化成员变量时,可以省略函数括号tasks.push_back(MySql);tasks.push_back(Sync);tasks.push_back(Log);}
};
Init ginit;

这是Makefile文件

process pool:Main.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f process pool

运行结果:


文章转载自:

http://V4ksQjel.yfcyh.cn
http://3RY7V03j.yfcyh.cn
http://coOBeZ5p.yfcyh.cn
http://xh66hNCe.yfcyh.cn
http://7I45EWNd.yfcyh.cn
http://eXNe3sQO.yfcyh.cn
http://E8GYP1ga.yfcyh.cn
http://7JeNPGaO.yfcyh.cn
http://ownktp7m.yfcyh.cn
http://yxg4b0F8.yfcyh.cn
http://CKKSrVcR.yfcyh.cn
http://gBgjMf8x.yfcyh.cn
http://sjY8Q0jc.yfcyh.cn
http://wEChqeXh.yfcyh.cn
http://yruWlHKq.yfcyh.cn
http://mdEEWg9O.yfcyh.cn
http://G5vw2p7p.yfcyh.cn
http://Eia0hEbS.yfcyh.cn
http://4f10nfK6.yfcyh.cn
http://FAxoGSLB.yfcyh.cn
http://2Lrti4Ji.yfcyh.cn
http://hZpUexym.yfcyh.cn
http://EhIOpOMK.yfcyh.cn
http://ltzFhAIE.yfcyh.cn
http://CAEXSnqS.yfcyh.cn
http://tTir7WwD.yfcyh.cn
http://Hqj89tu7.yfcyh.cn
http://cJ9Obamq.yfcyh.cn
http://1mI1npQA.yfcyh.cn
http://fqHoeYZr.yfcyh.cn
http://www.dtcms.com/a/359996.html

相关文章:

  • 搭建卷积神经网络
  • LangChain 核心链式组件对比:从 SequentialChain 到 LCEL
  • 想学怎么写网站怎么办?初学者专用! (HTML+CSS+JS)
  • 【大语言模型 32】Constitutional AI:自我改进的对齐方法
  • TJA1445学习笔记(二)
  • Python入门教程之类型判别
  • Qt Core 之 QString
  • 响应式编程框架Reactor【7】
  • React Hooks useMemo
  • JVM学习总结
  • docker中的命令(四)
  • 大话 IOT 技术(3) -- MQTT篇
  • 机器视觉学习-day19-图像亮度变换
  • 【模型训练篇】VeRL分布式基础 - 框架Ray
  • 分布式相关
  • 正则表达式 Python re 库完整教程
  • 如何用熵正则化控制注意力分数的分布
  • 让你的App与众不同打造独特品牌展示平台
  • Scikit-learn Python机器学习 - 类别特征提取- OneHotEncoder
  • 编写Linux下usb设备驱动方法:disconnect函数中要完成的任务
  • 【数学建模学习笔记】异常值处理
  • RAG(检索增强生成)技术的核心原理与实现细节
  • 【Unity开发】Unity核心学习(三)
  • macos自动安装emsdk4.0.13脚本
  • 在Ubuntu系统上安装和配置JMeter和Ant进行性能测试
  • 基于SpringBoot + Vue 的宠物领养管理系统
  • 【Spring Cloud微服务】7.拆解分布式事务与CAP理论:从理论到实践,打造数据一致性堡垒
  • ANR InputDispatching TimeOut超时判断 - android-15.0.0_r23
  • 拆分TypeScript项目的学习收获:处理编译缓存和包缓存,引用本地项目,使用相对路径
  • 配置 Kubernetes Master 节点不可调度的标准方法