【操作系统(Linux)】——生产者消费者同步互斥模型
✅ 一、程序功能概述
我们将做的:实现一个经典的「生产者-消费者问题」多线程同步模型的案例,主要用到 循环缓冲区 + POSIX 信号量 sem_t
+ pthread 多线程库,非常适合理解并发控制、线程通信和缓冲区管理。
案例目标:通过多个生产者线程不断往共享缓冲区中写入数据,多个消费者线程从中读取数据,所有线程对缓冲区的访问必须遵循互斥和同步机制,确保数据完整性与一致性。
案例代码:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <signal.h>
#include <semaphore.h>
#define Maxbuf 10
#define TimesOfOp 10
#define historynum 100
struct Circlebuf {
int read;
int write;
int buf[Maxbuf];
} circlebuf;
sem_t mutex; // 互斥信号量
sem_t empty; // 空槽位信号量
sem_t full; // 满槽位信号量
char writehistory[historynum][30];
char readhistory[historynum][30];
char history[historynum][30];
int writehistorycount = 0;
int readhistorycount = 0;
int historycount = 0;
void writeCirclebuf(struct Circlebuf *cb, int *value) {
cb->buf[cb->write] = *value;
sleep(1); // 模拟耗时写入
cb->write = (cb->write + 1) % Maxbuf;
}
int readCirclebuf(struct Circlebuf *cb) {
int value = cb->buf[cb->read];
sleep(1); // 模拟耗时读取
cb->buf[cb->read] = 0;
cb->read = (cb->read + 1) % Maxbuf;
return value;
}
void sigend(int sig) {
exit(0);
}
void *productThread(void *arg) {
int id = *(int *)arg;
int t = TimesOfOp;
int writeptr;
while (t--) {
sem_wait(&empty);
sem_wait(&mutex);
writeCirclebuf(&circlebuf, &id);
writeptr = (circlebuf.write - 1 + Maxbuf) % Maxbuf;
sprintf(writehistory[writehistorycount++], "生产者%d:缓冲区%d=%d", id, writeptr, id);
sprintf(history[historycount++], "生产者%d:缓冲区%d=%d\n", id, writeptr, id);
sem_post(&mutex);
sem_post(&full);
sleep(1);
}
return NULL;
}
void *consumerThread(void *arg) {
int id = *(int *)arg;
int t = TimesOfOp;
int value, readptr;
while (t--) {
sem_wait(&full);
sem_wait(&mutex);
value = readCirclebuf(&circlebuf);
readptr = (circlebuf.read - 1 + Maxbuf) % Maxbuf;
sprintf(readhistory[readhistorycount++], "消费者%d:缓冲区%d=%d", id, readptr, value);
sprintf(history[historycount++], "消费者%d:缓冲区%d=%d\n", id, readptr, value);
sem_post(&mutex);
sem_post(&empty);
sleep(1);
}
return NULL;
}
int main() {
int ProdNum = 0, ConsNum = 0;
sem_init(&mutex, 0, 1);
sem_init(&empty, 0, Maxbuf);
sem_init(&full, 0, 0);
signal(SIGINT, sigend);
signal(SIGTERM, sigend);
circlebuf.read = circlebuf.write = 0;
for (int i = 0; i < Maxbuf; i++) circlebuf.buf[i] = 0;
printf("请输入生产者线程的数目: ");
scanf("%d", &ProdNum);
printf("请输入消费者线程的数目: ");
scanf("%d", &ConsNum);
// 分配线程和 ID 数组
pthread_t *proThreads = malloc(ProdNum * sizeof(pthread_t));
pthread_t *conThreads = malloc(ConsNum * sizeof(pthread_t));
int *proIDs = malloc(ProdNum * sizeof(int));
int *conIDs = malloc(ConsNum * sizeof(int));
// 创建消费者线程
for (int i = 0; i < ConsNum; i++) {
conIDs[i] = i + 1;
if (pthread_create(&conThreads[i], NULL, consumerThread, &conIDs[i]) != 0) {
perror("Create consumer thread error");
exit(EXIT_FAILURE);
}
}
// 创建生产者线程
for (int i = 0; i < ProdNum; i++) {
proIDs[i] = i + 1;
if (pthread_create(&proThreads[i], NULL, productThread, &proIDs[i]) != 0) {
perror("Create producer thread error");
exit(EXIT_FAILURE);
}
}
// 等待所有线程完成
for (int i = 0; i < ProdNum; i++) pthread_join(proThreads[i], NULL);
for (int i = 0; i < ConsNum; i++) pthread_join(conThreads[i], NULL);
// 打印读写对比历史
printf("\n========= 读写历史对比表 =========\n");
int max = (writehistorycount > readhistorycount) ? writehistorycount : readhistorycount;
for (int i = 0; i < max; i++) {
printf("%-30s | %-30s\n",
(i < writehistorycount) ? writehistory[i] : " ",
(i < readhistorycount) ? readhistory[i] : " ");
}
// 打印操作历史记录
printf("\n*************缓冲池的操作历史为:*************\n");
for (int i = 0; i < historycount; i++) {
printf("%s", history[i]);
}
// 资源清理
sem_destroy(&mutex);
sem_destroy(&empty);
sem_destroy(&full);
free(proThreads);
free(conThreads);
free(proIDs);
free(conIDs);
return 0;
}
下面将系统性地讲解该代码的结构、功能、关键技术点,理解每一块的作用与原理。
🔧 二、关键结构与全局变量
🔹 1. 缓冲区结构体
struct Circlebuf {
int read;
int write;
int buf[Maxbuf];
} circlebuf;
buf[]
:固定大小的缓冲区,大小为Maxbuf
(10)read
:读指针(消费者读取位置)write
:写指针(生产者写入位置)
构成一个循环缓冲队列,写满了从头开始,读完了也从头读。
🔹 2. 信号量定义
sem_t mutex; // 互斥访问缓冲区
sem_t empty; // 剩余可写槽位数(空位)
sem_t full; // 可读槽位数(数据个数)
三个信号量分别控制:
mutex
: 对缓冲区访问的互斥锁empty
: 控制写入前必须有空位full
: 控制读取前必须有数据
🔹 3. 记录写/读操作历史
char writehistory[historynum][30]; // 写入历史
char readhistory[historynum][30]; // 读取历史
char history[historynum][30]; // 所有历史记录
用于调试输出,记录每一次写入或读取的内容、缓冲区位置和线程编号。
🔄 三、核心操作函数
🔹 1. 写入缓冲区 writeCirclebuf
void writeCirclebuf(struct Circlebuf *circlebuf,int *value){
circlebuf->buf[circlebuf->write] = (*value);
sleep(1); // 模拟耗时
circlebuf->write = (circlebuf->write + 1) % Maxbuf;
}
将值写入当前写指针处,并更新写指针位置(循环回绕)。
🔹 2. 读取缓冲区 readCirclebuf
int readCirclebuf(struct Circlebuf *circlebuf){
int value = circlebuf->buf[circlebuf->read];
sleep(1);
circlebuf->buf[circlebuf->read] = 0;
circlebuf->read = (circlebuf->read + 1) % Maxbuf;
return value;
}
读取并清空当前位置的值,并更新读指针位置。
🔹 3. 生产者线程函数
void *productThread(void *i)
工作流程:
sem_wait(&empty)
:缓冲区空位数 > 0 时继续sem_wait(&mutex)
:互斥锁保护缓冲区- 执行写操作
- 保存历史记录
sem_post(&mutex)
:释放互斥锁sem_post(&full)
:可读数量 +1- 每次
sleep(1)
方便观察
🔹 4. 消费者线程函数
void *consumerThread(void *i)
工作流程与生产者相反:
sem_wait(&full)
:缓冲区有数据可读sem_wait(&mutex)
:互斥锁保护缓冲区- 执行读取操作
- 保存历史记录
sem_post(&mutex)
:释放互斥锁sem_post(&empty)
:空位 +1sleep(1)
观察用
🧵 四、主函数 main
1️⃣ 初始化
sem_init(&mutex,0,1);
sem_init(&empty,0,Maxbuf);
sem_init(&full,0,0);
初始化三个信号量:
mutex = 1
:用于互斥访问缓冲区empty = Maxbuf
:初始全是空的full = 0
:没有数据可读
设置 SIGINT
、SIGTERM
信号处理为 exit(0)
,用于 Ctrl+C 优雅退出。
2️⃣ 创建线程
根据输入创建多个生产者线程和消费者线程:
pthread_create(&proid[i], NULL, productThread, &pro[i-1]);
注意:你在创建线程时有 数组越界风险(见补充建议),见最后改进建议。
3️⃣ 输出对比
sleep((ConsNum+ProdNum)*10);
等待足够时间让所有线程完成操作。
之后输出:
- 每个生产者写入什么
- 每个消费者读取什么
- 完整的缓冲区操作历史
✅ 五、关键同步机制总结
操作类型 | 使用的信号量 | 作用说明 |
---|---|---|
写(生产者) | sem_wait(empty) | 等待有空位写入 |
写 | sem_wait(mutex) | 加互斥锁,保护缓冲区 |
写 | sem_post(full) | 写完后增加可读数量 |
读(消费者) | sem_wait(full) | 等待有数据可读 |
读 | sem_wait(mutex) | 加互斥锁,保护缓冲区 |
读 | sem_post(empty) | 读完后释放空位 |
⚠️ 六、代码可优化建议
- ✅ 线程数组越界问题:
pthread_create(&proid[i], ...) // i 从 1 到 N
应改为:
for(i = 0; i < ProdNum; i++) {
pro[i] = i + 1;
pthread_create(&proid[i], ...);
}
否则 proid[i]
会越界访问未分配内存。
- ✅ 增加线程 join 回收资源
可加入:
for (i = 0; i < ProdNum; i++) pthread_join(proid[i], NULL);
for (i = 0; i < ConsNum; i++) pthread_join(conid[i], NULL);
- ✅ 可读性优化
输出用 printf("\033[1;32m...")
等 ANSI 转义码加颜色,便于区分线程。
✅ 七、运行效果示意(节选)
请输入生产者线程的数目 :2
请输入消费者线程的数目 :2
生产者1:缓冲区0=1 | 消费者1:缓冲区0=1
生产者2:缓冲区1=2 | 消费者2:缓冲区1=2
生产者1:缓冲区2=1 | 消费者1:缓冲区2=1
...
*************缓冲池的操作历史为:******************
生产者1:缓冲区0=1
消费者1:缓冲区0=1
...
🎯 总结
技术点 | 说明 |
---|---|
多线程并发 | 使用 pthread_create 创建多个生产者与消费者 |
循环缓冲区 | 通过 read /write 指针实现 FIFO 缓冲结构 |
同步机制 | 使用信号量 empty/full/mutex 避免竞争 |
信号处理 | 捕获 SIGINT /SIGTERM ,安全退出 |
调试可视化 | 使用 history 记录生产消费过程 |