当前位置: 首页 > news >正文

【Linux线程】阻塞队列环形队列(消费者生产者模型的实现)

目录

前言

1. 阻塞队列

 2. 环形队列

总结


前言

        了解了线程控制、同步与互斥、以及消费者生产者模型,本篇文章为实践篇,对以上内容的实践,使用阻塞队列和环形队列来实现生产者消费者模型;

在这里插入图片描述

1. 阻塞队列

        在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别 在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元 素的操作也会被阻塞,直到有元素被从队列中取出;

 模型整体结构:

一个生产线程,一个消费线程,生产线程负责在队列中生产,一个线程负责去取进行消费;

  1. 队列满的时候,生产线程发生阻塞,不再生产
  2. 队列为空时,消费线程发生阻塞,不再拿任务; 

 在这个体系中要理清楚:

由谁来通知生产线程和消费线程? 谁来唤醒线程?

消费者生产者体系中只有生产者知道什么时候需要消费,只有消费者知道什么时候生产;

所以唤醒线程只能互相唤醒

 整体结构较为简单,借助数据结构Queue来实现:

在此之前,可以依据RAII的思想对mutex进行封装,不需要手动的添加解锁(也可以使用C++库在的锁):

#pragma once

#include <pthread.h>

class mutex
{
public:
    mutex(pthread_mutex_t* lock)
        :_lock(lock)
    {}

    void Lock()
    {
        pthread_mutex_lock(_lock);
    }

    void UnLock()
    {
        pthread_mutex_unlock(_lock);
    }
    ~mutex()
    {}
private:
    pthread_mutex_t* _lock;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t* lock)
        :_mutex(lock)
    {
        _mutex.Lock();
    }

    ~LockGuard()
    {
        _mutex.UnLock();
    }
private:
    mutex _mutex;
};

 阻塞队列成员设计:

const int defaultcap = 5;
template<class T>
class BlockQueue
{

    
private:
    std::queue<T> _q; 
    int _capacity;
    pthread_mutex_t _mutex;
    pthread_cond_t _p_cond;
    pthread_cond_t _c_cond;

};

 容量设置阻塞队列的大小,锁控制线程安全、两个条件变量用于控制生产线程与消费线程的同步;

 核心接口也就只有两个:Push(生产)、Pop(消费);

void Push(const T& in) //生产者
{
    LockGuard lockguard(&_mutex);

    while (IsFull())
    {
        pthread_cond_wait(&_p_cond, &_mutex);
    }
    _q.push(in);
    // 有数据了可以唤醒消费者线程来进行消费 
    pthread_cond_signal(&_c_cond);

}

void Pop(T* out) //消费者
{
    LockGuard lockguard(&_mutex);

    while (IsEmpty())
    {
        pthread_cond_wait(&_c_cond, &_mutex);
    }

    *out = _q.front();
    _q.pop();
    // 唤醒生产者
    pthread_cond_signal(&_p_cond); //唤醒放在释放锁的前边后边都可以

}

 整体逻辑:


#include <pthread.h>
#include <ctime>
#include <unistd.h>
#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"

const int defaultcap = 5;

template<class T>
class BlockQueue
{
public:
    BlockQueue(int cap = defaultcap)
        :_capacity(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
    }

    bool IsFull()
    {
        return _q.size() == _capacity;
    }
    bool IsEmpty()
    {
        return _q.size() == 0;
    }
    void Push(const T& in) //生产者
    {
        LockGuard lockguard(&_mutex);
        //pthread_mutex_lock(&_mutex);

        while (IsFull())
        {
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        _q.push(in);
        // if(_q.size() > _productor_water_line) pthread_cond_signal(&_c_cond); //达到生产水平线就唤醒线程
        pthread_cond_signal(&_c_cond);//唤醒线程时如果线程本来就醒着,不会有什么影响
        //pthread_mutex_unlock(&_mutex);
    }
    void Pop(T* out) //消费者
    {
        LockGuard lockguard(&_mutex);
        //pthread_mutex_lock(&_mutex);
        while (IsEmpty())
        {
            pthread_cond_wait(&_c_cond, &_mutex);
        }

        *out = _q.front();
        _q.pop();
        //消费者生产者体系中,只有生产者知道什么时候需要消费
        //只有消费者知道什么时候生产
        // if(_q.size() < _consumer_water_line) pthread_cond_signal(&_p_cond);
        pthread_cond_signal(&_p_cond);//唤醒放在释放锁的前边后边都可以
        // 在锁内唤醒,线程不会在条件变量上等了,转而会到阻塞到申请锁的队列

        //pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p_cond);
        pthread_cond_destroy(&_c_cond);
    }
private:
    std::queue<T> _q;
    int _capacity;
    pthread_mutex_t _mutex;
    pthread_cond_t _p_cond;
    pthread_cond_t _c_cond;

    // int _consumer_water_line;  // _consumer_water_line = _capacity / 3 * 2
    // int _productor_water_line; // _productor_water_line = _capacity / 3
};


void *consumer(void *arg)
{
    BlockQueue<int> *bqp = (BlockQueue<int> *)arg;
    int data;
    for (;;)
    {
        bqp->Pop(&data);
        std::cout << "Consume data done : " << data << std::endl;
    }
}

// more faster
void *producter(void *arg)
{
    BlockQueue<int> *bqp = (BlockQueue<int> *)arg;
    srand((unsigned long)time(NULL));
    for (;;)
    {
        int data = rand() % 1024;
        bqp->Push(data);
        std::cout << "Prodoct data done: " << data << std::endl;
        sleep(1);
    }
}

int main()
{
    BlockQueue<int> bq;
    pthread_t c, p;
    pthread_create(&c, NULL, consumer, (void *)&bq);
    pthread_create(&p, NULL, producter, (void *)&bq);

    pthread_join(c, NULL);
    pthread_join(p, NULL);
    return 0;
}

 2. 环形队列

         阻塞队列的方法也有部分欠缺,比如:把资源放到队列中,使用互斥锁一次就只能一个线程访问这个队列;这也会降低效率;这时就可以使用信号量来解决;

         使用互斥锁时,把队列看成一个整体来访问,使用信号量可以完美解决 多个线程同时访问队列的不同区域; 

 基于信号量实现环形队列:

  1. 信号量本质就是一个计数器
  2. 申请信号量本质是预定资源
  3. PV操作是原子性的

 场景分析:

  1.  生产者打满这个队列后就不能再生产了(会覆盖原有资源)
  2. 消费者不能超过生产者

生产者和消费者指向同一位置的两种情况:

  1. 队列为空(只能让生产者跑)
  2. 队列为满(只能让消费者跑)

也就是说我们需要局部维持互斥和同步 

对资源的描述与认识:空间-->p,数据-->d
所以我们需要两个信号量:

  • p: _space_sem N
  • c: _data_sem  0

 生产者生产:

P(_space_sem)// _space_sem 减减

// 生产数据/任务

V(_data_sem)// _data_sem 加加

消费者消费:

P(_data_sem)// _data_sem 减减

// 生产数据/任务

V(_space_sem )// _space_sem 加加

完整代码:

#include <pthread.h>
#include <ctime>
#include <unistd.h>
#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"
#include <vector>
#include <stdlib.h>
#include <semaphore.h>

#define NUM 16

class RingQueue
{
private:
    std::vector<int> q;
    int cap; // 队列容量
    sem_t data_sem; // 数据的数量
    sem_t space_sem; // 空余空间数量
    int consume_step; // 消费偏移量
    int product_step; // 生产偏移量

public:
    RingQueue(int _cap = NUM) : q(_cap), cap(_cap)
    {
        sem_init(&data_sem, 0, 0);
        sem_init(&space_sem, 0, cap);
        consume_step = 0;
        product_step = 0;
    }
    // 生产
    void PutData(const int &data)
    {
        sem_wait(&space_sem); // P
        q[consume_step] = data;
        consume_step++;
        consume_step %= cap;
        sem_post(&data_sem); // V
    }
    // 消费
    void GetData(int &data)
    {
        sem_wait(&data_sem);
        data = q[product_step];
        product_step++;
        product_step %= cap;
        sem_post(&space_sem);
    }
    ~RingQueue()
    {
        sem_destroy(&data_sem);
        sem_destroy(&space_sem);
    }
};

void *consumer(void *arg)
{
    RingQueue *rqp = (RingQueue *)arg;
    int data;
    for (;;)
    {
        rqp->GetData(data);
        std::cout << "Consume data done : " << data << std::endl;
        sleep(1);
    }
}


void *producter(void *arg)
{
    RingQueue *rqp = (RingQueue *)arg;
    srand((unsigned long)time(NULL));
    for (;;)
    {
        int data = rand() % 1024;
        rqp->PutData(data);
        std::cout << "Prodoct data done: " << data << std::endl;
        // sleep(1);
    }
}

int main()
{
    RingQueue rq;
    pthread_t c, p;
    pthread_create(&c, NULL, consumer, (void *)&rq);
    pthread_create(&p, NULL, producter, (void *)&rq);

    pthread_join(c, NULL);
    pthread_join(p, NULL);
}

 这是一个简易版本的环形队列,如对信号量或线程控制不太熟悉的伙伴,可以阅读我的这篇文章:

【Linux线程】线程互斥与同步

【Linux线程】线程控制


总结

        以上便是本文的全部内容,希望对你有所帮助,感谢阅读!

相关文章:

  • Python常见面试题的详解5
  • AI 项目开发流程
  • 硅基流动+OfficeAI:开启WPS智能办公新时代
  • 【MATLAB】解决mod函数、逻辑判断的误差问题
  • 说一下Redis中的Gossip协议
  • 机器学习_16 朴素贝叶斯知识点总结
  • Unreal5从入门到精通之使用 BindWidget 将 C++ 连接到 UMG 蓝图
  • nginx部署vue项目访问路径问题
  • MATLAB联动本地部署的DeepSeek模型
  • JAX-RS与JAXB:实现XML数据交互的完整指南
  • 基于MATLAB的城轨车辆跨接电缆长度计算
  • 青少年编程与数学 02-009 Django 5 Web 编程 20课题、测试
  • 68页PDF | 数据安全总体解决方案:从数据管理方法论到落地实践的全方位指南(附下载)
  • 编码格式大全:类型 特点及其在网络安全中的作用
  • STM32 PWM脉冲宽度调制介绍
  • python股票分析系统部署操作过程及代码实现
  • opensuse [Linux] 系统挂在新的机械硬盘
  • Ubuntu 20 掉显卡驱动的解决办法
  • 设计模式-工厂模式
  • go语言并发的最佳实践
  • 企业展厅图片/seo一个月工资一般多少
  • 阿里云ecs可以做几个网站/百度app下载官方免费下载最新版
  • 网站的版式/最近的新闻有哪些
  • 自己做网站卖东西/免费发布广告
  • 淘宝如何在其他网站做优惠/网络营销软件下载
  • 沂南网站建设/徐州百度推广总代理