Linux 线程的同步与互斥机制及应用
Liunx-线程的同步与互斥
1. 概念
-
共享资源: 多个执行流,看到的同一份公共资源。
-
临界资源: 被保护起来的共享资源叫做临界资源,即一次仅允许一个进程或线程使用的资源。
-
保护的机制: 同步和互斥。
-
互斥: 任何时刻,只允许一个执行流访问资源,叫做互斥。
-
同步: 多个执行流,访问临界资源的时候,具有一定的顺序性,叫做同步。
-
-
临界区: 访问临界资源的那部分代码。
-
非临界区: 不访问临界资源的代码。
-
原子性: 该操作是两态的,要么完成,要么未完成。不会被其他任何操作打断。
-
数据不一致: 当多个执行流(线程、进程)在没有适当保护的情况下,并发 地读取和修改同一份共享数据时,最终数据的值或数据间的逻辑关系会与预期不符。
2. 互斥
2.1 现象
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>int ticket = 100;
void *route(void *arg)
{char *id = (char *)arg;while (1){if (ticket > 0){usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;}else{break;}}return nullptr;
}
int main(void)
{pthread_t t1, t2, t3, t4;pthread_create(&t1, NULL, route, (void *)"thread 1");pthread_create(&t2, NULL, route, (void *)"thread 2");pthread_create(&t3, NULL, route, (void *)"thread 3");pthread_create(&t4, NULL, route, (void *)"thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);return 0;
}// 1.执行的结果:
thread 1 sells ticket:100
...
thread 4 sells ticket:5
thread 2 sells ticket:4
thread 3 sells ticket:4
thread 1 sells ticket:2
thread 4 sells ticket:1
thread 2 sells ticket:0
thread 1 sells ticket:-1
thread 3 sells ticket:-2// 2.执行的结果:
thread 1 sells ticket:100
...
thread 1 sells ticket:4
thread 3 sells ticket:3
thread 2 sells ticket:2
thread 4 sells ticket:1
thread 1 sells ticket:0
thread 3 sells ticket:-1
thread 2 sells ticket:-1
为什么会出现这样的结果?
-
if语句判断条件为真以后,代码可以并发的切换到其他线程。 -
usleep模拟真实的业务处理过程,在这个过程中,有可能会有多个线程进入该代码段。 -
--ticket不是一个原子操作。
2.2 理解现象
解释 --ticket 不是一个原子操作:
C/C++代码会在编译时转换为汇编代码,其实 --ticket 语句,在汇编角度需要 三条指令:
-
load:将共享变量 ticket 从内存中加载到寄存器中。 -
modify:更新寄存器里面的值,执行 -1 操作。 -
write:将新值,从寄存器写回共享变量 ticket 的内存地址。
所以数据不一致可能出现的现象如下:
-
线程 A 开始执行
--ticket操作,首先通过 LOAD 指令将内存中ticket的值(假设为 100)读取到其私有寄存器(线程的上下文)中。 -
就在此时,发生了线程调度切换,线程 A 的执行被挂起,其上下文(包括寄存器中保存的
ticket=100)被保留。 -
线程 B 开始执行。它非常幸运,在没有被中断的情况下,完整地执行了多次
--ticket操作(即完整的 LOAD-MODIFY-WRITE 周期),最终成功地将内存中的ticket值修改为了 1。 -
调度器再次切换回线程 A。线程 A 恢复其上下文,包括那个已经过时的寄存器值(
ticket=100)。它接着执行被中断的操作:-
MODIFY:在自己的寄存器中将 100 减 1,得到 99。
-
WRITE:将这个过期的结果 99 写回内存。
-
最终,内存中的 ticket 值从线程 B 离开时的 1 被错误地覆盖成了 99。线程 B 期间所有成功的修改(从 100 到 1)都被线程 A 的这次 回退操作完全覆盖了,导致了严重的数据不一致。
而 2.1 代码导致 ticket 出现负数的根本原因是 if(ticket > 0)
比如 ticket = 1,可能发生以下时序:
-
线程A 执行
if (ticket > 0),此时ticket = 1,条件为真,进入if块。 -
在线程A执行
usleep或printf期间,发生线程切换。 -
线程B 执行
if (ticket > 0),此时ticket还是1(因为A还没减),条件为真,也进入if块。 -
线程C、D… 同样检查并通过判断,都进入了if块。
重点:
多线程中会有更多的切换,通常切换的时间点为:
时间片结束(因为时钟中断陷入内核)
阻塞式IO(通常是由系统调用触发的,而陷入内核后因为资源未就绪而导致切换)
系统调用(通过软中断 syscall 陷入内核,在执行完毕后从内核态返回用户态时检查线程是否需要切换)
内核态返回用户态的时候,不仅会进行信号的检查,也会进行线程切换的检查。
解决方案:通过互斥锁/互斥量解决。
补充线程上下文理解:
程序计数器 (PC): 存放下一条要执行的指令的地址。
栈指针 (SP): 指向当前线程栈的顶部。
通用寄存器 (RAX, RBX, RCX, RDX…): 存放线程计算过程中的临时数据。
状态寄存器 (EFLAGS / RFLAGS): 存放上一条指令执行后的状态(如是否为0、是否溢出等)。
2.3 互斥锁接口
2.3.1 创建锁与释放锁
1️⃣ 第一种方式(全局锁,无需释放)
#include <pthread.h>pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
2️⃣ 第二种方式(局部锁,需要释放)
#include <pthread.h>int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr);int pthread_mutex_destroy(pthread_mutex_t *mutex);
参数:
-
mutex: 指向要初始化的互斥锁对象的指针。 -
attr: 指向互斥锁属性对象的指针。如果为NULL,则使用默认属性。
2.3.2 加锁与解锁
#include <pthread.h>int pthread_mutex_lock(pthread_mutex_t *mutex); // 阻塞式加锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);
2.4 解决现象
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;int ticket = 100;void *route(void *arg)
{char *id = (char *)arg;while (1){pthread_mutex_lock(&mutex);// 临界区代码 startif (ticket > 0){usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;// 临界区代码 endpthread_mutex_unlock(&mutex);}else{pthread_mutex_unlock(&mutex);break;}}return nullptr;
}
2.5 锁的原理
多个执行流竞争式申请锁,而多个线程首先都需要获取锁,因此,锁本身就是临界资源,申请锁的过程,必须是原子的。
-
申请锁成功:该执行流继续向后运行,访问临界区资源/代码。
-
申请锁失败:阻塞挂起。
锁的伪代码:
lock: // 加锁movb $0, %al // 向线程的私有上下文(al寄存器)写入0xchgb %al, mutex // 与内存中的 mutex 变量交换if(al寄存器的内容 > 0) {return 0;} else挂起等待;goto lock; // 重新申请锁
unlock: //解锁movb $1, mutex // 向内存中的 mutex 变量写入1唤醒等待mutex的线程;return 0;

锁的本质:将执行临界区代码由并行转换为串行,在执行期间,不会被打扰,是一种原子性操作。
-
xchgb %al, mutex是一个原子操作(不可被中断),确保只有一个线程能成功将mutex从1换成0。 -
如果多个线程同时尝试加锁,只有一个能成功,其他线程会看到
al == 0,从而阻塞等待。
锁通过原子操作确保只有一个线程能进入临界区,其他线程必须等待锁释放,从而防止多个线程同时访问共享资源,保护了临界区代码的安全执行。
- 进程 / 线程切换时,CPU 的寄存器硬件只有一套,但寄存器内的数据可以有多份,各自对应当前执行流(进程 / 线程)的上下文。
- 若将内存中变量的内容交换到 CPU 寄存器内部,本质是把该变量内容获取到当前执行流的硬件上下文 (私有上下文) 中。
- 当前 CPU 寄存器的硬件上下文(即各个寄存器的内容)是进程 / 线程私有的。
- 用
swap、exchange将内存变量交换到 CPU 寄存器的操作,本质是当前线程 / 进程获取锁的过程。因为是 “交换” 而非 “拷贝”,变量只有一份,所以谁申请到,谁就持有锁。
2.6 封装锁接口
// Mutex.hpp
#pragma once#include <pthread.h>class Mutex{public:Mutex() {pthread_mutex_init(&_mutex , nullptr);}void lock() {int n = pthread_mutex_lock(&_mutex);(void)n;}void unlock() {int n = pthread_mutex_unlock(&_mutex);(void)n;}~Mutex() {pthread_mutex_destroy(&_mutex);}private:pthread_mutex_t _mutex;
};
// 基于 RAII 的互斥锁
class MutexGuard{public:MutexGuard(Mutex& mutex):_mutex(mutex){_mutex.lock();}~MutexGuard() {_mutex.unlock();}private:Mutex& _mutex;
};
3. 同步
3.1 概念
互斥: 解决的是竞争问题,确保同一时刻只有一个执行流(线程/进程)能够进入临界区,访问共享资源。
同步:协调执行顺序。同步的存在,主要是为了在互斥的基础上,解决执行流之间的协作与顺序问题,从而提升效率和公平性,防止饥饿。核心是:你做完,我才能做(协调逻辑上的的顺序)。
-
在多线程竞争的场景下:因为线程A在释放锁之后,可能继续申请锁,导致其他线程的饥饿问题。
-
一个线程需要等待另一个线程完成某项工作或满足某个条件后才能继续执行
目前的理解(之后有了新的理解继续完善):
✅ 同步解决了:“生产者生产完,消费者才能消费” 这种跨线程的协作顺序
❌ 同步不解决:“生产者A和生产者B之间要公平轮流” 这种同类型线程间的公平性
- 现代操作系统的调度器会检测到线程饥饿,可能会暂时降低线程A的优先级,给其他线程机会。
比如生产者信号量已经为0,其余的生产者按照(A B C)的顺序先来访问信号量的,但是都被阻塞了,而信号量中只维持的是等待队列,没有严格的FIFO,所以通常还是需要竞争。
同步是通过协调线程间的执行顺序,确保在逻辑上存在依赖关系的操作按照正确的先后次序执行,比如队列为空,需要先生产再消费。而多个线程在阻塞时,不一定严格按照FIFO的顺序,这取决与OS的调度。一个是业务逻辑的顺序,一个是线程调度的顺序。
3.2 条件变量
条件变量 是一种线程 同步机制,它允许线程在某个条件不满足时主动阻塞(等待),并在条件可能满足时被其他线程唤醒。
它本身不持有状态,而是与一个互斥锁 结合使用。
3.2.1 创建条件变量与释放条件变量
1️⃣ 第一种方式(全局条件变量,无需释放)
#include <pthread.h>pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
2️⃣ 第二种方式(局部条件变量,需要释放)
#include <pthread.h>int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *cond);
参数:
-
cond:指向要初始化/释放的条件变量的指针。 -
attr:指向条件变量属性对象的指针。如果为NULL,则使用默认属性。
返回值: 成功返回 0;失败返回错误码。
3.2.2 阻塞等待
#include <pthread.h>int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
-
cond:要等待的条件变量。 -
mutex:在调用此函数前,当前线程必须已经锁定的互斥锁。
为什么等待需要锁?
通常是满足某种条件才需要等待,那么等待前就要对资源的数量进行判定,而判断本身就是访问临界资源,所以通常该接口一定是在临界区的内部,也就是说条件不满足要休眠,一定是在临界区休眠的。但是当前线程阻塞等待时,是持有锁的,所以 pthread_cond_wait 会先释放锁之后再进行等待。
3.2.3 唤醒等待的线程
唤醒一个线程
#include <pthread.h>int pthread_cond_signal(pthread_cond_t *cond);
唤醒所有线程
#include <pthread.h>int pthread_cond_broadcast(pthread_cond_t *cond);
唤醒线程之后,因为 pthread_cond_wait 接口在临界区内部,所以线程被唤醒之后在临界区内部,所以唤醒后,函数内部会先重新获取锁,然后才返回。
工作流程:
pthread_cond_wait 调用会立即释放互斥锁
mutex。然后将当前线程挂起,进入阻塞等待状态。
当被
pthread_cond_signal或pthread_cond_broadcast唤醒后,函数会先重新获取互斥锁mutex,然后才返回。如果申请锁失败,就会在锁上阻塞等待。即使等待队列中没有线程,调用唤醒接口(
pthread_cond_signal或pthread_cond_broadcast)也是完全安全的,不会有任何负面影响。
3.3 封装条件变量接口
// Mutex.hpp
#pragma once#include <pthread.h>class Mutex{public:Mutex() {pthread_mutex_init(&_mutex , nullptr);}void lock() {int n = pthread_mutex_lock(&_mutex);(void)n;}void unlock() {int n = pthread_mutex_unlock(&_mutex);(void)n;}pthread_mutex_t* getMutex() { return &_mutex; }~Mutex() {pthread_mutex_destroy(&_mutex);}private:pthread_mutex_t _mutex;
};
// 基于 RAII 的互斥锁
class MutexGuard{public:MutexGuard(Mutex& mutex):_mutex(mutex){_mutex.lock();}~MutexGuard() {_mutex.unlock();}private:Mutex& _mutex;
};// Cond.hpp
#pragma once#include <pthread.h>
#include "mutex.hpp"class Cond{public:Cond() {pthread_cond_init(&_cond , nullptr);}// 基于该锁下的条件变量进行等待void wait(Mutex& mutex) {int n = pthread_cond_wait(&_cond , mutex.getMutex());(void)n;}void signal() {int n = pthread_cond_signal(&_cond);(void)n;}void broadcast() {int n = pthread_cond_broadcast(&_cond);(void)n;}~Cond() {pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;
};
3.4 基于阻塞队列的生产者消费者模型
生产者消费者模型描述的是 两种不同角色线程 通过一个 共享的内存空间 进行协作的过程。
-
生产者:负责生成数据或任务的线程。
-
消费者:负责处理数据或任务的线程。
-
内存空间:一个共享的、大小通常固定的队列,用于在生产者和消费者之间传递数据。
三种关系:
-
生产者与生产者之间:互斥关系。
- 两个生产者同时向同一个位置添加数据,导致数 据覆盖。
-
消费者与消费者之间:互斥关系。
- 两个消费者同时取走同一个数据。
-
生产者与消费者之间:互斥和同步关系。
-
理解这里的同步:生产满了就必须消费者消费,消费空了就必须等待生产者生产。
-
理解这里的互斥:我生产的时候你不能消费,我消费的时候你不能生产。
-
两种角色: 生产者和消费者。
一个交易场所: 以特定结构构成的内存空间。
生产者消费者模型的优点:
-
解耦: 生产者和消费者之间不直接通信,而是通过共享的缓冲区进行交互。生产者只关心如何生成数据,消费者只关心如何处理数据。
-
并发: 生产者和消费者可以独立地、并行地执行。
-
提高效率: 关键就在于要把眼光从共享缓冲区这个临界区移开,看到整个生产者消费者模型的全局工作流。 在共享缓冲区这个临界区内,确实任何时候只能有一个生产者或消费者在操作,看起来是串行的。但效率的提升发生在临界区之外。生产时间和消费时间的并行化,这才是效率的关键,真正耗时的操作在锁外,这些操作可以并行执行,生产者A在生产时,消费者B在处理,互不干扰。

在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作 (线程) 将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列⾥存放元素的操作 (线程) 也会被阻塞,直到有元素被从队列中取出。
代码:
//BlockQueue.hpp
#pragma once#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>const int default_maxsize = 5;template<typename T>
class BlockQueue{public:BlockQueue(size_t maxsize = default_maxsize) :_maxsize(maxsize) {pthread_mutex_init(&_mutex , nullptr);pthread_cond_init(&_full_cond , nullptr);pthread_cond_init(&_empty_cond , nullptr);}// 生产者void production(const T& data) {pthread_mutex_lock(&_mutex);// 不能使用if注意点,会造成伪唤醒// 1. pthread_cond_wait调用失败立即返回,导致该线程继续向下执行 _q.push(data) 会向已经满的队列中继续添加,造成问题// 2. 假设现在队列中只有一个空位,但是消费者通过 pthread_cond_broadcast 唤醒所有等待的进程,多个生产者都依次竞争到锁,并且向下执行 q.push() 会向已经满的队列中继续添加,造成问题while(_q.size() == _maxsize) {// 生产满的时候生产者需要等待// 1.pthread_cond_wait会维护一个阻塞队列,当生产满的时候,该线程会到阻塞队列中排队等待,并且释放锁// 2.线程被唤醒时,会从pthread_cond_wait返回,但是返回的位置就已经在临界区内部了,所以pthread_cond_wait会在底层解决等待后并且申请锁成功才会返回pthread_cond_wait(&_full_cond , &_mutex);}_q.push(data);// 来到这个地方,一定是有数据的,通知消费者进行消费// 这样写即使 _empty_cond 阻塞队列中没有消费者也不影响pthread_cond_signal(&_empty_cond);pthread_mutex_unlock(&_mutex);}// 消费者T consumption() {pthread_mutex_lock(&_mutex);while(_q.empty()) {// 消费完的时候消费者需要等待// std::cout << "消费者进入阻塞队列中等待"pthread_cond_wait(&_empty_cond , &_mutex);}T data = _q.front();_q.pop();pthread_cond_signal(&_full_cond);pthread_mutex_unlock(&_mutex);return data;}~BlockQueue() {pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}private:std::queue<T> _q; unsigned int _maxsize;pthread_mutex_t _mutex;pthread_cond_t _full_cond; // 生产者生产满的条件变量pthread_cond_t _empty_cond; // 消费者消费空的条件变量
};// Main.cc
#include "BlockQueue.hpp"
#include <ctime>void* producer(void* args) {BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);while(true) {int num = rand();bq->production(num);std::cout << "生产者生产了一个数字: " << num << std::endl;sleep(1); // 生产的慢一点,消费的快一点}return nullptr;
}void* customer(void* args) {BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);while(true) {int res = bq->consumption();std::cout << "消费者消费了一个数字: " << res << std::endl;}return nullptr;
}int main() {srand(time(nullptr));BlockQueue<int>* bq = new BlockQueue<int>;// 若需改成多生产多消费 BlockQueue.hpp 代码无需变,只需这里添加线程即可pthread_t t1 , t2;pthread_create(&t1 , nullptr , producer , bq);pthread_create(&t2 , nullptr , customer , bq);pthread_join(t1 , nullptr);pthread_join(t2 , nullptr);return 0;
}
代码注意点:
-
pthread_cond_wait 调用会立即释放互斥锁
mutex。 -
然后将当前线程挂起,进入阻塞等待状态。
-
当被
pthread_cond_signal或pthread_cond_broadcast唤醒后,函数会先重新获取互斥锁mutex,然后才返回。如果申请锁失败,就会在锁上阻塞等待。 -
即使等待队列中没有线程,调用唤醒接口(
pthread_cond_signal或pthread_cond_broadcast)也是完全安全的,不会有任何负面影响。 -
production和consumption函数中不能使用 if 来判断资源是否就绪,而需要使用while,这是因为有伪唤醒的情况发生。伪唤醒指线程在资源还不就绪的情况下醒来,导致向下继续执行,发生错误。
-
在多线程编程中,当线程调用条件等待函数(如
pthread_cond_wait)时,该函数有可能因各种原因而立即返回,而不是按预期进入阻塞等待状态。这种情况本质上也是一种伪唤醒。
情况一: 假设我们有一个固定大小的队列,最大容量为10个元素。当前队列已满:-
初始状态:队列已满(10/10),生产者线程准备添加第11个元素
-
条件检查:生产者线程检查
if (_q.size() >= MAX_SIZE),结果为true -
准备等待:线程调用
pthread_cond_wait准备进入等待状态 -
意外返回:由于系统原因,
pthread_cond_wait调用失败并立即返回,而不是阻塞等待 -
错误执行:线程继续执行
_q.push(data),向已满的队列添加第11个元素 -
后果:队列容量约束被破坏,可能引发内存越界、数据覆盖等严重问题
-
-
当消费者线程使用
pthread_cond_broadcast唤醒所有等待的生产者时,可能引发另一种伪唤醒问题。
情况二: 假设队列当前有9个元素,还剩1个空位,但有多个生产者线程在等待:-
初始状态:队列(10/10),多个生产者线程因队列已满而在等待
-
消费者动作:消费者取出一个元素,队列变为(9/10),然后调用
pthread_cond_broadcast唤醒所有等待的生产者 -
线程唤醒:所有等待的生产者线程都被唤醒,开始竞争互斥锁
-
线程P1获胜:第一个生产者线程获得锁,在临界区向下执行,向队列添加元素(10/10)
-
线程P2获得锁:第二个生产者线程获得锁,此时队列已变为(10/10),但该线程继续执行
_q.push(data) -
问题发生:队列变为(10/10),但线程P2继续添加第11个元素,破坏容量约束
-
-
4. POSIX信号量
4.1 概念
信号量 是一种用于控制多线程/多进程间访问共享资源的同步机制。它本质上是一个计数器,代表临界资源的数量,也是对特定资源的预定机制。信号量中的PV操作是两个原子操作,P表示获取一个资源(获取一个信号量),如果资源不可用则阻塞;V操作释放一个资源(释放一个信号量),并唤醒等待的线程。
多线程使用资源,有两种场景:
-
将目标资源整体使用 互斥锁 + 二元信号量或条件变量。
-
将目标资源分块/分批使用 信号量。
4.2 信号量接口
4.2.1 初始化信号量
#include <semaphore.h>int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
-
sem: 指向要初始化的信号量对象的指针。 -
pshared:-
0: 表示信号量将在同一进程的线程之间共享。这是我们用于线程同步时的标准选择。 -
非
0: 表示信号量可以在进程之间共享(需要放在共享内存中)。
-
-
value: 信号量的初始值。这个值决定了信号量在创建时允许多少个线程同时访问资源。-
value = 1: 一个二进制信号量,用作互斥锁(mutex)。 -
value = N: 一个计数信号量,允许最多 N 个线程同时访问一个资源池。
-
返回值:成功返回 0,失败返回 -1。
4.2.2 销毁信号量
#include <semaphore.h>int sem_destroy(sem_t *sem);
4.2.3 P操作
如果信号量的值大于 0,它会立即将值减 1 并返回。
如果信号量的值等于 0,调用线程会阻塞,直到另一个线程调用 sem_post 增加信号量的值。
int sem_wait(sem_t *sem);
4.2.4 V操作
发布信号量,表⽰资源使⽤完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);
4.3 封装信号量
// Sem.hpp
#pragma once#include <semaphore.h>class Sem{const unsigned int defaultvalue = 1;public:Sem(unsigned int sem_value) {sem_init(&_sem , 0 , sem_value);}// --void P() {int n = sem_wait(&_sem); // 原子的(void)n;}// ++void V() {int n =sem_post(&_sem); // 原子的(void)n;}~Sem() {sem_destroy(&_sem);}private:sem_t _sem;
};
4.4 基于环形队列的生产者消费者模型
-
队列为空时,生产者先运行。
-
队列为满时,消费者先运行。生产者阻塞等待,不能把消费者套一个圈以上。
-
消费者不能超过生产者。
-
只要生产者和消费者不访问同一个位置,就可以同时运行。只有当为空或者为满的时候,生产者和消费者才在同一个位置,也就是说,只要不为空不为满,就能同时运行。
- 而空和满的判断通过信号量隐式管理。
-
多生产者之间和多消费者之间使用锁维持互斥关系。
-
而消费者和生产者之间通过信号量维持同步与互斥关系。
// RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include "Mutex.hpp"
#include "Sem.hpp"template <typename T>
class RingQueue{const static unsigned int default_max_capacity = 5;public:RingQueue(unsigned int max_capacity = default_max_capacity) :_ring_q(max_capacity),_max_capacity(max_capacity),_blank_sem(max_capacity),_p_step(0),_data_sem(0),_c_step(0){}void production(const T& data) {// 多个线程先申请信号量_blank_sem.P();// 保证生产者之间的互斥关系{MutexGuard mg(_p_mutex);_ring_q[_p_step] = data;_p_step++;_p_step %= _max_capacity;}_data_sem.V();}void consumption(T* res) {_data_sem.P();// 临界区{MutexGuard mg(_c_mutex);*res = _ring_q[_c_step];_c_step++;_c_step %= _max_capacity;}_blank_sem.V();}private:std::vector<T> _ring_q;unsigned int _max_capacity;Sem _blank_sem; // 生产者信号量,代表可生产的资源数量unsigned int _p_step; // 生产者在该位置进行生产Sem _data_sem; // 消费者信号量,代表可消费的资源数量unsigned int _c_step; // 消费者在该位置进行消费Mutex _p_mutex; // 保证生产者之间的互斥关系Mutex _c_mutex; // 保证消费者之间的互斥关系
};// Main.cc
#include "RingQueue.hpp"#include <string>
#include <memory>
#include <ctime>
#include <unistd.h>const int producer_num = 2;
const int consumer_num = 3;struct ThreadDate{ThreadDate(const std::string& name , RingQueue<int>* ringq) :thread_name(name),rq(ringq){}std::string thread_name;RingQueue<int>* rq;
};void* producer(void* args) {ThreadDate* tdata = static_cast<ThreadDate*>(args);while(true) {int num = rand() % 10;std::cout << tdata->thread_name << " 生产了一个数据: " << num << std::endl;tdata->rq->production(num);}delete tdata;return nullptr;
}void* consumer(void* args) {ThreadDate* tdata = static_cast<ThreadDate*>(args);while(true) {sleep(1);int res = 0;tdata->rq->consumption(&res);std::cout << tdata->thread_name << " 消费了一个数据: " << res << std::endl;}delete tdata;return nullptr;
}int main() {srand(time(nullptr));RingQueue<int>* ringqueue = new RingQueue<int>;pthread_t ptids[producer_num] , ctids[consumer_num];for(int i = 0; i < producer_num; i++) {ThreadDate* td = new ThreadDate("producer-thread-" + std::to_string(i + 1) , ringqueue);pthread_create(ptids + i , nullptr , producer , td);}for(int i = 0; i < consumer_num; i++) {ThreadDate* td = new ThreadDate("consumer-thread-" + std::to_string(i + 1) , ringqueue);pthread_create(ctids + i , nullptr , consumer , td);}for(int i = 0; i < producer_num; i++) {pthread_join(ptids[i] , nullptr);}for(int i = 0; i < consumer_num; i++) {pthread_join(ctids[i] , nullptr); }delete ringqueue;return 0;
(ctids[i] , nullptr); }delete ringqueue;return 0;
}
为什么先竞争信号量再竞争锁呢?
这种顺序设计是为了减少临界区持有时间,提高并发度,避免不必要的线程阻塞。
-
多个生产者或消费者可以同时先竞争信号量。
-
只有竞争到信号量的线程才会进入锁竞争。
-
减少了无意义的锁竞争。
此模式的精髓在于将资源从一个整体资源解构为多个独立的块资源,目的是为了让多个线程同时进行访问不同的块资源。若先竞争锁,无异于当成整体资源看待了,线程面对的依然是一个不可分割的资源整体,而非立即可用的资源块,违背了开头的思想。
