当前位置: 首页 > news >正文

【Linux】条件变量封装类及环形队列的实现

📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨

在这里插入图片描述

在这里插入图片描述

文章目录

  • 📢前言
  • 🏳️‍🌈一、条件变量 Cond
    • 1.1 条件变量的本质
    • 1.2 核心作用
  • 🏳️‍🌈二、条件变量封装类
    • 2.1 类定义
    • 2.2 构造函数
    • 2.3 Wait 方法
    • 2.4 Notify 和 NotifyAll
    • 2.5 析构函数
    • 2.6 整体代码
    • 2.7 示例用法
  • 🏳️‍🌈三、POSIX 信号量
    • 3.1 Sem类核心功能
    • 3.2 设计意义
    • 3.3 Sem类封装
  • 🏳️‍🌈四、环形队列的逻辑及实现
    • 4.1 核心设计
    • 4.2 成员变量
    • 4.3 核心接口
      • 4.3.1 构造函数
      • 4.3.2Equeue(生产者接口)
      • 4.3.3 Pop(消费者接口)​
    • 4.4 同步机制
      • 4.4.1 信号量控制
      • 4.4.2 互斥锁保护
    • 4.5 整体代码
    • 4.6 测试代码
  • 👥总结


📢前言

紧接上回 BlockQueue生产消费模型 的实现,这篇文章,笔者来介绍一下 条件变量 的意义和作用,然后进行 封装,并实现 BlockQueue 的加强版 环形阻塞队列 RingBuffer


🏳️‍🌈一、条件变量 Cond

1.1 条件变量的本质

条件变量 是 ​线程间的通信机制,用于在 ​特定条件不满足时挂起线程,并在条件满足时 ​唤醒线程继续执行。它必须与 ​**互斥锁(Mutex)**​ 配合使用,确保操作的原子性。

1.2 核心作用

  1. 避免忙等待(Busy Waiting)​
    ​问题:没有条件变量时,线程需反复检查条件是否满足(while(条件不满足)),浪费CPU资源。
    ​解决:条件变量让线程在条件不满足时主动休眠,直到被其他线程唤醒。

  2. 实现线程间协作
    ​生产者-消费者模型:生产者通知消费者“数据已准备好”,消费者通知生产者“缓冲区有空位”。
    ​任务队列:工作线程在队列为空时休眠,主线程添加任务后唤醒工作线程。

  3. 保证操作的原子性
    通过 ​互斥锁 + 条件变量 的组合,确保线程在检查条件和进入等待状态的整个过程是原子的避免竞态条件

🏳️‍🌈二、条件变量封装类

头文件和命名空间

  • 包含 和 <pthread.h>,后者提供 POSIX 线程操作。
  • 使用 LockModule 命名空间中的 Mutex 类,表示互斥锁。

2.1 类定义

class Cond {
public:
    Cond();
    void Wait(Mutex &mutex);
    void Notify();
    void NotifyAll();
    ~Cond();
private:
    pthread_cond_t _cond;
};

封装了 pthread_cond_t,提供初始化、等待、通知和销毁功能

2.2 构造函数

Cond() {
    int n = ::pthread_cond_init(&_cond, nullptr);
    (void)n; // 忽略返回值
}
  • 调用 pthread_cond_init 初始化条件变量。
  • 返回值 n 被忽略,实际应用中应检查错误(如返回非零表示失败)。

2.3 Wait 方法

void Wait(Mutex &mutex) {
    int n = ::pthread_cond_wait(&_cond, mutex.LockPtr());
}

作用:使当前线程等待条件变量,并释放关联的互斥锁。
​参数Mutex 对象的引用,调用其 LockPtr() 获取底层 pthread_mutex_t*
​流程

  1. 调用线程必须已锁定 mutex
  2. pthread_cond_wait 自动释放 mutex 并阻塞,直到被唤醒。
  3. 被唤醒后重新获取 mutex,继续执行。

2.4 Notify 和 NotifyAll

void Notify() {
    ::pthread_cond_signal(&_cond); // 唤醒一个等待线程
}
void NotifyAll() {
    ::pthread_cond_broadcast(&_cond); // 唤醒所有等待线程
}

区别

  • Notify() 唤醒至少一个等待线程。
  • NotifyAll() 唤醒所有等待线程。
  • 应在持有相同互斥锁时调用,以避免竞态条件

2.5 析构函数

~Cond() {
    ::pthread_cond_destroy(&_cond); // 销毁条件变量
}

确保没有线程等待时销毁,否则行为未定义

2.6 整体代码

Mutex.hpp

#pragma once
#include <iostream>
#include <pthread.h> // POSIX线程库头文件

namespace LockModule
{
    // 互斥锁封装类(不可拷贝构造/赋值)
    class Mutex
    {
    public:
        // 禁止拷贝(保护系统锁资源)
        Mutex(const Mutex&) = delete;
        const Mutex& operator = (const Mutex&) = delete;

        // 构造函数:初始化POSIX互斥锁
        Mutex()
        {
            // 初始化互斥锁属性为默认值
            int n = ::pthread_mutex_init(&_lock, nullptr);
            (void)n; // 实际开发建议处理错误码
        }

        // 析构函数:销毁锁资源
        ~Mutex()
        {
            // 确保锁已处于未锁定状态
            int n = ::pthread_mutex_destroy(&_lock);
            (void)n; // 生产环境应检查返回值
        }

        // 加锁操作(阻塞直至获取锁)
        void Lock()
        {
            // 可能返回EDEADLK(死锁检测)等错误码
            int n = ::pthread_mutex_lock(&_lock);
            (void)n; // 简化处理,实际建议抛异常或记录日志
        }

        // 解锁操作(必须由锁持有者调用)
        void Unlock()
        {
            // 未持有锁时解锁将返回EPERM
            int n = ::pthread_mutex_unlock(&_lock);
            (void)n; 
        }

        // 获取底层锁指针(可用于自定义条件变量)
        pthread_mutex_t *LockPtr()
        {
            return &_lock;
        }

    private:
        pthread_mutex_t _lock; // 底层锁对象
    };

    // RAII锁守卫(自动管理锁生命周期)
    class LockGuard
    {
    public:
        // 构造时加锁(必须传入已初始化的Mutex引用)
        LockGuard(Mutex &mtx):_mtx(mtx)
        {
            _mtx.Lock(); // 进入临界区
        }

        // 析构时自动解锁(异常安全保证)
        ~LockGuard()
        {
            _mtx.Unlock(); // 离开作用域自动释放
        }

    private:
        Mutex &_mtx; // 引用方式持有,避免拷贝导致未定义行为
    };
}

Cond.hpp

#pragma once

#include <iostream>
#include <pthread.h>

#include "Mutex.hpp"


namespace CondModule{
    using namespace LockModule;

    class Cond{
        public:
            Cond(){
                int n = ::pthread_cond_init(&_cond, nullptr);
                (void)n;
            }

            void Wait(Mutex& mutex){
                // pthread_mutex_lock() 的作用是 锁定互斥锁,直到成功为止。
                // 第一个参数 cond 是一个条件变量的标识符,第二个参数 mutex 是一个互斥锁的标识符。
                // pthread_cond_wait() 用来等待条件变量 cond 被 pthread_cond_signal() 或 pthread_cond_broadcast() 唤醒。
                // 它将阻塞调用线程,直到被唤醒或被中断。
                // 返回值是 0 表示成功,其他值表示出错。
                // 出错时,errno 被设置。
                // 成功时,调用线程获得互斥锁 mutex,直到被 pthread_cond_signal() 或 pthread_cond_broadcast() 唤醒。
                int n = ::pthread_cond_wait(&_cond, mutex.LockPtr());
                (void)n;
            }

            void Notify(){
                // pthread_cond_signal() 用来唤醒一个正在等待条件变量的线程。
                // 第一个参数 cond 是一个条件变量的标识符。
                // 返回值是 0 表示成功,其他值表示出错。
                // 出错时,errno 被设置。
                // 成功时,一个正在等待条件变量的线程被唤醒。
                int n = ::pthread_cond_signal(&_cond);
                (void)n;
            }

            void NotifyAll(){
                // pthread_cond_broadcast() 用来唤醒所有正在等待条件变量的线程。
                // 第一个参数 cond 是一个条件变量的标识符。
                // 返回值是 0 表示成功,其他值表示出错。
                // 出错时,errno 被设置。
                // 成功时,所有正在等待条件变量的线程被唤醒。
                int n = ::pthread_cond_broadcast(&_cond);
                (void)n;
            }

            ~Cond(){
                int n = ::pthread_cond_destroy(&_cond);
                (void)n;
            }
        
        private:
            pthread_cond_t _cond;
    };
}

2.7 示例用法

Mutex mutex;
Cond cond;
std::queue<int> buffer;

// 生产者线程
void producer() {
    mutex.Lock();
    while (buffer.full()) {
        cond.Wait(mutex); // 等待缓冲区非满
    }
    buffer.push(1);
    cond.Notify(); // 通知消费者
    mutex.Unlock();
}

// 消费者线程
void consumer() {
    mutex.Lock();
    while (buffer.empty()) {
        cond.Wait(mutex); // 等待缓冲区非空
    }
    buffer.pop();
    cond.Notify(); // 通知生产者
    mutex.Unlock();
}

🏳️‍🌈三、POSIX 信号量

POSIX信号量SystemV信号量 作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。但POSIX可以用于线程间同步。

因此我们可以封装一个 Sem类 来统筹管理 POSIX信号量

扮演 线程同步协调者 的角色,通过 信号量机制 精准控制生产与消费的节奏,确保缓冲区的线程安全与高效运作。其设计不仅简化了复杂的同步逻辑,还通过 封装 和 *RAII机制 *提升了代码的可靠性和可维护性,是多线程编程中资源同步的典范实现。

3.1 Sem类核心功能

用于简化 POSIX 信号量的使用。其主要功能如下:

  • 初始化:创建并初始化信号量。
  • P 操作​(P()):申请资源(信号量减 1),若资源不足则阻塞。
  • V 操作​(V()):释放资源(信号量加 1),唤醒等待线程。
  • 销毁:清理信号量资源。

3.2 设计意义

  1. 简化信号量操作
  • 封装底层 API:将 sem_initsem_waitsem_postsem_destroy 封装为类方法,隐藏实现细节。
  • 统一接口:通过 P()V() 提供直观的语义(“申请”和“释放”)。
  1. 提高代码可维护性
    ​RAII 机制:构造函数初始化资源,析构函数自动释放,避免资源泄漏。
{
    Sem sem(5); // 信号量初始化
    sem.P();    // 使用信号量
}               // 作用域结束自动调用 ~Sem()
  1. 支持多种同步场景
    通过调整初始值 value,可适配不同场景:
  • 互斥锁(Mutex)​:value = 1(二元信号量)。
  • 资源池:value = N(表示最多允许 N 个线程同时访问资源)。
  • 生产者-消费者模型:通过两个信号量分别控制缓冲区空间和数据数量。

3.3 Sem类封装

#pragma once
#include <semaphore.h>

namespace SemModule{
    int defaultsemval = 1;
    class Sem{
        public:
            Sem(int value = defaultsemval) : _init_value(value){
                sem_init(&sem, 0, value);
            }

            void P(){
                ::sem_wait(&sem);
            }

            void V(){
                ::sem_post(&sem);
            }

            ~Sem(){
                ::sem_destroy(&sem);
            }

        private:
            sem_t sem;
            int _init_value;
    };
}

🏳️‍🌈四、环形队列的逻辑及实现

在这里插入图片描述

4.1 核心设计

这是一个基于 ​信号量(Semaphore)​ 和 ​互斥锁(Mutex)​ 的线程安全环形队列(Ring Buffer),用于实现 ​生产者-消费者模型。通过以下机制确保线程安全:

​信号量控制:空间信号量(_spacesem)控制可用空间,数据信号量(_datasem)控制可消费数据。
​互斥锁保护:生产者和消费者各自拥有独立的锁(_p_lock_c_lock),支持多生产者和多消费者并发操作。

4.2 成员变量

在这里插入图片描述

4.3 核心接口

4.3.1 构造函数

RingBuffer(int cap)
    : _ring(cap),         // 初始化缓冲区容量
      _cap(cap),          // 记录总容量
      _p_step(0),         // 生产者起始位置
      _c_step(0),         // 消费者起始位置
      _datasem(0),        // 初始无可消费数据
      _spacesem(cap) {}   // 初始空间=总容量

作用:初始化缓冲区及相关同步机制

4.3.2Equeue(生产者接口)

void Equeue(const T &in) {
    _spacesem.P();        // 等待可用空间(信号量-1)
    {
        LockGuard lockguard(_p_lock);  // 加生产者锁
        _ring[_p_step] = in;           // 写入数据
        _p_step = (_p_step + 1) % _cap; // 环形移动
    }
    _datasem.V();         // 增加可消费数据(信号量+1)
}

流程

  1. 申请空间:通过 _spacesem.P() 等待缓冲区有空位。
  2. 生产数据:在锁保护下写入数据并更新生产者索引。
  3. 通知消费者:通过 _datasem.V() 增加可消费数据数量。

4.3.3 Pop(消费者接口)​

void Pop(T *out) {
    _datasem.P();         // 等待可消费数据(信号量-1)
    {
        LockGuard lockguard(_c_lock);  // 加消费者锁
        *out = _ring[_c_step];         // 读取数据
        _c_step = (_c_step + 1) % _cap; // 环形移动
    }
    _spacesem.V();        // 增加可用空间(信号量+1)
}

流程

  1. ​申请数据:通过 _datasem.P() 等待缓冲区有数据。
  2. 消费数据:在锁保护下读取数据并更新消费者索引。
  3. 通知生产者:通过 _spacesem.V() 增加可用空间数量。

4.4 同步机制

4.4.1 信号量控制

在这里插入图片描述

  • ​生产者阻塞条件_spacesem 为0时(缓冲区满)。
  • 消费者阻塞条件_datasem 为0时(缓冲区空)。

4.4.2 互斥锁保护

在这里插入图片描述

4.5 整体代码

#include "Cond.hpp"
#include "Mutex.hpp"
#include "Sem.hpp"

#include <vector>
#include <pthread.h>


namespace RingBufferModule{

    using namespace LockModule;
    using namespace CondModule;
    using namespace SemModule;

    template<typename T>
    class RingBuffer{
        public:
            RingBuffer(int cap)
                : _ring(cap),       // 初始化缓冲区容量
                _cap(cap),          // 记录总容量
                _p_step(0),         // 生产者起始位置
                _c_step(0),         // 消费者起始位置
                _datasem(0),        // 初始无可消费数据
                _spacesem(cap)      // 初始空间=总容量
            {}

            // 生产者接口
            void Equeue(const T &in) {
                _spacesem.P();        // 等待可用空间(信号量-1)
                {
                    LockGuard lockguard(_p_lock);  // 加生产者锁
                    _ring[_p_step] = in;           // 写入数据
                    _p_step = (_p_step + 1) % _cap; // 环形移动
                }
                _datasem.V();         // 增加可消费数据(信号量+1)
            }

            // 消费者接口
            void Pop(T *out) {
                _datasem.P();         // 等待可消费数据(信号量-1)
                {
                    LockGuard lockguard(_c_lock);  // 加消费者锁
                    *out = _ring[_c_step];         // 读取数据
                    _c_step = (_c_step + 1) % _cap; // 环形移动
                }
                _spacesem.V();        // 增加可用空间(信号量+1)
            }

            // 析构函数
            ~RingBuffer(){}

        private:
            std::vector<T> _ring;      // 环,临界资源
            int _cap;                   // 环的容量 
            int _p_step;                // 生产者指针位置
            int _c_step;                // 消费者指针位置

            Mutex _p_lock;              // 生产者锁
            Mutex _c_lock;              // 消费者锁

            Sem _datasem;               // 数据信号量
            Sem _spacesem;              // 空间信号量
    };
}

4.6 测试代码

#include "RingBuffer.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>

using namespace RingBufferModule;

void *Consumer(void *args)
{
    RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);
    while(true)
    {
        sleep(1);
        // sleep(1);
        // 1. 消费数据
        int data;
        ring_buffer->Pop(&data);

        // 2. 处理:花时间
        std::cout << "消费了一个数据: " << data << std::endl;
    }
}

void *Productor(void *args)
{
    RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);
    int data = 0;
    while (true)
    {
        // 1. 获取数据:花时间
        // sleep(1);

        // 2. 生产数据
        ring_buffer->Equeue(data);
        std::cout << "生产了一个数据: " << data << std::endl;
        data++;
    }
}

int main()
{
    RingBuffer<int> *ring_buffer = new RingBuffer<int>(5); // 共享资源 -> 临界资源
    // 单生产,单消费
    pthread_t c1, p1, c2,c3,p2;
    pthread_create(&c1, nullptr, Consumer, ring_buffer);
    pthread_create(&c2, nullptr, Consumer, ring_buffer);
    pthread_create(&c3, nullptr, Consumer, ring_buffer);
    pthread_create(&p1, nullptr, Productor, ring_buffer);
    pthread_create(&p2, nullptr, Productor, ring_buffer);


    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);


    delete ring_buffer;

    return 0;
}

👥总结

本篇博文对 【Linux】条件变量封装类及环形队列的实现 做了一个较为详细的介绍,不知道对你有没有帮助呢

觉得博主写得还不错的三连支持下吧!会继续努力的~

请添加图片描述

http://www.dtcms.com/a/111946.html

相关文章:

  • mybatis慢sql无所遁形
  • 学透Spring Boot — 009. Spring Boot的四种 Http 客户端
  • 科技赋能安居梦:中建海龙以模块化革新重塑城市更新范式
  • 多输入多输出 | Matlab实现BO-GRU贝叶斯优化门控循环单元多输入多输出预测
  • 图解AUTOSAR_SWS_LINStateManager
  • prism WPF 模块
  • #SVA语法滴水穿石# (003)关于 sequence 和 property 的区别和联系
  • Mysql 中有哪些日志结构?
  • LeetCode 687 -- 二叉树
  • HTML5+CSS3+JS小实例:带滑动指示器的导航图标
  • 开源且完全没有审核限制的大型语言模型的概述
  • 电力载波单灯控制器:智能照明的关键技术
  • AQS 等待队列中的线程自旋多少次后挂起?
  • 为PXIe控制器配置NI Linux实时操作系统安装软件
  • #python项目生成exe相关了解
  • C++20新增内容
  • 前端页面鼠标移动监控(鼠标运动、鼠标监控)鼠标节流处理、throttle、限制触发频率(setTimeout、clearInterval)
  • 表结构数据的基本特征、获取、加工与使用
  • Java 状态模式 详解
  • 金融机构开源软件生命周期管理实务
  • 模组COF受损制程排查验证及改善
  • 从文本到多模态:如何将RAG扩展为支持图像+文本检索的增强生成系统?
  • 基于 docker 的 Xinference 全流程部署指南
  • shell语言替换脚本、填补整个命令行
  • 知识考量码【蓝桥】
  • leetcode-代码随想录-链表-翻转链表
  • 框架PasteForm实际开发案例,换个口味显示数据,支持echarts,只需要标记几个特性即可在管理端显示(2)
  • Python办公自动化(2)对wordpdf的操作
  • 青少年编程与数学 02-015 大学数学知识点 04课题、微积分
  • 如何判断多个点组成的3维面不是平的,如果不是平的,如何拆分成多个平面