当前位置: 首页 > news >正文

【Linux系统】深入理解线程同步,实现生产消费模型

前言:

        上文我们讲到了线程的互斥的概念、互斥的接口以及互斥的原理【Linux系统】深入理解线程,互斥及其原理-CSDN博客

        本文我们再来讲一讲Linux系统中,与互斥息息相关的概念:同步!


线程同步概念

我们了解了线程的互斥,那么线程同步是干什么的呢?

同步机制的引入

        线程互斥通过锁机制,解决了多线程的情况下,数据不一致的问题。但新技术的引入也带来了新的问题!

        线程互斥的加锁机制,这需要线程之间相互抢夺锁,得到锁的线程才能够进入临界区访问临界资源!这就导致了一个问题:程序运行的效率很低,并且往往有线程一直都抢不到锁,造成线程饥饿问题!

        基于以上问题,所以引入了线程同步机制。

        线程同步机制主要实现以下3点:1.申请同一把锁的线程要排队 2.申请过锁的线程,不能立即申请第二次!3.申请过锁的线程,要回到队伍末端进行排队

        其目的是为了让线程按照一定顺序访问临界资源!


条件变量

条件变量其本质并不是“变量”!而是Linux下实现线程同步的具体机制!

感性理解条件变量

        条件变量机制的实现方式:让对应的线程进入队列中等待,并通过从前到后的顺序唤醒线程。这样就可以让线程按照一定的顺序访问临界资源!

        当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。 例如一个线程访问队列时,发现队列为空没有数据,那它只能等待,只等其它线程将一个节点添加到队列中。这种情况就需要用到条件变量,让当前需要等待的线程进入条件变量下的队列中等待,当对应的数据被添加后,再加线程唤醒,继续执行代码。

        这样做的好处在于,不会因为一个线程的等待,而导致整体的程序都被迫等待!使用条件变量后,让等待的线程到队列中等待,不妨碍其他线程的执行!

条件变量接口

条件变量的接口,与互斥锁的接口类似:

条件变量初始化:全局初始化:pthread_cond_t cond = PTHREAD_COND_INITIALIZER;局部初始化:#include <pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);cond:指向要初始化的条件变量(pthread_cond_t类型)
attr:条件变量的属性(通常为NULL,表示使用默认属性)注:全局初始化后OS会自动销毁,但局部初始化需要我们自己手动销毁!!!
条件变量销毁:局部初始化需要我们调用函数手动销毁int pthread_cond_destroy(pthread_cond_t *cond);cond:指向要销毁的条件变量
 等待条件满足:int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);cond:指向条件变量
mutex:指向已加锁的互斥锁(调用前必须由当前线程持有)
唤醒一个等待线程:int pthread_cond_signal(pthread_cond_t *cond);
cond:指向条件变量唤醒所有等待线程:int pthread_cond_broadcast(pthread_cond_t *cond);
cond:指向条件变量

接口使用演示:

这里我是让所有的线程全部都进入了条件变量下的队列进行了等待,并每隔一秒唤醒一个线程。

#include <iostream>
#include <stdlib.h>
#include <string>
#include <unistd.h>
#include <pthread.h>
using namespace std;int ticket = 1000;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; // 定义并初始化锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;   // 定义并初始化条件变量void *route(void *args)
{string name = static_cast<char *>(args);while (true){// 对临界区上锁pthread_mutex_lock(&lock);if (ticket > 0){// 在条件变量下等待pthread_cond_wait(&cond, &lock);cout << name << ":sells ticket:" << ticket << endl;ticket--;// 结束访问临界区,解锁pthread_mutex_unlock(&lock);}else{// 结束访问临界区,解锁pthread_mutex_unlock(&lock);break;}}return nullptr;
}int main()
{while (1){pthread_t t1, t2, t3, t4;pthread_create(&t1, NULL, route, (void *)"thread 1");pthread_create(&t2, NULL, route, (void *)"thread 2");pthread_create(&t3, NULL, route, (void *)"thread 3");pthread_create(&t4, NULL, route, (void *)"thread 4");while (1){// 每个一秒唤醒一个线程sleep(1);pthread_cond_signal(&cond);}pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);}
}

通过结果我们可以看到,线程是按照1342这个顺序进行执行的,符合我们对条件变量的预期。

thread 1:sells ticket:1000
thread 3:sells ticket:999
thread 4:sells ticket:998
thread 2:sells ticket:997
thread 1:sells ticket:996
thread 3:sells ticket:995
thread 4:sells ticket:994
thread 2:sells ticket:993
thread 1:sells ticket:992
thread 3:sells ticket:991
thread 4:sells ticket:990
thread 2:sells ticket:989

理解为什么条件变量等待接口参数需要传入锁:

        重点1:pthread_cond_wait调用成功,挂起当前线程之前,要先自动释放锁!!

        重点2:当线程被唤醒的时候,默认就在临界区内唤醒!要从pthread_cond_wait成功返回,需要当前线程,重新申请_mutex锁!!!

        重点3:如果被唤醒,但是申请锁失败了??就会在锁上阻塞等待!!!


生产者消费者模型

        生产者消费者模型,是很重要的一个模型。

        生产者 - 消费者模型,核心目的是高效解决 “数据生成” 与 “数据处理” 两类线程间的协同问题。通过 "生产者线程→缓冲区→消费者线程" 三层结构,搭配 Linux 线程同步机制来实现!

使用此模型的意义

        生产者消费者模式就是通过⼀个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于⼀个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

模型原则

生产者消费者模型遵循以下原则:三二一原则(便于记忆)

三种关系:

        生产者之间:竞争关系、互斥关系

        竞争关系:多个生产者需向同一缓冲区写入数据,但缓冲区空间有限。互斥关系:为避免 “多个生产者同时写缓冲区” 导致数据不一致问题

        消费者之间:竞争关系、互斥关系

        竞争关系:缓冲区空间数据有限。互斥关系:若多个消费者同时读 / 取缓冲区,可能出现 “同一数据被重复消费”“消费过程中数据被破坏” 等问题, 互斥——同一时间,只有 1 个消费者能操作缓冲区,避免并发读的冲突

        消费者与生产者之间:互斥关系、同步关系

        互斥关系:消费者 “读缓冲区” 和生产者 “写缓冲区”不能同时进行生产者正在写数据时,消费者若读,会拿到错误数据;消费者正在读数据时,生产者若写,会破坏正在被消费的数据。同步关系生产与消费需节奏匹配,缓冲区满了生产者必须等待,缓冲区空了消费者必须等待!

两种角色:

        生产者角色和消费者角色(由线程承担)

一个场所:

        以特定结构构成的一种缓冲区(内存种)

        

基于阻塞队列的生产者消费者模型

BlockingQueue阻塞队列

        在多线程编程中阻塞队列(Blocking Queue)是⼀种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

模型实现

使用阻塞队列作为生产者消费者模型种的缓冲区

// 基于阻塞队列的生产者消费者模型的实现目标:
// 1.使用阻塞队列作为缓冲区存放数据,实现生产者与消费者的解耦
// 2.实现生产者与消费的互斥,既生产者与消费者不能同时访问缓冲区
// 3.实现生产者与消费者的同步,既生产与消费需节奏匹配,缓冲区满了生产者必须等待,缓冲区空了消费者必须等待
#pragma once
#include <queue>
#include <iostream>
#include <pthread.h>
using namespace std;// 设定阻塞队列的空间大小为5
#define CAP 5template <typename T>
class blockqueue
{
public:blockqueue(): _capacity(CAP), _sleep_c(0), _sleep_p(0){pthread_cond_init(&_empty_cond, nullptr);pthread_cond_init(&_full_cond, nullptr);pthread_mutex_init(&mutex, nullptr);}~blockqueue(){pthread_cond_destroy(&_empty_cond);pthread_cond_destroy(&_full_cond);pthread_mutex_destroy(&mutex);}queue<T> &bq(){return _q;}// 生产void Equeuq(T &num){pthread_mutex_lock(&mutex);while (Full()){_sleep_p++;// 空间放满了,进入等待pthread_cond_wait(&_full_cond, &mutex);// 问题1: pthread_cond_wait是函数,有可能失败,失败后pthread_cond_wait立即返回// 问题2:pthread_cond_wait可能会因为条件不满足,而导致pthread_cond_wait伪唤醒// 伪唤醒指:线程从 pthread_cond_wait 返回时,并没有收到其他线程的 pthread_cond_signal 或 pthread_cond_broadcast 通知// 且等待的条件可能仍然不满足。这可能由操作系统内核调度、信号中断等底层机制导致// 需要使用循环判断是否为伪唤醒:伪唤醒意味着消费者并没有消费,空间仍然为满,循环判断后继续进行队列种等待,若不是伪唤醒则空间不满跳出循环_sleep_p--;}// 没有满,放入数据_q.push(num);// 已经有数据了,唤醒休眠的消费者if (_sleep_c){pthread_cond_signal(&_empty_cond);cout << "唤醒消费者\n";}pthread_mutex_unlock(&mutex);}// 消费T Pop(){pthread_mutex_lock(&mutex);while (Empty()){_sleep_c++;// 空了,进入等待,直到生产者生产数据pthread_cond_wait(&_empty_cond, &mutex);_sleep_c--;}// 没有空,消费数据T num;num = _q.front();_q.pop();// 空间不为满,唤醒休眠的生产者if (_sleep_p){pthread_cond_signal(&_full_cond);cout << "唤醒生产者\n";}pthread_mutex_unlock(&mutex);return num;}private:// 判断空间是否满了bool Full(){if (_capacity == _q.size())return true;return false;}// 判断空间是否空了bool Empty(){if (_q.size() == 0)return true;return false;}queue<T> _q;   // 使用队列作为实现阻塞队列的基础int _capacity; // 容量大小// 条件变量pthread_cond_t _empty_cond;pthread_cond_t _full_cond;// 锁pthread_mutex_t mutex;// 休眠数量int _sleep_p;int _sleep_c;
};
#include "blockqueue.hpp"
#include <unistd.h>// 生产者
void *produce(void *args)
{blockqueue<int> *q = static_cast<blockqueue<int> *>(args);for (int i = 0;; i++){sleep(1);q->Equeuq(i);cout << "生产一个数据:" << i << endl;}
}// 消费者
void *consume(void *args)
{blockqueue<int> *q = static_cast<blockqueue<int> *>(args);while (1){sleep(1);int num = q->Pop();cout << "消费一个数据:" << num << endl;}
}int main()
{blockqueue<int> bq;pthread_t t1, t2;pthread_create(&t1, nullptr, produce, &bq);sleep(6);pthread_create(&t2, nullptr, consume, &bq);pthread_join(t1, nullptr);pthread_join(t2, nullptr);
}
hyc@hyc-alicloud:~/linux/线程同步/blockqueue$ ./test
生产一个数据:0
生产一个数据:1
生产一个数据:2
生产一个数据:3
生产一个数据:4
唤醒生产者
消费一个数据:0
生产一个数据:5
唤醒生产者
消费一个数据:1
生产一个数据:6
唤醒生产者
消费一个数据:2

对于单生产单消费:

         单生产单消费,仅需要维护生产者与消费者之间的互斥与同步即可!

        互斥:由于锁的存在,不论是生产还是消费都是竞争的同一个把锁,所以就维护了互斥关系。

        同步:当空间满了时,生产者就进入对应条件变量下等待,只有消费者可以访问并读取数据。当空间空了时,消费者就进入对应条件变量下等待,只有生产者可以访问并写入数据。

其次,对于多生产多消费:

        在单生产单消费的基础上,我们还需要维护生产者之间的互斥,消费者之间的互斥关系。

        但是我们会发现,由于我们的消费者、生产者都是竞争的同一个锁,所以自然也就维护了生产者之间的互斥,消费者之间的互斥!


POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。但POSIX可以用于线程间同步

        使用场景:当资源的是一个整体,使用【mutex+二元信号量】

 (此时资源是一个整体,必须保证同一时间只能有一个线程访问,使用锁来保证。二元信号量:只能在01切换的信号量)

                          当资源被划分为了多个“块”,分批使用【信号量】        

(此时资源并不是一个整体,而是被拆分为了多个独立的“块”,多个线程可同时访问统一资源下的不同块!)

        本质:信号量本质是一个计数器,是对资源的预定机制!

        作用:保护临界资源

        操作:申请资源,计数器--(称作P操作,是一个原子操作)

                   释放资源,计数器++(称作V操作,是一个原子操作)

        所有的线程都可以看到信号量,申请资源计数器--,释放资源计数器++,信号量本身就是临界资源

举例理解信号量:

        将资源是视作一个电影院,当前资源被划分为了一个个独立的“块”资源,既电影院的空间被划分为了一个个座位!多个线程可以同时访问资源中不同的“块”,既多个人可以同时进入电影院坐在不同的位置看电影!

        信号量本质是一个计数器,其初始值的大小就是资源中有多个独立“块”资源的大小,既电影院有多少座位的数量!

        申请资源,计数器--,既为座位被卖出。释放资源,计数器--,既座位被退款。

信号量接口

初始化信号量:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);参数:
pshared:0表⽰线程间共享,⾮零表⽰进程间共享
value:信号量初始值

销毁信号量:

int sem_destroy(sem_t *sem);

等待信号量:

P()操作,原子性的操作!!!功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); 

发布信号量:

V()操作,原子性的操作!!!功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()

封装信号量接口

为了后续使用更加方便,我们这里先将信号接口先封装进一个类中。信号量(semaphore)下面缩写为sem。

//Sem.hpp#include <semaphore.h>#define default 5class Sem
{
public:Sem(int cap = default){sem_init(&_sem, 0, _cap);}~Sem(){sem_destroy(&_sem);}void P(){sem_wait(&_sem);}void V(){sem_post(&_sem);}private:sem_t _sem;int _cap;
};

值得一提:

        一般情况下,当线程通过接口成功的申请到了信号量,不能再次申请,必须要等待线程自己拥有的信号量释放后才可再次申请!

        若线程申请信号量,没有成功,线程将会被阻塞在操作系统内核为该信号量维护的专属等待队列中,直到信号量资源可用并被唤醒。

基于环形队列的生产者消费者模型

环形队列

        如图,我们了解了环形队列是什么东西,然后可以通过数组来模拟环形队列。

 对于环形队列我们会发现:

        1.只要不同时访问同一个位置,数据的生产与消费可以同时进行!

        2.而只要环形队列不为空、不为满时,就不会访问同一个位置!

        3.为空时(互斥),只能让生产者先运行(同步)

           为满时(互斥),只能让消费者先运行(同步)

所有要实现该模型对环形队列就有以下约定:

        约定1:队列空,生产者先运行

        约定2:队列满,消费者先运行

        约定3:消费者的指针不能超过生产者

        约定4:生产者的指针不能超越消费者一圈

值得一提的是:

        如果我们使用信号实现基于环形队列的生产者消费者模型,其中的环形队列是没有必要留下一格空间的!因为留下一格空间是为了我们在使用指针判断的时候,方便判断其队列空间是满还是空。

        但如果我们使用的是信号量。生产者的信号为0,既代表没有空间可以生产了,空间满了。消费者的信号量为0,既代表没有资源可以消费了,空间空了!

        所以使用信号作为判断,是没有必要留下一格空间的。

模型实现

对锁封装:Mutex.hpp

// 封装锁接口
#pragma once
#include <pthread.h>class Mutex
{
public:Mutex(){pthread_mutex_init(&mutex, nullptr);}~Mutex(){pthread_mutex_destroy(&mutex);}void Lock(){pthread_mutex_lock(&mutex);}void Unlock(){pthread_mutex_unlock(&mutex);}private:pthread_mutex_t mutex;
};class LockGuard
{
public:LockGuard(Mutex &mutex): _Mutex(mutex){_Mutex.Lock();}~LockGuard(){_Mutex.Unlock();}private:// 为了保证锁的底层逻辑,锁是不能够拷贝的,并且也是没有拷贝构造函数的//  避免拷贝,应该引用Mutex &_Mutex;
};

对信号量封装:Sem.hpp

// 封装信号量接口
#pragma once
#include <semaphore.h>
#define SIZE 5class Sem
{
public:Sem(int _cap = SIZE){sem_init(&_sem, 0, _cap);}~Sem(){sem_destroy(&_sem);}void P(){// 申请信号量sem_wait(&_sem);}void V(){// 释放信号量sem_post(&_sem);}private:sem_t _sem;int _cap;
};

基于环形队列的模型实现:RingQueue.hpp

#pragma once
#include "Sem.hpp"
#include "Mutex.hpp"
#include <vector>
using namespace std;// 用数组模拟实现环形队列
#define cap 5template <typename T>
class RingQueue
{
public:RingQueue(int s = cap): _rq(s), _csem(0), _date_step(0), _psem(s), _blank_step(0) // 总容量是比循环队列少一个的{}void Equeue(T &num){// 生产者// 申请信号量_psem.P();// 可以先申请信号量,再加锁。也可以先加锁,再申请信号量。// 但是先让线程瓜分完资源再排队,效率会高一些。// 其次信号申请操作本身就是原子的,没有必要加锁。{// 加锁与解锁:维护多生产与多消费下,生产者之间、消费者之前的互斥关系LockGuard lockguard(_pmutex);// 放入数据int size = _rq.size();_rq[_date_step++] = num;// 维护环形队列性质_date_step %= size;}// 更新消费者的信号量_csem.V();}// 局部变量不能返回引用,会报错:越界访问T Pop(){// 消费者// 申请信号量_csem.P();T num;{// 加锁与解锁:维护多生产与多消费下,生产者之间、消费者之前的互斥关系LockGuard lockguard(_cmutex);// 消费资源int size = _rq.size();num = _rq[_blank_step++];// 维护环形队列性质_blank_step %= size;}// 更新生产者的信号量_psem.V();return num;}private:// 用数组模拟实现vector<T> _rq;// 消费者Sem _csem;int _date_step; // 不空位置下标// 生产者Sem _psem;int _blank_step; // 空位置下标// 两把锁Mutex _pmutex;Mutex _cmutex;
};

主函数:main.cc

#include "RingQueue.hpp"
#include <unistd.h>
#include <iostream>
using namespace std;// 生产者
void *produce(void *args)
{RingQueue<int> *q = static_cast<RingQueue<int> *>(args);for (int i = 0;; i++){q->Equeue(i);cout << "生产一个数据:" << i << endl;}
}// 消费者
void *consume(void *args)
{RingQueue<int> *q = static_cast<RingQueue<int> *>(args);while (1){sleep(1);int num = q->Pop();cout << "消费一个数据:" << num << endl;}
}int main()
{RingQueue<int> bq;pthread_t t1, t2;pthread_create(&t1, nullptr, produce, &bq);pthread_create(&t2, nullptr, consume, &bq);pthread_join(t1, nullptr);pthread_join(t2, nullptr);
}

首先,对于单生产单消费:

        单生产单消费,仅需要维护生产者与消费者之间的互斥与同步即可!

        互斥:既两个线程不会同时访问同一个资源。只需要维护当空or满的时候的互斥即可。由于信号量的缘故,当为空时,消费者是不可能进行消费的,只有生产者能够访问当前资源写入数据。同样的,当为满时。生产者的信号量减为0,是不可能再运行了,只有消费者才能访问当前在资源,读取数据。

        同步一个道理由于信号量,为空时消费者必须等待生产者写入数据。为满时,生产者必须等待消费者读取数据。

        所以,对于单生产单消费。仅凭信号量就维护了生产者与消费者之间的互斥与同步!

其次,对于多生产多消费:

        在单生产单消费的基础上,我们还需要维护生产者之间的互斥,消费者之间的互斥关系

        只需要在消费、生产上加锁,即可保证同一时间下,不会存在多个生产者写入数据、多个消费者读取数据!

        以上,我们通过了条件变量和信号量,知道了线程同步是如何实现的!并且还使用了条件变量、信号量外加锁来实现了重要的模型:生产者消费者模型。

        通过模型,我们掌握了线程互斥与同步的运用。

http://www.dtcms.com/a/398386.html

相关文章:

  • 【2025-系统规划与管理师】第六章:云资源规划
  • JDK17 新特性梳理
  • ZooKeeper源码分析与实战-模块一:基础篇
  • 网站建设 优势网站开发需求报告
  • jikuaiarc项目构建,参考arc,把arc一直到antlr4,生成g4文件
  • 白话讲讲GenAI、LLM、Agent、RAG、LangChain
  • (25.09)使用Livox-mid-360录制数据并运行Fast-lio2命令
  • 【图文】Codex接入Kimi K2/GLM-4.5 环境配置指南 (Windows/macOS/Ubuntu)
  • 异步 vs 同步:JavaScript中的速度与激情
  • Django模型与数据表的映射方式详解:不止Code First与Database First
  • LangChain4J-(7)-Function Calling
  • C程序设计-01程序设计和C语言
  • 为何上不了建设银行网站网络营销工程师前景
  • 设计模式的几个准则
  • python+nodejs+springboot在线车辆租赁信息管理信息可视化系统
  • 计算机毕业设计 基于Python的音乐推荐系统 Python 大数据毕业设计 Hadoop毕业设计选题【附源码+文档报告+安装调试】
  • 《人机分工重塑开发:遗留系统重构的AI实践指南》
  • 从0死磕全栈第十天:nest.js集成prisma完成CRUD
  • 网站开发做什么科目网页设计与网站建设连接数据库
  • 如何看网站是html几代做的加拿大pc网站搭建
  • C#的MVVM架构中的几种数据绑定方式
  • Jmeter接口测试:jmeter组件元件介绍,利用取样器中http发送请求
  • Apache Tomcat 部署与配置
  • 网站建设详细合同范本西部数码网站管理助手破解版
  • 权限提升专项训练靶场:hacksudo: L.P.E.
  • 工作笔记----lwip的数据管理结构pbuf源码解析
  • 生产环境实战:Spring Cloud Sleuth与Zipkin分布式链路追踪实践
  • 学习React-15-useImperativeHandle
  • 响应式网站案列小学生做电子小报的网站
  • 【AskAI系列课程】:P4.将AI助手集成到Astro网站前端