【Linux】进程间通讯-消息队列
参考博客:https://blog.csdn.net/IYXUAN/article/details/123295347
一、消息队列基本原理
-
消息队列的本质其实是一个内核提供的链表,内核基于这个链表,实现了一个数据结构;
-
向消息队列中写数据,实际上是向这个数据结构中插入一个新结点;从消息队列汇总读数据,实际上是从这个数据结构中删除一个结点;
-
消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法;
-
消息队列也有管道一样的不足,就是每个数据块的最大长度是有上限的,系统上全体队列的最大总长度也有一个上限。Linux用宏MSGMAX和MSGMNB来限制一条消息的最大长度和一个队列的最大长度.
二、消息队列使用方法
2.1 用户消息缓冲区
无论发送进程还是接收进程,都需要在进程空间中用消息缓冲区来暂存消息。该消息缓冲区的结构定义如下:
struct msgbuf {long mtype; /* 消息的类型 */char mtext[]; /* 消息正文 */
};
- 可通过 mtype 区分数据类型,同过判断mtype,是否为需要接收的数据
mtext[]
为存放消息正文的数组,可以根据消息的大小定义该数组的长度
2.2 使用函数介绍
2.2.1 消息队列的创建
通过msgget创建消息队列,函数原型如下:
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
返回值
- 成功 msgget 将返回一个非负整数,即该消息队列的标识码;
- 失败 则返回-1
2.2.2 向消息队列中添加信息
向消息队列中添加数据,使用到的是msgsnd()函数,函数原型如下:
int msgsnd(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);
- msgid: 由msgget函数返回的消息队列标识码
- msg_ptr: 是一个指针,指针指向准备发送的消息
- msg_sz: 是msg_ptr指向的消息长度,消息缓冲区结构体中mtext的大小,不包括数据的类型
- msgflg: 控制着当前消息队列满或到达系统上限时将要发生的事情。比如,msgflg = IPC_NOWAIT 表示队列满不等待,返回EAGAIN错误
注意,消息的数据结构却有一定的要求,指针msg_ptr所指向的消息结构一定要是以一个长整型成员变量开始的结构体,接收函数将用这个成员来确定消息的类型。所以消息结构要定义成这样:
struct my_message {long int message_type;/* The data you wish to transfer */
};
msg_sz 是msg_ptr指向的消息的长度,注意是消息的长度,而不是整个结构体的长度,也就是说msg_sz是不包括长整型消息类型成员变量的长度。
2.2.3 从消息队列中读取信息
msgrcv() 用来从一个消息队列获取消息,它的原型为:
int msgrcv(int msgid, void *msg_ptr, size_t msg_st, long int msgtype, int msgflg);
-
msgid: 由msgget函数返回的消息队列标识码
-
msg_ptr: 是一个指针,指针指向准备接收的消息
-
msgsz: 是msg_ptr指向的消息长度
-
msgtype: 可以实现接收优先级的简单形式
- msgtype=0返回队列第一条信息
- msgtype>0返回队列第一条类型等于msgtype的消息
- msgtype<0返回队列第一条类型小于等于msgtype绝对值的消息
-
msgflg: 控制着队列中没有相应类型的消息可供接收时将要发生的事。比如,msgflg=IPC_NOWAIT,队列没有可读消息不等待,返回ENOMSG错误。msgflg=MSG_NOERROR,消息大小超过msgsz时被截断
-
调用成功时,该函数返回放到接收缓存区中的字节数,消息被复制到由msg_ptr指向的用户分配的缓存区中,然后删除消息队列中的对应消息。失败时返回-1。
2.2.4 消息队列的控制函数
控制函数原型如下:
int msgctl(int msqid, int command, strcut msqid_ds *buf);
-
command: 是将要采取的动作,它可以取3个值
- IPC_STAT:把msgid_ds结构中的数据设置为消息队列的当前关联值,即用消息队列的当前关联值覆盖msgid_ds的值
- IPC_SET:如果进程有足够的权限,就把消息列队的当前关联值设置为msgid_ds结构中给出的值
- IPC_RMID:删除消息队列. 注意:若选择删除队列,第三个参数传NULL
-
buf是指向msgid_ds结构的指针,它指向消息队列模式和访问权限的结构
-
如果操作成功,返回0;如果失败,则返回-1
msgid_ds
结构至少包括以下成员:
struct msgid_ds
{uid_t shm_perm.uid;uid_t shm_perm.gid;mode_t shm_perm.mode;
};
三、消息队列通讯
3.1 查看创建的消息队列
使用下述shell
命令查看当前系统下的消息队列信息
ipcs -q
目前没有创建任何消息队列
3.2 生产者进程
- 下面的进程作为生产者进程,用于创建和销毁消息队列
- 每次发送消息,指定消息的类型与内容,发送到消息到构建好的消息队列中
#define IPCKEY 123
#define MSG_SIZE 1024struct Mymsg {long msg_type;char msg_content[MSG_SIZE];
};struct Mymsg myMsg = {0};
long msgType = 1;
int msgId;// 生产者
void test1() {// 创建或打开消息队列msgId = msgget(IPCKEY, IPC_CREAT | IPC_EXCL | 0666);if (msgId < 0) {if (errno == EEXIST) {msgId = msgget(IPCKEY, 0666);if (msgId < 0) {perror("msgget failed");return;}std::cout << "Message queue already exists. Continuing..." << std::endl;} else {perror("msgget failed");return;}} else {std::cout << "Message queue created successfully." << std::endl;}while (true) {std::cout << "Enter message type (0 to exit): ";if (!(std::cin >> msgType)) {std::cout << "Invalid input. Exiting..." << std::endl;break;}if (msgType == 0) {// 发送退出信号并删除队列myMsg.msg_type = 1;strcpy(myMsg.msg_content, "quit");msgsnd(msgId, &myMsg, strlen(myMsg.msg_content) + 1, 0);msgctl(msgId, IPC_RMID, NULL);std::cout << "Message queue removed." << std::endl;break;}std::string buffer;std::cout << "Enter message content: ";std::cin.ignore(); // 清除缓冲区std::getline(std::cin, buffer);myMsg.msg_type = msgType;strncpy(myMsg.msg_content, buffer.c_str(), MSG_SIZE - 1);myMsg.msg_content[MSG_SIZE - 1] = '\0';// 发送消息int ret = msgsnd(msgId, &myMsg, strlen(myMsg.msg_content) + 1, 0);if (ret < 0) {perror("msgsnd failed");if (errno == EAGAIN) {std::cout << "Message queue is full. Retry later." << std::endl;}} else {std::cout << "Message sent successfully." << std::endl;}}
}
3.4 消费者进程
- 下面的进程为消费者进程,每次从消息队列中读取消息
- 每次读取要设置读取的消息类型,这里统一设置为读取
1
号消息
#define IPCKEY 123
#define MSG_SIZE 1024struct Mymsg {long msg_type;char msg_content[MSG_SIZE];
};struct Mymsg myMsg = {0};
long msgType = 1;
int msgId;// 消费者
void test2() {// 打开消息队列msgId = msgget(IPCKEY, 0666);if (msgId < 0) {perror("msgget failed");return;}std::cout << "Consumer started. Waiting for messages..." << std::endl;while (true) {// 接收所有类型的消息ssize_t len = msgrcv(msgId, &myMsg, MSG_SIZE, msgType, IPC_NOWAIT);if (len > 0) {myMsg.msg_content[len] = '\0'; std::cout << "Received type " << myMsg.msg_type << ": " << myMsg.msg_content << std::endl;if (myMsg.msg_type == 1 && strcmp(myMsg.msg_content, "quit") == 0) {std::cout << "Exit signal received." << std::endl;break;}}else if (errno != ENOMSG) { // 忽略"没有消息"的错误perror("msgrcv failed");break;}usleep(100000); // 减少CPU占用}
}
3.5 运行结果
运行后,使用命令行查询消息队列信息,可以发现创建了一个消息队列
先运行生产者,在运行消费者,然后生产者向消息队列中发送消息,那么生产者很快就会拿出数据
如果消费者阻塞或者延迟打开,那么这些消息会按顺序依次读取到,因为消息队列的链表每次都是按顺序插入的,读取也是按顺序读取的
退出生产者进程,执行消息队列删除操作,再次查询消息队列,发现已经不存在了
四、总结
4.1 消息队列的优缺点
优点
- 结构化通信:消息带类型,消费者可按需过滤消息(如只处理特定类型的消息)。
- 异步解耦:生产者和消费者无需同时运行,消息会持久化存储在内核中(直到被读取或队列被删除)。
- 跨进程友好:适用于无关进程间的通信(基于键值而非进程 ID)。
缺点
- 性能限制:相比共享内存,消息队列的通信效率较低(需内核拷贝数据)。
- 内核空间限制:消息队列的大小、数量受内核参数限制(如
msgmax
、msgmnb
),可通过sysctl
命令查看或调整。 - 潜在阻塞:默认情况下,
msgsnd()
和msgrcv()
会阻塞等待队列有空间或有消息,需合理使用IPC_NOWAIT
避免死锁。
消息队列通过类型化消息和内核持久化存储实现了进程间的异步通信,适合需要结构化数据传递的场景。使用时需注意键值唯一性、类型过滤逻辑及阻塞控制,避免因资源竞争导致程序异常。
4.2 进程间通讯方式对比
方式 | 通信机制 | 速度 | 适用场景 |
---|---|---|---|
消息队列 | 内核消息链表,带类型消息 | 中等 | 需结构化消息、异步解耦的场景 |
共享内存 | 共享内存区域,需同步机制 | 最快 | 大数据量、高性能要求的场景 |
管道 / 命名管道 | 流式字节序列,单向通信 | 较慢 | 父子进程或亲缘进程间的简单通信 |
套接字 | 网络或本地字节流 / 数据报 | 较慢 | 跨主机或复杂网络拓扑的通信 |
更多资料:https://github.com/0voice