进程通信(进程池的模拟实现) read write函数复习 Linux ─── 第23课
目录
write和read函数补充:
进程池(process pool)
第一步: 创建并初始化processpool
第二步:主进程对子进程派发任务
补充:
第三步: 子进程执行完退出进程池
回收子进程
Channel.hpp
ProcessPool.hpp
Task.hpp
main.cc
makefile
write和read函数补充:
const char *str = "hello";
write(fd, str, strlen(str)); // 写入 5 个字节 ('h', 'e', 'l', 'l', 'o'),不含 '\0'
write(fd ,str, strlen(str)) 不会将\0读入 ,strlen中不含\0
read如果读的是字符串想打印 ,需要预留一个位置再最后加\0 ,再进行打印
读的数据如果不想打印 ,不用预留位置
wirte和read都是数据流,它们的行为是严格的字节级原始数据写入和读出,完全按照用户指定的内容和长度进行操作
匿名管道的应用: 进程池
进程池(process pool)
先把进程创建出来,需要什么任务 ,派发什么任务.
让一个进程(master进程) ,给其他进程(work进程)派发任务
下面实现process pool
第一步: 创建并初始化processpool
master要管理所有的管道 创建channel
创建管道 ,创建子进程 ,用vector管理全部的channel
#include<iostream>
#include<unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include<vector>
#include<functional>//执行任一方法
//typedef std::function<void()> work_t;
using work_t =std::function<void()>;//定义了函数对象类型
enum
{
OK=0,
UsageError,
PipeError,
ForkError,
CloseError,
Dup2Error
};
//先描述
class channel
{
public:
channel(int wfd ,pid_t who):_wfd(wfd),_who(who)
{
_name ="channel-"+std::to_string(wfd)+"->"+ std::to_string(who);
}
~channel()
{
}
std::string Name()
{
return _name;
}
private:
int _wfd;
std::string _name;//channel-3->203444
pid_t _who;
};
void Worker()
{
//read ->stdin
}
void Download()
{
//read ->stdin
}
//channels是输出型参数
//work_t work 回调方法 使创建子进程与让子进程执行任务是解耦的
int InitProcessPool(std::vector<channel>& channels ,int num ,work_t work)
{
//创建指定进程个数
for(int i=0 ;i <num; i++)
{ //管道
int Pipefds[2]={0};
int n =::pipe(Pipefds);
if(n< 0) return PipeError;
//子进程
pid_t id =::fork();
if(id == 0)
{//child 读
int close_ret = ::close(Pipefds[1]);//关闭写通道
if(close_ret < 0) return CloseError;
int dup2_ret =::dup2(Pipefds[0] ,0);//重定向,让子进程从标准输入中获取要执行的任务。不再使用Pipefds[0]了
if(dup2_ret< 0) return Dup2Error;
work();
::close(Pipefds[0]);
//sleep(10); DebugPrint用
::exit(0);
}
else if(id < 0)
{
return ForkError;
}
else
{//parent
int close_ret = ::close(Pipefds[0]);//关闭读通道
if(close_ret < 0) return CloseError;
channels.emplace_back(Pipefds[1] , id);
}
}
return OK;
}
void DebugProcesspool(std::vector<channel>& channels)
{
for( auto &c :channels)
{
std::cout<< c.Name() <<std::endl;
}
}
void Usage(std::string process)
{
std::cout<<"Uasge:"<< process <<"process-num"<<std::endl;
}
//我们自己是master
int main(int argc ,char* argv[])
{
if(argc!=2)
{
Usage(argv[0]);
return UsageError;
}
int num =std::stoi(argv[1]);
std::vector<channel> channels;
//1.创建&&初始化进程池
InitProcessPool(channels , num ,Worker);
// InitProcessPool(channels , num ,Print);
// InitProcessPool(channels , num ,Dowload);
//DebugProcesspool(channels);
sleep(100);
return 0;
}
第二步:主进程对子进程派发任务
补充:
什么是任务? 任务码表示任务 4个字节(int)写 ,4个字节(int)读
怎么派发? 让每个子进程任务量相等
- 方法1: 轮询
- 方法2:随机
- 方法3:历史任务数
第三步: 子进程执行完退出进程池
派发完所有的任务,子进程读取完 ,都在read阻塞 ,此时关闭子进程的两种方法
- 向子进程发送退出任务
- 利用管道写端关闭,读端读到0 ,子进程会自己退出的特性(推荐)
回收子进程
这里有一个藏得很深的bug
在创建多管道时 ,子进程会继承父进程的fd表 ,就会导致管道的写端被越来越多的子进程拿到,引用计数++ ,释放master进程的全部的wfd后 ,管道的写端还链接着子进程 ,就不能使上面的方法2成功,
解决方法:
- 倒着关闭master的wfd(最后一个管道只有一个wfd(在master,子进程都没有最后一个管道的wfd) ,倒着关闭子进程 ,此子进程的wfd也会跟着关闭)
- 在创建子进程时关闭历史wfd
通过上图发现父进程的_wfd从4开始递增
子进程的_rfd都是3
Channel.hpp
#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
//先描述
class channel
{
public:
channel(int wfd ,pid_t who):_wfd(wfd),_who(who)
{
_name =" channel-"+std::to_string(wfd)+"->"+ std::to_string(who);
}
~channel()
{
}
void Send(int tasknum)
{
::write(_wfd ,&tasknum ,sizeof(tasknum)); //write第二个参数是const void *buf
}
std::string Name()
{
return _name;
}
void Close()
{
::close(_wfd);
}
pid_t Id()
{
return _who;
}
int WFD()
{
return _wfd;
}
private:
int _wfd;
std::string _name;//channel-3->203444
pid_t _who;
};
#endif
ProcessPool.hpp
#include<iostream>
#include<unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include<vector>
#include<functional>//执行任一方法
#include"Task.hpp"
#include"Channel.hpp"
enum
{
OK=0,
UsageError,
PipeError,
ForkError,
CloseError,
Dup2Error
};
//typedef std::function<void()> work_t;
using work_t =std::function<void()>;//定义了函数对象类型
class ProcessPool
{
public:
ProcessPool(int num ,work_t w):num(num),work(w)
{
}
//channels是输出型参数
//work_t work 回调方法 使创建子进程与让子进程执行任务是解耦的
int InitProcessPool()
{
//创建指定进程个数
for(int i=0 ;i <num; i++)
{ //管道
int Pipefds[2]={0};
int n =::pipe(Pipefds);
if(n< 0) return PipeError;
//子进程
pid_t id =::fork();
if(id == 0)
{//child 读
// 子进程关闭历史wfd
for(auto &c : channels)
{
std::cout << c.WFD() << " ";
c.Close();
}
int close_ret = ::close(Pipefds[1]);//关闭写通道
if(close_ret < 0) return CloseError;
int dup2_ret =::dup2(Pipefds[0] ,0);//重定向,让子进程从标准输入中获取要执行的任务。不再使用Pipefds[0]了
if(dup2_ret< 0) return Dup2Error;
//子进程从管道拿任务 ,执行任务
work();//work()退出后exit
::close(Pipefds[0]);
//sleep(10); DebugPrint用
::exit(0);
}
else if(id < 0)
{
return ForkError;
}
else
{//parent
int close_ret = ::close(Pipefds[0]);//关闭读通道
if(close_ret < 0) return CloseError;
channels.emplace_back(Pipefds[1] , id);
}
}
return OK;
}
void DebugProcesspool()
{
for( auto &c :channels)
{
std::cout<< c.Name() <<std::endl;
}
}
void DispathTask()
{
int who =0;
//派发任务
int num =20; //20个任务
while(num--)
{
//a.选择一个任务 ,整数值 ,taskmanager中选
int Tasknum = TM.SelectTask();
//b.选择一个子进程管道 ,channels中选
channel &curr =channels[who++];
who %= channels.size();
std::cout<<"######################"<<std::endl;
std::cout<<"send"<<Tasknum<<" "<<curr.Name()<<"任务还剩"<<num<<std::endl;
std::cout<<"######################"<<std::endl;
//c.通过管道派发任务
curr.Send(Tasknum);
sleep(1);
}
}
void CleanProcesspool()
{
// version 3 前提:子进程创建时删除了继承自master的历史wfd
for (auto &c : channels)
{
c.Close();
pid_t rid = ::waitpid(c.Id(), nullptr, 0);
if (rid > 0)
{
std::cout << "child " << rid << " wait ... success" << std::endl;
}
}
// version 2 前提:子进程创建时没有删除继承自master的历史wfd 方法:倒着关闭master的wfd(让最后一个子进程先读到0,关闭读端 Worker退出 子进程退出)
// for(int i = channels.size()-1; i >= 0; i--)
// {
// channels[i].Close();
// pid_t rid = ::waitpid(channels[i].Id(), nullptr, 0); // 阻塞了!
// if (rid > 0)
// {
// std::cout << "child " << rid << " wait ... success" << std::endl;
// }
// }
//vertion1
// for(auto& c :channels)
// {
// c.Close();
// }
// //回收子进程,为啥写两个循环
// for(auto &c:channels)
// {
// int n =waitpid(c.Id() ,nullptr ,0);
// if(n > 0)
// {
// std::cout<<"wait child:"<< n <<"sucess "<<std::endl;
// }
// }
}
private:
std::vector<channel> channels;
int num ;
work_t work;
};
Task.hpp
#pragma once
#include<iostream>
#include<unordered_map>
#include<functional>
#include<ctime>
#include <sys/types.h>
#include <unistd.h>
using work_t =std::function<void()>;
void Download()
{
std::cout<<"我是Download任务 "<<"我的pid是"<<getpid()<<std::endl;
}
void Print()
{
std::cout<<"我是Print任务 "<<"我的pid是"<<getpid()<<std::endl;
}
void Log ()
{
std::cout<<"我是Log日志任务 "<<"我的pid是"<<getpid()<<std::endl;
}
void Sql()
{
std::cout<<"我是Sql数据库同步任务 "<<"我的pid是"<<getpid()<<std::endl;
}
static int num =0;//总任务个数
class TaskManager
{
public:
TaskManager()
{
srand(time(nullptr));
InsertTask(Download);
InsertTask(Print);
InsertTask(Log);
InsertTask(Sql);
}
void InsertTask(work_t t)
{
_tasks[num++] =t;
}
//选择任务
int SelectTask()
{
return rand()% num;
}
void Excute(int number)
{
//没有这个任务
if(_tasks.find(number) == _tasks.end()) return;
//有这个任务 ,执行
_tasks[number]();
}
~TaskManager()
{}
private:
std::unordered_map<int ,work_t> _tasks;
};
TaskManager TM;
//Task属于父子共享区
//子进程的回调方法
void Worker()
{
//read ->stdin
while(true)
{
int cmd =0;
int n =::read( 0, &cmd ,sizeof(cmd));//读和写都是以一个int的大小
if( n== sizeof(cmd))//执行任务
{
TM.Excute(cmd);
}
else if(n == 0)//读端已经读到0啦代表写端已经关闭 ,关闭子进程
{
std::cout<<"pid:"<<getpid()<<"quit..."<<std::endl;
break;
}
else
{}
}
}
main.cc
#include"ProcessPool.hpp"
#include"Task.hpp"
void Usage(std::string process)
{
std::cout<<"Uasge:"<< process <<"process-num"<<std::endl;
}
//我们自己是master
int main(int argc ,char* argv[])
{
if(argc!=2)
{
Usage(argv[0]);
return UsageError;
}
int num =std::stoi(argv[1]);
ProcessPool * pp = new ProcessPool(num ,Worker);
//1.创建&&初始化进程池
pp->InitProcessPool();
//2.派发任务
pp->DispathTask();
//3.退出进程池 ,只需要关闭所有管道的写端即可
pp->CleanProcesspool();
//1.创建&&初始化进程池
//InitProcessPool(channels , num ,Worker);
//DebugProcesspool(channels);
//2.派发任务
//DispathTask(channels);
//3.退出进程池 ,只需要关闭所有管道的写端即可
//CleanProcesspool(channels);
delete(pp);
return 0;
}
makefile
BIN=processpool
CC=g++
FLAGS=-c -Wall -std=c++11
LDFLAGS= -o
#SRC=$(shell ls *.cc)
SRC=$(wildcard *.cc)
OBJ=$(SRC:.cc=.o)
$(BIN):$(OBJ)
$(CC) $(LDFLAGS) $@ $^
%.o:%.cc
$(CC) $(FLAGS) $<
.PHONY:clean
clean:
rm -f $(BIN) $(OBJ)
.PHONY:test
test:
@echo $(SRC)
@echo $(OBJ)