当前位置: 首页 > news >正文

互斥锁与条件变量

1. 线程互斥基础概念

1-1 进程线程间的互斥相关背景概念

想象一下银行的ATM机大厅——这是理解线程互斥的绝佳场景。在这个比喻中:

  • 临界资源就像ATM机本身,它是所有顾客(线程)共享的资源。在我们的代码示例中,ticket变量就是这样一个临界资源,它被多个线程共享并操作。

  • 临界区则像是使用ATM机的过程,即访问临界资源的代码段。在售票系统代码中,从if(ticket > 0)判断开始到ticket--结束的代码块就是临界区。

  • 互斥机制就像ATM机前的排队系统,确保同一时间只有一个人在操作ATM机。互斥保证了在任何时刻,只有一个执行流能够进入临界区访问临界资源。

  • 原子性可以理解为ATM机的一次完整交易过程——要么全部完成(钱和卡都拿到手),要么完全不发生(交易取消)。在计算机中,原子操作是指不会被任何调度机制打断的操作,它只有两种状态:要么完成,要么未完成。

1-2 互斥量(mutex)的必要性

在大多数情况下,线程使用的数据都是局部变量,这些变量的地址空间位于线程的栈空间内,属于单个线程私有。但是,当我们需要在线程间共享数据时(如售票系统中的票数),问题就出现了。

问题演示:有缺陷的售票系统
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>int ticket = 100;void* route(void* arg) {char* id = (char*)arg;while (1) {if (ticket > 0) {usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;} else {break;}}
}int main(void) {pthread_t t1, t2, t3, t4;pthread_create(&t1, NULL, route, "thread 1");pthread_create(&t2, NULL, route, "thread 2");pthread_create(&t3, NULL, route, "thread 3");pthread_create(&t4, NULL, route, "thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);
}

运行这个程序,你可能会看到如下输出:


thread 4 sells ticket:100
thread 4 sells ticket:1
thread 2 sells ticket:0
thread 1 sells ticket:-1
thread 3 sells ticket:-2
为什么会出现负数的票?

这个问题的根源在于三个关键点:

  1. 非原子操作ticket--看似简单,实际上对应三条汇编指令:

    mov 0x2004e3(%rip),%eax  # 将ticket从内存加载到寄存器
    sub $0x1,%eax            # 寄存器值减1
    mov %eax,0x2004da(%rip)  # 将新值写回内存
  2. 线程切换时机:在if判断和实际减操作之间,线程可能被切换。

  3. 竞态条件:多个线程可能同时读取相同的ticket值,然后各自减一后写回。

解决方案的三要素

要解决这个问题,我们需要确保:

  1. 互斥行为:一次只允许一个线程进入临界区

  2. 公平竞争:当多个线程竞争时,只有一个能进入

  3. 非阻塞性:不在临界区的线程不应阻碍其他线程

Linux提供的解决方案就是互斥量(mutex),它就像ATM机前的电子锁,确保一次只服务一个人。

1-3 互斥量的使用

初始化互斥量

有两种初始化方法:

  1. 静态初始化

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  1. 动态初始化

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
改进后的售票系统
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>int ticket = 100;
pthread_mutex_t mutex;void* route(void* arg) {char* id = (char*)arg;while (1) {pthread_mutex_lock(&mutex);  // 加锁if (ticket > 0) {usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;pthread_mutex_unlock(&mutex);  // 解锁} else {pthread_mutex_unlock(&mutex);  // 解锁break;}}
}int main(void) {pthread_t t1, t2, t3, t4;pthread_mutex_init(&mutex, NULL);  // 初始化互斥量pthread_create(&t1, NULL, route, "thread 1");pthread_create(&t2, NULL, route, "thread 2");pthread_create(&t3, NULL, route, "thread 3");pthread_create(&t4, NULL, route, "thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);pthread_mutex_destroy(&mutex);  // 销毁互斥量
}

1-4 互斥量实现原理探究

现代处理器提供了特殊的原子指令来实现互斥锁。最常用的是swapexchange指令,它能原子地交换寄存器和内存单元的数据。

互斥锁的伪代码实现:

lock:movb $0, %al      # 将0存入al寄存器xchgb %al, mutex  # 交换al和mutex的值if(al寄存器的内容 > 0) {挂起等待;goto lock;}else return 0;unlock:movb $1, mutex    # 将mutex置1唤醒等待Mutex的线程;return 0;

这个实现的关键在于:

  1. xchgb指令保证了原子性的交换操作

  2. 当锁被占用时(mutex=0),新线程会自旋等待

  3. 解锁时将mutex置1并唤醒等待线程

这种实现方式即使在多处理器环境下也能工作,因为总线仲裁机制会确保内存访问的有序性。

1-5 互斥量的封装

Lock.hpp

#pragma once#include <iostream>
#include <string>
#include <pthread.h>namespace LockModule
{// 对锁进行封装,可以独立使用class Mutex{public:// 删除不要的拷贝和赋值Mutex(const Mutex&) = delete;const Mutex& operator=(const Mutex&) = delete;Mutex(){int n = pthread_mutex_init(&_mutex, nullptr);(void)n;}void Lock(){int n = pthread_mutex_lock(&_mutex);(void)n;}void Unlock(){int n = pthread_mutex_unlock(&_mutex);(void)n;}pthread_mutex_t* GetMutexOriginal() // 获取原始指针{return &_mutex;}~Mutex(){int n = pthread_mutex_destroy(&_mutex);(void)n;}private:pthread_mutex_t _mutex;};// 采用RAII风格,进行锁管理class LockGuard{public:LockGuard(Mutex& mutex) : _mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex& _mutex;};
}

main.cc

// 抢票的代码就可以更新成为
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>#include "Lock.hpp"
using namespace LockModule;int ticket = 1000;
Mutex mutex;// RAII风格的互斥锁,C++11也有,比如:
// std::mutex mtx;
// std::lock_guard<std::mutex> guard(mtx);void* route(void* arg)
{char* id = (char*)arg;while (1){LockGuard lockguard(mutex); // 使用RAII风格的锁if (ticket > 0){usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;}elsebreak;}return nullptr;
}int main(void)
{pthread_t t1, t2, t3, t4;pthread_create(&t1, NULL, route, (void*)"thread 1");pthread_create(&t2, NULL, route, (void*)"thread 2");pthread_create(&t3, NULL, route, (void*)"thread 3");pthread_create(&t4, NULL, route, (void*)"thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);
}

2. 线程同步与条件变量

2-1 条件变量的引入

想象一下这样的场景:你走进一家咖啡店想买杯咖啡,但发现咖啡机正在维护中。你有两个选择:

  1. 不断询问店员"咖啡机好了吗?"(忙等待)

  2. 找个座位等待,等咖啡机修好后店员会通知你(条件变量方式)

 

条件变量(Condition Variable)正是为了解决这种"等待特定条件满足"的场景而设计的。它允许线程在某个条件不满足时主动休眠,当条件可能满足时再被唤醒,避免了忙等待带来的资源浪费。

2-2 同步概念与竞态条件

同步的本质是协调多个线程的执行顺序,在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源。就像交通信号灯协调车辆通行顺序一样。

竞态条件则像是十字路口没有信号灯的情况——车辆到达的顺序决定了谁先通过,这种因时序问题导致的程序异常就是竞态条件。例如:

// 线程A
if (queue.empty()) {// 这里可能被切换queue.push(item); 
}// 线程B
if (queue.empty()) {queue.push(item);
}

两个线程可能都判断队列为空,然后都尝试添加元素,导致数据不一致。

2-3 条件变量函数详解

基本接口

初始化

int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);

销毁

int pthread_cond_destroy(pthread_cond_t *cond);

等待条件

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

唤醒等待

int pthread_cond_signal(pthread_cond_t *cond);  // 唤醒至少一个线程
int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒所有等待线程
条件变量使用示例
#include <iostream>
#include <pthread.h>
#include <unistd.h>pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;void* active(void* arg) {const char* name = static_cast<const char*>(arg);while (true) {pthread_mutex_lock(&mutex);pthread_cond_wait(&cond, &mutex); // 自动释放锁并等待std::cout << name << " 活动..." << std::endl;pthread_mutex_unlock(&mutex);}
}int main() {pthread_t t1, t2;pthread_create(&t1, NULL, active, (void*)"thread-1");pthread_create(&t2, NULL, active, (void*)"thread-2");sleep(1); // 确保线程已启动for (int i = 0; i < 3; ++i) {// pthread_cond_signal(&cond); // 每次唤醒一个线程pthread_cond_broadcast(&cond); // 每次唤醒所有线程sleep(1);}pthread_join(t1, NULL);pthread_join(t2, NULL);return 0;
}
为什么pthread_cond_wait需要互斥锁?

这是一个精妙的设计,确保了"检查条件"和"进入等待"的原子性。具体流程:

  1. 调用线程首先获取互斥锁

  2. 检查条件,如果条件不满足:

    • 原子性地释放锁并进入等待状态

    • 被唤醒时,重新获取锁后再返回

这防止了以下竞态条件:

  • 线程A检查条件→不满足

  • 在A准备等待前,线程B修改了条件并发出信号

  • 然后线程A才开始等待,可能永远等不到信号

2-4 生产者消费者模型

2-4-1 为何需要生产者消费者模型

想象一个包子铺的场景:

  • 生产者(厨师):制作包子

  • 消费者(顾客):购买包子

  • 阻塞队列(蒸笼):存放包子

如果没有蒸笼:

  • 厨师必须等顾客来了才能开始做包子(强耦合)

  • 顾客必须等厨师做好包子才能购买(效率低下)

有了蒸笼:

  • 厨师可以提前做好包子放入蒸笼

  • 顾客可以直接从蒸笼取包子

  • 双方不需要直接交互(解耦)

2-4-2 生产者消费者模型的优点
  1. 解耦:生产者和消费者不直接依赖对方

  2. 支持并发:生产者填充队列时,消费者可以同时消费

  3. 平衡负载:当生产者快于消费者时,队列缓冲数据;反之消费者可以等待新数据

2-5 基于BlockingQueue的生产者消费者实现

2-5-1 BlockingQueue概念

阻塞队列是一种特殊的队列:

  • 当队列为空时,消费者尝试获取元素会被阻塞,直到有元素可用

  • 当队列满时,生产者尝试添加元素会被阻塞,直到有空间可用

2-5-2 C++实现阻塞队列

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>template <typename T>
class BlockQueue
{
private:bool IsFull(){return _block_queue.size() == _cap;}bool IsEmpty(){return _block_queue.empty();}public:BlockQueue(int cap) : _cap(cap){_productor_wait_num = 0;_consumer_wait_num = 0;pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_product_cond, nullptr);pthread_cond_init(&_consum_cond, nullptr);}void Enqueue(T &in) // 生产者用的接口{pthread_mutex_lock(&_mutex);while(IsFull()) // 保证代码的健壮性{// 生产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!// 1. pthread_cond_wait调用是: a. 让调用线程等待 b. 自动释放曾经持有的// _mutex锁 c. 当条件满足,线程唤醒,pthread_cond_wait要求线性// 必须重新竞争_mutex锁,竞争成功,方可返回!!!// 之前:安全_productor_wait_num++;pthread_cond_wait(&_product_cond, &_mutex); // 只要等待,必定会有唤// 醒,唤醒的时候,就要继续从这个位置向下运行!!_productor_wait_num--;// 之后:安全}// 进行生产// _block_queue.push(std::move(in));// std::cout << in << std::endl;_block_queue.push(in);// 通知消费者来消费if(_consumer_wait_num > 0)pthread_cond_signal(&_consum_cond); // pthread_cond_broadcastpthread_mutex_unlock(&_mutex);}void Pop(T *out) // 消费者用的接口 --- 5个消费者{pthread_mutex_lock(&_mutex);while(IsEmpty()) // 保证代码的健壮性{// 消费线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!// 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的// _mutex锁_consumer_wait_num++;pthread_cond_wait(&_consum_cond, &_mutex); // 伪唤醒_consumer_wait_num--;}// 进行消费*out = _block_queue.front();_block_queue.pop();// 通知生产者来生产if(_productor_wait_num > 0)pthread_cond_signal(&_product_cond);pthread_mutex_unlock(&_mutex);// pthread_cond_signal(&_product_cond);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_product_cond);pthread_cond_destroy(&_consum_cond);}private:std::queue<T> _block_queue; // 阻塞队列,是被整体使用的!!!int _cap; // 总上限pthread_mutex_t _mutex; // 保护_block_queue的锁pthread_cond_t _product_cond; // 专门给生产者提供的条件变量pthread_cond_t _consum_cond; // 专门给消费者提供的条件变量int _productor_wait_num;int _consumer_wait_num;
};
#endif
关键点说明
  1. 为什么用while而不是if检查条件

    • 防止虚假唤醒(spurious wakeup)

    • 即使被唤醒,条件可能仍然不满足

  2. 通知机制

    • 生产者添加元素后通知消费者

    • 消费者取出元素后通知生产者

  3. 内存屏障

    • 互斥锁的加锁解锁操作隐含内存屏障,确保变量的可见性

这个实现展示了如何将互斥锁和条件变量结合起来解决实际的线程同步问题。生产者消费者模型是并发编程中最基础也最重要的模式之一,理解它对掌握多线程编程至关重要。

2-6 为什么pthread_cond_wait需要互斥量?

原子性保护:解锁与等待的不可分割性

想象你和朋友约在咖啡店见面,但你不确定他是否已经到了。正确的做法应该是:

  1. 先查看座位(检查条件)

  2. 如果朋友不在,你决定等待

  3. 但你必须确保在"决定等待"和"实际开始等待"之间,朋友不会到达并错过通知

pthread_cond_wait的互斥量正是为了解决这个问题。它将"释放锁"和"进入等待"这两个操作合并为一个原子操作:

// 错误做法(可能导致信号丢失)
pthread_mutex_unlock(&mutex);
// 这里可能被切换,其他线程可能已经发送信号
pthread_cond_wait(&cond); // 正确做法(原子操作)
pthread_cond_wait(&cond, &mutex); // 自动释放锁并等待

竞态条件分析

考虑以下时间序列:

  1. 线程A检查条件 → 不满足

  2. 线程A释放锁

  3. 线程B获取锁,修改条件,发送信号(此时没有线程在等待,信号丢失)

  4. 线程A开始等待,可能永远等不到信号

pthread_cond_wait的原子性保证了在释放锁和进入等待之间不会有其他线程能够修改条件变量。

2-7 条件变量使用规范

等待条件的标准模式

pthread_mutex_lock(&mutex);
while (condition_is_false) {  // 必须用while而不是ifpthread_cond_wait(&cond, &mutex);
}
// 操作共享资源
pthread_mutex_unlock(&mutex);

为什么使用while循环?

  • 防止虚假唤醒(spurious wakeup)

  • 确保被唤醒后条件确实满足

  • POSIX标准明确允许条件变量在没有显式信号时也可能返回

通知条件的标准模式

pthread_mutex_lock(&mutex);
// 修改共享变量使条件为真
condition = true;
pthread_cond_signal(&cond); // 或pthread_cond_broadcast
pthread_mutex_unlock(&mutex);

2-8 条件变量的封装

Cond类的设计要点

class Cond {
public:Cond() { pthread_cond_init(&_cond, nullptr); }void Wait(Mutex &mutex) {pthread_cond_wait(&_cond, mutex.GetMutexOriginal());}void Notify() { pthread_cond_signal(&_cond); }void NotifyAll() { pthread_cond_broadcast(&_cond); }~Cond() { pthread_cond_destroy(&_cond); }private:pthread_cond_t _cond;
};

设计决策:

  1. 不与特定互斥锁耦合,保持灵活性

  2. 提供基本的等待/通知接口

  3. 错误处理可根据实际需求添加

2-9 POSIX信号量

信号量基础

信号量是一种更通用的同步原语,可以看作是一个计数器,支持两个原子操作:

  • P()/wait:计数器减1,如果计数器为0则阻塞

  • V()/post:计数器加1,唤醒等待的线程

#include <semaphore.h>// 初始化
int sem_init(sem_t *sem, int pshared, unsigned int value);// 销毁
int sem_destroy(sem_t *sem);// P操作(等待)
int sem_wait(sem_t *sem); // V操作(发布)
int sem_post(sem_t *sem);

信号量封装

class Sem {
public:Sem(int n) { sem_init(&_sem, 0, n); }void P() { sem_wait(&_sem); }void V() { sem_post(&_sem); }~Sem() { sem_destroy(&_sem); }private:sem_t _sem;
};

2-10 基于环形队列的生产消费模型

环形队列设计

#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <pthread.h>// 单生产、单消费/多生产、多消费
// "321":三种关系
// a: 生产和消费互斥和同步
// b: 生产者之间互斥
// c: 消费者之间互斥
// 解决方案:加锁,需2把锁template<typename T>
class RingQueue
{
private:void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void Unlock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}public:RingQueue(int cap): _ring_queue(cap),_cap(cap),_room_sem(cap),_data_sem(0),_productor_step(0),_consumer_step(0){pthread_mutex_init(&_productor_mutex, nullptr);pthread_mutex_init(&_consumer_mutex, nullptr);}void Enqueue(const T &in){// 生产行为_room_sem.P();Lock(_productor_mutex);// 一定有空间_ring_queue[_productor_step++] = in; // 生产_productor_step %= _cap;Unlock(_productor_mutex);_data_sem.V();}void Pop(T *out){// 消费行为_data_sem.P();Lock(_consumer_mutex);*out = _ring_queue[_consumer_step++];_consumer_step %= _cap;Unlock(_consumer_mutex);_room_sem.V();}~RingQueue(){pthread_mutex_destroy(&_productor_mutex);pthread_mutex_destroy(&_consumer_mutex);}private:// 环形队列std::vector<T> _ring_queue;int _cap; // 容量上限// 生产和消费的下标int _productor_step;int _consumer_step;// 信号量Sem _room_sem; // 生产者关心(空闲空间数)Sem _data_sem; // 消费者关心(数据个数)// 锁(维护多生产多消费互斥)pthread_mutex_t _productor_mutex;pthread_mutex_t _consumer_mutex;
};

设计要点解析

  1. 双信号量控制

    • _room_sem:跟踪可用空间(初始为容量)

    • _data_sem:跟踪数据项(初始为0)

  2. 双互斥锁设计

    • 生产者互斥锁:保护生产者之间的竞争

    • 消费者互斥锁:保护消费者之间的竞争

  3. 环形索引管理

    • 使用模运算实现环形缓冲区

    • 生产者和消费者索引独立移动

  4. 同步流程

    • 生产者:等待空间→获取锁→生产→释放锁→通知数据可用

    • 消费者:等待数据→获取锁→消费→释放锁→通知空间可用

这种设计高效地解决了多生产者多消费者问题,同时避免了不必要的锁竞争。

http://www.dtcms.com/a/318552.html

相关文章:

  • 每日五个pyecharts可视化图表-bars(5)
  • Java语言基础深度面试题
  • List、ArrayList 与顺序表
  • 智能学号抽取系统 V5.7.4 更新报告:修复关键同步漏洞,体验更臻完美
  • Spring Boot 项目代码笔记
  • 三、Istio流量治理(二)
  • 文件权限合规扫描针对香港服务器安全基线的实施流程
  • 《零基础入门AI:深度学习入门(从PyTorch安装到自动微分)》
  • Anthropic于本周一推出了其旗舰模型的升级版Claude Opus 4.1
  • 《第十三篇》深入解析 `kb_api.py`:知识库的创建、删除与查询接口
  • 基于Vue 3 的智能支付二维码弹窗组件设计与实现
  • Effective C++ 条款26: 尽可能延后变量定义式的出现时间
  • 007 前端( JavaScript HTML DOM+Echarts)
  • 【保留小数精度不舍入】2022-10-8
  • MaxKB 使用 MCP 连接 Oracle (免安装 cx_Oracle 和 Oracle Instant Client)
  • 智慧水务管理系统
  • C++、STL面试题总结(二)
  • 三、Envoy的管理接口
  • 数据科学与计算pandas
  • 沉寂半年,Kimi归来!
  • 地铁和城市宏基因组项目metaSUB
  • 脂质体转染、物理转染(电穿孔)与病毒转染:原理及操作步骤详解
  • nlp-词汇分析
  • 【Dify学习笔记】:Dify搭建表单信息提交系统
  • windows系统创建ubuntu系统
  • C++线程中 detach() 和 join() 的区别
  • hf的国内平替hf-mirror
  • AT32的freertos下modbus TCP移植
  • cdn是什么
  • 快手小店客服自动化回复