25、C++中的多线程同步机制【中高频】
文章目录
- 线程同步
- 实现线程同步的方式
- 1.原子类型
- 2.互斥量
- 3.锁
- ①死锁
- ②读写锁
- 读写锁操作函数
- 4.条件变量(新特性)
- 生产者消费者模型(条件变量版本)
- 5.信号量(非C++11特性)
- 生产者消费者模型(信号量版本)
线程同步
- 线程的最大优势,就是可以通过 全局变量 来共享信息(即通信),但是代价是:必须要保证多个线程不能 同时修改 同一个变量。当多个线程都操作 同一个全局变量或共享数据 时,可能引发 线程安全问题(又叫线程同步问题)
- 临界区 就是操作共享变量的代码片段。我们必须要保证 临界区是原子的,也就是要么不执行,要么一次性执行完,不能被其他线程中断。所以说,线程同步即:当一个线程正在 操作共享变量时,其他线程 只能等待,不能操作这个变量,直到该线程操作完成。
- 线程同步虽然会对效率造成一定影响,但它是必须要有的;由于只有 临界区 要且必须要 线程同步,其他代码都不需要,所以多线程还是可以实现高并发的
实现线程同步的方式
1.原子类型
- 声明一个类型为T 的原子类型变量t:
atmoic<T> t; // 声明一个类型为T的原子类型变量t
-
特点:
-
如果是对原子类型的变量进行修改,就不需要加锁了。线程会自动地互斥访问原子类型
-
原子类型是不允许拷贝和赋值的,因为atmoic模板类中的拷贝构造、移动构造、赋值运算符重载都被删除掉了
-
#include <atomic>
int main()
{
atomic<int> a1(0);
//atomic<int> a2(a1); // 编译失败
atomic<int> a2(0);
//a2 = a1; // 编译失败
return 0;
}
-
实例:
该例中,使用原子变量后 就不再需要加锁了
2.互斥量
原子类型可以保证一个变量的安全性,但是对于一段代码的安全性,我们只能通过 互斥量和锁 来实现
-
普通互斥量(std:: mutex)
-
注意,线程函数调用 lock() 时,可能会发生以下三种情况:
- 如果该互斥量当前没有被锁住,则调用线程将该互斥量锁住,直到调用 unlock之前,该线程一直拥有该锁
- 如果当前互斥量被其他线程锁住,则当前的调用线程被阻塞住
- 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)
-
线程函数调用 try_lock() 时,可能会发生以下三种情况:
- 如果当前互斥量没有被其他线程占有,则该线程锁住互斥量,直到该线程调用 unlock释放互斥量
- 如果当前互斥量被其他线程锁住,则当前调用线程返回 false,而并不会被阻塞掉
- 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)
-
-
递归互斥量(std:: recursive_mutex)
-
递归互斥锁 允许同一个线程对互斥量多次上锁(即递归上锁),来获得对互斥量对象的多层所有权。释放互斥量时需要调用相同次数的 unlock()。除此之外,std::recursive_mutex 的特性和 std::mutex 大致相同
-
这对于递归函数 可能需要在同一线程中多次获取锁的情况很有用:
-
#include <iostream>
#include <mutex>
#include <thread>
std::recursive_mutex myRecursiveMutex;
void recursiveAccess(int depth) {
std::unique_lock<std::recursive_mutex> lock(myRecursiveMutex);
if (depth > 0) {
recursiveAccess(depth - 1);
}
// 访问共享资源的代码
std::cout << "Accessing shared resource at depth " << depth << "...\n";
}
int main() {
std::thread t1(recursiveAccess, 3);
t1.join();
return 0;
}
```
- **定时互斥量(std::timed_mutex)**
- 比 std::mutex 多了两个成员函数,try_lock_for(),try_lock_until() :
- try_lock_for():函数参数表示一个时间范围,在这一段时间范围之内线程如果没有获得锁 则保持阻塞;如果在此期间其他线程释放了锁,则该线程可获得该互斥锁;如果超时(指定时间范围内没有获得锁),则函数调用返回false。
```c++
timed_mutex myMutex;
chrono::milliseconds timeout(100); //100毫秒
if (myMutex.try_lock_for(timeout))
{
//在100毫秒内获取了锁
//业务代码
myMutex.unlock(); //释放锁
}
else
{
//在100毫秒内没有获取锁
//业务代码
}
-
try_lock_until():函数参数表示一个时刻,在这一时刻之前线程如果没有获得锁则保持阻塞;如果在此时刻前其他线程释放了锁,则该线程可获得该互斥锁;如果超过指定时刻没有获得锁,则函数调用返回false。
-
定时递归互斥量(std::recursive_timed_mutex)
允许同一线程多次获取锁,并提供了超时功能。与std::timed_mutex一样,std::recursive_timed_mutex也提供了try_lock_for()和try_lock_until()方法
3.锁
这里主要介绍两种RAII方式的锁封装(std::lock_guard 和 std::unique_lock),可以动态的释放锁资源,防止线程由于编码失误导致一直持有锁。
-
std::lock_gurad 是 C++11 中定义的模板类,取代了mutex的lock和unlock函数。定义如下:
- lock_guard类模板通过RAII来封装。在需要加锁的地方,只需要用互斥量实例化一个lock_guard对象,此时类内调用构造函数成功上锁;出作用域前,lock_guard对象要被销毁,调用析构函数自动解锁,可以有效避免死锁问题。
- lock_guard对象之间是不能拷贝和赋值的
- lock_guard的缺陷:太单一,用户没有办法对该锁进行控制,因此C++11又提供了unique_lock。
-
独占锁(unique_lock)
-
与lock_guard不同的是,unique_lock更加的灵活(但效率上差一点,内存占用多一点),提供了更多的成员函数:
- 上锁/解锁操作:lock、try_lock、try_lock_for、try_lock_until和unlock
-
//1.立即上锁
std::mutex mtx;
std::unique_lock<std::mutex> lck(mtx);
//2.延迟上锁
std::unique_lock<std::mutex> lck(mtx, std::defer_lock); //只是创建对象,不上锁
lck.lock(); // 此时才上锁
if(lck.try_lock()){
// 已获得锁
}
lck.unlock();
- 修改操作:移动赋值、交换即swap(与另一个unique_lock对象互换所管理的互斥量所有权)、释放即release(返回它所管理的互斥量对象的指针,并释放所有权)
std::unique_lock<std::mutex> ul1(mtx);
std::unique_lock<std::mutex> ul2 = std::move(ul1); // 把ul1的锁转移到ul2上,即现在是ul2对mtx上锁
if (!ul1.owns_lock()) // 现在 ul1 应该不再持有锁了
{
assert(true);
}
- 获取属性:owns_lock(返回当前对象是否上了锁)、operator bool()(与owns_lock()的功能相同)、mutex(返回当前unique_lock所管理的互斥量的指针)
if(lck.owns_lock()){
// 此时拥有锁
}
auto* m = lck.mutex();
①死锁
- 死锁就是 多个进程因为争夺共享资源,导致它们之间互相等待。若无外力干预,这些进程都无法继续执行
- 死锁的几种场景:
- 忘记释放锁(自己不能再用了,其他线程也不能用)
- 重复加同一个锁 (往往是在被锁定的代码区内调用函数,而那个函数内部也有同一个锁,这种错误就比较隐蔽了)
- 多线程多锁,抢占锁资源(线程之间互相需要对方的锁,但是都给不了)
第三个场景的实例:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
// 创建2个互斥量
pthread_mutex_t mutex1, mutex2;
void * workA(void * arg) {
pthread_mutex_lock(&mutex1);
sleep(1);
pthread_mutex_lock(&mutex2);
printf("workA....\n");
pthread_mutex_unlock(&mutex2);
pthread_mutex_unlock(&mutex1);
return NULL;
}
void * workB(void * arg) {
pthread_mutex_lock(&mutex2);
sleep(1);
pthread_mutex_lock(&mutex1);
printf("workB....\n");
pthread_mutex_unlock(&mutex1);
pthread_mutex_unlock(&mutex2);
return NULL;
}
int main() {
// 初始化互斥量
pthread_mutex_init(&mutex1, NULL);
pthread_mutex_init(&mutex2, NULL);
// 创建2个子线程
pthread_t tid1, tid2;
pthread_create(&tid1, NULL, workA, NULL);
pthread_create(&tid2, NULL, workB, NULL);
// 回收子线程资源
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
// 释放互斥量资源
pthread_mutex_destroy(&mutex1);
pthread_mutex_destroy(&mutex2);
return 0;
}
②读写锁
-
多个线程同时写共享数据,会出问题;但它们同时读一个共享数据却不会发生问题
-
在 读远大于写的场景中,可以使用 读写锁,使得 多个线程可以同时读 一份共享数据,但是只有一个线程可以写(修改)共享数据
-
特点:
- 如果 有线程在读共享数据,可允许其他线程读,但不能让其他线程写(读的时候不能写)
- 如果 有线程在写共享数据,那么其他线程既不能读,也不能写(写的时候也不能读)
- 获得 写锁的线程 先进入临界区(A、C读锁,B写锁,则先让B修改共享数据,然后再让A、C读)
-
相较于 互斥量,读写锁 本质是提高了 读的并发性
读写锁操作函数
读写锁的类型 pthread_rwlock_t//只有一个变量
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);//初始化
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);//释放变量资源
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);//读锁
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);//写锁
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);//解锁,共用一个
实例:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
// 创建一个共享数据
int num = 1;
// pthread_mutex_t mutex;
pthread_rwlock_t rwlock;//读写锁变量
void * writeNum(void * arg) {
while(1) {
pthread_rwlock_wrlock(&rwlock);//写锁
num++;
printf("++write, tid : %ld, num : %d\n", pthread_self(), num);
pthread_rwlock_unlock(&rwlock);//解锁
usleep(100);
}
return NULL;
}
void * readNum(void * arg) {
while(1) {
pthread_rwlock_rdlock(&rwlock);//读锁
printf("===read, tid : %ld, num : %d\n", pthread_self(), num);
pthread_rwlock_unlock(&rwlock);//解锁
usleep(100);
}
return NULL;
}
int main() {
pthread_rwlock_init(&rwlock, NULL);//在main中先初始化
// 创建3个写线程,5个读线程
pthread_t wtids[3], rtids[5];
for(int i = 0; i < 3; i++) {
pthread_create(&wtids[i], NULL, writeNum, NULL);
}
for(int i = 0; i < 5; i++) {
pthread_create(&rtids[i], NULL, readNum, NULL);
}
// 设置线程分离
for(int i = 0; i < 3; i++) {
pthread_detach(wtids[i]);
}
for(int i = 0; i < 5; i++) {
pthread_detach(rtids[i]);
}
while(1)//因为要等子线程运行完了之后,才能去销毁互斥量,所以要么是 join,要么是 detach+while(1)
//这样才能保证,在销毁前子线程已运行完
{
sleep(1);
}
pthread_rwlock_destroy(&rwlock);
pthread_exit(NULL);
return 0;
}
4.条件变量(新特性)
1.条件变量(condition_variable)总是和 锁、互斥量 结合在一起,通常情况下这个互斥量是 std::mutex,并且 **锁 只能是 std::unique_lock< std::mutex > **。通过这样,可以控制线程的阻塞,同时在满足一定条件后被唤醒。
2.定义:
condition_variable cv;
3.成员函数:
-
等待条件成立: wait,wait_for 和 wait_unitl
-
wait :使当前线程阻塞,直到另一个线程调用 notify_one 或 notify_all。
- 参数(1个或2个):它需要两个参数:一个是 锁 unique_lock< mutex >,它应该在调用 wait 之前由调用线程锁定,并且在 wait 等待期间由 wait 自动解锁;另一个是函数或可调用对象(通常是一个lambda表达式或函数指针),用于检查条件是否满足(这个可以不写)
-
void wait(std::unique_lock<std::mutex>& lock);
// 若发生虚假唤醒,wait 之后的代码将会被执行
template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
/** 相当于
while (!pred()) {
通过检查 pred 返回值,可以避免虚假唤醒
wait(lock);
}
**/
-
特点:wait 会无限期地等待,直到条件满足。它会在每次从 wait 返回时重新获取互斥锁。
-
wait_for :导致当前的线程阻塞直到条件变量被通知,或者超时返回。
- 参数(2个或3个):与 wait 类似,它需要一个 unique_lock< std::mutex > 和 一个函数或可调用对象来检查条件(这个可以不写)。此外,它还需要一个表示等待时间的 (std::chrono::)duration 类型的参数。
template<class Rep, class Period>
std::cv_status wait_for(std::unique_lock<std::mutex>& lock,
const std::chrono::duration<Rep, Period>& rel_time);
// 被其他线程唤醒或发生虚假唤醒,未超时,返回 std::cv_status::no_timeout,
// 超时则返回 std::cv_status::timeout
template<class Rep, class Period, class Predicate>
bool wait_for(std::unique_lock<std::mutex>& lock,
const std::chrono::duration<Rep, Period>& rel_time,
Predicate pred);
// 相当于
// wait_until(lock, std::chrono::steady_clock::now() + rel_time, std::move(pred));
-
特点:wait_for 提供了等待时间的上限。如果在指定的时间内条件没有变为真,则 wait_for 会返回,即使 notify_one 或 notify_all 还没有被调用。
-
wait_until:使当前线程阻塞 直到指定的时间点 或 直到另一个线程调用 notify_one 或 notify_all
- 参数:与 wait 和 wait_for 类似,它需要一个 std::unique_lock< std::mutex > 和 一个函数或可调用对象来检查条件(可以不写)。此外,它还需要一个表示未来某个时间点的 (std::chrono::)time_point 类型的参数。
template<class Clock, class Duration>
std::cv_status wait_until(std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<Clock, Duration>& timeout_time);
// 被其他线程唤醒或发生虚假唤醒,未超时,返回 std::cv_status::no_timeout
// 超时则返回 std::cv_status::timeout
template<class Clock, class Duration, class Pred>
bool wait_until(std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<Clock, Duration>& timeout_time, Pred pred);
/** 相当于
while (!pred()) {
通过检查 pred 返回值,可以避免虚假唤醒
if (wait_until(lock, timeout_time) == std::cv_status::timeout) {
return pred();
}
}
return true;
**/
-
特点:wait_until 允许指定一个绝对的时间点作为等待的结束条件。如果在指定的时间点之前条件没有变为真,则 wait_until 会返回,即使 notify_one 或 notify_all 还没有被调用。
-
以上3个类型的 wait 函数都会在阻塞时 自动释放锁,即 调用 unique_lock 的成员函数 unlock(),来让其他线程获得锁(这就是条件变量只能和 unique_lock 一起使用的原因),否则线程在阻塞时也将一直占有锁。
-
wait 类型函数返回(即解阻塞)时要不是因为被唤醒,要不是因为超时。但是在实际中发现,因为操作系统的原因,wait 类型在不满足条件时,它也会返回,这就导致了虚假唤醒,因此我们一般都是使用带有函数指针或lamba表达式参数的 wait 函数。
-
给出信号: notify_one 和 notify_all 函数
-
notify_one(无参):唤醒 等待的线程之一
-
notify_all(无参):唤醒任何等待的线程
-
生产者消费者模型(条件变量版本)
-
模型中的对象:生产者(可不止一个)、消费者(可不止一个)、容器
-
特点:
- 如果 生产者生产的东西装满了容器,此时生产者需要等待(阻塞),并 唤醒消费者让其消费
- 如果 消费者 把容器中的东西全都消费完了,此时消费者需要等待(阻塞),并 唤醒生产者让其生产
- 生产者线程 和 消费者线程 之间需要做到 线程同步
-
条件变量版本:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <mutex>
#include <thread>
#include <condition_variable>
// 创建一个互斥量
mutex mtx;
// 创建条件变量
condition_variable cv;
//共享资源就是这个链表
struct Node{
int num;
Node *next;
};
// 头结点
Node * head = NULL;
void * producer(void * arg) {
// 不断的创建新的节点,添加到链表中
while(1) {
unique_lock<mutex> ul(&mtx);
Node * newNode = new Node;
newNode->next = head;
head = newNode;
newNode->num = rand() % 1000;
printf("add node, num : %d, tid : %ld\n", newNode->num, pthread_self());
// 只要生产了一个,就通知消费者消费
cv.notify_one();
ul.unlock();
usleep(100);
}
return NULL;
}
void * customer(void * arg) {
while(1) {
unique_lock<mutex> ul(mtx);
// 保存头结点的指针
Node * tmp = head;
// 判断是否有数据
if(head != NULL) {
// 有数据
head = head->next;
//把数据打印出来
printf("del node, num : %d, tid : %ld\n", tmp->num, pthread_self());
delete(tmp);
ul.unlock();
usleep(100);
} else {
// 没有数据,需要等待
// 当这个函数调用阻塞的时候,会对互斥锁进行解锁,当不阻塞的,继续向下执行,会重新加锁。
cv.wait(ul);
ul.unlock();//须释放,因为下一层循环还要再获取锁
}
}
return NULL;
}
int main() {
// 创建5个生产者线程,和5个消费者线程
thread ptids[5], ctids[5];
for(int i = 0; i < 5; i++) {
ptid[i]=thread(producer);
ctid[i]=thread(custmer);
}
for(int i = 0; i < 5; i++) {
ptids[i].detach();
ctids[i].detach();
}
while(1) {
sleep(10);
}
pthread_exit(NULL);
return 0;
}
5.信号量(非C++11特性)
- 信号量也用于 控制线程的阻塞;但与条件变量不同的是,信号的wait 并不会立即阻塞线程,而是当信号量的value值为0,才会去阻塞,直到 value值 > 0 时才解除阻塞
- 调用 wait,会使 value - 1,并判断是否满足阻塞的条件;调用 post,会使 value + 1
- 控制多个线程的阻塞,只需要一个条件变量;而控制两个线程的阻塞和唤醒,就需要2个信号量,依次类推
- wait 一般用于控制自身线程 某个操作的重复次数;而 post 一般用于让其他线程进行某个操作
信号量操作函数:
信号量的类型 sem_t
int sem_init(sem_t *sem, int pshared, unsigned int value);
- 初始化信号量
- 参数:
- sem : 信号量变量的地址
- pshared : 有两个值,0 表示 用在线程间 ,非0 表示 用在进程间
- value : 信号量中的值
int sem_destroy(sem_t *sem);
- 释放信号量资源
int sem_wait(sem_t *sem);
- 调用一次对信号量的值-1,如果减1之前值为0,就阻塞,直到value值>0(先判断后减1)
int sem_trywait(sem_t *sem);
- sem_trywait()为sem_wait()的非阻塞版,只-1,不进行阻塞
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
- 阻塞特定时间
int sem_post(sem_t *sem);
- 调用一次对信号量的值+1
int sem_getvalue(sem_t *sem, int *sval);
生产者消费者模型(信号量版本)
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>
// 创建一个互斥量
pthread_mutex_t mutex;
// 创建两个信号量,两个线程需要两个信号量
sem_t psem;
sem_t csem;
struct Node{
int num;
struct Node *next;
};
// 头结点
struct Node * head = NULL;
void * producer(void * arg) {
// 不断的创建新的节点,添加到链表中
while(1) {
sem_wait(&psem);//让生产者信号量-1
pthread_mutex_lock(&mutex);
struct Node * newNode = (struct Node *)malloc(sizeof(struct Node));
newNode->next = head;
head = newNode;
newNode->num = rand() % 1000;
printf("add node, num : %d, tid : %ld\n", newNode->num, pthread_self());
pthread_mutex_unlock(&mutex);
sem_post(&csem);//让消费者信号量+1
}
return NULL;
}
void * customer(void * arg) {
while(1) {
sem_wait(&csem);//让消费者信号量-1
pthread_mutex_lock(&mutex);
// 保存头结点的指针
struct Node * tmp = head;
head = head->next;
printf("del node, num : %d, tid : %ld\n", tmp->num, pthread_self());
free(tmp);
pthread_mutex_unlock(&mutex);
sem_post(&psem);//让生产者信号量+1
}
return NULL;
}
int main() {
pthread_mutex_init(&mutex, NULL);
sem_init(&psem, 0, 8);
sem_init(&csem, 0, 0);
// 创建5个生产者线程,和5个消费者线程
pthread_t ptids[5], ctids[5];
for(int i = 0; i < 5; i++) {
pthread_create(&ptids[i], NULL, producer, NULL);
pthread_create(&ctids[i], NULL, customer, NULL);
}
for(int i = 0; i < 5; i++) {
pthread_detach(ptids[i]);
pthread_detach(ctids[i]);
}
while(1) {
sleep(10);
}
pthread_mutex_destroy(&mutex);
pthread_exit(NULL);
return 0;
}