当前位置: 首页 > news >正文

进程通信(进程池的模拟实现) read write函数复习 Linux ─── 第23课

目录

write和read函数补充:

进程池(process pool)

第一步: 创建并初始化processpool

第二步:主进程对子进程派发任务

补充:

第三步: 子进程执行完退出进程池

回收子进程

Channel.hpp

ProcessPool.hpp

Task.hpp

main.cc

makefile


write和read函数补充:

const char *str = "hello";
write(fd, str, strlen(str));  // 写入 5 个字节 ('h', 'e', 'l', 'l', 'o'),不含 '\0'

write(fd ,str, strlen(str)) 不会将\0读入 ,strlen中不含\0

read如果读的是字符串想打印 ,需要预留一个位置再最后加\0 ,再进行打印

        读的数据如果不想打印 ,不用预留位置

wirte和read都是数据流,它们的行为是严格的字节级原始数据写入和读出,完全按照用户指定的内容和长度进行操作

匿名管道的应用: 进程池

进程池(process pool)

先把进程创建出来,需要什么任务 ,派发什么任务.

让一个进程(master进程) ,给其他进程(work进程)派发任务 

下面实现process pool

第一步: 创建并初始化processpool

master要管理所有的管道 创建channel

创建管道 ,创建子进程 ,用vector管理全部的channel 

#include<iostream>
#include<unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include<vector>
#include<functional>//执行任一方法

//typedef std::function<void()> work_t;
using work_t =std::function<void()>;//定义了函数对象类型

enum 
{
    OK=0,
    UsageError,
    PipeError,
    ForkError,
    CloseError,
    Dup2Error
};


//先描述
class channel
{
public:
    channel(int wfd ,pid_t who):_wfd(wfd),_who(who)
    {
        _name ="channel-"+std::to_string(wfd)+"->"+ std::to_string(who);
    }

    ~channel()
    {

    }
    std::string Name()
    {
        return _name;
    }
private:
    int _wfd;
    std::string _name;//channel-3->203444
    pid_t _who;
};


void Worker()
{
    //read ->stdin
}
void Download()
{
    //read ->stdin
}


//channels是输出型参数
//work_t work 回调方法   使创建子进程与让子进程执行任务是解耦的
int InitProcessPool(std::vector<channel>& channels ,int num ,work_t work)
{
        //创建指定进程个数
        for(int i=0 ;i <num; i++)
        {   //管道
            int Pipefds[2]={0};
            int n =::pipe(Pipefds);
            if(n< 0) return PipeError;
    
            //子进程
            pid_t id =::fork();
            if(id == 0)
            {//child 读
                int close_ret = ::close(Pipefds[1]);//关闭写通道
                if(close_ret < 0) return CloseError;
                int dup2_ret =::dup2(Pipefds[0] ,0);//重定向,让子进程从标准输入中获取要执行的任务。不再使用Pipefds[0]了      
                if(dup2_ret< 0) return Dup2Error;
                work();
    
                ::close(Pipefds[0]);
                //sleep(10);  DebugPrint用
                ::exit(0);
            }
            else if(id < 0)
            {
                return ForkError;
            }
            else
            {//parent
                int close_ret = ::close(Pipefds[0]);//关闭读通道
                if(close_ret < 0) return CloseError;

                channels.emplace_back(Pipefds[1] , id);
    
            }
        }
        return OK;
    
}
void DebugProcesspool(std::vector<channel>& channels)
{
    for( auto &c :channels)
    {
        std::cout<< c.Name() <<std::endl;
    }

}



void Usage(std::string process)
{
    std::cout<<"Uasge:"<< process <<"process-num"<<std::endl;
}

//我们自己是master
int main(int argc ,char* argv[])
{
    if(argc!=2)
    {
        Usage(argv[0]);
        return UsageError;
    }
    int num  =std::stoi(argv[1]);
    std::vector<channel> channels;

    //1.创建&&初始化进程池
    InitProcessPool(channels , num ,Worker);
    // InitProcessPool(channels , num ,Print);
    // InitProcessPool(channels , num ,Dowload);
    
    //DebugProcesspool(channels);



    sleep(100);
    return 0;
}

第二步:主进程对子进程派发任务

补充:

        什么是任务? 任务码表示任务  4个字节(int)写 ,4个字节(int)读  

        怎么派发? 让每个子进程任务量相等   

  • 方法1: 轮询
  • 方法2:随机
  • 方法3:历史任务数

第三步: 子进程执行完退出进程池

派发完所有的任务,子进程读取完 ,都在read阻塞 ,此时关闭子进程的两种方法

  1. 向子进程发送退出任务
  2. 利用管道写端关闭,读端读到0 ,子进程会自己退出的特性(推荐)

回收子进程

这里有一个藏得很深的bug

        在创建多管道时 ,子进程会继承父进程的fd表 ,就会导致管道的写端被越来越多的子进程拿到,引用计数++ ,释放master进程的全部的wfd后 ,管道的写端还链接着子进程 ,就不能使上面的方法2成功,

解决方法: 

  1. 倒着关闭master的wfd(最后一个管道只有一个wfd(在master,子进程都没有最后一个管道的wfd) ,倒着关闭子进程 ,此子进程的wfd也会跟着关闭)
  2. 在创建子进程时关闭历史wfd

通过上图发现父进程的_wfd从4开始递增

子进程的_rfd都是3

Channel.hpp


#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__

#include <iostream>
#include <string>
#include <unistd.h>

//先描述
class channel
{
public:
    channel(int wfd ,pid_t who):_wfd(wfd),_who(who)
    {
        _name =" channel-"+std::to_string(wfd)+"->"+ std::to_string(who);
    }

    ~channel()
    {

    }

    void Send(int tasknum)
    {
        ::write(_wfd ,&tasknum ,sizeof(tasknum)); //write第二个参数是const void *buf

    }
    std::string Name()
    {
        return _name;
    }
    void Close()
    {
        ::close(_wfd);
    }
    pid_t Id()
    {
        return _who;
    }
    int WFD()
    {
        return _wfd;
    }
private:
    int _wfd;
    std::string _name;//channel-3->203444
    pid_t _who;
};

#endif

ProcessPool.hpp

#include<iostream>
#include<unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include<vector>
#include<functional>//执行任一方法
#include"Task.hpp"
#include"Channel.hpp"

enum 
{
    OK=0,
    UsageError,
    PipeError,
    ForkError,
    CloseError,
    Dup2Error
};
//typedef std::function<void()> work_t;
using work_t =std::function<void()>;//定义了函数对象类型


class ProcessPool
{
public:
    ProcessPool(int num ,work_t w):num(num),work(w)
    {

    }

    //channels是输出型参数
//work_t work 回调方法   使创建子进程与让子进程执行任务是解耦的
int InitProcessPool()
{
        //创建指定进程个数
        for(int i=0 ;i <num; i++)
        {   //管道
            int Pipefds[2]={0};
            int n =::pipe(Pipefds);
            if(n< 0) return PipeError;
    
            //子进程
            pid_t id =::fork();
            if(id == 0)
            {//child 读

                // 子进程关闭历史wfd
                for(auto &c : channels)
                {
                    std::cout  << c.WFD() << " ";
                    c.Close();
                }
                int close_ret = ::close(Pipefds[1]);//关闭写通道
                if(close_ret < 0) return CloseError;
                int dup2_ret =::dup2(Pipefds[0] ,0);//重定向,让子进程从标准输入中获取要执行的任务。不再使用Pipefds[0]了      
                if(dup2_ret< 0) return Dup2Error;
                //子进程从管道拿任务 ,执行任务
                work();//work()退出后exit
    
                ::close(Pipefds[0]);
                //sleep(10);  DebugPrint用
                ::exit(0);
            }
            else if(id < 0)
            {
                return ForkError;
            }
            else
            {//parent
                int close_ret = ::close(Pipefds[0]);//关闭读通道
                if(close_ret < 0) return CloseError;

                channels.emplace_back(Pipefds[1] , id);
    
            }
        }
        return OK;
    
}

    void DebugProcesspool()
    {
        for( auto &c :channels)
        {
            std::cout<< c.Name() <<std::endl;
        }

    }

void DispathTask()
{
    int who =0;
    //派发任务

    int num =20; //20个任务
    while(num--)
    {
        //a.选择一个任务 ,整数值  ,taskmanager中选
        int Tasknum = TM.SelectTask();
        //b.选择一个子进程管道  ,channels中选
        channel &curr =channels[who++];
        who %= channels.size();

        std::cout<<"######################"<<std::endl;
        std::cout<<"send"<<Tasknum<<" "<<curr.Name()<<"任务还剩"<<num<<std::endl;
        std::cout<<"######################"<<std::endl;

        //c.通过管道派发任务
        curr.Send(Tasknum);

        sleep(1);
    }

}



void CleanProcesspool()
{
        // version 3  前提:子进程创建时删除了继承自master的历史wfd
        for (auto &c : channels)
        {
            c.Close();
            pid_t rid = ::waitpid(c.Id(), nullptr, 0);
            if (rid > 0)
            {
                std::cout << "child " << rid << " wait ... success" << std::endl;
            }
        }

    // version 2  前提:子进程创建时没有删除继承自master的历史wfd   方法:倒着关闭master的wfd(让最后一个子进程先读到0,关闭读端 Worker退出 子进程退出)

        // for(int i = channels.size()-1; i >= 0; i--)
        // {
        //     channels[i].Close();
        //     pid_t rid = ::waitpid(channels[i].Id(), nullptr, 0); // 阻塞了!
        //     if (rid > 0)
        //     {
        //         std::cout << "child " << rid << " wait ... success" << std::endl;
        //     }
        // }

    //vertion1
    // for(auto& c :channels)
    // {
    //     c.Close();
    // }

    // //回收子进程,为啥写两个循环
    // for(auto &c:channels)
    // {
    //     int n =waitpid(c.Id() ,nullptr ,0);
    //     if(n > 0)
    //     {
    //         std::cout<<"wait child:"<< n <<"sucess  "<<std::endl;
    //     }
    // }
}


private:
    std::vector<channel> channels;
    int num ;
    work_t work;
};



Task.hpp

#pragma once

#include<iostream>
#include<unordered_map>
#include<functional>
#include<ctime>
#include <sys/types.h>
#include <unistd.h>
using work_t =std::function<void()>;

void Download()
{
    std::cout<<"我是Download任务 "<<"我的pid是"<<getpid()<<std::endl;
}
void Print()
{
    std::cout<<"我是Print任务 "<<"我的pid是"<<getpid()<<std::endl;
}
void Log ()
{
    std::cout<<"我是Log日志任务 "<<"我的pid是"<<getpid()<<std::endl;
}
void Sql()
{
    std::cout<<"我是Sql数据库同步任务 "<<"我的pid是"<<getpid()<<std::endl;
}

static int  num =0;//总任务个数
class TaskManager
{
public:
    TaskManager()
    {
        srand(time(nullptr));
        InsertTask(Download);
        InsertTask(Print);
        InsertTask(Log);
        InsertTask(Sql);

    }
    void InsertTask(work_t t)
    {
        _tasks[num++] =t;
    }
    //选择任务
    int SelectTask()
    {
        return rand()% num;
    }

    void Excute(int number)
    {
        //没有这个任务
        if(_tasks.find(number) == _tasks.end()) return;
        //有这个任务 ,执行
        _tasks[number]();
    }
    ~TaskManager()
    {}

private:
    std::unordered_map<int ,work_t> _tasks;
};


TaskManager TM;

//Task属于父子共享区


//子进程的回调方法
void Worker()
{
    //read ->stdin
    while(true)
    {
        int cmd =0;
        int n =::read( 0, &cmd ,sizeof(cmd));//读和写都是以一个int的大小
        if( n== sizeof(cmd))//执行任务
        {
            TM.Excute(cmd);
        }
        else if(n == 0)//读端已经读到0啦代表写端已经关闭 ,关闭子进程
        {
            std::cout<<"pid:"<<getpid()<<"quit..."<<std::endl;
            break;
        }
        else
        {}
    }
}

main.cc

#include"ProcessPool.hpp"
#include"Task.hpp"


void Usage(std::string process)
{
    std::cout<<"Uasge:"<< process <<"process-num"<<std::endl;
}

//我们自己是master
int main(int argc ,char* argv[])
{
    if(argc!=2)
    {
        Usage(argv[0]);
        return UsageError;
    }
    int num  =std::stoi(argv[1]);
    ProcessPool * pp = new ProcessPool(num ,Worker); 

    //1.创建&&初始化进程池
    pp->InitProcessPool();
    //2.派发任务
    pp->DispathTask();
    //3.退出进程池 ,只需要关闭所有管道的写端即可
    pp->CleanProcesspool();


    //1.创建&&初始化进程池
    //InitProcessPool(channels , num ,Worker);

    //DebugProcesspool(channels);
    
    //2.派发任务
    //DispathTask(channels);
    

    //3.退出进程池 ,只需要关闭所有管道的写端即可
    //CleanProcesspool(channels);

    delete(pp);

    return 0;
}

makefile

BIN=processpool
CC=g++
FLAGS=-c -Wall -std=c++11
LDFLAGS= -o
#SRC=$(shell ls *.cc)
SRC=$(wildcard *.cc)
OBJ=$(SRC:.cc=.o)

$(BIN):$(OBJ)
	$(CC) $(LDFLAGS) $@ $^
%.o:%.cc
	$(CC) $(FLAGS)  $<

.PHONY:clean
clean:
	rm -f $(BIN) $(OBJ)

.PHONY:test
test:
	@echo $(SRC)
	@echo $(OBJ)

相关文章:

  • 数据库基础知识点(系列四)
  • OpenCV图像拼接(7)根据权重图对源图像进行归一化处理函数normalizeUsingWeightMap()
  • SQL 通用表表达式(CTE )
  • Linux之基本命令和格式
  • RabbitMQ 学习整理2 - 消峰限流
  • C++学习之类和对象基本概念
  • 使用vector构造杨辉三角形
  • 深入理解JavaScript中的同步和异步编程模型及应用场景
  • 【#2】介绍第三方库
  • 全面系统梳理多模态LLM对齐算法
  • Shiro学习(一):Shiro介绍和基本使用
  • vue2拦截器 拦截后端返回的数据,并判断是否需要登录
  • 程序化广告行业(32/89):常见广告位类型深度剖析
  • 【大模型基础_毛玉仁】4.2 参数附加方法
  • spring 核心注解整理
  • Windows 和 Linux 操作系统架构对比以及交叉编译
  • 网络华为HCIA+HCIP 交换机
  • ⭐算法OJ⭐连接所有点的最小费用【最小生成树】(C++实现)Min Cost to Connect All Points
  • 申报视频材料要求!第三批南充西充县非物质文化遗产代表性项目(增补)条件时间和申请程序
  • 原生后台GPS位置限制,降低功耗
  • 长春市网站优化公司/自媒体平台注册官网
  • 株洲新闻网红网株洲站/google搜索引擎入口下载
  • 软件定制开发费用多少云鲸互创优秀/青岛百度推广优化怎么做的
  • 盐城seo网站优化软件/杭州网站推广与优化
  • 做网站代理商好赚吗?/上海热点新闻
  • 网站降权怎么做/青岛关键词排名系统