当前位置: 首页 > news >正文

Linux学习记录--消息队列

一.什么是消息队列

消息队列(Message Queue)是 Linux 系统中一种重要的进程间通信(IPC)机制。它允许不同进程通过队列形式传递消息,具有异步、解耦、流量削峰等优点。POSIX 消息队列提供了一种标准化的接口,可以在不同进程之间可靠地传递数据。

维度管道/匿名管道有名管道(FIFO)Posix 消息队列Posix 共享内存
数据形式字节流字节流结构化消息裸地址空间
优先级✅(0..MQ_PRIO_MAX-1)❌(自己实现)
多对多半双工半双工全双工、任意进程读写全双工、任意进程读写
内核持久化随进程结束消失随进程结束消失随内核直到显式 unlink随内核直到显式 unlink
需要同步原语不需要(内核已串行化)不需要不需要(内核已串行化)必须自己加互斥/信号量
拷贝次数2(内核缓冲)220(真正零拷贝)
编程复杂度(得自己管锁、管缓冲区布局)
典型吞吐量最高

二.核心API

函数关键参数速记返回值/错误典型用途⚠️ 注意事项备注
mq_openname 必须以“/”开头;oflag 位或:O_RDONLY / O_WRONLY / O_RDWR | O_CREAT | O_NONBLOCK成功返回队列描述符;失败返回(mqd_t)-1 并置 errno创建/打开一个队列① name 必须是 /somename 形式,且只能有一个 / 在开头;

队列不存在且指定 O_CREAT 时,必须提供 attr,否则行为未定义;调试时建议先 mq_unlink 防止复用旧属性。
mq_close关闭引用,不删除内核对象0 / -1进程退出前关闭描述符① 仅减少引用计数,不会删除队列;与文件描述符类似,进程退出时会自动关闭,但显式关闭更规范。
mq_unlink引用计数为 0 时立即销毁0 / -1清理残留队列① 只删除文件名,已打开的队列仍可继续使用;
类似文件系统的 unlink,但不会立即释放资源,直到所有引用关闭。
mq_sendmsg_len ≤ mq_msgsizeprio 0…MQ_PRIO_MAX-1,越大越靠前0 / -1,队列满且非阻塞时 errno=EAGAIN发送一条完整消息① msg_len 必须 ≤ 队列属性中的 mq_msgsize,否则返回 EMSGSIZE

发送长度可为 0,即“空消息”,接收端仍会收到一条完整消息(长度为 0)。
mq_receivemsg_len ≥ mq_msgsizeprio 可 NULL实际字节数 / -1,空队列且非阻塞时 errno=EAGAIN接收一条完整消息① msg_len 必须 ≥ mq_msgsize,否则返回 EMSGSIZE

接收缓冲区大小必须 ≥ mq_msgsize,否则返回 EMSGSIZE;空队列时非阻塞立即返回 EAGAIN
mq_getattrattr 出参:flags、maxmsg、msgsize、curmsgs0 / -1运行时查看队列属性① 只能读取当前属性,不能修改;
可用于轮询队列当前消息数,但值是瞬时的,可能被并发修改。
mq_setattr只能改 O_NONBLOCK 标志,其余字段忽略0 / -1动态切换阻塞/非阻塞① 唯一可修改的是 mq_flags 中的 O_NONBLOCK 位;
通常用于运行时切换阻塞/非阻塞模式,其他字段被忽略。

三.案例展示

1.先把代码跑起来

1. 环境检查
# 查看系统最大消息数与单条最大字节
sysctl kernel.msgmax kernel.msgmnb
# 查看 Posix MQ 默认上限
sysctl fs.mqueue.queues_max fs.mqueue.msg_max fs.mqueue.msgsize_max
# 临时调大(重启失效)
sudo sysctl -w fs.mqueue.msg_max=1000

运行结果如下,不同电脑配置可能不同:

图1 环境检查

2.API调用

在一个进程中简单使用:

#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>#define MQ_NAME     "/demo_mq"
#define BUF_LEN     256
#define MQ_MAXMSG   10int main(void)
{/* 1. 强制删除可能已存在的旧队列,防止复用旧的属性 */mq_unlink(MQ_NAME);/* 2. 正确设置队列属性 *//*mq_receive 在队列空且没有设置 O_NONBLOCK 时会阻塞等待新消息*//*** 创建/打开队列时是否带 O_NONBLOCK 决定了初始标志;* 之后可以用 fcntl(mqd, F_SETFL, O_NONBLOCK) 动态切换;*/struct mq_attr attr = {.mq_flags   = O_NONBLOCK,   /* 设置 O_NONBLOCK | 0 */.mq_maxmsg  = MQ_MAXMSG,    /* 队列最大消息数 */.mq_msgsize = BUF_LEN,      /* 单条消息最大字节数 */.mq_curmsgs = 0};/* 3. 创建/打开消息队列 */mqd_t mqd = mq_open(MQ_NAME,O_CREAT | O_RDWR,0664,&attr);      /* 关键:必须传 &attr */if (mqd == (mqd_t)-1) {perror("mq_open");return 1;}/* 4. 打印队列当前属性 */mq_getattr(mqd, &attr);printf("flags     = 0x%lx\n", attr.mq_flags);printf("maxmsg    = %ld\n",   attr.mq_maxmsg);printf("msgsize   = %ld\n",   attr.mq_msgsize);printf("curmsgs   = %ld\n",   attr.mq_curmsgs);// 设置阻塞/非阻塞fcntl(mqd, F_SETFL, O_NONBLOCK);// 检测阻塞/非阻塞struct mq_attr cur;mq_getattr(mqd, &cur);if (cur.mq_flags & O_NONBLOCK)printf("当前是非阻塞\n");elseprintf("当前是阻塞\n");/*** @mq_send* Receive the oldest from highest priority messages in message queue MQDES.* int mq_send(mqd_t mqdes, const char *msg_ptr,size_t msg_len, unsigned msg_prio);* mqdes   – 队列描述符,打开时必须有写权限。  * msg_ptr – 用户缓冲区指针,内容完全透明,可以带结构体。* msg_len – 消息字节数,必须 ≤ mq_msgsize,否则返回 EMSGSIZE。* msg_prio– 0 到 sysconf(_SC_MQ_PRIO_MAX)-1 的整数,数值越大越靠前; * 如果接收方只想要最高优先级消息,可以把它当“带优先级的邮筒”。  * 返回值  – 0 成功,-1 失败;若队列满且未设 O_NONBLOCK 则阻塞,设了 O_NONBLOCK 立即返回 EAGAIN。*//* 5. 发送 10 条消息 */printf("begin write\n");char buf[BUF_LEN];for (int i = 0; i < 10; ++i) {int n = snprintf(buf, sizeof(buf), "消息队列中多发几条, is:%d", i + 1);if (mq_send(mqd, buf, n, 0) == -1) {perror("mq_send");mq_close(mqd);return 1;}}printf("write ok\n");/* 6. 打印队列当前属性 */mq_getattr(mqd, &attr);printf("flags     = 0x%lx\n", attr.mq_flags);printf("maxmsg    = %ld\n",   attr.mq_maxmsg);printf("msgsize   = %ld\n",   attr.mq_msgsize);printf("curmsgs   = %ld\n",   attr.mq_curmsgs);/*** @mq_receive* ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,size_t msg_len, unsigned *msg_prio);* mq_receive 把一条消息从队列里拷贝到用户提供的 buf 中,同时把这条消息从队列里删除。* mqdes   – 必须有读权限。  * msg_ptr – 接收缓冲区,长度至少等于 mq_msgsize。  * msg_len – 你给出的缓冲区大小,必须 ≥ mq_msgsize,否则返回 EMSGSIZE。  * msg_prio– 如果不关心优先级可传 NULL,否则函数把实际优先级写回来。  * 返回值  – 成功返回实际收到的字节数;0 字节空消息是允许的;  *     队列为空且未设 O_NONBLOCK 时阻塞,否则返回 -1 置 EAGAIN。*//* 7. 取出 */ssize_t rcvd = 1;while(1){   /*mq_receive 在队列空且没有设置 O_NONBLOCK 时会阻塞等待新消息*/rcvd = mq_receive(mqd, buf, sizeof(buf), NULL); if (rcvd == -1) {perror("mq_receive");break;} else {write(STDOUT_FILENO, buf, rcvd);write(STDOUT_FILENO, "\n", 1);} }/* 8. 打印队列当前属性 */mq_getattr(mqd, &attr);printf("flags     = 0x%lx\n", attr.mq_flags);printf("maxmsg    = %ld\n",   attr.mq_maxmsg);printf("msgsize   = %ld\n",   attr.mq_msgsize);printf("curmsgs   = %ld\n",   attr.mq_curmsgs);/* 9. 清理 */mq_close(mqd);mq_unlink(MQ_NAME);return 0;
}
3. 编译 & 运行
# 终端   
gcc que_test.c -o test -lrt && ./test  

结果:

4.大体流程
mq_open()   ─┐
mq_close()  ├─ 生命周期管理
mq_unlink() ─┘
mq_send()   ─┐
mq_receive()├─ 数据流
mq_notify() ─┘ 异步通知(本文略)
5.关键结构体
struct mq_attr {long mq_flags;   // 仅 0 或 O_NONBLOCKlong mq_maxmsg;  // 队列深度long mq_msgsize; // 单条最大字节long mq_curmsgs; // 当前消息条数(只读)
};

注意:mq_flags 不能直接在 attr 里写“阻塞/非阻塞”来持久化,它只在 mq_open 时生效;后续想切换必须用 fcntl(mqd, F_SETFL, O_NONBLOCK)

可能案例过于简单,令人心想这和数组有啥区别?它和共享空间,命名管道有啥区别?怎么体现?

2.优先级体验

        在 que_test 里我们统一用 mq_send(mqd, buf, len, 0),即优先级=0,现在将优先级进行设置。(注意优先级的范围在0 --- maxlen-1)

把发送循环改成:

for (int i = 0; i < 10; ++i) {int prio = i % 3;          // 0,1,2 三级优先级snprintf(buf, sizeof(buf), "msg%ld", i);mq_send(mqd, buf, strlen(buf)+1, prio);
}

        再次运行,可以得到结果:

可见权值越大,优先级越高。

3.经典读写

        和前面的进程通讯一样,因为其能够跨进程,因此依旧经典套餐,一个进程读,一个进程写。依旧简单的使用,后续打补丁,不是现在懒得不想动(狗头)。

读进程
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>#define MQ_NAME     "/demo_mq"
#define BUF_LEN     256
#define MQ_MAXMSG   10int main(void)
{/* 1. 强制删除可能已存在的旧队列,防止复用旧的属性 */mq_unlink(MQ_NAME);/* 2. 正确设置队列属性 *//*mq_receive 在队列空且没有设置 O_NONBLOCK 时会阻塞等待新消息*//*** 创建/打开队列时是否带 O_NONBLOCK 决定了初始标志;* 之后可以用 fcntl(mqd, F_SETFL, O_NONBLOCK) 动态切换;*/struct mq_attr attr = {.mq_flags   = O_NONBLOCK,   /* 设置 O_NONBLOCK | 0 */.mq_maxmsg  = MQ_MAXMSG,    /* 队列最大消息数 */.mq_msgsize = BUF_LEN,      /* 单条消息最大字节数 */.mq_curmsgs = 0};/* 3. 创建/打开消息队列 */mqd_t mqd = mq_open(MQ_NAME,O_CREAT | O_RDWR,0664,&attr);      /* 关键:必须传 &attr */if (mqd == (mqd_t)-1) {perror("mq_open");return 1;}fcntl(mqd, F_SETFL, 0);char buf[BUF_LEN];write(STDOUT_FILENO,"读取队列准备就绪。。。\n",35);while (1) {ssize_t n = mq_receive(mqd, buf, sizeof(buf), NULL);if (n == -1) {if (errno == EAGAIN) {          // 队列空usleep(100000);             // 稍等再试continue;}perror("mq_receive");break;}if (n >= 5 && memcmp(buf, "QUIT!\n", 6) == 0)break;write(STDOUT_FILENO, buf, n);
}/* 6. 打印队列当前属性 */mq_getattr(mqd, &attr);printf("flags     = 0x%lx\n", attr.mq_flags);printf("maxmsg    = %ld\n",   attr.mq_maxmsg);printf("msgsize   = %ld\n",   attr.mq_msgsize);printf("curmsgs   = %ld\n",   attr.mq_curmsgs);/* 7. 清理 */mq_close(mqd);mq_unlink(MQ_NAME);return 0;
}

在读进程创建消息队列,它与共享空间一样,永久文件,只需要在写进程打开此消息队列就可以通讯了。

写进程
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>#define MQ_NAME     "/demo_mq"
#define BUF_LEN     256
#define MQ_MAXMSG   10int main(void)
{struct mq_attr attr = {.mq_flags   = O_NONBLOCK,   /* 设置 O_NONBLOCK | 0 */.mq_maxmsg  = MQ_MAXMSG,    /* 队列最大消息数 */.mq_msgsize = BUF_LEN,      /* 单条消息最大字节数 */.mq_curmsgs = 0};/* 1. 打开消息队列 */mqd_t mqd = mq_open(MQ_NAME,O_CREAT | O_RDWR,0664,&attr);      /* 关键:必须传 &attr */if (mqd == (mqd_t)-1) {perror("mq_open");return 1;}fcntl(mqd, F_SETFL, O_NONBLOCK);struct mq_attr cur;mq_getattr(mqd, &cur);if (cur.mq_flags & O_NONBLOCK)printf("当前是非阻塞\n");elseprintf("当前是阻塞\n");printf("begin write\n");char buf[BUF_LEN];while(1) {// 从控制台读取消息int n = read(STDIN_FILENO,buf,sizeof(buf));if (n == 0) {                /* Ctrl-D 产生 EOF */strcpy(buf, "QUIT!\n");  /* 发完就溜 */mq_send(mqd, buf, strlen(buf), 0);break;}// 将控制台消息发送到消息队列if (mq_send(mqd, buf, n, 0) == -1) {perror("mq_send");mq_close(mqd);return 1;}}printf("write ok\n");mq_close(mqd);
// 保证读端可以读完消息,因此关闭队列,移除队列的操作在读端return 0;
}
run一下
# 终端 1(reader)  
gcc que_read.c -o read -lrt && ./read  
# 终端 2(writer)  
gcc que_write.c -o write -lrt && ./write

结束写

四.小结

        只是基础的使用这些消息队列相关函数,对消息队列的性能特点并未利用,后续进阶。


文章转载自:

http://6845lypT.bdkhL.cn
http://VeSJw6BA.bdkhL.cn
http://mRb1YG00.bdkhL.cn
http://Ts96Nx3R.bdkhL.cn
http://4KSgSGos.bdkhL.cn
http://gxM1wydG.bdkhL.cn
http://azskoj8s.bdkhL.cn
http://6qapUzH3.bdkhL.cn
http://rYS4Cw0u.bdkhL.cn
http://hdssLHsw.bdkhL.cn
http://hJisZ5t4.bdkhL.cn
http://9Jw0e9Ii.bdkhL.cn
http://NoikWV7Z.bdkhL.cn
http://zdiwC7rc.bdkhL.cn
http://KrzYYkav.bdkhL.cn
http://g8c9pfgI.bdkhL.cn
http://Pzg5K7rP.bdkhL.cn
http://8K6SO8vo.bdkhL.cn
http://8zZociCj.bdkhL.cn
http://0zLBdjYR.bdkhL.cn
http://OFNpoUga.bdkhL.cn
http://Djs4EhpZ.bdkhL.cn
http://sKnMH7tf.bdkhL.cn
http://e4Zbajsr.bdkhL.cn
http://y5S1QKtG.bdkhL.cn
http://2aH3JHqI.bdkhL.cn
http://IURlFX3T.bdkhL.cn
http://6LnTytZq.bdkhL.cn
http://aOKTSR8f.bdkhL.cn
http://26WhjciY.bdkhL.cn
http://www.dtcms.com/a/376272.html

相关文章:

  • leetcode算法刷题的第三十一天
  • Linux驱动开发(2)进一步理解驱动
  • Linux驱动开发笔记(十)——中断
  • 推荐一款智能三防手机:IP68+天玑6300+PoC对讲+夜视
  • 栈:逆波兰表达式求解
  • nginx中ssl证书的获取与配置
  • 云平台得大模型使用以及调用
  • 手写简单的int类型顺序表
  • Spring Boot 深入剖析:BootstrapRegistry 与 BeanDefinitionRegistry 的对比
  • [rStar] 解决方案节点 | `BaseNode` | `MCTSNode`
  • 鸿蒙:@Builder 和 @BuilderParam正确使用方法
  • 美图云修-一站式AI修图软件
  • 从齿轮到智能:机器人如何重塑我们的世界【科普类】
  • F12中返回的id里preview和response内容不一致的问题
  • 【CSS 3D 交互】实现精美翻牌效果:从原理到实战
  • vue二次封装ant-design-vue的table,识别columns中的自定义插槽
  • vue方法汇总
  • GPU硬件架构和配置的理解
  • C++类和对象初识
  • 笔记:乐鑫 (Espressif) 的生态策略与开发者悖论
  • SELinux策略:域转换与类型继承
  • 【VLMs篇】06:Cosmos-Reason1:从物理常识到具身推理
  • 图漾相机 FM851-E2 相关资料
  • 资产管理什么软件好
  • npm 安装命令中关于 @ 的讲解,如:npm install @vue-office/docx vue-demi
  • PowerBI 没实现的的联动同步下钻,QuickBI 实现了
  • k8s+jenkins+harbor构建Devops平台
  • 【中文教材】35. 证券市场指数
  • 36.卷积神经网络:让AI学会看图
  • 【Linux】进程概念(一):从冯诺依曼体系到 PCB 的进程核心解析