当前位置: 首页 > news >正文

[linux仓库]肝爆进程通信:匿名管道、命名管道、进程池核心原理与实战【万字长文】

🌟 各位看官好,我是egoist2023

🌍 Linux == Linux is not Unix !

🚀 今天来学习进程通信的相关内容:匿名管道、命名管道、进程池。

👍 如果觉得这篇文章有帮助,欢迎您一键三连,分享更多人哦!

进程通信的产生

如果未来进程之间要协同,一个进程要把自己的数据交给另一个进程!或者一个进程要命令另外一个进程做其他事!
但是呢,由于进程之间是具有独立性的 ! 如果想把一个进程的数据交给另一个进程….基本不可能!

由此诞生出如何进程通信.

如何做到进程通信

进程之间是有独立性的?还需要保证这两之间可以进行通信,由此我们被迫使用第三者,他可以对这两进程通信起来.那他是谁呢?无疑我们第一个想到的便是操作系统.

结论:进程间通信的前提:让不同的进程看到同一份资源.这份资源由OS提供,而资源一定是某种形式的内存空间!

进程通信目的

  • 数据传输:⼀个进程需要将它的数据发送给另⼀个进程
  • 资源共享:多个进程之间共享同样的资源。
  • 通知事件:⼀个进程需要向另⼀个或⼀组进程发送消息,通知它(它们)发⽣了某种事件(如进程终⽌时要通知⽗进程)。
  • 进程控制:有些进程希望完全控制另⼀个进程的执⾏(如Debug进程),此时控制进程希望能够拦截另⼀个进程的所有陷⼊和异常,并能够及时知道它的状态改变

进程通信分类及发展背景

既然有了对进程通信概念理解,那么让进程实现通信就落实到资源这一块问题了

进程通信分类如图所示:

我们前面有接触过管道,但是并没有谈及具体含义,现在就可以来解决这个问题了.

大火都清楚,我们程序员是非常"偷懒"的,有可以复用的代码绝不会多写一遍,实际上这也是一种"巧智",从某方面提高了可维护性.

管道就是一种"取巧"的方式,它是基于文件的通信方法.

但是随着时代的发展,大家发现管道并不能解决所有问题,由于一批新问题的产生导致程序员必须创造一个真正的资源,供进程间通信,由此产生了System V标准和消息队列等...

System V进程间通信 --> 单独设计通信模块 -->窊定标准了 -->只能进行本地通信(自己的电脑)
这里谈及下System V的生态问题:

System V 标准并非由单一公司定制,而是由AT&T(美国电话电报公司) 主导开发的一套 UNIX 操作系统标准。

实际上定标准 和 实现标准对应的代码 是两批人.

一般定标准的人都是不需要定标准的,而是有另一层进行代码编写.那为什么另一层凭什么听从定标准的人呢?因为定标准的人在技术上一定是处于技术领先的地位 --> 这意味着他的产品领先 --> 我所定下的标准,时代会跟着走.如果你不跟着我的标准走,那么你就被时代淘汰,就会没落.反之,如果你跟着我定的方向走,就能继续发展. --> 从而使你被迫听从定标准的这批人.

管道概念

我们把从⼀个进程连接到另⼀个进程的⼀个数据流称为⼀个“管道” : 

如何证明 | 两边都是进程呢?

  •  在 Linux 系统中,每个进程都有唯一的进程号PID ,通过ps工具确实观察到有三个进程,且有唯一的进程号.
  • 它们的状态都为S+,表明是正在运行的进程.
  • 既然它们的父进程都相同,打印父进程发现是bash shell,说明三者是并发执行的.

匿名管道

fork共享管道原理

我们之前写过让父子进程同时向显示器进行打印内容,这是如何做到的呢?

让父子进程看到了同一份资源.在上图中,父进程创建子进程,子进程会以拷贝父进程的内核数据结构,此时父子都指向一个struct file,那么struct file里是什么呢?里面不是有个文件缓冲区吗?这不意味着父子看到了同一份资源?那么就可以基于这个原理进行进程的通信.

文件描述符角度 - 理解管道

父子进程可以看到同一份资源了,但是会出现一个问题:如果父进程写到101位置时,此时位置是停留在101这个位置的,此时子进程读是往101后读,因此会读到空内容。为了实现父子进程之间的通信,我们不得不让struct file也拷贝一份,这样便能实现父子进程间的通信.实际上,这便是管道的设计原理.

  管道的定义:管道是一个基于文件系统的一个内存级的单向通信的文件,主要用来进程间通信(IPc.>Inter-Process Communication)的

父进程要以“读写”两种方式 打开 同一个 管道文件
1.为什么要读写打开? --> 如果只打开读,那么子只能继承读,写也同理 --> 让子进程,也继承rw方式
如果是父写子读,那么要关闭对应的读写端

2.为什么要关闭读写端? --> 管道只需要单向通信 --> 定位:简单快速易上手(不关闭也可以,但建议 --> 防止误操作!)

3.如果要让两个进程互相通信呢? --> 创建两个管道! 

内核角度 - 管道本质

看待管道,就如同看待⽂件⼀样!管道的使⽤和⽂件⼀致,迎合了“Linux⼀切皆⽂件思想”。

管道实操

  • 创建匿名管道 
  • [0]表示:读fd , [1]表示:写fd
  • 成功返回0,失败返回错误代码

  1. 打开父进程的读写端
  2. 父进程创建子进程,子进程能继承父进程读写方式
  3. 这里实现父读子写,因此父关闭1,子关闭0.

    

  

父子进程打开了这个文件,同时可以实现读写功能.但是,我们的这个文件是没有文件名的啊!这说明该文件是内存级文件! 没有名字 --> 称为匿名管道

既然是内存级文件,那么就不存在所谓的向磁盘刷新的概念,因此也不需要路径,文件名概念.

实际上,这种通信方式存在一定的缺陷:

父进程定义全局数据,本来就可以被子进程所看到啊!!!但子进程定义的缓冲区父进程看不到,进行不了通信。如果要对数据进行修改呢,如何做呢?所以说管道是一种单向通信.

管道4种情况、5大特性

5大特性:

  1. 常用于具有血缘关系的进行,进行IPC,常用于父子
  2. 单向通信
  3. 管道的生命周期进程
  4. 面向字节流 --> 网络讲
  5. 管道自带同步机制! --> 多线程讲

4种情况

写端不关,写端不写

管道里面没有数据,读端就会被阻塞!

   

读端不关,读端不读

管道被写满了的话,就不在写入了!

写端不写,写端关闭

read会读到返回值为0,表示读到文件结尾!

读端关闭,写端正常

 OS会自动杀掉写进程!为什么这样做呢?OS不会做无效动作,不会做浪费时间的事!

进程池 

生活中,我们常常会听遣父母的安排,母亲对大儿说:最近工作有些累,帮母亲锤下背;对二女说:劳烦女儿今天扫扫地,倒下垃圾;对三孩说:再过一会要炒菜了,把刚买的白菜清洗下.三个孩子都等待着父母给他们派发任务.而在计算机中,匿名管道也可以实现这种需求.

Task.hpp

#pragma once#include <iostream>
#include <string>
#include <vector>
#include <functional>
// 4种任务
// task_t[4];using task_t = std::function<void()>;void Download()
{std::cout << "我是一个downlowd任务" << std::endl;
}void MySql()
{std::cout << "我是一个 MySQL 任务" << std::endl;
}void Sync()
{std::cout << "我是一个数据刷新同步的任务" << std::endl;
}void Log()
{std::cout << "我是一个日志保存任务" << std::endl;
}std::vector<task_t> tasks; // 任务表class Init
{
public:Init(){tasks.push_back(Download);tasks.push_back(MySql);tasks.push_back(Sync);tasks.push_back(Log);}
};Init ginit;

 ProcessPool.hpp

#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__#include <iostream>
#include <cstdlib>
#include <string>
#include <vector>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <ctime>
#include "Task.hpp"const int gdefault_process_num = 5;
// typedef std::function<void (int fd)> func_t;
using callback_t = std::function<void(int fd)>;// 先描述
class Channel
{
public:Channel(){}Channel(int fd, const std::string &name, pid_t id) : _wfd(fd), _name(name), _sub_target(id){}void DebugPrint(){printf("channel name: %s, wfd: %d, target pid: %d\n", _name.c_str(), _wfd, _sub_target);}~Channel() {}int Fd() { return _wfd; }std::string Name() { return _name; }pid_t Target() { return _sub_target; }void Close() { close(_wfd); }void Wait(){pid_t rid = waitpid(_sub_target, nullptr, 0);(void)rid;}private:int _wfd;std::string _name;pid_t _sub_target; // 目标子进程是谁// int _load; //
};class ProcessPool
{
private:void CtrlSubProcessHelper(int &index){// 1. 选择一个通道(进程)int who = index;index++;index %= _channels.size();// 2. 选择一个任务,随机int x = rand() % tasks.size(); // [0, 3]// 3. 任务推送给子进程std::cout << "选择信道: " << _channels[who].Name() << ", subtarget : " << _channels[who].Target() << std::endl;write(_channels[who].Fd(), &x, sizeof(x));sleep(1);}public:ProcessPool(int num = gdefault_process_num) : _processnum(num){srand(time(nullptr) ^ getpid() ^ 0x777);}~ProcessPool(){}bool InitProcessPool(callback_t cb){for (int i = 0; i < _processnum; i++){sleep(1);// 1. 创建了管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return false;// 2. 创建子进程pid_t id = fork();if (id < 0)return false;if (id == 0){// child, read//  3. 关闭不需要的rw端,形成信道close(pipefd[1]);// 子进程应该干什么事情啊??cb(pipefd[0]);exit(0);}// father, writeclose(pipefd[0]);std::string name = "channel-" + std::to_string(i);_channels.emplace_back(pipefd[1], name, id);}return true;}// 2. 控制唤醒指定的一个子进程,让该子进程完成指定任务// 2.1 轮询选择一个子进程(选择一个信道) -- 负载均衡void PollingCtrlSubProcess(){int index = 0;while (true){CtrlSubProcessHelper(index);}}void PollingCtrlSubProcess(int count){if (count < 0)return;int index = 0;while (count){CtrlSubProcessHelper(index);count--;}}void RandomCtrlSubProcess(){}void LoadCtrlSubProcess(){}void ProcessPoolPrintFd(){std::cout << "进程池wfd list: ";for(auto &c : _channels)std::cout << c.Fd() << " ";std::cout << std::endl;}// 我们的代码,其实是有一个bug 的!void WaitSubProcesses(){// for(int end = _channels.size()-1; end >= 0; end--)// {//     _channels[end].Close();//     _channels[end].Wait();// }for (auto &c : _channels){c.Close();c.Wait();}// // 1. 先让所有子进程结束// for (auto &c : _channels)// {//     c.Close();// }// // 2. 你在回收所有的子进程僵尸状态// for (auto &c : _channels)// {//     c.Wait();//     std::cout << "回收子进程: " << c.Target() << std::endl;// }}private:// 在组织std::vector<Channel> _channels; // 所有信道int _processnum;                // 有多少个子进程
};

main.cc 

#include "ProcessPool.hpp"int main()
{// 1. 创建进程池ProcessPool pp(5);// 2. 初始化进程池pp.InitProcessPool([](int fd){while(true){int code = 0;//std::cout << "子进程阻塞: " << getpid() << std::endl;ssize_t n = read(fd, &code, sizeof(code));if(n == sizeof(code)) // 任务码{std::cout << "子进程被唤醒: " << getpid() << ", fd: " << fd << std::endl;if(code >= 0 && code < tasks.size()){tasks[code]();}else{std::cerr << "父进程给我的任务码是不对的: " << code << std::endl;}}else if(n == 0){std::cout << "子进程应该退出了: " << getpid() << std::endl;break;}else{std::cerr << "read fd: " << fd << ", error" << std::endl;break;}}});pp.ProcessPoolPrintFd();// 3. 控制进程池pp.PollingCtrlSubProcess(10);// 4. 结束线程池pp.WaitSubProcesses();std::cout << "父进程控制子进程完成,父进程结束" << std::endl;return 0;
}

Bug问题

// 1. 先让所有子进程结束
for (auto &c : _channels)
{c.Close();
}
// 2. 你在回收所有的子进程僵尸状态
for (auto &c : _channels)
{c.Wait();std::cout << "回收子进程: " << c.Target() << std::endl;
}

在回收子进程的时候,我们让子进程的读端关闭,此时写端是正常写的,但是OS不会做浪费时间的事,因此会杀掉该进程,而我们利用这一情况,将所有子进程的读端全部关闭,再全部进行回收.

但实际上,我们会发现这两者是可以合并在一起的啊!!!可以边关闭对应的子进程的读端,边进行回收,完全是可以放在一个循环体里的.

for (auto &c : _channels)
{c.Close();c.Wait();
}

但是回收子进程却发现报错了,这是为什么?实际上这个原因很难发现,因为要对底层细节剖析的非常明白才能解决,接下来小编说明出bug的原因:

  1. 创建管道父进程打开读写端,即3和4下标被占,创建子进程.父进程关闭fd为3的下标,即关闭读端口;子进程关闭fd为4的下标,即关闭写端口.
  2. 再次创建管道,由于此时3位置为空了,所以fd为3的下标为读端口,fd为5的下标为写端口.创建子进程,再次关闭父子对应的读写端口.
  3. 在这里我们就会发现一个问题,子进程不仅继承了fd为3和5的下标,还继承了fd为4的下标,指向了第一个信道;
  4. 由此循环下去,只有最后一个管道由子进程连着.

既然最后一个管道由子进程连着,那么我们倒着关管道,同时回收子进程不就可以了?

for(int end = _channels.size()-1; end >= 0; end--)
{_channels[end].Close();_channels[end].Wait();
}

但是这种实现方式不太符合逻辑,且实现的不够优雅,实在想正着回收子进程,该如何做呢?

// 子进程除了要关闭自己的w,同时也要关闭,自己从父进程哪里继承下来的所有的之前进程w端
//我的子进程,要关闭的,从父进程哪里继承下来的wfd都在哪里??
// _channels本身是被子进程继承下去的.
// 1. 子进程不要担心,父进程会影响自己的_channels.
// 2. fork之后,当前进程,只会看到所有的历史进程的wfd,并不受后续父进程emplace_backd的影响
std::cout << "进程:" << getpid() << ", 关闭了: ";
for(auto &c : _channels)
{std::cout  << c.Fd() << " ";c.Close();
}
std::cout <<"\n";

命名管道

  • 管道应⽤的⼀个限制就是只能在具有共同血缘关系的进程间通信。
  • 如果我们想在不相关的进程之间交换数据,可以使⽤FIFO⽂件来做这项⼯作,它经常被称为命名管道。
  • 命名管道是⼀种特殊类型的文件.

打开规则:

如果当前打开操作是为读⽽打开FIFO时:

  • O_NONBLOCK disable:阻塞直到有相应进程为写⽽打开该FIFO
  • O_NONBLOCK enable:⽴刻返回成功

如果当前打开操作是为写⽽打开FIFO时:

  • O_NONBLOCK disable:阻塞直到有相应进程为读⽽打开该FIFO
  • O_NONBLOCK enable:⽴刻返回失败,错误码为ENXIO 

原理

匿名管道是在内存级进行申请资源的特殊处理,而命名管道则是在磁盘上申请的特殊文件.当该文件被打开时会被系统做特殊处理,其表现是这个文件不需要往磁盘上刷新.

实操文件拷贝

int mkfifo(const char *pathname, mode_t mode);

1.client和server是如何看到同一份资源的?--> Linux的路径是唯一的!路径+文件名是本质 -->唯一的inode
2.为什么mkfifo叫命名管道 -->因为文件就是有名字哇! 

clinet创建管道 tp , 往abc文件读取内容,打开管道往管道里面写入内容;server端打开管道并进行读取内容,再往abc.backup写入内容,完成文件之间的拷贝.

 client.cpp

//读取管道,写入命名管道
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include"common.hpp"#define ERR_EXIT(m)         \do                      \{                       \perror(m);          \exit(EXIT_FAILURE); \} while (0)int main(int argc, char *argv[])
{mkfifo("tp", 0644);int infd;infd = open("abc", O_RDONLY);if (infd == -1)ERR_EXIT("open");int outfd;outfd = open("tp", O_WRONLY);if (outfd == -1)ERR_EXIT("open");char buf[1024];int n;while ((n = read(infd, buf, 1024)) > 0){write(outfd, buf, n);}close(infd);close(outfd);return 0;
}

server.cpp 

//读取管道,写入目标文件
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include"common.hpp"#define ERR_EXIT(m)         \do                      \{                       \perror(m);          \exit(EXIT_FAILURE); \} while (0)int main(int argc, char *argv[])
{int outfd;outfd = open("abc.bak", O_WRONLY | O_CREAT | O_TRUNC, 0644);if (outfd == -1)ERR_EXIT("open");int infd;infd = open("tp", O_RDONLY);if (outfd == -1)ERR_EXIT("open");char buf[1024];int n;while ((n = read(infd, buf, 1024)) > 0){write(outfd, buf, n);}close(infd);close(outfd);unlink("tp");return 0;
}

server&client通信 

common.hpp

#ifndef __COMMON_HPP__
#define __COMMON_HPP__#include<iostream>
#include <cstdio>
#include <string>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>const std::string fifoname = "fifo";
mode_t mode = 0666;
//int size = 128;在Linux是能这样的
#define SIZE 128#endif

NamePipe.hpp

#pragma once#include "common.hpp"const int defaultfd = -1;class NamedPipe
{
public:NamedPipe(const std::string &name) : _name(name), _fd(defaultfd){}~NamedPipe(){}bool Create(){int n = mkfifo(_name.c_str(), mode);if (n == 0){std::cout << "mkfifo success" << std::endl;}else{std::cout << "mkfifo failed" << std::endl;perror("mkfifo");return false;}return true;}void Close(){if (_fd == defaultfd)return;elseclose(_fd);}bool OpenForRead(){_fd = open(_name.c_str(), O_RDONLY);if (_fd < 0){perror("open");return false;}std::cout << "open file success" << std::endl;return true;}bool OpenForWrite(){_fd = open(_name.c_str(), O_WRONLY);if (_fd < 0){perror("open");return false;}return true;}// 输入参数:const &// 输出参数:*// 输入输出参数:&bool Read(std::string *out){char buffer[SIZE] = {0};ssize_t num = read(_fd, buffer, sizeof(buffer) - 1);if (num > 0){buffer[num] = 0;*out = buffer;}else if (num == 0){return false;}else{return false;}return true;}void Write(const std::string &in){write(_fd, in.c_str(), in.size());}void Remove(){int m = unlink(_name.c_str());(void)m;}private:// std::string _path;//path+namestd::string _name;int _fd;// std::string who;
};

client.cpp 

#include "NamedPipe.hpp"int main()
{NamedPipe named_pipe(fifoname);named_pipe.OpenForWrite();while (true){std::cout << "Please Enter# ";std::string line;std::getline(std::cin, line);named_pipe.Write(line);}named_pipe.Close();return 0;
}

server.cpp

#include "NamedPipe.hpp"int main()
{NamedPipe pp(fifoname);pp.Create();pp.OpenForRead();std::string message;while (true){bool res = pp.Read(&message);if (!res)break;std::cout << "client say@" << message << std::endl;}// 归还资源pp.Close();pp.Remove();return 0;
}

 因此,命名管道主要解决的是毫无关系的进程之间,进行文件级进程通信!!!

  

客户端关闭,写端还在读,返回值为0. 

指令扩展:

总结

进程通信基础

  • 必要性:进程独立性导致直接通信困难,需通过操作系统提供的共享资源实现
  • 目的:数据传输、资源共享、事件通知、进程控制
  • 分类:发展从管道到SystemV标准(消息队列等),再到现代通信方式

管道技术

  1. 匿名管道

    • 特点:内存级单向通信,用于父子进程通信
    • 原理:通过fork共享文件描述符实现
    • 四种读写情况及五大特性
  2. 命名管道(FIFO)

    • 特点:磁盘特殊文件,允许无关进程通信
    • 实现:通过mkfifo创建,遵循特定打开规则
    • 应用:文件拷贝、客户端-服务器通信

进程池实现

  • 通过管道实现任务分发
  • 子进程管理及资源回收机制
  • 实际应用中的bug分析与解决方案

技术对比

  • 匿名管道:简单快速,限于血缘关系进程
  • 命名管道:更灵活,支持无关进程通信

http://www.dtcms.com/a/442202.html

相关文章:

  • 【论文阅读】-《Sparse Adversarial Attack via Perturbation Factorization》
  • flash类网站开发定制网络教研系统
  • Fine-Grained Auditing 在 ABP vNext 的落地
  • 新手用PPT百科找模板:简单好用的实操分享!
  • Next.js客户端组件与服务端组件:为什么app路由(App Route)成为新标准?use client、服务端组件嵌套客户端组件
  • 做ppt医学专业图片网站门户网站建设自查整改
  • Product Hunt 每日热榜 | 2025-10-03
  • 监管视角下的大数据信用报告:合规、透明与安全的博弈
  • 7. Pandas 字符串与类别数据处理
  • 【iOS】简单的四则运算
  • Tomcat的CATALINA_BASE
  • 嵌入式 Tomcat 与 Spring MVC 集成
  • MyBatis 进阶
  • 软件设计师-软件工程-软件过程模型
  • 论坛网站方案手机网站建设的趋势
  • LeetCode每日一题——单调数列
  • LeetCode 100题(10题)
  • 后端开发网站做一些什么建设部网站官网 造价鉴定
  • day52-Zabbix(第一部分)
  • 依托Java和百度地图实现长沙市热门道路与景点实时路况检索的实践探索
  • 7-1-查询练习
  • Numpy 手搓线性回归
  • 昆明网站服务器湖北seo推广
  • 医院网站建设怎么样盐城网站建设效果
  • dockerfile理解
  • SpringBoot集成Druid连接池_配置优化与监控实践指南
  • 12380网站建设打算公众号小程序开发公司
  • 高并发场景下的前后端数据同步策略:长轮询、SSE与WebSocket对比分析
  • 网站推广对接北京手机网站搭建费用
  • Raydium