Linux学习记录--消息队列
一.什么是消息队列
消息队列(Message Queue)是 Linux 系统中一种重要的进程间通信(IPC)机制。它允许不同进程通过队列形式传递消息,具有异步、解耦、流量削峰等优点。POSIX 消息队列提供了一种标准化的接口,可以在不同进程之间可靠地传递数据。
维度 | 管道/匿名管道 | 有名管道(FIFO) | Posix 消息队列 | Posix 共享内存 |
---|---|---|---|---|
数据形式 | 字节流 | 字节流 | 结构化消息 | 裸地址空间 |
优先级 | ❌ | ❌ | ✅(0..MQ_PRIO_MAX -1) | ❌(自己实现) |
多对多 | 半双工 | 半双工 | 全双工、任意进程读写 | 全双工、任意进程读写 |
内核持久化 | 随进程结束消失 | 随进程结束消失 | 随内核直到显式 unlink | 随内核直到显式 unlink |
需要同步原语 | 不需要(内核已串行化) | 不需要 | 不需要(内核已串行化) | 必须自己加互斥/信号量 |
拷贝次数 | 2(内核缓冲) | 2 | 2 | 0(真正零拷贝) |
编程复杂度 | 低 | 低 | 低 | 高(得自己管锁、管缓冲区布局) |
典型吞吐量 | 中 | 中 | 中 | 最高 |
二.核心API
函数 | 关键参数速记 | 返回值/错误 | 典型用途 | ⚠️ 注意事项 | 备注 |
---|---|---|---|---|---|
mq_open | name 必须以“/”开头;oflag 位或:O_RDONLY / O_WRONLY / O_RDWR | O_CREAT | O_NONBLOCK | 成功返回队列描述符;失败返回(mqd_t)-1 并置 errno | 创建/打开一个队列 | ① name 必须是 /somename 形式,且只能有一个 / 在开头; | 队列不存在且指定 O_CREAT 时,必须提供 attr ,否则行为未定义;调试时建议先 mq_unlink 防止复用旧属性。 |
mq_close | 关闭引用,不删除内核对象 | 0 / -1 | 进程退出前关闭描述符 | ① 仅减少引用计数,不会删除队列; | 与文件描述符类似,进程退出时会自动关闭,但显式关闭更规范。 |
mq_unlink | 引用计数为 0 时立即销毁 | 0 / -1 | 清理残留队列 | ① 只删除文件名,已打开的队列仍可继续使用; | 类似文件系统的 unlink ,但不会立即释放资源,直到所有引用关闭。 |
mq_send | msg_len ≤ mq_msgsize ;prio 0…MQ_PRIO_MAX-1 ,越大越靠前 | 0 / -1 ,队列满且非阻塞时 errno=EAGAIN | 发送一条完整消息 | ① msg_len 必须 ≤ 队列属性中的 mq_msgsize ,否则返回 EMSGSIZE ; | 发送长度可为 0,即“空消息”,接收端仍会收到一条完整消息(长度为 0)。 |
mq_receive | msg_len ≥ mq_msgsize ;prio 可 NULL | 实际字节数 / -1 ,空队列且非阻塞时 errno=EAGAIN | 接收一条完整消息 | ① msg_len 必须 ≥ mq_msgsize ,否则返回 EMSGSIZE ; | 接收缓冲区大小必须 ≥ mq_msgsize ,否则返回 EMSGSIZE ;空队列时非阻塞立即返回 EAGAIN 。 |
mq_getattr | attr 出参:flags、maxmsg、msgsize、curmsgs | 0 / -1 | 运行时查看队列属性 | ① 只能读取当前属性,不能修改; | 可用于轮询队列当前消息数,但值是瞬时的,可能被并发修改。 |
mq_setattr | 只能改 O_NONBLOCK 标志,其余字段忽略 | 0 / -1 | 动态切换阻塞/非阻塞 | ① 唯一可修改的是 mq_flags 中的 O_NONBLOCK 位; | 通常用于运行时切换阻塞/非阻塞模式,其他字段被忽略。 |
三.案例展示
1.先把代码跑起来
1. 环境检查
# 查看系统最大消息数与单条最大字节
sysctl kernel.msgmax kernel.msgmnb
# 查看 Posix MQ 默认上限
sysctl fs.mqueue.queues_max fs.mqueue.msg_max fs.mqueue.msgsize_max
# 临时调大(重启失效)
sudo sysctl -w fs.mqueue.msg_max=1000
运行结果如下,不同电脑配置可能不同:
图1 环境检查
2.API调用
在一个进程中简单使用:
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>#define MQ_NAME "/demo_mq"
#define BUF_LEN 256
#define MQ_MAXMSG 10int main(void)
{/* 1. 强制删除可能已存在的旧队列,防止复用旧的属性 */mq_unlink(MQ_NAME);/* 2. 正确设置队列属性 *//*mq_receive 在队列空且没有设置 O_NONBLOCK 时会阻塞等待新消息*//*** 创建/打开队列时是否带 O_NONBLOCK 决定了初始标志;* 之后可以用 fcntl(mqd, F_SETFL, O_NONBLOCK) 动态切换;*/struct mq_attr attr = {.mq_flags = O_NONBLOCK, /* 设置 O_NONBLOCK | 0 */.mq_maxmsg = MQ_MAXMSG, /* 队列最大消息数 */.mq_msgsize = BUF_LEN, /* 单条消息最大字节数 */.mq_curmsgs = 0};/* 3. 创建/打开消息队列 */mqd_t mqd = mq_open(MQ_NAME,O_CREAT | O_RDWR,0664,&attr); /* 关键:必须传 &attr */if (mqd == (mqd_t)-1) {perror("mq_open");return 1;}/* 4. 打印队列当前属性 */mq_getattr(mqd, &attr);printf("flags = 0x%lx\n", attr.mq_flags);printf("maxmsg = %ld\n", attr.mq_maxmsg);printf("msgsize = %ld\n", attr.mq_msgsize);printf("curmsgs = %ld\n", attr.mq_curmsgs);// 设置阻塞/非阻塞fcntl(mqd, F_SETFL, O_NONBLOCK);// 检测阻塞/非阻塞struct mq_attr cur;mq_getattr(mqd, &cur);if (cur.mq_flags & O_NONBLOCK)printf("当前是非阻塞\n");elseprintf("当前是阻塞\n");/*** @mq_send* Receive the oldest from highest priority messages in message queue MQDES.* int mq_send(mqd_t mqdes, const char *msg_ptr,size_t msg_len, unsigned msg_prio);* mqdes – 队列描述符,打开时必须有写权限。 * msg_ptr – 用户缓冲区指针,内容完全透明,可以带结构体。* msg_len – 消息字节数,必须 ≤ mq_msgsize,否则返回 EMSGSIZE。* msg_prio– 0 到 sysconf(_SC_MQ_PRIO_MAX)-1 的整数,数值越大越靠前; * 如果接收方只想要最高优先级消息,可以把它当“带优先级的邮筒”。 * 返回值 – 0 成功,-1 失败;若队列满且未设 O_NONBLOCK 则阻塞,设了 O_NONBLOCK 立即返回 EAGAIN。*//* 5. 发送 10 条消息 */printf("begin write\n");char buf[BUF_LEN];for (int i = 0; i < 10; ++i) {int n = snprintf(buf, sizeof(buf), "消息队列中多发几条, is:%d", i + 1);if (mq_send(mqd, buf, n, 0) == -1) {perror("mq_send");mq_close(mqd);return 1;}}printf("write ok\n");/* 6. 打印队列当前属性 */mq_getattr(mqd, &attr);printf("flags = 0x%lx\n", attr.mq_flags);printf("maxmsg = %ld\n", attr.mq_maxmsg);printf("msgsize = %ld\n", attr.mq_msgsize);printf("curmsgs = %ld\n", attr.mq_curmsgs);/*** @mq_receive* ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,size_t msg_len, unsigned *msg_prio);* mq_receive 把一条消息从队列里拷贝到用户提供的 buf 中,同时把这条消息从队列里删除。* mqdes – 必须有读权限。 * msg_ptr – 接收缓冲区,长度至少等于 mq_msgsize。 * msg_len – 你给出的缓冲区大小,必须 ≥ mq_msgsize,否则返回 EMSGSIZE。 * msg_prio– 如果不关心优先级可传 NULL,否则函数把实际优先级写回来。 * 返回值 – 成功返回实际收到的字节数;0 字节空消息是允许的; * 队列为空且未设 O_NONBLOCK 时阻塞,否则返回 -1 置 EAGAIN。*//* 7. 取出 */ssize_t rcvd = 1;while(1){ /*mq_receive 在队列空且没有设置 O_NONBLOCK 时会阻塞等待新消息*/rcvd = mq_receive(mqd, buf, sizeof(buf), NULL); if (rcvd == -1) {perror("mq_receive");break;} else {write(STDOUT_FILENO, buf, rcvd);write(STDOUT_FILENO, "\n", 1);} }/* 8. 打印队列当前属性 */mq_getattr(mqd, &attr);printf("flags = 0x%lx\n", attr.mq_flags);printf("maxmsg = %ld\n", attr.mq_maxmsg);printf("msgsize = %ld\n", attr.mq_msgsize);printf("curmsgs = %ld\n", attr.mq_curmsgs);/* 9. 清理 */mq_close(mqd);mq_unlink(MQ_NAME);return 0;
}
3. 编译 & 运行
# 终端
gcc que_test.c -o test -lrt && ./test
结果:
4.大体流程
mq_open() ─┐
mq_close() ├─ 生命周期管理
mq_unlink() ─┘
mq_send() ─┐
mq_receive()├─ 数据流
mq_notify() ─┘ 异步通知(本文略)
5.关键结构体
struct mq_attr {long mq_flags; // 仅 0 或 O_NONBLOCKlong mq_maxmsg; // 队列深度long mq_msgsize; // 单条最大字节long mq_curmsgs; // 当前消息条数(只读)
};
注意:
mq_flags
不能直接在attr
里写“阻塞/非阻塞”来持久化,它只在mq_open
时生效;后续想切换必须用fcntl(mqd, F_SETFL, O_NONBLOCK)
。
可能案例过于简单,令人心想这和数组有啥区别?它和共享空间,命名管道有啥区别?怎么体现?
2.优先级体验
在 que_test
里我们统一用 mq_send(mqd, buf, len, 0)
,即优先级=0,现在将优先级进行设置。(注意优先级的范围在0 --- maxlen-1)
把发送循环改成:
for (int i = 0; i < 10; ++i) {int prio = i % 3; // 0,1,2 三级优先级snprintf(buf, sizeof(buf), "msg%ld", i);mq_send(mqd, buf, strlen(buf)+1, prio);
}
再次运行,可以得到结果:
可见权值越大,优先级越高。
3.经典读写
和前面的进程通讯一样,因为其能够跨进程,因此依旧经典套餐,一个进程读,一个进程写。依旧简单的使用,后续打补丁,不是现在懒得不想动(狗头)。
读进程
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>#define MQ_NAME "/demo_mq"
#define BUF_LEN 256
#define MQ_MAXMSG 10int main(void)
{/* 1. 强制删除可能已存在的旧队列,防止复用旧的属性 */mq_unlink(MQ_NAME);/* 2. 正确设置队列属性 *//*mq_receive 在队列空且没有设置 O_NONBLOCK 时会阻塞等待新消息*//*** 创建/打开队列时是否带 O_NONBLOCK 决定了初始标志;* 之后可以用 fcntl(mqd, F_SETFL, O_NONBLOCK) 动态切换;*/struct mq_attr attr = {.mq_flags = O_NONBLOCK, /* 设置 O_NONBLOCK | 0 */.mq_maxmsg = MQ_MAXMSG, /* 队列最大消息数 */.mq_msgsize = BUF_LEN, /* 单条消息最大字节数 */.mq_curmsgs = 0};/* 3. 创建/打开消息队列 */mqd_t mqd = mq_open(MQ_NAME,O_CREAT | O_RDWR,0664,&attr); /* 关键:必须传 &attr */if (mqd == (mqd_t)-1) {perror("mq_open");return 1;}fcntl(mqd, F_SETFL, 0);char buf[BUF_LEN];write(STDOUT_FILENO,"读取队列准备就绪。。。\n",35);while (1) {ssize_t n = mq_receive(mqd, buf, sizeof(buf), NULL);if (n == -1) {if (errno == EAGAIN) { // 队列空usleep(100000); // 稍等再试continue;}perror("mq_receive");break;}if (n >= 5 && memcmp(buf, "QUIT!\n", 6) == 0)break;write(STDOUT_FILENO, buf, n);
}/* 6. 打印队列当前属性 */mq_getattr(mqd, &attr);printf("flags = 0x%lx\n", attr.mq_flags);printf("maxmsg = %ld\n", attr.mq_maxmsg);printf("msgsize = %ld\n", attr.mq_msgsize);printf("curmsgs = %ld\n", attr.mq_curmsgs);/* 7. 清理 */mq_close(mqd);mq_unlink(MQ_NAME);return 0;
}
在读进程创建消息队列,它与共享空间一样,永久文件,只需要在写进程打开此消息队列就可以通讯了。
写进程
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>#define MQ_NAME "/demo_mq"
#define BUF_LEN 256
#define MQ_MAXMSG 10int main(void)
{struct mq_attr attr = {.mq_flags = O_NONBLOCK, /* 设置 O_NONBLOCK | 0 */.mq_maxmsg = MQ_MAXMSG, /* 队列最大消息数 */.mq_msgsize = BUF_LEN, /* 单条消息最大字节数 */.mq_curmsgs = 0};/* 1. 打开消息队列 */mqd_t mqd = mq_open(MQ_NAME,O_CREAT | O_RDWR,0664,&attr); /* 关键:必须传 &attr */if (mqd == (mqd_t)-1) {perror("mq_open");return 1;}fcntl(mqd, F_SETFL, O_NONBLOCK);struct mq_attr cur;mq_getattr(mqd, &cur);if (cur.mq_flags & O_NONBLOCK)printf("当前是非阻塞\n");elseprintf("当前是阻塞\n");printf("begin write\n");char buf[BUF_LEN];while(1) {// 从控制台读取消息int n = read(STDIN_FILENO,buf,sizeof(buf));if (n == 0) { /* Ctrl-D 产生 EOF */strcpy(buf, "QUIT!\n"); /* 发完就溜 */mq_send(mqd, buf, strlen(buf), 0);break;}// 将控制台消息发送到消息队列if (mq_send(mqd, buf, n, 0) == -1) {perror("mq_send");mq_close(mqd);return 1;}}printf("write ok\n");mq_close(mqd);
// 保证读端可以读完消息,因此关闭队列,移除队列的操作在读端return 0;
}
run一下
# 终端 1(reader)
gcc que_read.c -o read -lrt && ./read
# 终端 2(writer)
gcc que_write.c -o write -lrt && ./write
写
结束写
四.小结
只是基础的使用这些消息队列相关函数,对消息队列的性能特点并未利用,后续进阶。