Linux操作系统7- 线程同步与互斥4(基于POSIX条件变量的生产者消费者模型)
上篇文章:Linux操作系统7- 线程同步与互斥3(POSIX条件变量的使用,线程循环打印数字)-CSDN博客
本篇代码仓库:myLerningCode/l31 · 橘子真甜/Linux操作系统与网络编程学习 - 码云 - 开源中国 (gitee.com)
目录
一. 生产者消费者模型分析
1.1 三种角色
1.2 生产者消费者模型的特点
二. BlockQueue生产者消费者模型
编辑
2.1 成员变量 ⭐
2.2 构造与析构函数
2.3 辅助函数
2.4 push 与 pop ⭐
三. 测试代码
3.1 不控制生产者生产,消费者消费
3.2 只控制生产者生产
3.2 只控制消费者消费
四. 代码细节与总结
4.1 pthread_cond_wait
一. 生产者消费者模型分析
1.1 三种角色
生产者消费者模型中有三个角色生产者,消费者,以及生产者消费者的交易场所。 这个模型非常类似于生活中的超市,我们作为消费者从超市购买物品,而厂商作为生产者生产物品。
生产者生产数据然后传递给交易场所,如果发现交易场所满了,则阻塞等待
消费者从交易场所拿取数据消费,如果发现交易场所为空,则阻塞等待
交易场所作为临时保存数据的场所,是生产消费的缓冲区
1.2 生产者消费者模型的特点
根据上面的分析,可以总结出三个角色之间的关系。生产者有1个或者多个,消费者有1个或者多个。但是所有的数据都是不同的。
1 消费者与消费者之间:互斥竞争关系,我不能消费你正在消费的数据
2 生产者与生产者之间:互斥竞争关系,我不能生产你正在生产的数据
3 消费者与生产者之间:互斥(一个时间点只有一个消费者或者生产者访问交易场所,其他人只能等待)与同步(消费者消费数据后通知生产者生产,生产者生产数据后通知消费者消费)关系
生产者消费者模型可以将生产者和消费者解耦,让二者不会受到对方的影响,关注于自己的事情。
还能通过交易场所缓冲数据,以便支持消费者生产者之间忙闲不均的问题(有时候生产者生产数据过快,有时候消费者消费数据过快)从而提高整体的效率
二. BlockQueue生产者消费者模型
这里使用BlockQueue(阻塞队列)来实现生产者消费者模型。其中阻塞队列是生产者消费者交易的场所。
作为交易场所BlockQueue需要提供下面的基础功能:
1 通过互斥锁保证交易场所数据的安全
2 为生产者提供放入数据的能力,为消费者提供消费的能力
3 通过条件变量保证生产者消费者之间的同步
2.1 成员变量 ⭐
通过上面的分析,成员变量至少需要 互斥锁 两个条件变量 一个队列。
代码如下:
#pragma once
#include <iostream>
#include <queue> //使用queue作为阻塞队列
#include <unistd.h>
#include <pthread.h>
const int gnum = 10; // 阻塞队列的默认容量
template <class T>
class BlockQueue
{
private:
std::queue<T> _queue; // 阻塞队列
size_t _maxnum; // 队列最大容量
pthread_mutex_t _mtx; // 互斥锁
pthread_cond_t _pcond; // 生产者条件变量,满了需要休眠
pthread_cond_t _ccond; // 消费者条件变量,无数据要休眠
};
2.2 构造与析构函数
需要在构造函数中完成锁与条件变量的初始化。析构中销毁锁与条件变量
BlockQueue(int maxnum = gnum) : _maxnum(maxnum)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_pcond, nullptr);
pthread_cond_init(&_ccond, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}
2.3 辅助函数
由于每一次生产消费数据都需要判断阻塞队列是否为满或者空。所以需要两个函数来帮助我们快速获取阻塞队列的信息。
//不想让外部调用这些代码,设置为私有
private:
bool is_empty()
{
return _queue.empty();
}
bool is_full()
{
return _queue.size() == _maxnum;
}
2.4 push 与 pop ⭐
这两个函数是阻塞队列生产者消费模型的核心函数。生产生产数据之后通过push向阻塞队列中放入数据,消费者从阻塞队列中拿取数据然后消费。
// 生产者生产数据
void push(const T &in)
{
// 加锁保护
pthread_mutex_lock(&_mtx);
// 判断是否满足生产
while (is_full())
{
// 数据满了生产者等待消费者消费
pthread_cond_wait(&_pcond, &_mtx);
}
// 生产数据
_queue.push(in);
// 队列不为空,通知消费者消费
pthread_cond_signal(&_ccond);
// 解锁
pthread_mutex_unlock(&_mtx);
}
// 消费这消费数据,通过输入输出型参数获取数据
void pop(T *out)
{
// 加锁保护
pthread_mutex_lock(&_mtx);
// 判断是否可以消费数据
while (is_empty())
{
// 等待生产者生产数据
pthread_cond_wait(&_ccond, &_mtx);
}
// 开始消费数据
*out = _queue.front();
_queue.pop();
// 队列不满,通知生产者生产数据
pthread_cond_signal(&_pcond);
// 解锁
pthread_mutex_unlock(&_mtx);
}
2.5 BlockQueue.hpp
阻塞队列的全部代码如下:
#pragma once
#include <iostream>
#include <queue> //使用queue作为阻塞队列
#include <unistd.h>
#include <pthread.h>
const int gnum = 10; // 阻塞队列的最大容量
template <class T>
class BlockQueue
{
public:
BlockQueue(int maxnum = gnum) : _maxnum(maxnum)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_pcond, nullptr);
pthread_cond_init(&_ccond, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}
// 生产者生产数据
void push(const T &in)
{
// 加锁保护
pthread_mutex_lock(&_mtx);
// 判断是否满足生产
while (is_full())
{
// 数据满了生产者等待消费者消费
pthread_cond_wait(&_pcond, &_mtx);
}
// 生产数据
_queue.push(in);
// 队列不为空,通知消费者消费
pthread_cond_signal(&_ccond);
// 解锁
pthread_mutex_unlock(&_mtx);
}
// 消费这消费数据,通过输入输出型参数获取数据
void pop(T *out)
{
// 加锁保护
pthread_mutex_lock(&_mtx);
// 判断是否可以消费数据
while (is_empty())
{
// 等待生产者生产数据
pthread_cond_wait(&_ccond, &_mtx);
}
// 开始消费数据
*out = _queue.front();
_queue.pop();
// 队列不满,通知生产者生产数据
pthread_cond_signal(&_pcond);
// 解锁
pthread_mutex_unlock(&_mtx);
}
private:
bool is_empty()
{
return _queue.empty();
}
bool is_full()
{
return _queue.size() == _maxnum;
}
private:
std::queue<T> _queue; // 阻塞队列
size_t _maxnum; // 队列最大容量
pthread_mutex_t _mtx; // 互斥锁
pthread_cond_t _pcond; // 生产者条件变量,满了需要休眠
pthread_cond_t _ccond; // 消费者条件变量,无数据要休眠
};
三. 测试代码
创建一个线程用于生产,一个线程用于消费。生产线程向队列生产一个随机数,而消费线程从队列中拿取数据并打印输出。
3.1 不控制生产者生产,消费者消费
测试代码如下:
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include "BlockQueue.hpp"
void *producer(void *args)
{
// 获取交易场所 - 阻塞队列
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
int tmp = rand() % 10000 + 1;
bq->push(tmp);
std::cout << "生产者生产数据:" << tmp << std::endl;
}
return nullptr;
}
void *consumer(void *args)
{
// 获取交易场所 - 阻塞队列
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
int tmp = 0;
bq->pop(&tmp);
std::cout << "消费者获取数据:" << tmp << std::endl;
}
return nullptr;
}
int main()
{
srand(time(0) ^ getpid() ^ rand());
// 定义生产消费线程与阻塞队列
pthread_t p;
pthread_t c;
BlockQueue<int> *bq = new BlockQueue<int>();
pthread_create(&p, nullptr, producer, (void *)bq);
pthread_create(&c, nullptr, consumer, (void *)bq);
pthread_join(p, nullptr);
pthread_join(c, nullptr);
delete bq;
bq = nullptr;
return 0;
}
测试结果如下:
可以看到,生产者疯狂生产数据,消费者疯狂消费数据
3.2 只控制生产者生产
让生产者线程生产一次就调用sleep(1);
void *producer(void *args)
{
// 获取交易场所 - 阻塞队列
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
sleep(1);
int tmp = rand() % 10000 + 1;
bq->push(tmp);
std::cout << "生产者生产数据:" << tmp << std::endl;
}
return nullptr;
}
可以看到,生产者生产数据之后消费者才去消费数据。当生产者sleep的时候,消费者发现阻塞队列为空,此时就会阻塞自己,等待生产者生产数据
3.2 只控制消费者消费
消费者每隔一秒才去消费数据。
void *consumer(void *args)
{
// 获取交易场所 - 阻塞队列
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
sleep(1);
int tmp = 0;
bq->pop(&tmp);
std::cout << "消费者获取数据:" << tmp << std::endl;
}
return nullptr;
}
运行结果如下:
可以看到:由于生产者不受控制,一开始就生产很多数据。由于消费者每隔一秒才消费数据,所以会先消费生产者之前生产的数据
四. 代码细节与总结
通过上面的测试:可以直到,通过生产者消费者模型可以实现线程之间的同步与互斥。还能通过控制生产者消费者生产消费的速度来调整双方的同步。
4.1 pthread_cond_wait
生产者生产数据的时候,消费者消费数据的时候都是处于临界区。这时候生产者或消费者都是带着锁的。
为什么生产者带上锁阻塞,消费者还能消费?消费者上锁阻塞,生产者还能生存?
这是因为:pthread_cond_wait有第二个参数,被调用的时候。会以原子性将我们的锁释放,然后阻塞自己。
当wait被唤醒之后,同样以原子性再次加锁。
如果多个生产者或者消费者被同时唤醒,此时pthread_condd_wait会不会调用失败?
假设10个生产者都满足if_full 进入wait。然后同时唤醒,如果我们只使用if进行判断的话,就会导致10个生存者被唤醒后就都去插入数据了
所以需要使用while循环去判断条件是否满足,即便唤醒了,也要再次判断条件满足。因为有可能你刚唤醒,就有其他生产者生产了数据!