Linux应用开发-11-消息队列
System V IPC:主要用于同一台计算机上的不同进程间通信。
- System V IPC (进程间通信)特性:
- 使用“键”(Key) 标识:不同的进程通过一个约定好的、全局唯一的关键字 (key_t) 来访问同一个 IPC 对象。内核则通过唯一的标识符(如 msqid, shmid)来管理它们。
- 持久性 (Persistent):IPC 对象被创建后,会持续存在于内核中,即使创建它的进程退出了也不会消失。必须被显式删除(通过 msgctl 等函数或 ipcrm 命令)
- ipcs 命令:使用 ipcs 命令来查看系统中当前存在的所有 System V IPC 对象(消息队列、共享内存段、信号量数组)。

消息队列 (Message Queue) :提供了一种从一个进程向另一个进程发送“数据块”的方法。
| 特性 | 管道 (Pipe / FIFO) | 消息队列 (Message Queue) |
|---|---|---|
| 数据格式 | 无格式字节流 (Stream) | 有格式的消息 (Record-oriented),每条消息都有自己的“类型”。 |
| 读取方式 | 先进先出 (FIFO),必须按顺序读 | 按类型接收。接收进程可以有选择地读取特定类型的消息,而不必按先进先出的顺序。 |
| 同步 | 严格的同步和阻塞 | 避免了命名管道的同步和阻塞问题。 |
| API | read() / write() | msgrcv() / msgsnd() |
函数说明:
-
msgget() - (获取/创建消息队列)
- 创建一个新的消息队列,或者获取一个已经存在的消息队列的ID。
#include <sys/ipc.h> #include <sys/msg.h> int msgget(key_t key, int msgflg); /* key_t key: 消息队列的“键”(关键字)。全局唯一的数字,所有想访问同一个队列的进程都必须使用相同的 key**。IPC_PRIVATE: 一个特殊值,表示创建一个只供当前进程及其子进程(通过fork继承)使用的私有队列。ftok() 函数:在实际项目中,为避免 key 冲突,通常会使用 ftok() 函数根据一个已存在的文件路径和项目ID来生成一个唯一的 key。 int msgflg: 标志位,由多个选项通过 | (位或) 组合而成。IPC_CREAT: (创建) 如果 key 对应的队列不存在,则创建一个新的队列。如果已存在,则直接返回它的ID。IPC_EXCL: (排他) 必须与 IPC_CREAT 配合使用 (IPC_CREAT | IPC_EXCL)。如果队列已存在,则调用失败(返回 EEXIST 错误),而不是返回它的ID。权限位 (mode): 与 IPC_CREAT 配合使用,指定队列的读写权限, 0666 (表示所有人可读可写)。 */
- 创建一个新的消息队列,或者获取一个已经存在的消息队列的ID。
-
msgsnd() - (发送消息)
- 向一个已打开的消息队列添加(发送)一条新消息。
#include <sys/msg.h> int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); /* int msqid: 目标消息队列的ID const void *msgp:指向您要发送的消息的指针。这个消息不是一个简单的字符串,它必须是一个自定义的结构体,且该结构体的第一个成员必须是 long 类型的消息类型。 size_t msgsz: 要发送的消息的数据部分的大小。它不包括 msg_type 成员的大小。sizeof(struct my_message) - sizeof(long) int msgflg: 标志位,0: 默认(阻塞),IPC_NOWAIT: 非阻塞。 */ struct my_message {long msg_type; // 必须是第一个成员char msg_data[100]; // 消息内容 };
- 向一个已打开的消息队列添加(发送)一条新消息。
-
msgrcv() - (接收消息)
- 用于从一个消息队列读取(接收)一条消息。
#include <sys/msg.h> ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); /* int msqid: 消息队列的ID。 void *msgp: 指向一个空结构体的指针,用于存放接收到的消息。这个结构体的定义必须与发送方一致(第一个成员是 long msg_type)。 size_t msgsz: 您能接收的数据部分的最大大小(即 sizeof(struct my_message) - sizeof(long))。 long msgtyp: 指定您想接收哪种类型的消息:msgtyp == 0: 接收队列中的第一条消息(严格的先进先出 FIFO)。msgtyp > 0: 接收队列中类型完全等于 msgtyp 的第一条消息。msgtyp < 0: 接收队列中类型小于等于 msgtyp 绝对值的第一条消息(在所有符合条件的消息中,选择类型最小的)。 int msgflg: 标志位。0: 默认(阻塞),IPC_NOWAIT: 非阻塞,MSG_NOERROR: 如果收到的消息数据部分大于您指定的 msgsz,则截断消息,只接收 msgsz 字节,而不是返回错误。 */
- 用于从一个消息队列读取(接收)一条消息。
-
msgctl() - (控制消息队列)
- 用于对消息队列进行各种控制操作,如获取状态、设置属性,以及删除。
#include <sys/ipc.h> #include <sys/msg.h> int msgctl(int msqid, int cmd, struct msqid_ds *buf); /* int msqid: 消息队列的ID。 int cmd: 您要执行的命令:IPC_STAT: 获取队列状态。内核会将 msqid 对应的队列信息复制到您传入的 buf 结构体中。IPC_SET: 设置队列状态。内核会使用您在 buf 中提供的信息(例如权限 msg_perm.mode)来更新队列的属性。IPC_RMID: 删除队列。这是必须的操作!它会立即将消息队列从内核中永久删除。buf 参数此时应设为 NULL。 struct msqid_ds *buf: 一个指向内核数据结构的指针,用于传递或接收队列的属性信息。 */
- 用于对消息队列进行各种控制操作,如获取状态、设置属性,以及删除。
用法示例:
1.使用 ftok 基于一个临时文件路径生成了一个唯一的 key;
2.父进程使用 msgget 和这个 key 创建了一个新的消息队列,并立即调用 fork 将程序一分为二。
3.父进程作为“发送者”,通过 msgsnd 向队列发送了两条包含真实字符串数据(存放在 msg_text 字段)并标记了不同类型(Type 1 和 Type 2)的消息。子进程作为“接收者”,通过调用 msgrcv 演示了消息队列的“按类型接收”特性
4.先接收 Type 2 消息,然后再接收 Type 1 消息,证明了其灵活性。
5.最后,父进程在 wait 回收了退出的子进程后,调用 msgctl 分别执行 IPC_STAT(检查队列状态)和 IPC_RMID(永久删除队列)
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>
#include <string.h>// 1. 定义一个用于生成 key 的唯一路径
#define KEY_PATH "/tmp/mq_demo_key"
#define KEY_ID 1// 2. 定义消息结构 (第一个成员必须是 long mtype)
struct msg_buffer {long msg_type;char msg_text[100];
};int main(void)
{int msqid;key_t key;struct msg_buffer message_send, message_recv;struct msqid_ds queue_status_buf; // 用于 msgctlsize_t data_size = sizeof(message_send.msg_text); // 计算数据部分大小// --- 准备工作:生成 key ---// (在实际应用中,文件 /tmp/mq_demo_key 应该是一个已存在的文件)// 为了演示简单,我们直接用 ftok生成一个idFILE *f = fopen(KEY_PATH, "w");if (f == NULL) {perror("fopen for key failed");exit(1);}fclose(f);key = ftok(KEY_PATH, KEY_ID);if (key == -1) {perror("ftok 失败");exit(1);}printf("父进程:使用 Key %ld 准备消息队列...\n", (long)key);// --- 1. msgget() ---// 创建一个新的消息队列 (如果已存在则报错,确保我们是创建者)msqid = msgget(key, IPC_CREAT | IPC_EXCL | 0666);if (msqid == -1) {perror("msgget 失败");exit(1);}printf("父进程:消息队列创建成功 (ID: %d)\n\n", msqid);// --- 创建子进程 ---pid_t pid = fork();if (pid < 0) {perror("fork 失败");exit(1);}if (pid == 0) {// --- 子进程 (接收者) ---sleep(1); // 确保父进程先发送printf("子进程 (PID %d): 启动,准备接收消息。\n", getpid());// --- 3. msgrcv() ---// 演示按类型接收:我们故意先接收类型为 2 的消息printf("子进程:正在等待类型为 2 的消息...\n");if (msgrcv(msqid, &message_recv, data_size, 2, 0) == -1) {perror("msgrcv (type 2) 失败");exit(1);}printf("子进程:成功收到类型 2 消息: '%s'\n", message_recv.msg_text);// 再接收类型为 1 的消息printf("子进程:正在等待类型为 1 的消息...\n");if (msgrcv(msqid, &message_recv, data_size, 1, 0) == -1) {perror("msgrcv (type 1) 失败");exit(1);}printf("子进程:成功收到类型 1 消息: '%s'\n", message_recv.msg_text);printf("子进程:所有消息接收完毕,退出。\n");exit(0);} else {// --- 父进程 (发送者 和 控制者) ---// --- 2. msgsnd() ---printf("父进程 (PID %d): 准备发送两条消息...\n", getpid());// 准备并发送消息 1 (类型为 1)message_send.msg_type = 1;strcpy(message_send.msg_text, "这是消息 1 (Type 1)");if (msgsnd(msqid, &message_send, data_size, 0) == -1) {perror("msgsnd (type 1) 失败");exit(1);}// 准备并发送消息 2 (类型为 2)message_send.msg_type = 2;strcpy(message_send.msg_text, "这是消息 2 (Type 2)");if (msgsnd(msqid, &message_send, data_size, 0) == -1) {perror("msgsnd (type 2) 失败");exit(1);}printf("父进程:两条消息已发送。\n\n");// 等待子进程结束wait(NULL); printf("\n父进程:子进程已退出。\n");// --- 4. msgctl() ---// 演示 IPC_STAT: 获取队列状态if (msgctl(msqid, IPC_STAT, &queue_status_buf) == -1) {perror("msgctl(IPC_STAT) 失败");exit(1);}printf("父进程:检查队列状态,当前队列中还有 %ld 条消息 (应为0)。\n", queue_status_buf.msg_qnum);// 演示 IPC_RMID: 删除队列printf("父进程:正在删除消息队列...\n");if (msgctl(msqid, IPC_RMID, NULL) == -1) {perror("msgctl(IPC_RMID) 失败");exit(1);}printf("父进程:消息队列已删除。\n");// 清理 ftok 用的文件unlink(KEY_PATH); }return 0;
}

