【C++】15.并发支持库
本篇内容参考自cplusplus
1. thread
1.1 thread
thread库底层是对各个系统的线程库(Linux下的pthread库和Windows下Thread库)进行封装。C++11thread库的第一个特点是可以跨平台,第二个特点是Linux和Windows下提供的线程库都是面向过程的,C++11 thread是面向对象的,并且融合了一些C++11语言特点。
构造函数与赋值重载函数:
上面线程创建的4个构造函数,最常用的是第2个,它支持传一个可调用对象和参数即可,相比 pthread_create 而言,这里不再局限于只传递函数指针,其次就是参数传递也更方便, pthread_create 调用时,要传递多个参数需要打包成一个结构体,传结构体对象的指针过去。
另外也可以用第1个和第4个配合来创建线程,我们可以把右值线程对象移动构造或者移动赋值给另一个线程对象。
第3个可以看到线程对象是不支持拷贝的。
此外也可以看出线程对象是不支持赋值重载的。
获取线程id:
判断线程是否是正在执行的:
等待从线程:
分离从线程:
使用样例:
#include<iostream>
#include<thread>
#include<vector>
#include<mutex>
using namespace std;void Print(int n, int i)
{for (; i < n; i++){cout << this_thread::get_id() << ":" << i << endl;}cout << endl;
}
int main()
{thread t1(Print, 10, 0);t1.join();thread t2(Print, 20, 10);t2.detach();cout << this_thread::get_id() << endl;return 0;
}
1.2 this_thread
this_thread是一个命名空间,主要封装了线程相关的4个全局接口函数。
• get_id是当前执行线程的线程id。
• yield是主动让出当前线程的执行权,让其他线程先执行。
• sleep_for是阻塞当前线程执行,至少经过指定的sleep_duration。因为调度或资源争议延迟,此函数可能阻塞长于sleep_duration。
• sleep_until阻塞当前线程的执行,直到抵达指定的sleep_time。函数可能会因为调度或资源纠纷延迟而阻塞到sleep_time之后的某个时间点。
2. mutex
mutex是封装互斥锁的类,用于保护临界区的共享数据。
2.1 mutex
调用方线程从它成功调用 lock 或 try_lock 开始,到它调用 unlock 为止占有 mutex。线程占有 mutex 时,其他线程如果试图要求 mutex 的所有权,那么就会阻塞(对于 lock 的调用),对于 try_lock 就会返回false。
如果 mutex 在仍为任何线程所占有时即被销毁,或在占有 mutex 时线程终止,那么行为未定义。
下面代码展示了mutex的使用,其实如果线程对象传参给可调用对象时,使用引用方式传参,实参位置需要加上ref(obj)的方式,主要原因是thread本质还是系统库提供的线程API的封装,thread 构造取到参数包以后,要调用创建线程的API,还是需要将参数包打包成⼀个结构体传参过去,那么打包成结构体时,参考包对象就会拷贝给结构体对象,使用ref传参的参数,会让结构体中的对应参数成员类型推导为引用,这样才能实现引用传参,
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
using namespace std;
void Print(int n, int& rx, mutex& rmtx)
{rmtx.lock();for (int i = 0; i < n; i++){// t1 t2++rx;}rmtx.unlock();
}
int main()
{int x = 0;mutex mtx;thread t1(Print, 1000000, ref(x), ref(mtx));thread t2(Print, 2000000, ref(x), ref(mtx));t1.join();t2.join();cout << x << endl;return 0;
}
2.2 其他的 mutex
• recursive_mutex
递归互斥锁,这把锁主要用来递归加锁的场景中,因为递归会引起死锁问题。
为什么会出现死锁?
因为当前在进入递归函数前,申请了锁资源,进入递归函数后(还没有释放锁资源),再次申请锁资源,此时就会出现锁在我手里,但我还申请不到的现象,也就是死锁。
解决这个 死锁 问题的关键在于自己在持有锁资源的情况下,不必再申请,此时就要用到recursive_mutex
• timed_mutex
时间互斥锁,这把锁中新增了定时解锁的功能,可以在程序运行指定时间后,自动解锁(如果还没有解锁的话)
• recursive_time_mutex
递归时间互斥锁,就是对timed_mutex时间互斥锁做了递归方面的升级,使其在面对递归场景时,不会出现死锁。
2.3 RAII 风格的 mutex
• lock_guard
C++11提供的支持RAII方式管理互斥锁资源的类,这样可以更有效的防止因为异常等原因导致的死锁问题。
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
using namespace std;
int main()
{int x = 0;mutex mtx;auto Print = [&x, &mtx](size_t n) {lock_guard<mutex> lock(mtx);for (size_t i = 0; i < n; i++)++x;};thread t1(Print, 1000000);thread t2(Print, 2000000);t1.join();t2.join();cout << x << endl;return 0;
}
• unique_lock
也是C++11提供的支持RAII方式管理互斥锁资源的类,相比lock_guard功能更丰富复杂。首先在构造的时候传不同的tag用以支持持在构造的时候不同的方式式处理锁对象。
2.4 lock && try_lock
• lock是一个函数模板,可以支持对多个锁对象同时锁定。
• try_lock也是一个函数模板,尝试对多个锁对象进行同时尝试锁定。如果全部锁对象都锁定了,返 回-1,如果某⼀个锁对象尝试锁定失败,把已经锁定成功的锁对象解锁,并则返回这个对象的下标。
3. atomic
atomic是一个类模板,里面封装了大部分内置类型的++ -- ^ 等原子性操作,这样可以不用加锁来实现内置类型的一些原子操作。load和store可以原子的读取和修改atomic封装存储的T对象。
atomic的原理主要是硬件层面的支持,现代处理器提供了原子指令来支持原子操作。例如,在 x86 架构中有CMPXCHG(比较并交换)指令。这些原子指令能够在一个不可分割的操作中完成对内存的读取、比较和写入操作,简称CAS(Compare And Set 或 Compare And Swap)。
为了处理多个处理器缓存之间的数据一致性问题,硬件采用了缓存一致性协议,当一个atomic 操作修改了一个变量的值,缓存一致性协议会确保其他处理器缓存中的相同变量副本被正确地更新或标记为无效。
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>
using namespace std;
atomic<int> acnt;
int cnt;
void f()
{for (int n = 0; n < 100000; ++n){++acnt;++cnt;}
}
int main()
{std::vector<thread> pool;for (int n = 0; n < 4; ++n)pool.emplace_back(f);for (auto& e : pool)e.join();cout << "原子计数器为 " << acnt.load() << '\n'<< "非原子计数器为 " << cnt << '\n';return 0;
}
4. condition_variable
condition_variable需要配合互斥锁系列进行使用,主要提供wait和notify系统接口。
// 下面演示一个经典问题,两个线程交替打印奇数和偶数
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
using namespace std;
int main()
{std::mutex mtx;condition_variable c;int n = 100;bool flag = true;thread t1([&]() {int i = 0;while (i < n){unique_lock<mutex> lock(mtx);while (!flag){c.wait(lock);}cout << i << endl;flag = false;i += 2; // 偶数 c.notify_one();}});thread t2([&]() {int j = 1;while (j < n){unique_lock<mutex> lock(mtx);while (flag)c.wait(lock);cout << j << endl;j += 2; // 奇数 flag = true;c.notify_one();}});t1.join();t2.join();return 0;
}