当前位置: 首页 > news >正文

【操作系统(Linux)】——生产者消费者同步互斥模型

✅ 一、程序功能概述

我们将做的:实现一个经典的「生产者-消费者问题」多线程同步模型的案例,主要用到 循环缓冲区 + POSIX 信号量 sem_t + pthread 多线程库,非常适合理解并发控制、线程通信和缓冲区管理。

案例目标:通过多个生产者线程不断往共享缓冲区中写入数据,多个消费者线程从中读取数据,所有线程对缓冲区的访问必须遵循互斥和同步机制,确保数据完整性与一致性。

案例代码:

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <signal.h>
#include <semaphore.h>

#define Maxbuf 10
#define TimesOfOp 10
#define historynum 100

struct Circlebuf {
    int read;
    int write;
    int buf[Maxbuf];
} circlebuf;

sem_t mutex;        // 互斥信号量
sem_t empty;        // 空槽位信号量
sem_t full;         // 满槽位信号量

char writehistory[historynum][30];
char readhistory[historynum][30];
char history[historynum][30];
int writehistorycount = 0;
int readhistorycount = 0;
int historycount = 0;

void writeCirclebuf(struct Circlebuf *cb, int *value) {
    cb->buf[cb->write] = *value;
    sleep(1); // 模拟耗时写入
    cb->write = (cb->write + 1) % Maxbuf;
}

int readCirclebuf(struct Circlebuf *cb) {
    int value = cb->buf[cb->read];
    sleep(1); // 模拟耗时读取
    cb->buf[cb->read] = 0;
    cb->read = (cb->read + 1) % Maxbuf;
    return value;
}

void sigend(int sig) {
    exit(0);
}

void *productThread(void *arg) {
    int id = *(int *)arg;
    int t = TimesOfOp;
    int writeptr;

    while (t--) {
        sem_wait(&empty);
        sem_wait(&mutex);

        writeCirclebuf(&circlebuf, &id);
        writeptr = (circlebuf.write - 1 + Maxbuf) % Maxbuf;

        sprintf(writehistory[writehistorycount++], "生产者%d:缓冲区%d=%d", id, writeptr, id);
        sprintf(history[historycount++], "生产者%d:缓冲区%d=%d\n", id, writeptr, id);

        sem_post(&mutex);
        sem_post(&full);
        sleep(1);
    }
    return NULL;
}

void *consumerThread(void *arg) {
    int id = *(int *)arg;
    int t = TimesOfOp;
    int value, readptr;

    while (t--) {
        sem_wait(&full);
        sem_wait(&mutex);

        value = readCirclebuf(&circlebuf);
        readptr = (circlebuf.read - 1 + Maxbuf) % Maxbuf;

        sprintf(readhistory[readhistorycount++], "消费者%d:缓冲区%d=%d", id, readptr, value);
        sprintf(history[historycount++], "消费者%d:缓冲区%d=%d\n", id, readptr, value);

        sem_post(&mutex);
        sem_post(&empty);
        sleep(1);
    }
    return NULL;
}

int main() {
    int ProdNum = 0, ConsNum = 0;

    sem_init(&mutex, 0, 1);
    sem_init(&empty, 0, Maxbuf);
    sem_init(&full, 0, 0);

    signal(SIGINT, sigend);
    signal(SIGTERM, sigend);

    circlebuf.read = circlebuf.write = 0;
    for (int i = 0; i < Maxbuf; i++) circlebuf.buf[i] = 0;

    printf("请输入生产者线程的数目: ");
    scanf("%d", &ProdNum);
    printf("请输入消费者线程的数目: ");
    scanf("%d", &ConsNum);

    // 分配线程和 ID 数组
    pthread_t *proThreads = malloc(ProdNum * sizeof(pthread_t));
    pthread_t *conThreads = malloc(ConsNum * sizeof(pthread_t));
    int *proIDs = malloc(ProdNum * sizeof(int));
    int *conIDs = malloc(ConsNum * sizeof(int));

    // 创建消费者线程
    for (int i = 0; i < ConsNum; i++) {
        conIDs[i] = i + 1;
        if (pthread_create(&conThreads[i], NULL, consumerThread, &conIDs[i]) != 0) {
            perror("Create consumer thread error");
            exit(EXIT_FAILURE);
        }
    }

    // 创建生产者线程
    for (int i = 0; i < ProdNum; i++) {
        proIDs[i] = i + 1;
        if (pthread_create(&proThreads[i], NULL, productThread, &proIDs[i]) != 0) {
            perror("Create producer thread error");
            exit(EXIT_FAILURE);
        }
    }

    // 等待所有线程完成
    for (int i = 0; i < ProdNum; i++) pthread_join(proThreads[i], NULL);
    for (int i = 0; i < ConsNum; i++) pthread_join(conThreads[i], NULL);

    // 打印读写对比历史
    printf("\n========= 读写历史对比表 =========\n");
    int max = (writehistorycount > readhistorycount) ? writehistorycount : readhistorycount;
    for (int i = 0; i < max; i++) {
        printf("%-30s | %-30s\n",
            (i < writehistorycount) ? writehistory[i] : " ",
            (i < readhistorycount) ? readhistory[i] : " ");
    }

    // 打印操作历史记录
    printf("\n*************缓冲池的操作历史为:*************\n");
    for (int i = 0; i < historycount; i++) {
        printf("%s", history[i]);
    }

    // 资源清理
    sem_destroy(&mutex);
    sem_destroy(&empty);
    sem_destroy(&full);
    free(proThreads);
    free(conThreads);
    free(proIDs);
    free(conIDs);

    return 0;
}

下面将系统性地讲解该代码的结构、功能、关键技术点,理解每一块的作用与原理。


🔧 二、关键结构与全局变量

🔹 1. 缓冲区结构体

struct Circlebuf {
    int read;
    int write;
    int buf[Maxbuf];
} circlebuf;
  • buf[]:固定大小的缓冲区,大小为 Maxbuf(10)
  • read:读指针(消费者读取位置)
  • write:写指针(生产者写入位置)

构成一个循环缓冲队列,写满了从头开始,读完了也从头读。


🔹 2. 信号量定义

sem_t mutex; // 互斥访问缓冲区
sem_t empty; // 剩余可写槽位数(空位)
sem_t full;  // 可读槽位数(数据个数)

三个信号量分别控制:

  • mutex: 对缓冲区访问的互斥锁
  • empty: 控制写入前必须有空位
  • full: 控制读取前必须有数据

🔹 3. 记录写/读操作历史

char writehistory[historynum][30]; // 写入历史
char readhistory[historynum][30];  // 读取历史
char history[historynum][30];      // 所有历史记录

用于调试输出,记录每一次写入或读取的内容、缓冲区位置和线程编号。


🔄 三、核心操作函数

🔹 1. 写入缓冲区 writeCirclebuf

void writeCirclebuf(struct Circlebuf *circlebuf,int *value){
    circlebuf->buf[circlebuf->write] = (*value);
    sleep(1); // 模拟耗时
    circlebuf->write = (circlebuf->write + 1) % Maxbuf;
}

将值写入当前写指针处,并更新写指针位置(循环回绕)。


🔹 2. 读取缓冲区 readCirclebuf

int readCirclebuf(struct Circlebuf *circlebuf){
    int value = circlebuf->buf[circlebuf->read];
    sleep(1);
    circlebuf->buf[circlebuf->read] = 0;
    circlebuf->read = (circlebuf->read + 1) % Maxbuf;
    return value;
}

读取并清空当前位置的值,并更新读指针位置。


🔹 3. 生产者线程函数

void *productThread(void *i)

工作流程:

  1. sem_wait(&empty):缓冲区空位数 > 0 时继续
  2. sem_wait(&mutex):互斥锁保护缓冲区
  3. 执行写操作
  4. 保存历史记录
  5. sem_post(&mutex):释放互斥锁
  6. sem_post(&full):可读数量 +1
  7. 每次 sleep(1) 方便观察

🔹 4. 消费者线程函数

void *consumerThread(void *i)

工作流程与生产者相反:

  1. sem_wait(&full):缓冲区有数据可读
  2. sem_wait(&mutex):互斥锁保护缓冲区
  3. 执行读取操作
  4. 保存历史记录
  5. sem_post(&mutex):释放互斥锁
  6. sem_post(&empty):空位 +1
  7. sleep(1) 观察用

🧵 四、主函数 main

1️⃣ 初始化

sem_init(&mutex,0,1);
sem_init(&empty,0,Maxbuf);
sem_init(&full,0,0);

初始化三个信号量:

  • mutex = 1:用于互斥访问缓冲区
  • empty = Maxbuf:初始全是空的
  • full = 0:没有数据可读

设置 SIGINTSIGTERM 信号处理为 exit(0),用于 Ctrl+C 优雅退出。


2️⃣ 创建线程

根据输入创建多个生产者线程和消费者线程:

pthread_create(&proid[i], NULL, productThread, &pro[i-1]);

注意:你在创建线程时有 数组越界风险(见补充建议),见最后改进建议。


3️⃣ 输出对比

sleep((ConsNum+ProdNum)*10);

等待足够时间让所有线程完成操作。

之后输出:

  • 每个生产者写入什么
  • 每个消费者读取什么
  • 完整的缓冲区操作历史

✅ 五、关键同步机制总结

操作类型使用的信号量作用说明
写(生产者)sem_wait(empty)等待有空位写入
sem_wait(mutex)加互斥锁,保护缓冲区
sem_post(full)写完后增加可读数量
读(消费者)sem_wait(full)等待有数据可读
sem_wait(mutex)加互斥锁,保护缓冲区
sem_post(empty)读完后释放空位

⚠️ 六、代码可优化建议

  1. 线程数组越界问题:
pthread_create(&proid[i], ...)  // i 从 1 到 N

应改为:

for(i = 0; i < ProdNum; i++) {
    pro[i] = i + 1;
    pthread_create(&proid[i], ...);
}

否则 proid[i] 会越界访问未分配内存。


  1. 增加线程 join 回收资源

可加入:

for (i = 0; i < ProdNum; i++) pthread_join(proid[i], NULL);
for (i = 0; i < ConsNum; i++) pthread_join(conid[i], NULL);

  1. 可读性优化

输出用 printf("\033[1;32m...") 等 ANSI 转义码加颜色,便于区分线程。


✅ 七、运行效果示意(节选)

请输入生产者线程的数目 :2
请输入消费者线程的数目 :2

生产者1:缓冲区0=1 | 消费者1:缓冲区0=1
生产者2:缓冲区1=2 | 消费者2:缓冲区1=2
生产者1:缓冲区2=1 | 消费者1:缓冲区2=1
...

*************缓冲池的操作历史为:******************
生产者1:缓冲区0=1
消费者1:缓冲区0=1
...

🎯 总结

技术点说明
多线程并发使用 pthread_create 创建多个生产者与消费者
循环缓冲区通过 read/write 指针实现 FIFO 缓冲结构
同步机制使用信号量 empty/full/mutex 避免竞争
信号处理捕获 SIGINT/SIGTERM,安全退出
调试可视化使用 history 记录生产消费过程

相关文章:

  • 图解力扣回溯及剪枝问题的模板应用
  • ctfshow VIP题目限免 密码逻辑脆弱
  • 区间 dp 系列 题解
  • 《深入探秘:分布式软总线自发现、自组网技术原理》
  • 部署大模型不再难:DeepSeek + 腾讯云 HAI 实战教程
  • Java 列表初始化全解析:7种方式详解与最佳实践
  • SpringBoot和微服务学习记录Day2
  • python基础语法10-异常处理
  • TPS入门DAY03 服务器篇
  • 提示词工程
  • 牛客KY257 日期累加
  • 逆向工程的多层次解析:从实现到领域的全面视角
  • Spark核心知识总结
  • 2025年汽车加气站操作工证考试内容
  • html元素转图像之深入探索 html - to - image:功能、应用与实践
  • 探索原生JS的力量:自定义实现类似于React的useState功能
  • popupwindow拦截返回点击
  • 学习笔记083——Java Stream API
  • 第七天 开始Unity Shader的学习之Unity中的基础光照之高光反射光照模型
  • S7-1200 PLC热电偶和热电阻模拟量模块
  • 电商设备网站怎么做/东莞百度搜索优化
  • 南京做南京美容整形网站/一份完整的营销策划书
  • 网站建设金牛万达/站长工具ip查询
  • 网站怎样排版/google chrome官网
  • 江苏两学一做网站/东莞网络营销代运营
  • 索引网站有哪些/查询网站