Linux多线程[生产者消费者模型]
01. 信号量
信号量一种更通用的同步机制,由一个计数器控制,用于管理对多个资源的访问或实现线程间的基本信号传递。互斥锁可以看作是一种特殊的信号量(计数值为1的信号量)。可以将其比喻为一个公共自行车亭,里面有N辆自行车(资源)。信号量计数器初始值为N。借车时P操作
获取一个资源(计数减1,如果为0则等待),还车时V操作
释放一个资源(计数加1,并唤醒等待者)。
1.1 mutex&condtion&senaphore联系
- 互斥锁:解决“独占访问”问题。“你忙你的,我等你”。
- 条件变量:解决“条件等待”问题。“条件没好,我先睡,好了叫我”。它是互斥锁的黄金搭档。
- 信号量:解决“资源计数”或“信号传递”问题。“还有几个资源可用?”。它更通用,但正确使用也更复杂。
当你在选择时可以考虑以下情节:
- 只想防止多个线程同时进入一段代码 -> 互斥锁。
- 想等待某个条件成立(如队列非空) -> 互斥锁 + 条件变量。
- 想控制对N个完全相同资源的访问 -> 信号量。
1.2 信号量流程初识
控制资源访问时序图:
代码示例:
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>sem_t semaphore;void* threadRoute(void* arg) {int id = *(int*)arg;printf("Thread %d waiting for resource...\n", id);sem_wait(&semaphore); // 获取信号量printf("Thread %d acquired resource. Working...\n", id);sleep(2); // 模拟工作printf("Thread %d releasing resource.\n", id);sem_post(&semaphore); // 释放信号量return NULL;
}int main() {pthread_t threads[5];int ids[2];// 初始化信号量,允许2个线程同时访问sem_init(&semaphore, 0, 2);for (int i = 0; i < 5; i++) {ids[i] = i;pthread_create(&threads[i], NULL, threadRoute, &ids[i]);}for (int i = 0; i < 2; i++) {pthread_join(threads[i], NULL);}sem_destroy(&semaphore);return 0;
}
02. 生产者消费者模型
2.1 什么是生产者消费者模型?
生产者消费者模型是一种经典的多线程同步与协作模型,它描述了两种不同类型的线程(或进程)通过一个共享的缓冲区(或队列)进行协作的方式。
想象一个超市的场景:
- 生产者就像商品供应商,不断生产商品并送到超市货架上
- 消费者就像购物者,从货架上取走商品进行消费
- 缓冲区就像超市的货架,暂时存放商品,平衡供应和需求的速度差异
其实之前在 Linux 进程间通信 【管道通信】 中学习到的 管道 本质上就是一个天然的 「生产者消费者模型」,因为它允许多个进程同时访问,并且不会出现问题,意味着它维护好了 「互斥、同步」 关系;当写端写满管道时,无法再写,通知读端进行读取;当管道为空时,无法读取,通知写端写入数据。
2.2 生产者消费者模型的特点
- 「生产者消费者模型」符合
321
原则
3 种关系:- 生产者与生产者:互斥
- 互斥消费者与消费者:互斥
- 生产者与消费者:互斥与同步
- 2种角色:
- 生产者
- 消费者
- 1个交易场所:
- 通常是一个特定的缓冲区(阻塞队列、环形队列)
2.3 生产者消费者模型的优点
- 高效性:依托 “交易场所”(如缓冲区),生产者仅需关注场所是否有空位,消费者仅需关注场所是否有就绪数据,双方无需相互关注状态,可独立高效操作;且能按策略调整协同关系,适配供需平衡。
- 灵活性:可依据实际供需关系灵活调整策略,应对 “忙闲不均” 的场景,优化资源利用。
- 低耦合易维护扩展:明确划分生产者、消费者、交易场所三大角色,各角色各司其职,可按需自由设计,降低组件间依赖,便于后续维护与功能扩展。
03. 基于环形队列实现生产者消费者模型
3.1 环形队列
「生产者消费者模型」 中的交易场所是可更换的,还可以使用 环形队列,所谓的环形队列并非队列,而是用数组模拟实现的“队列”,并且它的判空、判满比较特殊。环形队列为空时,生产者需要先生产数据,消费者阻塞。
无论是生产者还是消费者,只有申请到自己的信号量资源后,才进行生产/消费。
上图中的pro_sem
就表示 生产者还可以进行 3 次生产,con_sem
表示消费者还可以消费5
次。
生产者、消费者对于 「信号量」 的申请可以这样理解。
//生产者
void Producer(){// 申请信号量(空位-1)sem wait(&pro sem);//生产商品// 释放信号量(商品 + 1)sem post(&con sem);
}
// 消费者
void Consumer(){// 申请信号量(商品-1)sem wait(&con sem);// 消费商品// 释放信号量(空位+1)sem post(&pro sem);
}
3.2 多生产多消费模型
环形队列可以放Task
任务(即自定义的一个task类,里面封装了一些动作),我们可以把的Task.hpp
引入即可。
3.2.1 task.hpp
#pragma once#include <iostream>
#include <string>
#include <cstdio>
#include <functional>class Task
{typedef std::function<int(int, int, char)> func_t;public:Task() {}Task(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func) {}std::string operator()() // 仿函数{int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);return buffer;}std::string toTaskString(){char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d =?", _x, _op, _y);return buffer;}~Task() {}private:int _x;int _y;char _op;func_t _callback;
};const std::string oper = "+-*/%";int mymath(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':if (y == 0){std::cerr << "div zero error!" << std::endl;result = -1;}elseresult = x / y;break;case '%':{if (y == 0){std::cerr << "mod zero error!" << std::endl;result = -1;}elseresult = x % y;}break;default:// do nothingbreak;return result;}
}
3.2.2main.cc
#include "RingQueue.hpp"
#include "task.hpp"
#include <iostream>
#include <pthread.h>
#include <time.h>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>// 添加互斥锁
pthread_mutex_t output_mutex = PTHREAD_MUTEX_INITIALIZER;
std::string SelfName()
{char name[128];snprintf(name, sizeof name, "thread[0x%x]", pthread_self());return name;
}
void *ProductorRoute(void *rq)
{// RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq); // 类型转换 void* -> RingQueue<int> *RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while (true){// version 1// sleep(2);// int data = rand() % 10 + 1;// ringqueue->push(data);// std::cout << "生产完成,产生一个数据: " << data << std::endl;// version 2 构建or获取任务// const std::string oper = "+-*%";int x = rand() % 100;int y = rand() % 200;char op = oper[rand() % oper.size()];Task t(x, y, op, mymath);// 生产任务ringqueue->push(t);// 输出提示// 使用互斥锁保护输出pthread_mutex_lock(&output_mutex);std::cout << SelfName() << "生产者派发了一个任务: " << t.toTaskString() << std::endl;pthread_mutex_unlock(&output_mutex);// sleep(1);}
}
void *ConsumerRoute(void *rq)
{// RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while (true){// version 1// int data;// ringqueue->pop(&data);// std::cout << "消费完成,消费一个数据: " << data << std::endl;// sleep(1); // sleep位置会影响生产和消费的顺序// version 2Task t; // 将pop的任务放到这个里面// 消费任务ringqueue->pop(&t);std::string result = t(); // 防函数。。。std::string operator() (), 类似于t.operator()// 使用互斥锁保护输出pthread_mutex_lock(&output_mutex);std::cout << SelfName() << "消费者消费了一个任务: " << result << std::endl;pthread_mutex_unlock(&output_mutex);}
}
int main()
{// 3中关系,2哥角色,一个交易场所(模拟的队列)// 1.创建线程srand((unsigned int)time(nullptr) & getpid() ^ pthread_self() ^ 0x89755); // 初始化种子// RingQueue<int> *rq = new RingQueue<int>();RingQueue<Task> *rq = new RingQueue<Task>(); // 闯入一个struct,即一个任务pthread_t p[4], c[8];for (int i = 0; i < 4; i++)pthread_create(p + i, nullptr, ProductorRoute, rq); // 将rq传递给子线程就可以看到同一个队列了,并进行操作for (int i = 0; i < 8; i++)pthread_create(c + i, nullptr, ConsumerRoute, rq);// 回收资源for (int i = 0; i < 4; i++)pthread_join(p[i], nullptr);for (int i = 0; i < 8; i++)pthread_join(c[i], nullptr);delete rq;return 0;
}
3.2.3RingQueue.hpp
#pragma once#include <iostream>
#include <vector>
#include <semaphore.h>
#include <cassert>
#include <unistd.h>
#include <pthread.h>static const int gcap = 5;
template <class T>
class RingQueue
{
private: // 封装void P(sem_t &sem){int n = sem_wait(&sem);assert(n == 0); // if 。cc中也可以改成这样,防止未被使用(void)n;}void V(sem_t &sem){int n = sem_post(&sem);assert(n == 0);(void)n;}public:RingQueue(const int &cap = gcap) : _queue(cap), _cap(cap){// 初始化信号量int n = sem_init(&_spaceSem, 0, _cap); // 表示初始值为 _capassert(n == 0);n = sem_init(&_dataSem, 0, 0);assert(n == 0);_productorStep = _consumerStep = 0;pthread_mutex_init(&_pmutex, nullptr);pthread_mutex_init(&_cmutex, nullptr);}void push(const T &in){ // 投递数据P(_spaceSem); // 先抢票。。。 申请空间信号量,有构造函数中,即_cap--pthread_mutex_lock(&_pmutex); // 一个一个进入临界区_queue[_productorStep++] = in;_productorStep %= _cap; // criclepthread_mutex_unlock(&_pmutex);V(_dataSem);}void pop(T *out){ // 取出数据P(_dataSem); // 初始值为0消费者进程会被阻塞!!!pthread_mutex_lock(&_cmutex);*out = _queue[_consumerStep++];_consumerStep %= _cap;pthread_mutex_unlock(&_cmutex);V(_spaceSem);}~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);pthread_mutex_destroy(&_pmutex);pthread_mutex_destroy(&_cmutex);}private:std::vector<T> _queue;int _cap;// 信号量 sem_t 本质上是一个用于同步的计数器,即 P+1,V-1sem_t _spaceSem; // 生产者 看中空间资源sem_t _dataSem; // 消费者 看中数据资源int _productorStep; // 生产者的步伐(下标)int _consumerStep; // 消费者的步伐(下标)pthread_mutex_t _pmutex;pthread_mutex_t _cmutex;
};
3.2.4 结果输出
三者联系与区别:
特性 | 互斥锁 (Mutex) | 条件变量 (CondVar) | 信号量 (Semaphore) |
---|---|---|---|
主要目的 | 互斥访问共享资源 | 等待特定条件成立 | 控制对多个资源的访问或发信号 |
状态 | 锁定/未锁定 | 无自身状态,依赖外部条件 | 有一个整型计数器 |
配合使用 | 通常单独使用 | 必须与互斥锁一起使用 | 可以单独使用 |
释放机制 | 由锁持有者释放 | 通过wait 自动释放关联互斥锁,唤醒时重新获取 | 可由不同线程释放 |