当前位置: 首页 > news >正文

Linux多线程[生产者消费者模型]

01. 信号量

信号量一种更通用的同步机制,由一个计数器控制,用于管理对多个资源的访问或实现线程间的基本信号传递。互斥锁可以看作是一种特殊的信号量(计数值为1的信号量)。可以将其比喻为一个公共自行车亭,里面有N辆自行车(资源)。信号量计数器初始值为N。借车时P操作获取一个资源(计数减1,如果为0则等待),还车时V操作释放一个资源(计数加1,并唤醒等待者)。


1.1 mutex&condtion&senaphore联系

在这里插入图片描述

  • 互斥锁:解决“独占访问”问题。“你忙你的,我等你”
  • 条件变量:解决“条件等待”问题。“条件没好,我先睡,好了叫我”。它是互斥锁的黄金搭档。
  • 信号量:解决“资源计数”或“信号传递”问题。“还有几个资源可用?”。它更通用,但正确使用也更复杂。

当你在选择时可以考虑以下情节:

  • 只想防止多个线程同时进入一段代码 -> 互斥锁
  • 想等待某个条件成立(如队列非空) -> 互斥锁 + 条件变量
  • 想控制对N个完全相同资源的访问 -> 信号量

1.2 信号量流程初识

控制资源访问时序图:
在这里插入图片描述


代码示例:

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>sem_t semaphore;void* threadRoute(void* arg) {int id = *(int*)arg;printf("Thread %d waiting for resource...\n", id);sem_wait(&semaphore); // 获取信号量printf("Thread %d acquired resource. Working...\n", id);sleep(2); // 模拟工作printf("Thread %d releasing resource.\n", id);sem_post(&semaphore); // 释放信号量return NULL;
}int main() {pthread_t threads[5];int ids[2];// 初始化信号量,允许2个线程同时访问sem_init(&semaphore, 0, 2);for (int i = 0; i < 5; i++) {ids[i] = i;pthread_create(&threads[i], NULL, threadRoute, &ids[i]);}for (int i = 0; i < 2; i++) {pthread_join(threads[i], NULL);}sem_destroy(&semaphore);return 0;
}

02. 生产者消费者模型

2.1 什么是生产者消费者模型?

生产者消费者模型是一种经典的多线程同步与协作模型,它描述了两种不同类型的线程(或进程)通过一个共享的缓冲区(或队列)进行协作的方式。
想象一个超市的场景:

  • 生产者就像商品供应商,不断生产商品并送到超市货架上
  • 消费者就像购物者,从货架上取走商品进行消费
  • 缓冲区就像超市的货架,暂时存放商品,平衡供应和需求的速度差异

其实之前在 Linux 进程间通信 【管道通信】 中学习到的 管道 本质上就是一个天然的 「生产者消费者模型」,因为它允许多个进程同时访问,并且不会出现问题,意味着它维护好了 「互斥、同步」 关系;当写端写满管道时,无法再写,通知读端进行读取;当管道为空时,无法读取,通知写端写入数据。

在这里插入图片描述


2.2 生产者消费者模型的特点

  • 「生产者消费者模型」符合321原则
    3 种关系:
    • 生产者与生产者:互斥
    • 互斥消费者与消费者:互斥
    • 生产者与消费者:互斥与同步
  • 2种角色:
    • 生产者
    • 消费者
  • 1个交易场所:
    • 通常是一个特定的缓冲区(阻塞队列、环形队列)

2.3 生产者消费者模型的优点

  1. 高效性:依托 “交易场所”(如缓冲区),生产者仅需关注场所是否有空位,消费者仅需关注场所是否有就绪数据,双方无需相互关注状态,可独立高效操作;且能按策略调整协同关系,适配供需平衡。
  2. 灵活性:可依据实际供需关系灵活调整策略,应对 “忙闲不均” 的场景,优化资源利用。
  3. 低耦合易维护扩展:明确划分生产者、消费者、交易场所三大角色,各角色各司其职,可按需自由设计,降低组件间依赖,便于后续维护与功能扩展。

03. 基于环形队列实现生产者消费者模型

3.1 环形队列

生产者消费者模型」 中的交易场所是可更换的,还可以使用 环形队列,所谓的环形队列并非队列,而是用数组模拟实现的“队列”,并且它的判空判满比较特殊。环形队列为空时,生产者需要先生产数据,消费者阻塞。

在这里插入图片描述


无论是生产者还是消费者,只有申请到自己的信号量资源后,才进行生产/消费
在这里插入图片描述

上图中的pro_sem就表示 生产者还可以进行 3 次生产,con_sem表示消费者还可以消费5次。


生产者、消费者对于 「信号量」 的申请可以这样理解。

//生产者
void Producer(){// 申请信号量(空位-1)sem wait(&pro sem);//生产商品// 释放信号量(商品 + 1)sem post(&con sem);
}   
// 消费者
void Consumer(){// 申请信号量(商品-1)sem wait(&con sem);// 消费商品// 释放信号量(空位+1)sem post(&pro sem);
}

3.2 多生产多消费模型

环形队列可以放Task任务(即自定义的一个task类,里面封装了一些动作),我们可以把的Task.hpp引入即可。


3.2.1 task.hpp
#pragma once#include <iostream>
#include <string>
#include <cstdio>
#include <functional>class Task
{typedef std::function<int(int, int, char)> func_t;public:Task() {}Task(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func) {}std::string operator()() // 仿函数{int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);return buffer;}std::string toTaskString(){char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d =?", _x, _op, _y);return buffer;}~Task() {}private:int _x;int _y;char _op;func_t _callback;
};const std::string oper = "+-*/%";int mymath(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':if (y == 0){std::cerr << "div zero error!" << std::endl;result = -1;}elseresult = x / y;break;case '%':{if (y == 0){std::cerr << "mod zero error!" << std::endl;result = -1;}elseresult = x % y;}break;default:// do nothingbreak;return result;}
}

3.2.2main.cc
#include "RingQueue.hpp"
#include "task.hpp"
#include <iostream>
#include <pthread.h>
#include <time.h>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>// 添加互斥锁
pthread_mutex_t output_mutex = PTHREAD_MUTEX_INITIALIZER;
std::string SelfName()
{char name[128];snprintf(name, sizeof name, "thread[0x%x]", pthread_self());return name;
}
void *ProductorRoute(void *rq)
{// RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq); // 类型转换  void* -> RingQueue<int> *RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while (true){// version 1// sleep(2);// int data = rand() % 10 + 1;// ringqueue->push(data);// std::cout << "生产完成,产生一个数据: " << data << std::endl;// version 2  构建or获取任务// const std::string oper = "+-*%";int x = rand() % 100;int y = rand() % 200;char op = oper[rand() % oper.size()];Task t(x, y, op, mymath);// 生产任务ringqueue->push(t);// 输出提示// 使用互斥锁保护输出pthread_mutex_lock(&output_mutex);std::cout << SelfName() << "生产者派发了一个任务: " << t.toTaskString() << std::endl;pthread_mutex_unlock(&output_mutex);// sleep(1);}
}
void *ConsumerRoute(void *rq)
{// RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while (true){// version 1// int data;// ringqueue->pop(&data);// std::cout << "消费完成,消费一个数据: " << data << std::endl;// sleep(1); // sleep位置会影响生产和消费的顺序// version 2Task t; // 将pop的任务放到这个里面// 消费任务ringqueue->pop(&t);std::string result = t(); // 防函数。。。std::string operator() (), 类似于t.operator()// 使用互斥锁保护输出pthread_mutex_lock(&output_mutex);std::cout << SelfName() << "消费者消费了一个任务: " << result << std::endl;pthread_mutex_unlock(&output_mutex);}
}
int main()
{// 3中关系,2哥角色,一个交易场所(模拟的队列)// 1.创建线程srand((unsigned int)time(nullptr) & getpid() ^ pthread_self() ^ 0x89755); // 初始化种子// RingQueue<int> *rq = new RingQueue<int>();RingQueue<Task> *rq = new RingQueue<Task>(); // 闯入一个struct,即一个任务pthread_t p[4], c[8];for (int i = 0; i < 4; i++)pthread_create(p + i, nullptr, ProductorRoute, rq); // 将rq传递给子线程就可以看到同一个队列了,并进行操作for (int i = 0; i < 8; i++)pthread_create(c + i, nullptr, ConsumerRoute, rq);// 回收资源for (int i = 0; i < 4; i++)pthread_join(p[i], nullptr);for (int i = 0; i < 8; i++)pthread_join(c[i], nullptr);delete rq;return 0;
}

3.2.3RingQueue.hpp
#pragma once#include <iostream>
#include <vector>
#include <semaphore.h>
#include <cassert>
#include <unistd.h>
#include <pthread.h>static const int gcap = 5;
template <class T>
class RingQueue
{
private: // 封装void P(sem_t &sem){int n = sem_wait(&sem);assert(n == 0); // if  。cc中也可以改成这样,防止未被使用(void)n;}void V(sem_t &sem){int n = sem_post(&sem);assert(n == 0);(void)n;}public:RingQueue(const int &cap = gcap) : _queue(cap), _cap(cap){// 初始化信号量int n = sem_init(&_spaceSem, 0, _cap); // 表示初始值为 _capassert(n == 0);n = sem_init(&_dataSem, 0, 0);assert(n == 0);_productorStep = _consumerStep = 0;pthread_mutex_init(&_pmutex, nullptr);pthread_mutex_init(&_cmutex, nullptr);}void push(const T &in){                                 // 投递数据P(_spaceSem);                 // 先抢票。。。 申请空间信号量,有构造函数中,即_cap--pthread_mutex_lock(&_pmutex); // 一个一个进入临界区_queue[_productorStep++] = in;_productorStep %= _cap; // criclepthread_mutex_unlock(&_pmutex);V(_dataSem);}void pop(T *out){                // 取出数据P(_dataSem); // 初始值为0消费者进程会被阻塞!!!pthread_mutex_lock(&_cmutex);*out = _queue[_consumerStep++];_consumerStep %= _cap;pthread_mutex_unlock(&_cmutex);V(_spaceSem);}~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);pthread_mutex_destroy(&_pmutex);pthread_mutex_destroy(&_cmutex);}private:std::vector<T> _queue;int _cap;// 信号量 sem_t 本质上是一个用于同步的计数器,即 P+1,V-1sem_t _spaceSem;    // 生产者 看中空间资源sem_t _dataSem;     // 消费者  看中数据资源int _productorStep; // 生产者的步伐(下标)int _consumerStep;  // 消费者的步伐(下标)pthread_mutex_t _pmutex;pthread_mutex_t _cmutex;
};

3.2.4 结果输出

在这里插入图片描述

三者联系与区别:

特性互斥锁 (Mutex)条件变量 (CondVar)信号量 (Semaphore)
主要目的互斥访问共享资源等待特定条件成立控制对多个资源的访问或发信号
状态锁定/未锁定无自身状态,依赖外部条件有一个整型计数器
配合使用通常单独使用必须与互斥锁一起使用可以单独使用
释放机制由锁持有者释放通过wait自动释放关联互斥锁,唤醒时重新获取可由不同线程释放


http://www.dtcms.com/a/350491.html

相关文章:

  • python项目中pyproject.toml是做什么用的
  • 【Canvas与标牌】维兰德汤谷公司logo
  • Hadoop MapReduce Task 设计源码分析
  • java-代码随想录第十七天| 700.二叉搜索树中的搜索、617.合并二叉树、98.验证二叉搜索树
  • C++ STL 专家容器:关联式、哈希与适配器
  • 《微服务架构下API网关流量控制Bug复盘:从熔断失效到全链路防护》
  • 精准测试的密码:解密等价类划分,让Bug无处可逃
  • 【C语言16天强化训练】从基础入门到进阶:Day 11
  • 朴素贝叶斯算法总结
  • 互联网大厂Java面试实录:Spring Boot与微服务架构解析
  • cmd命令行删除文件夹
  • rk3566编译squashfs报错解决
  • QT5封装的日志记录函数
  • 算法练习-遍历对角线
  • 开源夜莺里如何引用标签和注解变量
  • VTK开发笔记(四):示例Cone,创建圆锥体,在Qt窗口中详解复现对应的Demo
  • 使用Cloudflare的AI Gateway代理Google AI Studio
  • 论文阅读:Code as Policies: Language Model Programs for Embodied Control
  • Redis的单线程和多线程
  • Linux_用 `ps` 按进程名过滤线程,以及用 `pkill` 按进程名安全杀进程
  • 记一次RocketMQ消息堆积
  • (二十二)深入了解AVFoundation-编辑:视频变速功能-实战在Demo中实现视频变速
  • 数字人视频创作革命!开源免费无时限InfiniteTalk ,数字人图片 + 音频一键生成无限长视频
  • ADC-工业信号采集卡-K004规格书
  • 智能电视MaxHub恢复系统
  • 【第十章】Python 文件操作深度解析:从底层逻辑到多场景实战​
  • Flink 滑动窗口实战:从 KeyedProcessFunction 到 AggregateFunction WindowFunction 的完整旅程
  • vi/vim 查找字符串
  • h5和微信小程序查看pdf文件
  • 实验1 第一个微信小程序