【Linux系统】匿名管道以及进程池的简单实现
前言:
上文我们讲到了动静态库的简单制作【Linux系统】动静态库的制作-CSDN博客
本文我们来讲一讲进程通信中的匿名管道!
点个关注!
理解
1.进程间通信的目的
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源
通知事件的发生:一个进程需要向另一个进程或一组进程发送进程,通知它们发生了上什么事件(如:子进程退出时,要通知父进程进行等待)
进程控制:有些进程希望完全控制另一个进程,控制的进程希望拦截到另一个进程的所有陷入和异常,并能及时的知道它的状态改变
2.什么是进程间通信
进程通信(Inter-Process Communication,简称 IPC)是指操作系统中不同进程之间交换数据、传递信息或协调行为的机制。
在操作系统中,进程是独立运行的程序实例,拥有各自独立的内存空间和资源(如地址空间、文件描述符等),默认情况下相互隔离。但实际应用中,多个进程 often 需要协作完成任务(例如一个进程生成数据,另一个进程处理数据),这就需要通过特定的方式打破隔离,实现信息交换 —— 这就是进程通信的核心目的
3.如何通信
进程通信的本质是:先让不同的进程看到同一份资源,再然后才能实现通信!
而我们知道进程的有独立性的!想让进程之间看到同一份资源,这份资源就只能由OS提供,而如何提供呢?通过系统调用 ->设计统一的通信接口!
匿名管道
1.接口使用方法
#include <unistd.h>
int pipe(int fd[2]);输出型参数:int fd[2]
这是一个长度为 2 的 int 数组,用于存储管道的两个文件描述符
fd[0]:管道的读端(read end),专门用于从管道读取数据
fd[1]:管道的写端(write end),专门用于向管道写入数据
调用 pipe() 后,内核会自动填充这两个文件描述符,进程通过操作它们实现对管道的读写返回值:成功返回0,失败返回-1
2.匿名管道原理
匿名管道是基于已经有的技术,再次开发的!
匿名管道本质也是文件!也有缓冲区(管道缓冲区)!
但是匿名管道的管道缓冲区不需要刷新到磁盘,也与磁盘没有关系!起作用主要用于暂时保存通信数据!
匿名管道是内存级的文件!在原有的文件控制上被再次设计,配上单独的接口。并且匿名管道没有文件路径,没有文件名!
如下图:
如何保证两个进程打开是同一个管道:
子进程会继承父进程的file_struct!所有指向的管道一定是同一个!
就算是子进程与子进程之间通信,它们的file_struct都是继承父进程的,都会指向同一个管道的。
3.匿名管道的特性与通信
5种特性:
匿名管道,只能用于本机通信,只能用于有血缘关系的进程进行通信(常用于父子通信)! |
匿名管道文件,自带同步机制:包含4种通信情况! |
匿名管道的面向字节流的 |
匿名管道是单向通信的!(属于半双工。半双工:任何时候一个发,一个收。全双工:任何时候,可以同时收发) |
匿名管道的生命周期是随进程的! |
4种通信情况:
写慢,读快:读端阻塞,等待写端 |
写快,读慢:管道缓冲区写满了,就要阻塞等待读端 |
写关闭,读继续:一直读取,知道读到完,返回0,表示读取到文件末尾 |
写继续,读关闭:无意义操作!OS会自动杀掉写端进程(通过信号:13 SIGPIPE杀掉) |
4.测试匿名管道接口
#include<iostream>
#include<unistd.h>
#include<cstdlib>
#include<sys/types.h>
#include<cstring>
#include<sys/wait.h>
using namespace std;void ChildWrite(int fd)
{while(1){char buffer[] = {"hello pipe!"};write(fd, buffer, strlen(buffer));sleep(1);}
}void ParentRead(int fd)
{char buffer[1024];while(1){int n=read(fd,buffer,sizeof(buffer)-1);if (n > 0){buffer[n] = 0;printf("子进程说:%s\n", buffer);}else if(n==0){cout<<"子进程已经退出!父进程也执行退出!\n";break;}else{cout<<"读取失败!\n";break;}}
}// 子写 -> 父读
int main()
{//1.创建管道int fds[2] = {0};ssize_t n = pipe(fds);if(n<0){cout<<"pipi error!\n";exit(1);}cout<<"fds[0]:"<<fds[0]<<endl; //fds[0]:表示读取端cout<<"fds[0]:"<<fds[1]<<endl; //fds[1]:表示写入端//2.创建子进程int x = fork();if(x<0){cout<<"fork error!\n";exit(1);}else if(x==0){//子进程//3.关闭不需要端口,关闭读端close(fds[0]);ChildWrite(fds[1]);//子进程退出close(fds[1]);}else{//父进程//3.关闭不需要端口,关闭写端close(fds[1]);ParentRead(fds[0]);//父进程退出close(fds[0]);//4.等待int states=0;int q = wait(&states);}
}
基于匿名管道实现进程池
把进程当成 “可复用的资源”,提前建好一批,随时待命:
- 预先创建 “资源进程”:程序启动时,创建固定数量的进程(比如 4 个,或和 CPU 核心数一致),这些进程平时处于 空闲等待状态(不干活,但占着资源待命)。
- 管理进程调度任务:有一个 “管理进程”(比如父进程)负责把任务分配给空闲的资源进程;资源进程处理完任务后,不会销毁,而是回到池里继续等下一个任务。
类比:餐厅预先雇好 4 个厨师(资源进程),来了订单(任务),经理(管理进程)安排厨师做菜,做完继续待命,不用每次重新雇厨师。
1.大体思路
1.进程池对象的创建:要有对应的类,采用先描述在组织的思路!先描述管道,在采用对应的结构体组织管道,最后再封装为进程池
2.进程池的启动:由父进程创建多个子进程,并由管道链接!父进程执行记录子进程的工作,子进程执行指定的任务。此时子进程阻塞在读取端,等待管道有数据写入!
3.任务的执行:选择子进程、选择任务返回任务码、向指定的子进程对应的管道写入任务码!子进程通过管道读取任务码,根据任务码去执行指定的任务!
4.进程池的中止:上面我们讲到,写端关闭、读端会自动关闭!所有只要将管道的写段全部关闭!那么读端就会全部关闭,子进程识别到读端关闭,就会退出!退出之后,再由父进程继续等待即可完成中止!
2.完整代码
//ProgressPool.hpp#pragma once
#include<iostream>
#include<vector>
#include<unistd.h>
#include<string>
#include<cstring>
#include<sys/types.h>
#include<sys/wait.h>
#include"Task.hpp"
using namespace std;
// 可以之间写方法的实现//先描述
class Channel
{
public:Channel(int fd=0,int id=0):_fd(fd),_id(id){_name="Channel"+to_string(fd)+to_string(id);}string& name(){return _name;}//发送任务码void Send(int code){write(_fd,&code,sizeof(code));}void Close(){close(_fd);}void Wait(){int states=0;waitpid(_id,&states,0);}private:int _fd;pid_t _id;string _name;
};// 在组织
class ManagerChannel
{
public://记录管道fd、子进程的pidvoid Inset(int fd,pid_t id){_c.emplace_back(fd,id);}//选择管道Channel& Select(){Channel& c=_c[_num];_num++;_num%=_c.size();return c;}//关闭写端void CloseRead(){for(auto& e:_c){e.Close();cout<<"关闭:"<<e.name()<<endl;}}//回收子进程void WaitChild(){for(auto& e:_c){e.Wait();cout<<"回收:"<<e.name()<<endl;}}private:int _num=0;vector<Channel> _c;
};class ProgressPoll
{
public:ProgressPoll(int num): _num(num){}//读取任务码,并执行void Work(int fd){while (1){int code;int n = read(fd, &code, sizeof(code));if (n > 0){if(n!=sizeof(code))continue;cout<<"子进程["<<getpid()<<"]收到一个任务码!\n";//根据任务码,执行任务_tm.execute(code);}else if (n == 0){cout<<"父进程没有指定任务!子进程退出!\n";break;}else{cout<<"读取任务码错误!\n";break;}}}//启动进程池
void Start()
{for(int i=0;i<_num;i++){ // 创建管道int fd[2];int n = pipe(fd);if (n == -1){cout << "pipe error!\n";}else{// 创建子进程pid_t id = fork();if (id == 0){// 子进程// 关闭不需要的端口close(fd[1]);// 执行任务Work(fd[0]);// 退出close(fd[0]);exit(0); //!!!!!严重注意}else{// 父进程// 关闭不需要的端口close(fd[0]);//记录子进程_m.Inset(fd[1],id);}}}
}//派送任务
void run()
{//选择任务int taskcode=_tm.code();//选着管道(子进程):采取轮询选择auto& c=_m.Select();cout<<"选择子进程["<<c.name()<<"]执行任务\n";//发送任务码c.Send(taskcode);cout<<"发送任务码:"<<taskcode<<endl;
}//停止进程池
void Stop()
{//关闭写端,读端自动关闭!//关闭父进程写段_m.CloseRead();//等待子进程_m.WaitChild();
}private:int _num; //需要几个子进程ManagerChannel _m;TaskManager _tm;
};//TaskManager.hpp#pragma once
#include <iostream>
#include <ctime>class TaskManager
{
public://选择任务码int code(){srand(time(nullptr));return rand() % 5;}//根据子进程传递的任务码,执行对应任务void execute(int code){std::cout<<"执行任务码为:"<<code<<"的任务!\n";}private:
};//test.cpp#include "ProgressPool.hpp"int main()
{// 创建进程池对象ProgressPoll pp(5);// 启动进程池pp.Start();// 派送任务for(int i=0;i<5;i++){pp.run();sleep(2);}//pp.run();// 停止进程池pp.Stop();
}
3.修改小bug
上面的程序中存在一个小bug!
是在关闭写端的过程中,我们看上面代码关闭写段是只调用了一个close函数关闭了一个端口!但事实上我们的代码中管道不仅仅一个端口!
因为子进程的创建是会继承父进程的数据的!父进程中指向的管道,会被子进程继承,那么子进程也会指向对应的管道了!
这就会导致一个管道可能有多个进程指向!存在多个写端!
而存在多个写段会影响什么呢?因为我们上面的代码确实存在这个问题,但是可以正常运行啊!
看似没问题!实则暗藏玄机:
首先,如果存在多个写端,在进程池终止的时候,我们仅仅关闭的父进程的写段端,没有关闭其他子进程的写段,这会导致什么?这会导致管道的读端无法自动关闭!从而导致子进程无法识别到中止意图,无法退出!
然后,那为什么我们上面的代码可以正常运行呢?那是因为我们上面的代码是在任务指向完之后,再去进行的中止!而执行完任务后子进程是自动退出的!子进程退出后,子进程中的写段、读端页会被自动关闭!所以我们之前的代码才可以正常运行!
但是!如果在子进程任务还没有执行完时,子进程还没有退出时,我们之间调用Stop函数就会卡死!因为父进程写段关闭,但还有其他子进程指向,并且子进程并没有写入数据的功能!子进程就会直接卡死在read函数中!
修改bug:
既然知道bug以及bug的原因,那我们如何修改bug呢?
很简单!只需要在创建子进程的时候将继承下来的写端全部关闭即可!(子进程只有读端)
// 在组织
class ManagerChannel
{
public://关闭继承下来的全部写端void CloseAll(){for(auto& e:_c){e.Close();}}private:int _num=0;vector<Channel> _c;
};//启动进程池
void Start()
{for(int i=0;i<_num;i++){ // 创建管道int fd[2];int n = pipe(fd);if (n == -1){cout << "pipe error!\n";}else{// 创建子进程pid_t id = fork();if (id == 0){// 子进程//关闭继承父进程的全部写端_m.CloseAll();// 关闭不需要的端口close(fd[1]);// 执行任务Work(fd[0]);// 退出close(fd[0]);exit(0); //!!!!!严重注意}else{// 父进程// 关闭不需要的端口close(fd[0]);//记录子进程_m.Inset(fd[1],id);}}}
}
4.最终完整代码
//ProgressPool.hpp#pragma once
#include<iostream>
#include<vector>
#include<unistd.h>
#include<string>
#include<cstring>
#include<sys/types.h>
#include<sys/wait.h>
#include"Task.hpp"
using namespace std;
// 可以之间写方法的实现//先描述
class Channel
{
public:Channel(int fd=0,int id=0):_fd(fd),_id(id){_name="Channel"+to_string(fd)+to_string(id);}string& name(){return _name;}//发送任务码void Send(int code){write(_fd,&code,sizeof(code));}void Close(){close(_fd);}void Wait(){int states=0;waitpid(_id,&states,0);}private:int _fd;pid_t _id;string _name;
};// 在组织
class ManagerChannel
{
public://记录管道fd、子进程的pidvoid Inset(int fd,pid_t id){_c.emplace_back(fd,id);}//选择管道Channel& Select(){Channel& c=_c[_num];_num++;_num%=_c.size();return c;}//关闭写端void CloseRead(){for(auto& e:_c){e.Close();cout<<"关闭:"<<e.name()<<endl;}}//回收子进程void WaitChild(){for(auto& e:_c){e.Wait();cout<<"回收:"<<e.name()<<endl;}}//关闭继承下来的全部写端void CloseAll(){for(auto& e:_c){e.Close();}}private:int _num=0;vector<Channel> _c;
};class ProgressPoll
{
public:ProgressPoll(int num): _num(num){}//读取任务码,并执行void Work(int fd){while (1){int code;int n = read(fd, &code, sizeof(code));if (n > 0){if(n!=sizeof(code))continue;cout<<"子进程["<<getpid()<<"]收到一个任务码!\n";//根据任务码,执行任务_tm.execute(code);}else if (n == 0){cout<<"父进程没有指定任务!子进程退出!\n";break;}else{cout<<"读取任务码错误!\n";break;}}}//启动进程池
void Start()
{for(int i=0;i<_num;i++){ // 创建管道int fd[2];int n = pipe(fd);if (n == -1){cout << "pipe error!\n";}else{// 创建子进程pid_t id = fork();if (id == 0){// 子进程//关闭继承父进程的全部写端_m.CloseAll();// 关闭不需要的端口close(fd[1]);// 执行任务Work(fd[0]);// 退出close(fd[0]);exit(0); //!!!!!严重注意}else{// 父进程// 关闭不需要的端口close(fd[0]);//记录子进程_m.Inset(fd[1],id);}}}
}//派送任务
void run()
{//选择任务int taskcode=_tm.code();//选着管道(子进程):采取轮询选择auto& c=_m.Select();cout<<"选择子进程["<<c.name()<<"]执行任务\n";//发送任务码c.Send(taskcode);cout<<"发送任务码:"<<taskcode<<endl;
}//停止进程池
void Stop()
{//关闭写端,读端自动关闭!//关闭父进程写段_m.CloseRead();//等待子进程_m.WaitChild();
}private:int _num; //需要几个子进程ManagerChannel _m;TaskManager _tm;
};//TaskManager.hpp#pragma once
#include <iostream>
#include <ctime>class TaskManager
{
public://选择任务码int code(){srand(time(nullptr));return rand() % 5;}//根据子进程传递的任务码,执行对应任务void execute(int code){std::cout<<"执行任务码为:"<<code<<"的任务!\n";}private:
};//test.cpp#include "ProgressPool.hpp"int main()
{// 创建进程池对象ProgressPoll pp(5);// 启动进程池pp.Start();// 派送任务for(int i=0;i<5;i++){pp.run();sleep(2);}//pp.run();// 停止进程池pp.Stop();
}