当前位置: 首页 > news >正文

linux ipc之消息队列

前言

在用linux mtk8518平台做sourdbar的时候,发现appmain进程和audio进程使用消息队列进行通信,当高频率发送消息时会无规律的漏掉部分消息。现在就带着这个目地来学习下驱动层是如何实现的。

msgget

消息队列的创建,使用系统调用
文件位置:linux/ipc/msg.c

long ksys_msgget(key_t key, int msgflg)
{struct ipc_namespace *ns;static const struct ipc_ops msg_ops = {.getnew = newque,.associate = security_msg_queue_associate,};struct ipc_params msg_params;ns = current->nsproxy->ipc_ns;msg_params.key = key;msg_params.flg = msgflg;return ipcget(ns, &msg_ids(ns), &msg_ops, &msg_params);
}SYSCALL_DEFINE2(msgget, key_t, key, int, msgflg)
{return ksys_msgget(key, msgflg);
}

使用newque创建一个新的队列

static int newque(struct ipc_namespace *ns, struct ipc_params *params)
{struct msg_queue *msq;int retval;key_t key = params->key;int msgflg = params->flg;msq = kmalloc(sizeof(*msq), GFP_KERNEL_ACCOUNT);if (unlikely(!msq))return -ENOMEM;msq->q_perm.mode = msgflg & S_IRWXUGO;msq->q_perm.key = key;msq->q_perm.security = NULL;retval = security_msg_queue_alloc(&msq->q_perm);if (retval) {kfree(msq);return retval;}msq->q_stime = msq->q_rtime = 0;msq->q_ctime = ktime_get_real_seconds();msq->q_cbytes = msq->q_qnum = 0;msq->q_qbytes = ns->msg_ctlmnb;msq->q_lspid = msq->q_lrpid = NULL;INIT_LIST_HEAD(&msq->q_messages);INIT_LIST_HEAD(&msq->q_receivers);INIT_LIST_HEAD(&msq->q_senders);/* ipc_addid() locks msq upon success. */retval = ipc_addid(&msg_ids(ns), &msq->q_perm, ns->msg_ctlmni);if (retval < 0) {ipc_rcu_putref(&msq->q_perm, msg_rcu_free);return retval;}ipc_unlock_object(&msq->q_perm);rcu_read_unlock();return msq->q_perm.id;
}

消息的发送

使用系统调用msgsnd来发送信息

long ksys_msgsnd(int msqid, struct msgbuf __user *msgp, size_t msgsz,int msgflg)
{long mtype;if (get_user(mtype, &msgp->mtype))return -EFAULT;return do_msgsnd(msqid, mtype, msgp->mtext, msgsz, msgflg);
}SYSCALL_DEFINE4(msgsnd, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz,int, msgflg)
{return ksys_msgsnd(msqid, msgp, msgsz, msgflg);
}

下面来看看do_msgsnd的具体实现

static long do_msgsnd(int msqid, long mtype, void __user *mtext,size_t msgsz, int msgflg)
{struct msg_queue *msq;struct msg_msg *msg;int err;struct ipc_namespace *ns;DEFINE_WAKE_Q(wake_q);ns = current->nsproxy->ipc_ns;if (msgsz > ns->msg_ctlmax || (long) msgsz < 0 || msqid < 0)return -EINVAL;if (mtype < 1)return -EINVAL;msg = load_msg(mtext, msgsz);//分配内存并将用户端数据进行cpif (IS_ERR(msg))return PTR_ERR(msg);msg->m_type = mtype;msg->m_ts = msgsz;rcu_read_lock();//加锁msq = msq_obtain_object_check(ns, msqid);if (IS_ERR(msq)) {err = PTR_ERR(msq);goto out_unlock1;}ipc_lock_object(&msq->q_perm);for (;;) {struct msg_sender s;err = -EACCES;if (ipcperms(ns, &msq->q_perm, S_IWUGO))goto out_unlock0;/* raced with RMID? */if (!ipc_valid_object(&msq->q_perm)) {err = -EIDRM;goto out_unlock0;}err = security_msg_queue_msgsnd(&msq->q_perm, msg, msgflg);if (err)goto out_unlock0;//如果当前列队总的消息长度+当前事件的长度不超过最大值,则break去处理消息if (msg_fits_inqueue(msq, msgsz))break;/* queue full, wait: */if (msgflg & IPC_NOWAIT) {//是否等待,不等待直接返回err = -EAGAIN;goto out_unlock0;}...//省略}ipc_update_pid(&msq->q_lspid, task_tgid(current));msq->q_stime = ktime_get_real_seconds();if (!pipelined_send(msq, msg, &wake_q)) {//返回0代表目前没有接收者或者msg数据过大//如果此消息当前没有处理者,那么先放到链表上去list_add_tail(&msg->m_list, &msq->q_messages);msq->q_cbytes += msgsz;msq->q_qnum++;percpu_counter_add_local(&ns->percpu_msg_bytes, msgsz);percpu_counter_add_local(&ns->percpu_msg_hdrs, 1);}err = 0;msg = NULL;out_unlock0:ipc_unlock_object(&msq->q_perm);wake_up_q(&wake_q);
out_unlock1:rcu_read_unlock();if (msg != NULL)free_msg(msg);return err;
}

pipelined_send:

static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,struct wake_q_head *wake_q)
{struct msg_receiver *msr, *t;//轮循所有的处理者list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {if (testmsg(msg, msr->r_msgtype, msr->r_mode) &&!security_msg_queue_msgrcv(&msq->q_perm, msg, msr->r_tsk,msr->r_msgtype, msr->r_mode)) {//找到当前事件的处理者了,那么这个处理者就需要从链表上移除,并使用此处理者处理消息list_del(&msr->r_list);if (msr->r_maxsize < msg->m_ts) {wake_q_add(wake_q, msr->r_tsk);/* See expunge_all regarding memory barrier */smp_store_release(&msr->r_msg, ERR_PTR(-E2BIG));} else {ipc_update_pid(&msq->q_lrpid, task_pid(msr->r_tsk));msq->q_rtime = ktime_get_real_seconds();wake_q_add(wake_q, msr->r_tsk);//将消息cp到注册的处理者smp_store_release(&msr->r_msg, msg);return 1;}}}return 0;
}

send函数大体逻辑如下:
1、判断队列总长度是否超标,没有则进行消息发送的操作
2、从注册的链表上轮循所有的处理者,如果有处理者则立马使用此处理者进行消息的处理,同时从链表上移除此处理者
3、如果没有找到处理者,那么就先将消息存放到事件链表上

消息的接收

使用系统调用msgrcv

long ksys_msgrcv(int msqid, struct msgbuf __user *msgp, size_t msgsz,long msgtyp, int msgflg)
{return do_msgrcv(msqid, msgp, msgsz, msgtyp, msgflg, do_msg_fill);
}SYSCALL_DEFINE5(msgrcv, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz,long, msgtyp, int, msgflg)
{return ksys_msgrcv(msqid, msgp, msgsz, msgtyp, msgflg);
}

do_msgrcv:

static long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgflg,long (*msg_handler)(void __user *, struct msg_msg *, size_t))
{int mode;struct msg_queue *msq;struct ipc_namespace *ns;struct msg_msg *msg, *copy = NULL;DEFINE_WAKE_Q(wake_q);ns = current->nsproxy->ipc_ns;if (msqid < 0 || (long) bufsz < 0)return -EINVAL;if (msgflg & MSG_COPY) {if ((msgflg & MSG_EXCEPT) || !(msgflg & IPC_NOWAIT))return -EINVAL;copy = prepare_copy(buf, min_t(size_t, bufsz, ns->msg_ctlmax));if (IS_ERR(copy))return PTR_ERR(copy);}mode = convert_mode(&msgtyp, msgflg);rcu_read_lock();msq = msq_obtain_object_check(ns, msqid);if (IS_ERR(msq)) {rcu_read_unlock();free_copy(copy);return PTR_ERR(msq);}for (;;) {struct msg_receiver msr_d;msg = ERR_PTR(-EACCES);if (ipcperms(ns, &msq->q_perm, S_IRUGO))goto out_unlock1;ipc_lock_object(&msq->q_perm);/* raced with RMID? */if (!ipc_valid_object(&msq->q_perm)) {msg = ERR_PTR(-EIDRM);goto out_unlock0;}//从消息链表上接收消息msg = find_msg(msq, &msgtyp, mode);if (!IS_ERR(msg)) {/** Found a suitable message.* Unlink it from the queue.*/if ((bufsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {msg = ERR_PTR(-E2BIG);goto out_unlock0;}/** If we are copying, then do not unlink message and do* not update queue parameters.*/if (msgflg & MSG_COPY) {msg = copy_msg(msg, copy);goto out_unlock0;}//将消息从链表上取下来,后续进行处理 list_del(&msg->m_list);msq->q_qnum--;msq->q_rtime = ktime_get_real_seconds();ipc_update_pid(&msq->q_lrpid, task_tgid(current));msq->q_cbytes -= msg->m_ts;percpu_counter_sub_local(&ns->percpu_msg_bytes, msg->m_ts);percpu_counter_sub_local(&ns->percpu_msg_hdrs, 1);ss_wakeup(msq, &wake_q, false);goto out_unlock0;}//如果没有消息,如何不等待则直接返回if (msgflg & IPC_NOWAIT) {msg = ERR_PTR(-ENOMSG);goto out_unlock0;}//等待就会将此处理者加入到处理者队列list_add_tail(&msr_d.r_list, &msq->q_receivers);msr_d.r_tsk = current;msr_d.r_msgtype = msgtyp;msr_d.r_mode = mode;if (msgflg & MSG_NOERROR)msr_d.r_maxsize = INT_MAX;elsemsr_d.r_maxsize = bufsz;//初始化WRITE_ONCE(msr_d.r_msg, ERR_PTR(-EAGAIN));/* memory barrier not required, we own ipc_lock_object() */__set_current_state(TASK_INTERRUPTIBLE);ipc_unlock_object(&msq->q_perm);rcu_read_unlock();//让出cpu让写消息的进程有机会运行schedule();rcu_read_lock();//看消息内存是否被重新写入,如果有那么直接处理消息msg = READ_ONCE(msr_d.r_msg);if (msg != ERR_PTR(-EAGAIN)) {/* see MSG_BARRIER for purpose/pairing */smp_acquire__after_ctrl_dep();goto out_unlock1;}/** ... or see -EAGAIN, acquire the lock to check the message* again.*/ipc_lock_object(&msq->q_perm);//第二次看消息内存是否被重新写入,如果有那么直接处理消息msg = READ_ONCE(msr_d.r_msg);if (msg != ERR_PTR(-EAGAIN))goto out_unlock0;//如果上面两个流程都没有执行,那么会将此处理者从链表上del掉。list_del(&msr_d.r_list);if (signal_pending(current)) {msg = ERR_PTR(-ERESTARTNOHAND);goto out_unlock0;}ipc_unlock_object(&msq->q_perm);}out_unlock0:ipc_unlock_object(&msq->q_perm);wake_up_q(&wake_q);
out_unlock1:rcu_read_unlock();if (IS_ERR(msg)) {free_copy(copy);return PTR_ERR(msg);}bufsz = msg_handler(buf, msg, bufsz);free_msg(msg);return bufsz;
}

rcv函数大体逻辑如下:
1、判断消息链表上有没有已经存在需要处理的数据。如果有将此消息从链表上移除同时对获取消息进行处理
2、如果没有需要处理的消息,那么将当前进程加入到处理者链表
3、如果没有找到处理者,那么就先将消息存放到事件链表上。同时当前进程主动调用schedule让出cpu执行权限,让发送消息的进程有机会进行消息发送
4、当当前进程再次被调度进来时候,判断处理者的消息是否被发送端重新写入,如果有写入那么直接处理此消息。如果没有那么将此处理者从链表移除

结论:从整个代码流程上看,暂时没看会导致消息漏掉的逻辑。

http://www.dtcms.com/a/537986.html

相关文章:

  • 英文企业网站开发推广最有效的办法
  • 计算机网络自顶向下方法8——应用层 HTTP报文格式与cookie机制
  • CUDA-GDB(8)——检查程序状态
  • 青海网站建设公明网站建设怎么做
  • 学院网站建设策划书村镇建设年度报表登录网站
  • 西安网站seo技术外贸企业网站模板建设可以吗
  • Rust:函数栈帧 Box智能指针
  • 如何实现大模型 “边生成边显示“
  • 网站排版教程程序员 做网站 微信公众号 赚钱
  • 无人机数据 → 三维模型与光谱指数 → 多源融合特征 → 机器学习模型与机理解释 → 生态应用案例与科研论文
  • 做性的网站有哪些内容科技股有哪些股票龙头2021
  • 深圳网站建设找智恒网络网站做竞价优化
  • 计算机视觉:基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的零售柜商品检测识别系统(Python+PySide6界面+训练代码)(源码+文档)✅
  • 重庆网站设计公司推荐永久免费虚拟主机
  • 软件自学网站房地产设计公司
  • 网络科普:自治系统编号
  • 网站不显示index.html北京最大的广告制作公司
  • TCP 消息分段与粘包问题的完整解决方案
  • 网站怎么运营推广电话销售管理系统
  • 邢台公司网站建设南漳网站制作
  • 度假村网站模板关键词研究工具
  • 【算法】day13 链表
  • 可以做网站的语言济南泰安网站建设公司
  • 超级工程网站建设上海中心大厦wordpress 登陆后台
  • 淮安网站定制wordpress多用户图库
  • 顺企网萍乡网站建设网站排名优化怎么弄
  • switch宝可梦传说Z-A金手指1.0.1免通信进化和持物通信进化修改
  • 校园电子商务网站建设网站制作教程网站
  • 长沙第三方网站建设公司辽宁工程建设信息网诚信库怎么填
  • 建网站需要什么软件义乌跨境电商公司前十名