Linux之进程间通信
目录
一、进程间通信介绍
1.1、进程间通信目的
1.2、进程间通信发展
1.3、进程间通讯分类
二、管道
三、匿名管道
3.1、示例代码
完整重定向问题:
3.2、⽤ fork 来共享管道原理
3.3、站在⽂件描述符⻆度-深度理解管道
3.4、站在内核⻆度-管道本质
3.5、管道样例
3.5.1、测试管道读写
3.5.2、创建进程池处理任务
3.6、管道读写规则
3.7、匿名管道特点
3.8、验证管道通信的4种情况
四、命名管道
4.1、创建一个命名管道
4.2、匿名管道与命名管道的区别
4.3、命名管道的打开规则
实例一、⽤命名管道实现⽂件拷⻉
实例二、⽤命名管道实现server&client通信
五、system V共享内存
5.1、共享内存⽰意图
5.2、共享内存数据结构
5.3、共享内存函数
实例一、共享内存实现通信
示例二、借助管道实现访问控制版的共享内存
六、system V消息队列(了解即可)
七、system V信号量(了解即可)
7.1、并发编程
7.2、信号量
八、内核是如何组织管理IPC资源的
一、进程间通信介绍
1.1、进程间通信目的
- 数据传输:⼀个进程需要将它的数据发送给另⼀个进程。
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:⼀个进程需要向另⼀个或⼀组进程发送消息,通知它(它们)发⽣了某种事件(如进程终⽌时要通知⽗进程)。
- 进程控制:有些进程希望完全控制另⼀个进程的执⾏(如Debug进程),此时控制进程希望能够拦截另⼀个进程的所有陷⼊和异常,并能够及时知道它的状态改变。
1.2、进程间通信发展
- 管道
- System V进程间通信
- POSIX进程间通信
1.3、进程间通讯分类
管道:
- 匿名管道pipe
- 命名管道
System V IPC:
- System V 消息队列
- System V 共享内存
- System V 信号量
POSIX IPC:
- 消息队列
- 共享内存
- 信号量
- 互斥量
- 条件变量
- 读写锁
二、管道
- 管道是Unix中最古⽼的进程间通信的形式。
- 我们把从⼀个进程连接到另⼀个进程的⼀个数据流称为⼀个“管道”
注意:执行一个命令或者执行一个程序,我们可以通过 命令/程序 & 的方式将该命令或者程序放到后台执行。
三、匿名管道
#include<unistd.h>
功能:创建⼀⽆名管道
原型
int pipe(int fd[2]);
参数:
fd:⽂件描述符数组,其中fd[0]表⽰读端, fd[1]表⽰写端
返回值:成功返回0,失败返回错误代码
3.1、示例代码
#include <iostream>
#include <string>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>int main()
{// 创建管道int fds[2] = {0};int n = pipe(fds); //fds:输出型参数if(n != 0){std::cerr<<"pipe error"<<std::endl;return 1; }//创建子进程,执行任务....
}
完整重定向问题:
上图是完整的重定向写法,其中 ./mypipe 1 > log.txt 作用是将该程序中向一号文件描述符指向的文件写入的内容写入到 log.txt 中,这段命令中 1 可以省略,因为通过 > 重定向默认是重定向一号文件描述符,如果想重定向其他文件描述符,则不可以省略,重定向哪个文件描述符,就将哪个文件描述符(即数组下标)写上就可以。
2>$1 的作用是将一号文件描述符作为数组下标时,将该数组下标指向空间中的内容拷贝到二号文件描述符作为数组下标时指向的数组空间中。
这段命令的作用就是先将一号文件描述符重定向到 log.txt,然后再将 file_struct 结构中数组下标为1的空间中的内容拷贝到下标为2的空间中。这样1号和2号文件描述符都重定向到 log.txt 了。
3.2、⽤ fork 来共享管道原理
3.3、站在⽂件描述符⻆度-深度理解管道
3.4、站在内核⻆度-管道本质
所以,看待管道,就如同看待⽂件⼀样!管道的使⽤和⽂件⼀致,迎合了“Linux⼀切皆⽂件思 想”。
3.5、管道样例
3.5.1、测试管道读写
代码:
#include <iostream>
#include <string>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>// Fater process -> read
// Child process -> write
int main()
{// 创建管道int fds[2] = {0};int n = pipe(fds); // fds:输出型参数if (n != 0){std::cerr << "pipe error" << std::endl;return 1;}// Fater process -> read// Child process -> write// 创建子进程pid_t id = fork();if(id < 0){std::cerr<<"fork error"<<std::endl;return 2;}else if (id == 0){//子进程//关闭不需要的fd,关闭readclose(fds[0]);int cnt = 0;while(true){std::string message = "hello bit,hello ";message += std::to_string(getpid());message += ",";message += std::to_string(cnt);::write(fds[1],message.c_str(),message.size());cnt++;sleep(1);}exit(0);}else{//父进程//关闭不需要的fd,关闭writeclose(fds[1]);char buffer[1024];while(true){size_t n = ::read(fds[0],buffer,sizeof(buffer));if(n > 0){buffer[n] = 0;std::cout<<"child->father,message:"<<buffer<<std::endl;}}pid_t rid = waitpid(id,NULL,0);std::cout<<"father wait child success:"<<rid<<std::endl;}return 0;
}
效果:
3.5.2、创建进程池处理任务
源码:
Makefile:
BIN=processpool
CC=g++
FLAGS=-c -Wall -std=c++11
LDFLAGS=-o
#SRC=$(shell ls *.cc)
SRC=$(wildcard *.cc)
OBJ=$(SRC:.cc=.o)$(BIN):$(OBJ)$(CC) $(LDFLAGS) $@ $^
%.o:%.cc $(CC) $(FLAGS) $<.PHONY:clean
clean:rm -f $(BIN) $(OBJ).PHONY:test
test:@echo $(SRC)@echo $(OBJ)
Task.hpp:
#pragma once#include <iostream>
#include <unordered_map>
#include <functional>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>using task_t = std::function<void()>;class TaskManger
{
public:TaskManger(){srand(time(nullptr));tasks.push_back([](){ std::cout << "sub process[ " << getpid() << " ] 执行访问数据库任务\n"<< std::endl; });tasks.push_back([](){ std::cout << "sub process[ " << getpid() << " ] 执行URL解析\n"<< std::endl; });tasks.push_back([](){std::cout << "sub process[ " << getpid() << " ] 执行加密任务\n"<< std::endl; });tasks.push_back([](){ std::cout << "sub process[ " << getpid() << " ] 执行数据库持久化任务\n"<< std::endl; });}int SelectTask(){return rand() % tasks.size();}void Excute(unsigned long number){if(number > tasks.size() || number < 0)return;tasks[number]();}~TaskManger(){}private:std::vector<task_t> tasks;
};TaskManger tm;void Worker() //获取并执行任务
{while(true){int cmd = 0; //任务在vector中的下标int n = ::read(0, &cmd, sizeof(cmd));if(n == sizeof(cmd)){tm.Excute(cmd);}else if(n == 0) //写端关闭,子进程退出{//不需要关闭子进程的文件描述符,子进程退出后会自动关闭std::cout<< "pid: " <<getpid() << " quit..." << std::endl;break;}else{//....其他操作}}
}
Channel.hpp:
#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__#include<iostream>
#include<string>
#include<unistd.h>//对于子进程而言,只需要管理自己的一个读端就好了
//但是父进程有很多写端,对应不用的子进程,所以需要通过先描述,在组织的思想管理起来class Channel
{
public:Channel(int wfd, pid_t who):_wfd(wfd),_who(who){_name = "Channel-" + std::to_string(wfd) + "-" + std::to_string(who);}std::string Name(){return _name;}void Send(int cmd) //向管道写入任务的编号{::write(_wfd, &cmd, sizeof(cmd));}void Close(){::close(_wfd);}pid_t Id(){return _who;}int wFd(){return _wfd;}~Channel(){}private:int _wfd;std::string _name; //给当前管道起个名字pid_t _who; //对应哪个子进程
};#endif
ProcessPool.hpp:
#include <iostream>
#include <string>
#include <vector>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <functional>
#include "Task.hpp"
#include "Channel.hpp"// typedef std::function<void()> work_t;
using work_t = std::function<void()>;enum
{OK = 0,UsageError,PipeError,ForkError
};class ProcessPool
{
public:ProcessPool(int n, work_t w):processnum(n),work(w){}//channel:输出型参数//work_t work:回调函数int InitProcessPool(){//创建指定个数进程for(int i = 0; i < processnum; i++){//先有管道int pipefd[2] = {0};int n = pipe(pipefd);if(n < 0) {return PipeError;}//创建进程pid_t id = fork();if(id < 0){return ForkError;}//建立通信信道if(id == 0){//关闭历史fd//因为父进程每一轮循环都会打开一个写端,//从第二个子进程开始,每个子进程都会将父进程前面管理起来的写端和当前轮次打开的写端都继承下来//这导致除最后一个管道外的每个管道除了被父进程指向写端外,还会被其他子进程指向写端std::cout<< getpid() << ", child close history fd: "; //打印提示信息,方便检查是否存在问题for(auto &c : channels){std::cout<< c.wFd() << " ";c.Close();}std::cout<< " over" <<std::endl;::close(pipefd[1]); //关闭当前轮次打开的写端std::cout<< "debug: " << pipefd[0] <<std::endl;dup2(pipefd[0], 0); //重定向读端 让 0 号文件描述符指向管道读端work();::exit(0);}//父进程执行::close(pipefd[0]); //关闭读端channels.emplace_back(pipefd[1], id);//Channel ch(pipefd[1], id);//channels.push_back(ch);}return OK;}void DispatchTask(){int who = 0;//派发任务int num = 10;while(num--){//选择一个任务int task = tm.SelectTask();//选择一个子进程channelChannel &curr = channels[who++];who %= channels.size();std::cout<< "########################" << std::endl;std::cout<< "send "<< task << " to " << curr.Name() << ",任务还剩:" << num << std::endl;std::cout<< "########################" << std::endl;//派发任务curr.Send(task);sleep(1);}}void cleanProcessPool(){//version 3for(auto &c : channels){c.Close(); //关闭写端,让读端的子进程退出pid_t rid = ::waitpid(c.Id(), nullptr, 0);if(rid > 0){std::cout<< "child" << rid <<" wait ... success" << std::endl;}}//version 2// for(int i = channels.size() - 1; i >= 0; i--)// {// channels[i].Close();// pid_t rid = ::waitpid(channels[i].Id(), nullptr, 0);// if(rid > 0)// {// std::cout<< "child" << rid <<" wait ... success" << std::endl;// }// }//version 1// for(auto &c : channels)// {// c.Close();// }// for(auto &c : channels)// {// pid_t rid = ::waitpid(c.Id(), nullptr, 0);// if(rid > 0)// {// std::cout<< "child" << rid <<" wait ... success" << std::endl;// }// }}void DebugPrint(){for(auto &c : channels){std::cout<< c.Name() <<std::endl;}}private:std::vector<Channel> channels;int processnum; //进程数量work_t work; //任务
};
Main.cc:
#include "ProcessPool.hpp"
#include "Task.hpp"//打印提示如何使用该程序的信息
void Usage(std::string proc)
{std::cout<< "Usage: " << proc << "process-num" << std::endl;
}int main(int argc, char *argv[])
{if(argc != 2){Usage(argv[0]);return UsageError;}int num = std::stoi(argv[1]);ProcessPool * pp = new ProcessPool(num, Worker);//初始化进程池pp->InitProcessPool();//派发任务pp->DispatchTask();//退出进程池pp->cleanProcessPool();delete pp;return 0;
}
3.6、管道读写规则
- O_NONBLOCK 控制当前是否为阻塞模式,默认情况下是阻塞模式。
- 当没有数据可读时:
- O_NONBLOCK disable:read调⽤阻塞,即进程暂停执⾏,⼀直等到有数据来到为⽌。
- O_NONBLOCK enable:read调⽤返回-1,errno值为EAGAIN。
- 当管道满的时候:
- O_NONBLOCK disable:write调⽤阻塞,直到有进程读⾛数据。
- O_NONBLOCK enable:调⽤返回-1,errno值为EAGAIN。
- 如果所有管道写端对应的⽂件描述符被关闭,则read返回0。
- 如果所有管道读端对应的⽂件描述符被关闭,则write操作会产⽣信号SIGPIPE,进⽽可能导致 write进程退出。
- 当要写⼊的数据量不⼤于PIPE_BUF时,linux将保证写⼊的原⼦性。
- 当要写⼊的数据量⼤于PIPE_BUF时,linux将不再保证写⼊的原⼦性。
- 在Linux中,PIPE_BUF一般是4KB。
3.7、匿名管道特点
- 只能⽤于具有共同祖先的进程(具有亲缘关系的进程)之间进⾏通信;通常,⼀个管道由⼀个进程创建,然后该进程调⽤fork,此后⽗、⼦进程之间就可应⽤该管道。
- 管道提供流式服务。即,管道是面向字节流的。
- ⼀般⽽⾔,进程退出,管道释放,所以管道的⽣命周期随进程。
- ⼀般⽽⾔,内核会对管道操作进⾏同步与互斥。
- 管道是半双⼯的,数据只能向⼀个⽅向流动;需要双⽅通信时,需要建⽴起两个管道。
- 正常情况下两个进程使用管道通信,每个进程都需要关闭管道的一端,即,一个只留写入端,一个只留读取端,这样做是因为管道是半双工的,只允许一端读,一端写。如果不关闭,文件描述符是有限的,有限的就属于资源,所以不关闭会造成资源泄漏,另外如果因为没有关闭导致误操作,就可能会让管道内的数据混乱。
- 对于匿名管道来说,父子进程使用管道通信必须先创建一个读写两端都打开的管道,在创建子进程,这是因为匿名管道中,子进程必须通过继承的文件描述符来访问管道读写端,自己无法打开任意一端。
3.8、验证管道通信的4种情况
读正常&&写满
- 示例代码:
#include <iostream>
#include <string>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>// Fater process -> read
// Child process -> write
int main()
{// 创建管道int fds[2] = {0};int n = pipe(fds); // fds:输出型参数if (n != 0){std::cerr << "pipe error" << std::endl;return 1;}// Fater process -> read// Child process -> write// 创建子进程pid_t id = fork();if(id < 0){std::cerr<<"fork error"<<std::endl;return 2;}else if (id == 0){//子进程//关闭不需要的fd,关闭readclose(fds[0]);int cnt = 0;int total = 0;while(true){std::string message = "h";// std::string message = "hello bit,hello ";// message += std::to_string(getpid());// message += ",";// message += std::to_string(cnt);total += ::write(fds[1],message.c_str(),message.size());cnt++;std::cout<<"total:"<<total<<std::endl;//sleep(5);}exit(0);}else{//父进程//关闭不需要的fd,关闭writeclose(fds[1]);char buffer[1024];while(true){sleep(100);size_t n = ::read(fds[0],buffer,sizeof(buffer));if(n > 0){buffer[n] = 0;std::cout<<"child->father,message:"<<buffer<<std::endl;}}pid_t rid = waitpid(id,NULL,0);std::cout<<"father wait child success:"<<rid<<std::endl;}return 0;
}
效果:
可以看到管道的大小是有限的,当管道被写满之后写端就会停止写入。
结论:管道空间是有上限的。如果管道被写满,写端就会阻塞等待读端来读,且每次写入数据时是追加式的写入。
写正常&&读空
- 示例代码:
#include <iostream>
#include <string>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>// Fater process -> read
// Child process -> write
int main()
{// 创建管道int fds[2] = {0};int n = pipe(fds); // fds:输出型参数if (n != 0){std::cerr << "pipe error" << std::endl;return 1;}// Fater process -> read// Child process -> write// 创建子进程pid_t id = fork();if(id < 0){std::cerr<<"fork error"<<std::endl;return 2;}else if (id == 0){//子进程//关闭不需要的fd,关闭readclose(fds[0]);int cnt = 0;while(true){std::string message = "hello bit,hello ";message += std::to_string(getpid());message += ",";message += std::to_string(cnt);::write(fds[1],message.c_str(),message.size());cnt++;//sleep(1);sleep(5);}exit(0);}else{//父进程//关闭不需要的fd,关闭writeclose(fds[1]);char buffer[1024];while(true){size_t n = ::read(fds[0],buffer,sizeof(buffer));if(n > 0){buffer[n] = 0;std::cout<<"child->father,message:"<<buffer<<std::endl;}}pid_t rid = waitpid(id,NULL,0);std::cout<<"father wait child success:"<<rid<<std::endl;}return 0;
}
效果:该程序运行后可以看到每打印一次数据后都会等一会,等待新数据的写入才能再次读取。
结论:如果管道为空,读端会阻塞等待,且每次读取都会将读到的内容在管道中清空。
写关闭&&读正常
- 示例代码:
#include <iostream>
#include <string>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>// Fater process -> read
// Child process -> write
int main()
{// 创建管道int fds[2] = {0};int n = pipe(fds); // fds:输出型参数if (n != 0){std::cerr << "pipe error" << std::endl;return 1;}// Fater process -> read// Child process -> write// 创建子进程pid_t id = fork();if(id < 0){std::cerr<<"fork error"<<std::endl;return 2;}else if (id == 0){//子进程//关闭不需要的fd,关闭readclose(fds[0]);int cnt = 0;while(true){std::string message = "hello bit,hello ";message += std::to_string(getpid());message += ",";message += std::to_string(cnt);::write(fds[1],message.c_str(),message.size());cnt++;break;}exit(0);}else{//父进程//关闭不需要的fd,关闭writeclose(fds[1]);char buffer[1024];while(true){sleep(1);size_t n = ::read(fds[0],buffer,sizeof(buffer));if(n > 0){buffer[n] = 0;std::cout<<"child->father,message:"<<buffer<<std::endl;}else if(n == 0){std::cout<<"n:"<<n<<std::endl;std::cout<<"child quit??? me too"<<std::endl;}}pid_t rid = waitpid(id,NULL,0);std::cout<<"father wait child success:"<<rid<<std::endl;}return 0;
}
效果:
结论:如果写端关闭,读端读完管道内的数据,再读取的时候就会读到0,表示对端关闭,也表示读到文件结尾。
读关闭&&写正常
- 示例代码:
#include <iostream>
#include <string>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>// Fater process -> read
// Child process -> write
int main()
{// 创建管道int fds[2] = {0};int n = pipe(fds); // fds:输出型参数if (n != 0){std::cerr << "pipe error" << std::endl;return 1;}// Fater process -> read// Child process -> write// 创建子进程pid_t id = fork();if(id < 0){std::cerr<<"fork error"<<std::endl;return 2;}else if (id == 0){//子进程//关闭不需要的fd,关闭readclose(fds[0]);int cnt = 0;while(true){std::string message = "hello bit,hello ";message += std::to_string(getpid());message += ",";message += std::to_string(cnt);::write(fds[1],message.c_str(),message.size());cnt++;sleep(1);}exit(0);}else{//父进程//关闭不需要的fd,关闭writeclose(fds[1]);char buffer[1024];while(true){sleep(1);size_t n = ::read(fds[0],buffer,sizeof(buffer));if(n > 0){buffer[n] = 0;std::cout<<"child->father,message:"<<buffer<<std::endl;}else if(n == 0){std::cout<<"n:"<<n<<std::endl;std::cout<<"child quit??? me too"<<std::endl;}close(fds[0]);break;}int status = 0;pid_t rid = waitpid(id,&status,0);std::cout<<std::endl;std::cout<<"father wait child success:"<<rid<<"exit code:"<<((status<<8)&0xFF)<<", exit sig:"<<(status & 0x7F)<<std::endl;}return 0;
}
效果:
结论:如果读端关闭,OS会直接杀掉写端的进程。OS会使用13号信号SIGPIPE来杀掉进程。
四、命名管道
- 匿名管道应⽤的⼀个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
- 如果我们想在不相关的进程之间交换数据,可以使⽤FIFO⽂件来做这项⼯作,它经常被称为命名管道。
- 命名管道是⼀种特殊类型的⽂件。
4.1、创建一个命名管道
命名管道可以从命令⾏上创建,命令⾏⽅法是使⽤下⾯这个命令:
$ mkfifo filename
命名管道也可以从程序⾥创建,相关函数有:
int mkfifo(const char *filename,mode_t mode);
创建命名管道:
int main(int argc, char *argv[])
{
mkfifo("p2", 0644);
return 0;
}
4.2、匿名管道与命名管道的区别
- 匿名管道由pipe函数创建并打开。
- 命名管道由mkfifo函数创建,打开⽤open。
- FIFO(命名管道)与pipe(匿名管道)之间唯⼀的区别在它们创建与打开的⽅式不同,⼀但这些⼯作完成之后,它们具有相同的语义。
4.3、命名管道的打开规则
- O_NONBLOCK 控制当前是否为阻塞模式,默认情况下是阻塞模式。
- 如果当前打开操作是为读⽽打开FIFO时:
- O_NONBLOCK disable:阻塞直到有相应进程为写⽽打开该FIFO。
- O_NONBLOCK enable:⽴刻返回成功。
- 如果当前打开操作是为写⽽打开FIFO时:
- O_NONBLOCK disable:阻塞直到有相应进程为读⽽打开该FIFO。
- O_NONBLOCK enable:⽴刻返回失败,错误码为ENXIO。
实例一、⽤命名管道实现⽂件拷⻉
读取⽂件,写⼊命名管道:
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0)int main(int argc, char *argv[])
{//创建命名管道mkfifo("tp", 0644);//打开文件int infd;infd = ::open("abc", O_RDONLY);if (infd == -1)ERR_EXIT("open");//打开命名管道int outfd;outfd = ::open("tp", O_WRONLY);if (outfd == -1)ERR_EXIT("open");//将文件中的内容读出,写入管道中char buf[1024];int n;while ((n = read(infd, buf, 1024)) > 0){write(outfd, buf, n);}//关闭文件和管道close(infd);close(outfd);return 0;
}
读取管道,写⼊⽬标⽂件:
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0)int main(int argc, char *argv[])
{//打开文件int outfd;outfd = ::open("abc.bak", O_WRONLY | O_CREAT | O_TRUNC, 0644);if (outfd == -1)ERR_EXIT("open");//打开管道int infd;infd = ::open("tp", O_RDONLY);if (outfd == -1)ERR_EXIT("open");//将管道的内容写入文件char buf[1024];int n;while ((n = read(infd, buf, 1024)) > 0){write(outfd, buf, n);}//关闭文件和管道,并删除管道close(infd);close(outfd);unlink("tp");return 0;
}
实例二、⽤命名管道实现server&client通信
Makefile:
SERVER=server
CLIENT=client
CC=g++
SERVER_SRC=Server.cc
Client_SRC=Client.cc.PHONY:all
all:$(SERVER) $(CLIENT)$(SERVER):$(SERVER_SRC)$(CC) -o $@ $^ -std=c++11
$(CLIENT):$(Client_SRC)$(CC) -o $@ $^ -std=c++11.PHONY:clean
clean:rm -f $(SERVER) $(CLIENT)
Common.hpp:
#pragma once #include<iostream>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>const std::string gpipeFile = "./fifo";
const mode_t gmode = 0600;
const int gdefultfd = -1;
const int gsize = 1024;
const int gForRead = O_RDONLY;
const int gForWrite = O_WRONLY;int OpenPipe(int flag)
{//如果读端打开文件时,写端还没有打开,读端的open方法就会堵塞int fd = ::open(gpipeFile.c_str(), flag);if(fd < 0){std::cerr << "open error" << std::endl;}return fd;
}void ClosePipeHelper(int fd)
{if(fd > 0){::close(fd);}
}
Client.hpp:
#pragma once #include<iostream>
#include"Comm.hpp"class Client
{
public:Client():_fd(gdefultfd){}bool OpenPipeForWrite(){_fd = OpenPipe(gForWrite);if(_fd < 0) return false;return true;}// std::string *: 输出型参数// const std::string &: 输入型参数// std::string &: 输入输出型参数int SendPipe(const std::string & in){return ::write(_fd, in.c_str(), in.size());}void ClosePipe(){ClosePipeHelper(_fd);}~Client(){}private:int _fd;
};
Client.cc:
#include"Client.hpp"
#include<iostream>int main()
{Client client;client.OpenPipeForWrite();std::string message;while(true){std::cout<<"Please Enter# ";std::getline(std::cin, message);client.SendPipe(message);}client.ClosePipe();return 0;
}
Server.hpp:
#pragma once#include<iostream>
#include"Comm.hpp"class Init
{
public:Init(){umask(0);int n = ::mkfifo(gpipeFile.c_str(), gmode);if(n < 0){std::cerr << "mififo error" << std::endl;return ;}std::cout << "mkfifo success" << std::endl;}~Init(){int n = ::unlink(gpipeFile.c_str());if(n < 0){std::cerr << "unlink error" << std::endl;return ;}std::cout << "unlink success" <<std::endl;}
private:
};Init init;class Server
{
public:Server():_fd(gdefultfd){}bool OpenPipeForRead(){_fd = ::OpenPipe(gForRead);if(_fd < 0) return false;return true;}// std::string *: 输出型参数// const std::string &: 输入型参数// std::string &: 输入输出型参数int RecvPipe(std::string *out){char buffer[gsize];ssize_t n = ::read(_fd, buffer, sizeof(buffer) - 1);if(n > 0){buffer[n] = 0;*out = buffer;}return n;}void ClosePipe(){ClosePipeHelper(_fd);}~Server(){}private:int _fd;
};
Server.cc:
#include"Server.hpp"
#include<iostream>int main()
{Server server;server.OpenPipeForRead();std::string message;while(true){if(server.RecvPipe(&message) > 0){std::cout << "client say# " << message << std::endl;}else{break;}}std::cout << "client quit, me too" << std::endl;server.ClosePipe();return 0;
}
五、system V共享内存
共享内存区是最快的IPC形式。⼀旦这样的内存映射到共享它的进程的地址空间,这些进程间数据传递不再涉及到内核,换句话说是进程不再通过执⾏进⼊内核的系统调⽤来传递彼此的数据。
5.1、共享内存⽰意图
5.2、共享内存数据结构
共享内存在任何时刻,可以在OS中同时存在很多个,所以OS要以一种数据结构将共享内存管理起来。
struct shmid_ds {
struct ipc_perm shm_perm; /* operation perms */
int shm_segsz; /* size of segment (bytes) */
__kernel_time_t shm_atime; /* last attach time */
__kernel_time_t shm_dtime; /* last detach time */
__kernel_time_t shm_ctime; /* last change time */
__kernel_ipc_pid_t shm_cpid; /* pid of creator */
__kernel_ipc_pid_t shm_lpid; /* pid of last operator */
unsigned short shm_nattch; /* no. of current attaches */
unsigned short shm_unused; /* compatibility */
void shm_unused2; /* ditto - used by DIPC */
void shm_unused3; /* unused */
};
5.3、共享内存函数
shmget函数:
功能:⽤来创建共享内存。
原型
int shmget(key_t key, size_t size, int shmflg);
参数
key:这个共享内存段名字
size:共享内存⼤⼩
shmflg:由九个权限标志构成,它们的⽤法和创建⽂件时使⽤的mode模式标志是⼀样的
取值为IPC_CREAT:共享内存不存在,创建并返回;共享内存已存在,获取并返回。
取值为IPC_CREAT | IPC_EXCL:共享内存不存在,创建并返回;共享内存已存在,出错返回。
创建时还可以将共享内存的权限一并以按位或的形式传入。
返回值:成功返回⼀个⾮负整数,即该共享内存段的标识码—shmid;失败返回-1。
shmid vs key:
shmid:只给用户用的一个标识shm的标识符。
key:只作为内核中,区分shm唯一性的标识符,不作为用户管理shm的id值。
shmat函数:
功能:将共享内存段连接到进程地址空间
原型
void *shmat(int shmid, const void *shmaddr, int shmflg);
参数
shmid: 共享内存标识
shmaddr:用户指定挂接到什么虚拟地址,不想指定传空指针就可以
shmflg:它的两个可能取值是SHM_RND和SHM_RDONLY,使用默认属性传0
返回值:成功返回⼀个指针,这个指针就是共享内存段在进程地址空间中的起始地址;失败返回-1。
说明:
shmaddr为NULL,核⼼⾃动选择⼀个地址
shmaddr不为NULL且shmflg⽆SHM_RND标记,则以shmaddr为连接地址。
shmaddr不为NULL且shmflg设置了SHM_RND标记,则连接的地址会⾃动向下调整为SHMLBA的整数倍。
公式:shmaddr - (shmaddr % SHMLBA)
shmflg=SHM_RDONLY,表⽰连接操作⽤来只读共享内存
shmdt函数:
功能:将共享内存段与当前进程脱离
原型
int shmdt(const void *shmaddr);
参数
shmaddr: 由shmat所返回的指针
返回值:成功返回0;失败返回-1。
注意:将共享内存段与当前进程脱离不等于删除共享内存段。
shmctl函数:
功能:⽤于控制共享内存。
原型
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
参数
shmid:由shmget返回的共享内存标识码
cmd:将要采取的动作(有三个可取值)
buf:指向⼀个保存着共享内存的模式状态和访问权限的数据结构,不关心时可以传空指针
返回值:成功返回0;失败返回-1。
cmd的值:
共享内存管理指令:(即命令行指令)
ipcs:可以将消息队列,共享内存,信号量相关信息全部显示出来。
ipcs -m:只打印系统中已经存在的共享内存的相关信息。
ipcrm -m + shmid:根据 shmid 删除指定共享内存。
ftok函数:
功能:根据传入的参数生成shmget函数所需的Key值
原型
key_t ftok(const char * pathname, int proj_id);
参数
pathname:这是一个指向已存在文件的路径名的指针,该文件必须存在且可访问,因为ftok会使用该文件的inode号作为生成键值的一部分,一般是和当前项目有关的路径
proj_id:项目标识符,是一个整型参数,通常使用一个字符的ASCII值,如'A'、'B'等
返回值:成功时返回生成的键值(key_t类型),失败时返回-1,并设置errno
实例一、共享内存实现通信
Makefile:
SERVER=server
CLIENT=client
CC=g++
SERVER_SRC=Server.cc
Client_SRC=Client.cc .PHONY:all
all:$(SERVER) $(CLIENT)$(SERVER):$(SERVER_SRC)$(CC) -o $@ $^ -std=c++11
$(CLIENT):$(Client_SRC)$(CC) -o $@ $^ -std=c++11.PHONY:clean
clean:rm -f $(SERVER) $(CLIENT)
ShareMemory.hpp:
#pragma once#include <iostream>
#include <string>
#include <cstdio>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <stdalign.h>
#include <unistd.h>const std::string gpath = "/home/swb";
int gprojId = 0x6666;
int gshmsize = 4096;
mode_t gmode = 0600;//将十进制的key转化为十六进制,方便查看
std::string ToHex(key_t k)
{char buffer[64];snprintf(buffer, sizeof(buffer), "0x%x", k);return buffer;
}class ShareMemory
{
private://创建和获取一块共享内存的代码重复度高,提取成一个子方法void CreateShmHelper(int shmflg){_key = ::ftok(gpath.c_str(), gprojId);if(_key < 0){std::cerr << "ftok error" << std::endl;return ;}_shmid = ::shmget(_key, gshmsize, shmflg);if(_shmid < 0){std::cerr << "shmget error" << std::endl;return ;}std::cout << "shmid: " << _shmid << std::endl;}
public:ShareMemory():_shmid(-1),_key(0),_addr(nullptr){}~ShareMemory(){}void CreateShm(){if(_shmid == -1){CreateShmHelper(IPC_CREAT | IPC_EXCL | gmode);}}void GetShm(){CreateShmHelper(IPC_CREAT);}void AttachShm(){_addr = shmat(_shmid, nullptr, 0);if((long long)_addr == -1) //longlong 是因为当前是64位系统,地址占八字节{std::cout << "attach error" << std::endl;}}void DetachShm(){if(_addr != nullptr)::shmdt(_addr);std::cout << "detach done" <<std::endl;}void DeleteShm(){shmctl(_shmid, IPC_RMID, nullptr);}void* GetAddr(){return _addr;}private:int _shmid;key_t _key;void* _addr;
};ShareMemory shm;
Server.cc:
#include <iostream>
#include <unistd.h>
#include <string.h>
#include "ShareMemory.hpp"int main()
{shm.CreateShm();shm.AttachShm();//这里进行IPCchar *strinfo = (char*)shm.GetAddr();while(true){printf("%s\n",strinfo);sleep(1);}shm.DetachShm();shm.DeleteShm();return 0;
}
Client.cc:
#include <iostream>
#include <string.h>
#include "ShareMemory.hpp"int main()
{shm.GetShm();//获取共享内存shm.AttachShm(); //挂载到当前进程的虚拟地址空间中//这里进行IPC//获取共享内存在虚拟地址空间中的地址//并强转成需要传递的数据的类型的指针char *strinfo = (char*)shm.GetAddr(); char ch = 'A';while(ch <= 'Z'){//将这块空间当做数组使用strinfo[ch - 'A'] = ch;ch++;}shm.DetachShm();return 0;
}
注意:
- 共享内存是通信速度最快的。
- 共享内存没有进⾏同步与互斥!共享内存缺乏访问控制!会带来并发问题。需要用户自己增加保护机制。
- 共享内存的生命周期随内核,即一个进程创建了共享内存,如果不主动释放,当进程结束后,该共享内存不会销毁,依旧存在。
- 两种释放共享内存的方式:1、用户主动让OS释放。2、OS重启。
示例二、借助管道实现访问控制版的共享内存
Makefile:
SERVER=server
CLIENT=client
CC=g++
SERVER_SRC=Server.cc
Client_SRC=Client.cc .PHONY:all
all:$(SERVER) $(CLIENT)$(SERVER):$(SERVER_SRC)$(CC) -o $@ $^ -std=c++11
$(CLIENT):$(Client_SRC)$(CC) -o $@ $^ -std=c++11.PHONY:clean
clean:rm -f $(SERVER) $(CLIENT)
ShareMemory.hpp:
#pragma once#include <iostream>
#include <string>
#include <cstdio>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <stdalign.h>
#include <unistd.h>const std::string gpath = "/home/swb";
int gprojId = 0x6666;
int gshmsize = 4096;
mode_t gmode = 0600;//将十进制的key转化为十六进制,方便查看
std::string ToHex(key_t k)
{char buffer[64];snprintf(buffer, sizeof(buffer), "0x%x", k);return buffer;
}class ShareMemory
{
private://创建和获取一块共享内存的代码重复度高,提取成一个子方法void CreateShmHelper(int shmflg){_key = ::ftok(gpath.c_str(), gprojId);if(_key < 0){std::cerr << "ftok error" << std::endl;return ;}_shmid = ::shmget(_key, gshmsize, shmflg);if(_shmid < 0){std::cerr << "shmget error" << std::endl;return ;}std::cout << "shmid: " << _shmid << std::endl;}
public:ShareMemory():_shmid(-1),_key(0),_addr(nullptr){}~ShareMemory(){}void CreateShm(){if(_shmid == -1){CreateShmHelper(IPC_CREAT | IPC_EXCL | gmode);}}void GetShm(){CreateShmHelper(IPC_CREAT);}void AttachShm(){_addr = shmat(_shmid, nullptr, 0);if((long long)_addr == -1) //longlong 是因为当前是64位系统,地址占八字节{std::cout << "attach error" << std::endl;}}void DetachShm(){if(_addr != nullptr)::shmdt(_addr);std::cout << "detach done" <<std::endl;}void DeleteShm(){shmctl(_shmid, IPC_RMID, nullptr);}void* GetAddr(){return _addr;}private:int _shmid;key_t _key;void* _addr;
};ShareMemory shm;struct data
{char status[32];char lasttime[48];char image[4000];
};
Time.hpp:
#pragma once#include <iostream>
#include <string>
#include <ctime>std::string GetCurrTime()
{time_t t = time(nullptr);struct tm *curr = ::localtime(&t);char currtime[32];snprintf(currtime, sizeof(currtime), "%d-%d-%d %d:%d:%d",curr->tm_year + 1900,curr->tm_mon + 1,curr->tm_mday,curr->tm_hour,curr->tm_min,curr->tm_sec);return currtime;
}
Fifo.hpp:
#pragma once#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>const std::string gpipeFile = "./fifo";
const mode_t gfifomode = 0600;
const int gdefultfd = -1;
const int gsize = 1024;
const int gForRead = O_RDONLY;
const int gForWrite = O_WRONLY;class Fifo
{
private:void OpenFifo(int flag){_fd = ::open(gpipeFile.c_str(), flag);if(_fd < 0){std::cerr << "open error" << std::endl;}}
public:Fifo():_fd(-1){umask(0);int n = ::mkfifo(gpipeFile.c_str(), gfifomode);if(n < 0){return ;}std::cout << "mkfifo success" << std::endl;}bool OpenPipeForWrite(){OpenFifo(gForWrite);if(_fd < 0)return false;return true;}bool OpenPipeForRead(){OpenFifo(gForRead);if(_fd < 0)return false;return true;}int Wait(){int code = 0; //存放管道中的数据ssize_t n = ::read(_fd, &code, sizeof(code));if(n == sizeof(code)){return 0;}else if(n == 0){return 1;}else{return 2;}}void Signal(){int code = 1;::write(_fd, &code, sizeof(code));}~Fifo(){if(_fd >= 0)::close(_fd);int n = ::unlink(gpipeFile.c_str());if(n < 0){std::cerr << "unlink error" << std::endl;}std::cout << "unlink success" << std::endl;}private:int _fd;
};Fifo gpipe;
Server.cc:
#include <iostream>
#include <unistd.h>
#include <string.h>
#include "ShareMemory.hpp"
#include "Time.hpp"
#include "Fifo.hpp"int main()
{shm.CreateShm();shm.AttachShm();gpipe.OpenPipeForRead();// 这里进行IPCstruct data *image = (struct data *)shm.GetAddr();while (true){gpipe.Wait();printf("status: %s\n", image->status);printf("lasttime: %s\n", image->lasttime);printf("image: %s\n", image->image);strcpy(image->status, "过期");}shm.DetachShm();shm.DeleteShm();return 0;
}
Client.cc:
#include <iostream>
#include <string.h>
#include "ShareMemory.hpp"
#include "Time.hpp"
#include "Fifo.hpp"int main()
{shm.GetShm();//获取共享内存shm.AttachShm(); //挂载到当前进程的虚拟地址空间中gpipe.OpenPipeForWrite();//这里进行IPC//获取共享内存在虚拟地址空间中的地址//并强转成需要传递的数据的类型的指针struct data *image = (struct data *)shm.GetAddr(); while(true){//将这块空间当做数组使用strcpy(image->status, "最新");strcpy(image->lasttime, GetCurrTime().c_str());strcpy(image->image, "xxxxxxxxxxxxxxxxxxxxxxx");gpipe.Signal();sleep(3);}shm.DetachShm();return 0;
}
注意:这里实现的是不完整的,正常需要两个管道才能实现真正的保护,例如有管道1和管道2,server需要先对管道1读,当读取成功后再对共享内存进行读,最后对管道2进行写入数据;client先对共享内存进行写入,然后对管道1进行写,最后对管道2进行读。
六、system V消息队列(了解即可)
- 消息队列提供了⼀个从⼀个进程向另外⼀个进程发送⼀块数据的⽅法
- 每个数据块都被认为是有⼀个类型,接收者进程接收的数据块可以有不同的类型值
- 特性⽅⾯:IPC资源必须删除,否则不会⾃动清除,除⾮重启,所以system V IPC资源的⽣命周期随内核
命令行指令:
ipcs -q:查看system V消息队列相关信息。
ipcrm -m + msqid:根据msqid删除指定消息队列。
msgget函数:
功能:⽤来创建system V消息队列。
原型
int msgget(key_t key, int msgflg);
参数
key:和shmget中的key一样,通常也是通过ftok函数来获取
shmflg:
取值为IPC_CREAT:system V消息队列不存在,创建并返回;system V消息队列已存在,获取并返回。
取值为IPC_CREAT | IPC_EXCL:system V消息队列不存在,创建并返回;消息队列已存在,出错返回。
返回值:成功返回⼀个⾮负整数,即该消息队列的标识码—msqid;失败返回-1。
msgctl函数:
功能:⽤于控制system V消息队列。
原型
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
参数
msqid:由msgget返回的system V消息队列标识码
cmd:将要采取的动作,IPC_RMID用来删除消息队列
buf:指向⼀个保存着消息队列的模式状态和访问权限的数据结构,不关心时可以传空指针
返回值:成功返回0;失败返回-1。
msgsnd函数:
功能:发送消息到消息队列。
原型
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
参数
msqid:由msgget返回的system V消息队列标识码
msgp:指向用户定义的消息缓冲区的指针,缓冲区需包含消息类型和数据。
msgsz:消息正文(数据部分)的长度(字节),不包括消息类型字段。
msgflg:控制标志(通常为 0 或 IPC_NOWAIT)
返回值:成功返回
0
,失败返回-1
并设置errno
。msgflg 标志:
0:阻塞模式,若消息队列满则阻塞,直到空间可用。IPC_NOWAIT:非阻塞模式,若队列满则立即返回 -1 并设置 errno=EAGAIN。
消息缓冲区结构:
消息缓冲区必须是一个结构体,第一个字段为 long 类型的消息类型,后续为数据部分:
struct msgbuf {
long mtype; // 消息类型(必须 > 0)
char mtext[1]; // 消息数据(柔性数组,实际长度由 `msgsz` 指定)
};
msgrcv函数:
功能:接收消息从消息队列。
原型
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
参数
msqid:由msgget返回的system V消息队列标识码
msgp:指向接收消息的缓冲区(需足够大)。
msgsz:缓冲区中数据部分的最大容量(字节)。
msgtyp:指定接收的消息类型,> 0:接收类型等于 msgtyp 的第一条消息;= 0:接收队列中的第一条消息(任意类型);< 0:系统会从消息队列中查找类型值(mtype)小于或等于 |msgtyp|(绝对值) 的消息,并返回其中类型值最小的第一条消息。
msgflg:控制标志(可选 IPC_NOWAIT、MSG_NOERROR 等组合)。
返回值:成功返回实际接收的消息数据长度(字节),失败返回 -1 并设置 errno。
msgflg 标志:
IPC_NOWAIT:非阻塞模式,若无匹配消息则立即返回 -1(errno=ENOMSG)。MSG_NOERROR:若消息数据长度超过 msgsz,自动截断而不报错;未设置此标志且消息过大时,返回 -1(errno=E2BIG)。
MSG_EXCEPT(Linux 特有):当 msgtyp > 0 时,接收类型不等于 msgtyp 的第一条消息。
七、system V信号量(了解即可)
获取信号量:(可以获取一个信号量集)
删除信号量:
信号量操作:
命令行查看信号量:ipcs -s
命令行删除信号量:ipcrm -s + semid
7.1、并发编程
- 多个执⾏流(进程),能看到的同⼀份公共资源:共享资源。
- 被保护起来的资源叫做临界资源。
- 保护的⽅式常⻅:互斥与同步。
- 任何时刻,只允许⼀个执⾏流访问资源,叫做互斥。
- 多个执⾏流,访问临界资源的时候,具有⼀定的顺序性,叫做同步。
- 系统中某些资源⼀次只允许⼀个进程使⽤,称这样的资源为临界资源或互斥资源。
- 在进程中涉及到互斥资源的程序段叫临界区。你写的代码=访问临界资源的代码(临界区)+不访问 临界资源的代码(⾮临界区)。
- 所谓的对共享资源进⾏保护,本质是对访问共享资源的代码进⾏保护。
7.2、信号量
- 特性⽅⾯:IPC资源必须删除,否则不会⾃动清除,除⾮重启,所以system V IPC资源的⽣命周期随内核。
- 理解方面:信号量是⼀个计数器。
- 作用方面:保护临界区。
- 本质方面:信号量本质是对资源的预订机制。
- 操作⽅⾯:申请资源,计数器--,P操作;释放资源,计数器++,V操作。
八、内核是如何组织管理IPC资源的
具体结构的截图: