【Linux系统篇】:Linux线程同步---条件变量,信号量与CP模型实现
✨感谢您阅读本篇文章,文章内容是个人学习笔记的整理,如果哪里有误的话还请您指正噢✨
✨ 个人主页:余辉zmh–CSDN博客
✨ 文章所属专栏:Linux篇–CSDN博客
文章目录
- 一.同步的基本概念
- 二.条件变量
- 1.条件变量的接口函数
- 2.生产者-消费者模型
- 3.基于阻塞队列的生产者-消费者模型
- 三.POSIX信号量
- 1.POSIX信号量的接口函数
- 2.基于环形队列的生产者-消费者模型
一.同步的基本概念
1.同步的基本概念:
同步是一种协调机制,保证数据安全的情况下,让多线程访问资源时具有一定的顺序性,从而有效避免饥饿问题,主要解决线程间的“协作”问题;通过条件变量或信号量等方式实现。
注意:同步是在互斥基础上的一种解决策略;互斥问题本身也是一种解决策略;如果在纯互斥情况下,只要能保证线程之间的调度均衡,也能用来作为解决方法;但是在互斥的某些情况下,可能会出现线程调度,以及资源分配不均衡的情况而导致的饥饿问题,此时就要在互斥的前提下,通过同步来进行协调,让多线程访问资源时具有一定的顺序性,从而避免饥饿问题。
2.互斥与同步的区别:
互斥:
- 目的:保护共享资源
- 实现:互斥锁
- 特点:排他性访问
同步:
- 目的:协调线程执行顺序
- 实现:条件变量,信号量等
- 特点:线程间协作
3.互斥与同步的配合原则:
- 互斥是基础:
- 必须先有互斥保护
- 同步机制建立在互斥基础上
- 同步是目的:
- 互斥保护共享资源
- 同步协调线程行为
- 配合使用:
- 互斥锁保护共享资源
- 条件变量实现线程同步
- 两者缺一不可
二.条件变量
Linux中的条件变量是多线程编程中用于线程同步的重要机制,常与互斥锁配合使用(为什么后面会讲)。
主要作用:
条件变量允许线程在特定条件不满足时主动挂起(阻塞等待),并释放锁,避免忙等待。当其他线程通过某些操作使条件满足时,挂起的线程会被唤醒并重新尝试申请锁,继续执行。
比如:
一个线程访问队列时,发现队列为空,他只能阻塞等待,直到其他线程将一个节点添加到队列中,等待的线程被唤醒然后继续执行。这种情况就需要用到条件变量。
1.条件变量的接口函数
1.初始化条件变量:
// 静态初始化,在全局定义,不需要再调用初始化函数以及销毁函数
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;// 动态初始化,在局部定义,需要调用初始化函数以及销毁函数
pthread_cond_t cond;
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
- 参数
cond
是指向条件变量的指针 - 参数
attr
是条件变量的属性,通常设置为NULL
表示使用默认属性
2.销毁条件变量:
int pthread_cond_destroy(pthread_cond_t *cond);
- 用于销毁条件变量
- 必须在条件变量不再使用时调用销毁
3.等待条件变量:
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
- 使线程等待条件变量
- 必须与互斥锁配合使用
- 当前线程会自动释放申请的锁,并在被唤醒时重新申请锁
4.唤醒等待的线程:
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
- 第一个函数是唤醒一个等待的线程
- 第二个函数是唤醒所有等待的线程
简单案例:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;// 共享资源
int cnt = 0;// 静态初始化互斥锁和条件变量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *Count(void *args){// 分离线程pthread_detach(pthread_self());uint64_t number = (uint64_t)args;cout << "pthread: " << number << " create success" << endl;while(true){// 申请锁pthread_mutex_lock(&mutex);// 等待条件变量pthread_cond_wait(&cond, &mutex);cout << "pthread: " << number << " , cnt: " << cnt++ << endl;// 释放锁pthread_mutex_unlock(&mutex);}
}int main(){for (uint64_t i = 0; i < 5; i++){pthread_t tid;pthread_create(&tid, nullptr, Count, (void *)i);usleep(10);}sleep(3);cout << "main thread ctrl begin: " << endl;while(true){sleep(1);// 唤醒一个等待的线程// pthread_cond_signal(&cond);// cout << "signal one thread..." << endl;// 唤醒所有等待的线程pthread_cond_broadcast(&cond);cout << "broadcast all thread..." << endl;}return 0;
}
唤醒一个等待的线程:
唤醒所有等待的线程:
1.为什么pthread_cond_wait
需要互斥量?
- 条件等待是线程同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享数据,使原先不满足的条件变得满足,并且友好的通知阻塞等待在条件变量上的线程;
- 条件不会无缘无故的突然就变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。
2.为什么pthread_cond_wait
线程等待是在线程申请锁之后的?
前面已经提到过一点因为牵扯到共享数据的变化,需要先用互斥锁进行保护;
此外还有一点就是条件是否满足是要先进行判断的,而判断共享资源是否满足条件,本质上就是在访问共享资源,所以也是需要先用互斥锁进行保护的。
2.生产者-消费者模型
先用日常生活中的例子理解:
角色对应:
-
生产者–>供货商
生产行为:不断生产商品(比如面包,牛奶);
核心任务:将商品运送到超市的货架上;
约束:如果货架满了,必须等待,直到有空位再继续送货;
-
缓冲区–>超市
作用:暂存商品,平衡生产速度和消费速度不均衡;
容量限制:货架空间有限(比如最多存放100个面包);
-
消费者–>顾客
消费行为:从货架上购买商品;
核心任务:消费商品(比如买面包);
约束:如果或架空了,必须等待,直到有商品补货;
流程演示:
1.正常情况下(生产与消费平衡)
- 供货商生产面包—>放入超市货架
- 顾客购买面包—>从货架取走
- 货架既不空也不满,双方无需等待
2.货架已满(生产过快)
- 供货商继续生产,试图向货架放面包
- 货架已满—>供货商必须等待(阻塞),直到顾客买走面包腾出空间
- 当顾客买走一个面包后,超市通知供货商用空间了—>供货商恢复送货
3.货架已空(消费过快)
- 顾客想买面包了,但货架空了
- 货架为空—>顾客必须等待(阻塞),直到供货商补货
- 当供货商放入新面包后,超市通知顾客有货了—>顾客恢复购买
站在计算机角度来理解:
生产者和消费者都是由线程来承担,而超市作为缓冲区,则是一个特定结构的内存空间;生产者生产的商品,以及消费者消费的商品则是数据,相当于一个线程把数据交给另一个线程,所以生产者-消费者模型本质上就是执行流在做通信。
只不过这里重点不是怎么通信,而是如何安全高效的通信?
因为执行流间通信首先要看到同一份资源,而线程和线程之间的特定结构的内存空间就是共享资源,既然是共享资源,那么就一定会有并发问题。
生产者-消费者模型的三种关系:
- 生产者vs生产者:互斥关系,因为可能存在多个生产者线程竞争生产数据存放到缓冲区中;
- 消费者vs消费者:互斥关系,因为可能存在多个消费者线程竞争从缓冲区中消费数据;
- 生产者vs消费者:互斥与同步关系,互斥是前提,主要保证数据安全;同步是目的,保证生产和消费两者的顺序性;因为缓冲区为空时,只能生产者先访问执行;缓冲区已满时,只能消费者先访问执行,需要限制访问顺序。
快速记住生产者-消费者模型:“321原则“
3种关系;2种角色—生产者和消费者;1种个缓冲区—特定结构的内存空间
生产者消费者模型的优点:
- 解耦
- 支持并发
- 支持忙闲不均
3.基于阻塞队列的生产者-消费者模型
什么是阻塞队列(BlockQueue
)?
在对线程编程中阻塞队列(BlockQueue
)是一种常用于实现生产者和消费者模型的数据结构(用于充当缓冲区)。与普通队列的区别在于,当队列为空时,消费者线程从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,生产者线程往队列中存放元素的操作也会被阻塞,直到有元素从队列中取出。
先用单生产单消费讲解:
- C++ queue实现阻塞队列BlockQueue的封装:
//BlockQueue.hpp文件:
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>template <class T>
class BlockQueue{static const int defaultnum = 5;public:BlockQueue(int max_capacity = defaultnum): _max_capacity(max_capacity){// 初始化互斥锁和条件变量pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_c_cond, nullptr);pthread_cond_init(&_p_cond, nullptr);}T pop(){// 加锁pthread_mutex_lock(&_mutex);// 消费条件:判断队列是否为空if(_q.size() == 0){pthread_cond_wait(&_c_cond, &_mutex);}T out = _q.front(); // 消费前需要先满足消费条件 _q.pop(); // 消费后通知生产者进行生产,唤醒一个等待生产的线程pthread_cond_signal(&_p_cond);// 释放锁pthread_mutex_unlock(&_mutex);return out;}void push(const T& in){// 加锁pthread_mutex_lock(&_mutex);// 生产条件:判断队列是否等于极值---判断也是在访问临界资源if (_q.size() == _max_capacity){pthread_cond_wait(&_p_cond, &_mutex);}// 可以执行生产有两种情况---1.队列没满,条件满足;2.被唤醒_q.push(in); // 生产前需要先满足生产条件// 生产后通知消费者进行消费,唤醒一个等待消费的线程pthread_cond_signal(&_c_cond);// 释放锁pthread_mutex_unlock(&_mutex);}~BlockQueue(){// 销毁互斥锁和条件变量pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_c_cond);pthread_cond_destroy(&_p_cond);}private:std::queue<T> _q;// 极值int _max_capacity;// 互斥锁pthread_mutex_t _mutex;// 条件变量pthread_cond_t _c_cond;pthread_cond_t _p_cond;
};
- 单生产单消费实现:
//main.cc文件:
#include "BlockQueue.hpp"void *Consumer(void *args){BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){// 消费// 测试一:每2秒消费一次//sleep(2);int data = bq->pop();std::cout << "消费了一个数据:" << data << std::endl;}
}void *Producter(void *args){BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);int data = 0;while (true){// 生产// 测试二:每2秒生产一次sleep(2);data++;bq->push(data);std::cout << "生产了一个数据:" << data << std::endl;}
}int main(){// 创建一个阻塞队列对象充当特定的内存BlockQueue<int> *bq = new BlockQueue<int>();// 创建两个线程--生产者和消费者pthread_t c, p;pthread_create(&c, nullptr, Consumer, bq);pthread_create(&p, nullptr, Producter, bq);// 等待回收两个线程pthread_join(c, nullptr);pthread_join(p, nullptr);// 释放阻塞队列delete bq;return 0;
}
测试一:每2秒消费一次
测试二:每2秒生产一次
1.生产者的生产行为不光是将数据存放到仓库中,还要先获取数据,可以从用户或者网络中获取;并且获取时也是要花时间获取的
生产的两个动作:1.获取数据;2.生产数据到仓库中;
2.消费者的消费行为不光是从仓库中消费数据,还要对数据进行加工处理,并且加工处理也是要花时间的
消费的两个动作:1.从仓库中消费数据;2.加工处理数据;
3.生产消费模型是高效的,为什么高效?
生产者生产数据到仓库中和消费者从仓库中消费数据都是经过互斥与同步机制进行保护的,所以多执行流情况下,这两个动作都是访问临界,也就是串行执行,既然是串行执行,所以在这两个地方一定无法体现出高效;但是前面已经提到了,生产者的生产行为不光是生产数据到仓库中,还要花时间来获取数据,而消费者的消费行为也不光是从仓库中消费数据,还要花时间来加工处理数据;所以生产者正在执行生产数据到仓库中时,虽然消费者可能无法执行从仓库中消费数据,但并不影响加工处理数据;反观,消费者正在执行从仓库中消费数据,虽然生产者可能无法执行生产数据到仓库中,但并不影响获取数据;也就是说一个访问临界区,一个访问非临界区,这种情况两个线程就可以高效并发执行了。
4.生产者消费者模型本质上是让多线程并发来协作完成任务的,所以不是说单纯的传入一个简单的数据就完事了,还可以传递由类封装的任务对象,让一个线程指派另一个线程去完成任务!
- 简单任务类封装:
//Task.hpp文件:
#pragma once
#include <iostream>
#include <string>std::string opers = "+-*/%";enum{DivZero = 1,ModZero,Unknown
};class Task{
public:Task(int data1, int data2, char op):_data1(data1),_data2(data2),_oper(op),_result(0),_exitcode(0){}void run(){switch(_oper){case '+':_result = _data1 + _data2;break;case '-':_result = _data1 - _data2;break;case '*':_result = _data1 * _data2;break;case '/':{if (_data2 == 0)_exitcode = DivZero;else_result = _data1 / _data2;}break;case '%':{if (_data2 == 0)_exitcode = ModZero;else_result = _data1 % _data2;}break;default:_exitcode = Unknown;break;}}std::string GetResult(){std::string ret;ret += std::to_string(_data1);ret += _oper;ret += std::to_string(_data2);ret += '=';ret += std::to_string(_result);ret += "[code: ";ret += std::to_string(_exitcode);ret += ']';return ret;}std::string GetTask(){std::string ret;ret += std::to_string(_data1);ret += _oper;ret += std::to_string(_data2);ret += "=?";return ret;}~Task(){}private:int _data1;int _data2;char _oper;int _result;int _exitcode;
};
- 单生产单消费实现:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <ctime>void *Consumer(void *args){BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while(true){// 从仓库中消费数据Task t = bq->pop();// 加工处理数据t.run();std::cout << "消费了一个任务:" << t.GetResult() << std ::endl;}
}void *Producter(void *args){BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);int data = 0;while (true){// 获取数据sleep(1);int data1 = rand() % 10 + 1;int data2 = rand() % 10;char op = opers[rand() % opers.size()];Task t(data1, data2, op);// 生产数据到仓库中bq->push(t);std::cout << "生产了一个任务:" << t.GetTask() << std::endl;}
}int main(){srand(time(nullptr));// 创建一个阻塞队列对象充当特定的内存BlockQueue<Task> *bq = new BlockQueue<Task>();// 创建两个线程--生产者和消费者pthread_t c, p;pthread_create(&c, nullptr, Consumer, bq);pthread_create(&p, nullptr, Producter, bq);// 等待回收两个线程pthread_join(c, nullptr);pthread_join(p, nullptr);// 释放阻塞队列delete bq;return 0;
}
关于阻塞队列封装实现的细节点:
1.为什么push
函数和pop
函数这里,判断条件语句,需要放在加锁和解锁之间(也就是临界区中)?
因为判断本身也是在访问临界资源,只要是访问临界资源就只能放在加锁后面;如果判断出临界资源不就绪时,当前线程就要通过条件变量阻塞等待,并且阻塞等待时是占用锁资源的,所以需要先把申请的锁释放掉,这就是为什么phread_cond_wait
函数第二个参数是锁的原因,调用的时候,会自动释放锁;当临界资源就绪,等待的线程被唤醒后,会重新持有锁。
2.什么是伪唤醒?
在多线程的情况下,如果有多个生产者线程都在进行生产,可是生产条件不满足,此时就有多个生产者线程投入到条件变量下阻塞等待,如果生产条件满足,临界资源就绪,此时只需要唤醒一个生产者线程即可,但是可能却把多个线程全部唤醒了,此时所有被唤醒的线程就不在条件变量队列下等待了,有序性就消失,所有被唤醒的线程就会竞争式的竞争一把锁,即便只有一个线程申请锁成功了,但是生产完后也要释放锁,刚才剩余的竞争失败的线程就会继续竞争锁,假如此时生产条件又不满足了,竞争到锁的线程就会继续生产,就会导致出现错误,这种情况就是伪唤醒。
如何防止出现伪唤醒情况?
只需要在申请锁后,判断条件语句从if
形式修改成while
形式即可,多次判断防止出现伪唤醒情况:
T pop(){// 加锁pthread_mutex_lock(&_mutex);// 消费条件:判断队列是否为空// if(_q.size() == 0){// pthread_cond_wait(&_c_cond, &_mutex);// }while(_q.size() == 0){pthread_cond_wait(&_c_cond, &_mutex);}T out = _q.front(); // 消费前需要先满足消费条件 _q.pop(); // 消费后通知生产者进行生产,唤醒一个等待生产的线程pthread_cond_signal(&_p_cond);// 释放锁pthread_mutex_unlock(&_mutex);return out;}void push(const T& in){// 加锁pthread_mutex_lock(&_mutex);// 生产条件:判断队列是否等于极值---判断也是在访问临界资源// if (_q.size() == _max_capacity){// pthread_cond_wait(&_p_cond, &_mutex); // 可能会出现伪唤醒情况// }// while循环判断,防止出现伪唤醒情况while(_q.size() == _max_capacity){pthread_cond_wait(&_p_cond, &_mutex);}// 可以执行生产有两种情况---1.队列没满,条件满足;2.被唤醒_q.push(in); // 生产前需要先满足生产条件// 生产后通知消费者进行消费,唤醒一个等待消费的线程pthread_cond_signal(&_c_cond);// 释放锁pthread_mutex_unlock(&_mutex);}
- 多生产多消费实现
只需要修改主函数即可:
int main(){srand(time(nullptr));// 创建一个阻塞队列对象充当特定的内存BlockQueue<Task> *bq = new BlockQueue<Task>();// // 创建两个线程--生产者和消费者// pthread_t c, p;// pthread_create(&c, nullptr, Consumer, bq);// pthread_create(&p, nullptr, Producter, bq);// // 等待回收两个线程// pthread_join(c, nullptr);// pthread_join(p, nullptr);// 创建多生产者线程和多消费者线程pthread_t c[3], p[5];for (int i = 0; i < 3; i++){pthread_create(c + i, nullptr, Consumer, bq);sleep(1);}for (int i = 0; i < 5; i++){pthread_create(p + i, nullptr, Producter, bq);sleep(1);}// 等待回收所有线程for (int i = 0; i < 3; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(p[i], nullptr);}// 释放阻塞队列delete bq;return 0;
}
三.POSIX信号量
POSIX信号量是一种用于进程间或线程间同步的机制,遵循POSIX标准;它通过管理计数器来控制对共享资源的访问,防止竞争条件。
在之前讲解System V
信号量时,已经讲解过关于信号量的相关概念,POSiX
信号量和System V
信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源的目的,但POSIX
信号量可以用于线程间同步;至于两者相关概念的理解都是一样,只不过实现方式以及接口函数的使用不同。
信号量的相关概念总结如下:
1.信号量本质上是一把计数器,申请和释放信号量的操作叫做
PV
操作—具有原子性2.执行流申请临界资源,必须先申请信号量资源,得到信号量之后,才能访问临界资源
3.信号量值只有0和1两态的,叫做二元信号量,本质上就是一个锁,具有互斥功能
4.申请信号量资源的本质:是对临界资源的预定机制
既然信号量的本质是一把计数器,那么这把计数器的本质又是什么?
计数器本质上就是描述临界资源中资源数量的,只不过把临界资源是否就绪放在了临界区之外,在申请信号量时,其实就已经间接的在做判断了。
1.POSIX信号量的接口函数
1.初始化信号量:
#include <semaphore.h>int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem
:目标信号量的地址;pshared
:0表示目标信号量线程间共享;非0表示进程间共享value
:目标信号量的初始值
2.销毁信号量:
int sem_destroy(sem_t *sem);
3.等待(申请)信号量—>P()操作:
// 功能:等待(申请)信号量,会将信号量的值减一
int sem_wait(sem_t *sem);
4.发布(释放)信号量—>V()操作:
// 功能:发布(释放)信号量,表示资源使用完毕,可以归还资源了,将信号量的值加一
int sem_post(sem_t *sem);
2.基于环形队列的生产者-消费者模型
什么是环形队列:
环形队列,也称为循环队列,是一种特殊的线性数据结构,其底层通常基于数组或链表实现,它的核心是首尾相连,形成一个逻辑上的”环“,从而能够高效的重复利用已释放的空间,避免普通队列因出队操作导致的”空间浪费“问题。
核心结构与原理:
1.物理结构:
- 通常采用数组实现,物理存储是线性的,但通过模运算(取余)使头尾指针在逻辑上循环。
- 包含两个关键指针:
- 头指针(front):指向队列的第一个元素。
- 尾指针(rear):指向队列最后一个元素的下一个位置(或当前可插入的位置)。
2.逻辑循环:
- 指针的移动规则,先后移再模运算:
front++;front%=capacity
以及rear++;rear%=capacity
;当指针移动到数组末尾时,通过先后移再模运算,就可以跳到数组开头,形成循环。
3.判空和判满:
- 判空:
front==rear
; - 判满(两种常见方式):
- 舍弃一个存储单元,满足
(rear+)%capacity==front
时认为队列满; - 增加一个计数器
size
,通过size == capacity
判断队列满;
- 舍弃一个存储单元,满足
因为信号量本质上就是一个计数器,所以在实现时可以直接采用信号量来进行判空和判满。
单生产单消费为例:
只要生产者和消费者不访问同一个位置,就可以实现同时生产和消费,那么什么时候生产者和消费者会访问同一个位置?
当然是环形队列为空和为满的时候会访问同一个位置,反过来不空或者不满时,生产者和消费者一定指向不同的位置,就可以实现同时生产和消费;
必须满足的三个条件:
1.指向同一个位置:为空时,只能生产者访问;为满时,只能消费者访问;
2.消费者不能超过生产者;
3.生产者不能把消费者套一个圈;
采用环形队列实现时,要满足上面三个条件就需要借助信号量来实现
生产者关注的是环形队列中剩余多少空间,可以定义一个SpaceSem
信号量用来表示空间资源的数量,初始时,队列为空,SpaceSem = N
;
消费者关注的是环形队列中剩余多少数据,可以定义一个DataSem
信号量用来表示数据资源的数量,初始时,队列为空,DataSem = 0
;
生产者生产之间需要先有一个空位置,所以先执行P(SpaceSem)
操作(SpaceSem--
)申请一个空间资源,然后才能进行生产;而消费者消费之前需要先有一个数据,所以先执行P(DataSem)
操作(DataSem--
)申请一个数据资源,然后才能进行消费;
在最开始环形队列为空时,一定会是生产者先开始执行,因为此时的数据资源DataSem = 0
,消费者申请不到数据资源,挂起等待;反过来在环形队列为满时,一定会是消费者先开始执行,因为此时的空间资源SpaceSem = 0
,生产者申请不到空间资源,挂起等待;
所以这就是指向同一个位置,为空时,生产者先访问;为满时,消费者先访问,满足第一个条件。
当生产者生产完一个数据后,空间资源还在占用着,但是数据资源多了一个,所以生产者生产完后,要执行V(DataSem)
操作(DataSem++
);当消费者消费完一个数据后,数据资源也是还在占用着,但是空间资源多了一个,所以消费者消费完后,要执行V(SpaceSem)
操作(SpaceSem++
)。
如果队列空后,生产者一直生产,消费者一直不消费,一旦生产者将整个队列生产满时,此时队列为满,SpaceSem = 0
,生产者就无法继续申请空间资源,要挂起等待,此时生产者和消费者访问同一个位置,但只能消费者继续执行,所以这就是生产者不能把消费者套一个圈,满足第三个条件;
如果队列满后,消费者一直消费,生产者一直不生产,一旦消费者将整个队列消费空时,此时队里为空,DataSem = 0
,消费者就无法继续申请数据资源,要挂起等待,此时生产者和消费者访问同一个位置,但只能生产者继续执行,所以这就是消费者不能超过生产者,满足第二个条件。
单生产单消费:
- 环形队列的封装:
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>template<class T>
class RingQueue{static const int defaultcap = 5;private:// 申请信号量封装成P操作void P(sem_t &sem){sem_wait(&sem);}// 释放信号量封装成V操作void V(sem_t &sem){sem_post(&sem);}public:RingQueue(int capacity=defaultcap):_ringqueue(capacity),_capacity(capacity),_c_step(0),_p_step(0){sem_init(&_c_data_sem, 0, 0);sem_init(&_p_space_sem, 0, capacity);}// 生产者void push(const T &in){P(_p_space_sem);_ringqueue[_p_step] = in;V(_c_data_sem);// 位置后移,维持环形队列_p_step++;_p_step %= _capacity;}//消费者void pop(T *out){P(_c_data_sem);*out = _ringqueue[_c_step];V(_p_space_sem);// 位置后移,维持环形队列_c_step++;_c_step %= _capacity;}~RingQueue(){sem_destroy(&_c_data_sem);sem_destroy(&_p_space_sem);}private:std::vector<T> _ringqueue;int _capacity;int _c_step; // 消费者指针int _p_step; // 生产者指针sem_t _c_data_sem; // 消费者关注的数据资源sem_t _p_space_sem; // 生产者关注的空间资源
};
- 单生产单消费实现:
#include "RingQueue.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <string>template<class T>
class ThreadData{
public:RingQueue<T> *_rq;std::string _threadname;
};// 生产者
void *Producter(void *args){ThreadData<int> *td = static_cast<ThreadData<int> *>(args);RingQueue<int> *rq = td->_rq;std::string threadname = td->_threadname;while (true){// 测试一:每一秒生产一次sleep(1);// 1.获取数据int data = rand() % 10 + 1;// 2.生产数据rq->push(data);std::cout << "Producter data done, data is: " << data << " who: " << threadname << std::endl;}return nullptr;
}// 消费者
void *Consumer(void *args){ThreadData<int> *td = static_cast<ThreadData<int> *>(args);RingQueue<int> *rq = td->_rq;std::string threadname = td->_threadname;while(true){// 测试二:每1秒消费一次//sleep(1);// 1.消费数据int data = 0;rq->pop(&data);// 2.处理数据data++;std::cout << "Consumer get a data, data is: " << data << " who: " << threadname << std::endl;}return nullptr;
}int main(){srand(time(nullptr));RingQueue<int> *rq = new RingQueue<int>();pthread_t c, p;ThreadData<int> *c_td = new ThreadData<int>();c_td->_rq = rq;c_td->_threadname = "Consumer";pthread_create(&c, nullptr, Consumer, c_td);ThreadData<int> *p_td = new ThreadData<int>();p_td->_rq = rq;p_td->_threadname = "Producter";pthread_create(&p, nullptr, Producter, p_td);pthread_join(c, nullptr);pthread_join(p, nullptr);delete c_td;delete p_td;delete rq;return 0;
}
测试一:每一秒生产一次
测试二:每一秒消费一次
多生产多消费:
在单生产单消费中,信号量只是保证了生产者和消费者之间的互斥与同步关系;而生产者线程和消费者线程只有一个,所以在访问各自的数组下标时,不会出现并发访问的情况,所以不需要再处理生产者和生产者的互斥关系以及消费者与消费者的互斥关系;
但是在多生产多消费的情况下,多生产者线程有多个,而数组下标_p_step
只有一个,所以会出现并发访问共享资源的情况,还需要单独处理生产者和生产者的互斥关系,所以需要加锁进行保护;多消费者线程也是同理,也需要加锁保护共享资源–数组下标_c_step
。
因此在基于环形队列实现的多生产多消费中,即使已经通过信号量控制了生产者和消费者的执行条件,仍然需要加锁进行保护。
- 环形队列的封装修改:
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>template<class T>
class RingQueue{static const int defaultcap = 5;private:// 申请信号量封装成P操作void P(sem_t &sem){sem_wait(&sem);}// 释放信号量封装成V操作void V(sem_t &sem){sem_post(&sem);}// 申请锁封装void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}// 释放锁封装void UnLock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}public:RingQueue(int capacity=defaultcap):_ringqueue(capacity),_capacity(capacity),_c_step(0),_p_step(0){sem_init(&_c_data_sem, 0, 0);sem_init(&_p_space_sem, 0, capacity);pthread_mutex_init(&_c_mutex, nullptr);pthread_mutex_init(&_p_mutex, nullptr);}// 生产者void push(const T &in){P(_p_space_sem);// 先申请信号量再申请锁Lock(_p_mutex);_ringqueue[_p_step] = in;// 位置后移,维持环形队列_p_step++;_p_step %= _capacity;UnLock(_p_mutex);V(_c_data_sem);}//消费者void pop(T *out){P(_c_data_sem);Lock(_c_mutex);*out = _ringqueue[_c_step];// 位置后移,维持环形队列_c_step++;_c_step %= _capacity;UnLock(_c_mutex);V(_p_space_sem);}~RingQueue(){sem_destroy(&_c_data_sem);sem_destroy(&_p_space_sem);pthread_mutex_destroy(&_c_mutex);pthread_mutex_destroy(&_p_mutex);}private:std::vector<T> _ringqueue;int _capacity;int _c_step; // 消费者下标int _p_step; // 生产者下标sem_t _c_data_sem; // 消费者关注的数据资源sem_t _p_space_sem; // 生产者关注的空间资源pthread_mutex_t _c_mutex;pthread_mutex_t _p_mutex;
};
为什么申请锁选择放在申请信号量之后?
1.首先站在技术角度上:信号量资源是不需要被保护的,因为申请信号量的P操作是原子的;并且临界区的代码要尽可能少,所以一般而言不需要把申请信号量放在加锁之后;
2.逻辑角度上:如果先申请锁,只有一个执行流能申请成功,申请锁成功的执行流才去申请信号量,所以申请锁和申请信号量一定是串行的;而先申请信号量再申请锁,能保证在多线程竞争访问申请信号量期间,可以有多个执行流申请成功,然后再竞争申请锁,即使只有一个执行流可以申请锁成功,但是一旦该执行流释放锁后,先前已经申请信号量成功,但是申请锁失败的执行流后立即再次申请锁,此时申请信号量和申请锁就变成了并行的,提高多线程并发执行的效率。
- 多生产多消费实现:
#include "RingQueue.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <string>template<class T>
class ThreadData{
public:RingQueue<T> *_rq;std::string _threadname;
};// 生产者
void *Producter(void *args){ThreadData<int> *td = static_cast<ThreadData<int> *>(args);RingQueue<int> *rq = td->_rq;std::string threadname = td->_threadname;while (true){// 测试一:每一秒生产一次sleep(1);// 1.获取数据int data = rand() % 10 + 1;// 2.生产数据rq->push(data);std::cout << "Producter data done, data is: " << data << " who: " << threadname << std::endl;}delete td; // 在线程结束时释放 ThreadData 对象return nullptr;
}// 消费者
void *Consumer(void *args){ThreadData<int> *td = static_cast<ThreadData<int> *>(args);RingQueue<int> *rq = td->_rq;std::string threadname = td->_threadname;while(true){// 测试二:每一秒消费一次//sleep(1);// 1.消费数据int data = 0;rq->pop(&data);// 2.处理数据data++;std::cout << "Consumer get a data, data is: " << data << " who: " << threadname << std::endl;}delete td; // 在线程结束时释放 ThreadData 对象return nullptr;
}int main(){srand(time(nullptr));RingQueue<int> *rq = new RingQueue<int>();// 多生产多消费:pthread_t c[3], p[5];for (int i = 0; i < 3; i++){ThreadData<int> *c_td = new ThreadData<int>();c_td->_rq = rq;c_td->_threadname = "Consumer-" + std::to_string(i);pthread_create(c + i, nullptr, Consumer, c_td);}for (int i = 0; i < 5; i++){ThreadData<int> *p_td = new ThreadData<int>();p_td->_rq = rq;p_td->_threadname = "Producter-" + std::to_string(i);pthread_create(p + i, nullptr, Producter, p_td);}// 等待回收所有线程for (int i = 0; i < 3; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(p[i], nullptr);}delete rq;return 0;
}
测试一:每一秒生产一次
测试二:每一秒消费一次
以上就是关于Linux线程同步的讲解,如果哪里有错的话,可以在评论区指正,也欢迎大家一起讨论学习,如果对你的学习有帮助的话,点点赞关注支持一下吧!!!