【Linux】线程同步与互斥(上)
目录
一、线程互斥
1.1 进程线程间的互斥相关背景概念
1.2 互斥量mutex
1.3 互斥量实现原理
1.4 互斥量的封装
二、线程同步
2.1 条件变量
2.2 同步概念与竞态条件
2.3 条件变量函数
2.4 生产者消费者模型
2.4.1 为何要使用生产者消费者模型
2.4.2 生产者消费者模型的优点
2.5 基于BlockingQueue的生产者消费者模型
2.5.1 BlockingQueue
2.5.2 C++ queue模拟阻塞队列的生产者消费者模型
2.6 为什么 pthread_cond_wait 需要互斥量
2.7 条件变量使用规范
2.8 条件变量的封装
三、POSIX信号量
3.1 基于环形队列的生产者消费者模型
一、线程互斥
1.1 进程线程间的互斥相关背景概念
- 共享资源
- 临界资源:多线程执行流被保护的共享的资源就叫做临界资源。
- 临界区:每个线程内部,访问临界资源的代码,叫做临界区。
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。
- 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。
1.2 互斥量mutex
- 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
- 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
- 多个线程并发的操作共享变量,会带来一些问题。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>int ticket = 100;void* route(void* arg) {char* id = (char*)arg;while(1) {if(ticket > 0) {sleep(1); // 模拟中间的业务--ticket;printf("%s sell ticket, ticket number: %d\n", id, ticket);}else break;}
}int main()
{pthread_t p1, p2, p3, p4;pthread_create(&p1, NULL, route, (void*)"thread-1");pthread_create(&p2, NULL, route, (void*)"thread-2");pthread_create(&p3, NULL, route, (void*)"thread-3");pthread_create(&p4, NULL, route, (void*)"thread-4");pthread_join(p1, NULL);pthread_join(p2, NULL);pthread_join(p3, NULL);pthread_join(p4, NULL);return 0;
}
为什么可能无法获得正确结果呢?
- if 语句判断条件为真后,代码可以并发的切换到其他线程。
- sleep 这个用来模拟漫长的业务过程,在这个漫长的业务过程中,可能会有很多个线程进入这个代码段。
- --ticket 本身就不是一个原子操作。
11eb: bf 01 00 00 00 mov $0x1,%edi
11f0: e8 db fe ff ff call 10d0 <sleep@plt>
11f5: 8b 05 15 2e 00 00 mov 0x2e15(%rip),%eax # 4010 <ticket>
11fb: 83 e8 01 sub $0x1,%eax
11fe: 89 05 0c 2e 00 00 mov %eax,0x2e0c(%rip) # 4010 <ticket>
1204: 8b 15 06 2e 00 00 mov 0x2e06(%rip),%edx # 4010 <ticket>
-- 操作并不是原子操作,而是对应三条汇编指令:
- load:其共享变量 ticket 从内存加载到寄存器中。
- update:更新寄存器里面的值。
- store:将新值,从寄存器中写入 ticket 的内存地址。
要解决以上问题,需要做到三点:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到这三点,本质上就是要拥有一把锁。Linux上提供的这把锁叫互斥量。
互斥量的接口
初始化互斥量
初始化互斥量有两种方法:
- 方法1,静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
- 方法2,动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
参数
- mutex:要初始化的互斥量
- attr:NULL
销毁互斥量
销毁互斥量需要注意:
- 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁。
- 不要销毁一个已经加锁的互斥量。
- 已经销毁的互斥量,要确保后面没有线程再尝试通过该互斥量进行加锁。
int pthread_mutex_destroy(pthread_mutex_t *mutex);
互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);int pthread_mutex_unlock(pthread_mutex_t *mutex);返回值:成功返回0,失败返回错误号。
调用 pthread_mutex_lock 时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功。
- 发起函数调用时,其他线程已经锁定该互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么 pthread_mutex_lock 调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
改进上面的售票系统:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>int ticket = 100;
pthread_mutex_t mutex;void* route(void* arg) {char* id = (char*)arg;while(1) {pthread_mutex_lock(&mutex);if(ticket > 0) {sleep(1);--ticket;printf("%s sell ticket, ticket number: %d\n", id, ticket);pthread_mutex_unlock(&mutex);}else {pthread_mutex_unlock(&mutex);break;}}
}int main()
{pthread_t p1, p2, p3, p4;pthread_mutex_init(&mutex, NULL);pthread_create(&p1, NULL, route, (void*)"thread-1");pthread_create(&p2, NULL, route, (void*)"thread-2");pthread_create(&p3, NULL, route, (void*)"thread-3");pthread_create(&p4, NULL, route, (void*)"thread-4");pthread_join(p1, NULL);pthread_join(p2, NULL);pthread_join(p3, NULL);pthread_join(p4, NULL);pthread_mutex_destroy(&mutex);return 0;
}
这里总是线程1在执行,因为线程1第一个执行,第一个抢到锁,其他线程就被挂起。当线程1解锁后,系统唤醒在锁上挂起的线程,但是线程1解锁后立刻抢了锁,而其他线程还刚被唤醒,又被挂起等待,所以导致只有线程1在执行,其他线程一直挂起。
1.3 互斥量实现原理
- 经过上面的例子,大家已经意识到单纯的 i++ 和 ++i 都不是原子的,有可能会有数据一致性问题。
- 为了实现互斥锁操作,大多数体系结构都提供了 swap 或 exchange 指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理平台,访问内存的总线周期也有先后,一个处理器的交换指令执行时另一个处理器的交换指令只能等待总线周期。现在,我们把 lock 和 unlock 的伪代码改一下
1.4 互斥量的封装
// Lock.hpp#pragma once#include <iostream>
#include <string>
#include <pthread.h>namespace LockModule
{// 对锁进行封装,可以独立使用class Mutex{public:Mutex(const Mutex&) = delete;const Mutex& operator=(const Mutex&) = delete;Mutex() {int n = pthread_mutex_init(&_mutex, nullptr);if(n != 0) std::cerr << "pthread_mutex_init" << std::endl;}void Lock() {int n = pthread_mutex_lock(&_mutex);if(n != 0) std::cerr << "pthread_mutex_lock" << std::endl;}void Unlock() {int n = pthread_mutex_unlock(&_mutex);if(n != 0) std::cerr << "pthread_mutex_unlock" << std::endl;}pthread_mutex_t* GetMutexPtr() {return &_mutex;}~Mutex() {int n = pthread_mutex_destroy(&_mutex);if(n != 0) std::cerr << "pthread_mutex_destroy" << std::endl;}private:pthread_mutex_t _mutex;};// 采用RAII风格,进行锁管理class LockGuard{public:LockGuard(Mutex& mutex): _mutex(mutex){_mutex.Lock();}~LockGuard() {_mutex.Unlock();}private:Mutex& _mutex;};
}
// 使用封装的抢票系统#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include "Lock.hpp"using namespace LockModule;int ticket = 100;
Mutex mutex;void* route(void* arg) {char* id = (char*)arg;while(1) {LockGuard lock(mutex);if(ticket > 0) {sleep(1);--ticket;printf("%s sell ticket, ticket number: %d\n", id, ticket);}else {break;}}
}int main()
{pthread_t p1, p2, p3, p4;pthread_create(&p1, NULL, route, (void*)"thread-1");pthread_create(&p2, NULL, route, (void*)"thread-2");pthread_create(&p3, NULL, route, (void*)"thread-3");pthread_create(&p4, NULL, route, (void*)"thread-4");pthread_join(p1, NULL);pthread_join(p2, NULL);pthread_join(p3, NULL);pthread_join(p4, NULL);return 0;
}
RAII 风格的互斥锁,C++11 也有,比如:
std::mutex mtx;
std::lock_guard<std::mutex> guard(mtx);
此处我们仅做封装,方便后面使用。
二、线程同步
2.1 条件变量
- 当一个线程互斥的访问某个变量时,他可能发现在其他线程改变状态之前,他什么也做不了。
- 例如一个线程访问队列是,发现队列为空,它只能等待,直到其他线程将一个节点添加到队列中。这种情况就需要用到条件变量。
2.2 同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题。
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
2.3 条件变量函数
初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
参数:
- cond:要初始化的条件变量。
- attr:NULL
返回值:成功返回0,失败返回错误码。
销毁
int pthread_cond_destroy(pthread_cond_t *cond);
参数:
- cond:要销毁的条件变量
返回值:成功返回0,失败返回错误码。
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
- cond:要在这个条件变量上等待
- mutex:互斥量
返回值:成功返回0,失败返回错误码。
唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
功能:唤醒所有在 cond 条件变量上等待的线程。
int pthread_cond_signal(pthread_cond_t *cond);
功能:唤醒一个在 cond 条件变量上等待的线程。
简单案例:
- 我们先使用 PTHREAD_COND_INITIALIZER和PTHREAD_MUTEX_INITIALIZER 进行测试,不追究细节。
- 然后将接口更改称为使用 pthread_cond_init/pthread_cond_destroy 形式,后面再封装。
#include <iostream>
#include <string.h>
#include <unistd.h>
#include <pthread.h>pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;void *active(void *arg)
{std::string name = static_cast<const char *>(arg);while (true){pthread_mutex_lock(&mutex);pthread_cond_wait(&cond, &mutex);std::cout << name << " 活动..." << std::endl;pthread_mutex_unlock(&mutex);}
}int main(void)
{pthread_t t1, t2;pthread_create(&t1, NULL, active, (void *)"thread-1");pthread_create(&t2, NULL, active, (void *)"thread-2");sleep(3); // 这里确保两个线程已经在运行while (true){// 对比测试// pthread_cond_signal(&cond); // 唤醒一个线程pthread_cond_broadcast(&cond); // 唤醒所有线程sleep(1);}pthread_join(t1, NULL);pthread_join(t2, NULL);
}
2.4 生产者消费者模型
2.4.1 为何要使用生产者消费者模型
生产着消费者模型就是通过一个容器来解决生产者和消费者之间强耦合问题。生产者和消费者之间不直接通讯,而是通过阻塞队列进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里获取。阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
2.4.2 生产者消费者模型的优点
- 解耦
- 支持并发
- 支持忙闲不均
2.5 基于BlockingQueue的生产者消费者模型
2.5.1 BlockingQueue
在多线程编程中阻塞队列(BlockingQueue)是一种常用与实现生产者消费者模型的数据结构。其与普通的队列区别在于,当对列为空时,从队列中获取元素的操作会被阻塞,知道队列中被放入了元素;当队列满时,从队列中存放元素的操作会被阻塞,直到有元素从队列中取出(以上的操作都是基于不同的线程来说,线程在对阻塞队列操作时会被阻塞)。
2.5.2 C++ queue模拟阻塞队列的生产者消费者模型
代码:
- 为了便于大家理解,我们以单生产者,单消费者,进行讲解
- 刚开始写,我们采用原始接口
- 我们先写单生产,单消费,后面改为多生产,多消费(这里的代码其实不变)
// BlockQueue.hpp#pragma once#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>template<class T>
class BlockQueue
{
public:BlockQueue(int cap):_cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_product_cond, nullptr);pthread_cond_init(&_consum_cond, nullptr);_product_wait_num = 0;_consum_wait_num = 0;}void Push(T& in) { // 生产者用的接口pthread_mutex_lock(&_mutex);while(IsFull()) {// 生产线程去等待,是在临界区等待的,还带着锁呢// pthread_cond_wait 调用时:a. 让调用线程等待; b. 将其身上携带的锁释放掉; c. 当条件满足,线程唤醒, pthread_cond_wait要求线性// 必须重新竞争 _mutex 锁,竞争成功,才能返回// 之前:安全_product_wait_num++;pthread_cond_wait(&_product_cond, &_mutex); // 只要等待,必定会有唤醒,唤醒的时候,就要继续从这个地方往下运行_product_wait_num--;// 之后:安全}// 生产者进行生产_block_queue.push(in);// 通知消费者来消费if(_consum_wait_num > 0) pthread_cond_signal(&_consum_cond);pthread_mutex_unlock(&_mutex);}void Pop(T *out) { // 消费者用的接口pthread_mutex_lock(&_mutex);while(IsEmpty()) {// 消费线程去等待,是在临界区等待的,还带着锁呢_consum_wait_num++;pthread_cond_wait(&_consum_cond, &_mutex);_consum_wait_num--;}// 消费者进行消费*out = _block_queue.front();_block_queue.pop(); // 通知生产者来生产if(_product_wait_num > 0)pthread_cond_signal(&_product_cond);pthread_mutex_unlock(&_mutex);}~BlockQueue() {pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_product_cond);pthread_cond_destroy(&_consum_cond);}private:bool IsFull() {return _block_queue.size() == _cap;}bool IsEmpty() {return _block_queue.size() == 0;}private:queue<T> _block_queue; // 阻塞队列,被整体使用int _cap; // 总上限pthread_mutex_t _mutex; // 保护阻塞队列pthread_cond_t _product_cond; // 专门给生产者的条件变量pthread_cond_t _consum_cond; // 专门给消费者的条件变量int _product_wait_num; // 在 _product_cond 条件变量上等待的线程个数int _consum_wait_num; // 在 _consum_cond 条件变量上等待的线程个数
};
注意:
这里采用模板,是想要告诉大家,队列中不仅仅可以放置内置类型,如:int,对象也可以作为任务来参与生产消费的过程。
2.6 为什么 pthread_cond_wait 需要互斥量
- 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,是原先不满足的变量变得满足,并且友好的通知在条件变量上等待的线程。
- 条件不会无缘无故突然变的满足,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。
按照上面的说法,我们设计了如下代码:先上锁,发现条件不满足,解锁,然后等待在条件变量上不就可以了吗。
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false)
{pthread_mutex_unlock(&mutex);// 解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过pthread_cond_wait(&cond, &mutex);pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
- 由于解锁和等待不是原子操作。调用解锁之后,pthread_cond_wait 之前,如果已经有其他线程获取了互斥量,摒弃条件满足,发出了信号,那么 pthread_cond_wait 将错过这个信号,可能会导致线程一直阻塞在这个 pthread_cond_wait。所以解锁和等待必须是一个原子操作。
- int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); 进入该函数后,会去看条件变量等于0不?等于,就把互斥量变为1,知道函数返回,把条件变量改为1,互斥量恢复为原样。
2.7 条件变量使用规范
- 等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假) //if??
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
- 给条件发送信号代码
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
2.8 条件变量的封装
- 基于上面的基本认识,我们已经知道了条件变量如何使用,我们在这里可以做一个简单的封装。
// Cond.hpp#pragma once#include <iostream>
#include <string>
#include <pthread.h>
#include "Lock.hpp"namespace CondModule
{using namespace LockModule;class Cond{public:Cond() {int n = pthread_cond_init(&_cond, nullptr);if(n != 0) std::cerr << "pthread_cond_init" << std::endl;}void Wait(Mutex& mutex) {int n = pthread_cond_wait(&_cond, mutex.GetMutexPtr());if(n != 0) std::cerr << "pthread_cond_wait" << std::endl;}void Notify() {int n = pthread_cond_signal(&_cond);if(n != 0) std::cerr << "pthread_cond_signal" << std::endl;}void NotifyAll() {int n = pthread_cond_broadcast(&_cond);if(n != 0) std::cerr << "pthread_cond_broadcast" << std::endl;}~Cond() {int n = pthread_cond_destroy(&_cond);if(n != 0) std::cerr << "pthread_cond_destroy" << std::endl;}private:pthread_cond_t _cond; };
}
三、POSIX信号量
POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突访问共享资源的目的。但是POSIX信号量可以用于线程间同步。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
- sem:指向需要初始化的信号量对象的指针。该对象必须在调用前分配内存(通常为栈变量或动态分配的内存)。
- pshared:0为线程间共享,非0为进程间共享。
- value:信号量的初始计数值。
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
int sem_wait(sem_t *sem);
功能:等待信号量,表示要使用资源了,会将信号量值 -1。
发布信号量
int sem_post(sem_t *sem);
功能:发布信号量,表示资源使用完了,可以归还资源,会将信号量值 +1。
3.1 基于环形队列的生产者消费者模型
- 环形队列采用数组模拟,用模运算来模拟环状特性。
- 环形结构起始状态和结束状态是一样的,不好判断为空或者为满,所以可以通过加计数器或者标志位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
- 但是我们现在有信号量这个计数器,就很简单的进行多线程的同步过程。
// Sem.hpp
// 随手封装一下#pragma once#include <iostream>
#include <semaphore.h>class Sem
{
public:Sem(int n) {sem_init(&_sem, 0, n);}void P() {sem_wait(&_sem);}void V() {sem_post(&_sem);}~Sem() {sem_destroy(&_sem);}
private:sem_t _sem;
};
// RingQueue.hpp#pragma once#include <iostream>
#include <pthread.h>
#include <string>
#include <vector>
#include "Sem.hpp"template<class T>
class RingQueue
{
public:RingQueue(int cap):_cap(cap),_ring_queue(cap),_product_step(0),_consum_step(0),_space_sem(cap),_data_sem(0){pthread_mutex_init(_prodect_mutex);pthread_mutex_init(_consum_mutex);}void Push(const T& in) {_space_sem.P();Lock(_prodect_mutex);// 一定有空间_ring_queue[_product_step++] = in;_product_step %= _cap;Unlock(_prodect_mutex);_data_sem.V();}void Pop(T *out) {_data_sem.P();Lock(_consum_mutex);*out = _ring_queue[_consum_step++];_consum_step %= _cap;Unlock(_consum_mutex);_space_sem.V();}~RingQueue() {pthread_mutex_destroy(&_prodect_mutex);pthread_mutex_destroy(&_consum_mutex);}
private:void Lock(pthread_mutex_t& mutex) {pthread_mutex_lock(&mutex);}void Unlock(pthread_mutex_t& mutex) {pthread_mutex_unlock(&mutex);}
private:// 环形队列std::vector<T> _ring_queue;int _cap; // 环形队列容量上限int _product_step; // 生产者下标int _consum_step; // 消费者下标// 定义信号量Sem _space_sem; // 生产者关心Sem _data_sem; // 消费者关心// 定义锁,维护多生产多消费之间关系pthread_mutex_t _prodect_mutex;pthread_mutex_t _consum_mutex;
};