基于环形队列与信号量的生产者-消费者模型深度解析与实现
目录
一、空间资源与数据资源:基于环形队列的生产者 - 消费者模型深度解析
1、生产者与消费者关注资源的差异
2、信号量初始值设置
3、生产者与消费者的资源申请和释放机制
(一)生产者:申请空间资源,释放数据资源
申请空间资源
释放数据资源
(二)消费者:申请数据资源,释放空间资源
申请数据资源
释放空间资源
4、必须遵守的两个关键规则
(一)禁止同时访问同一位置
(二)避免相互套圈超过一圈
5、数组模拟与环状特性实现
6、空满状态判断难题与解决方案
(一)判断难题
(二)解决方案
1. 加计数器或标记位
加计数器
标记位
2. 预留空位置
7、信号量在多线程同步中的优势
(一)空间资源信号量(blank_sem)
(二)数据资源信号量(data_sem)
二、代码实现
1、环形队列类实现 (RingQueue.hpp)
相关说明
具体机制
线程安全设计
实现说明
2、生产者-消费者模型实现
相关说明
实现说明
生产者与消费者保持同步节奏
生产速度超过消费速度
生产速度较慢,而消费速度较快
三、信号量保护环形队列的原理
1、数据不一致的产生条件
2、信号量对特殊情况的保障机制
3、正常情况下的并发执行
四、信号量封装类 Sem和环形队列类 RingQueue的结合使用
1、信号量封装类 Sem
详细讲解
2、环形队列类 RingQueue
详细讲解
3、引入之前写的MutexModule命名空间再对环形队列类 RingQueue进行封装
封装说明
一、空间资源与数据资源:基于环形队列的生产者 - 消费者模型深度解析
上一节生产者 - 消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序(POSIX信号量)。环形队列作为一种高效的数据结构,在多线程编程中广泛应用于生产者 - 消费者模型等场景,以实现数据的缓冲和同步处理。在生产者 - 消费者模型中,生产者和消费者所关注的资源存在显著差异,这种差异主要体现在对环形队列中不同类型资源的关注上:

1、生产者与消费者关注资源的差异
生产者和消费者在环形队列这一共享资源结构中,有着截然不同的关注点。生产者主要聚焦于环形队列中的空间资源,而消费者则更关心其中的数据资源。具体而言:
-
生产者视角:生产者时刻关注环形队列是否存在可用的空间(blank)。只要环形队列中有空闲位置,即存在空间资源,生产者就能够启动生产流程,将新的数据放入队列。
-
消费者视角:消费者着重留意环形队列中是否已经填充了数据(data)。一旦队列中有数据存在,消费者便可以开始消费操作,从队列中取出数据进行处理。
2、信号量初始值设置
为了精准管理环形队列中的空间资源和数据资源,我们引入信号量机制,分别用 blank_sem 和 data_sem 来描述空间资源和数据资源。在初始化这两个信号量时,需要根据环形队列的初始状态赋予它们不同的初始值:
-
blank_sem初始值:将其初始值设置为环形队列的容量。这是因为在系统启动之初,环形队列尚未被任何数据填充,所有位置均为空闲状态,即全是可用的空间资源。 -
data_sem初始值:将其初始值设定为 0。由于初始时环形队列中没有数据,所以代表数据资源的data_sem初始值为 0,表示没有可消费的数据。
3、生产者与消费者的资源申请和释放机制
(一)生产者:申请空间资源,释放数据资源
生产者在整个生产过程中,遵循特定的资源申请和释放规则:
申请空间资源
-
每次生产数据前,生产者必须先申请
blank_sem。 -
这一操作类似于获取进入生产区域的许可。若
blank_sem的值不为 0,表明环形队列中存在空闲空间,信号量申请成功,生产者可以立即开展生产操作,将数据写入队列的空闲位置。 -
反之,若
blank_sem的值为 0,意味着队列已满,没有可用的空间资源,此时信号量申请失败,生产者需要在blank_sem的等待队列中进入阻塞等待状态,直到有其他消费者消费数据,为环形队列腾出新的空间,生产者才会被唤醒并重新尝试申请信号量。
释放数据资源
-
当生产者成功完成数据生产后,需要对数据资源进行释放操作。
-
这里需要注意的是,尽管生产者在生产前对
blank_sem执行了 P 操作(申请操作),但在生产完成后,应针对data_sem执行 V 操作(释放操作),而非再次操作blank_sem。 -
这是因为生产者在生产前获取的是一个空闲位置(blank 位置),经过生产操作后,该位置被新生产的数据填充,在消费者未消费之前,此位置已转变为数据位置(data 位置)。
-
所以,生产者完成生产意味着环形队列中新增了一个可消费的数据位置,因此要对
data_sem进行 V 操作,增加可消费的数据资源数量。
(二)消费者:申请数据资源,释放空间资源
消费者在消费过程中同样遵循一套严谨的资源管理规则:
申请数据资源
-
每次消费数据前,消费者需要先申请
data_sem。这相当于获取消费数据的权限。 -
若
data_sem的值不为 0,说明环形队列中有可供消费的数据,信号量申请成功,消费者可以立即从队列中取出数据进行消费。 -
若
data_sem的值为 0,则表示队列中没有数据,信号量申请失败,消费者需要在data_sem的等待队列中阻塞等待,直到有生产者生产新的数据,使队列中有数据可供消费,消费者才会被唤醒并重新申请信号量。
释放空间资源
-
当消费者完成数据消费后,要对空间资源进行释放。
-
虽然消费者在消费前对
data_sem执行了 P 操作,但消费完成后,应针对blank_sem执行 V 操作,而不是data_sem。 -
这是因为消费者在消费前获取的是一个数据位置(data 位置),消费操作完成后,该位置的数据已被处理,再次消费已无意义。
-
为了让生产者能够在该位置继续生产新的数据,需要将此位置重新视为空闲位置(blank 位置)。
-
所以,消费者完成消费意味着环形队列中新增了一个可用的空闲位置,因此要对
blank_sem进行 V 操作,增加可用的空间资源数量。
4、必须遵守的两个关键规则
在基于环形队列的生产者和消费者模型中,为确保系统的正常运行和数据的准确性,生产者和消费者必须严格遵守以下两个重要规则:
(一)禁止同时访问同一位置
生产者和消费者在访问环形队列时,必须避免对同一位置进行操作。具体来说:
-
如果生产者和消费者同时访问环形队列中的同一个位置,这就相当于两者同时对这一关键的临界资源进行操作,极有可能引发数据不一致、数据冲突等严重问题。
-
例如,生产者正在向某个位置写入数据,而消费者同时从该位置读取数据,这将导致消费者读取到不完整或错误的数据,进而影响整个系统的稳定性和可靠性。
-
相反,如果生产者和消费者访问的是环形队列中的不同位置,那么它们就可以同时进行生产和消费操作。
-
在这种情况下,由于两者操作的对象不同,不会产生数据冲突,系统能够高效地运行,实现生产者和消费者的并行工作,提高整体的处理效率。

(二)避免相互套圈超过一圈
生产者和消费者在环形队列中的操作顺序和速度需要保持一定的协调性,避免出现一方将另一方套圈超过一圈的情况:
-
对于生产者而言,它从消费者的位置开始,按照顺时针方向进行生产。如果生产者的生产速度远快于消费者的消费速度,当生产者绕着环形队列生产了一圈数据后再次遇到消费者时,此时生产者应立即停止生产。
-
因为如果继续生产,新生产的数据将会覆盖环形队列中尚未被消费者消费的旧数据,导致数据丢失,影响系统的正常运行。
-
同理,消费者从生产者的位置开始,按顺时针方向进行消费。若消费者的消费速度比生产者的生产速度快,当消费者绕着环形队列消费了一圈数据后再次遇到生产者时,消费者也应停止消费。
-
否则,继续消费将会读取到环形队列中已经废弃(或者已经被读为空)的数据,这些数据可能是生产者后续生产过程中已经更新或不再有效的数据,从而引发数据处理错误。

通过严格遵循上述资源关注差异、信号量管理机制以及两个关键规则,基于环形队列的生产者和消费者模型能够实现高效、稳定的数据生产和消费过程,确保系统在多任务环境下的正确性和可靠性。
5、数组模拟与环状特性实现
-
环形队列通常借助数组来进行模拟实现,利用模运算巧妙地模拟出环状特性。
-
在数组实现中,设定一个固定大小的数组作为队列的存储空间,同时使用两个指针(或索引)分别表示队列的头(front)和尾(rear)。
-
当指针到达数组的末尾时,通过模运算将其重新指向数组的开头,从而实现循环使用数组空间的效果。
-
例如,假设数组的大小为
size,当前尾指针为rear,当需要向后移动尾指针以插入新元素时,新的尾指针位置可以通过(rear + 1) % size来计算。 -
这种模运算的方式使得指针在数组范围内循环移动,完美地模拟了环形队列的环状特性,避免了数组空间的浪费,提高了空间利用率。

6、空满状态判断难题与解决方案
(一)判断难题
环形结构具有一个特殊的问题,即其起始状态和结束状态在表现形式上可能是一样的。例如,当队列为空时,头指针和尾指针可能指向同一个位置;而当队列满时,头指针和尾指针也可能指向同一个位置。这就导致仅通过头尾指针的位置关系,很难准确判断队列是处于空状态还是满状态,给程序的设计和实现带来了困扰。
(二)解决方案
1. 加计数器或标记位
为了准确判断环形队列的空满状态,可以采用加计数器或者标记位的方法。
加计数器
-
引入一个计数器变量,用于记录队列中当前存储的元素数量。
-
当有元素入队时,计数器加 1;当有元素出队时,计数器减 1。
-
通过判断计数器的值是否为 0,可以确定队列是否为空;通过判断计数器的值是否等于队列的最大容量,可以确定队列是否已满。
-
这种方法简单直观,但需要额外的变量来维护计数器,增加了程序的复杂度。
标记位
-
设置一个标记位变量,用于记录上一次操作是入队还是出队。
-
当进行入队操作时,将标记位设置为入队状态;当进行出队操作时,将标记位设置为出队状态。
-
在判断队列空满状态时,结合头尾指针的位置关系和标记位的状态来进行综合判断。
-
例如,如果头尾指针指向同一位置,且标记位为入队状态,则说明队列已满;如果头尾指针指向同一位置,且标记位为出队状态,则说明队列为空。这种方法相对复杂,但不需要额外的计数器变量。

2. 预留空位置
-
另一种常见的解决方案是预留一个空的位置,作为队列满的状态标识。即队列的实际可用空间比数组的大小少 1。
-
在这种情况下,当尾指针的下一个位置(通过模运算计算)等于头指针时,认为队列已满;当头指针和尾指针相等时,认为队列为空。
-
这种方法简单易行,不需要额外的计数器或标记位变量,但会牺牲一部分数组空间。
7、信号量在多线程同步中的优势
在我们所讨论的场景中,引入了信号量这一强大的同步机制,使得多线程间的同步过程变得简单而高效。信号量本质上是一种计数器,用于控制对共享资源的访问。在环形队列的多线程应用中,信号量可以准确地记录队列中的空闲空间数量和可用数据数量。
(一)空间资源信号量(blank_sem)
-
用于表示环形队列中的空闲空间数量。
-
初始时,将其值设置为队列的最大容量(减去预留的空位置,如果采用预留空位置的方法)。
-
生产者线程在向队列中插入数据之前,需要先申请
blank_sem。 -
如果
blank_sem的值大于 0,说明队列中有空闲空间,生产者可以成功插入数据,并将blank_sem的值减 1;如果blank_sem的值为 0,说明队列已满,生产者线程将被阻塞,直到有其他线程释放了空闲空间(即消费者线程从队列中取出了数据,增加了空闲空间数量)。
(二)数据资源信号量(data_sem)
-
用于表示环形队列中的可用数据数量。
-
初始时,将其值设置为 0。消费者线程在从队列中取出数据之前,需要先申请
data_sem。 -
如果
data_sem的值大于 0,说明队列中有可用数据,消费者可以成功取出数据,并将data_sem的值减 1;如果data_sem的值为 0,说明队列为空,消费者线程将被阻塞,直到有其他线程向队列中插入了数据(即生产者线程插入了数据,增加了可用数据数量)。
通过使用信号量,生产者和消费者线程可以自动地进行同步和协调,避免了复杂的条件判断和手动同步操作,大大简化了多线程程序的设计和实现,提高了程序的可靠性和性能。
综上所述,环形队列采用数组模拟并结合模运算实现环状特性,通过加计数器、标记位或预留空位置等方法解决空满状态判断难题,而信号量的引入则为多线程间的同步提供了简洁而有效的解决方案。这些技术手段的结合,使得环形队列在多线程编程中能够发挥重要作用,实现高效的数据处理和同步。
二、代码实现
1、环形队列类实现 (RingQueue.hpp)
#pragma once#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <vector>
#include <cstdlib>
#include <ctime>#define DEFAULT_CAPACITY 8 // 默认队列容量template<class T>
class RingQueue
{
private:// 信号量P操作(等待)void P(sem_t& s) {sem_wait(&s);}// 信号量V操作(释放)void V(sem_t& s) {sem_post(&s);}public:// 构造函数,可指定队列容量explicit RingQueue(int cap = DEFAULT_CAPACITY): _cap(cap), _p_pos(0), _c_pos(0){_q.resize(_cap); // 调整vector大小为指定容量// 初始化信号量sem_init(&_blank_sem, 0, _cap); // 初始空间资源=队列容量sem_init(&_data_sem, 0, 0); // 初始数据资源=0}// 析构函数~RingQueue() {sem_destroy(&_blank_sem);sem_destroy(&_data_sem);}// 向环形队列插入数据(生产者调用)void Push(const T& data) {P(_blank_sem); // 等待有空闲空间// 临界区开始 - 实际写入数据_q[_p_pos] = data;// 临界区结束V(_data_sem); // 通知有新数据可用// 更新生产位置(非临界区操作)_p_pos = (_p_pos + 1) % _cap;}// 从环形队列获取数据(消费者调用)void Pop(T& data) {P(_data_sem); // 等待有数据可用// 临界区开始 - 实际读取数据data = _q[_c_pos];// 临界区结束V(_blank_sem); // 通知有空闲空间// 更新消费位置(非临界区操作)_c_pos = (_c_pos + 1) % _cap;}private:std::vector<T> _q; // 用vector实现的环形队列存储空间int _cap; // 队列容量上限int _p_pos; // 生产位置(仅由生产者线程修改)int _c_pos; // 消费位置(仅由消费者线程修改)sem_t _blank_sem; // 空闲空间信号量sem_t _data_sem; // 数据信号量
};
相关说明
-
环形队列默认容量上限为8。该实现基于vector结构,生产者和消费者分别通过p_pos和c_pos下标访问数据。
具体机制
-
生产者将数据存入vector的p_pos位置,随后递增p_pos并对容量取模,实现环形队列效果
-
消费者从vector的c_pos位置获取数据,随后递增c_pos并对容量取模,实现环形队列效果
线程安全设计
-
p_pos仅由生产者线程更新
-
c_pos仅由消费者线程更新
-
为确保最小化临界区代码,p_pos和c_pos的更新操作均放置在V操作之后执行
实现说明
模板类设计:
-
使用模板类使得队列可以存储任意类型的数据
-
默认队列容量为8,可通过构造函数参数指定
信号量操作:
-
P()和V()方法封装了信号量的等待和释放操作 -
使用POSIX信号量实现线程同步
线程安全机制:
-
_blank_sem:表示空闲空间数量,初始值为队列容量 -
_data_sem:表示可用数据数量,初始值为0 -
通过信号量自动实现生产者和消费者的同步
环形特性实现:
-
生产位置(
_p_pos)和消费位置(_c_pos)通过取模运算实现环形移动 -
位置更新操作放在信号量操作之后,减少临界区范围
2、生产者-消费者模型实现
我们采用单生产者-单消费者模型进行演示。主函数只需创建两个线程:一个生产者线程持续向环形队列生成数据,一个消费者线程则不断从队列中取出数据进行处理。
#include "RingQueue.hpp"// 生产者线程函数
void* Producer(void* arg) {RingQueue<int>* rq = static_cast<RingQueue<int>*>(arg);while (true) {sleep(1); // 模拟生产耗时int data = rand() % 100 + 1; // 生成1-100的随机数rq->Push(data);std::cout << "生产者生产: " << data << std::endl;}return nullptr;
}// 消费者线程函数
void* Consumer(void* arg) {RingQueue<int>* rq = static_cast<RingQueue<int>*>(arg);while (true) {sleep(1); // 模拟消费耗时int data = 0;rq->Pop(data);std::cout << "消费者消费: " << data << std::endl;}return nullptr;
}int main() {// 初始化随机数种子srand(static_cast<unsigned int>(time(nullptr)));// 创建线程和环形队列pthread_t producer_tid, consumer_tid;RingQueue<int>* rq = new RingQueue<int>();// 创建生产者消费者线程pthread_create(&producer_tid, nullptr, Producer, rq);pthread_create(&consumer_tid, nullptr, Consumer, rq);// 等待线程结束(实际上会一直运行)pthread_join(producer_tid, nullptr);pthread_join(consumer_tid, nullptr);// 清理资源(实际上不会执行到这里)delete rq;return 0;
}
相关说明
-
环形队列需要同时被生产者和消费者线程访问,因此在创建这两个线程时,必须将环形队列作为参数传递给它们的执行函数。
-
具体实现中,生产者线程负责将生成的随机数存入队列(Push操作),消费者线程则从队列中取出数据(Pop操作)。
-
为了便于调试和监控,我们可以在代码中打印显示生产者生成的数据和消费者获取的数据。
实现说明
线程函数设计:
-
生产者线程:每秒生成一个1-100的随机数并推入队列
-
消费者线程:每秒从队列取出一个数据并消费
-
两者都通过
RingQueue对象进行同步操作
主程序流程:
-
初始化随机数种子
-
创建环形队列实例
-
创建生产者和消费者线程,将队列对象作为参数传递
-
等待线程结束(实际会一直运行)
-
清理资源(示例中不会执行到这里)
同步机制:
-
当队列满时,生产者会被
_blank_sem阻塞 -
当队列空时,消费者会被
_data_sem阻塞 -
自动实现生产-消费的同步
生产者与消费者保持同步节奏
由于代码设定生产者每秒生成一个数据,消费者也每秒处理一个数据,因此程序运行时两者的执行频率完全匹配。

生产速度超过消费速度
实现方案:让生产者持续生产(去掉生产者的sleep(1)函数,消费者不变),同时控制消费者以每秒一次的频率进行消费
-
此时生产者生产速度极快,代码运行后立即填满环形队列。当生产者试图继续生产时,发现可用空间已耗尽,此时它只能在blank_sem的等待队列中阻塞等待。
-
只有当消费者消费完一个数据并对blank_sem执行V操作后,生产者才会被唤醒继续生产。
-
由于生产者持续保持高速生产状态,每次完成一个数据生产后又立即陷入等待,最终生产者和消费者的执行节奏将重新趋于同步。

生产速度较慢,而消费速度较快
实现方案:我们还可以调整节奏,让生产者每秒生产一次,同时消费者持续进行消费。(跟上面反过来)
-
当环形队列初始为空时,消费者因没有可用数据而被迫在data_sem的等待队列中阻塞。
-
只有当生产者完成数据生产并对data_sem执行V操作后,消费者才会被唤醒开始消费。
-
由于消费者处理速度极快,每消费完一个数据后就会再次进入等待状态,这使得生产者和消费者的操作节奏最终趋于同步。

三、信号量保护环形队列的原理
在采用 blank_sem(表示空闲空间数量的信号量)和 data_sem(表示已存储数据数量的信号量)这两个信号量对环形队列进行保护后,能够确保该环形队列在整个运行过程中不会出现数据不一致的问题。下面将详细阐述其工作原理。
1、数据不一致的产生条件
数据不一致问题通常出现在生产者和消费者同时指向环形队列的同一位置并进行访问操作时。具体而言,在对环形队列进行写入或读取数据的过程中,生产者和消费者指向同一位置的情况主要发生在以下两种特定场景:
-
环形队列为空时:此时队列中没有可消费的数据,若消费者和生产者同时操作同一位置,可能导致消费者读取到无效数据或生产者覆盖了本应由消费者读取的位置。
-
环形队列为满时:此时队列中没有可用的空闲空间,若生产者和消费者同时操作同一位置,可能导致生产者写入的数据被消费者错误地读取,或者消费者读取后生产者又覆盖了已读取的数据。
2、信号量对特殊情况的保障机制
然而,在上述这两种关键情况下,信号量机制能够有效地阻止生产者和消费者同时对环形队列进行访问:
-
当环形队列为空时:消费者尝试进行消费操作的前提是存在可消费的数据。由于此时
data_sem信号量的值为 0,表示没有已存储的数据可供消费。因此,消费者在尝试获取data_sem信号量时会陷入阻塞状态,无法继续执行消费操作,从而避免了在队列为空时对队列的无效访问,保证了数据的一致性。 -
当环形队列为满时:生产者尝试进行生产操作的前提是存在可用的空闲空间。由于此时
blank_sem信号量的值为 0,表示没有空闲空间可供存储新数据。因此,生产者在尝试获取blank_sem信号量时会陷入阻塞状态,无法继续执行生产操作,从而避免了在队列为满时对队列的无效写入,保证了数据的一致性。
3、正常情况下的并发执行
-
通过信号量对环形队列为空和为满这两种特殊情况的严格控制和串行化处理,确保了在关键节点上不会出现生产者和消费者同时访问同一位置的情况。
-
而在除了这两种特殊情况之外的大多数情况下,生产者和消费者所指向的环形队列位置是不同的。
-
例如,当环形队列中存在部分数据且还有部分空闲空间时,生产者可以将新数据写入空闲位置,而消费者可以从已有数据的位置读取数据进行消费。
-
由于这两个操作所涉及的位置不同,不会产生数据冲突,因此该环形队列可以安全地让生产者和消费者并发地执行各自的操作。这种并发执行机制能够充分利用系统资源,提高数据处理的效率。
综上所述,借助 blank_sem 和 data_sem 这两个信号量对环形队列的精心保护,能够有效地避免数据不一致问题的发生,同时还能在大多数情况下实现生产者和消费者的并发执行,从而在保证数据安全性的前提下提高了系统的整体性能。
四、信号量封装类 Sem和环形队列类 RingQueue的结合使用
1、信号量封装类 Sem
#pragma once
#include <iostream>
#include <semaphore.h>// 对信号量进行简单封装,便于使用
class Sem
{
public:// 构造函数,初始化信号量,n 为信号量的初始值Sem(int n){sem_init(&_sem, 0, n);}// P 操作(wait 操作),对信号量进行减 1 操作,若信号量值为 0 则阻塞void P(){sem_wait(&_sem);}// V 操作(post 操作),对信号量进行加 1 操作void V(){sem_post(&_sem);}// 析构函数,销毁信号量~Sem(){sem_destroy(&_sem);}private:sem_t _sem; // 信号量变量
};
详细讲解
-
#pragma once:防止头文件被重复包含,确保在同一个编译单元中该头文件只被包含一次。 -
#include <semaphore.h>:引入 POSIX 信号量相关的头文件,以便使用信号量相关的函数。 -
Sem类对 POSIX 信号量进行了简单的封装,提供了更直观的接口。-
Sem(int n):构造函数,使用sem_init函数初始化信号量,参数0表示该信号量在线程间共享,n为信号量的初始值。 -
P():对应sem_wait函数,当信号量的值大于 0 时,将其减 1 并立即返回;当信号量的值为 0 时,线程会被阻塞,直到信号量的值大于 0。 -
V():对应sem_post函数,将信号量的值加 1,如果有线程因为该信号量的值为 0 而被阻塞,那么其中一个线程会被唤醒。 -
~
Sem():析构函数,使用sem_destroy函数销毁信号量,释放相关资源。
-
2、环形队列类 RingQueue
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <pthread.h>// 模板类,可以支持多种数据类型的环形队列
template<typename T>
class RingQueue
{
private:// 锁的封装函数,方便加锁和解锁操作void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void Unlock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}public:// 构造函数,初始化环形队列的各个成员变量RingQueue(int cap): _ring_queue(cap),_cap(cap),_room_sem(cap), // 初始时,队列中有 cap 个空闲空间_data_sem(0), // 初始时,队列中没有数据_productor_step(0),_consumer_step(0){// 初始化生产者和消费者的互斥锁pthread_mutex_init(&_productor_mutex, nullptr);pthread_mutex_init(&_consumer_mutex, nullptr);}// 生产者入队操作void Enqueue(const T &in){// 先申请空闲空间信号量,确保有空间可以生产数据_room_sem.P();// 对生产者加锁,保证多个生产者之间的互斥访问Lock(_productor_mutex);// 一定有空间,进行生产操作_ring_queue[_productor_step++] = in; // 将数据放入队列,并移动生产者下标_productor_step %= _cap; // 确保下标在队列容量范围内循环// 解锁Unlock(_productor_mutex);// 释放数据信号量,通知消费者有数据可消费_data_sem.V();}// 消费者出队操作void Pop(T *out){// 先申请数据信号量,确保有数据可以消费_data_sem.P();// 对消费者加锁,保证多个消费者之间的互斥访问Lock(_consumer_mutex);// 从队列中取出数据,并移动消费者下标*out = _ring_queue[_consumer_step++];_consumer_step %= _cap; // 确保下标在队列容量范围内循环// 解锁Unlock(_consumer_mutex);// 释放空闲空间信号量,通知生产者有空间可以生产_room_sem.V();}// 析构函数,销毁互斥锁~RingQueue(){pthread_mutex_destroy(&_productor_mutex);pthread_mutex_destroy(&_consumer_mutex);}private:// 1. 环形队列,使用 vector 存储数据std::vector<T> _ring_queue;int _cap; // 环形队列的容量上限// 2. 生产者和消费者的下标,用于记录当前生产和消费的位置int _productor_step;int _consumer_step;// 3. 定义信号量Sem _room_sem; // 生产者关心,表示空闲空间数量Sem _data_sem; // 消费者关心,表示已存储数据数量// 4. 定义锁,维护多生产多消费之间的互斥关系pthread_mutex_t _productor_mutex; // 生产者互斥锁pthread_mutex_t _consumer_mutex; // 消费者互斥锁
};
详细讲解
头文件引入:
-
#include <iostream>、#include <string>、#include <vector>:引入标准库的头文件,用于输入输出、字符串处理和动态数组的使用。 -
#include <semaphore.h>和#include <pthread.h>:引入 POSIX 信号量和线程相关的头文件,以便使用信号量和互斥锁。
模板类:template<typename T>:定义模板类,使得 RingQueue 可以支持多种数据类型,提高了代码的复用性。
锁的封装函数:Lock(pthread_mutex_t &mutex) 和 Unlock(pthread_mutex_t &mutex):对 pthread_mutex_lock 和 pthread_mutex_unlock 函数进行封装,简化了加锁和解锁的操作。
构造函数:
-
初始化环形队列的各个成员变量,包括
_ring_queue、_cap、_room_sem、_data_sem、_productor_step和_consumer_step。 -
使用
pthread_mutex_init函数初始化生产者和消费者的互斥锁,nullptr表示使用默认的互斥锁属性。
生产者入队操作 Enqueue:
-
调用
_room_sem.P()申请空闲空间信号量,确保队列中有足够的空间可以生产数据。如果没有空闲空间,生产者会被阻塞,直到有消费者消费了数据,释放了空闲空间。 -
调用
Lock(_productor_mutex)对生产者加锁,保证多个生产者之间的互斥访问,避免多个生产者同时操作队列导致数据混乱。 -
将数据
in放入队列的_productor_step位置,然后移动生产者下标,并使用取模运算确保下标在队列容量范围内循环。 -
调用
Unlock(_productor_mutex)解锁。 -
调用
_data_sem.V()释放数据信号量,通知消费者有新的数据可以消费。
消费者出队操作 Pop:
-
调用
_data_sem.P()申请数据信号量,确保队列中有数据可以消费。如果没有数据,消费者会被阻塞,直到有生产者生产了数据,释放了数据信号量。 -
调用
Lock(_consumer_mutex)对消费者加锁,保证多个消费者之间的互斥访问,避免多个消费者同时操作队列导致数据混乱。 -
从队列的
_consumer_step位置取出数据,赋值给out,然后移动消费者下标,并使用取模运算确保下标在队列容量范围内循环。 -
调用
Unlock(_consumer_mutex)解锁。 -
调用
_room_sem.V()释放空闲空间信号量,通知生产者有空间可以生产新的数据。
析构函数:使用 pthread_mutex_destroy 函数销毁生产者和消费者的互斥锁,释放相关资源。
3、引入之前写的MutexModule命名空间再对环形队列类 RingQueue进行封装
#pragma once
#include <iostream>
#include <pthread.h>namespace MutexModule
{class Mutex{public:Mutex(){pthread_mutex_init(&_mutex, nullptr);}void Lock(){int n = pthread_mutex_lock(&_mutex);(void)n;}void Unlock(){int n = pthread_mutex_unlock(&_mutex);(void)n;}~Mutex(){pthread_mutex_destroy(&_mutex);}pthread_mutex_t *Get(){return &_mutex;}private:pthread_mutex_t _mutex;};class LockGuard{public:LockGuard(Mutex &mutex):_mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;};
}
把环形队列类与MutexModule进行封装整合,创建一个线程安全的环形队列。以下是封装后的代码:
#pragma once
#include <iostream>
#include <vector>
#include "MutexModule.hpp" // 假设MutexModule保存在这个头文件中namespace ThreadSafeQueue {template <typename T>
class CircularQueue {
public:// 构造函数,指定队列容量explicit CircularQueue(size_t capacity) : _capacity(capacity), _queue(capacity), _head(0), _tail(0), _size(0) {}// 禁止拷贝构造和赋值CircularQueue(const CircularQueue&) = delete;CircularQueue& operator=(const CircularQueue&) = delete;// 入队操作(线程安全)bool Enqueue(const T& item) {MutexModule::LockGuard lock(_mutex);if (IsFull()) {return false;}_queue[_tail] = item;_tail = (_tail + 1) % _capacity;++_size;return true;}// 出队操作(线程安全)bool Dequeue(T& item) {MutexModule::LockGuard lock(_mutex);if (IsEmpty()) {return false;}item = _queue[_head];_head = (_head + 1) % _capacity;--_size;return true;}// 查看队首元素(线程安全)bool Front(T& item) const {MutexModule::LockGuard lock(_mutex);if (IsEmpty()) {return false;}item = _queue[_head];return true;}// 查看队尾元素(线程安全)bool Rear(T& item) const {MutexModule::LockGuard lock(_mutex);if (IsEmpty()) {return false;}item = _queue[(_tail - 1 + _capacity) % _capacity];return true;}// 检查队列是否为空(线程安全)bool IsEmpty() const {MutexModule::LockGuard lock(_mutex);return _size == 0;}// 检查队列是否已满(线程安全)bool IsFull() const {MutexModule::LockGuard lock(_mutex);return _size == _capacity;}// 获取队列当前大小(线程安全)size_t Size() const {MutexModule::LockGuard lock(_mutex);return _size;}// 获取队列容量size_t Capacity() const {return _capacity;}private:mutable MutexModule::Mutex _mutex; // 互斥锁,mutable允许在const函数中使用std::vector<T> _queue; // 存储队列元素的容器const size_t _capacity; // 队列容量size_t _head; // 队首索引size_t _tail; // 队尾索引size_t _size; // 当前队列大小
};} // namespace ThreadSafeQueue
封装说明
线程安全实现:
-
使用
MutexModule::Mutex作为内部互斥锁 -
使用
MutexModule::LockGuard实现RAII风格的锁管理 -
所有公共方法都通过锁保护,确保线程安全
功能完整性:
-
保留了环形队列的所有基本功能(Enqueue/Dequeue/Front/Rear等)
-
添加了线程安全的状态检查方法(IsEmpty/IsFull/Size)
设计优化:
-
将互斥锁声明为
mutable,使其可以在const成员函数中使用 -
禁止了拷贝构造和赋值操作,避免潜在的锁所有权问题
-
使用模板支持任意数据类型
命名空间:将线程安全队列封装在ThreadSafeQueue命名空间中,避免命名冲突
这个封装提供了完整的线程安全环形队列实现,可以直接在多线程环境中使用。使用时只需包含头文件并创建实例即可:
ThreadSafeQueue::CircularQueue<int> queue(10); // 创建容量为10的整数队列