Linux知识回顾总结----进程间通信(上)
本章将会讲解,什么是进程间通信,如何实现进程间通信,为啥要进行进程进程间通信,其次为了还会有一些 doem 帮助你看到进程间通信,我会举管道、SystemV 共享内存以及进程池
的相关实验,以及最后讲解一下管道、SystemV、消息队列等这些标准的内核结构是什么样子的!废话不多说,快上车!
一、概念
1.1 是什么
本质就是让具有独立性的进程指向同一份资源,有的同学可能会疑惑独立性不就是进程的最重要的特点吗?怎么还要让他们看到有通信?确实这样的通信是需要消耗大量的资源,但是通信也是为了更好的合作!
1.2 为什么
- 通过进程间的通信可以实现进程的控制,实现让不同的进程去做不同的工作,这个就是我们后面需要重点强调的实验进程池来进行实现!
- 其实还可以实现资源共享、数据传输、通知事件等其他的作用。
1.3 有什么
这个我们重点掌握前两个就可以了,那就是 管道 和 System V标准。你如果感兴趣的话可以直接去ai,就可以了解他的发展史,我觉的这些不是重点,就不过过多的进行描述。
二、管道
2.1 匿名管道
2.1.1 是什么
他是基于父子进程可以看到同一块资源而衍生出来的一种解决方式。父进程在创建出子进程的时候他们就天然的指向了一块地址空间,那这个地址空间不就是我们需要让两个进程看到同一份资源的硬性基础吗!然后我们关闭一个读端,关闭一个写端,就是实现了通信!
2.1.2 理解匿名管道
为什么是叫做匿名管道,后面我们还会学习命名管道。匿名就在于没有名字,我们可以分别从文件描述符的角度以及内核的角度进行深入的了解。
文件描述符,通过一个图就可以明白!通过文件描述表的下标去进行对于一个管道文件进行读取和写入。
从内核的角度进行理解,就是说在 Linux 下面一切文件,创建出来的管道就是一个文件。
2.1.3 实验解释
这是一个实现匿名管道的简单代码,我在编写的时候主要是对于里面,两个主要函数的具体实现,使用read 与 write 的书写有些问题,不是很熟练,以及对于 make file 的编写还需要加强!
#include <iostream>
#include <fcntl.h>
#include <unistd.h>
#include <sys/wait.h>
#include <cstring>
void ChildWrite(int wfd)
{// 进行写通过使用一个 缓冲区进行写入char buffer[1024];int cnt = 0;while(true){snprintf(buffer, sizeof(buffer), "I am Child, pid: %d, cnt:%d",getpid(), cnt++);// buffer 是数据,然后把他给放到 wfd 当中// 注意这里仅仅写入了一部分,所以不要使用sizeofwrite(wfd, buffer, strlen(buffer));sleep(1);}}
void FatherRead(int rfd)
{char buffer[1024];// 循环的从 rfd 当中去读数据while(true){buffer[0] = 0;ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);if(n > 0){buffer[n] = 0;std::cout << "Child say:"<<buffer << std::endl;}}
}
int main()
{// 1.使用匿名管道首选需要创建一个管道使用 pipint fds[2] = {0}; // f[0] 是读端, f[1] 是写端int n = pipe(fds);if(n < 0){//创建失败,直接退出std::cout << "Pipe Error" << std::endl;return 1;}//这里表示3, 4 这些文件描述符没有被使用std::cout << "fds[0]:" << fds[0] << std::endl;std::cout << "fds[1]:" << fds[1] << std::endl;// 2. 创建子进程pid_t id = fork();if(id == 0){// 这个是子进程// 让子进程去进行写,那么需要关闭读close(fds[0]);ChildWrite(fds[1]);close(fds[1]);exit(-1);}// 父进程去关闭写端close(fds[1]);// 父进程进行专门的读FatherRead(fds[0]);waitpid(id, nullptr, 0);close(fds[0]);return 0;
}
2.1.4 管道当中的四种情况
匿名管道具有4个细节:1. 匿名管道仅仅适用于具有血缘关系的进程与进程间进行通信(常用与父子)
2. 管道文件自带同步机制,也就是说,只有写完了才会读!读端也就是父进程会一直等待子进程的写入,但是当子进程退出了,也就没有写端了!那么我父进程也就没有必要一直在哪里等着你,我的父进程也就直接推出了。相应的如果读的太慢了,写的非常快,那么写满了就要阻塞等待。
3. 管道是面向字节流的,也就是说它不关心你写进去的是一行文字、一个结构体,还是一整个消息。在它眼里,一切都是“字节”。
4. 管道是单向通信的。半双工的状态。
5. 声明周期随着进程而结束。
2.1.5 基于匿名管道的进程池
整体的结构是需要有个类channel进行组织管道,他需要指导子进程的pid 与 写的进程描述符。然后就是对于这个管道的进行关闭、发送与回收工作。
另外一个类 channelManger 对于生成的匿名管道进行管理,将生成的管道插入进来,对所有的管道进行管理:停止、回收、打印、选择(使用轮询的算法,定义一个 next)
最后就是使用已经管理好管道的进程池类 ProcessPool ,完成创建管道、初始化工作函数,开始工作:根据计算出一个 code 任务,然后将他发送到选择一个轮询得到的匿名管道,等待这个匿名管道得到 code, 然后执行code。
除此之外还需要对所有的任务进行封装成 TaskManager 这个类,然后在里面进行初始化、计算任务、注册任务、执行任务,整体也是通过一个vector 来管理所有的任务。代码如下:
#pragma once
#include <iostream>
#include <vector>
#include <ctime>
typedef void (*task_t)();
void PrintLog()
{std::cout << "我是一个打印日志的任务" << std::endl;
}void Download()
{std::cout << "我是一个下载的任务" << std::endl;
}void Upload()
{std::cout << "我是一个上传的任务" << std::endl;
}class TaskManager
{
public:TaskManager(){// 建立一个随机种子srand(time(nullptr)); }void Register(task_t t){_tasks.push_back(t);}int Code(){return rand() % _tasks.size();}void Execute(int code){if(code >= 0 && code < _tasks.size()){_tasks[code]();}}
private:std::vector<task_t> _tasks;
};
#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__
#include <iostream>
#include <cstdlib>
#include <vector>
#include <sys/wait.h>
#include "Task.hpp"// 先描述
// 再组织
// 需要管道描述,然后进行组织,最后把组织好的管道放入到进程池当中
class Channel
{
public:Channel(int wfd, pid_t subid) : _wfd(wfd), _subid(subid){_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);}~Channel() {}void Send(int code){// 写的时候需要对于 code 取地址int n = write(_wfd, &code, sizeof(code));(void)n; // 防止 n 警告}void Close(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subid, nullptr, 0);(void)rid;}int Fd() { return _wfd; }pid_t SubId() { return _subid; }std::string Name() { return _name; }private:int _wfd;pid_t _subid;std::string _name;
};
class ChannelManager
{
public:ChannelManager() : _next(0){}// 父进程的写端插入进来,然后让子进程进行读取void Insert(int wfd, pid_t subid){_channels.emplace_back(wfd, subid);}// 关闭所有的管道void StopProcess(){for (auto &channel : _channels){channel.Close();std::cout << "关闭: " << channel.Name() << std::endl;}}void waitProcess(){for (auto &channel : _channels){channel.Wait();std::cout << "回收: " << channel.Name() << std::endl;}}Channel &Select(){auto &c = _channels[_next];_next++;// 防止越界_next %= _channels.size();return c;}// 进行一下打印所有的管道void PrintsubProcess(){for (auto &c : _channels){std::cout << c.Name() << std::endl;}}~ChannelManager() {}private:// 通过 vector 数组的方式实现管理std::vector<Channel> _channels;int _next; // 实现轮询
};
const int gdefaultnum = 5;
class ProcessPool
{
public:ProcessPool(int num) : _process_num(num){// to do// 进行方法的注册_tm.Register(PrintLog);_tm.Register(Download);_tm.Register(Upload);}void Work(int rfd){while (true){// 子进程通过匿名管道读父进程发送来的命令int code = 0;ssize_t n = read(rfd, &code, sizeof(code));if (n > 0){// 进行工作std::cout << "子进程[" << getpid() << "]收到一个任务码: " << code << std::endl;_tm.Execute(code);}else if (n == 0){// 不写了,读到为 0 了, 就推出了std::cout << "子进程退出" << std::endl;break;}else{std::cout << "出错了" << std::endl;break;}}}bool Start(){for (int i = 0; i < gdefaultnum; i++){// 1. 创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return false;// 2. 创建子进程pid_t subid = fork();if (subid < 0)return false;else if (subid == 0){// 子进程创建成功// 子进程要 r ,关闭 w 1close(pipefd[1]);Work(pipefd[0]);close(pipefd[1]);exit(0);}else{// 父进程要 w,关闭 r 0, 然后还需要进行回收close(pipefd[0]);_cm.Insert(pipefd[1], subid);}}return true;}void Run(){//1. 选择一个任务int select_code = _tm.Code();//2. 选择一个信道auto& c = _cm.Select();std::cout << "选择了一个子进程: " << c.Name() << std::endl;// 3. 发送任务c.Send(select_code);std::cout << "发送了一个任务码: " << select_code << std::endl;}void Stop(){_cm.StopProcess();_cm.waitProcess();}~ProcessPool() {}private:ChannelManager _cm;int _process_num;TaskManager _tm;
};
#endif
#include "ProcessPool.hpp"
int main()
{// 创建 5 个管道ProcessPool pp(gdefaultnum);pp.Start();int cnt = 15;while(cnt--){pp.Run();sleep(1);}pp.Stop();return 0;
}
主包本来想一口气都写完,但是写完了这个代码后发现写了一上午😱😱😱。后面还有2大部分,我会分成上、中、下三部分进行回顾如果是有其他的错误还请批评指正。如果对你有帮助还请给我点个赞👍👍👍。