当前位置: 首页 > news >正文

【Linux】IPC——匿名管道的使用

【Linux】IPC——匿名管道的使用

管道作为进程间通信的一种方式,虽然原始,但很经典,在这篇文章会通过对其使用的讲解,引出原理进行讲解,还会使用匿名管道创建一个进程池。

文章用到的代码码云链接:间尺/review

tips:推荐在学习文件系统后学习这部分内容,或者起码学完文件描述符表内容。

使用:

函数原型

#include <unistd.h>int pipe(int pipefd[2]);

参数说明

  • pipefd[2]:包含两个文件描述符的数组
    • pipefd[0]:管道的读取端
    • pipefd[1]:管道的写入端

返回值

  • 成功:返回 0
  • 失败:返回 -1,并设置 errno

使用原理

请添加图片描述

使用代码

#include <iostream>
#include <unistd.h>
#include <cstdio>
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>// 9:
void ChildWrite(int wfd)
{char c = 'c';int cnt = 0;while (true){write(wfd, &c, 1);printf("child: %d\n", cnt++);if (cnt == 100)break;}
}void FatherRead(int rfd)
{char buffer[1024];while (true){// 这里保证write完成了,再去读sleep(2);buffer[0] = 0;ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;std::cout << "child say: " << buffer << std::endl;}else if (n == 0){std::cout << "n : " << n << std::endl;std::cout << "child 退出,我也退出";break;}else{break;}break;}
}int main()
{// 1. 创建管道int fds[2] = {0}; // fds[0]:读端   fds[1]: 写端int n = pipe(fds);if (n < 0){std::cerr << "pipe error" << std::endl;return 1;}std::cout << "fds[0]: " << fds[0] << std::endl;std::cout << "fds[1]: " << fds[1] << std::endl;// 2. 创建子进程pid_t id = fork();if (id == 0){// 3. 关闭不需要的读写端,形成通信信道close(fds[0]);ChildWrite(fds[1]);close(fds[1]);exit(0);}// 3. 关闭不需要的读写端,形成通信信道close(fds[1]);std::cout << "开始读\n";FatherRead(fds[0]);std::cout << "读完毕\n";close(fds[0]);sleep(3);int status = 0;int ret = waitpid(id, &status, 0); // 获取到子进程的退出信息if (ret > 0){printf("exit code: %d, exit signal: %d\n", (status >> 8) & 0xFF, status & 0x7F);sleep(5);}return 0;
}

特性:

  • 用于有血缘关系的进程的进程间通信(利用了子进程对父进程文件描述符表(files_structstruct file*)的深拷贝)
  • 自带同步机制。
    • 读和写都开启的情况下:
      • 读比写快,读进行阻塞等待;
      • 读比写慢,写进行阻塞等待;
    • 读进程关闭,写的进程直接被操作系统发送信号杀死,因为没有人读,写是无意义的。
    • 写进程关闭,读进程在读到文件末尾后返回0。
  • pipe是面向字节流的。
  • pipe是单向通信的。
  • 生命周期随进程,因为每次创建匿名管道,都会创建内存级管道,然后由进程文件描述符表指向指定文件,同时对struct file结构体完成计数+1操作,当所有相关的进程都关闭,引用计数为0,由操作系统关闭相应文件

进程池实现

完整代码:间尺/review

核心代码

#pragma once#include <iostream>
#include <cstdlib> // stdlib.h stdio.h -> cstdlib cstdio
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include "Task.hpp"// 先描述
class Channel
{
public:Channel(int fd, pid_t id) : _wfd(fd), _subid(id){// 给管道命名,方便调试_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);}~Channel(){}void Send(int code){int n = write(_wfd, &code, sizeof(code));(void)n; // ?}void Close(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subid, nullptr, 0);(void)rid;}int Fd() { return _wfd; }pid_t SubId() { return _subid; }std::string Name() { return _name; }private:int _wfd;pid_t _subid;std::string _name;// int _loadnum;
};// 在组织
class ChannelManager
{
public:ChannelManager() : _next(0){}void Insert(int wfd, pid_t subid){_channels.emplace_back(wfd, subid);}// 进程池负载均衡轮询算法Channel &Select(){auto &c = _channels[_next];_next++;_next %= _channels.size();return c;}// 输出管道名字,用于debugvoid PrintChannel(){for (auto &channel : _channels){std::cout << channel.Name() << std::endl;}}void StopSubProcess(){for (auto &channel : _channels){channel.Close();std::cout << "关闭: " << channel.Name() << std::endl;}}void WaitSubProcess(){for (auto &channel : _channels){channel.Wait();std::cout << "回收: " << channel.Name() << std::endl;}}~ChannelManager() {}private:std::vector<Channel> _channels;int _next;
};const int gdefaultnum = 5;class ProcessPool
{
public:ProcessPool(int num) : _process_num(num){_tm.Register(PrintLog);_tm.Register(Download);_tm.Register(Upload);}void Work(int rfd){while (true){int code = 0;ssize_t n = read(rfd, &code, sizeof(code));if (n > 0){if (n != sizeof(code)){continue;}std::cout << "子进程[" << getpid() << "]收到一个任务码: " << code << std::endl;_tm.Execute(code);}else if (n == 0){std::cout << "子进程退出" << std::endl;break;}else{std::cout << "读取错误" << std::endl;break;}}}bool Start(){for (int i = 0; i < _process_num; i++){// 1. 创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return false;// 2. 创建子进程pid_t subid = fork();if (subid < 0)return false;else if (subid == 0){// 子进程// 3. 关闭不需要的文件描述符close(pipefd[1]);Work(pipefd[0]); //??close(pipefd[0]);exit(0);}else{// 父进程//  3. 关闭不需要的文件描述符close(pipefd[0]); // 写端:pipefd[1];_cm.Insert(pipefd[1], subid);// wfd, subid}}return true;}void Debug(){_cm.PrintChannel();}void Run(){// 1. 选择一个任务int taskcode = _tm.Code();// 2. 选择一个信道[子进程],负载均衡的选择一个子进程,完成任务auto &c = _cm.Select();std::cout << "选择了一个子进程: " << c.Name() << std::endl;// 2. 发送任务c.Send(taskcode);std::cout << "发送了一个任务码: " << taskcode << std::endl;}void Stop(){// 关闭父进程所有的wfd即可_cm.StopSubProcess();// 回收所有子进程_cm.WaitSubProcess();}~ProcessPool(){}private:ChannelManager _cm;int _process_num;TaskManager _tm;
};

关键问题:为什么要把关闭和回收分开写?不可以一边关闭对应子进程的fd,然后一边回收对应子进程吗?

图解

请添加图片描述

所以说:想要实现一边关闭一边回收,必须从后向前依次关闭子进程,然后回收。否则就会导致子进程read返回值一直得不到0,从而子进程不能关闭,父进程一直阻塞 wait子进程。

http://www.dtcms.com/a/422669.html

相关文章:

  • 重庆市建设医院网站首页网站服务器租用一年多少钱啊
  • Process Explorer 第四章 · Autoruns 基础知识——通俗易懂
  • Spring Boot 3.x 开发 Starter 快速上手体验,通过实践理解自动装配原理
  • 如何通过配置扩展服务函数的返回对象
  • 手工生成DuckDB 1.4版c++插件的简单步骤
  • linux进程生命周期
  • 单机游戏大全网站开发wordpress模板获取数据库
  • wap网站设计方案做一款网站注意啥
  • Flask项目中CSRF Token实现的解决方案
  • 使用 Kubernetes(k8s) 搭建 Redis 3 主 3 从集群教程
  • icejs状态管理store使用
  • Web开发 20
  • GPU计算效率提升:混合精度训练、并行优化、量化与VLLM实践
  • 做新闻类网站建站公司排名 软通
  • wordpress js 统计网站的seo是什么意思
  • 实用Excel学习资料包(含操作+函数+图表教程)
  • 开源AI智能名片链动2+1模式S2B2C商城小程序在公益课裂变法中的应用与影响研究
  • # vim中给变量添加双引号
  • wps word添加水印
  • 软考-系统架构设计师 应用程序与数据库的交互详细讲解
  • 改bug的一些体会
  • 安全对齐到底是什么
  • 专业VBA代码优化服务邀约‌,OFFICE excel计算优化,wrod报表生成
  • 织梦门户网站源码下载平面设计师的培训机构
  • 2025 AI 消费端变革:从生活助手到体验重构的全民浪潮
  • 【VUECLI】node.js打造自己的前端cli脚手架工具
  • 磁共振成像原理(理论)15:空间信息编码 (Spatial Information Encoding) -频率编码相位编码
  • 磐石网站seo手机nfc网站开发
  • 命名视图学习笔记
  • CentOS7安装OpenStack云计算平台框架