【Linux】匿名管道和进程池
1.匿名管道
因为进程具有独立性,进程间通信需要操作系统先建立信道,先让不同的进程看到同一份资源,然后才能通信。管道也是文件,属于文件,因为这个管道可以直接用pipe创建,没有名字,所以叫匿名管道。
匿名管道只能用来进行有“血缘关系”的进程的进程间通信,如父进程和子进程之间,或者两个子进程之间。
1.1 创建管道
创建管道,用系统调用函数pipe,参数是一个输出型参数,创建成功返回0,创建失败返回-1,并且错误码被设置。fd[0]通常表示读端,fd[1]通常表示写端。
#include <iostream>
#include <unistd.h>
using namespace std;
int main()
{int fsd[2] = {0};int n = pipe(fsd);if(n<0) //创建失败,向stderr输出错误信息{cerr<< "pipe error" <<endl;return 1;}cout << "fds[0]:" << fsd[0] << endl; //输出读端的文件描述符cout << "fds[1]:" << fsd[1] << endl; //输出写端的文件描述符return 0;
}
1.2 创建子进程
fork创建子进程,然后父进程还要waitpid等待子进程。
如果 waitpid 执行成功,它返回子进程的进程号;如果出现错误,则返回 -1,并将错误原因存放在 errno 变量中。
#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <sys/wait.h>using namespace std;
int main()
{// 1. 创建管道int fsd[2] = {0};int n = pipe(fsd);if (n < 0) // 创建失败,向stderr输出错误信息{cerr << "pipe error" << endl;return 1;}cout << "fds[0]:" << fsd[0] << endl; // 输出读端的文件描述符cout << "fds[1]:" << fsd[1] << endl; // 输出写端的文件描述符// 2.创建子进程pid_t pid = fork();if (pid == 0) // 子进程{// ... 子进程执行的代码exit(0);}waitpid(pid, nullptr, 0); // 等待子进程结束return 0;
}
1.3 关闭不需要的读写端
管道是单向通信的,属于半双工的一种特殊情况。
- 半双工:任何一个时刻,一个发,一个收
- 全双工:任何一个时刻,可以同时收发
上面的图是子进程读数据父进程写数据,父进程写,就关闭fd[0],子进程读,就关闭fd[1],父进程读子进程写则相反,下面以子进程写数据父进程读数据为例。
// 2.创建子进程pid_t pid = fork();if (pid == 0) // 子进程{// 3.关闭子进程的读端,让子进程读写数据close(fds[0]);exit(0);}// 3.关闭父进程的写端,让父进程读读数据close(fds[1]);waitpid(pid, nullptr, 0); // 等待子进程结束
此时两个进程就建立了单向的信道。
1.4 父子进程之间的通信
写数据的时候可以先用snprintf函数对要写入的数据格式化存放在buffer里。
用write函数从buffer里拿数据往管道里写count个字节,返回值是实际写入的字节数。
我们让子进程循环写入数据。
void ChildWrite(int pfd)
{char buffer[1024] = {0};int cnt = 0; // 设置一个变量while (1){snprintf(buffer, sizeof(buffer), "子进程写的数据, pid:%d, 变量:%d", getpid(), cnt);ssize_t n = write(pfd, buffer, strlen(buffer)); // 写数据if (n < 0){cerr << "写失败" << endl;break;}cnt++; // 变量自增sleep(1); // 睡眠1秒}
}
- 往文件里写入内容的时候,写strlen(buffer)字节的数据,不用把\0写入,因为字符串以\0结尾是C语言的标准,但文件没有这样的规定,所以写入的时候用strlen求大小,不写入\0。
- 凡是C语言的接口,如前面的snprintf,处理字符串时,都会自动在末尾添加\0,系统调用的函数是不会有这个操作的,所以snprintf的参数传参时是sizeof(buffer)而不是sizeof(buffer)-1,因为默认会在结尾添加\0。
用read函数从指定的文件描述符里读数据,读到buffer里,读count个字节,返回实际读取的字节数。
void FatherRead(int pfd)
{char buffer[1024];while (1){buffer[0] = 0; // 清空缓冲区ssize_t n = read(pfd, buffer, sizeof(buffer) - 1);if (n < 0) // 读失败{cerr << "读失败" << endl;break;}buffer[n] = '\0'; // 字符串结尾加上'\0'cout << "父进程读到的数据为:" << buffer << endl;}
}
- 读的时候读sizeof(buffer)-1个字节的数据,因为要预留一个位置给\0,因为我们前面没写入\0,所以读的时候就要自己手动添加(我们期望读sizeof(buffer)-1个字节的数据,但实际读了n个字节,所以直接把buffer的第n个位置设为\0,转成字符串)。
// 2.创建子进程pid_t pid = fork();if (pid == 0) // 子进程{// 3.关闭子进程的读端,让子进程读写数据close(fds[0]);ChildWrite(fds[1]);exit(0);}// 3.关闭父进程的写端,让父进程读读数据close(fds[1]);FatherRead(fds[0]); // 父进程读数据waitpid(pid, nullptr, 0); // 等待子进程结束
运行看程序的结果。
这样子进程写入的数据就被父进程拿到了,并且父进程能拿到子进程cnt的变化,说明这里没有进行写时拷贝,更加证明是子进程把数据传递给了父进程。
子进程写入时,我们sleep(1),但是父进程没有,父进程的read在管道没有读到数据的时候,会阻塞住,等着管道被写入新数据,所以虽然父进程没有sleep(1),但是父进程在打印数据的时候也是间隔1秒打印一次,这能说明管道文件自带同步机制。
当我们让子进程不间断的写入数据,父进程每隔3秒在读一次数据,结果如下。
- 当管道中写的快,读得慢时,读到的结果取决于buffer的大小,和写入无关,所以管道是面向字节流的。
- 文件(管道)的生命周期是随进程的。
4种通信情况:
- 写的慢,读的快:读端阻塞(进程)
- 写的快,读的慢:写满了的时候,写端阻塞
- 写着写着不写了,读一直在读:read会返回0,表示读到了文件结尾
- 一直在写,但不读了:写端此时写入就没有意义,并且还在浪费空间,操作系统会kill写端进程,发送异常信号13)SIGPIPE
验证第4种情况:
#include <iostream>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/wait.h>
using namespace std;void ChildWrite(int pfd)
{char buffer[1024] = {0};int cnt = 0; // 设置一个变量while (1){snprintf(buffer, sizeof(buffer), "子进程写的数据, pid:%d, 变量:%d", getpid(), cnt);ssize_t n = write(pfd, buffer, strlen(buffer)); // 写数据if (n < 0){cerr << "写失败" << endl;break;}cnt++; // 变量自增sleep(1); // 睡眠1秒}
}void FatherRead(int pfd)
{char buffer[1024];while (1){buffer[0] = 0; // 清空缓冲区ssize_t n = read(pfd, buffer, sizeof(buffer) - 1);if (n < 0) // 读失败{cerr << "读失败" << endl;break;}buffer[n] = '\0'; // 字符串结尾加上'\0'cout << "父进程读到的数据为:" << buffer << endl;break; //读一次数据后直接退出}
}int main()
{//创建管道int fds[2] = {0};int n = pipe(fds);if (n < 0) // 创建失败,向stderr输出错误信息{cerr << "pipe error" << endl;return 1;}cout << "fds[0]:" << fds[0] << endl; // 输出读端的文件描述符cout << "fds[1]:" << fds[1] << endl; // 输出写端的文件描述符//创建子进程pid_t pid = fork();if (pid == 0) // 子进程{close(fds[0]);ChildWrite(fds[1]); //子进程一直写exit(0);}close(fds[1]);FatherRead(fds[0]); // 父进程读一次数据就不读了close(fds[0]);//关闭读端int status = 0;pid_t rid = waitpid(pid, &status, 0); // 等待子进程结束if(rid > 0){printf("子进程已退出,退出码:%d, 退出信号:%d\n", (status>>8)&0xFF, status&0x7F);}return 0;
}
- wait和waitpid,都有⼀个status参数,该参数是⼀个输出型参数,由操作系统填充。
- 如果传递NULL,表⽰不关⼼⼦进程的退出状态信息。
- 否则,操作系统会根据该参数,将⼦进程的退出信息反馈给⽗进程。
- status不能简单的当作整形来看待,可以当作位图来看待,具体细节如下图(只研究status低16 ⽐特位)
2.进程池
我们可以让父进程一次创建出很多个进程和管道,
父进程如果没有往管道里写入,子进程就会一直阻塞,此时就像前面的情况2(写的快读的慢),只要我们规定读和写都是一个int类型4个字节大小的数据,就不会有字节流的问题,这个int的变量可以是一个任务码。
父进程往哪个管道写入,哪个子进程就会读到父进程写入的消息,然后运行一次,此时父进程就相当于通过管道来暂停或唤醒相应的子进程,父进程发送这个任务码一方面唤醒了子进程,另一方面让子进程完成某种工作,这种提前把子进程创建一批就叫做进程池。
2.1 通信信道的建立
创建两个文件,一个Main.cc,一个ProcessPool.hpp,还有一个Makefle文件。
//ProcessPool.hpp文件
#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__
#include <iostream>
using namespace std;#endif
// Main.cc文件
#include "ProcessPool.hpp"
int main()
{return 0;
}
//Makefile文件
ProcessPool: Main.ccg++ -o $@ $^.PHONY: clean
clean:rm -f ProcessPool
首先我们需要一个进程池的类。
//ProcessPool.hpp文件
class ProcessPool //进程池
{
public:ProcessPool(){}~ProcessPool(){}private:};
进程池的接口首先要有创建进程池,我们要知道这个进程池里要创建的进程的个数,一旦进程的数量确定了,管道的数量也就能确定,这里先固定一个数,假如就创建5个。
这里创建匿名管道的方法和前面一样,三步骤:1.创建管道 2.创建子进程 3.形成单向通信信道
因为要创建多个管道和子进程,所以直接循环创建。
class ProcessPool
{
public:ProcessPool(int num = 5):_proc_num(num){}bool Creat(){for(int i = 0; i < _proc_num; i++){//1.创建管道int pipe_fd[2] = {0}; //pipe_fd[0]读端,pipe_fd[1]写端 int ret = pipe(pipe_fd);if(ret < 0) return false;//创建失败//2.创建子进程pid_t pid = fork();if(pid < 0) return false; else if(pid == 0) //子进程{//3.形成单向信道close(pipe_fd[1]);//子进程读,关闭pipe_fd(1)//...子进程工作close(pipe_fd[0]); //完成工作后把读端关掉exit(1); //然后直接退出}else //父进程{close(pipe_fd[0]);//父进程写,关闭pipe_fd(0)//...父进程工作}}return true;}~ProcessPool(){}private:int _proc_num; //创建进程的数量
};
子进程完成自己的任务之后就退出,不会进循环,父进程会跟着for循环一直创建子进程。
但是此时管道的文件描述符以及对应的子进程的pid都是临时的,并且父进程会往多个管道写入,所以父进程要知道往哪个管道写入,每个子进程一个对应一个写端(管道),还要知道这个管道对应的哪个子进程。
2.2 对管道的管理
这里需要对管道先进行描述,然后进行管理。
//ProcessPool.hpp文件
class Pipe //对管道的描述
{Pipe() //构造{}~Pipe() //析构{}
};class ManagePipe //管理管道
{ManagePipe(){}~ManagePipe(){}
};
Pipe类里要包括这个管道的文件描述符,便于父进程选择往那个管道里写,还要有对应子进程的pid,为了方便我们查看,还可以加一个变量来表示这个pipe的名字,然后写一些get方法,也是为了方便我们查看。
class Pipe
{
public:Pipe(int wfd, pid_t pid) //构造:_wfd(wfd), _pid(pid){_pipe_name = "wfd为" + to_string(_wfd) + " --pipe-- 对应子进程pid是:" + to_string(_pid);}~Pipe() //析构{}
private:int _wfd; //管道的文件描述符pid_t _pid; //管道对应的子进程pidstring _pipe_name; //这个管道的名字
};
因为_wfd和_pid不是字符串类型,要用to_string转换一下。
ManagePipe类要管理这些管道,可以用vector做管理,vector里存放Pipe类。
class ManagePipe
{
public:ManagePipe() {}~ManagePipe(){}
private:vector<Pipe> _mp; //用vector管理存放这些Pipe
};
我们每创建一个子进程,ManagePipe类里就要记录一个管道文件描述符,还有对应子进程的pid,构建好的管道信息直接放进vector里面。
class ManagePipe
{
public:ManagePipe() {}bool Insert(int wfd, pid_t pid) //记录管道信息{_pipes.emplace_back(wfd, pid); return true;}~ManagePipe(){}
private:vector<Pipe> _pipes; //用vector管理存放这些Pipe
};
然后只要子进程和管道创建成功了,父进程就往ManagePipe里放,并且这里可以假设子进程就完成一个打印的任务吧。
class ProcessPool
{
public:ProcessPool(int num = 5):_proc_num(num){}void Work(int rfd){cout << "子进程工作... rfd为:" << rfd << endl;}bool Creat(){for(int i = 0; i < _proc_num; i++){//1.创建管道int pipe_fd[2] = {0}; //pipe_fd[0]读端,pipe_fd[1]写端 int ret = pipe(pipe_fd);if(ret < 0) return false;//创建失败//2.创建子进程pid_t pid = fork();if(pid < 0) return false; else if(pid == 0) //子进程{//3.形成单向信道close(pipe_fd[1]);//子进程读,关闭pipe_fd(1)Work(pipe_fd[0]);close(pipe_fd[0]); //完成工作后把读端关掉exit(1); //然后直接退出}else //父进程{close(pipe_fd[0]);//父进程写,关闭pipe_fd(0)_mp.Insert(pipe_fd[1], pid); //记录信息,管理管道和子进程}}return true;}~ProcessPool(){}private:int _proc_num; //创建进程的数量ManagePipe _mp; //管理管道
};
现在我们可以验证一下这个管道是不是创建成功了,先在ManagePipe类里实现一个打印管道名字的接口。
void PrintPipsName() //打印一下管道的名字 {for(auto & p : _pipes){cout << p.Name() << endl;}}
然后再到ProcessPool类里实现一个Debug接口,调用这个PrintPipsName函数。
void Debug(){_mp.PrintPipsName();}
最后在Main.cc里测试一下。
#include "ProcessPool.hpp"
int main()
{ProcessPool pp; //实例化对象pp.Creat(); //创建进程池pp.Debug(); //测试sleep(10);return 0;
}
打印顺序是乱的没关系,因为父进程和子进程同时向显示器在打印,这个结果显示,子进程的读端一直是文件描述符3,pipe的写端文件描述符是4、5、6、7、8,为什么是这个结果我们后面3.画图理解部分会解释。
2.2 给子进程分配任务
2.2.1 任务码
给子进程分配任务我们可以设置一个任务码code,并且严格规定写端写入读端读取的数据大小一致。子进程需要做的工作就是等待父进程写入,父进程没有写入时会阻塞住。
void Work(int rfd){while(1){int code = 0;int n = read(rfd, &code, sizeof(code));if(n > 0) //读取成功{if(n != sizeof(code)) //读到的数据大小和发送的不一致continue;cout << "子进程:" << getpid() << " 收到一个任务码:" << code << endl;}else if(n == 0) //读取到文件结尾{cout << "子进程读取完毕" << endl;break;}else{cout << "读取失败" << endl;break;}}}
父进程在发送任务的时候,首先要在选择一个信道,选择信道的工作交给ManagePipe类,为了解决负载不均衡问题,我们采用父进程对子进程轮询发送任务的方法,所以我们还需要记录下一个选择谁。
class ManagePipe
{
public:ManagePipe() :_next(0) //next初始化0{}bool Insert(int wfd, pid_t pid){_pipes.emplace_back(wfd, pid);return true;}void PrintPipsName() //打印一下管道的名字 {for(auto & p : _pipes){cout << p.Name() << endl;}}Pipe &Select(){Pipe &p = _pipes[_next];_next++;_next %= _pipes.size(); //控制next的数值范围return p;}~ManagePipe(){}
private:vector<Pipe> _pipes; //用vector管理存放这些Pipeint _next; //记录下一个选谁
};
选择信道之后就要发送任务码SendCode,其实就是父进程往管道里写入,发送任务码的方法在Pipe类里实现。
class Pipe
{
public:Pipe(int wfd, pid_t pid) //构造:_wfd(wfd), _pid(pid){_pipe_name = "wfd为" + to_string(_wfd) + " --pipe-- 对应子进程pid是:" + to_string(_pid);}int Fd(){return _wfd;}pid_t Pid(){return _pid;}string Name(){return _pipe_name;}void SendCode(int code) {write(_wfd, &code, sizeof(code));}~Pipe(){} //析构private:int _wfd; //管道的读端文件描述符pid_t _pid; //管道对应的子进程pidstring _pipe_name; //这个管道的名字
};
//ProcessPool类里
void PushTask(int test_code)
{//选择信道Pipe &p = _mp.Select();cout << "选择了" << p.Name() << endl;//发送任务码cout << "发送任务码" << test_code << endl;p.SendCode(test_code);
}
然后我们在Main.cc里测试一下。
#include "ProcessPool.hpp"
int main()
{ProcessPool pp;pp.Creat();int task_code = 0;while(1){pp.PushTask(task_code++);sleep(1);}sleep(10);return 0;
}
此时就是轮询的方法选择子进程并且发送任务码。
2.2.2 管理和执行任务
新建一个文件Task.hpp,里面存放一些任务和对任务的管理。
#pragma once
#include <iostream>
#include <vector>typedef void (*Task)(); //函数指针class ManageTask
{
public:ManageTask(){}~ManageTask(){}
private:std::vector<Task> _tasks; //vector里存函数指针
};
我们先多弄几个任务,比如打印日志,下载,上传等。
//Task.hpp文件里,ManageTask类外
void PrintLog()
{std::cout << "任务:打印日志" << std::endl;
}
void DownLoad()
{std::cout << "任务:下载" << std::endl;
}
void UpLoad()
{std::cout << "任务:上传" << std::endl;
}
这里也需要任务码,任务码这里我们就随机生成,但是范围不能超过任务的个数。
class ManageTask
{
public:ManageTask(){srand(time(nullptr)); //种随机数种子,包含头文件#include <ctime>}int Code(){return rand() % _tasks.size(); //随机数 }~ManageTask(){}
private:std::vector<Task> _tasks; //vector里存函数指针
};
有任务之后我们要先往vector里注册,就是把这些任务push_back。
void Register(Task t)
{_tasks.push_back(t);
}
有了任务码就可以直接执行对应的任务了,其实任务码就是这个指针数组下标。
//ManageTask类void Execute(int code){if(code >= 0 && code < _tasks.size()){_tasks[code](); //以函数指针的方式直接调用}}
然后回到ProcessPool.hpp文件里,在这个文件里包含Task.hpp的头文件。
此时我们的ProcessPool类还要对任务进行管理,在ProcessPool的构造函数里直接初始化就行
class ProcessPool
{
public:ProcessPool(int num = 5):_proc_num(num){_mt.Register(PrintLog);_mt.Register(DownLoad);_mt.Register(UpLoad);}//....private:int _proc_num; //创建进程的数量ManagePipe _mp; //管理管道ManageTask _mt; //管理任务
};
然后在父进程可以在选择信道之前先选择任务。
void PushTask(){//选择任务int task_code = _mt.Code();//选择信道Pipe &p = _mp.Select();cout << "选择了" << p.Name() << endl;//发送任务码cout << "发送任务码" << test_code << endl;p.SendCode(test_code);}
子进程收到任务码还要执行。
void Work(int rfd){while(1){int code = 0;int n = read(rfd, &code, sizeof(code));if(n > 0) //读取成功{if(n != sizeof(code)) //读到的数据大小和发送的不一致continue;cout << "子进程:" << getpid() << " 收到一个任务码:" << code << endl;_mt.Execute(code); //执行任务}else if(n == 0) //读取到文件结尾{cout << "子进程读取完毕" << endl;break;}else{cout << "读取失败" << endl;break;}}}
我们在Main.cc里验证一下。
#include "ProcessPool.hpp"
int main()
{ProcessPool pp;pp.Creat();while(1){pp.PushTask();sleep(1);}sleep(10);return 0;
}
2.3 关闭和回收子进程
根据前面说过的通信的那4种情况,这里只要关闭父进程的wfd就行,即关闭写端。我们直接在ManagePipe里实现。
//Pipe类
void Close() //关闭
{ close(_wfd);
}void Wait() //等待
{waitpid(_wfd, nullptr, 0);
}
//ManagePipe类
void CloseAll()
{for(auto & p:_pipes){p.Close();cout << "关闭" << p.Name() << endl;}
}void WaitAll()
{for(auto & p:_pipes){p.Wait();cout << "回收" << p.Name() << endl;}
}
//ProcessPool类
void Stop()
{_mp.CloseAll();_mp.WaitAll();
}
我们在Main.cc里验证一下。
#include "ProcessPool.hpp"
int main()
{ProcessPool pp;pp.Creat();int cnt = 10;while(cnt--){pp.PushTask();sleep(1);}pp.Stop();sleep(5);return 0;
}
3.画图理解
父进程创建第一个子进程时示例图如下。
父进程要创建第二个子进程时,子进程拷贝的父进程的文件描述符表,3没有文件使用,所以父进程的pipe_fd[0]继续是3,4被第一个进程占用了,所以父进程的pipe_fd[1]变成了5,第二个子进程复制的父进程的,所以第二个子进程pipe_fd[0]继续是3。
依次往后推,这些管道文件的读端文件描述符就是4、5、6、7、8,写端文件描述符都是3。
但是!! 第二个进程复制父进程的文件描述表时,4指向了第一个进程,导致第二个进程的4也会指向第一个进程。
往后都会如此。
所以前面的代码需要做调整,在子进程进行操作之前,把父进程打开过的管道写端全部关闭就行了,创建当前子进程之前,历史上打开过的写端存放在哪?ManagePipe的vector里。
在ManagePipe类里在实现一个关闭的接口。
void CloseBeforAll(){for(auto & p:_pipes){p.Close();}}
子进程会继承父进程的_mp,虽然后面父进程会对_mp进行Insert操作,但是这里会发生写时拷贝,创建子进程的那一瞬间,继承的是父进程历史上的数据。
本次分享就到这里了,我们下篇见~