Linux 进程间通信机制详解
Linux进程间通信
1. 什么是进程间通信?
进程间通信,简称 IPC,是指两个或多个进程(正在运行的程序的实例)之间进行数据交换、信息传递或同步操作的机制。
-
数据传输:一个进程需要将它的数据发送给另一个进程。
-
资源共享:多个进程需要共享相同的资源(如一个共享的内存区域)。
-
通知事件:一个进程需要向另一个进程发送消息,通知它发生了某个事件。
-
进程控制:有些进程希望完全控制另一个进程的执行(进程池)。
因为进程之间具有独立性(写时拷贝、拥有独立的虚拟地址空间),因此本质是让不同的进程看到同一份资源。
2. 管道
2.1 匿名管道
2.1.1 概念与特性
匿名管道 是一种最基本的进程间通信方式。它创建一个单向的、先进先出的数据传输通道,数据从一端写入,从另一端读取。因为它没有在文件系统中留下任何名称(如文件名),所以被称为匿名的。
-
匿名管道五种特性:
-
只能用来进行具有血缘关系的进程进行进程间通信(通常用于父子)。
-
具有同步机制。
-
面向字节流。
-
单向通信。
-
生命周期是随进程的。
-
-
匿名管道四种通信情况
-
写慢,读快:读端就要阻塞等待。
-
读慢,写快:当把管道写满之后(通常是64KB),读端就要阻塞等待。
-
写关,读继续:read就会读到返回值0,表示文件结尾。
-
读关,写继续:写端没有任何意义,OS会发送13号(SIGPIPE)信号杀掉写端进程。
-
2.1.2 pipe 系统调用
函数原型:
#include <unistd.h>
int pipe(int pipefd[2]);
参数:
-
pipefd[2]
:一个由两个整数组成的数组。调用pipe
后,内核会填充这个数组:-
pipefd[0]
:代表管道的读端。进程从这个文件描述符读取数据。 -
pipefd[1]
:代表管道的写端。进程向这个文件描述符写入数据。
-
返回值:
-
成功:返回
0
。 -
失败:返回
-1
,并设置相应的错误代码到errno
变量中。
2.1.3 示例
#include <iostream>
#include <unistd.h>
#include <cstdio>
#include <sys/types.h>
#include <sys/wait.h>
#include <cstring>void CWrite (int wfd) {char buffer[1024] = {0};int cnt = 0;while (true) {snprintf(buffer , sizeof(buffer) , "I am child process , pid: %d , cnt: %d" , getpid() , cnt++);write(wfd , buffer , strlen(buffer));sleep(1);}
}void FRead (int rfd) {char buffer[1024] = {0};while (true) {int n = read(rfd , buffer , sizeof(buffer) - 1);if (n > 0) {buffer[n] = 0;}std::cout << "I received a news " << buffer << std::endl;// sleep(1);}
}int main () {// fds[0]: read fds[1]: writeint fds[2] = {0};int n = pipe(fds);if (n < 0) {std::cerr << "open pipe error" << std::endl;return -1;}n = fork();if (n == 0) {// child process// 单向通信,子进程关闭读端close(fds[0]);CWrite(fds[1]);close(fds[1]);exit(0);}// 单向通信,父进程关闭写端close(fds[1]);FRead(fds[0]);close(fds[0]);waitpid(n , nullptr , 0);return 0;
}
2.1.4 图示
匿名管道本质是被OS复用的代码,底层原理其实就是文件系统。
2.2 匿名管道的应用场景(进程池)
2.2.1 代码
// tasks.hpp
#pragma once#include <functional>
#include <vector>
#include <iostream>
#include <ctime>
#include <cstdlib>void print_log () { std::cout << "打印日志" << std::endl; }
void download () { std::cout << "下载任务" << std::endl; }
void upload () { std::cout << "上传任务" << std::endl; }class Tasks {public:Tasks() {srand(time(nullptr));}void add(std::function<void()> task) {tasks.push_back(task);}// 随机分配一个任务int get_taskcode () {if (tasks.empty()) {return -1;}return rand() % tasks.size(); }// 执行任务bool execute (int taskcode) {if (taskcode < 0 || taskcode >= tasks.size()) {return false;}tasks[taskcode]();return true;}private:std::vector<std::function<void()>> tasks;
};// processPool.hpp
#pragma once#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <sys/types.h>
#include <cstdlib>
#include <sys/wait.h>
#include "tasks.hpp"// 这个类用来描述主进程与子进程间的任务通信通道
class Channel {public:Channel(int wfd , pid_t process_id) :_wfd(wfd) , _process_id(process_id) {_name = "child process: pid - " + std::to_string(_process_id) + " wfd - " + std::to_string(_wfd);}int get_wfd() { return _wfd; }pid_t get_process_id () { return _process_id; }std::string& get_name() { return _name; }// 向 wfd 管道发送 taskcode 任务码void send(int taskcode) {ssize_t n = write(_wfd , &taskcode , sizeof(taskcode));(void)n;}private:int _wfd; // 管道写入端文件描述符,主进程通过此fd向子进程发送数据pid_t _process_id; // 目标子进程的进程ID,用于唯一标识任务接收方std::string _name; // 子进程的逻辑名称,用于日志记录和进程管理
};// 这个类用来管理所有的信道
class ChannelManger {public:// 插入一个信道void insert (int wfd , pid_t cid) {_channels.emplace_back(wfd , cid); // 直接构造,无需构造 + 拷贝/构造 + 移动}// 销毁所有的信道void destroy_all () {for (size_t i = 0; i < _channels.size(); i++) {close(_channels[i].get_wfd());// std::cout << "close child process name: " << _channels[i].get_name() << std::endl;}}// 轮询方式选择一个子进程信道Channel& select() {auto& c = _channels[_next];_next++;_next %= _channels.size();return c;}// 回收所有的子进程void recycle_all () {for (size_t i = 0; i < _channels.size(); i++) {int rid = waitpid(_channels[i].get_process_id() , nullptr , 0);if (rid < 0) {std::cerr << "waitpid error" << std::endl;return;}std::cout << "wait success - name: " << _channels[i].get_name() << std::endl;}}void end() {for (size_t i = 0; i < _channels.size(); i++) {close(_channels[i].get_wfd());std::cout << "close child process name: " << _channels[i].get_name() << std::endl;waitpid(_channels[i].get_process_id() , nullptr , 0);std::cout << "wait success - name: " << _channels[i].get_name() << std::endl;}}private:std::vector<Channel> _channels;int _next = 0;
};// 这个类用来描述一个进程池
class ProcessPool {public:ProcessPool(int process_count = 3) :_process_count(process_count) {// 注册任务_tasks.add(print_log);_tasks.add(download);_tasks.add(upload);}~ProcessPool() {}void work (int rfd) {while (true) {int taskcode = 0; // 存储从管道读取的任务指令码ssize_t n = read(rfd , &taskcode , sizeof(taskcode));if (n > 0) {// 成功读取到数据,但需要检查是否读取了完整的数据if (n != sizeof(taskcode)) {continue; // 跳过本次不完整数据,继续下一次读取}// 完整读取到任务指令码,此处应添加任务处理逻辑std::cout << "child process pid: " << getpid() << " 收到任务码: " << taskcode << " ";_tasks.execute(taskcode); // 任务处理// sleep(1);} else if (n == 0) {// 读取到文件结束符(EOF),表示主进程已关闭管道写入端// 这通常意味着主进程要求该子进程终止,子进程将被SIGPIPE(13)信号杀死std::cout << "pid: " << getpid() << " child process exit" << std::endl;break;} else {std::cerr << "read error" << std::endl;break;}}}bool create () {for (int i = 0; i < _process_count; i++) {// 1. 创建匿名管道int fds[2] = {0};int n = pipe(fds);if (n < 0) {std::cerr << "pipe create error" << std::endl;return false;}// 2. 创建子进程pid_t id = fork();if (id < 0) {std::cerr << "child process create error" << std::endl;return false;} else if (id == 0) {// 子进程// 关闭子进程中从父进程继承的所有写端文件描述符// 父进程所有的写端都被存储在 ChannelManger::std::vector<Channel> _channels; // 第一次循环继承父进程的 ChannelManger._cpcm 中的 _channels为空,无论父子进程执行顺序,因为当下面父进程 insert 时会触发 ChannelManger _cpcm; 写时拷贝// 第二次循环继承父进程的 ChannelManger._cpcm 中的 _channels仅有一个 Channel 元素,而该 Channel 元素中存储的写端是4// 第三次...// 因此,只需每次遍历 ChannelManger._cpcm 中的 _channels 关闭所有的 Channel 里的 _wfd 即可。_cpcm.destroy_all();// 创建单向通信信道,关闭不需要的文件描述符close(fds[1]);work(fds[0]);close(fds[0]);exit(0);}// 父进程// 创建单向通信信道,关闭不需要的文件描述符close(fds[0]);// 管理创建的子进程和通信信道_cpcm.insert(fds[1] , id);}return true;}void destroy () {// 3. 关闭所有的信道`// _cpcm.destroy_all();// // 4. 回收所有的子进程// _cpcm.recycle_all();_cpcm.end();}// 主进程为进程池中的子进程分配任务void run (int n) {while (n--) {int taskcode = _tasks.get_taskcode();auto& c = _cpcm.select();std::cout << "主进程向 - " << c.get_wfd() << " - 信道写入任务码 - " << " - " << taskcode << std::endl; c.send(taskcode);sleep(1);}}private:Tasks _tasks;ChannelManger _cpcm;int _process_count; // 进程池中进程的个数
};// main.cc
#include "processPool.hpp"int main () {// test();ProcessPool pp;pp.create();pp.run(9);pp.destroy();return 0;
}
2.2.2 图示
2.2.3 深入剖析进程池设计中的一个经典误区:子进程每次循环继承到父进程与其他子进程通信的写端
1️⃣第一次循环迭代
-
父进程状态:
_cpcm
为空(尚未添加任何Channel)。 -
创建管道:得到文件描述符对,假设是
[3, 4]
(读端3,写端4)。 -
调用
fork()
:创建子进程1。 -
子进程1执行:
-
继承父进程的
_cpcm
(此时为空向量) -
调用
_cpcm.destroy_all()
→ 什么都不做(因为空的) -
关闭自己的写端
fds[1]
(描述符4) -
进入工作循环
-
-
父进程执行:
-
关闭读端
fds[0]
(描述符3) -
将写端
fds[1]
(描述符4)插入_cpcm
-
注意:并且无论父子进程的执行顺序,首先父子进程看到的是同一份资源(ChannelManger _cpcm 中的 std::vector<Channel> _channels )若父进程先执行,调用 _cpcm 的 insert,底层是 _channels.push_back(),触发写时拷贝,所以子进程中的 _cpcm 中的 _channels 还是空的。
2️⃣:第二次循环迭代
-
父进程当前状态:
_cpcm
包含一个Channel(对应描述符4)。 -
创建新管道:得到新的描述符对
[3, 5]
。 -
再次调用
fork()
:创建子进程2。 -
此时出现关键问题:
-
子进程2继承的是当前父进程的内存状态
-
这意味着子进程2的
_cpcm
包含描述符4(第一个管道的写端) -
当子进程2执行
_cpcm.destroy_all()
时,会关闭进程2继承父进程并与进程1通信的文件描述符4。
-
3️⃣ 以此类推…
2.3 命名管道
2.3.1 概念与特性
命名管道,也称为 FIFO,是一种特殊的文件类型,它提供了一个命名的、单向的进程间通信通道。关键点在于,它有一个在文件系统中的路径名,这使得无关的进程能够找到并利用它进行通信。
-
特性:
-
适用于毫不相关进程间的通信。
-
命名管道是一种特殊的文件格式,并不会刷新到磁盘中。
-
具有同步机制。
-
面向字节流。
-
write方没有执行 open 函数的时候,read方,就要在自己的 open 函数内部进行阻塞,直到有人把管道文件打开了,open才会返回。
-
-
命名管道四种通信情况
-
写慢,读快:读端就要阻塞等待。
-
读慢,写快:当把管道写满之后,读端就要阻塞等待。
-
写关,读继续:read就会读到返回值0,表示文件结尾。
-
读关,写继续:写端没有任何意义,OS会发送13号(SIGPIPE)信号杀掉写端进程。
-
2.3.2 mkfifo 系统调用
函数原型:
#include <sys/types.h>
#include <sys/stat.h>int mkfifo(const char *pathname, mode_t mode);
参数:
-
pathname
:管道文件的路径名 -
mode
:文件权限模式(会被 umask 影响)
返回值:
-
成功:返回 0
-
失败:返回 -1,并设置 errno
2.3.3 示例
// comm.hpp
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <string>
#include <cstring>#define PIPE_PATHNAME "./mkfifo"
// #define EXIT_FAILURE 1class NamedPipe {public:NamedPipe(const std::string pathname = PIPE_PATHNAME) :_pathname(pathname) {umask(0);// 创建命名管道int n = mkfifo(_pathname.c_str() , 0666);if (n < 0) {std::cerr << "create namedpipe error" << std::endl;exit(EXIT_FAILURE);}std::cout << "create namedpipe success" << std::endl;}~NamedPipe() {int n = unlink(_pathname.c_str());if (n == -1) {std::cerr << "unlink namedpipe error" << std::endl;exit(EXIT_FAILURE);}std::cout << "unlink namedpipe success" << std::endl;}private:std::string _pathname;
};class PipeCommunicator {public:PipeCommunicator(const std::string pathname = PIPE_PATHNAME) :_pathname(pathname) , _fd(-1) {}void openForRead() {// write 方没有执行open的时候,read方,就要在open内部进行阻塞// 直到有人把管道文件打开了,open才会返回_fd = open(_pathname.c_str() , O_RDONLY);if (_fd < 0) {std::cerr << "open namedpipe error" << std::endl;exit(EXIT_FAILURE);}std::cout << "open namedpipe success" << std::endl;}void openForWrite() {_fd = open(_pathname.c_str() , O_WRONLY);if (_fd < 0) {std::cerr << "open namedpipe error" << std::endl;exit(EXIT_FAILURE);}std::cout << "open namedpipe success" << std::endl;}void reader() {char buffer[1024] = {0};while (true) {int n = read(_fd , buffer, sizeof(buffer) - 1);if (n > 0) {buffer[n] = 0;std::cout << "client send: " << buffer << std::endl;} else if (n == 0) {std::cout << "client exit" << std::endl;break; } else {std::cout << "server read error" << std::endl;break; }}}void writer() {std::string message;int count = 1;while (true) {std::cout << "Please Enter: ";std::getline(std::cin , message);message += (" , message number: " + std::to_string(count++));write(_fd , message.c_str() , message.size());}}~PipeCommunicator() {if(_fd > 0) {close(_fd);}}private:std::string _pathname;int _fd;
}; // server.cc
#include "comm.hpp"int main () {// 建立命名管道NamedPipe namedpipe;// 通信PipeCommunicator pc;pc.openForRead();pc.reader();return 0;
}// client.cc
#include "comm.hpp"int main () {PipeCommunicator pc;pc.openForWrite();pc.writer();return 0;
}
管道:数据一旦被读端成功读取,就会从匿名管道中永久移除,管道是一种FIFO的临时缓冲区,而不是一个持久化的存储设备。
共享内存:共享内存中的数据在读取后依然存在,直到被主动覆盖或清除。
正如你所说:
-
struct inode
:代表文件本身(身份标识、权限、大小、磁盘块位置等) -
struct file
:代表进程与文件的会话或视图(读写位置、打开模式等)
struct file
引用计数的核心作用
现在来回答你的问题:既然已经解耦了,struct file
的引用计数有什么用?
引用计数主要管理的是同一个进程内对同一个struct file
实例的共享。
- 场景1:文件描述符复制(
dup2
)
3. System V 通信方案
System V 是一种 内核持久化 的 IPC 机制。这意味着会一直存在于内核中,直到被显式地删除或系统重启。它与匿名管道(进程退出后自动销毁)形成了鲜明的对比。
System V 的生命周期是随内核的。
system V是一种标准,Linux内核为了支持这种标准,专门设计了一个 IPC 通信模块。
3.1 共享内存
让多个进程可以访问同一块物理内存空间,这是最快的IPC形式,因为数据不需要在进程间复制。
共享内存的本质,是通过页表将同一段物理内存映射至多个进程的虚拟地址空间,让不同的进程看到同一份资源,从而实现进程间的零拷贝内存共享。
3.1.1 ftok
ftok
是 System V IPC(包括消息队列、信号量集、共享内存)中用于生成一个唯一键值(key_t
)的函数。
只要使用相同的 pathname
和 proj_id
,在任何进程中调用 ftok
,都会得到相同的 key 值。这是不同进程能够找到同一个 IPC 资源的基础。
怎么样保证两个不同的进程,使用的是一个共享内存呢?
通过 key 来区分,key 是在用户层构建并传入操作系统中的,所以通信双方需要在用户层一起约定一个 key。
函数原型:
#include <sys/types.h>
#include <sys/ipc.h>key_t ftok(const char *pathname, int proj_id);
参数:
-
pathname: 这是一个指向已存在文件路径名的字符串指针。
- 通常使用当前目录(
.
)
- 通常使用当前目录(
-
proj_id: 一个标识符,通常是一个字符或整数。
返回值:
-
成功: 返回一个可用于
msgget
,semget
,shmget
的key_t
类型的键值。 -
失败: 返回 -1 ,并设置errno。
3.1.2 shmget
shmget 创建或获取共享内存段。
函数原型:
#include <sys/ipc.h>
#include <sys/shm.h>int shmget(key_t key, size_t size, int shmflg);
参数:
-
key: 共享内存的键值。通常使用
ftok()
函数生成。 -
size: 共享内存的大小,通常是4096的整数倍。
-
shmflg: 权限标志和创建标志的组合(通过
|
组合)。-
IPC_CREAT: 创建共享内存,如果目标共享内存不存在,就创建;否则,获取该共享内存。
-
IPC_EXCL: (单独使用,无意义),创建共享内从,如果目标共享内存不存在,就创建,如果已经存在,shmget 就会出错返回。(只要shmget成功返回,一定是一个全新的共享内存)。
-
0666: 创建时指定权限,类似文件操作。
-
返回值:
-
成功: 返回共享内存的标识符(一个非负整数,
shmid
)。 -
失败: 返回-1,并设置errno。
3.1.3 shmat
将共享内存挂接到进程地址空间中。
函数原型:
#include <sys/types.h>
#include <sys/shm.h>void *shmat(int shmid, const void *shmaddr, int shmflg);
参数:
-
shmid: 由
shmget
返回的共享内存标识符。 -
shmaddr: 使用固定的虚拟地址进行挂接,但通常设置为 NULL,让操作系统选择合适的地址。
-
shmflg: 附加选项。
-
0
: 默认,可读可写。 -
SHM_RDONLY
: 只读的方式。
-
返回值:
-
成功: 返回挂接后起始的虚拟地址。
-
失败: 返回
(void*)-1
,并设置 errno。
3.1.4 shmdt
将共享内存段从进程地址空间分离。
函数原型:
#include <sys/types.h>
#include <sys/shm.h>int shmdt(const void *shmaddr);
参数:
- shmaddr: 由
shmat
返回的挂接后的起始虚拟地址。
返回值:
-
成功: 返回0。
-
失败: 返回-1,并设置errno。
3.1.5 shmctl
控制共享内存(包括删除)。
函数原型:
#include <sys/ipc.h>
#include <sys/shm.h>int shmctl(int shmid, int cmd, struct shmid_ds *buf);
参数:
-
shmid: 共享内存标识符。
-
cmd: 控制命令(每次调用只能传递一个命令值)。
-
IPC_RMID: 最重要、最常用的命令。 标记该段为已销毁。当所有附加的进程都与之分离后,该段才会被真正销毁。
-
IPC_STAT: 通过输出型参数
struct shmid_ds
获取段信息。
-
-
buf: 当 cmd 为
IPC_RMID
时,buf 应为 NULL ;当 cmd 为IPC_STAT
时,buf 应为外部的 struct shmid_ds 输出型参数结构体。
返回值:
-
成功: 返回0。
-
失败: 返回-1,并设置errno。
3.1.6 对应的命令
-
ipcs -m
: 列出系统中所有的 System V 共享内存段。 -
ipcrm -m shmid
: 删除指定shmid
的共享内存段。
3.1.7 示例
// comm.hpp
#pragma once#include <cstdio>
#include <cstdlib>#define ERR_EXIT(errorstr) \do { \perror(errorstr); \exit(EXIT_FAILURE); \} while(0)// // shm.hpp
#pragma once#include <iostream>
#include <unistd.h>
#include <string>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include "comm.hpp"const std::string default_path = ".";
const int default_proj_id = 0x66;
const int default_id = -1;
const int default_size = 4096;
const int default_mode = 0666;#define CREATER "creater"
#define USER "user"// 封装共享内存
class IpcShm {private:void _creater(int flag) {_shmid = shmget(_key , _shmsize , flag);if (_shmid < 0) {ERR_EXIT("shmget");}std::cout << "shmget success - shmid: " << _shmid << std::endl;}// servervoid create() {_creater(IPC_CREAT | IPC_EXCL | default_mode);}// clientvoid get() {_creater(IPC_CREAT);}void attach() {_shmaddr = shmat(_shmid , nullptr , 0);if (reinterpret_cast<long long>(_shmaddr) < 0) {ERR_EXIT("shmat");}}void detach() {int n = shmdt(_shmaddr);if (n < 0) {ERR_EXIT("shmdt");}std::cout << "shmdt success" << std::endl;}void destroy() {if (_shmid == default_id)return;detach();if (_type == CREATER) {int n = shmctl(_shmid , IPC_RMID , nullptr);if (n < 0) {ERR_EXIT("shmctl");}std::cout << "destroy shm success - shmid: " << _shmid << std::endl;}}public:IpcShm(const std::string& type = CREATER , const std::string& pathname = default_path , const int proj_id = default_proj_id , const int size = default_size): _shmid(default_id) , _shmsize(size) , _key(-1) , _shmaddr(nullptr) , _type(type){// ftok 用于在 System V IPC 中生成唯一的key_key = ftok(pathname.c_str() , proj_id);if (_key < 0) {ERR_EXIT("ftok");}std::cout << "ftok success - key: " << _key << std::endl; if(_type == CREATER) {create();} else if (_type == USER) {get();} else {std::cerr << "type error" << std::endl;exit(EXIT_FAILURE);}attach();}~IpcShm() {destroy();}void* getVirtualAddr() { std::cout << "_shmaddr: " << _shmaddr << std::endl;return _shmaddr;}size_t getSize() {return _shmsize;}void getAttr() {struct shmid_ds ds;int n = shmctl(_shmid , IPC_STAT , &ds);printf("shm_segsz: %ld\n", ds.shm_segsz);printf("key: 0x%x\n", ds.shm_perm.__key);// ...}private:int _shmid;size_t _shmsize;key_t _key;void* _shmaddr;const std::string _type;
};// client.cc
#include "shm.hpp"int main() {IpcShm shm(USER);char* shm_mem = (char*)shm.getVirtualAddr();int index = 0;for(char c = 'A'; c < 'E'; c++) {shm_mem[index++] = c;sleep(1);}return 0;
}// server.cc
#include "shm.hpp"int main() {IpcShm shm;char* shm_mem = (char*)shm.getVirtualAddr();sleep(10);// while (true) {// std::cout << shm_mem << std::endl;// sleep(1);// }return 0;
}
共享内存,没有所谓的同步机制(需要使用其他方式来解决数据不一致的问题)。
若先执行client,因为client中的shmget没有携带创建权限,系统会使用随机或默认值,这通常权限不足。(先执行 server , 再执行 client)
3.2 消息队列
3.2.1 核心概念
消息队列,提供了一种,一个进程给另一个进程发送类型数据块的方式。
-
消息: 数据的基本单位。每个消息都是一个结构体,包含两个部分:
-
一个长整型的消息类型: 必须是 大于 0 的整数。这个消息类型可以用来作为接收进程筛选消息的标识符(A进程设置1,接收2;B进程设置2,接收1)。
-
一个数据数组: 存放实际要传输的数据。
在 C 语言中,通常定义如下结构(虽然标准没有严格规定,但这是通用做法):
-
struct msgbuf { long mtype; /* 消息类型,必须 > 0 */ char mtext[1]; /* 消息数据,在实际应用中通常是一个更大的数组 */
};
注意:mtext
字段可以是任意数据类型,不仅仅是 char
。它可以是包含多个成员的自定义结构体。
-
队列: 由内核维护的一个链表,用于存放消息。每个消息队列都有一个唯一的标识符,称为 消息队列 ID。
-
生命周期: System V 消息队列是内核持久化的。这意味着它们会一直存在于内核中,直到被显式地删除、系统重启或者系统配置的资源限制被触发。创建它的进程终止后,队列依然存在。
3.2.2 主要的API函数
使用 System V 消息队列需要包含以下头文件:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
1.根据一个键值 key 来创建新的或获取一个已存在的消息队列,同共享内存。返回msgid
int msgget(key_t key, int msgflg);2.向指定的消息队列发送一条消息
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
msqid:msgget 返回的消息队列 ID。
msgp: 指向一个消息缓冲区的指针。该缓冲区必须包含 mtype 字段和 mtext 字段。
msgsz:mtext 字段的字节数(不包括 mtype 的长度)。
msgflg:控制标志。- 0:阻塞模式。如果队列已满(达到字节数或消息数限制),调用进程会阻塞。- IPC_NOWAIT:非阻塞模式。如果队列已满,函数立即返回 -1,并设置 errno 为 EAGAIN。3.从指定消息队列中接收一条消息
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
msqid:消息队列 ID。
msgp:指向用于存放接收消息的缓冲区。
msgsz:缓冲区中 mtext 字段的最大容量。
msgtyp:指定要接收的消息类型,这是最灵活的部分:- == 0:读取队列中的第一条消息(先进先出)。- > 0:读取队列中消息类型等于 msgtyp 的第一条消息。可以用来实现“消息通道”。- < 0:读取队列中消息类型小于等于 msgtyp 绝对值的类型值最小的消息。可以用来实现某种形式的优先级接收。
msgflg:控制标志。- 0:阻塞模式。如果没有指定类型的消息,调用进程会阻塞。- IPC_NOWAIT:非阻塞模式。如果没有消息,立即返回 -1,errno 设置为 ENOMSG。4.对消息队列执行各种控制操作。
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
msqid:消息队列 ID。
cmd:要执行的操作命令。- IPC_STAT:获取队列的状态信息,存入 buf 指向的结构体。- IPC_SET:设置队列的参数,如权限、所有者等。- IPC_RMID:立即删除消息队列。所有被阻塞的 msgsnd 和 msgrcv 调用都会立即失败,errno 被设置为 EIDRM。这是最常用的命令之一。
buf:指向 struct msqid_ds 结构体的指针,用于传入或传出信息。
3.2.3 对应的命令
-
ipcs -q
: 列出系统中所有的 System V 消息队列。 -
ipcrm -q msgid
: 删除指定msgid
的消息队列。
3.3 信号量
System V 信号量是 Unix/Linux 系统中最早的进程间同步与互斥机制之一,属于 System V IPC(Inter-Process Communication,进程间通信)三大组件(信号量、消息队列、共享内存)的核心成员。它的设计目标是解决多个进程对共享资源的并发访问问题,支持计数信号量(可实现资源计数)和二元信号量(可实现互斥锁)两种模式,且具有随内核持久化的特性(除非主动删除,否则会一直存在于内核中)。
System V 信号量不是以单个信号量为单位,而是以信号量集的形式存在。一个信号量集可以包含多个信号量(一个数组),这些信号量可以独立操作,也可以原子性地同时操作。