当前位置: 首页 > news >正文

【Linux系统】匿名管道以及进程池的简单实现

前言:

        上文我们讲到了动静态库的简单制作【Linux系统】动静态库的制作-CSDN博客

        本文我们来讲一讲进程通信中的匿名管道!

        点个关注!


理解

1.进程间通信的目的

        数据传输:一个进程需要将它的数据发送给另一个进程

        资源共享:多个进程之间共享同样的资源

        通知事件的发生:一个进程需要向另一个进程或一组进程发送进程,通知它们发生了上什么事件(如:子进程退出时,要通知父进程进行等待)

        进程控制:有些进程希望完全控制另一个进程,控制的进程希望拦截到另一个进程的所有陷入和异常,并能及时的知道它的状态改变

2.什么是进程间通信

        进程通信(Inter-Process Communication,简称 IPC)是指操作系统中不同进程之间交换数据、传递信息或协调行为的机制

        在操作系统中,进程是独立运行的程序实例,拥有各自独立的内存空间和资源(如地址空间、文件描述符等),默认情况下相互隔离。但实际应用中,多个进程 often 需要协作完成任务(例如一个进程生成数据,另一个进程处理数据),这就需要通过特定的方式打破隔离,实现信息交换 —— 这就是进程通信的核心目的

3.如何通信

        进程通信的本质是:先让不同的进程看到同一份资源,再然后才能实现通信!

        而我们知道进程的有独立性的!想让进程之间看到同一份资源,这份资源就只能由OS提供,而如何提供呢?通过系统调用 ->设计统一的通信接口!


匿名管道

1.接口使用方法

#include <unistd.h>
int pipe(int fd[2]);输出型参数:int fd[2]
这是一个长度为 2 的 int 数组,用于存储管道的两个文件描述符
fd[0]:管道的读端(read end),专门用于从管道读取数据
fd[1]:管道的写端(write end),专门用于向管道写入数据
调用 pipe() 后,内核会自动填充这两个文件描述符,进程通过操作它们实现对管道的读写返回值:成功返回0,失败返回-1

2.匿名管道原理

        匿名管道是基于已经有的技术,再次开发的!

        匿名管道本质也是文件!也有缓冲区(管道缓冲区)!

        但是匿名管道的管道缓冲区不需要刷新到磁盘,也与磁盘没有关系!起作用主要用于暂时保存通信数据!

        匿名管道是内存级的文件!在原有的文件控制上被再次设计,配上单独的接口。并且匿名管道没有文件路径,没有文件名!

        如下图:

如何保证两个进程打开是同一个管道:

        子进程会继承父进程的file_struct!所有指向的管道一定是同一个!

        就算是子进程与子进程之间通信,它们的file_struct都是继承父进程的,都会指向同一个管道的。

3.匿名管道的特性与通信

5种特性:

匿名管道,只能用于本机通信,只能用于有血缘关系的进程进行通信(常用于父子通信)!
匿名管道文件,自带同步机制:包含4种通信情况!
匿名管道的面向字节流的
匿名管道是单向通信的!(属于半双工。半双工:任何时候一个发,一个收。全双工:任何时候,可以同时收发)
匿名管道的生命周期是随进程的!

4种通信情况:

写慢,读快:读端阻塞,等待写端
写快,读慢:管道缓冲区写满了,就要阻塞等待读端
写关闭,读继续:一直读取,知道读到完,返回0,表示读取到文件末尾
写继续,读关闭:无意义操作!OS会自动杀掉写端进程(通过信号:13 SIGPIPE杀掉)

4.测试匿名管道接口

#include<iostream>
#include<unistd.h>
#include<cstdlib>
#include<sys/types.h>
#include<cstring>
#include<sys/wait.h>
using namespace std;void ChildWrite(int fd)
{while(1){char buffer[] = {"hello pipe!"};write(fd, buffer, strlen(buffer));sleep(1);}
}void ParentRead(int fd)
{char buffer[1024];while(1){int n=read(fd,buffer,sizeof(buffer)-1);if (n > 0){buffer[n] = 0;printf("子进程说:%s\n", buffer);}else if(n==0){cout<<"子进程已经退出!父进程也执行退出!\n";break;}else{cout<<"读取失败!\n";break;}}
}// 子写 -> 父读
int main()
{//1.创建管道int fds[2] = {0};ssize_t n = pipe(fds);if(n<0){cout<<"pipi error!\n";exit(1);}cout<<"fds[0]:"<<fds[0]<<endl;  //fds[0]:表示读取端cout<<"fds[0]:"<<fds[1]<<endl;  //fds[1]:表示写入端//2.创建子进程int x = fork();if(x<0){cout<<"fork error!\n";exit(1);}else if(x==0){//子进程//3.关闭不需要端口,关闭读端close(fds[0]);ChildWrite(fds[1]);//子进程退出close(fds[1]);}else{//父进程//3.关闭不需要端口,关闭写端close(fds[1]);ParentRead(fds[0]);//父进程退出close(fds[0]);//4.等待int states=0;int q = wait(&states);}
}


基于匿名管道实现进程池

把进程当成 “可复用的资源”,提前建好一批,随时待命:

  1. 预先创建 “资源进程”:程序启动时,创建固定数量的进程(比如 4 个,或和 CPU 核心数一致),这些进程平时处于 空闲等待状态(不干活,但占着资源待命)。
  2. 管理进程调度任务:有一个 “管理进程”(比如父进程)负责把任务分配给空闲的资源进程;资源进程处理完任务后,不会销毁,而是回到池里继续等下一个任务

类比:餐厅预先雇好 4 个厨师(资源进程),来了订单(任务),经理(管理进程)安排厨师做菜,做完继续待命,不用每次重新雇厨师。

1.大体思路

        1.进程池对象的创建:要有对应的类,采用先描述在组织的思路!先描述管道,在采用对应的结构体组织管道,最后再封装为进程池

        2.进程池的启动:由父进程创建多个子进程,并由管道链接!父进程执行记录子进程的工作,子进程执行指定的任务。此时子进程阻塞在读取端,等待管道有数据写入!

        3.任务的执行:选择子进程、选择任务返回任务码、向指定的子进程对应的管道写入任务码!子进程通过管道读取任务码,根据任务码去执行指定的任务!

        4.进程池的中止:上面我们讲到,写端关闭、读端会自动关闭!所有只要将管道的写段全部关闭!那么读端就会全部关闭,子进程识别到读端关闭,就会退出!退出之后,再由父进程继续等待即可完成中止!

2.完整代码

//ProgressPool.hpp#pragma once
#include<iostream>
#include<vector>
#include<unistd.h>
#include<string>
#include<cstring>
#include<sys/types.h>
#include<sys/wait.h>
#include"Task.hpp"
using namespace std;
// 可以之间写方法的实现//先描述
class Channel
{
public:Channel(int fd=0,int id=0):_fd(fd),_id(id){_name="Channel"+to_string(fd)+to_string(id);}string& name(){return _name;}//发送任务码void Send(int code){write(_fd,&code,sizeof(code));}void Close(){close(_fd);}void Wait(){int states=0;waitpid(_id,&states,0);}private:int _fd;pid_t _id;string _name;
};// 在组织
class ManagerChannel
{
public://记录管道fd、子进程的pidvoid Inset(int fd,pid_t id){_c.emplace_back(fd,id);}//选择管道Channel& Select(){Channel& c=_c[_num];_num++;_num%=_c.size();return c;}//关闭写端void CloseRead(){for(auto& e:_c){e.Close();cout<<"关闭:"<<e.name()<<endl;}}//回收子进程void WaitChild(){for(auto& e:_c){e.Wait();cout<<"回收:"<<e.name()<<endl;}}private:int _num=0;vector<Channel> _c;
};class ProgressPoll
{
public:ProgressPoll(int num): _num(num){}//读取任务码,并执行void Work(int fd){while (1){int code;int n = read(fd, &code, sizeof(code));if (n > 0){if(n!=sizeof(code))continue;cout<<"子进程["<<getpid()<<"]收到一个任务码!\n";//根据任务码,执行任务_tm.execute(code);}else if (n == 0){cout<<"父进程没有指定任务!子进程退出!\n";break;}else{cout<<"读取任务码错误!\n";break;}}}//启动进程池
void Start()
{for(int i=0;i<_num;i++){ // 创建管道int fd[2];int n = pipe(fd);if (n == -1){cout << "pipe error!\n";}else{// 创建子进程pid_t id = fork();if (id == 0){// 子进程// 关闭不需要的端口close(fd[1]);// 执行任务Work(fd[0]);// 退出close(fd[0]);exit(0);     //!!!!!严重注意}else{// 父进程// 关闭不需要的端口close(fd[0]);//记录子进程_m.Inset(fd[1],id);}}}
}//派送任务
void run()
{//选择任务int taskcode=_tm.code();//选着管道(子进程):采取轮询选择auto& c=_m.Select();cout<<"选择子进程["<<c.name()<<"]执行任务\n";//发送任务码c.Send(taskcode);cout<<"发送任务码:"<<taskcode<<endl;
}//停止进程池
void Stop()
{//关闭写端,读端自动关闭!//关闭父进程写段_m.CloseRead();//等待子进程_m.WaitChild();
}private:int _num; //需要几个子进程ManagerChannel _m;TaskManager _tm;
};//TaskManager.hpp#pragma once
#include <iostream>
#include <ctime>class TaskManager
{
public://选择任务码int code(){srand(time(nullptr));return rand() % 5;}//根据子进程传递的任务码,执行对应任务void execute(int code){std::cout<<"执行任务码为:"<<code<<"的任务!\n";}private:
};//test.cpp#include "ProgressPool.hpp"int main()
{// 创建进程池对象ProgressPoll pp(5);// 启动进程池pp.Start();// 派送任务for(int i=0;i<5;i++){pp.run();sleep(2);}//pp.run();// 停止进程池pp.Stop();
}

3.修改小bug

        上面的程序中存在一个小bug!

        是在关闭写端的过程中,我们看上面代码关闭写段是只调用了一个close函数关闭了一个端口!但事实上我们的代码中管道不仅仅一个端口!

        因为子进程的创建是会继承父进程的数据的!父进程中指向的管道,会被子进程继承,那么子进程也会指向对应的管道了!

        这就会导致一个管道可能有多个进程指向!存在多个写端!

        而存在多个写段会影响什么呢?因为我们上面的代码确实存在这个问题,但是可以正常运行啊!

        看似没问题!实则暗藏玄机:

        首先,如果存在多个写端,在进程池终止的时候,我们仅仅关闭的父进程的写段端,没有关闭其他子进程的写段,这会导致什么?这会导致管道的读端无法自动关闭!从而导致子进程无法识别到中止意图,无法退出!

        然后,那为什么我们上面的代码可以正常运行呢?那是因为我们上面的代码是在任务指向完之后,再去进行的中止!而执行完任务后子进程是自动退出的!子进程退出后,子进程中的写段、读端页会被自动关闭!所以我们之前的代码才可以正常运行!

        但是!如果在子进程任务还没有执行完时,子进程还没有退出时,我们之间调用Stop函数就会卡死!因为父进程写段关闭,但还有其他子进程指向,并且子进程并没有写入数据的功能!子进程就会直接卡死在read函数中!

修改bug:

既然知道bug以及bug的原因,那我们如何修改bug呢?

很简单!只需要在创建子进程的时候将继承下来的写端全部关闭即可!(子进程只有读端)

// 在组织
class ManagerChannel
{
public://关闭继承下来的全部写端void CloseAll(){for(auto& e:_c){e.Close();}}private:int _num=0;vector<Channel> _c;
};//启动进程池
void Start()
{for(int i=0;i<_num;i++){ // 创建管道int fd[2];int n = pipe(fd);if (n == -1){cout << "pipe error!\n";}else{// 创建子进程pid_t id = fork();if (id == 0){// 子进程//关闭继承父进程的全部写端_m.CloseAll();// 关闭不需要的端口close(fd[1]);// 执行任务Work(fd[0]);// 退出close(fd[0]);exit(0);     //!!!!!严重注意}else{// 父进程// 关闭不需要的端口close(fd[0]);//记录子进程_m.Inset(fd[1],id);}}}
}

4.最终完整代码

//ProgressPool.hpp#pragma once
#include<iostream>
#include<vector>
#include<unistd.h>
#include<string>
#include<cstring>
#include<sys/types.h>
#include<sys/wait.h>
#include"Task.hpp"
using namespace std;
// 可以之间写方法的实现//先描述
class Channel
{
public:Channel(int fd=0,int id=0):_fd(fd),_id(id){_name="Channel"+to_string(fd)+to_string(id);}string& name(){return _name;}//发送任务码void Send(int code){write(_fd,&code,sizeof(code));}void Close(){close(_fd);}void Wait(){int states=0;waitpid(_id,&states,0);}private:int _fd;pid_t _id;string _name;
};// 在组织
class ManagerChannel
{
public://记录管道fd、子进程的pidvoid Inset(int fd,pid_t id){_c.emplace_back(fd,id);}//选择管道Channel& Select(){Channel& c=_c[_num];_num++;_num%=_c.size();return c;}//关闭写端void CloseRead(){for(auto& e:_c){e.Close();cout<<"关闭:"<<e.name()<<endl;}}//回收子进程void WaitChild(){for(auto& e:_c){e.Wait();cout<<"回收:"<<e.name()<<endl;}}//关闭继承下来的全部写端void CloseAll(){for(auto& e:_c){e.Close();}}private:int _num=0;vector<Channel> _c;
};class ProgressPoll
{
public:ProgressPoll(int num): _num(num){}//读取任务码,并执行void Work(int fd){while (1){int code;int n = read(fd, &code, sizeof(code));if (n > 0){if(n!=sizeof(code))continue;cout<<"子进程["<<getpid()<<"]收到一个任务码!\n";//根据任务码,执行任务_tm.execute(code);}else if (n == 0){cout<<"父进程没有指定任务!子进程退出!\n";break;}else{cout<<"读取任务码错误!\n";break;}}}//启动进程池
void Start()
{for(int i=0;i<_num;i++){ // 创建管道int fd[2];int n = pipe(fd);if (n == -1){cout << "pipe error!\n";}else{// 创建子进程pid_t id = fork();if (id == 0){// 子进程//关闭继承父进程的全部写端_m.CloseAll();// 关闭不需要的端口close(fd[1]);// 执行任务Work(fd[0]);// 退出close(fd[0]);exit(0);     //!!!!!严重注意}else{// 父进程// 关闭不需要的端口close(fd[0]);//记录子进程_m.Inset(fd[1],id);}}}
}//派送任务
void run()
{//选择任务int taskcode=_tm.code();//选着管道(子进程):采取轮询选择auto& c=_m.Select();cout<<"选择子进程["<<c.name()<<"]执行任务\n";//发送任务码c.Send(taskcode);cout<<"发送任务码:"<<taskcode<<endl;
}//停止进程池
void Stop()
{//关闭写端,读端自动关闭!//关闭父进程写段_m.CloseRead();//等待子进程_m.WaitChild();
}private:int _num; //需要几个子进程ManagerChannel _m;TaskManager _tm;
};//TaskManager.hpp#pragma once
#include <iostream>
#include <ctime>class TaskManager
{
public://选择任务码int code(){srand(time(nullptr));return rand() % 5;}//根据子进程传递的任务码,执行对应任务void execute(int code){std::cout<<"执行任务码为:"<<code<<"的任务!\n";}private:
};//test.cpp#include "ProgressPool.hpp"int main()
{// 创建进程池对象ProgressPoll pp(5);// 启动进程池pp.Start();// 派送任务for(int i=0;i<5;i++){pp.run();sleep(2);}//pp.run();// 停止进程池pp.Stop();
}
http://www.dtcms.com/a/340329.html

相关文章:

  • 测试环境搭建和部署(在Linux环境下搭建jdk+Tomcat+mysql环境和项目包的部署)
  • 暖哇科技AI调查智能体上线,引领保险调查风控智能化升级
  • cv2.bitwise_and是 OpenCV 中用于执行按位与运算的核心函数,主要用于图像处理中的像素级操作
  • 【密码学实战】X86、ARM、RISC-V 全量指令集与密码加速技术全景解析
  • 【考研408数据结构-09】 图论进阶:最短路径与最小生成树
  • 【考研408数据结构-05】 串与KMP算法:模式匹配的艺术
  • [论文阅读] 人工智能 + 软件工程 | 从用户需求到产品迭代:特征请求研究的全景解析
  • 【软考架构】软件工程:软件项目管理
  • 用倒计时软件为考研备考精准导航 复习 模拟考试 日期倒计时都可以用
  • SBOM风险预警 | NPM前端框架 javaxscript 遭受投毒窃取浏览器cookie
  • vue3 el-select 默认选中第一个
  • 使用Redis 分布式锁防止短信验证码重复下发问题
  • 《防雷电路设计》---TVS介绍
  • Linux系统之部署nullboard任务管理工具
  • C++/Qt开发:TCP通信连接软件测试方法:ECHO指令
  • C++中的原子操作,自旋锁
  • Vibe Coding:轻松的幻觉,沉重的未来
  • HTML <meta name=“color-scheme“>:自动适配系统深色 / 浅色模式
  • AutoGLM2.0背后的云手机和虚拟机分析(非使用案例)
  • Mac 4步 安装 Jenv 管理多版本JDK
  • 基于YOLO11的手机违规使用检测模型训练实战
  • MySQL诊断系列(3/6):索引分析——5个SQL揪出“僵尸索引”
  • Docker Compose命令一览(Docker Compose指令、docker-compose命令)
  • 动态规划----8.乘积最大子数组
  • 遥感机器学习入门实战教程|Sklearn 案例④ :多分类器对比(SVM / RF / kNN / Logistic...)
  • 详解 scikit-learn 数据预处理工具:从理论到实践
  • 5.4 4pnpm 使用介绍
  • 给你的Unity编辑器添加实现类似 Odin 的 条件显示字段 (ShowIf/HideIf) 功能
  • Scikit-learn 预处理函数分类详解
  • pnpm : 无法加载文件 C:\Program Files\nodejs\pnpm.ps1,因为在此系统上禁止运行脚本。