【Linux】线程同步与互斥 (生产者消费者模型)
🌈 个人主页:努力可抵万难
🔥 系列专栏:Linux
引言
在多线程编程中,多个线程并发访问共享资源(如全局变量)时,若缺乏同步机制,可能导致数据竞争和不一致性。本文通过一个经典的“抢票”案例,剖析线程安全问题,并演示如何通过互斥量(mutex)实现资源的安全访问。
一、Linux线程互斥
相关概念
- 临界资源:多线程执行流共享的资源就叫做临界资源
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,让多个线程串行访问共享资源,通常对临界资源起保护作用
- 原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成(如只用一条汇编就能完成)
互斥量mutex
- 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。多个线程并发的操作共享变量,会带来一些问题。
- 多个线程交叉执行本质:就是让调度器尽可能的频繁发生线程调度与切换
- 线程一般在什么时候发生切换呢?时间片到了,来了更高优先级的线程,线程等待的时候。
- 线程是在什么时候检测上面的问题呢?从内核态返回用户态的时候,线程要对调度状态进行检测,如果可以,就直接发生线程切换。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>// 共享资源, 火车票
int tickets = 10000;void *route(void *arg)
{char *id = (char*)arg;while ( 1 ) {if ( ticket > 0 ) {usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;} else break;}
} int main( void )
{pthread_t t1, t2, t3, t4;pthread_create(&t1, NULL, route, "thread 1");pthread_create(&t2, NULL, route, "thread 2");pthread_create(&t3, NULL, route, "thread 3");pthread_create(&t4, NULL, route, "thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);
}
对一个全局变量进行多线程更改是安全的吗?
取出ticket--部分的汇编代码
objdump -d a.out > test.objdump
152 40064b: 8b 05 e3 04 20 00 mov 0x2004e3(%rip),%eax # 600b34 <ticket>
153 400651: 83 e8 01 sub $0x1,%eax
154 400654: 89 05 da 04 20 00 mov %eax,0x2004da(%rip) # 600b34 <ticket>
对变量进行++,--,在C、C++上,看起来只有一条语句,但是汇编之后至少是三条语句
- load :将共享变量ticket从内存加载到CPU寄存器中
- update : 更新寄存器里面的值,进行对应的算数逻辑运算
- store :将新值,从寄存器写回共享变量ticket的内存地址
我们定义的全局变量在没有保护的时候,往往是不安全的,像上面多个线程在交替执行的时候会造成数据安全问题,导致数据不一致。 要解决以上问题,需要做到三点:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量(mutex)。
如何看待锁?
- 锁,本身就是一个共享资源!全局的变量是要被保护的,锁是用来保护全局的资源的,所以锁本身也是全局资源,那锁的安全谁来保护呢?
- pthread_mutex_lock、pthread_mutex_unlock:加锁的过程必须是安全的!加锁和解锁的过程其实一定要是原子的!
- 如果申请成功,就继续向后执行,如果申请暂时没有成功,执行流会阻塞!
- 谁持有锁,谁进入临界区!
- 加锁和解锁的过程使得多个线程串行执行,导致程序变慢
- 锁只规定互斥访问,没有规定必须让谁优先执行。
- 锁是让多个执行流进行竞争的结果
互斥量(mutex)的接口
初始化互斥量
初始化互斥量有两种⽅法:
⽅法1,静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
⽅法2,动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, constpthread_mutexattr_t *restrict attr);参数:mutex:要初始化的互斥量attr:NULL
销毁互斥量
销毁互斥量需要注意:
- 使⽤ PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
- 不要销毁⼀个已经加锁的互斥量
- 已经销毁的互斥量,要确保后⾯不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
加锁之后的售票系统:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>int ticket = 100;
pthread_mutex_t mutex;void *route(void *arg)
{char *id = (char *)arg;while (1){pthread_mutex_lock(&mutex);if (ticket > 0){usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;pthread_mutex_unlock(&mutex);// sched_yield(); 放弃CPU}else{pthread_mutex_unlock(&mutex);break;}}
}int main()
{pthread_t t1, t2, t3, t4;pthread_mutex_init(&mutex, NULL);pthread_create(&t1, NULL, route, (void*)"thread 1");pthread_create(&t2, NULL, route, (void*)"thread 2");pthread_create(&t3, NULL, route, (void*)"thread 3");pthread_create(&t4, NULL, route, (void*)"thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);pthread_mutex_destroy(&mutex);return 0;
}
互斥量实现原理探究
- 经过上面的例子,大家已经意识到单纯的 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题
- 为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 现在我们把lock和unlock的伪代码改一下
如果我们想简单的使用锁,该如何进行封装设计?
RAII⻛格的互斥锁, C++11也有,⽐如:
std::mutex mtx;
std::lock_guard<std::mutex> guard(mtx);此处我们仅做封装,⽅便后续使⽤客
Mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:Mutex(pthread_mutex_t *lock_p = nullptr): lock_p_(lock_p){}void lock(){if(lock_p_) pthread_mutex_lock(lock_p_);}void unlock(){if(lock_p_) pthread_mutex_unlock(lock_p_);}~Mutex(){}
private:pthread_mutex_t *lock_p_;
};class LockGuard
{
public:LockGuard(pthread_mutex_t *mutex): mutex_(mutex){mutex_.lock(); //在构造函数中进行加锁}~LockGuard(){mutex_.unlock(); //在析构函数中进行解锁}
private:Mutex mutex_;
};
二、线程同步
条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件
条件变量函数
初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict
attr);
参数:
cond:要初始化的条件变量
attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释
唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
简单案例:
- 我们先使⽤ PTHREAD_COND / MUTEX_INITIALIZER 进⾏测试,对其他细节暂不追究
- 然后将接⼝更改成为使⽤ pthread_cond_init / pthread_cond_destroy 的⽅式,⽅便后续进⾏封装
#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *active(void *args)
{std::string name = static_cast<const char*>(args);while(true){pthread_mutex_lock(&mutex);// 没有对资源是否就绪的判定pthread_cond_wait(&cond, &mutex);printf("%s is active!\n", name.c_str());pthread_mutex_unlock(&mutex);}return nullptr;
}int main()
{pthread_t tid1, tid2, tid3;pthread_create(&tid1, nullptr, active, (void*)"thread-1");pthread_create(&tid1, nullptr, active, (void*)"thread-2");pthread_create(&tid1, nullptr, active, (void*)"thread-3");sleep(1);printf("Main thread ctrl begin...\n");while(true){printf("main wakeup thread...\n");pthread_cond_signal(&cond);sleep(1);}pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);return 0;
}
运行结果:
Main thread ctrl begin...
main wakeup thread...
thread-1 is active!
main wakeup thread...
thread-2 is active!
main wakeup thread...
thread-3 is active!
三、⽣产者消费者模型
- 321原则(便于记忆) 三种关系 两个角色 一个消费场所(某种数据结构组织的连续的内存空间)
为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
基于BlockingQueue的生产者消费者模型
BlockingQueue
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
C++ queue模拟阻塞队列的生产消费模型
BlockQueue.hpp
#pragma#include <iostream>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"namespace BlockQueueModule
{using namespace LockModule;using namespace CondModule;// version2static const int gcap = 10;template<typename T>class BlockQueue{public:BlockQueue(int cap = gcap):_cap(cap),_cwait_num(0),_pwait_num(0){}bool IsFull() { return _q.size() == _cap; }bool IsEmpty() { return _q.empty(); }void Equeue(const T &in) // 生产者{LockGuard lockguard(_mutex);// 生产数据有条件// 结论1:在临界区中等待是必然的(当前)while(IsFull()) // 为了防止伪唤醒 使用while判断{std::cout << "生产者进入等待..." << std::endl;// 2. 等待 释放锁_pwait_num++;_productor_cond.Wait(_mutex); // wait的时候,必定是持有锁的_pwait_num--;// 3. 返回,线程被唤醒 重新申请并持有锁std::cout << "生产者被唤醒..." << std::endl;}// 4. isfull不满足 || 线程被唤醒 _q.push(in); // 生产// 肯定有数据if(_cwait_num){std::cout << "叫醒消费者" << std::endl;_consumer_cond.Notify();}}void Pop(T* out) // 消费者{LockGuard lockguard(_mutex);while(IsEmpty()){std::cout << "消费者进入等待..." << std::endl;_cwait_num++;_consumer_cond.Wait(_mutex); // 伪唤醒_cwait_num--;std::cout << "消费者被唤醒..." << std::endl;}// 4. 线程被唤醒*out = _q.front();_q.pop();// 一定不为满if(_pwait_num){std::cout << "叫醒生产者" << std::endl;_productor_cond.Notify();}}~BlockQueue(){}private:std::queue<T> _q; // 临界资源Mutex _mutex; // 互斥Cond _productor_cond; // 生产者条件变量Cond _consumer_cond; // 消费者条件变量int _cap; // bq最大容量int _cwait_num;int _pwait_num;};
}
main.cc
#include <functional>
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>using namespace BlockQueueModule;
using namespace TaskModule;using task_t = std::function<void()>;void *Comsumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);int data = 10;while(true){// 1. 从bq中拿到数据bq->Pop(&data);// 2. 做处理printf("Comsumer 消费了一个数据:%d\n", data);data++;}
}void *Productor(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);// 1. 从外部获取数据int data = 10;while(true){sleep(2);// 2. 生产到队列中printf("Productor 生产了一个数据:%d\n", data);bq->Equeue(data);data++;}
}int main()
{BlockQueue<int> *bq = new BlockQueue<int>(5); // 共享资源 -> 临界资源// 单生产 单消费pthread_t c1, c2, p1, p2, p3;pthread_create(&c1, nullptr, Comsumer, (void*)bq);pthread_create(&p3, nullptr, Productor, (void*)bq);pthread_join(c1, nullptr);pthread_join(p3, nullptr);delete bq;return 0;
}
四、POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);参数:pshared:0表⽰线程间共享,⾮零表⽰进程间共享value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
基于环形队列的生产消费模型
- 环形队列采用数组模拟,用模运算来模拟环状特性
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
- 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。做⼀下封装Sem.hpp
#pragma#include <iostream> #include <semaphore.h>namespace SemModule {const int defaultsemval = 1;class Sem{public:Sem(int value = defaultsemval):_init_value(value){int n = ::sem_init(&_sem, 0, value);(void)n;}void P(){int n = ::sem_wait(&_sem);(void)n;}void V(){int n = ::sem_post(&_sem);(void)n;}~Sem(){int n = ::sem_destroy(&_sem);(void)n;}private:sem_t _sem;int _init_value;}; }
RingBuffer.hpp
#pragma#include <iostream> #include <vector> #include <semaphore.h> #include <pthread.h> #include "Sem.hpp" #include "Mutex.hpp"namespace RingBufferModule {using namespace SemModule;using namespace LockModule;template<typename T>class RingBuffer{public:RingBuffer(int cap):_ring(cap),_cap(cap),_p_step(0),_c_step(0),_datasem(0),_spacesem(cap){}void Equeue(const T& in){// 生产者// pthread_mutex_lock(&_p_lock);_spacesem.P();LockGuard lockguard(_p_lock); // 放这里更好 申请信号量和申请锁是并行执行了 _ring[_p_step] = in; // 生产完毕_p_step++;_p_step %= _cap;_datasem.V();}void Pop(T* out){// 消费者// pthread_mutex_lock(&_c_lock);_datasem.P();LockGuard lockguard(_c_lock);*out = _ring[_c_step];_c_step++;_c_step %= _cap;_spacesem.V();}~RingBuffer() {}private:std::vector<T> _ring; // 环,临界资源int _cap; // 总容量int _p_step; // 生产者位置int _c_step; // 消费位置Sem _datasem; // 数据信号量Sem _spacesem; // 空间信号量Mutex _p_lock;Mutex _c_lock;}; }
main.cc
#include <functional> #include "RingBuffer.hpp" #include <unistd.h> #include <pthread.h>using namespace RingBufferModule;void *Comsumer(void *args) {RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);while(true){sleep(1);// 消费数据int data;ring_buffer->Pop(&data);// 处理数据:花时间std::cout << "消费了一个数据: " << data << std::endl;}}void *Productor(void *args) {RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);int data = 0;while(true){// 获取数据: 花时间// 生产数据ring_buffer->Equeue(data);data++;std::cout << "生产了一个数据: " << data << std::endl;}}int main() {RingBuffer<int> *ring_buffer = new RingBuffer<int>(5);pthread_t c1, c2, c3, p1, p2;pthread_create(&c1, nullptr, Comsumer, ring_buffer);pthread_create(&c2, nullptr, Comsumer, ring_buffer);pthread_create(&c3, nullptr, Comsumer, ring_buffer);pthread_create(&p1, nullptr, Productor, ring_buffer);pthread_create(&p2, nullptr, Productor, ring_buffer);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(c3, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);delete ring_buffer;return 0; }
结语
多线程编程的核心在于平衡效率与安全。互斥量解决了数据竞争的燃眉之急,而条件变量和信号量则为复杂的线程协作提供了优雅的解决方案。生产者消费者模型不仅是经典的设计模式,更是理解同步与互斥的试金石。实践中需注意:
- 锁的粒度:过细增加复杂度,过粗降低性能。
- 死锁预防:避免嵌套锁与顺序不一致。
- 同步机制的选择:条件变量适用于事件驱动,信号量适合资源计数。
- 性能优化:环形队列通过空间换时间,减少线程冲突。
掌握这些技术,方能驾驭多线程的“双刃剑”,构建高效稳健的并发系统。理论结合实践,持续探索方能游刃有余。