【Linux篇】进程间通信 - 匿名管道
📌 个人主页: 孙同学_
🔧 文章专栏:Liunx
💡 关注我,分享经验,助你少走弯路!
文章目录
- 一. 进程通信的介绍
- 1.1 进程间通信目的
- 1.2 进程间通信的本质
- 1.3 进程间通信发展
- 1.4 进程间通信分类
 
- 二. 管道
- 2.什么是管道
 
- 三. 匿名管道
- 3.1匿名管道的介绍
- 3.2 用fork来共享管道原理
- 3.3 站在文件描述符角度-深度理解管道
- 3.4 站在内核角度-管道本质
- 3.5 管道样例
- 3.5.1 测试管道读写
- 3.5.2 基于匿名管道---进程池
 
 
一. 进程通信的介绍
1.1 进程间通信目的
- 数据传输: 一个进程需要将它的数据传输给另一个进程
- 资源共享: 多个进程之间需要共享同一份资源
- 通知事件: 一个进程需要向另一个进程或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)
- 进程控制: 有些进程完全希望控制另一个进程的执行(如debug进程),此时控制进程希望能够拦截另一个进程所有陷入和异常,并能够及时知道它的状态改变
1.2 进程间通信的本质
- 本质:是先让不同的进程,先看到同一份资源[“内存”]。(然后才有通信的条件)其中这份资源不是任何进程提供的,这份资源由操作系统(OS)提供,并且操作系统在创建或者销毁这份资源时必须得给我们提供各种系统调用,只要是系统调用,那就是OS的接口,那就需要设计者设计统一的通信接口。
1.3 进程间通信发展
- 管道 :基于文件的管道通信
- System V进程间通信
- POSIX进程间通信
1.4 进程间通信分类
管道
- 匿名管道
pipe- 命名管道
System V IPC
- System V 消息队列
- System V 共享内存
- System V 信号量
POSIX IPC
- 消息队列
- 共享内存
- 信号量
- 互斥量
- 条件变量
- 读写锁
二. 管道
2.什么是管道
- 管道是unix中最古老的进程间通信的形式
- 我们把从一个进程连接到另一个进程的数据流称之为一个管道
  
三. 匿名管道
3.1匿名管道的介绍
匿名管道通常进行父子通信
 补:匿名管道不是具体的文件,而是内存中的一个缓冲区。
3.2 用fork来共享管道原理

3.3 站在文件描述符角度-深度理解管道
 操作系统为我们提供了一个内存级的管道文件,只需要调用某些接口就能打开,把这个文件以读写两种方式打开。我们以前打开文件可以是读打开,写打开,或者读写打开,只不过我们以前只返回一个文件描述符,而今天这个文件是同时以读写的方式打开,把读写打开的两个文件描述符返回给上层,这就叫做完成管道创建,而管道通信是进行父子间通信的,接下来fork创建子进程,子进程会继承父进程的文件描述符表,发生浅拷贝,所以子进程也能拿到曾经父进程以读和写打开的这个管道文件。
  这种管道只能做父子的单向通信,比如我们想让父进程写,子进程读,所以就需要关掉父子进程不需要的文件描述符,此时就形成了一个单向数据通信的信道,我们把这个信道称之为管道
 
 这个管道是被OS单独设计的,所以就有单独的系统调用,这个系统调用叫做pipe
 
 创建成功返回值为0,否则就为-1,这个参数为输出型参数,pipe成功后会把读写描述符放在这个具有两个元素的数组里。这种管道创建也不需要文件路径,也就没有文件名,所以也叫匿名管道,因为它是内存级的。
- 那么问题就来了,没有名字那怎么确保进程打开的是同一个管道文件?
 答案是子进程继承文件描述符表
3.4 站在内核角度-管道本质
 在Linux操作系统里面,这个struct file结构是内存级的,在我们的磁盘上并不存在这个结构,磁盘上只存在这个文件的属性inode和它的数据块。
 
 实际真实情况下,struct file是会给我们子进程拷贝一份,子进程的文件描述符指向浅拷贝的struct file
 
3.5 管道样例
创建管道
 
 创建成功返回值为0,否则就为-1,这个参数为输出型参数,pipe成功后会把读写描述符放在这个具有两个元素的数组里。
#include<iostream>
#include<unistd.h>int main()
{int fds[2] = {0};//2个文件描述符int n = pipe(fds);if(n < 0){std::cerr << "pipe error" << std::endl;return 1; //创建失败退出码设置为1}std::cout << "fds[0]" << fds[0] << std::endl;//查看fds中的0下标std::cout << "fds[1]" << fds[1] << std::endl;//查看fds中的1下标return 0;
}

 我们至此就验证了调用一个pipe就能够把管道文件以文件的方式打开。因为它得到的文件描述符是3和4,文件描述符的本质是数组下标,012被占用了。
3.5.1 测试管道读写
#include<iostream>
#include<unistd.h>
#include<sys/types.h>
#include<sys/wait.h>
#include<cstdio>
#include<cstring>void ChildWrite(int wfd)
{char buffer[1024];int cnt = 0;while(true){//格式化输出snprintf(buffer,sizeof(buffer),"I am child,pid:%d,cnt:%d",getpid(),cnt++);//这里为什么不需要sizeof(buffer)-1呢?//凡是c语言处理字符串的接口,它会在结尾时自动添加\0。而系统调用属于c语言的下层,它不关心所谓的字符串,所以它在读取时是没有\0的,需要我们自己加。write(wfd,buffer,strlen(buffer)); //把字符串完整的样子写过去sleep(1);//每隔一秒钟写一次}
}void FatherRead(int rfd)
{char buffer[1024];while(true){buffer[0] = 0;//buffer使用之前清成0,把它当做字符串来看ssize_t n = read(rfd,buffer,sizeof(buffer)-1);//读的时候认为接收过来的是一个字符串,读的时候给buffer里预留一个字节的空间,手动给字符串添加\0if(n > 0)//说明读取成功{buffer[n] = 0;//将读到的字符串的后一位置为\0std::cout << "child say: " << buffer << std::endl;}}}
int main()
{//1.创建管道int fds[2] = {0};//2个文件描述符   fds[0]: 读端  fds[1]: 写端int n = pipe(fds);if(n < 0){std::cerr << "pipe error" << std::endl;return 1; //创建失败退出码设置为1}std::cout << "fds[0]" << fds[0] << std::endl;//查看fds中的0下标std::cout << "fds[1]" << fds[1] << std::endl;//查看fds中的1下标//2.创建子进程pid_t id = fork();if(id == 0){//子进程//codeclose(fds[0]);//子进程关闭不需要的读端ChildWrite(fds[1]);//子进程写入,是向文件描述符1号中去写入close(fds[1]);exit(0);//子进程跑完就让子进程终止}//父进程//3.关闭不需要的读写端,形成通信通道//f -> r,c -> w  让父进程读,子进程写close(fds[1]);//父进程关闭不需要的写端FatherRead(fds[0]);//父进程进行读取waitpid(id,nullptr,0);close(fds[0]);return 0;
}

 我们可以看到父进程将子进程的信息打印了出来,也就说明子进程确实把消息给了父进程,这个过程中不会发生写时拷贝。
- 管道的五种特性
-  匿名管道,匿名管道只能用来进行具有血缘关系的进程进行进程之间的通信(常用父与子) 
-  管道文件自带同步机制(比如子进程每隔3秒写一条,那么父进程也会每隔3秒读一条) 
-  管道是面向字节流的(写的快读的慢,单次读到的数据取决于缓冲区的大小) 
-  管道是单向通信的,属于半双工的一种特殊情况 
 任何一个时刻,一个发,一个收,叫做半双工;
 任何一个时刻,可以同时发收,叫做全双工;
-  (管道)文件的声明周期是随进程的 
- 管道通信的四种情况
- 写的慢,读的快 — 读端进程就需要阻塞(等写)
- 写的快,读的慢 — 写满缓冲区的时候,写就要阻塞等待(等读)
- 写端关,读端开 — read就会读到返回值为0,表示读到了文件结尾,读端也会自动关闭
- 写端开,读端关 — 写端再写入没有任何意义(OS不会做没有意义的事),OS会杀掉写端进程 ,发送异常信号13(SIGPIPE)
3.5.2 基于匿名管道—进程池
管道的容量: 默认是64kb
 原子性: 原子性指的是一个操作要么完全执行,要么完全不执行,不会出现执行到一半的情况。
ProcessPool.hpp
#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__#include<iostream>
#include<cstdlib>
#include<vector>
#include<unistd.h>
#include<sys/wait.h>
#include"Task.hpp"//先描述
class Channel //管道
{
public:Channel(int fd,pid_t id):_wfd(fd),_subid(id){_name = "channels-" + std::to_string(_wfd) + "-" + std::to_string(_subid);//to_string是将一个整数转化成一个字符串}~Channel(){}void Send(int code){int n = write(_wfd,&code,sizeof(code));(void)n; //n在这个函数中定义了,但是没有使用,没有使用编译的时候就会有警告,此处就是对n做一下使用}void Close(){close(_wfd);//关掉写的文件描述符}void Wait(){pid_t rid = waitpid(_subid,nullptr,0);(void)rid; }int Fd(){return _wfd;}pid_t Subid(){return _subid;} std::string Name(){return _name;}private:int _wfd;pid_t _subid;std::string _name;//int lodnum;
};//再组织
class ChannelManager
{
public:ChannelManager():_next(0){};void Insert(int wfd,pid_t subid){_channels.emplace_back(wfd,subid);//Channel c(wfd,subid);//_channels.push_back(std::move(c));}Channel &Select(){auto &c = _channels[_next];_next++;_next %= _channels.size();return c;}void PrintChannel(){for(auto &channel : _channels){std::cout << channel.Name() << std::endl;}}void StopSubProcess(){for(auto &channel : _channels){channel.Close();std::cout << "关闭:" << channel.Name() << std::endl;}}void WaitSubProcess(){for(auto &channel : _channels){channel.Wait();std::cout << "回收:" << channel.Name() << std::endl;}}~ChannelManager(){};private:std::vector<Channel> _channels;//未来父进程对通信信道进行管理,其实就是对vector的管理int _next;
};const int gdefaultnum = 5;//在栈上直接定义class ProcessPool //进程池
{
public:ProcessPool(int num):_process_num(num){_tm.Register(PrintLog);_tm.Register(Download);_tm.Register(Upload);};//子进程要做的工作void Work(int rfd){while(true){//测试//std::cout << "我是子进程,我的rfd是:"<< rfd << std::endl;//sleep(5);//每隔5秒打印一条//int code = 0;ssize_t n = read(rfd,&code,sizeof(code));if(n > 0)//读成功{if(n != sizeof(code))//说明通信时传入的数据不规范{continue;//继续让它读}std::cout << "子进程[" << getpid() << "]收到一个任务码:" << code << std::endl;_tm.Execute(code);//执行任务}else if(n == 0){//服务端把写端关闭了就会读到0std::cout << "子进程退出" << std::endl;break;}else{std::cout << "读取错误" << std::endl;break;}}}//创建/启动进程池bool Start(){for(int i = 0;i < _process_num;i++) //只有父进程才会执行for循环{//1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if(n < 0) return false;//如果创建失败,返回false//走到这说明创建成功了//2.创建子进程pid_t subid = fork();if(subid < 0) return false;//如果子进程创建失败,返回falseelse if(subid == 0){//子进程//3.关闭不需要的文件描述符close(pipefd[1]);//关闭子进程的写Work(pipefd[0]);//子进程做工作close(pipefd[0]);//子进程工作做完关掉读端exit(0);//子进程工作做完}else{//父进程//3.关闭不需要的文件描述符close(pipefd[0]);//关闭父进程的读//wfd可以知道父进程是要干什么的,父进程通过supid可以知道是哪个子进程_cm.Insert(pipefd[1],subid);// wfd ,subid}}return true;}void Debug(){_cm.PrintChannel();}   //给进程池提供任务void Run(){//1. 选择一个任务int taskcode = _tm.Code();//2.负载均衡的选择一个信道(子进程)完成任务auto &c = _cm.Select(); //选择一个channelstd::cout << "选择了一个子进程:" << c.Name() << std::endl;//3. 发送任务c.Send(taskcode);std::cout << "发送了一个任务码:" << taskcode << std::endl;}void Stop(){//关闭父进程的所有写入端_cm.StopSubProcess();//关闭后子进程会进入僵尸,所以还要回收所有子进程_cm.WaitSubProcess();}~ProcessPool(){};
private:ChannelManager _cm;int _process_num;//创建进程的个数TaskManager _tm; 
};#endif
Task.hpp
#pragma once#include<iostream>
#include<vector>
#include<ctime>typedef void (*tast_t)();//定义一个函数指针///////////////////////debug//////////////////////
void PrintLog()
{std::cout << "我是一个打印日志的任务" << std::endl;
}
void Download()
{std::cout << "我是一个下载的任务" << std::endl;
}
void Upload()
{std::cout << "我是一个上传的任务" << std::endl;
}
///////////////////////////////////////////////////class TaskManager
{
public:TaskManager(){srand(time(nullptr));}void Register(tast_t t) //注册任务{_tasks.push_back(t);}int Code()//返回任务{return rand() % _tasks.size();}void Execute(int code)//执行任务{if(code >= 0 && code < _tasks.size()){_tasks[code]();//以函数指针的方式调用想完成的任务 }}~TaskManager(){}
private:std::vector<tast_t> _tasks;//vector中放函数指针
};
Main.cc
#include"ProcessPool.hpp"int main()
{//ChannelManager.Build();//构建一个通信信道,其实就是管道//创建进程池对象ProcessPool pp(gdefaultnum);//创建gdefaultnum个进程//启动进程池pp.Start();//pp.Debug();//int task_code = 0;//任务码//自动派发任务int cnt = 10;//while(cnt--)//让进程池跑10次{//1.选择一个信道pp.Run();//把任务码交给进程池,由它转发给子进程sleep(1);//每隔1秒让主进程给进程池内部派送一个任务}//回收,结束进程池pp.Stop();//把进程池停下来//pp.Wait();return 0;
}   
Makefile
process_pool:Main.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f process_pool
有个小问题,当我们把代码设计成下面这样时我们就会发现当我们进行stop的时候就停住了,这是为什么呢?
void StopSubProcess(){for(auto &channel : _channels){channel.Close();std::cout << "关闭:" << channel.Name() << std::endl;}}void WaitSubProcess(){for(auto &channel : _channels){channel.Wait();std::cout << "回收:" << channel.Name() << std::endl;}}
//写在一块void CloseAndWait(){for(auto &channel : _channels){channel.Close();std::cout << "关闭:" << channel.Name() << std::endl;channel.Wait();std::cout << "回收:" << channel.Name() << std::endl;}}

 所以如果将来我们创建了5个子进程,将来就会有5个端口指向子进程1的写端(父进程加四个子进程),对于下一个子进程来讲就会有4个端口指向该子进程的写端。所以当我们关闭子进程1的写端的时候只是关闭了父进程的,但这个文件并没有真的被关掉,此时只是引用计数由5变为4,当进行CloseAndWait的时候先关闭指定的文件描述符,紧接着调用Wait等待这个子进程,但这个子进程并不会退出,它会在Wait那里阻塞住。这就是上面代码现象的原因。
- 解决方法:
- 倒着关闭文件描述符
 由于最后一个子进程的写端只有父进程指向,并且最后一个子进程的哥哥们的写端我都要指向,所以我们的第一种方案就是倒着关。倒着关闭最后一个子进程后,倒数第二个子进程的写端就只有父进程指向了。
		// 解决方案一:倒着关 for(int i = _channels.size()-1; i >= 0;i--){_channels[i].Close();std::cout << "关闭:" << _channels[i].Name() << std::endl;_channels[i].Wait();std::cout << "回收:" << _channels[i].Name() << std::endl;}
- 让父进程一个人指向所有子进程的W端
 我们在创建子进程的时候不再让弟弟进程指向哥哥进程的写端,而让父进程一个人指向子进程的写端。让子进程在拿到父进程的文件描述符表后关闭父进程文件描述符表指向哥哥进程的写端。
 那么问题来了,这个子进程它的哥哥进程的所有写端在哪里?
 答案是在我们的ChannelManager当中所有的vector中,父进程每次会把自己的写端插入到vector中(_cm.Insert(pipefd[1],subid);),如何关?
 for(std::vector< Channels> _channels) _channels.close()
//创建/启动进程池bool Start(){for(int i = 0;i < _process_num;i++) //只有父进程才会执行for循环{//1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if(n < 0) return false;//如果创建失败,返回false//走到这说明创建成功了//2.创建子进程pid_t subid = fork();if(subid < 0) return false;//如果子进程创建失败,返回falseelse if(subid == 0){//子进程//让子进程关闭继承下来的哥哥进程的写端_cm.CloseAll();//子进程拿到的是历史上父进程打开的所有的写端//3.关闭不需要的文件描述符close(pipefd[1]);//关闭子进程的写Work(pipefd[0]);//子进程做工作close(pipefd[0]);//子进程工作做完关掉读端exit(0);//子进程工作做完}else{//父进程//3.关闭不需要的文件描述符close(pipefd[0]);//关闭父进程的读//wfd可以知道父进程是要干什么的,父进程通过supid可以知道是哪个子进程_cm.Insert(pipefd[1],subid);// wfd ,subid}}return true;void CloseAll(){for(auto &channel : _channels){channel.Close();std::cout << "关闭:" << channel.Name() << std::endl;}}
👍 如果对你有帮助,欢迎:
- 点赞 ⭐️
- 收藏 📌
- 关注 🔔

