当前位置: 首页 > news >正文

Linux操作系统7- 线程同步与互斥4(基于POSIX条件变量的生产者消费者模型)

上篇文章:Linux操作系统7- 线程同步与互斥3(POSIX条件变量的使用,线程循环打印数字)-CSDN博客

本篇代码仓库:myLerningCode/l31 · 橘子真甜/Linux操作系统与网络编程学习 - 码云 - 开源中国 (gitee.com)

目录

一. 生产者消费者模型分析

1.1 三种角色

1.2 生产者消费者模型的特点

二. BlockQueue生产者消费者模型

​编辑

2.1 成员变量 ⭐

2.2 构造与析构函数 

2.3 辅助函数 

2.4 push 与 pop ⭐

三. 测试代码 

3.1 不控制生产者生产,消费者消费

3.2 只控制生产者生产

3.2 只控制消费者消费

四. 代码细节与总结

4.1 pthread_cond_wait


一. 生产者消费者模型分析

1.1 三种角色

        生产者消费者模型中有三个角色生产者,消费者,以及生产者消费者的交易场所。                          这个模型非常类似于生活中的超市,我们作为消费者从超市购买物品,而厂商作为生产者生产物品。

生产者生产数据然后传递给交易场所,如果发现交易场所满了,则阻塞等待

消费者从交易场所拿取数据消费,如果发现交易场所为空,则阻塞等待

交易场所作为临时保存数据的场所,是生产消费的缓冲区

1.2 生产者消费者模型的特点

        根据上面的分析,可以总结出三个角色之间的关系。生产者有1个或者多个,消费者有1个或者多个。但是所有的数据都是不同的。

1 消费者与消费者之间:互斥竞争关系,我不能消费你正在消费的数据

2 生产者与生产者之间:互斥竞争关系,我不能生产你正在生产的数据

3 消费者与生产者之间:互斥(一个时间点只有一个消费者或者生产者访问交易场所,其他人只能等待)与同步(消费者消费数据后通知生产者生产,生产者生产数据后通知消费者消费)关系

        生产者消费者模型可以将生产者和消费者解耦,让二者不会受到对方的影响,关注于自己的事情。 

        还能通过交易场所缓冲数据,以便支持消费者生产者之间忙闲不均的问题(有时候生产者生产数据过快,有时候消费者消费数据过快)从而提高整体的效率

二. BlockQueue生产者消费者模型

        这里使用BlockQueue(阻塞队列)来实现生产者消费者模型。其中阻塞队列是生产者消费者交易的场所。

        作为交易场所BlockQueue需要提供下面的基础功能:

1 通过互斥锁保证交易场所数据的安全

2 为生产者提供放入数据的能力,为消费者提供消费的能力

3 通过条件变量保证生产者消费者之间的同步

2.1 成员变量 ⭐

        通过上面的分析,成员变量至少需要 互斥锁 两个条件变量 一个队列。

代码如下:

#pragma once
#include <iostream>
#include <queue> //使用queue作为阻塞队列

#include <unistd.h>
#include <pthread.h>
const int gnum = 10; // 阻塞队列的默认容量

template <class T>
class BlockQueue
{
private:
    std::queue<T> _queue;  // 阻塞队列
    size_t _maxnum;        // 队列最大容量
    pthread_mutex_t _mtx;  // 互斥锁
    pthread_cond_t _pcond; // 生产者条件变量,满了需要休眠
    pthread_cond_t _ccond; // 消费者条件变量,无数据要休眠
};

2.2 构造与析构函数 

        需要在构造函数中完成锁与条件变量的初始化。析构中销毁锁与条件变量

    BlockQueue(int maxnum = gnum) : _maxnum(maxnum)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

2.3 辅助函数 

        由于每一次生产消费数据都需要判断阻塞队列是否为满或者空。所以需要两个函数来帮助我们快速获取阻塞队列的信息。

//不想让外部调用这些代码,设置为私有
private:
    bool is_empty()
    {
        return _queue.empty();
    }

    bool is_full()
    {
        return _queue.size() == _maxnum;
    }

2.4 push 与 pop ⭐

        这两个函数是阻塞队列生产者消费模型的核心函数。生产生产数据之后通过push向阻塞队列中放入数据,消费者从阻塞队列中拿取数据然后消费。

    // 生产者生产数据
    void push(const T &in)
    {
        // 加锁保护
        pthread_mutex_lock(&_mtx);

        // 判断是否满足生产
        while (is_full())
        {
            // 数据满了生产者等待消费者消费
            pthread_cond_wait(&_pcond, &_mtx);
        }

        // 生产数据
        _queue.push(in);

        // 队列不为空,通知消费者消费
        pthread_cond_signal(&_ccond);

        // 解锁
        pthread_mutex_unlock(&_mtx);
    }

    // 消费这消费数据,通过输入输出型参数获取数据
    void pop(T *out)
    {
        // 加锁保护
        pthread_mutex_lock(&_mtx);

        // 判断是否可以消费数据
        while (is_empty())
        {
            // 等待生产者生产数据
            pthread_cond_wait(&_ccond, &_mtx);
        }

        // 开始消费数据
        *out = _queue.front();
        _queue.pop();

        // 队列不满,通知生产者生产数据
        pthread_cond_signal(&_pcond);

        // 解锁
        pthread_mutex_unlock(&_mtx);
    }

2.5 BlockQueue.hpp

        阻塞队列的全部代码如下:

#pragma once
#include <iostream>
#include <queue> //使用queue作为阻塞队列

#include <unistd.h>
#include <pthread.h>
const int gnum = 10; // 阻塞队列的最大容量

template <class T>
class BlockQueue
{
public:
    BlockQueue(int maxnum = gnum) : _maxnum(maxnum)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

    // 生产者生产数据
    void push(const T &in)
    {
        // 加锁保护
        pthread_mutex_lock(&_mtx);

        // 判断是否满足生产
        while (is_full())
        {
            // 数据满了生产者等待消费者消费
            pthread_cond_wait(&_pcond, &_mtx);
        }

        // 生产数据
        _queue.push(in);

        // 队列不为空,通知消费者消费
        pthread_cond_signal(&_ccond);

        // 解锁
        pthread_mutex_unlock(&_mtx);
    }

    // 消费这消费数据,通过输入输出型参数获取数据
    void pop(T *out)
    {
        // 加锁保护
        pthread_mutex_lock(&_mtx);

        // 判断是否可以消费数据
        while (is_empty())
        {
            // 等待生产者生产数据
            pthread_cond_wait(&_ccond, &_mtx);
        }

        // 开始消费数据
        *out = _queue.front();
        _queue.pop();

        // 队列不满,通知生产者生产数据
        pthread_cond_signal(&_pcond);

        // 解锁
        pthread_mutex_unlock(&_mtx);
    }

private:
    bool is_empty()
    {
        return _queue.empty();
    }

    bool is_full()
    {
        return _queue.size() == _maxnum;
    }

private:
    std::queue<T> _queue;  // 阻塞队列
    size_t _maxnum;        // 队列最大容量
    pthread_mutex_t _mtx;  // 互斥锁
    pthread_cond_t _pcond; // 生产者条件变量,满了需要休眠
    pthread_cond_t _ccond; // 消费者条件变量,无数据要休眠
};

三. 测试代码 

        创建一个线程用于生产,一个线程用于消费。生产线程向队列生产一个随机数,而消费线程从队列中拿取数据并打印输出。

3.1 不控制生产者生产,消费者消费

测试代码如下:

#include <iostream>
#include <string>

#include <unistd.h>
#include <pthread.h>
#include "BlockQueue.hpp"

void *producer(void *args)
{
    // 获取交易场所 - 阻塞队列
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while (true)
    {
        int tmp = rand() % 10000 + 1;
        bq->push(tmp);
        std::cout << "生产者生产数据:" << tmp << std::endl;
    }

    return nullptr;
}

void *consumer(void *args)
{
    // 获取交易场所 - 阻塞队列
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while (true)
    {
        int tmp = 0;
        bq->pop(&tmp);
        std::cout << "消费者获取数据:" << tmp << std::endl;
    }
    return nullptr;
}

int main()
{
    srand(time(0) ^ getpid() ^ rand());
    // 定义生产消费线程与阻塞队列
    pthread_t p;
    pthread_t c;
    BlockQueue<int> *bq = new BlockQueue<int>();

    pthread_create(&p, nullptr, producer, (void *)bq);
    pthread_create(&c, nullptr, consumer, (void *)bq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);

    delete bq;
    bq = nullptr;
    return 0;
}

测试结果如下:

可以看到,生产者疯狂生产数据,消费者疯狂消费数据

3.2 只控制生产者生产

        让生产者线程生产一次就调用sleep(1);

void *producer(void *args)
{
    // 获取交易场所 - 阻塞队列
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while (true)
    {
        sleep(1);
        int tmp = rand() % 10000 + 1;
        bq->push(tmp);
        std::cout << "生产者生产数据:" << tmp << std::endl;
    }

    return nullptr;
}

        可以看到,生产者生产数据之后消费者才去消费数据。当生产者sleep的时候,消费者发现阻塞队列为空,此时就会阻塞自己,等待生产者生产数据

3.2 只控制消费者消费

        消费者每隔一秒才去消费数据。

void *consumer(void *args)
{
    // 获取交易场所 - 阻塞队列
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while (true)
    {
        sleep(1);
        int tmp = 0;
        bq->pop(&tmp);
        std::cout << "消费者获取数据:" << tmp << std::endl;
    }
    return nullptr;
}

    运行结果如下:

        可以看到:由于生产者不受控制,一开始就生产很多数据。由于消费者每隔一秒才消费数据,所以会先消费生产者之前生产的数据

四. 代码细节与总结

        通过上面的测试:可以直到,通过生产者消费者模型可以实现线程之间的同步与互斥。还能通过控制生产者消费者生产消费的速度来调整双方的同步。

4.1 pthread_cond_wait

        生产者生产数据的时候,消费者消费数据的时候都是处于临界区。这时候生产者或消费者都是带着锁的。

        为什么生产者带上锁阻塞,消费者还能消费?消费者上锁阻塞,生产者还能生存?

         这是因为:pthread_cond_wait有第二个参数,被调用的时候。会以原子性将我们的锁释放,然后阻塞自己。

        当wait被唤醒之后,同样以原子性再次加锁。

        如果多个生产者或者消费者被同时唤醒,此时pthread_condd_wait会不会调用失败?

        假设10个生产者都满足if_full 进入wait。然后同时唤醒,如果我们只使用if进行判断的话,就会导致10个生存者被唤醒后就都去插入数据了

        所以需要使用while循环去判断条件是否满足,即便唤醒了,也要再次判断条件满足。因为有可能你刚唤醒,就有其他生产者生产了数据!

相关文章:

  • moveit2基础教程上手-使用xarm6演示
  • 【工具变量】中国各地级市是否属于“信息惠民国家试点城市”匹配数据(2010-2024年)
  • 『 C++ 』错误使用std::mutex引发的error C2039: “try_lock_until“: 不是 “std::mutex“ 的成员
  • vue3:十一、主页面布局(左侧菜单折叠展开设置)
  • vulnhub靶场之【hack-me-please靶机】
  • 微前端框架深度对比与技术实现剖析
  • Cursor的五种高级用法
  • 数智读书笔记系列022《算力网络-云网融合2.0时代的网络架构与关键技术》读书笔记
  • 权限维持—Linux系统Rootkit后门
  • 云原生之开源遥测框架OpenTelemetry(在 Gin 框架中使用 OpenTelemetry 进行分布式追踪和监控)
  • 软考通关利器:中级软件设计师结构化开发核心考点
  • UniRel论文复现过程中的问题解决办法(全)
  • 【C语言】字符串函数详解
  • AtCoder Beginner Contest 398(ABCDEF)
  • MySQL密码修改的全部方式一篇详解
  • 使用brower use AI 代理自动控制浏览器完成任务
  •  UNIX网络编程笔记:TCP客户/服务器程序示例
  • 基于springboot的房屋租赁系统(028)
  • 电机控制常见面试问题(十七)
  • JetsonNano —— 4、Windows下对JetsonNano板卡烧录刷机Ubuntu20.04版本(官方教程)
  • 这就是上海!
  • 西夏文残碑等文物来沪,见证一段神秘灿烂的历史
  • 电话费被私改成48元套餐长达数年,投诉后移动公司退补600元话费
  • 宁夏民政厅原厅长欧阳艳已任自治区政府副秘书长、办公厅主任
  • 江苏银行一季度净赚近98亿增逾8%,不良贷款率微降
  • 广东一公司违规开展学科培训被罚没470万,已注销营业执照