C++ 之多线程和互斥锁原理和使用详解
多线程基础:
为什么需要多线程?
提高性能: 充分利用多核 CPU 的计算能力,让多个任务真正并行执行,缩短程序运行时间(CPU 密集型任务)。
提高响应性: 在 GUI 应用或服务器程序中,主线程(如 UI 线程或网络监听线程)保持响应,将耗时操作(如文件 I/O、复杂计算、网络请求)交给后台线程执行,避免界面“卡死”。
简化建模: 某些问题(如模拟、游戏 AI、实时数据处理)天然适合用多个独立或协作的执行单元来描述。
进程 (Process) vs 线程 (Thread)
进程: 操作系统资源分配的基本单位。每个进程拥有独立的地址空间、数据栈、代码、文件描述符、环境变量等。进程间通信 (IPC) 成本较高(管道、消息队列、共享内存等)。
线程: 进程内的执行流,是 CPU 调度的基本单位。同一进程内的所有线程共享进程的地址空间和资源(如全局变量、堆内存、文件描述符)。每个线程拥有自己独立的栈空间和程序计数器 (PC)。线程间通信 (ITC) 成本较低(主要通过共享内存),但也因此带来了数据竞争 (Data Race) 和同步 (Synchronization) 的难题。
C++ 多线程支持
C++11 之前: 依赖平台特定的 API (如 POSIX Threads
pthreads
on Linux/macOS, Windows Threads API)。C++11 及之后: 在标准库
<thread>
中引入了std::thread
,提供了跨平台的线程管理接口。这是现代 C++ 多线程编程的基础。
互斥锁 (Mutex) 原理:
数据竞争 (Data Race)
当多个线程在没有同步机制的情况下,并发读写同一个共享资源(变量、内存区域、文件等),并且至少有一个线程执行写操作时,就会发生数据竞争。
后果: 程序行为未定义 (Undefined Behavior)。读取的值可能是写操作完成前的旧值、部分完成写入的中间值、甚至是完全混乱的值。程序可能崩溃、产生错误结果、或表现出难以复现的诡异行为。
临界区 (Critical Section)
指访问共享资源的代码段。在任何时刻,只允许一个线程执行临界区内的代码。
互斥锁 (Mutex - Mutual Exclusion Lock)
目的: 保护临界区,确保同一时间只有一个线程可以进入临界区,从而消除数据竞争。
核心操作:
lock()
: 尝试获取锁的所有权。
如果锁当前是空闲的,则调用线程获得锁,并继续执行。
如果锁当前已被其他线程持有,则调用线程被阻塞 (Blocked),进入等待状态,直到锁被释放。
unlock()
: 释放锁的所有权,允许其他被阻塞的线程(如果有)获得锁并继续执行。底层机制 (简化理解):
锁通常由一个内存位置 (Lock Word) 表示。
lock()
操作依赖于处理器的原子指令 (Atomic Instructions) (如test-and-set
,compare-and-swap
- CAS)。这些指令在执行过程中不会被中断,保证了操作的原子性。当线程调用
lock()
:
原子指令检查锁的状态(空闲/已持有)。
如果空闲,原子地将其设置为“已持有”,线程获得锁。
如果已持有,线程将自己加入该锁的等待队列,并让出 CPU(阻塞)。
unlock()
操作:
原子地将锁状态设置为“空闲”。
从等待队列中唤醒一个(或多个)阻塞的线程(具体唤醒策略由操作系统调度器决定)。
关键特性:
互斥性: 保证只有一个线程持有锁。
原子性:
lock()
/unlock()
操作本身是原子的,避免了锁状态管理中的竞争。阻塞/唤醒: 提供线程调度机制。
C++ 中的互斥锁 (
<mutex>
)
std::mutex
: 最基本的互斥锁,不可递归(同一线程多次lock()
会导致死锁)。
std::recursive_mutex
: 允许同一线程多次lock()
(需要相同次数的unlock()
)。
std::timed_mutex
/std::recursive_timed_mutex
: 提供try_lock_for()
/try_lock_until()
,允许尝试获取锁一段时间或直到某个时间点。锁守卫 (Lock Guard - RAII 封装):
std::lock_guard
: C++11 引入。在构造函数中自动lock()
,在析构函数(离开作用域时)自动unlock()
。简单易用,推荐首选。
std::unique_lock
: C++11 引入。功能更强大(但开销稍大):
支持延迟锁定 (
defer_lock
)。支持尝试锁定 (
try_lock()
)。支持超时锁定 (
try_lock_for()
,try_lock_until()
)。可以手动
lock()
/unlock()
(不推荐,除非必须)。可以转移所有权 (
std::move
)。必须与
std::condition_variable
配合使用。为什么 RAII? 确保即使在临界区内发生异常或提前
return
,锁也能被安全释放,避免死锁。
最佳实践:
1. 基础互斥锁(std::mutex)
#include <iostream> #include <thread> #include <mutex>std::mutex mtx; // 全局互斥锁 int shared_data = 0;void increment() {for (int i = 0; i < 100000; ++i) {mtx.lock(); // 获取锁++shared_data; // 临界区操作mtx.unlock(); // 释放锁} }int main() {std::thread t1(increment);std::thread t2(increment);t1.join();t2.join();std::cout << "Result: " << shared_data << std::endl;// 正确输出:200000 }
2. RAII 自动管理(推荐)
void safe_increment() {for (int i = 0; i < 100000; ++i) {std::lock_guard<std::mutex> lock(mtx); // 构造时加锁,析构时自动解锁++shared_data;} }
3. 带超时的互斥锁(std::timed_mutex)
std::timed_mutex tmtx;void try_increment() {for (int i = 0; i < 5; ++i) {if (tmtx.try_lock_for(std::chrono::milliseconds(100))) {// 成功获取锁++shared_data;tmtx.unlock();} else {// 超时处理std::cout << "Timeout!" << std::endl;}} }
最小化临界区:锁内只包含必要操作
// 错误示例(锁范围过大) {std::lock_guard<std::mutex> lock(mtx);result = complex_calculation(); // 耗时操作不应在临界区shared_data = result; }// 正确做法 int result = complex_calculation(); // 在锁外执行计算 {std::lock_guard<std::mutex> lock(mtx);shared_data = result; }
- 避免锁嵌套:容易导致死锁
- 使用RAII管理:确保异常安全
- 优先选择原子操作:对于简单数据类型
#include <atomic> std::atomic<int> counter(0); // 无锁线程安全
死锁预防技巧
固定加锁顺序:所有线程按相同顺序获取锁
使用std::scoped_lock(C++17)
void safe_operation() {std::scoped_lock lock(mtx1, mtx2); // 自动处理多个锁// 操作共享资源 }
超时机制:
std::unique_lock<std::mutex> lock(mtx, std::chrono::milliseconds(100)); if (lock.owns_lock()) {// 成功获取锁 }
经典案例编写:
案例 1:银行账户转账 (保护共享资源)
#include <iostream> #include <thread> #include <mutex> #include <vector>class BankAccount { private:double balance_;std::mutex balance_mutex_; // 每个账户一个互斥锁,保护自己的余额public:explicit BankAccount(double initial_balance) : balance_(initial_balance) {}// 存款 (线程安全)void deposit(double amount) {std::lock_guard<std::mutex> lock(balance_mutex_); // RAII: 构造时加锁,析构时解锁balance_ += amount;std::cout << "Deposited " << amount << ". New balance: " << balance_ << " (Thread: " << std::this_thread::get_id() << ")\n";}// 取款 (线程安全)bool withdraw(double amount) {std::lock_guard<std::mutex> lock(balance_mutex_);if (balance_ >= amount) {balance_ -= amount;std::cout << "Withdrew " << amount << ". New balance: " << balance_ << " (Thread: " << std::this_thread::get_id() << ")\n";return true;}std::cout << "Withdrawal failed (insufficient funds). Requested: " << amount << ", Balance: " << balance_ << " (Thread: " << std::this_thread::get_id() << ")\n";return false;}// 获取余额 (线程安全 - 只读操作通常也需要保护,避免读到部分更新的值)double get_balance() const {std::lock_guard<std::mutex> lock(balance_mutex_);return balance_;}// 静态方法:线程安全的账户间转账static bool transfer(BankAccount& from, BankAccount& to, double amount) {// 关键:避免死锁!必须按固定顺序锁定两个账户的锁。// 方法1: 使用 std::lock 同时锁定多个互斥锁(避免死锁算法)std::unique_lock<std::mutex> lock_from(from.balance_mutex_, std::defer_lock);std::unique_lock<std::mutex> lock_to(to.balance_mutex_, std::defer_lock);std::lock(lock_from, lock_to); // 原子性地同时锁定两个锁,顺序由库保证// 方法2: 按账户ID等唯一标识排序锁定 (此处未实现排序逻辑)// ... 获取from和to的唯一标识并排序 ...// std::lock_guard<std::mutex> lock_first(...);// std::lock_guard<std::mutex> lock_second(...);if (from.balance_ >= amount) {from.balance_ -= amount;to.balance_ += amount;std::cout << "Transferred " << amount << " from acc1 to acc2." << std::endl;return true;}return false;} };int main() {BankAccount acc1(1000.0);BankAccount acc2(200.0);// 创建多个线程模拟并发操作std::vector<std::thread> threads;// 线程1: 多次存款到 acc1threads.emplace_back([&acc1]() {for (int i = 0; i < 5; ++i) {acc1.deposit(100);std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 模拟操作耗时}});// 线程2: 多次从 acc1 取款threads.emplace_back([&acc1]() {for (int i = 0; i < 5; ++i) {acc1.withdraw(150);std::this_thread::sleep_for(std::chrono::milliseconds(70));}});// 线程3: 多次从 acc2 取款threads.emplace_back([&acc2]() {for (int i = 0; i < 5; ++i) {acc2.withdraw(50);std::this_thread::sleep_for(std::chrono::milliseconds(90));}});// 线程4: 多次从 acc1 向 acc2 转账threads.emplace_back([&acc1, &acc2]() {for (int i = 0; i < 3; ++i) {BankAccount::transfer(acc1, acc2, 200);std::this_thread::sleep_for(std::chrono::milliseconds(100));}});// 等待所有线程结束for (auto& t : threads) {t.join();}// 打印最终余额std::cout << "\nFinal balances:\n";std::cout << "Account 1: " << acc1.get_balance() << std::endl;std::cout << "Account 2: " << acc2.get_balance() << std::endl;return 0; }
关键点解释 (案例1):
BankAccount
类封装: 每个账户有自己的balance_
和专属的std::mutex
(balance_mutex_
)。这是非常重要的设计模式:资源与保护它的锁绑定在一起。成员函数加锁:
deposit
,withdraw
,get_balance
都使用std::lock_guard
在操作balance_
前自动加锁,操作后自动解锁。确保对单个账户余额的任何访问都是互斥的。
transfer
方法 (重点难点): 涉及两个账户 (from
和to
)。同时操作两个共享资源。
死锁风险: 如果线程 A 锁定
acc1
后尝试锁定acc2
,同时线程 B 锁定acc2
后尝试锁定acc1
,就会发生死锁,两个线程互相等待对方释放锁。解决方案1 (推荐): 使用
std::lock
和std::unique_lock
(withdefer_lock
)
std::unique_lock
的defer_lock
参数表示构造时不立即加锁。
std::lock(lock1, lock2, ...)
是一个原子操作,它会使用特定的算法(如避免死锁算法)同时锁定所有传入的锁对象,保证不会发生死锁。解决方案2: 固定顺序锁定
为所有账户定义一个全局唯一的排序标准(如账户 ID)。
在
transfer
中,总是先锁定排序靠前的账户的锁,再锁定排序靠后的账户的锁。所有线程都遵循这个顺序,死锁就不会发生。这种方法在代码中需要维护排序逻辑。
输出: 加入了线程 ID 和操作细节的输出,便于观察并发执行情况。注意:
std::cout
本身也是共享资源,多个线程同时输出可能会交错。在实际应用中,可能需要保护std::cout
或使用线程安全的日志库。
std::this_thread::sleep_for
: 模拟实际操作需要时间,增加线程间交错执行的可能性,更容易暴露并发问题。
案例 2:生产者-消费者问题 (线程间协作)
#include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <chrono> #include <random>const int BUFFER_SIZE = 5; // 缓冲区大小 const int NUM_ITEMS = 20; // 要生产的总物品数// 线程安全的缓冲区 (有限队列) class SafeBuffer { private:std::queue<int> buffer_; // 实际存储数据的队列std::mutex mutex_; // 保护对 buffer_ 的访问std::condition_variable cond_not_full_; // 生产者等待:当缓冲区满时std::condition_variable cond_not_empty_; // 消费者等待:当缓冲区空时public:// 生产者:添加物品到缓冲区void produce(int item) {std::unique_lock<std::mutex> lock(mutex_); // 必须用 unique_lock 配合条件变量// 等待缓冲区不满 (条件变量等待:如果条件不满足[缓冲区满],则释放锁并阻塞;被唤醒后重新获取锁并检查条件)cond_not_full_.wait(lock, [this]() {return buffer_.size() < BUFFER_SIZE;});// 条件满足 (缓冲区不满),添加物品buffer_.push(item);std::cout << "Produced item: " << item << " (Buffer size: " << buffer_.size() << ")\n";// 通知一个等待的消费者:缓冲区现在非空了cond_not_empty_.notify_one();}// 消费者:从缓冲区移除并返回物品int consume() {std::unique_lock<std::mutex> lock(mutex_);// 等待缓冲区不空cond_not_empty_.wait(lock, [this]() {return !buffer_.empty();});// 条件满足 (缓冲区不空),取出物品int item = buffer_.front();buffer_.pop();std::cout << "Consumed item: " << item << " (Buffer size: " << buffer_.size() << ")\n";// 通知一个等待的生产者:缓冲区现在不满cond_not_full_.notify_one();return item;} };// 生产者线程函数 void producer(SafeBuffer& buffer) {for (int i = 1; i <= NUM_ITEMS; ++i) {buffer.produce(i);// 模拟生产耗时std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 200 + 100));} }// 消费者线程函数 void consumer(SafeBuffer& buffer, int id) {for (int i = 0; i < NUM_ITEMS / 2; ++i) { // 两个消费者,各消费一半int item = buffer.consume();// 模拟消费耗时std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 300 + 150));} }int main() {SafeBuffer buffer;// 创建生产者和消费者线程std::thread prod_thread(producer, std::ref(buffer));std::thread cons_thread1(consumer, std::ref(buffer), 1);std::thread cons_thread2(consumer, std::ref(buffer), 2);// 等待所有线程结束prod_thread.join();cons_thread1.join();cons_thread2.join();std::cout << "All items produced and consumed." << std::endl;return 0; }
关键点解释 (案例2):
SafeBuffer
类: 封装了一个线程安全的有限队列 (FIFO)。
std::queue<int> buffer_
: 实际存储数据的队列。
std::mutex mutex_
: 保护整个缓冲区的访问。
std::condition_variable cond_not_full_
: 生产者等待的条件变量。当缓冲区满时,生产者需要等待。
std::condition_variable cond_not_empty_
: 消费者等待的条件变量。当缓冲区空时,消费者需要等待。条件变量 (
std::condition_variable
)
目的: 解决线程间的协作问题。当某个条件不满足时,让线程高效地等待,而不是忙等待 (busy-waiting) 浪费 CPU。
核心操作 (必须与
std::unique_lock<std::mutex>
配合使用):
wait(unique_lock<mutex>& lock, Predicate pred)
:
原子地释放锁
lock
并将线程置于等待状态(阻塞)。
当被其他线程的
notify_one()
或notify_all()
唤醒时:
a) 重新获取锁
lock
。b) 检查谓词
pred()
。
如果
pred()
返回true
,则wait()
返回,线程继续执行(此时锁已被持有)。如果
pred()
返回false
,则再次释放锁并阻塞(回到步骤1)。这是为了防止虚假唤醒 (Spurious Wakeup) - 即没有notify
也可能唤醒,或者条件尚未真正满足。
notify_one()
: 唤醒一个正在此条件变量上等待的线程(如果有)。唤醒哪个线程是不确定的。
notify_all()
: 唤醒所有正在此条件变量上等待的线程。
produce()
方法:
获取锁 (
unique_lock
)。使用
cond_not_full_.wait(lock, predicate)
等待缓冲区不满。
predicate
:[this]() { return buffer_.size() < BUFFER_SIZE; }
条件满足后,添加物品到缓冲区。
通知
cond_not_empty_
(唤醒一个等待的消费者)。
consume()
方法:
获取锁 (
unique_lock
)。使用
cond_not_empty_.wait(lock, predicate)
等待缓冲区不空。
predicate
:[this]() { return !buffer_.empty(); }
条件满足后,取出物品。
通知
cond_not_full_
(唤醒一个等待的生产者)。
std::unique_lock
的必要性: 条件变量的wait
操作需要能够在等待时原子地释放锁,并在唤醒时重新获取锁。std::lock_guard
不能在作用域中间释放和重新获取锁,所以必须使用std::unique_lock
。多个消费者: 本例有两个消费者线程。
notify_one()
通常足够,因为只要缓冲区有数据,任何一个消费者被唤醒都可以消费。notify_all()
可能会导致不必要的惊群效应 (Thundering Herd Problem)。
重要注意事项与最佳实践:
避免死锁 (Deadlock):
按固定顺序获取锁: 如转账案例所示。
使用
std::lock
同时锁定多个互斥量。避免嵌套锁: 如果必须嵌套,要极度小心顺序。
使用锁的粒度: 锁的范围尽量小(临界区尽量短),持有锁的时间尽量短。
考虑使用无锁编程 (Lock-Free Programming): 高级主题,使用原子操作和内存屏障,复杂度高但性能可能更好。
避免活锁 (Livelock): 线程不断尝试某个操作但总是失败(例如两个线程都礼貌地让对方先执行而自己后退,结果谁也无法前进)。通常需要引入随机性或其他策略。
避免优先级反转 (Priority Inversion): 低优先级线程持有高优先级线程需要的锁,导致高优先级线程被阻塞。操作系统通常有机制(如优先级继承)缓解。
RAII 管理锁: 总是使用
std::lock_guard
或std::unique_lock
来管理锁的生命周期,避免忘记unlock()
。最小化共享数据: 从根本上减少同步需求。使用线程局部存储 (
thread_local
),尽量通过消息传递 (如std::future
/std::promise
,std::async
) 或任务队列 (如生产者-消费者) 进行通信。小心虚假唤醒 (Spurious Wakeup): 条件变量的
wait()
可能在没有notify
的情况下返回。务必在wait()
中使用循环检查谓词条件 (Predicate)。wait(lock, predicate)
语法糖已经内部实现了这个循环。性能考虑: 锁操作有开销。过度同步(锁竞争激烈)会严重降低并发性能。分析性能瓶颈,考虑细粒度锁、读写锁 (
std::shared_mutex
C++17)、无锁数据结构等。内存可见性 (Memory Visibility) 与内存顺序 (Memory Order): 多核 CPU 架构下,不同核心的缓存可能导致一个线程的写入不能立即被另一个线程看到。互斥锁的
lock()
/unlock()
操作以及原子操作 (std::atomic
) 都隐式包含了内存屏障 (Memory Barrier) 或 Fence,确保临界区内的内存修改对所有其他线程在获取锁后是可见的。这是互斥锁能工作的基础之一。深入理解需要学习内存模型 (std::memory_order
),这是高级主题。C++ 标准库工具链:
<thread>
:std::thread
,std::this_thread
<mutex>
:mutex
,lock_guard
,unique_lock
,lock
,call_once
<condition_variable>
:condition_variable
,condition_variable_any
<future>
:async
,future
,promise
,packaged_task
(更高级的异步任务模型)
<atomic>
:atomic
(无锁编程基础)
总结:
C++ 多线程编程的核心在于安全地管理共享状态的并发访问。互斥锁 (
std::mutex
及其变体) 是保护临界区、防止数据竞争的基本工具。条件变量 (std::condition_variable
) 用于线程间基于条件的协作。RAII (std::lock_guard
,std::unique_lock
) 是安全使用锁和条件变量的关键模式。理解银行账户转账案例能让你掌握保护单个和多个共享资源的方法,特别是处理死锁的策略。理解生产者-消费者案例能让你掌握线程间如何高效协作和等待条件。
编写健壮高效的多线程程序需要谨慎的设计、对同步原理的深刻理解以及对潜在陷阱(死锁、数据竞争、性能瓶颈)的警惕。务必遵循最佳实践,并善用 C++ 标准库提供的线程安全工具。从简单案例开始,逐步深入复杂的并发模式。
技术 适用场景 优点 std::mutex
基本互斥需求 简单高效 std::lock_guard
简单作用域锁 自动释放,异常安全 std::unique_lock
需要灵活控制(延迟/转移锁所有权) 支持条件变量,更灵活 std::scoped_lock
需要同时获取多个锁(C++17) 避免死锁,自动排序 std::atomic
简单数据类型的原子操作 无锁操作,高性能 正确使用互斥锁是多线程编程的核心技能,需结合具体场景选择合适的同步机制。对于高性能场景,可考虑无锁编程(如原子操作、CAS操作),但实现复杂度较高。