《Linux系统编程篇》System V信号量实现生产者与消费者问题(Linux 进程间通信(IPC))——基础篇(拓展思维)
文章目录
- 📚 **生产者-消费者问题**
- 🔑 **问题分析**
- 🛠️ **详细实现:生产者-消费者**
- **步骤 1:定义信号量和缓冲区**
- **步骤 2:创建信号量**
- **步骤 3:生产者进程**
- **步骤 4:消费者进程**
- **步骤 5:创建进程并启动**
- 🧑🔧 **完整代码示例**
- 🎯 **关键点总结**
接上节,我们来详细展开一下 生产者-消费者问题,并用 System V 信号量 来解决它。这个经典问题帮助我们理解如何在多个进程间同步和互斥地共享资源。
📚 生产者-消费者问题
生产者-消费者问题是多进程同步问题中的经典例子。问题的背景是:有两个进程,一个生产者(Producer)不断生产产品,另一个消费者(Consumer)不断消费产品。两者都需要共享一个有限的缓冲区。生产者往缓冲区写入数据,消费者从缓冲区读取数据。为了避免并发问题,我们需要同步生产者和消费者的访问。
具体的挑战是:
- 互斥:生产者和消费者在访问共享缓冲区时,不能同时操作。
- 同步:缓冲区不能超过最大容量,也不能为空。
🔑 问题分析
我们需要使用信号量来解决这些问题,具体来说,我们需要:
- 一个信号量来控制缓冲区的空位置数(空位信号量)。
- 一个信号量来控制缓冲区的已满位置数(已满信号量)。
- 一个互斥信号量来保证每次只有一个进程(生产者或消费者)可以访问缓冲区。
我们通过信号量来控制:
- 当缓冲区为空时,消费者应该等待。
- 当缓冲区已满时,生产者应该等待。
- 互斥信号量保证在访问共享缓冲区时,只有一个进程能够进入临界区。
🛠️ 详细实现:生产者-消费者
步骤 1:定义信号量和缓冲区
我们将使用以下信号量:
empty
:缓冲区中空位的数量,初始值为BUFFER_SIZE
。full
:缓冲区中已满的数量,初始值为0
。mutex
:互斥锁,用来确保每次只有一个进程能够访问缓冲区,初始值为1
。
缓冲区本身可以用一个数组来表示:
#define BUFFER_SIZE 5 // 缓冲区大小
#define NUM_ITEMS 10 // 生产和消费的物品数量
int buffer[BUFFER_SIZE]; // 缓冲区
int in = 0; // 指向下一个要写入的位置
int out = 0; // 指向下一个要读取的位置
步骤 2:创建信号量
我们通过 semget()
创建信号量集:
int sem_id = semget(IPC_PRIVATE, 3, IPC_CREAT | 0666); // 创建3个信号量
if (sem_id == -1) {
perror("semget");
exit(1);
}
// 初始化信号量
semctl(sem_id, 0, SETVAL, BUFFER_SIZE); // empty 信号量:初始为缓冲区大小
semctl(sem_id, 1, SETVAL, 0); // full 信号量:初始为0,表示缓冲区没有物品
semctl(sem_id, 2, SETVAL, 1); // mutex 信号量:初始为1,表示可以访问缓冲区
步骤 3:生产者进程
生产者进程的工作流程如下:
- 等待空位信号量(
empty
):只有在有空位时才能生产。 - 获取互斥信号量(
mutex
):进入临界区,确保没有其他进程操作缓冲区。 - 生产:将数据放入缓冲区。
- 释放互斥信号量(
mutex
):退出临界区。 - 增加已满信号量(
full
):表明缓冲区中有一个新产品,消费者可以消费。
生产者代码示例:
void producer(int sem_id) {
for (int i = 0; i < NUM_ITEMS; i++) {
struct sembuf sops[2];
// P(empty)
sops[0].sem_num = 0;
sops[0].sem_op = -1;
sops[0].sem_flg = 0;
// P(mutex)
sops[1].sem_num = 2;
sops[1].sem_op = -1;
sops[1].sem_flg = 0;
semop(sem_id, sops, 2);
buffer[in] = i;
printf("生产者生产了产品 %d\n", i);
in = (in + 1) % BUFFER_SIZE;
// V(mutex) 和 V(full)
struct sembuf sops_release[2];
sops_release[0].sem_num = 2; // mutex
sops_release[0].sem_op = 1;
sops_release[0].sem_flg = 0;
sops_release[1].sem_num = 1; // full
sops_release[1].sem_op = 1;
sops_release[1].sem_flg = 0;
semop(sem_id, sops_release, 2);
sleep(1);
}
}
步骤 4:消费者进程
消费者进程的工作流程如下:
- 等待已满信号量(
full
):只有在缓冲区有物品时才能消费。 - 获取互斥信号量(
mutex
):进入临界区,确保没有其他进程操作缓冲区。 - 消费:从缓冲区中取出数据。
- 释放互斥信号量(
mutex
):退出临界区。 - 增加空位信号量(
empty
):表明缓冲区有一个空位,生产者可以生产。
消费者代码示例:
void consumer(int sem_id) {
for (int i = 0; i < NUM_ITEMS; i++) {
struct sembuf sops[2];
// P(full)
sops[0].sem_num = 1;
sops[0].sem_op = -1;
sops[0].sem_flg = 0;
// P(mutex)
sops[1].sem_num = 2;
sops[1].sem_op = -1;
sops[1].sem_flg = 0;
semop(sem_id, sops, 2);
int item = buffer[out];
printf("消费者消费了产品 %d\n", item);
out = (out + 1) % BUFFER_SIZE;
// V(mutex) 和 V(empty)
struct sembuf sops_release[2];
sops_release[0].sem_num = 2; // mutex
sops_release[0].sem_op = 1;
sops_release[0].sem_flg = 0;
sops_release[1].sem_num = 0; // empty
sops_release[1].sem_op = 1;
sops_release[1].sem_flg = 0;
semop(sem_id, sops_release, 2);
sleep(1);
}
}
步骤 5:创建进程并启动
在 main()
函数中,我们创建了一个子进程,用于运行消费者进程。父进程将作为生产者运行。
int main() {
int sem_id = semget(IPC_PRIVATE, 3, IPC_CREAT | 0666);
if (sem_id == -1) {
perror("semget");
exit(1);
}
semctl(sem_id, 0, SETVAL, BUFFER_SIZE); // empty
semctl(sem_id, 1, SETVAL, 0); // full
semctl(sem_id, 2, SETVAL, 1); // mutex
pid_t pid = fork();
if (pid == 0) {
consumer(sem_id);
} else if (pid > 0) {
producer(sem_id);
wait(NULL);
semctl(sem_id, 0, IPC_RMID);
} else {
perror("fork failed");
exit(1);
}
return 0;
}
在这段代码中:
fork()
:通过fork()
创建一个新的子进程。父进程作为生产者执行producer()
函数,子进程作为消费者执行consumer()
函数。- 父进程和子进程分工:生产者不断生产物品放入缓冲区,消费者从缓冲区取出物品进行消费。
wait(NULL)
:父进程使用wait()
来等待子进程的结束,这样可以确保父进程在子进程完成后再退出,避免资源的提前释放。- 删除信号量集:为了避免信号量集泄露,程序结束时通过
semctl()
删除创建的信号量集。
🧑🔧 完整代码示例
这里是完整的代码,包含了生产者和消费者进程的实现,以及使用 System V 信号量同步和互斥访问共享缓冲区。
#include <stdio.h>
#include <stdlib.h>
#include <sys/sem.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#define BUFFER_SIZE 5
#define NUM_ITEMS 10
int buffer[BUFFER_SIZE];
int in = 0;
int out = 0;
void producer(int sem_id) {
for (int i = 0; i < NUM_ITEMS; i++) {
struct sembuf sops[2];
// P(empty)
sops[0].sem_num = 0;
sops[0].sem_op = -1;
sops[0].sem_flg = 0;
// P(mutex)
sops[1].sem_num = 2;
sops[1].sem_op = -1;
sops[1].sem_flg = 0;
semop(sem_id, sops, 2);
buffer[in] = i;
printf("生产者生产了产品 %d\n", i);
in = (in + 1) % BUFFER_SIZE;
// V(mutex) 和 V(full)
struct sembuf sops_release[2];
sops_release[0].sem_num = 2; // mutex
sops_release[0].sem_op = 1;
sops_release[0].sem_flg = 0;
sops_release[1].sem_num = 1; // full
sops_release[1].sem_op = 1;
sops_release[1].sem_flg = 0;
semop(sem_id, sops_release, 2);
sleep(1);
}
}
void consumer(int sem_id) {
for (int i = 0; i < NUM_ITEMS; i++) {
struct sembuf sops[2];
// P(full)
sops[0].sem_num = 1;
sops[0].sem_op = -1;
sops[0].sem_flg = 0;
// P(mutex)
sops[1].sem_num = 2;
sops[1].sem_op = -1;
sops[1].sem_flg = 0;
semop(sem_id, sops, 2);
int item = buffer[out];
printf("消费者消费了产品 %d\n", item);
out = (out + 1) % BUFFER_SIZE;
// V(mutex) 和 V(empty)
struct sembuf sops_release[2];
sops_release[0].sem_num = 2; // mutex
sops_release[0].sem_op = 1;
sops_release[0].sem_flg = 0;
sops_release[1].sem_num = 0; // empty
sops_release[1].sem_op = 1;
sops_release[1].sem_flg = 0;
semop(sem_id, sops_release, 2);
sleep(1);
}
}
int main() {
int sem_id = semget(IPC_PRIVATE, 3, IPC_CREAT | 0666);
if (sem_id == -1) {
perror("semget");
exit(1);
}
semctl(sem_id, 0, SETVAL, BUFFER_SIZE); // empty
semctl(sem_id, 1, SETVAL, 0); // full
semctl(sem_id, 2, SETVAL, 1); // mutex
pid_t pid = fork();
if (pid == 0) {
consumer(sem_id);
} else if (pid > 0) {
producer(sem_id);
wait(NULL);
semctl(sem_id, 0, IPC_RMID);
} else {
perror("fork failed");
exit(1);
}
return 0;
}
🎯 关键点总结
- 信号量的使用:通过
empty
、full
和mutex
信号量实现生产者和消费者的同步与互斥。 semop()
调用:每次生产者或消费者对共享资源进行操作时,都需要通过semop()
来执行信号量操作,确保数据的正确访问顺序。P()
和V()
操作:通过P()
操作来阻塞等待资源,V()
操作来释放资源,确保进程按预期顺序执行。
通过这个实例,你可以更加深入地理解如何使用 System V 信号量 来解决实际的同步和互斥问题。在实际应用中,生产者消费者模式广泛应用于操作系统调度、缓冲区管理等场景。