Linux:线程的同步与互斥
一.线程互斥
1-1.线程互斥的相关概念
- 临界资源:多线程执⾏流共享的资源就叫做临界资源(比如当同一个进程的多个线程同时向屏幕进行输出时,标准输出流COUT就是一个临界资源)。
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区。
- 互斥:任何时刻,互斥保证有且只有⼀个执⾏流进⼊临界区,访问临界资源,通常对临界资源起保护作⽤。
- 原⼦性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成(底层逻辑其实就是该操作仅有一条汇编语句,比如X=1就是原子操作)。
1-2.互斥锁mutex
-
⼤部分情况,线程使⽤的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程⽆法获得这种变量。
-
但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。(这里说的共享变量可以理解为我们上面说的临界资源)
多个线程并发的操作共享变量,会带来⼀些问题。我们以一个抢票的例子,假设此时有1000张票,让多线程并发的去访问这个全局变量tickets。抢到0后停止:
const int default_size = 5;
int tickets = 1000;
void *Rountinue(void *arg)
{
std::string name = static_cast<const char *>(arg);
while (true)
{
if(tickets <= 0)
{
break;
}
std::cout << "线程:" << name << "抢到一张票,目前剩余tickets:" << --tickets << std::endl;
}
return nullptr;
}
int main()
{
std::vector<pthread_t> tids(default_size);
for (int i = 0; i < default_size; i++)
{
std::string name = "thread-" + std::to_string(i);
pthread_create(&tids[i], nullptr, Rountinue, (void *)name.c_str());
}
for (int i = 0; i < default_size; i++)
{
pthread_join(tids[i], nullptr);
}
return 0;
}
运行结果如下:
我们此时会发现,明明我们设置了每个线程在tickets<=0时退出。但最后票数却减到了负数。这是临界资源没有被保护的结果。上面代码段while循环部分是临界区(因为if语句和输出语句均访问了临界资源tickets)。我们来分析下原因:
1.ticket--
的非原子性是主要原因
ticket--
并非原子操作,它在底层会被拆解为多个步骤:读取ticket
的值 → 减1 → 写回新值。如果多个线程同时执行这一操作,可能会导致数据竞争(Data Race)。例如:
线程A和线程B同时读取ticket=1
。两者都执行减1并写回,最终ticket=0
,但实际卖出了两张票(数据不一致)。
2.ticket > 0
的条件判断也会导致并发问题
在检查if (ticket > 0)
时,多个线程可能同时通过条件判断(如ticket=1
时,4个线程都可能进入),导致它们都尝试卖票,最终可能使ticket
变为负数(如-4
),出现超卖现象。
就好比说一个单人房间,仅允许一个人居住。但由于没有上锁,其他人也可以进来,导致了该资源被多个对象同时访问。所以基于此种现象,为了确保每次访问该临界资源时只有一个线程可以访问,我们需要对临界区加锁以解决问题,Linux中称这种锁为互斥锁。下面我们来熟悉一下它的常用接口:
1-2-1.互斥锁的常用接口
初始化互斥锁的时候有两种方法,一种是全局定义(锁要确保所有对象都能看见,其也是一个临界资源):
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
它会在代码运行结束后自动释放。还有一种是局部定义:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const
pthread_mutexattr_t *restrict attr);
参数:
mutex:要初始化的互斥量
attr:NULL//定义锁的属性,我们一般使用默认即可
销毁互斥锁使⽤ PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要手动销毁,也就是全局定义的会自动销毁。其次需要注意的是:
- 不要销毁⼀个已经加锁的互斥量
- 已经销毁的互斥量,要确保后⾯不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
使用互斥锁进行加锁解锁操作:
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
线程调用pthread_mutex_lock时会有下面两种情况(说明加锁是一个原子操作,要么索取锁成功,要么失败):
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调⽤时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调⽤会陷⼊阻塞(执⾏流被挂起),等待互斥量解锁。
改进上⾯的售票系统:
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
const int default_size = 5;
int tickets = 1000;
void *Rountinue(void *arg)
{
std::string name = static_cast<const char *>(arg);
while (true)
{
pthread_mutex_lock(&glock);
//避免虚假唤醒
if(tickets <= 0)
{
pthread_mutex_unlock(&glock);
break;
}
std::cout << "线程:" << name << "抢到一张票,目前剩余tickets:" << --tickets << std::endl;
pthread_mutex_unlock(&glock);
}
return nullptr;
}
int main()
{
std::vector<pthread_t> tids(default_size);
for (int i = 0; i < default_size; i++)
{
std::string name = "thread-" + std::to_string(i);
pthread_create(&tids[i], nullptr, Rountinue, (void *)name.c_str());
}
for (int i = 0; i < default_size; i++)
{
pthread_join(tids[i], nullptr);
}
return 0;
}
此时票数就不会出现卖到负数的情况了:
1-2-2.互斥锁实现原理的探究
其实现方式有软件和硬件两个方面的实现方式。软件方面最简单的实现方式就是去掉访问临界区的线程的时间片,当他访问完临界区之后再还给它时间片。但万一线程出bug了,在临界区呆着不走了。直接就会导致系统死机。这需要确保临界区代码没有任何问题。而硬件方面,一种较为简单的方式则是基于交换的思想实现的,看下图:
我们来解释下这串伪代码。这里我们的锁初始值假设为1,而每个线程栈内存储的锁值为0。当线程执行lock时,系统会先清除0寄存器中的内容。然后与mutex交换值(此时al寄存器中的值为1,mutex中的值为0)。接下来if判断成功,当前线程获取锁,返回。调度下一个线程时,当前运行的线程要带走自己的上下文数据,将1带走。下一个线程来获取锁时,mutex与al寄存器中进行交换(0 - 0),寄存器中的值为0,if判断失败。从而被挂起。之前获取锁的线程归还锁后,唤醒该挂起线程,该线程继续尝试获取锁。这便是硬件方面上加锁的一种简单实现方式。
1-3.基于1-2-1的常用接口对mutex进行简单封装
#pragma once
#include <iostream>
#include <pthread.h>
namespace MutexModule
{
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_mutex, nullptr);
}
void Lock()
{
int n = pthread_mutex_lock(&_mutex);
(void)n;
}
void Unlock()
{
int n = pthread_mutex_unlock(&_mutex);
(void)n;
}
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
pthread_mutex_t *Get()
{
return &_mutex;
}
private:
pthread_mutex_t _mutex;
};
class LockGuard
{
public:
LockGuard(Mutex &mutex):_mutex(mutex)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex &_mutex;
};
}
此时我们便可以直接像使用智能指针那样使用LockGuard对临界区加锁,并在退出临界区之前自动解锁。
二.线程同步
细心的读者或许已经发现,我们上面对临界区加锁时。虽然解决了票数为负数的BUG。但是打印结果时我们发现经常会出现一个线程一直抢票而其他线程却一直在“休眠”。这就是典型的线程饥饿问题。我们举个通俗的例子来理解。比如有一个图书馆,图书馆里面有一个仅能一个人使用的带锁的单间。你去的早了拿到了单间钥匙。在你拥有锁的时候,无论你在不在单间学,别人都没办法进来,都要搁外面等着。你一刻不退还钥匙,别人就要多等一刻。那此时,别人(其他线程)就是饥饿状态。
我们是不希望这种情况出现的。我们希望的是线程依次获取锁来访问临界区代码。此时我们可以这样,对于每一个线程的锁请求无论有没有锁都先保留。让主线程来依次唤醒。以我们上面图书馆的例子可以解释为,此时有一个管理员监督这把钥匙。每个人来要要钥匙都需要经过管理员,管理员同意后把锁给最先来的人,最先来的人学完自己的任务后,想要继续获取钥匙,此时他就必须排到队伍最后等待再次轮到他时管理员给他发钥匙。
2-1.线程同步相关概念
条件变量:
- 当⼀个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如⼀个线程访问队列时,发现队列为空,它只能等待,只到其它线程将⼀个节点添加到队列中。这种情况就需要⽤到条件变量。
同步概念与竞态条件:
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从⽽有效避免饥饿问题,叫做同步。
- 竞态条件:因为时序问题,⽽导致程序异常,我们称之为竞态条件。
2-2.条件变量相关函数
下面每个函数后对应的括号为我们上面所举的图书馆例子场景中对应的环节:
初始化(为对应的锁添加管理员):
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t
*restrict attr);
参数:
cond:要初始化的条件变量
attr:NULL
当然也可以像mutex全局定义,此时便不需要手动释放:
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
销毁(撤销锁的管理员):
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足(来的学生排在队伍里等待轮到自己管理员发钥匙):
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量,后⾯详细解释
唤醒等待(管理员给排到的同学发钥匙):
int pthread_cond_signal(pthread_cond_t *cond);
//还有一种直接唤醒所有等待的线程的函数,其实就类似于说管理员摆烂了,直接把钥匙
//扔给排队的学生们,让它们谁抢到了就是谁的
int pthread_cond_broadcast(pthread_cond_t *cond);
此时我们再对上面的抢票系统加以改进:
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
const int default_size = 5;
int tickets = 1000;
void *Rountinue(void *arg)
{
std::string name = static_cast<const char *>(arg);
while (true)
{
pthread_mutex_lock(&glock);
pthread_cond_wait(&gcond, &glock);
//避免虚假唤醒
if(tickets <= 0)
{
pthread_mutex_unlock(&glock);
break;
}
std::cout << "线程:" << name << "抢到一张票,目前剩余tickets:" << --tickets << std::endl;
pthread_mutex_unlock(&glock);
}
return nullptr;
}
int main()
{
std::vector<pthread_t> tids(default_size);
for (int i = 0; i < default_size; i++)
{
std::string name = "thread-" + std::to_string(i);
pthread_create(&tids[i], nullptr, Rountinue, (void *)name.c_str());
}
while (true)
{
pthread_mutex_lock(&glock);
if (tickets > 0)
{
pthread_cond_signal(&gcond);
}
else
{
pthread_mutex_unlock(&glock);
break;
}
pthread_mutex_unlock(&glock);
usleep(1000);
}
pthread_cond_broadcast(&gcond);//虚假唤醒,确保所有线程退出
for (int i = 0; i < default_size; i++)
{
pthread_join(tids[i], nullptr);
}
return 0;
}
就不会出现线程饥饿的问题了:
2-3.为什么 pthread_cond_wait 需要互斥量?
管理员的例子其实是为了我们方便理解才这样举例的,但实际上cond需要传入锁还有别的原因:
- 条件等待是线程间同步的⼀种⼿段,如果只有⼀个线程,条件不满⾜,⼀直等下去都不会满⾜,所以必须要有⼀个线程通过某些操作,改变共享变量,使原先不满⾜的条件变得满⾜,并且友好的通知等待在条件变量上的线程。
- 条件不会⽆缘⽆故的突然变得满⾜了,必然会牵扯到共享数据的变化。所以⼀定要⽤互斥锁来保护。没有互斥锁就⽆法安全的获取和修改共享数据。
因为管理员(cond)也是会被所有学生(线程)所看见的,所以cond也需要被mutex锁保护,因为它也是一种临界资源!!!
而上面我们改进的代码中,线程先获取了锁,之后又进入了等待。那么等待的时候该线程有锁吗?没有!它此时需要把锁释放。等到该线程再次被唤醒时,它才会继续去获取锁,如果调用的是唤醒所有等待线程的函数,该线程会与其他线程竞争锁,成功继续运行,失败则挂起。
2-4.条件变量的使用规范
等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
给条件发送信号代码
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
2-5. 基于2-2与1-3对条件变量进行简单的封装
#pragma once
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
using namespace MutexModule;
namespace CondModule
{
class Cond
{
public:
Cond()
{
pthread_cond_init(&_cond, nullptr);
}
void Wait(Mutex &mutex)
{
int n = pthread_cond_wait(&_cond, mutex.Get());
(void)n;
}
void Signal()
{
// 唤醒在条件变量下等待的一个线程
int n = pthread_cond_signal(&_cond);
(void)n;
}
void Broadcast()
{
// 唤醒所有在条件变量下等待的线程
int n = pthread_cond_broadcast(&_cond);
(void)n;
}
~Cond()
{
pthread_cond_destroy(&_cond);
}
private:
pthread_cond_t _cond;
};
};
由于篇幅问题。我们下一篇文章再来介绍多线程中的生产者与消费者模型,并分别基于阻塞队列与环形队列实现生产消费模型。在此基础上实现日志与简单的线程池。