Linux管道
预备知识:
进程通信
进程需要某种协同,协同的前提条件是通信。有些数据是用来通知就绪的,有些是单纯的传输数据,还有一些是控制相关信息。
进程具有独立性,所以通信的成本可能稍微高一点;进程间通信前提是让不同进程看到同一份(操作系统)资源(“一段内存”)。
一定是某一个进程先需要通信,让OS创建一个共享资源,OS必须提供很多系统调用,OS创建的共享资源不同,系统调用不同,进程通信就会有不同种类。
System V
System V是一套本地通信的标准,通信方式有:
1.消息队列
2.共享内存
3.信息量
直接复用内核代码直接通信:命名管道和匿名管道。
管道
匿名管道
父子进程的文件表类似于浅拷贝,不会创建第二份struct file;进程会默认打开标准输入输出,就是拷贝了bash的files_struct,继承了它的文件描述符;struct file内部也包含了引用计数(内存级),因此子进程fd关闭不影响父进程文件使用。
在这里,父子进程看到同一份资源(内核级缓冲区),这个资源叫管道文件,父进程从缓冲区读,子进程缓冲区写,就能进行通信,当然,这里的管道通信是单向的。
让父子进程关掉不需要的描述符,缓冲区不需要刷新,因为不需要写到磁盘文件,而且写入效率也低。
创建管道文件
如果想要双向通信,可以创建两个管道,单项通信更简单一些,如果在一个缓冲区双方都要读写,那么会更复杂。
代码:
#include<iostream>
#include<unistd.h>
#include<cstring>
using namespace std;
void wrprc(int fd)
{string s = "syx 666";int ct = 0;int id = getpid();while (1){string message = to_string(id) + ":" + s + to_string(ct++);write(fd, message.c_str(), message.size());cout << message << endl;sleep(1);}
}
void rdprc(int fd)
{int id = getpid();char buffer[512];while (1){ssize_t n = read(fd, buffer, 511);if(n>0){buffer[511] = '\0';cout << id << ' ' << n << ":" << buffer << endl;sleep(1);}}
}
int main()
{int pipefd[2];int n = pipe(pipefd);if(n!=0){cerr << "error:" << errno << "errorstring" << strerror(errno) << endl;return 1;}//0 read 1 writecout << pipefd[0] << ':' << pipefd[1] << endl;pid_t id = fork();if(id==0){close(pipefd[0]);wrprc(pipefd[1]);close(pipefd[1]);}else{close(pipefd[1]);rdprc(pipefd[0]);close(pipefd[0]);}return 0;
}
运行:
管道的5种特征:
1.匿名管道只能用来进行具有血缘关系的进程之间通信,如父子进程。
2.管道内部,自带进程之间同步的机制(多执行流执行代码的时候,具有明显的顺序性),父进程读取节奏与子进程写的节奏保持一致,缓冲区可能存在被多个进程同时访问的情况,导致数据不一致问题(小于PPIPE_BUF字节,写入是原子的,安全的)。
3.管道文件的生命周期是跟随进程的生命周期。
4.管道文件在通信的时候,是面向字节流的,写入和读取的次数不是一一对应的(读取会读取一大堆)。
5.管道的通信模式,是一种特殊的半双工模式。(不能同时两个都写)
管道的4种情况
1.如果管道内部是空的并且写的fd没有关闭,读取条件不具备,读进程会被阻塞。
2.如果管道被写满,并且fd不读且没被关闭,写进程会被阻塞。
3.管道一直再被读,但是写被关闭了,读端会一直读到0,表示读到文件结尾。
4.读fd被关闭,OS会杀掉对应写的进程(发送13号信号)。
进程池
提前创建一批子进程,有任务将任务交给子进程执行,当管道没数据,子进程就在阻塞等待,父进程想哪个管道写入,就是唤醒哪个子进程来处理任务,当然,父进程要进行后端任务的负载均衡(都忙起来)。
//task.hpp
#pragma
#include<iostream>
using namespace std;
#include<stdlib.h>
#include<unistd.h>
void Download()
{cout << "download!" << endl;
}
void Print()
{cout << "print!" << endl;
}
void Flush()
{cout << "flush!" << endl;
}
typedef void (*task)();
#define NUM 3
task tasks[NUM];
void loadTask()
{srand(time(nullptr) ^ getpid());tasks[0] = Download;tasks[1] = Print;tasks[2] = Flush;
}
void exec(int number)
{if(number<0||number>2){return;}tasks[number]();
}
int selectTask()
{return rand() % NUM;
}
//processpool.c
#include<iostream>
#include<string>
#include<unistd.h>
using namespace std;
#include<vector>
#include"task.hpp"
#include<sys/wait.h>
class Channel
{
public:Channel(int w,pid_t i){wfd = w;id = i;}pid_t getid(){return id;}int getwfd(){return wfd;}void closewfd(){close(wfd);}private:int wfd;pid_t id;
};
void work(int rfd)
{while(1){int command = 0;int n = read(rfd, &command, sizeof(command));if (n == sizeof(int)){exec(command);}else{close(rfd);break;}}
}
void createChannel(int num,vector<Channel>* v)
{for (int i = 0; i < num; i++){int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)exit(2);pid_t id = fork();if(id==0){for(auto i:*v){i.closewfd();}close(pipefd[1]);work(pipefd[0]);exit(0);} close(pipefd[0]);v->emplace_back(Channel(pipefd[1], id));}}
void sendTask(Channel ch,int task)
{write(ch.getwfd(),&task,sizeof(task));
}
int main(int argc,char* argv[])
{if(argc!=2){cerr << "number error!" << endl;return 1;}vector<Channel> channels;int now = 0;loadTask();int num = std::stoi(argv[1]);//1.创建子进程和通信createChannel(num, &channels);//2.通过channel发放任务int cnt = 4;while (cnt--){int task = selectTask();cout << channels[now].getid() << ' ' << task << ':' << endl;sendTask(channels[now], task);now++;now %= num;sleep(1);}// 3.回收管道和子进程for(auto &i:channels){i.closewfd();int status;int pid=0;pid = waitpid(i.getid(), &status, 0);if(pid<0)cerr << "error!" << endl;}return 0;
}
这里要注意,子进程创建会继承父进程的文件描述符表,一定要把不需要写端关掉。
命名管道
如果两个进程毫无关系,那么就用命名管道进行通信。
创建命名管道:
mkfifo filename
int mkfifo(const char* filename,mode_t mode);
删除目录下文件:
int unlink(const char* path);
server和client进程通信(server读,client写):
#namePipe.hpp
#pragma
#include<iostream>
#include<string>
#include<sys/types.h>
#include<sys/stat.h>
#include<unistd.h>
#include<cerrno>
#include<fcntl.h>
using namespace std;
#define Size 256
const string Path = "./myfifo";
#define creater 1
#define user 2
class NamedPipe
{
public:NamedPipe(const string &Path,int id) :path(Path),id(id){if(id==creater){int ret = mkfifo(path.c_str(), 0666);if(ret!=0){perror("mkfifo");}}openNamedPipe();}int removeNamedpipe(){int ret = unlink(path.c_str());if(ret!=0){perror("mkfifo");}return ret;}~NamedPipe(){if(id==creater)removeNamedpipe();}int Read(string* out){char buffer[Size];int n = read(fd, buffer, Size);if(n>0){buffer[Size] = 0;*out = buffer;}return n;}void Write(const string& in){write(fd, in.c_str(), in.size());}private : const string path;int id;int fd;int openNamedPipe(){if(id==creater)fd = open(path.c_str(), O_RDONLY);elsefd = open(path.c_str(), O_WRONLY);return fd;}
};
#server.cpp
#include"namedPipe.hpp"
int main()
{NamedPipe pipe(Path,creater);while(1){string messages;int n=pipe.Read(&messages);if(n==0){cout << "client quit!" << endl;break;}cout << messages << endl;}return 0;
}
#client.cpp
#include"namedPipe.hpp"
int main()
{NamedPipe pipe(Path,user);while(1){cout << "in:";string messages;cin >> messages;pipe.Write(messages);}return 0;
}