C++多线程与锁机制
1. 基本多线程编程
1.1 创建线程
#include <iostream>
#include <thread>void thread_function() {std::cout << "Hello from thread!\n";
}int main() {std::thread t(thread_function); // 创建并启动线程t.join(); // 等待线程结束return 0;
}
1.2 带参数的线程函数
#include <thread>
#include <iostream>void print_num(int num) {std::cout << "Number: " << num << "\n";
}int main() {std::thread t(print_num, 42);t.join();return 0;
}
1.3 join() 和 detach()
std::thread t(threadFunction);// join() - 等待线程完成
t.join();// detach() - 分离线程,线程独立运行
// t.detach();// 检查线程是否可joinable
if (t.joinable()) {t.join();
}
1.4 获取当前线程信息
#include <thread>
#include <iostream>int main() {std::cout << "Main thread ID: " << std::this_thread::get_id() << std::endl;std::thread t([](){std::cout << "Worker thread ID: " << std::this_thread::get_id() << std::endl;});t.join();return 0;
}
1.5 线程休眠
#include <chrono>
#include <thread>int main() {std::cout << "Sleeping for 2 seconds..." << std::endl;std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << "Awake!" << std::endl;return 0;
}
2. mutex (互斥锁)
#include <thread>
#include <mutex>
#include <iostream>std::mutex mtx; // 全局互斥锁
int shared_data = 0;void increment() {for (int i = 0; i < 100000; ++i) {mtx.lock(); // 上锁++shared_data;mtx.unlock(); // 解锁}
}int main() {std::thread t1(increment);std::thread t2(increment);t1.join();t2.join();std::cout << "Final value: " << shared_data << "\n";return 0;
}
2.1 lock_guard (自动管理锁)
lock_guard
在构造时自动上锁,在析构时自动解锁,防止忘记解锁
void increment_safe() {for (int i = 0; i < 100000; ++i) {std::lock_guard<std::mutex> lock(mtx); // 自动上锁++shared_data;} // 自动解锁
}
2.2 unique_lock
unique_lock
比 lock_guard
更灵活,可以手动上锁和解锁。
void increment_flexible() {for (int i = 0; i < 100000; ++i) {std::unique_lock<std::mutex> lock(mtx);++shared_data;lock.unlock(); // 可以手动解锁// 做一些不需要锁的操作lock.lock(); // 再手动上锁++shared_data;}
}
2.3 尝试锁 try_lock()
void tryLockExample() {std::unique_lock<std::mutex> lock(mtx, std::try_to_lock);if (lock.owns_lock()) {// 成功获取锁std::cout << "Got the lock!\n";} else {// 未能获取锁std::cout << "Couldn't get the lock, doing something else...\n";}
}
2.4 递归互斥锁 std::recursive_mutex
#include <mutex>std::recursive_mutex rec_mtx;void recursiveFunction(int count) {std::lock_guard<std::recursive_mutex> lock(rec_mtx);if (count > 0) {std::cout << "Count: " << count << '\n';recursiveFunction(count - 1);}
}int main() {std::thread t(recursiveFunction, 3);t.join();return 0;
}
2.5 定时互斥锁 std::timed_mutex
#include <mutex>
#include <chrono>std::timed_mutex timed_mtx;void timedLockExample() {auto timeout = std::chrono::milliseconds(100);if (timed_mtx.try_lock_for(timeout)) {// 在100ms内成功获取锁std::this_thread::sleep_for(std::chrono::milliseconds(50));timed_mtx.unlock();} else {// 超时未能获取锁std::cout << "Could not get the lock within 100ms\n";}
}
2.6 std::adopt_lock与std::defer_lock
特性 | std::adopt_lock | std::defer_lock |
---|---|---|
用途 | 表示锁已被当前线程获得 | 表示不立即获取锁 |
加锁时机 | 不尝试加锁(假设已锁定) | 稍后手动加锁 |
典型使用场景 | 与 std::lock 配合使用 | 延迟加锁或条件加锁 |
可用性 | 适用于 lock_guard 和 unique_lock | 仅适用于 unique_lock |
adopt_lock
表示当前线程已经获得了互斥锁的所有权,不需要再尝试加锁
#include <mutex>std::mutex mtx;void function() {mtx.lock(); // 手动加锁// 使用 adopt_lock 告诉 lock_guard 我们已经拥有锁std::lock_guard<std::mutex> lock(mtx, std::adopt_lock);// 临界区代码...// 离开作用域时自动解锁
}
3. 条件变量 (condition_variable)
用于线程间的同步,允许线程等待特定条件成立。
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>std::mutex mtx;
std::condition_variable cv;
bool ready = false;void worker() {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, []{ return ready; }); // 等待ready变为truestd::cout << "Worker is processing data\n";
}int main() {std::thread t(worker);{std::lock_guard<std::mutex> lock(mtx);ready = true;}cv.notify_one(); // 通知等待的线程t.join();return 0;
}
3.1 wait
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock); // 无条件等待,可能虚假唤醒
带谓词的 wait()
cv.wait(lock, []{ return ready; }); // 等价于:
// while (!ready) {
// cv.wait(lock);
// }
wait_for()
- 带超时等待
using namespace std::chrono_literals;
if (cv.wait_for(lock, 100ms, []{ return ready; })) {// 条件在超时前满足
} else {// 超时
}
wait_until()
- 等待到指定时间点
auto timeout = std::chrono::steady_clock::now() + 100ms;
if (cv.wait_until(lock, timeout, []{ return ready; })) {// 条件在时间点前满足
} else {// 超时
}
3.2 notify
notify_one()
- 通知一个等待线程
{std::lock_guard<std::mutex> lock(mtx);ready = true;
}
cv.notify_one(); // 只唤醒一个等待线程
notify_all()
- 通知所有等待线程
{std::lock_guard<std::mutex> lock(mtx);ready = true;
}
cv.notify_all(); // 唤醒所有等待线程
3.3 生产消费者模式示例
#include <queue>
#include <chrono>std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;
const int MAX_SIZE = 10;void producer() {for (int i = 0; i < 20; ++i) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, []{ return data_queue.size() < MAX_SIZE; });data_queue.push(i);std::cout << "Produced: " << i << std::endl;lock.unlock();cv.notify_all();std::this_thread::sleep_for(std::chrono::milliseconds(100));}
}void consumer() {while (true) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, []{ return !data_queue.empty(); });int data = data_queue.front();data_queue.pop();std::cout << "Consumed: " << data << std::endl;lock.unlock();cv.notify_all();if (data == 19) break; // 结束条件}
}int main() {std::thread p(producer);std::thread c(consumer);p.join();c.join();return 0;
}
4. 原子操作 (atomic)
对于简单的数据类型,可以使用原子操作避免锁的开销。
#include <atomic>
#include <thread>
#include <iostream>std::atomic<int> counter(0);void increment_atomic() {for (int i = 0; i < 100000; ++i) {++counter; // 原子操作,无需锁}
}int main() {std::thread t1(increment_atomic);std::thread t2(increment_atomic);t1.join();t2.join();std::cout << "Counter: " << counter << "\n";return 0;
}
4.1 基本原子类型
#include <atomic>std::atomic<int> atomicInt(0); // 原子整数
std::atomic<bool> atomicBool(false); // 原子布尔值
std::atomic<long> atomicLong; // 默认初始化为0
4.2 加载和存储
// 存储值
atomicInt.store(42); // 原子存储
atomicInt = 42; // 等价写法// 加载值
int value = atomicInt.load(); // 原子加载
value = atomicInt; // 等价写法
4.3 交换操作
int old = atomicInt.exchange(100); // 原子交换为新值,返回旧值
4.4 读-修改-写操作
std::atomic<int> counter(0);// 原子加法,返回旧值
int prev = counter.fetch_add(5); // counter += 5,返回加前的值// 原子减法
prev = counter.fetch_sub(3); // counter -= 3,返回减前的值
std::atomic<int> flags(0);flags.fetch_or(0x01); // 原子按位或
flags.fetch_and(~0x01); // 原子按位与
flags.fetch_xor(0x03); // 原子按位异或
4.5 比较交换 (CAS)
std::atomic<int> value(10);
int expected = 10;// 比较并交换
bool success = value.compare_exchange_weak(expected, 20);
// 如果value == expected,则设置为20,返回true
// 否则将expected更新为当前value,返回false// 强版本 (较少虚假失败)
success = value.compare_exchange_strong(expected, 30);
4.6 内存顺序 (Memory Order)
// 默认是最严格的内存顺序 (sequential consistency)
atomicInt.store(42, std::memory_order_seq_cst);// 宽松内存顺序
atomicInt.store(42, std::memory_order_relaxed);// 常见内存顺序:
// - memory_order_relaxed: 无顺序保证
// - memory_order_consume: 数据依赖顺序
// - memory_order_acquire: 读操作,防止上方读写重排
// - memory_order_release: 写操作,防止下方读写重排
// - memory_order_acq_rel: 读-修改-写操作
// - memory_order_seq_cst: 顺序一致性 (默认)
4.7 原子标志
std::atomic_flag flag = ATOMIC_FLAG_INIT; // 必须这样初始化// 测试并设置 (原子操作)
bool was_set = flag.test_and_set();// 清除标志
flag.clear();
4.8 原子指针
class MyClass {};
MyClass* ptr = new MyClass();
std::atomic<MyClass*> atomicPtr(ptr);// 原子指针操作
MyClass* old = atomicPtr.exchange(new MyClass());// 比较交换指针
MyClass* expected = old;
atomicPtr.compare_exchange_strong(expected, nullptr);
4.9 自定义原子类型
struct Point { int x; int y; };
std::atomic<Point> atomicPoint{Point{1, 2}};// 必须是可平凡复制的类型(trivially copyable)
static_assert(std::is_trivially_copyable<Point>::value, "Point must be trivially copyable");
// 原子操作示例
Point old = atomicPoint.load(); // 原子读取
atomicPoint.store(Point{3, 4}); // 原子写入
Point newVal{5, 6};
Point expected{3, 4};
atomicPoint.compare_exchange_strong(expected, newVal); // CAS操作
std::atomic
对模板类型 T
的关键要求是:
-
可平凡复制(Trivially Copyable):保证对象可以用
memcpy
方式安全复制 -
无用户定义的拷贝控制(析构函数、拷贝/移动构造/赋值)
-
标准布局(Standard Layout)
static_assert
在编译时验证这些条件,若不满足会立即报错(比运行时错误更安全)。
一个类型
T
是 平凡可复制(Trivially Copyable) 的,当且仅当满足以下所有条件:
没有用户定义的拷贝构造函数(
T(const T&)
)没有用户定义的移动构造函数(
T(T&&)
)没有用户定义的拷贝赋值运算符(
T& operator=(const T&)
)没有用户定义的移动赋值运算符(
T& operator=(T&&)
)有一个平凡的(隐式定义的或
=default
)析构函数所有非静态成员和基类也必须是平凡可复制的
不能有虚函数或虚基类
如果满足这些条件,编译器可以安全地使用
memcpy
来复制该类型的对象,而不会引发未定义行为(UB)。
5. 死锁预防
当多个线程需要多个锁时,可能产生死锁。预防方法:
-
总是以相同的顺序获取锁
-
使用
std::lock
同时锁定多个互斥量
std::mutex mtx1, mtx2;void safe_lock() {// 同时锁定两个互斥量,避免死锁std::lock(mtx1, mtx2);std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);// 安全地访问共享资源
}
6. 线程局部存储 (thread_local)
使用 thread_local
关键字声明线程局部变量,每个线程有自己的副本。
#include <thread>
#include <iostream>thread_local int thread_specific_value = 0;void thread_function(int id) {thread_specific_value = id;std::cout << "Thread " << id << ": " << thread_specific_value << "\n";
}int main() {std::thread t1(thread_function, 1);std::thread t2(thread_function, 2);t1.join();t2.join();return 0;
}
7. 读写锁
读写锁是一种特殊的同步机制,允许多个读操作并发执行,但写操作必须独占访问。这种锁在"读多写少"的场景下能显著提高性能。
C++17 中的 std::shared_mutex
#include <shared_mutex>
#include <vector>class ThreadSafeContainer {
private:std::vector<int> data;mutable std::shared_mutex mutex; // mutable 允许const方法加锁public:// 读操作 - 使用共享锁int get(size_t index) const {std::shared_lock<std::shared_mutex> lock(mutex);return data.at(index);}// 写操作 - 使用独占锁void set(size_t index, int value) {std::unique_lock<std::shared_mutex> lock(mutex);data.at(index) = value;}// 批量读操作示例std::vector<int> getSnapshot() const {std::shared_lock<std::shared_mutex> lock(mutex);return data;}
};
读写锁特性
-
三种访问模式:
-
共享读锁 (
shared_lock
):多个线程可同时持有 -
独占写锁 (
unique_lock
):只有一个线程可持有 -
升级锁 (C++14没有直接支持,需手动实现)
-
-
锁的优先级策略:
-
读优先:容易导致写线程饥饿
-
写优先:可能降低读并发度
-
公平策略:折中方案
-
-
典型使用场景:
-
配置信息的热更新
-
缓存系统
-
高频查询低频修改的数据结构
-
8. 自旋锁 (Spin Lock)
自旋锁是一种非阻塞锁,当线程无法获取锁时不会休眠,而是循环检查锁状态(忙等待)。适用于锁持有时间极短的场景。
基本自旋锁实现
#include <atomic>class SpinLock {std::atomic_flag flag = ATOMIC_FLAG_INIT;public:void lock() {while(flag.test_and_set(std::memory_order_acquire)) {// 可加入CPU暂停指令减少争用时的能耗#ifdef __x86_64____builtin_ia32_pause();#endif}}void unlock() {flag.clear(std::memory_order_release);}bool try_lock() {return !flag.test_and_set(std::memory_order_acquire);}
};
TTAS + Backoff
class AdvancedSpinLock {std::atomic<bool> locked{false};public:void lock() {bool expected = false;int backoff = 1;const int max_backoff = 64;while(!locked.compare_exchange_weak(expected, true, std::memory_order_acquire, std::memory_order_relaxed)) {expected = false; // compare_exchange_weak会修改expected// 指数退避for(int i = 0; i < backoff; ++i) {#ifdef __x86_64____builtin_ia32_pause();#endif}backoff = std::min(backoff * 2, max_backoff);}}void unlock() {locked.store(false, std::memory_order_release);}
};