当前位置: 首页 > news >正文

Linux -- 线程同步、POSIX信号量与生产者消费者模型

目录

一、线程同步

1、饥饿问题

2、条件变量

3、条件变量的接口

3.1 初始化条件变量

3.2 销毁条件变量

3.3 等待条件变量

3.4 唤醒等待

3.5 条件变量使用规范

4、生产者 - 消费者模型

4.1 概念

4.2 特点

4.3 为何要使用生产者消费者模型

4.4 基于block queue的生产者-消费者模型

4.5 为什么 pthread_cond_wait 需要互斥量?

5、条件变量的封装

6、POSIX信号量

6.1 信号量的概念

6.2 信号量的接口

6.2.1 初始化信号量

6.2.2 销毁信号量

6.2.3 申请信号量P

6.2.4 释放信号量V

6.3 信号量的封装

7、基于环形队列的生产者-消费者模型

7.1 理论模型

7.2 单生产-单消费

7.3 多生产-多消费

8、信号量的本质


一、线程同步

# 在上一章节中,我们使用互斥量之后,确实解决了数据竞争问题,但出现了新的问题:只有一个线程在处理所有售票任务。这展示了互斥量的一个局限性:它确保了线程安全,但不保证公平性。

1、饥饿问题

线程饥饿指的是某些线程由于各种原因,一直无法获得足够的 CPU 时间来执行任务,从而处于长期等待或执行时间极少的状态。 产生线程饥饿的原因主要有以下几种:

  1. 高优先级线程抢占:如果系统中有高优先级的线程持续占用 CPU 资源,那么低优先级的线程就可能长时间得不到执行机会,从而导致饥饿。例如,在实时系统中,高优先级的实时任务可能会一直抢占低优先级的普通任务。
  2. 线程调度不公平:如果线程调度算法不合理或者存在缺陷,可能导致某些线程被不公平地对待,长期无法获得执行机会。比如某些调度算法可能偏向于某些特定类型的线程或者特定状态的线程。
  3. 资源竞争:当多个线程竞争有限的资源时,一些线程可能因为一直无法获得所需资源而被阻塞,从而无法执行。例如,多个线程竞争一个互斥锁,而某些线程总是在竞争中失败,就可能陷入饥饿状态。

# 线程饥饿会导致系统性能下降,部分任务无法及时完成,甚至可能使整个系统陷入停滞或出现不可预测的行为。为了解决线程饥饿问题,我们可以让线程与线程之间形成同步关系。

同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题。

2、条件变量

# 为了解决线程饥饿问题,我们可以引入条件变量(Condition Variable) 。条件变量允许线程在特定条件不满足时主动等待,而不是忙等待或不公平地竞争锁。

条件变量是利用线程间共享的全局变量进行同步的一种机制,条件变量是用来描述某种资源是否就绪的一种数据化描述。其一般包含两个步骤:

  • 一个线程等待条件变量的条件成立而被挂起。
  • 另一个线程使条件成立后唤醒等待的线程。

3、条件变量的接口

3.1 初始化条件变量

# 我们可以使用 pthread_cond_init 初始化互斥量,使用方法如下:

  • 函数原型:int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
  • 参数:
    1. cond:需要初始化的条件变量。
    2. attr:初始化条件变量的属性,一般设置为 nullptr 即可。
  • 返回值:条件变量初始化成功返回0,失败返回错误码。

# 这种调用函数接口初始化条件变量的方式我们称为动态分配,除此之外,我们也能使用如下的方式进行初始化,我们将其称为静态分配

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

3.2 销毁条件变量

# 我们可以使用 pthread_cond_destory 销毁互斥量,使用方法如下:

  1. 函数原型:int pthread_cond_destroy(pthread_cond_t *cond);
  2. 参数:mutex:需要销毁的条件变量。
  3. 返回值:成功返回 0,失败返回错误码。
  • 使用 PTHREAD_COND_INITIALIZER 静态初始化的条件变量不需要销毁。

3.3 等待条件变量

# 当某个线程满足某个条件时,我们就可以将其至于条件变量下等待。

  • 函数原型:int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
  • 参数:
    1. cond:需要等待的条件变量。
    2. mutex:当前线程所处临界区对应的互斥锁。
  • 返回值:成功返回 0,失败返回错误码。

3.4 唤醒等待

# 在满足某个条件之后,我们就可以使用以下两种即可,将等待队列中的线程唤醒。

  • 函数原型:
    • int pthread_cond_broadcast(pthread_cond_t *cond);
    • int pthread_cond_signal(pthread_cond_t *cond);
  • 参数:cond:需要唤醒的条件变量。
  • 返回值:成功返回 0,失败返回错误码。

# 其中 pthread_cond_signal() 函数用于唤醒等待队列中的第一个线程。pthread_cond_broadcast() 函数用于唤醒等待队列中的全部线程。

# 比如我们下面创建五个线程,然后将其放入等待队列,最后由主线程进行唤醒。

#include<iostream>
#include<vector>
#include<pthread.h>
#include<string>
#include<unistd.h>#define NUM 5
int cnt = 10000;pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER; // 定义锁   为什么一定要有锁??
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER; // 定义条件变量// 等待是需要等才等,什么条件才会等呢?票数为0,等待之前就要对资源的数量进行判定
// 判定本身就是在访问临界资源,所以判断一定要在临界区内部,判定结果也一定在临界区内部
// 所以条件不满足要休眠,一定是在临界区内部休眠的// 证明一件事:条件变量允许线程进行等待
//            可以允许一个线程唤醒在从你的等待的其他线程,实现同步过程
void *threadrun(void *args)
{std::string name = static_cast<const char *>(args);while (true){pthread_mutex_lock(&glock);// 直接让对应的线程进行等待,临界资源不满足导致我们等待的pthread_cond_wait(&gcond, &glock); // glock在pthread_cond_wait之前,会被自动释放掉std::cout << name << "计算:" << cnt << std::endl;cnt++;pthread_mutex_unlock(&glock);}
}int main()
{std::vector<pthread_t> threads;for(int i = 0; i < NUM; i++){pthread_t tid;char *name = new char[64];snprintf(name, 64, "thread-%d", i);int n = pthread_create(&tid, nullptr, threadrun, name);if(n != 0)continue;threads.push_back(tid);sleep(1);}sleep(3);// 每隔1秒唤醒一个线程while(true){// std::cout << "唤醒一个线程..." << std::endl;// pthread_cond_signal(&gcond);std::cout << "唤醒所有线程..." << std::endl;pthread_cond_broadcast(&gcond);sleep(1);}for(auto &id : threads){int m = pthread_join(id, nullptr);(void)m;}return 0;
}

# 在调用 pthread_cond_wait 函数时需要传入对应的互斥锁,原因如下:

# 当线程由于某些条件不满足而需要在特定条件变量下进行等待时,必须释放该互斥锁。这是因为如果不释放互斥锁,其他线程将无法获取该锁以进入临界区修改共享资源,从而无法改变条件使等待线程被唤醒。

# 当该线程被唤醒后,会接着执行临界区内的代码,这就要求该线程必须立即获得对应的互斥锁。这样设计确保了线程在被唤醒后能够安全地访问临界区,避免了多个线程同时进入临界区而导致的数据不一致和资源竞争问题。

3.5 条件变量使用规范

# 使用条件变量我们一般遵守以下规范,如果是等待条件变量,函数应该放在互斥量加锁与解锁之间,因为判断条件也是一种临界资源。

pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
//修改条件
pthread_mutex_unlock(&mutex);

# 同样唤醒操作也需要类似的操作。

pthread_mutex_lock(&mutex);
//条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);

4、生产者 - 消费者模型

4.1 概念

生产者 - 消费者模型是一种经典的多线程或多进程同步模型。它主要用于解决在数据生产和数据消费速度不一致的情况下,如何安全、高效地处理数据的问题。

# 在这个模型中,有两类角色:生产者消费者。生产者负责生产数据,例如在一个文件读取系统中,生产者可能是读取文件内容并将其转换为特定格式数据的线程或进程;消费者则负责消费(处理)生产者生产的数据,比如将读取到的数据进行进一步的分析或者存储到数据库中的线程或进程。

# 利用该模型我们能实现生产者与消费者之间的解耦,并且生产者在生产时,其它生产者可以获取数据,消费者可以处理数据,消费者在消费时也是同理,一定程度上实现了并发。

# 下面用一个例子形象说明什么事生产者 - 消费者模型:

4.2 特点

# 生产者-消费者模型一般具有以下三个特点:

  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
  • 两种角色: 生产者和消费者。(通常由进程或线程承担)
  • 一个交易场所: 通常指的是内存中的一段缓冲区。

# 因为容器是能够被多个执行流访问的一个共享资源,所以生产者与生产者,消费者与消费者,生产者与消费者之间是一个互斥关系,而我们访问数据一定是生产者先生产,消费者再消费,所以生产者与消费者之间是一个同步关系。

# “321”原则:

4.3 为何要使用生产者消费者模型

  1. 生产过程和消费过程解耦
  2. 支持忙闲不均
  3. 提高效率

# 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

4.4 基于block queue的生产者-消费者模型

# 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

  • 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素。
  • 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出。

# 其中阻塞队列最典型的应用场景实际上就是管道的实现。

# 首先我们可以先实现单生产者单消费者 BlockingQueue 的框架:首先我们需要一个队列 _q 作为成员变量以及表示其容量的 _cap,并且因为涉及多执行流访问,需要一把互斥锁 _mutex,最后我们还需要两个条件变量 _empty 与·_full 分别表示当我们队列为空时,执行消费的执行流需加入 _empty 条件变量与当我们队列为满时,执行生产的执行流需加入该条件变量 _full

// 阻塞队列的实现#pragma once#include<iostream>
#include<string>
#include<pthread.h>
#include<queue>const int defaultcap = 5; // for testtemplate<typename T>
class BlockQueue
{
private:bool IsFull(){return _q.size() >= _cap;}bool IsEmpty(){return _q.empty();}public:BlockQueue(int cap = defaultcap):_cap(cap),_csleep_num(0),_psleep_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full_cond, nullptr);pthread_cond_init(&_empty_cond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}private:std::queue<T> _q; // 临界资源!!int _cap; // 容量大小pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;
};

# 并且我们实现生产 Equeue 与消费 Pop 操作也十分简单,生产时如果队列为满,则加入条件变量 _full 等待,没有则正常生产,生产完毕后该队列一定有数据,这时我们就需要唤醒 _empty 条件变量执行消费操作。而消费操作正好对应,如果消费时如果队列为空,则加入条件变量 _empty 等待,否则正常消费,消费完毕后该队列一定不为空,这时我们就需要唤醒 _full 条件变量执行生产操作。并且生产与消费操作都属于临界资源,所以需要加锁。

// 阻塞队列的实现#pragma once#include<iostream>
#include<string>
#include<pthread.h>
#include<queue>const int defaultcap = 5; // for testtemplate<typename T>
class BlockQueue
{
private:bool IsFull(){return _q.size() >= _cap;}bool IsEmpty(){return _q.empty();}public:BlockQueue(int cap = defaultcap):_cap(cap),_csleep_num(0),_psleep_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full_cond, nullptr);pthread_cond_init(&_empty_cond, nullptr);}void Equeue(const T &in){pthread_mutex_lock(&_mutex);// 生产者调用while(IsFull()) {// 队列满了,进行等待// 重点1:pthread_cond_wait调用成功,挂起当前线程之前,要先自动释放锁// 重点2: 当线程被唤醒的时候,默认就在临界区内唤醒,要从pthread_cond_wait成功返回,需要当前线程//        重新申请_mutex锁// 重点3: 如果线程被唤醒,但是申请锁失败了,怎么办呢?就会在锁上阻塞等待_psleep_num++;pthread_cond_wait(&_full_cond, &_mutex);_psleep_num--;}// 暂定_q.push(in);// V2:// 走到这时,保证队列里至少有一个数据,因为操作是原子的,此时就要唤醒消费者出来消费if(_csleep_num > 0){pthread_cond_signal(&_empty_cond);std::cout << "唤醒消费者..." << std::endl;}// V1:// pthread_cond_signal(&_empty_cond);pthread_mutex_unlock(&_mutex);// pthread_cond_signal(&_empty_cond);}T Pop(){// 消费者调用pthread_mutex_lock(&_mutex);while(IsEmpty()) // 防止wait的两个问题出现,这里不用if,而用while{// 空队列_csleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);// 问题1:pthread_cond_wait是函数,如果调用失败就会直接返回// 问题2:pthread_cond_wait可能会因为,其实条件不满足,pthread_cond_wait 伪唤醒_csleep_num--;}T data = _q.front();_q.pop();// V2:// 走到这时,队列一定至少有一个空间,此时就要唤醒生产者进行生产if(_psleep_num > 0){pthread_cond_signal(&_full_cond);std::cout << "唤醒生产者..." << std::endl;}// V1://pthread_cond_signal(&_full_cond); // 放在unlock前面,线程能立即醒来,但申请锁一定会失败,因为还没// 解锁。所以当singal唤醒的时候,消费者线程无法从wait中返回,但是// 他已经不是在条件变量上等了,而是已经申请锁失败,在锁上等,当// unlock执行的时候,他重新持有锁,就会返回了pthread_mutex_unlock(&_mutex);//pthread_cond_signal(&_full_cond); // 放在unlock后面,锁已经解锁了,就去条件变量下直接唤醒线程,但是// 万一锁刚释放就被其他线程抢占了怎么办,答案是不影响,他也会在锁// 里面去等待,当条件变量满足时,就会自动持有锁去返回return data;}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}private:std::queue<T> _q; // 临界资源!!int _cap; // 容量大小pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;int _csleep_num; // 消费者休眠的个数int _psleep_num; // 生产者休眠的个数
};

# 其中需要注意的是,pthread_cond_signal 放在 unlock 前后的区别:

# 还需要注意的是,pthread_cond_wait 函数作为让当前执行流进行等待的函数,存在调用失败的可能性,若调用失败,该执行流会继续往后执行。

# 在多生产者的情形下,当消费者消费了一个数据后,若使用 pthread_cond_broadcast 函数唤醒多个生产者,此时若阻塞队列仅有一个空位,且唤醒的生产者与消费者竞争,当生产者持续竞争锁成功时,就可能出现错误。鉴于此,为避免上述情况发生,必须让线程被唤醒后再次进行判断,以确认是否真正满足生产消费条件,所以这里要用  while 进行判断。

# 阻塞队列已经封装好了,接下来就需要在主程序中编写,测试单生产单消费模型:

#include "BlockQueue.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>void *consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while (true){// sleep(1);int data = bq->Pop();std::cout << "消费了一个数据:" << data << std::endl;}
}void *productor(void *args)
{int data = 1;BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){sleep(1);std::cout << "生产了一个数据:" << data << std::endl;bq->Equeue(data);data++;}
}int main()
{// 申请阻塞队列BlockQueue<int> *bq = new BlockQueue<int>();// 构建生产者和消费者pthread_t c, p;pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

# 我们也可以来试一下队列为满的情况,其他代码不变,先让消费者 sleep 上1秒钟,让生产者把队列 push 满:

#include "BlockQueue.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>void *consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while (true){sleep(1);int data = bq->Pop();std::cout << "消费了一个数据:" << data << std::endl;}
}void *productor(void *args)
{int data = 1;BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){// sleep(1);std::cout << "生产了一个数据:" << data << std::endl;bq->Equeue(data);data++;}
}int main()
{// 申请阻塞队列BlockQueue<int> *bq = new BlockQueue<int>();// 构建生产者和消费者pthread_t c, p;pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

# 注意:这里使用模板是为了说明队列中不仅可以存放内置类型(如 int ),对象同样可以作为任务参与生产消费流程。

#pragma onceclass Task
{
public:Task(){}Task(int x, int y):_x(x), _y(y){}int X() { return _x; }int Y() { return _y; }int Execute(){return _result = _x + _y;}int Result(){return _result;}~Task(){}private:int _x;int _y;int _result;
};
#include "BlockQueue.hpp"
#include "Task.hpp"#include <iostream>
#include <pthread.h>
#include <unistd.h>void *consumer(void *args)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while (true){sleep(1);// int data = bq->Pop();Task t = bq->Pop();t.Execute();// std::cout << "消费了一个数据:" << data << std::endl;std::cout << "消费了一个任务:" << t.X() << "+" << t.Y() << "=" << t.Result() << std::endl;}
}void *productor(void *args)
{// int data = 1;int x = 1;int y = 1;BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while(true){// sleep(1);// std::cout << "生产了一个数据:" << data << std::endl;std::cout << "生产了一个任务:" << x << "+" << y << "=?" << std::endl;Task t(x, y);bq->Equeue(t);x++, y++;// bq->Equeue(data);// data++;}
}int main()
{// 申请阻塞队列// 扩展认识:阻塞队列里面也可以放任务BlockQueue<Task> *bq = new BlockQueue<Task>();// 构建生产者和消费者pthread_t c, p;pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

# 同时我们的任务也不一定是类,也可以是函数对象。阻塞队列也可以放入 function 包装器 task_t:

#pragma once#include<iostream>
#include<functional>// 任务形式2:
// 定义了一个任务类型,返回值为void,参数为空
using task_t = std::function<void()>;void Download()
{std::cout << "我是一个下载任务" << std::endl;
}
#include "BlockQueue.hpp"
#include "Task.hpp"#include <iostream>
#include <pthread.h>
#include <unistd.h>// 任务形式2:
void *consumer(void *args)
{BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);while (true){// sleep(1);task_t t = bq->Pop();t();}
}void *productor(void *args)
{BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);while (true){sleep(1);std::cout << "生产了一个任务" << std::endl;bq->Equeue(Download);}
}int main()
{// 申请阻塞队列// 扩展认识:阻塞队列里面也可以放任务BlockQueue<task_t> *bq = new BlockQueue<task_t>();// 构建生产者和消费者pthread_t c, p;pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

# 对于多生产多消费模型,我们的阻塞队列代码并不需要改变,其实原理都是一样的,因为不管是谁访问队列,都需要互斥访问。

4.5 为什么 pthread_cond_wait 需要互斥量?

基本原理

# 条件等待是多线程编程中实现线程同步的重要手段。它的核心逻辑是:当一个线程发现某个条件不满足时,主动进入等待状态,直到其他线程修改了共享变量使得条件满足,并通过信号唤醒等待线程。这种机制必须满足以下两个基本要素:

  1. 共享变量的修改:必须有至少一个线程能够修改影响条件的共享变量
  2. 互斥保护:所有对共享变量的访问和修改都必须通过互斥锁进行保护

错误实现示例分析

# 考虑以下看似合理的错误实现:

// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {pthread_mutex_unlock(&mutex);//解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过pthread_cond_wait(&cond, &mutex);pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);

# 这个设计存在严重的竞态条件问题:

  1. 在解锁后到调用 pthread_cond_wait之前存在时间窗口
  2. 其他线程可能在此期间获取锁、修改条件并发送信号
  3. 这会导致信号丢失,等待线程可能永远阻塞

正确的原子性操作

正确的实现要求解锁和等待必须是原子操作,这正是 pthread_cond_wait的设计目的:

5、条件变量的封装

# 和封装互斥量一样非常简单,代码如下:

#pragma once
#include <cstring>
#include "Mutex.hpp"using namespace MutexModule;namespace CondModule
{class Cond{public:Cond(){int n = pthread_cond_init(&_cond, nullptr);if (n != 0){std::cerr << "cond init failed: " << strerror(n) << std::endl;}}void Wait(Mutex& mutex){int n = pthread_cond_wait(&_cond, mutex.Get());if (n != 0){std::cerr << "cond wait failed: " << strerror(n) << std::endl;}}void Signal(){int n = pthread_cond_signal(&_cond);if (n != 0){std::cerr << "cond signal failed: " << strerror(n) << std::endl;}}void Broadcast(){int n = pthread_cond_broadcast(&_cond);if (n != 0){std::cerr << "cond broadcast failed: " << strerror(n) << std::endl;}}~Cond(){int n = pthread_cond_destroy(&_cond);if (n != 0){std::cerr << "cond destroy failed: " << strerror(n) << std::endl;}}private:pthread_cond_t _cond;};
}

# 为了提高条件变量的通用性,建议在封装 Cond 类时避免直接引用内部的互斥量。这样可以在后续组合使用时避免因代码耦合导致的初始化困难,因为 Mutex Cond 通常需要同时创建。

# 我们给互斥量新增一个接口,用于条件变量中需要 wait 获得锁的情况:

        pthread_mutex_t* Get(){return &_mutex;}

# 下面我们也可以将阻塞队列修改一下,将封装的互斥量和条件变量复用起来:

#include <iostream>
#include <queue>
#include <pthread.h>
#include "Cond.hpp"
#include "Mutex.hpp"using namespace MutexModule;
using namespace CondModule;const int  defaultcap = 5;template <class T>
class BlockQueue
{
private:bool IsFull() { return _q.size() >= _cap; }bool IsEmpty() { return _q.empty(); }
public:BlockQueue(int cap = defaultcap):_cap(cap), _csleep_num(0), _psleep_num(0){}// 生产者生产数据入队列void Enqueue(const T& in){LockGuard lockguard(_mutex);// 不能使用if判断,会虚假唤醒while(IsFull()) {_psleep_num++;std::cout << "队列已满, 生产者进入休眠, 生产者休眠个数: " << _psleep_num << std::endl;_full_cond.Wait(_mutex);_psleep_num--;}// 此时队列必定有空间_q.push(in);// 只有队列为空时,消费者才会阻塞休眠,此时队列肯定不为空// 那么就判断是否有消费者休眠,有就唤醒if(_csleep_num > 0){_empty_cond.Signal();std::cout << "唤醒消费者..." << std::endl;}}// 消费者消费数据出队列T Pop(){LockGuard lockguard(_mutex);while(IsEmpty()) {_csleep_num++;std::cout << "队列为空, 消费者进入休眠, 消费者休眠个数: " << _csleep_num << std::endl;_empty_cond.Wait(_mutex);_csleep_num--;}// 此时队列必定有空间T data = _q.front();_q.pop();// 只有队列为空时,消费者才会阻塞休眠,此时队列肯定不为空// 那么就判断是否有消费者休眠,有就唤醒if(_psleep_num > 0){_full_cond.Signal();std::cout << "唤醒生产者..." << std::endl;}return data;}~BlockQueue() {}
private:std::queue<T> _q;size_t _cap; // 队列容量大小Mutex _mutex;Cond _full_cond;Cond _empty_cond;int _csleep_num; // 消费者休眠的个数int _psleep_num; // 生产者休眠的个数
};

6、POSIX信号量

6.1 信号量的概念

# 为了解决多执行流访问临界区,造成数据不一致等问题,我们除了使用互斥锁外,我们还可以使用一种 POSIX 信号量的方法。

# 当我们运用互斥锁来保护临界资源时,意味着我们把这块临界资源视为一个不可分割的整体,在同一时刻只准许一个执行流对其进行访问。

# 其实我们也能将这块临界资源进一步划分成多个区域。当多个执行流有访问临界资源的需求时,若让这些执行流同时去访问临界资源的不同区域,此时也并不会引发数据不一致等问题。信号量就是基于此的解决方法。

POSIX 信号量本质上是一个计数器,用于衡量临界资源中的资源数目。它对临界资源内部的资源数进行统计,同时操作系统为其提供了一种对临界资源的预定机制。所有执行流在访问临界资源之前,必须先申请信号量。

# 信号量的 PV 操作:

  • P操作:我们将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让计数器减一。
  • V操作:我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让计数器加一。

# 并且由于因信号量的 PV 操作同样属于临界资源,所以 PV 操作肯定是原子的。

# 值得注意的是: 虽然 POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,但 POSIX 信号量常用于线程间同步,而 SystemV 信号量常用于进程间通信。

6.2 信号量的接口

6.2.1 初始化信号量

# 我们首先需要使用 sem_init 初始化信号量,其用法如下:

  • 函数接口:int sem_init(sem_t *sem, int pshared, unsigned int value);
  • 参数:
    1. sem:需要初始化的信号量。
    2. pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
    3. value:信号量的初始值(计数器的初始值)。
  • 返回值:初始化信号量成功返回0,失败返回-1。

6.2.2 销毁信号量

# 在使用完信号量之后,我们就需要用 sem_destory 对其进行销毁,其用法如下:

  • 函数接口:int sem_destroy(sem_t *sem);
  • 参数:
    • sem:需要销毁的信号量。
  • 返回值:销毁信号量成功返回0,失败返回-1。

6.2.3 申请信号量P

# 申请信号量也就是 P 操作,我们需要使用 sem_wait 函数,其用法如下:

  • 函数接口:int sem_wait(sem_t *sem);
  • 参数:
    • sem:需要申请的信号量。
  • 返回值:申请信号量成功返回0,信号量的值减一。申请信号量失败返回-1,信号量的值保持不变。如果信号量为 0,则该执行流会被阻塞,直至信号量大于 0。

6.2.4 释放信号量V

# 释放信号量也就是 V 操作,我们需要使用 sem_post 函数,其用法如下:

  • 函数接口:int sem_post(sem_t *sem);
  • 参数:
    • sem:需要释放的信号量。
  • 返回值:释放信号量成功返回0,信号量的值加一。释放信号量失败返回-1,信号量的值保持不变。

# 如果信号量的初始值为1,那么此时信号量所描述的临界资源只有一份,这个临界资源也只能同时被一个执行流访问。此时信号量的作用基本等价于互斥锁,这种信号量我们称为二元信号量

6.3 信号量的封装

# 和前面章节封装互斥量,条件变量一样,比较简单,不做讲解。代码如下:

#include<iostream>
#include<pthread.h>
#include<semaphore.h>namespace SemModule
{const int defaultvalue = 1;class Sem{public:Sem(unsigned int sem_value = defaultvalue){sem_init(&_sem, 0, sem_value);}void P(){int n = sem_wait(&_sem); // 原子的(void)n;}void V(){int n = sem_post(&_sem); // 原子的(void)n;}~Sem(){sem_destroy(&_sem);}private:sem_t _sem;};
}

7、基于环形队列的生产者-消费者模型

7.1 理论模型

7.2 单生产-单消费

# 根据上述所说,我们可以进行代码的编写,下面我们同样还是先以单生产者单消费者为例,后面再改成多生产多消费。

#pragma once#include<iostream>
#include<vector>#include"Sem.hpp"static const int gcap = 5; // for debugusing namespace SemModule;template<typename T>
class RingQueue
{
public:RingQueue(int cap = gcap): _rq(cap),        // 初始化底层容器,容量为cap_cap(cap),       // 记录队列容量_blank_sem(cap), // 空位置信号量初始值为cap(初始时队列全为空)_data_sem(0),    // 数据信号量初始值为0(初始时队列无数据)_p_step(0),      // 生产者下标初始化为0(从第一个位置开始生产)_c_step(0)       // 消费者下标初始化为0(从第一个位置开始消费){}void Equeue(const T &in){// 生产者// 1. 生产者申请空位置信号量//    - 如果有空位置(信号量>0),P操作会将信号量减1,继续执行//    - 如果无空位置(信号量=0),P操作会阻塞等待,直到有位置释放_blank_sem.P();// 2.生产,将数据放入当前生产者位置_rq[_p_step] = in;// 3.更新下标(向后移动一位)++_p_step;// 4.维持环形特性_p_step %= _cap;// 数据入队后,数据信号量加1(通知消费者有新数据)_data_sem.V();}void Pop(T *out){// 消费者// 1. 消费者申请数据信号量//    - 如果有数据(信号量>0),P操作会将信号量减1,继续执行//    - 如果无数据(信号量=0),P操作会阻塞等待,直到有新数据_data_sem.P();// 2.消费,从当前消费者位置取出数据*out = _rq[_c_step];// 3.更新下标(向后移动一位)++_c_step;// 4. 维持环形特性_c_step %= _cap;// 数据出队后,空位置信号量加1(通知生产者有新空位)_blank_sem.V();}// vector会自动释放资源~RingQueue(){}private:std::vector<T> _rq;int _cap;// 生产者Sem _blank_sem; // 空位置信号量:记录当前可用的空位置数量int _p_step; // 生产者当前操作的下标位置// 消费者Sem _data_sem; // 数据信号量:记录当前可用的数据数量int _c_step;   // 消费者当前操作的下标位置
};

#include <iostream>
#include <pthread.h>
#include <unistd.h>#include"RingQueue.hpp"void *consumer(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);while (true){sleep(1);// 1.消费任务int t = 0;rq->Pop(&t);// 2.处理任务 -- 处理任务的时候,这个任务,已经被拿到线程的上下文当中了,不属于队列了std::cout << "消费者拿到了一个数据:" << t << std::endl;// t();}
}void *productor(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);int data = 1;while (true){// 1.获得任务std::cout << "生产了一个任务" << data << std::endl;// 2.生产任务rq->Equeue(data);data++;}
}int main()
{// 申请阻塞队列// 扩展认识:阻塞队列里面也可以放任务RingQueue<int> *rq = new RingQueue<int>();// 构建生产者和消费者pthread_t c[1], p[1];pthread_create(c, nullptr, consumer, rq);pthread_create(p, nullptr, productor, rq);pthread_join(c[0], nullptr);pthread_join(p[0], nullptr);return 0;
}

# 可以看到我们让生产者先运行,消费者 sleep 上1秒,生产者很快就将队列生产满,然后消费者消费旧数据,生产者又继续生产新数据。

# 但这是单生产单消费,我们使用信号量完成了生产者和消费者之间的互斥和同步,不需要维护生产者与生产者之间的互斥关系,也不需要维护消费者与消费者之间的互斥关系。

7.3 多生产-多消费

# 那如果是多生产多消费呢?那我们就需要维护生产者与生产者之间,消费者与消费者之间的互斥关系。

# 怎么维护?答案是加锁。生产者与生产者之间需要一把锁,消费者与消费者之间需要一把锁。

// RingQueue.hpp#pragma once#include<iostream>
#include<vector>#include"Sem.hpp"
#include"Mutex.hpp"static const int gcap = 5; // for debugusing namespace SemModule;
using namespace MutexModule;template<typename T>
class RingQueue
{
public:RingQueue(int cap = gcap): _rq(cap),        // 初始化底层容器,容量为cap_cap(cap),       // 记录队列容量_blank_sem(cap), // 空位置信号量初始值为cap(初始时队列全为空)_data_sem(0),    // 数据信号量初始值为0(初始时队列无数据)_p_step(0),      // 生产者下标初始化为0(从第一个位置开始生产)_c_step(0)       // 消费者下标初始化为0(从第一个位置开始消费){}void Equeue(const T &in){// 生产者// _pmutex.Lock();// 1. 生产者申请空位置信号量//    - 如果有空位置(信号量>0),P操作会将信号量减1,继续执行//    - 如果无空位置(信号量=0),P操作会阻塞等待,直到有位置释放_blank_sem.P();{// _pmutex.Lock(); // 先申请信号量再加锁的效率会比较高LockGuard lockguard(_pmutex);// 2.生产,将数据放入当前生产者位置_rq[_p_step] = in;// 3.更新下标(向后移动一位)++_p_step;// 4.维持环形特性_p_step %= _cap;}// 数据入队后,数据信号量加1(通知消费者有新数据)_data_sem.V();// _pmutex.Unlock();}void Pop(T *out){// 消费者// _cmutex.Lock();// 1. 消费者申请数据信号量//    - 如果有数据(信号量>0),P操作会将信号量减1,继续执行//    - 如果无数据(信号量=0),P操作会阻塞等待,直到有新数据_data_sem.P();{// _cmutex.Lock();LockGuard lockguard(_cmutex);// 2.消费,从当前消费者位置取出数据*out = _rq[_c_step];// 3.更新下标(向后移动一位)++_c_step;// 4. 维持环形特性_c_step %= _cap;}// 数据出队后,空位置信号量加1(通知生产者有新空位)_blank_sem.V();// _cmutex.Unlock();}// vector会自动释放资源~RingQueue(){}private:std::vector<T> _rq;int _cap;// 生产者Sem _blank_sem; // 空位置信号量:记录当前可用的空位置数量int _p_step; // 生产者当前操作的下标位置// 消费者Sem _data_sem; // 数据信号量:记录当前可用的数据数量int _c_step;   // 消费者当前操作的下标位置// 维护多生产多消费(2把锁)Mutex _cmutex;Mutex _pmutex;
};
// main.cc#include <iostream>
#include <pthread.h>
#include <unistd.h>#include"RingQueue.hpp"struct threaddata
{RingQueue<int> *rq;std::string name;
};void *consumer(void *args)
{threaddata *td = static_cast<threaddata *>(args);while (true){sleep(3);// 1.消费任务int t = 0;td->rq->Pop(&t);// 2.处理任务 -- 处理任务的时候,这个任务,已经被拿到线程的上下文当中了,不属于队列了std::cout << td->name << "消费者拿到了一个数据:" << t << std::endl;// t();}
}int data = 1;void *productor(void *args)
{threaddata *td = static_cast<threaddata *>(args);while (true){sleep(1);// 1.获得任务std::cout << td->name << "生产了一个任务" << data << std::endl;// 2.生产任务td->rq->Equeue(data);data++;}
}int main()
{// 申请阻塞队列// 扩展认识:阻塞队列里面也可以放任务RingQueue<int> *rq = new RingQueue<int>();// 构建生产者和消费者// 如果改成多生产多消费呢?// 单单:cc,pp -> 不需要维护互斥关系//       cp    -> 只需维护cp间的互斥与同步// 多多:cc,pp -> 只需再维护cp间的互斥关系,就要加锁pthread_t c[2], p[3];threaddata *td = new threaddata();td->name = "cthread-1";td->rq = rq;pthread_create(c, nullptr, consumer, td);threaddata *td2 = new threaddata();td2->name = "cthread-2";td2->rq = rq;pthread_create(c+1, nullptr, consumer, td2);threaddata *td3 = new threaddata();td3->name = "pthread-1";td3->rq = rq;pthread_create(p, nullptr, productor, td3);threaddata *td4 = new threaddata();td4->name = "pthread-2";td4->rq = rq;pthread_create(p+1, nullptr, productor, td4);threaddata *td5 = new threaddata();td5->name = "pthread-3";td5->rq = rq;pthread_create(p+2, nullptr, productor, td5);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);return 0;
}

8、信号量的本质

http://www.dtcms.com/a/609526.html

相关文章:

  • 微服务重要知识点
  • 东莞seo建站排名昆山有名的网站建设公司
  • 主从服务器
  • Linux 文件缓冲区
  • Node.js中常见的事件类型
  • Nacos的三层缓存是什么
  • 交通事故自动识别_YOLO11分割_DRB实现
  • 用flex做的网站空间注册网站
  • Vue + Axios + Node.js(Express)如何实现无感刷新Token?
  • 重大更新!Ubuntu Pro 现提供长达 15 年的安全支持
  • 重庆做学校网站公司农村服务建设有限公司网站
  • 尝试本地部署 Stable Diffusion
  • 网站前置审批专项好的用户体验网站
  • 【动规】背包问题
  • js:网页屏幕尺寸小于768时,切换到移动端页面
  • 《LLM零开销抽象与插件化扩展指南》
  • C++_面试题_21_字符串操作
  • 多重组合问题与矩阵配额问题
  • 什么情况下会把 SYN 包丢弃?
  • EG27324 带关断功能双路MOS驱动芯片技术解析
  • do_action wordpress 模板关键词优化排名的步骤
  • 海外网站入口通信管理局 网站备案
  • 在 Java 中实现 Excel 数字与文本转换
  • 如何保持不同平台的体验一致性
  • redis(五)——管道、主从复制
  • OBS直播教程:OBS实时字幕插件如何下载?OBS实时字幕插件如何安装?OBS实时字幕插件如何使用?OBS实时字幕插件官方下载地址
  • WPF中TemplatePart机制详解
  • 大学生毕业设计课题做网站网站开发研发设计
  • PPT制作正在发生一场静默革命
  • 无线通信信道的衰落特性