Linux/AndroidOS中进程间的通信线程间的同步 - 消息队列
本文介绍消息队列,它允许进程之间以消息的形式交换数据。数据的交换单位是整个消息。
- POSIX 消息队列是引用计数的。只有当所有当前使用队列的进程都关闭了队列之后才会对队列进行标记以便删除。
- POSIX 消息有一个关联的优先级,并且消息之间是严格按照优先级顺序排队的(以及接收)。
- POSIX 消息队列提供了一个特性允许在队列中的一条消息可用时异步地通知进程。
POSIX 消息队列支持是一个通过 CONFIG_POSIX_MQUEUE 选项配置的可选内核组件。
1 概述
POSIX 消息队列 API 中的主要函数如下。
- mq_open()函数创建一个新消息队列或打开一个既有队列,返回后续调用中会用到的消息队列描述符。
- mq_send()函数向队列写入一条消息。
- mq_receive()函数从队列中读取一条消息。
- mq_close()函数关闭进程之前打开的一个消息队列。
- mq_unlink()函数删除一个消息队列名并当所有进程关闭该队列时对队列进行标记以便删除。
上面的函数所完成的功能是相当明显的。此外,POSIX 消息队列 API 还具备一些特别的特性。
- 每个消息队列都有一组关联的特性,其中一些特性可以在使用 mq_open()创建或打开队列时进行设置。获取和修改队列特性的工作则是由两个函数来完成的:mq_getattr()和 mq_setattr()。
- mq_notify()函数允许一个进程向一个队列注册接收消息通知。在注册完之后,当一条消息可用时会通过发送一个信号或在一个单独的线程中调用一个函数来通知进程。
2 打开、关闭和断开链接消息队列
2.1 打开一个消息队列
mq_open()函数创建一个新消息队列或打开一个既有队列。
#include <fcntl.h> /*Defines 0*constants */
#include<sys/stat.h> /*Defines mode constants */
#include <mqueue.h>mqd_t mq_open(const char *name, int oflag, .../* mode t mode, struct mq_attr *attr */);/* Returns a message queue descriptor on success, or (mgd_t)-l on error */
name 参数标识出了消息队列。
oflag 参数是一个位掩码,它控制着 mq_open()操作的各个方面。下表对这个掩码中可以包含的值进行了总结。
oflag 参数的其中一个用途是,确定是打开一个既有队列还是创建和打开一个新队列。如果在 oflag 中不包含 O_CREAT,那么将会打开一个既有队列。如果在 oflag 中包含了 O_CREAT,并且与给定的 name 对应的队列不存在,那么就会创建一个新的空队列。如果在 oflag 中同时包含 O_CREAT 和 O_EXCL,并且与给定的 name 对应的队列已经存在,那么 mq_open()就会失败。
oflag 参数还能够通过包含 O_RDONLY、O_WRONLY 以及 O_RDWR 这三个值中的一个来表明调用进程在消息队列上的访问方式。
剩下的一个标记值 O_NONBLOCK 将会导致以非阻塞的模式打开队列。如果后续的mq_receive()或 mq_send()调用无法在不阻塞的情况下执行,那么调用就会立即返回 EAGAIN 错误。
mq_open()通常用来打开一个既有消息队列,这种调用只需要两个参数,但如果在 flags中指定了 O_CREAT,那么就还需要另外两个参数:mode 和 attr。(如果通过 name 指定的队列已经存在,那么这两个参数会被忽略。)这些参数的用法如下。
- mode 参数是一个位掩码,它指定了施加于新消息队列之上的权限。这个参数可取的位值与文件上的掩码值是一样的,并且与 open()一样,mode 中的值会与进程的 umask取掩码。要从一个队列中读取消息(mq_receive())就必须要将读权限赋予相应的用户,要向队列写入消息(mq_send())就需要写权限。
- attr 参数是一个 mq_attr 结构,它指定了新消息队列的特性。如果 attr 为 NULL,那么将使用实现定义的默认特性创建队列。
mq_open()在成功结束时会返回一个消息队列描述符,它是一个类型为 mqd_t 的值,在后续的调用中将会使用它来引用这个打开着的消息队列。即需要确保这个类型是一个能在赋值语句中使用或能作为函数参数传递的的类型。(如在 Linux 上,mqd_t 是一个 int)
2.2 fork()、exec()以及进程终止对消息队列描述符的影响
在 fork()中子进程会接收其父进程的消息队列描述符的副本,并且这些描述符会引用同样的打开着的消息队列描述。子进程不会继承其父进程的任何消息通知注册。
当一个进程执行了一个 exec()或终止时,所有其打开的消息队列描述符会被关闭。关闭消息队列描述符的结果是进程在相应队列上的消息通知注册会被注销。
2.3 关闭一个消息队列
mq_close()函数关闭消息队列描述符 mqdes。
#include <mqueue.h>
int mq_close(mqd_t mqdes);/* Returns 0 on success, or -l on error */
如果调用进程已经通过 mqdes 在队列上注册了消息通知,那么通知注册会自动被删除,并且另一个进程可以随后向该队列注册消息通知。
当进程终止或调用 exec()时,消息队列描述符会被自动关闭。与文件描述符一样,应用程序应该在不再使用消息队列描述符的时候显式地关闭消息队列描述符以防止出现进程耗尽消息队列描述符的情况。
与文件上的 close()一样,关闭一个消息队列并不会删除该队列。要删除队列则需要使用mq_unlink(),它是 unlink()在消息队列上的版本。
2.4 删除一个消息队列
mq_unlink()函数删除通过 name 标识的消息队列,并将队列标记为在所有进程使用完该队列之后销毁该队列(这可能意味着会立即删除,前提是所有打开该队列的进程已经关闭了该队列)。
#include <mqueue.h>
int mq_unlink(const char *name);/* Returns 0 on success, or -l on error */
示例程序显示了 mq_unlink()的用法。
/* pmsg_unlink.cUsage: pmsg_unlink mq-nameUnlink a POSIX message queue.使用 mq_unlink()断开一个 POSIX 消息队列的链接Linux supports POSIX message queues since kernel 2.6.6.
*/
#include <mqueue.h>
#include "tlpi_hdr.h"int
main(int argc, char *argv[])
{if (argc != 2 || strcmp(argv[1], "--help") == 0)usageErr("%s mq-name\n", argv[0]);if (mq_unlink(argv[1]) == -1)errExit("mq_unlink");exit(EXIT_SUCCESS);
}
3 描述符和消息队列之间的关系
消息队列描述符和打开着的消息队列之间的关系 与 文件描述符和打开着的文件描述之间的关系类似。消息队列描述符是一个进程级别的句柄,它引用了系统层面的打开着的消息队列描述表中的一个条目,而该条目则引用了一个消息队列对象。下图对这种关系进行了描绘。
在 Linux 上,消息队列被实现成了虚拟文件系统中的 i-node,并且消息队列描述符和打开着的消息队列描述分别被实现成了文件描述符和打开着的文件描述。
下图有助于阐明消息队列描述符的使用方面的细节问题(所有这些都与文件描述符的使用类似)。
- 一个打开的消息队列描述拥有一组关联的标记。SUSv3 只规定了一种这样的标记,即NONBLOCK,它确定了 I/O 是否是非阻塞的。
- 两个进程能够持有引用同一个打开的消息队列描述的消息队列描述符(图中的描述符x)。当一个进程在打开了一个消息队列之后调用 fork()时就会发生这种情况。这些描述符会共享 O_NONBLOCK 标记的状态。
- 两个进程能够持有引用不同消息队列描述(它们引用了同一个消息队列)的打开的消息队列描述(如进程 A 中的描述符 z 和进程 B 中的描述符 y 都引用了/mq-r)。当两个进程分别使用 mq_open()打开同一个队列时就会发生这种情况。
4 消息队列特性
mq_open()、mq_getattr()以及 mq_setattr()函数都会接收一个参数,它是一个指向 mq_attr结构的指针。这个结构是在<mqueue.h>中进行定义的,其形式如下。
struct mq_attr {long mq_flags; /* Message queue description flags:0 or O_NONBLOCK [mq_getattr(),mq_setattr()] */long mq_maxmsg; /* Maximum number of messages on queue [mq_open(),mq_getattr()] */long mq_msgsize; /* Maximum message size(in bytes) [mq_open(),mq_getattr()] */long mq_curmsgs; /* Number of messages currently in queue [mq_getattr()] */
};
在开始深入介绍 mq_attr 的细节之前有必要指出以下几点。
- 这三个函数中的每个函数都只用到了其中几个字段。上面给出的结构定义中的注释指出了各个函数所用到的字段。
- 这个结构包含了与一个消息描述符相关联的打开的消息队列描述(mq_flags)的相关信息以及该描述符所引用的队列的相关信息(mq_maxmsg、mq_msgsize、mq_curmsgs)。
- 其中一些字段中包含的信息在使用 mq_open()创建队列时就已经确定下来了(mq_maxmsg 和 mq_msgsize);其他字段则会返回消息队列描述(mq_flags)或消息队列(mq_curmsgs)的当前状态的相关信息。
4.1 在创建队列时设置消息队列特性
在使用 mq_open()创建消息队列时可以通过下列 mq_attr 字段来确定队列的特性。
- mq_maxmsg 字段定义了使用mq_send()向消息队列添加消息的数量上限,其取值必须大于0。
- mq_msgsize 字段定义了加入消息队列的每条消息的大小的上限,其取值必须大于 0。
内核根据这两个值来确定消息队列所需的最大内存量。
mq_maxmsg 和 mq_msgsize 特性是在消息队列被创建时就确定下来的,并且之后也无法修改这两个特性。
示例程序为 mq_open()函数提供了一个命令行界面并展示了在 mq_open()中如何使用 mq_attr 结构。
消息队列特性可以通过两个命令行参数来指定:–m 用于指定 mq_maxmsg,–s 用于指定mq_msgsize。
- 只要指定了其中一个选项,那么一个非 NULL 的 attrp 参数就会被传递给mq_open()。
- 如果在命令行中只指定了–m 和–s 选项中的一个,那么 attrp 指向的 mq_attr 结构中的一些字段就会取默认值。
- 如果两个选项都被没有被指定,那么在调用 mq_open()时会将attrp 指定为 NULL,这将会导致使用由实现定义的队列特性的默认值来创建队列。
/* pmsg_create.cCreate a POSIX message queue.创建一个 POSIX 消息队列Usage as shown in usageError().Linux supports POSIX message queues since kernel 2.6.6.
*/
#include <mqueue.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "tlpi_hdr.h"static void
usageError(const char *progName)
{fprintf(stderr, "Usage: %s [-cx] [-m maxmsg] [-s msgsize] mq-name ""[octal-perms]\n", progName);fprintf(stderr, " -c Create queue (O_CREAT)\n");fprintf(stderr, " -m maxmsg Set maximum # of messages\n");fprintf(stderr, " -s msgsize Set maximum message size\n");fprintf(stderr, " -x Create exclusively (O_EXCL)\n");exit(EXIT_FAILURE);
}int
main(int argc, char *argv[])
{int flags, opt;mode_t perms;mqd_t mqd;struct mq_attr attr, *attrp;/* If 'attrp' is NULL, mq_open() uses default attributes. If anoption specifying a message queue attribute is supplied on thecommand line, we save the attribute in 'attr' and set 'attrp'pointing to 'attr'. We assign some (arbitrary) default valuesto the fields of 'attr' in case the user specifies the valuefor one of the queue attributes, but not the other. */attrp = NULL;attr.mq_maxmsg = 10;attr.mq_msgsize = 2048;flags = O_RDWR;/* Parse command-line options */while ((opt = getopt(argc, argv, "cm:s:x")) != -1) {switch (opt) {case 'c':flags |= O_CREAT;break;case 'm':attr.mq_maxmsg = atoi(optarg);attrp = &attr;break;case 's':attr.mq_msgsize = atoi(optarg);attrp = &attr;break;case 'x':flags |= O_EXCL;break;default:usageError(argv[0]);}}if (optind >= argc)usageError(argv[0]);perms = (argc <= optind + 1) ? (S_IRUSR | S_IWUSR) :getInt(argv[optind + 1], GN_BASE_8, "octal-perms");mqd = mq_open(argv[optind], flags, perms, attrp);if (mqd == (mqd_t) -1)errExit("mq_open");exit(EXIT_SUCCESS);
}
4.2 获取消息队列特性
mq_getattr()函数返回一个包含与描述符 mqdes 相关联的消息队列描述和消息队列的相关信息的 mq_attr 结构。
#include <mqueue.h>
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);/* Returs 0 on success, or -l on error */
除了上面已经介绍的 mq_maxmsg 和 mq_msgsize 字段之外,attr 指向的返回结构中还包含下列字段。
- mq_flags
这些是与描述符 mqdes 相关联的打开的消息队列描述的标记,其取值只有一个:O_NONBLOCK。这个标记是根据 mq_open()的 oflag 参数来初始化的,并且使用 mq_setattr()可以修改这个标记。 - mq_curmsgs
当前位于队列中的消息数。这个信息在 mq_getattr()返回时可能已经发生了改变,前提是存在其他进程从队列中读取消息或向队列写入消息。
示例程序使用了 mq_getattr()来获取通过命令行参数指定的消息队列的特性,然后在标准输出中显示这些特性。
/* pmsg_getattr.cDisplay attributes of a POSIX message queue.获取 POSIX 消息队列特性Linux supports POSIX message queues since kernel 2.6.6.
*/
#include <mqueue.h>
#include "tlpi_hdr.h"int
main(int argc, char *argv[])
{mqd_t mqd;struct mq_attr attr;if (argc != 2 || strcmp(argv[1], "--help") == 0)usageErr("%s mq-name\n", argv[0]);mqd = mq_open(argv[1], O_RDONLY);if (mqd == (mqd_t) -1)errExit("mq_open");if (mq_getattr(mqd, &attr) == -1)errExit("mq_getattr");printf("Maximum # of messages on queue: %ld\n", attr.mq_maxmsg);printf("Maximum message size: %ld\n", attr.mq_msgsize);printf("# of messages currently on queue: %ld\n", attr.mq_curmsgs);exit(EXIT_SUCCESS);
}
下面的 shell会话使用了示例程序来创建一个消息队列并使用实现定义的默认值来初始化其特性(即传入 mq_open()的最后一个参数为 NULL),然后显示队列特性,这样就能够看到 Linux 上的默认设置了。
$ ./pmsg_create -cx /mq
$ ./pmsg_getattr /mq
Maximum # of messages on queue: 10
Maximum message size: 8192
# of messages currently on queue: 0
$ ./pmsg_unlink /mq
从上面的输出中可以看出 Linux 上 mq_maxmsg 和 mq_msgsize 的默认取值分别为 10 和8192。
4.3 修改消息队列特性
mq_setattr()函数设置与消息队列描述符 mqdes 相关联的消息队列描述的特性,并可选地返回与消息队列有关的信息。
#include <mqueue.h>
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);/* Returns 0 on success, or-l on error */
mq_setattr()函数执行下列任务。
- 它使用 newattr 指向的 mq_attr 结构中的 mq_flags 字段来修改与描述符 mqdes 相关联的消息队列描述的标记(POSIX 规定使用 mq_setattr()能够修改的唯一特性是 O_NONBLOCK 标记的状态)。
- 如果 oldattr 不为 NULL,那么就返回一个包含之前的消息队列描述标记和消息队列特性的 mq_attr 结构(即与 mq_getattr()执行的任务一样)。
应用程序应该通过使用 mq_getattr() 来 获 取 mq_flags 值并修改O_NONBLOCK 位来修改 O_NONBLOCK 标记的状态以及调用 mq_setattr()来修改 mq_flags 设置。如为启用 O_NONBLOCK 需要编写下列代码:
if(mg_getattr(mqd,&attr)==-1)errExit("mq getattr");
attr.mq_flagS=O_NONBLOCK;
if(mq_setattr(mqd,&attr,NULL)==-1)errExit("mq_getattr");
5 交换消息
5.1 发送消息
mq_send()函数将位于 msg_ptr 指向的缓冲区中的消息添加到描述符 mqdes 所引用的消息队列中。
#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);/* Returns 0 on success, or -l on error */
msg_len 参数指定了 msg_ptr 指向的消息的长度,其值必须小于或等于队列的 mq_msgsize特性,否则 mq_send()就会返回 EMSGSIZE 错误。长度为零的消息是允许的。
每条消息都拥有一个用非负整数表示的优先级,它通过 msg_prio 参数指定。消息在队列中是按照优先级倒序排列的(即 0 表示优先级最低)。当一条消息被添加到队列中时,它会被放置在队列中具有相同的优先级的所有消息之后。如果一个应用程序无需使用消息优先级,那么只需要将 msg_prio 指定为 0 即可。
Linux上 这个上限至少是 32768(_POSIX_MQ_PRIO_MAX),即优先级的取值范围至少为 0 到 32767。
如果消息队列已经满了(即已经达到了队列的 mq_maxmsg 限制),那么后续的 mq_send()调用会阻塞直到队列中存在可用空间为止或者在 O_NONBLOCK 标记起作用时立即失败并返回 EAGAIN 错误。
示例程序为 mq_send()函数提供了一个命令行界面。
/* pmsg_send.cUsage as shown in usageError().Send a message (specified as a command line argument) to a POSIX message queue.向 POSIX 消息队列写入一条消息See also pmsg_receive.c.Linux supports POSIX message queues since kernel 2.6.6.
*/
#include <mqueue.h>
#include <fcntl.h> /* For definition of O_NONBLOCK */
#include "tlpi_hdr.h"static void
usageError(const char *progName)
{fprintf(stderr, "Usage: %s [-n] mq-name msg [prio]\n", progName);fprintf(stderr, " -n Use O_NONBLOCK flag\n");exit(EXIT_FAILURE);
}int
main(int argc, char *argv[])
{int flags, opt;mqd_t mqd;unsigned int prio;flags = O_WRONLY;while ((opt = getopt(argc, argv, "n")) != -1) {switch (opt) {case 'n': flags |= O_NONBLOCK; break;default: usageError(argv[0]);}}if (optind + 1 >= argc)usageError(argv[0]);mqd = mq_open(argv[optind], flags);if (mqd == (mqd_t) -1)errExit("mq_open");prio = (argc > optind + 2) ? atoi(argv[optind + 2]) : 0;if (mq_send(mqd, argv[optind + 1], strlen(argv[optind + 1]), prio) == -1)errExit("mq_send");exit(EXIT_SUCCESS);
}
5.2 接收消息
mq_receive()函数从 mqdes 引用的消息队列中删除一条优先级最高、存在时间最长的消息并将删除的消息放置在 msg_ptr 指向的缓冲区。
#include <mqueue.h>
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);/* Returns number of bytes in received message on success, or -l on error */
调用者使用 msg_len 参数来指定 msg_ptr 指向的缓冲区中的可用字节数。
不管消息的实际大小是什么,msg_len(即 msg_ptr 指向的缓冲区的大小)必须要大于或等于队列的 mq_msgsize 特性,否则 mq_receive()就会失败并返回 EMSGSIZE 错误。如果不清楚一个队列的 mq_msgsize 特性的值,那么可以使用 mq_getattr()来获取这个值。(在一个包含多个协作进程的应用程序中一般无需使用 mq_getattr(),因为应用程序通常能够提前确定队列的 mq_msgsize 设置。)
如果 msg_prio 不为 NULL,那么接收到的消息的优先级会被复制到 msg_prio 指向的位置处。如果消息队列当前为空,那么 mq_receive() 会阻塞直到存在可用的消息或在O_NONBLOCK 标记起作用时会立即失败并返回 EAGAIN 错误。
示例程序为 mq_receive()函数提供了一个命令行界面,在 usageError()函数中给出了这个程序的命令格式。下面的 shell 会话演示了示例程序的用法。首先创建了一个消息队列并向其发送了一些具备不同优先级的消息。
$ ./pmsg_create -cx /mq
$ ./pmsg_send /mq msg-a 5
$ ./pmsg_send /mq msg-b 0
$ ./pmsg_send /mq msg-c 10
然后执行一系列命令来从队列中接收消息。
$ ./pmsg_receive /mq
Read 5 bytes; priority = 10
msg-c
$ ./pmsg_receive /mq
Read 5 bytes; priority = 5
msg-a
$ ./pmsg_receive /mq
Read 5 bytes; priority = 0
msg-b
从上面的输出中可以看出,消息的读取是按照优先级来进行的。
此刻,这个队列是空的。当再次执行阻塞式接收时,操作就会阻塞。
$ ./pmsg_receive /mq
另一方面,如果执行了一个非阻塞接收,那么调用就会立即返回一个失败状态。
$ ./pmsg_receive -n /mq
ERROR [EAGAIN/EWOULDBLOCK Resource temporarily unavailable] mq_receive
/* pmsg_receive.cUsage as shown in usageError().Receive a message from a POSIX message queue, and write it onstandard output.从 POSIX 消息队列中读取一条消息See also pmsg_send.c.Linux supports POSIX message queues since kernel 2.6.6.
*/
#include <mqueue.h>
#include <fcntl.h> /* For definition of O_NONBLOCK */
#include "tlpi_hdr.h"static void
usageError(const char *progName)
{fprintf(stderr, "Usage: %s [-n] mq-name\n", progName);fprintf(stderr, " -n Use O_NONBLOCK flag\n");exit(EXIT_FAILURE);
}int
main(int argc, char *argv[])
{int flags, opt;mqd_t mqd;unsigned int prio;void *buffer;struct mq_attr attr;ssize_t numRead;flags = O_RDONLY;while ((opt = getopt(argc, argv, "n")) != -1) {switch (opt) {case 'n': flags |= O_NONBLOCK; break;default: usageError(argv[0]);}}if (optind >= argc)usageError(argv[0]);mqd = mq_open(argv[optind], flags);if (mqd == (mqd_t) -1)errExit("mq_open");/* We need to know the 'mq_msgsize' attribute of the queue inorder to determine the size of the buffer for mq_receive() */if (mq_getattr(mqd, &attr) == -1)errExit("mq_getattr");buffer = malloc(attr.mq_msgsize);if (buffer == NULL)errExit("malloc");numRead = mq_receive(mqd, buffer, attr.mq_msgsize, &prio);if (numRead == -1)errExit("mq_receive");printf("Read %ld bytes; priority = %u\n", (long) numRead, prio);if (write(STDOUT_FILENO, buffer, numRead) == -1)errExit("write");write(STDOUT_FILENO, "\n", 1);exit(EXIT_SUCCESS);
}
5.3 在发送和接收消息时设置超时时间
mq_timedsend()和 mq_timedreceive()函数与 mq_send()和 mq_receive()几乎是完全一样的,它们之间唯一的差别在于如果操作无法立即被执行,并且该消息队列描述上的O_NONBLOCK 标记不起作用,那么 abs_timeout 参数就会为调用阻塞的时间指定一个上限。
#define _XOPEN_SOURCE 600
#include <mqueue.h>
#include <time.h>
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout);/* Returns 0 on success, or -l on error */
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,unsigned int *msg_prio, const struct timespec *abs_timeout);/* Returns number of bytes in received message on success, or -l on error */
abs_timeout 参数是一个 timespec 结构,它将超时时间描述为自新纪元到现在的一个绝对值,其单位为秒数和纳秒数。要指定一个相对超时则可以使用 clock_gettime()来获取 CLOCK_REALTIME 时钟的当前值并在该值上加上所需的时间量来生成一个恰当初始化过的 timespec 结构。
如果 mq_timedsend()或 mq_timedreceive()调用因超时而无法完成操作,那么调用就会失败并返回 ETIMEDOUT 错误。
6 消息通知
POSIX 消息队列能够接收之前为空的队列上有可用消息的异步通知(即队列从空变成了非空)。这个特性意味着已经无需执行一个阻塞的调用或将消息队列描述符标记为非阻塞并在队列上定期执行 mq_receive()调用了,因为一个进程能够请求消息到达通知,然后继续执行其他任务直到收到通知为止。进程可以选择通过信号的形式或通过在一个单独的线程中调用一个函数的形式来接收通知。
mq_notify()函数注册调用进程在一条消息进入描述符 mqdes 引用的空队列时接收通知。
#include <mqueue.h>
int mq_notify(mqd_t mqdes, const struct sigevent *notification);/* Returns 0 on success, or -l on error */
notification 参数指定了进程接收通知的机制。在深入介绍 notification 参数的细节之前,有关消息通知需要注意以下几点。
- 在任何一个时刻都只有一个进程(“注册进程”)能够向一个特定的消息队列注册接收通知。如果一个消息队列上已经存在注册进程了,那么后续在该队列上的注册请求将会失败(mq_notify()返回 EBUSY 错误)。
- 只有当一条新消息进入之前为空的队列时注册进程才会收到通知。如果在注册的时候队列中已经包含消息,那么只有当队列被清空之后有一条新消息达到之时才会发出通知。
- 当向注册进程发送了一个通知之后就会删除注册信息,之后任何进程就能够向队列注册接收通知了。换句话说,只要一个进程想要持续地接收通知,那么它就必须要在每次接收到通知之后再次调用 mq_notify()来注册自己。
- 注册进程只有在当前不存在其他在该队列上调用 mq_receive()而发生阻塞的进程时才会收到通知。如果其他进程在 mq_receive()调用中被阻塞了,那么该进程会读取消息,注册进程会保持注册状态。
- 一个进程可以通过在调用 mq_notify()时传入一个值为 NULL 的 notification 参数来撤销自己在消息通知上的注册信息。
示例程序中给出的是该结构的一个简化版本,它只列出了与 mq_notify()相关的字段。
union sigval {int sival_int; /*Integer value for accompanying data */void *sival_ptr; /* Pointer value for accompanying data */
};struct sigevent {int sigev_notify; /* Notification method */int sigev_signo; /* Notification signal for SIGEV SIGNAL */union sigval sigev_value; /*Value passed to signal handler or thread function */void (*sigev_notify_function) (union sigval); /* Thread notification function */void *sigev_notify_attributes; /*Really 'pthread attr t'*/
这个结构的 sigev_notify 字段将会被设置成下列值中的一个。
- SIGEV_NONE
注册这个进程接收通知,但当一条消息进入之前为空的队列时不通知该进程。与往常一样,当新消息进入空队列之后注册信息会被删除。 - SIGEV_SIGNAL
通过生成一个在 sigev_signo 字段中指定的信号来通知进程。如果 sigev_signo 是一个实时信号,那么 sigev_value 字段将会指定信号都带的数据。通过传入信号处理器的siginfo_t 结构中的 si_value 字段或通过调用 sigwaitinfo()或 sigtimedwait()返回值能够取得这部分数据。siginfo_t 结构中的下列字段也会被填充:si_code,其值为 SI_MESGQ;si_signo,其值是信号编号;si_pid,其值是发送消息的进程的进程 ID;以及 si_uid,其值是发送消息的进程的真实用户 ID。(si_pid 和 si_uid 字段在其他大多数实现上不会被设置。) - SIGEV_THREAD
通过调用在 sigev_notify_function 中指定的函数来通知进程,就像是在一个新线程中启动该函数一样。sigev_notify_attributes 字段可以为 NULL 或是一个指向定义了线程的特性的 pthread_ attr_t 结构的指针。sigev_value 中指定的联合 sigval 值将会作为参数传入这个函数。
6.1 通过信号接收通知
示例程序提供了一个使用信号来进行消息通知的例子。这个程序执行了下列任务。
- 以非阻塞模式打开了一个通过命令行指定名称的消息队列①,确定了该队列的mq_msgsize 特性的值②,并分配了一个大小为该值的缓冲区来接收消息③。
- 阻塞通知信号(SIGUSR1)并为其建立一个处理器④。
- 首次调用 mq_notify()来注册进程接收消息通知⑤。
- 执行一个无限循环,在循环中执行下列任务。
(1) 调用 sigsuspend(),该函数会解除通知信号的阻塞状态并等待直到信号被捕获⑥。从这个系统调用中返回表示已经发生了一个消息通知。此刻,进程会撤销消息通知的注册信息。
(2) 调用 mq_notify()重新注册进程接收消息通知⑦。
(3) 执行一个 while 循环从队列中尽可能多地读取消息以便清空队列⑧。
/* mq_notify_sig.cUsage: mq_notify_sig mq-nameDemonstrate message notification via signals (catching the signals witha signal handler) on a POSIX message queue.通过信号接收消息通知
*/
#include <signal.h>
#include <mqueue.h>
#include <fcntl.h> /* For definition of O_NONBLOCK */
#include "tlpi_hdr.h"#define NOTIFY_SIG SIGUSR1static void
handler(int sig)
{/* Just interrupt sigsuspend() */
}/* This program does not handle the case where a message already exists onthe queue by the time the first attempt is made to register for messagenotification. In that case, the program would never receive a notification.See mq_notify_via_signal.c for an example of how to deal with that case. */int
main(int argc, char *argv[])
{struct sigevent sev;mqd_t mqd;struct mq_attr attr;void *buffer;ssize_t numRead;sigset_t blockMask, emptyMask;struct sigaction sa;if (argc != 2 || strcmp(argv[1], "--help") == 0)usageErr("%s mq-name\n", argv[0]);mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK); //以非阻塞模式打开了一个通过命令行指定名称的消息队列if (mqd == (mqd_t) -1)errExit("mq_open");/* Determine mq_msgsize for message queue, and allocate an input bufferof that size */if (mq_getattr(mqd, &attr) == -1) //确定了该队列的mq_msgsize 特性的值errExit("mq_getattr");buffer = malloc(attr.mq_msgsize); //并分配了一个大小为该值的缓冲区来接收消息if (buffer == NULL)errExit("malloc");/* Block the notification signal and establish a handler for it */sigemptyset(&blockMask); //阻塞通知信号(SIGUSR1)并为其建立一个处理器sigaddset(&blockMask, NOTIFY_SIG);if (sigprocmask(SIG_BLOCK, &blockMask, NULL) == -1)errExit("sigprocmask");sigemptyset(&sa.sa_mask);sa.sa_flags = 0;sa.sa_handler = handler;if (sigaction(NOTIFY_SIG, &sa, NULL) == -1)errExit("sigaction");/* Register for message notification via a signal */sev.sigev_notify = SIGEV_SIGNAL; //首次调用 mq_notify()来注册进程接收消息通知sev.sigev_signo = NOTIFY_SIG;if (mq_notify(mqd, &sev) == -1)errExit("mq_notify");sigemptyset(&emptyMask);for (;;) {sigsuspend(&emptyMask); /* 调用 sigsuspend(),该函数会解除通知信号的阻塞状态并等待直到信号被捕获 *//* Reregister for message notification */if (mq_notify(mqd, &sev) == -1) //调用 mq_notify()重新注册进程接收消息通知errExit("mq_notify");while ((numRead = mq_receive(mqd, buffer, attr.mq_msgsize, NULL)) >= 0) //执行一个 while 循环从队列中尽可能多地读取消息以便清空队列printf("Read %ld bytes\n", (long) numRead);if (errno != EAGAIN) /* Unexpected error */errExit("mq_receive");}
}
示例程序中存在很多方面值得详细解释。
- 程序阻塞了通知信号并使用 sigsuspend()来等待该信号,而没有使用 pause(),这是为了防止出现程序在执行 for 循环中的其他代码(即没有因等待信号而阻塞)时错过信号的情况。如果发生了这种情况,并且使用了 pause()来等待信号,那么下次调用 pause()时会阻塞,即使系统已经发出了一个信号。
- 程序以非阻塞模式打开了队列,并且当一个通知发生之后使用一个 while 循环来读取队列中的所有消息。通过这种方式来清空队列能够确保当一条新消息到达之后会产生一个新通知。使用非阻塞模式意味着 while 循环在队列被清空之后就会终止(mq_receive()会失败并返回 EAGAIN 错误)。
- 在 for 循环中比较重要的一点是在读取队列中的所有消息之前重新注册接收消息通知。如果颠倒了顺序,如按照下面的顺序:队列中的所有消息都被读取了,while 循环终止;另一个消息被添加到了队列中;mq_notify()被调用以重新注册接收消息通知。此刻,系统将不会产生新的通知信号,因为队列已经非空了,其结果是程序在下次调用 sigsuspend()时会永远阻塞。
6.2 通过线程接收通知
示例程序提供了一个使用线程来发布消息通知的例子。
- 当消息通知发生时,程序会在清空队列之前重新启用通知②。
- 采用了非阻塞模式使得在接收到一个通知之后可以在无需阻塞的情况下完全清空队列⑤。
/* mq_notify_thread.cDemonstrate message notification via threads on a POSIX message queue.
*/
#include <pthread.h>
#include <mqueue.h>
#include <signal.h>
#include <fcntl.h> /* For definition of O_NONBLOCK */
#include "tlpi_hdr.h"/* This program does not handle the case where a message already exists onthe queue by the time the first attempt is made to register for messagenotification. In that case, the program would never receive a notification.See mq_notify_via_thread.c for an example of how to deal with that case. */static void notifySetup(mqd_t *mqdp);static void /* Thread notification function */
threadFunc(union sigval sv)
{ssize_t numRead;mqd_t *mqdp;void *buffer;struct mq_attr attr;mqdp = sv.sival_ptr;/* Determine mq_msgsize for message queue, and allocate an input bufferof that size */if (mq_getattr(*mqdp, &attr) == -1)errExit("mq_getattr");buffer = malloc(attr.mq_msgsize);if (buffer == NULL)errExit("malloc");/* Reregister for message notification */notifySetup(mqdp);while ((numRead = mq_receive(*mqdp, buffer, attr.mq_msgsize, NULL)) >= 0)printf("Read %ld bytes\n", (long) numRead);if (errno != EAGAIN) /* Unexpected error */errExit("mq_receive");free(buffer);
}static void
notifySetup(mqd_t *mqdp)
{struct sigevent sev;sev.sigev_notify = SIGEV_THREAD; /* Notify via thread */sev.sigev_notify_function = threadFunc;sev.sigev_notify_attributes = NULL;/* Could be pointer to pthread_attr_t structure */sev.sigev_value.sival_ptr = mqdp; /* Argument to threadFunc() */if (mq_notify(*mqdp, &sev) == -1)errExit("mq_notify");
}int
main(int argc, char *argv[])
{mqd_t mqd;if (argc != 2 || strcmp(argv[1], "--help") == 0)usageErr("%s mq-name\n", argv[0]);mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK);if (mqd == (mqd_t) -1)errExit("mq_open");notifySetup(&mqd);pause(); /* Wait for notifications via thread function */
}
示例程序的设计还需要注意以下几点。
- 程序通过一个线程来请求通知需要将传入 mq_notify()的 sigevent 结构的 sigev_notify字段的值指定为 SIGEV_THREAD 。线程的启动函数 threadFunc() 是通过sigev_notify_function 字段来指定的③。
- 在启用消息通知之后,主程序会永远中止⑥;定时器通知是通过在一个单独的线程中调用 threadFunc()来分发的①。
- 本来可以通过将消息队列描述符 mqd 变成一个全局变量使之对 threadFunc()可见,但这里采用了一种不同的做法:将消息队列描述符的地址放在了传给 mq_notify()的sigev_value.sival_ptr 字段中④。当后面调用 threadFunc()时,这个参数会作为其参数被传入到该函数中。
7 Linux 特有的特性
POSIX 消息队列在 Linux 上的实现提供了一些非标准的却相当有用的特性。
7.1 通过命令行显示和删除消息队列对象
POSIX IPC 对象被实现成了虚拟文件系统中的文件,并且可以使用 ls和 rm 来列出和删除这些文件。为列出和删除 POSIX 消息队列就必须要使用形如下面的命令来将消息队列挂载到文件系统中。
# mount -t mqueue source target
source 可以是任意一个名字(通常将其指定为字符串 none),其唯一的意义是它将出现在/proc/mounts 中并且 mount 和 df 命令会显示出这个名字。target 是消息队列文件系统的挂载点。
下面的 shell 会话显示了如何挂载消息队列文件系统和显示其内容。首先为文件系统创建一个挂载点并挂载它。
$ sudo mkdir /dev/mqueue
$ sudo mount -t mqueue none /dev/mqueue
接着显示新挂载在/proc/mounts 中的记录,然后显示挂载目录上的权限。
$ cat /proc/mounts | grep mqueue
none /dev/mqueue mqueue rw 0 0
$ ls -ld /dev/mqueue
drwxrwxrwt 2 root root 60 May 7 16:54 /dev/mqueue/
在 ls 命令的输出中需要注意的一点是消息队列文件系统在挂载时会自动为挂载目录设置粘滞位。(从 ls 的输出中的 other-execute 权限字段中有一个 t 就可以看出这一点。)这意味着非特权进程只能在它所拥有的消息队列上执行断开链接的操作。
接着创建一个消息队列,使用 ls 来表明它在文件系统中是可见的,然后删除该消息队列。
$ ./pmsg_create -c /newq
$ ls /dev/mqueue
newq
$ rm /dev/mqueue/newq
7.2 获取消息队列的相关信息
可以显示消息队列文件系统中的文件的内容,每个虚拟文件都包含了其关联的消息队列的相关信息。
$ ./pmsg_create -c /mq
$ ./pmsg_send /mq abcdefg
$ cat /dev/mqueue/mq
QSIZE:7 NOTIFY:0 SIGNO:0 NOTIFY_PID:0
QSIZE 字段的值为队列中所有数据的总字节数,剩下的字段则与消息通知相关。如果NOTIFY_PID 为非零,那么进程 ID 为该值的进程已经向该队列注册接收消息通知了,剩下的字段则提供了与这种通知相关的信息。
- NOTIFY 是一个与其中一个 sigev_notify 常量对应的值:0 表示 SIGEV_SIGNAL,1表示 SIGEV_NONE,2 表示 SIGEV_THREAD。
- 如果通知方式是 SIGEV_SIGNAL,那么 SIGNO 字段指出了哪个信号会用来分发消息通知。下面的 shell 会话对这些字段中包含的信息进行了说明。
$ ./mq_notify_sig /mq &
[1] 1920782
$ cat /dev/mqueue/mq
QSIZE:7 NOTIFY:0 SIGNO:10 NOTIFY_PID:1920782
$ kill %1
$ ./mq_notify_thread /mq &
[2] 1921732
[1] Terminated ./mq_notify_sig /mq
$ cat /dev/mqueue/mq
QSIZE:7 NOTIFY:2 SIGNO:0 NOTIFY_PID:1921732
7.3 使用另一种 I/O 模型操作消息队列
在 Linux 实现上,消息队列描述符实际上是一个文件描述符,因此可以使用 I/O 多路复用系统调用(select()和 poll())或 epoll API 来监控这个文件描述符。
8 消息队列限制
SUSv3 为 POSIX 消息队列定义了两个限制。
- MQ_PRIO_MAX
它定义了一条消息的最大优先级。 - MQ_OPEN_MAX
一个实现可以定义这个限制来指明一个进程最多能打开的消息队列数量。由于 Linux将消息队列描述符实现成了文件描述符,因此适用于文件描述符的限制将适用于消息队列描述符。(换句话说,在 Linux 上,每个进程以及系统所能打开的文件描述符的数量限制实际上会应用于文件描述符数量和消息队列描述符数量之和。)
Linux 还提供了一些/proc 文件来查看和修改(需具备特权)控制 POSIX 消息队列的使用的限制。下面这三个文件位于/proc/sys/fs/ mqueue 目录中。
msg_max
这个限制为新消息队列的 mq_maxmsg 特性的取值规定了一个上限(即使用 mq_open()创建队列时 attr.mq_maxmsg 字段的上限值)。这个限制的默认值是 10,最小值是 1(在早于 2.6.28 的内核中是10),最大值由内核常量HARD_MSGMAX 定义,该常量的值是通过公式(131072 / sizeof(void *))计算得来的,在 Linux/x86-32 上其值为 32768。当一个特权进程(CAP_SYS_RESOURCE)调用 mq_open()时 msg_max 限制会被忽略,但 HARD_MSGMAX 仍然担当着 attr.mq_maxmsg 的上限值的角色。
msgsize_max
这个限制为非特权进程创建的新消息队列的 mq_msgsize 特性的取值规定了一个上限(即使用 mq_open()创建队列时 attr.mq_msgsize 字段的上限值)。这个限制的默认值是 8192,最小值是 128(在早于 2.6.28 的内核中是 8192),最大值是 1048576(在早于 2.6.28 的内核中是INT_MAX)。当一个非特权进程(CAP_SYS_RESOURCE)调用 mq_open()时会忽略这个限制。
queues_max
这是一个系统级别的限制,它规定了系统上最多能够创建的消息队列的数量。一旦达到这个限制,就只有特权进程(CAP_SYS_RESOURCE)才能够创建新队列。这个限制的默认值是 256,其取值可以为范围从 0 到 INT_MAX 之间的任意一个值。
Linux 还提供了 RLIMIT_MSGQUEUE 资源限制,它可以用来为属于调用进程的真实用户ID 的所有消息队列所消耗的空间规定一个上限。