当前位置: 首页 > news >正文

linux--------------------BlockQueue的生产者消费模型

1.基础BlockingQueue的生产者消费模型

1.1 BlockQueue

在多线程编程中阻塞队列是一种常用于实现生产者和消费者模型的数据结构,它与普通的队列区别在于,当队列为空时,从队列获取元素的操作将被阻塞,直到队列中放入了新的数据。当队列满的时候,往里面插入数据也会被阻塞直到有元素被取出

1.2 c++ queue模拟实现阻塞队列的生产消费模型

为了理解我们来看一下代码

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
template <typename T>
class BlockQueue
{
private:
bool IsFull()
{
return _block_queue.size() == _cap;
}
bool IsEmpty()
{
return _block_queue.empty();
}
public:
BlockQueue(int cap) : _cap(cap)
{
_productor_wait_num = 0;
_consumer_wait_num = 0;
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_product_cond, nullptr);
pthread_cond_init(&_consum_cond, nullptr);
}
void Enqueue(T &in) // ⽣产者⽤的接⼝
{
pthread_mutex_lock(&_mutex);
while(IsFull()) // 保证代码的健壮性
{
// ⽣产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
// 1. pthread_cond_wait调⽤是: a. 让调⽤线程等待 b. ⾃动释放曾经持有的
_mutex锁 c. 当条件满⾜,线程唤醒,pthread_cond_wait要求线性
// 必须重新竞争_mutex锁,竞争成功,⽅可返回!!!
// 之前:安全
_productor_wait_num++;
pthread_cond_wait(&_product_cond, &_mutex); // 只要等待,必定会有唤
醒,唤醒的时候,就要继续从这个位置向下运⾏!!
_productor_wait_num--;
// 之后:安全
}
// 进⾏⽣产
// _block_queue.push(std::move(in));
// std::cout << in << std::endl;
_block_queue.push(in);
// 通知消费者来消费
if(_consumer_wait_num > 0)
pthread_cond_signal(&_consum_cond); // pthread_cond_broadcast
pthread_mutex_unlock(&_mutex);
}
void Pop(T *out) // 消费者⽤的接⼝ --- 5个消费者
{
pthread_mutex_lock(&_mutex);
while(IsEmpty()) // 保证代码的健壮性
{
// 消费线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
// 1. pthread_cond_wait调⽤是: a. 让调⽤进程等待 b. ⾃动释放曾经持有的
_mutex锁
_consumer_wait_num++;
pthread_cond_wait(&_consum_cond, &_mutex); // 伪唤醒
_consumer_wait_num--;
}// 进⾏消费
*out = _block_queue.front();
_block_queue.pop();
// 通知⽣产者来⽣产
if(_productor_wait_num > 0)
pthread_cond_signal(&_product_cond);
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_product_cond);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_product_cond);
pthread_cond_destroy(&_consum_cond);
}
private:
std::queue<T> _block_queue; // 阻塞队列,是被整体使⽤的!!!
int _cap; // 总上限
pthread_mutex_t _mutex; // 保护_block_queue的锁
pthread_cond_t _product_cond; // 专⻔给⽣产者提供的条件变量
pthread_cond_t _consum_cond; // 专⻔给消费者提供的条件变量
int _productor_wait_num;
int _consumer_wait_num;
};📌 注意:这⾥采⽤模版,是想告诉我们,队列中不仅仅可以防⽌内置类型,⽐如int, 对象也可
以作为任务来参与⽣产消费的过程哦.
下⾯附上⼀张代码,⽅便课堂使⽤
#pragma once
#include <iostream>
#include <string>
#include <functional>
// 任务类型1
// class Task
// {
// public:
// Task() {}
// Task(int a, int b) : _a(a), _b(b), _result(0)
// {
// }
// void Excute()
// {
// _result = _a + _b;
// }
// std::string ResultToString()
// {
// return std::to_string(_a) + "+" + std::to_string(_b) + "=" +
std::to_string(_result);
// }
// std::string DebugToString()
// {
// return std::to_string(_a) + "+" + std::to_string(_b) + "=?";
// }
// private:
// int _a;
// int _b;
// int _result;
// };
// 任务类型2
using Task = std::function<void()>;

2-3 为什么ptread_cond_wait需要互斥量

条件等待是线程同步的一种手段,如果只有一个线程,条件不满足,一直等待下去都不会被满足,所以必须要有一个线程通过某些手段,去改变共享变量,使原先的条件不满足变成满足,然后再去友好的通知等待在该条件变量上的线程

条件不会无缘无故的突然变得满足了,必然牵扯到共享数据的变化,所以一定要用互斥锁来保护

按照上⾯的说法,我们设计出如下的代码:先上锁,发现条件不满⾜,解锁,然后等待在条件变
量上不就⾏了,如下代码:
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {
pthread_mutex_unlock(&mutex);
//解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);

由于解锁和等待不是原子性,调用解锁之后pthread_cond_wait之前,如果有其他的线程获得互斥量摒弃条件满⾜,发送了信号,那么 pthread_cond_wait 将错过这个信 号,可能会导致线程永远阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是⼀个原⼦操作。

2-4 条件变量的使用规范

pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);

给条件发现信号

pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);

2-5条件变量的封装

大家有兴趣的可以看一下

test_4_7_线程池/Cond.h · liu xi peng/linux---ubuntu系统 - 码云 - 开源中国

2-6 POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源,但是OIXIS可以用于线程间同步

初始化

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表⽰线程间共享,⾮零表⽰进程间共享
value:信号量初始值

销毁

int sem_destroy(sem_t *sem);

等待

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); 
发布
int sem_post(sem_t *sem);
http://www.dtcms.com/a/288283.html

相关文章:

  • 【Docker基础】深入解析Docker-compose核心配置:Services服务配置详解
  • Gitee 提交信息的规范
  • 算法基础知识总结
  • GoC 图片指令
  • BeanFactory 和 FactoryBean 的区别
  • 架构探索笔记【1】
  • 如何快速学习一门新技术
  • 实用的文件和文件夹批量重命名工具
  • 手撕Spring底层系列之:注解驱动的魔力与实现内幕
  • 【Linux】重生之从零开始学习运维之Nginx
  • 【服务器与部署 14】消息队列部署:RabbitMQ、Kafka生产环境搭建指南
  • Linux中添加重定向(Redirection)功能到minishell
  • 中小机构如何低成本搭建教育培训平台?源码开发+私有化部署攻略
  • 什么是帕累托最优,帕累托最优如何运用在组相联映像中
  • AspectJ 表达式中常见符号说明
  • GoogleBenchmark用法
  • 环形区域拉普拉斯方程傅里叶级数解
  • 电阻耐压参数学习总结
  • 再谈进程-控制
  • 敏感词 v0.27.0 新特性之词库独立拆分
  • 5-大语言模型—理论基础:注意力机制优化
  • 关于个人博客系统的测试报告
  • Typecho评论系统集成Markdown编辑器完整教程
  • Windows事件查看器完整指南
  • 最少标记点问题:贪心算法解析
  • 深入了解 find_element 方法:Web 自动化定位元素的核心​
  • Linux某个进程CPU占用率高原因定位手段
  • Vue基础(前端教程①-路由)
  • 从 C# 转 Python 第三天:文件操作、异常处理与错误日志实践
  • 量子计算与AI融合的技术突破与实践路径