现代C++——并发编程
第六章:并发编程
📚 章节概述
并发编程是现代C++的重要特性之一,随着多核处理器的普及,掌握并发编程变得越来越重要。本章将深入探讨C++11及后续版本引入的并发编程工具,包括线程管理、同步原语、原子操作、异步编程等内容。
🎯 学习目标
学完本章后,您将能够:
- 掌握
std::thread
的创建和管理 - 理解各种同步机制(mutex、lock、condition_variable)
- 使用
std::atomic
进行无锁编程 - 掌握
std::future
和std::promise
进行异步编程 - 理解内存模型和内存序
- 实现线程安全的数据结构
- 设计和实现线程池
- 掌握并行算法的使用
1. std::thread - 线程基础
📖 理论讲解
std::thread
是C++11引入的线程类,提供了跨平台的线程创建和管理接口。每个std::thread
对象代表一个执行线程。
💡 基本用法
#include <thread>
#include <iostream>
#include <vector>// 1. 简单的线程函数
void worker_function(int id) {std::cout << "Worker " << id << " is working" << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));std::cout << "Worker " << id << " finished" << std::endl;
}void demo_basic_threading() {std::cout << "=== 基础多线程示例 ===" << std::endl;// 创建多个线程std::vector<std::thread> threads;for (int i = 1; i <= 3; ++i) {threads.emplace_back(worker_function, i);}// 等待所有线程完成for (auto& t : threads) {if (t.joinable()) {t.join();}}std::cout << "All threads completed" << std::endl;
}
🚀 高级线程操作
// 1. 线程与Lambda表达式
void demo_lambda_threading() {std::cout << "=== Lambda线程示例 ===" << std::endl;int shared_data = 0;// 使用Lambda创建线程std::thread t1([&shared_data]() {for (int i = 0; i < 5; ++i) {shared_data++;std::cout << "Thread 1: " << shared_data << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(50));}});std::thread t2([&shared_data]() {for (int i = 0; i < 5; ++i) {shared_data++;std::cout << "Thread 2: " << shared_data << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(50));}});t1.join();t2.join();std::cout << "Final shared_data: " << shared_data << std::endl;
}// 2. 线程与函数对象
class ThreadWorker {
private:std::string name_;int iterations_;public:ThreadWorker(std::string name, int iterations) : name_(std::move(name)), iterations_(iterations) {}void operator()() {for (int i = 0; i < iterations_; ++i) {std::cout << name_ << " iteration " << i << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));}}
};void demo_functor_threading() {std::cout << "=== 函数对象线程示例 ===" << std::endl;ThreadWorker worker1("Worker-A", 3);ThreadWorker worker2("Worker-B", 3);std::thread t1(worker1);std::thread t2(worker2);t1.join();t2.join();
}// 3. 线程管理和异常安全
class ThreadGuard {
private:std::thread& thread_;public:explicit ThreadGuard(std::thread& t) : thread_(t) {}~ThreadGuard() {if (thread_.joinable()) {thread_.join();}}// 禁止拷贝ThreadGuard(const ThreadGuard&) = delete;ThreadGuard& operator=(const ThreadGuard&) = delete;
};void demo_thread_management() {std::cout << "=== 线程管理示例 ===" << std::endl;std::thread worker([]() {std::this_thread::sleep_for(std::chrono::milliseconds(200));std::cout << "Worker thread completed" << std::endl;});ThreadGuard guard(worker); // RAII管理线程// 即使发生异常,ThreadGuard的析构函数也会确保线程被正确joinstd::cout << "Main thread continues..." << std::endl;// ThreadGuard析构时会自动join线程
}
2. 同步机制 - mutex和锁
📖 理论讲解
多线程编程中最重要的是同步,防止数据竞争。C++提供了多种同步原语,包括mutex、lock_guard、unique_lock等。
💡 基本同步
#include <mutex>
#include <shared_mutex>// 1. 基本mutex使用
std::mutex counter_mutex;
int shared_counter = 0;void increment_counter(int thread_id, int increments) {for (int i = 0; i < increments; ++i) {std::lock_guard<std::mutex> lock(counter_mutex); // RAII锁管理++shared_counter;std::cout << "Thread " << thread_id << ": counter = " << shared_counter << std::endl;}
}void demo_mutex_basic() {std::cout << "=== 互斥量和锁示例 ===" << std::endl;shared_counter = 0; // 重置计数器std::vector<std::thread> threads;// 创建多个线程同时访问共享资源for (int i = 1; i <= 3; ++i) {threads.emplace_back(increment_counter, i, 5);}for (auto& t : threads) {t.join();}std::cout << "Final counter value: " << shared_counter << std::endl;
}
🚀 高级锁机制
// 1. unique_lock的灵活性
std::mutex flexible_mutex;
int flexible_data = 0;void demo_unique_lock() {std::cout << "=== unique_lock示例 ===" << std::endl;std::vector<std::thread> threads;for (int i = 0; i < 3; ++i) {threads.emplace_back([i]() {std::unique_lock<std::mutex> lock(flexible_mutex);flexible_data += 10;std::cout << "Thread " << i << " incremented data to " << flexible_data << std::endl;// 可以提前释放锁lock.unlock();// 做一些不需要锁的工作std::this_thread::sleep_for(std::chrono::milliseconds(50));// 重新获取锁lock.lock();flexible_data += 5;std::cout << "Thread " << i << " final data: " << flexible_data << std::endl;// lock析构时自动释放锁});}for (auto& t : threads) {t.join();}
}// 2. 读写锁 (shared_mutex)
std::shared_mutex rw_mutex;
std::string shared_data = "Initial Data";void reader_thread(int id) {std::shared_lock<std::shared_mutex> lock(rw_mutex); // 共享锁(读锁)std::cout << "Reader " << id << " reads: " << shared_data << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));
}void writer_thread(int id) {std::unique_lock<std::shared_mutex> lock(rw_mutex); // 独占锁(写锁)shared_data = "Data modified by writer " + std::to_string(id);std::cout << "Writer " << id << " wrote: " << shared_data << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));
}void demo_shared_mutex() {std::cout << "=== 读写锁示例 ===" << std::endl;std::vector<std::thread> threads;// 创建多个读线程和写线程for (int i = 1; i <= 3; ++i) {threads.emplace_back(reader_thread, i);}threads.emplace_back(writer_thread, 1);for (int i = 4; i <= 6; ++i) {threads.emplace_back(reader_thread, i);}threads.emplace_back(writer_thread, 2);for (auto& t : threads) {t.join();}
}// 3. 死锁避免 - std::lock
std::mutex mutex1, mutex2;void task1() {std::lock(mutex1, mutex2); // 同时锁定两个mutex,避免死锁std::lock_guard<std::mutex> lock1(mutex1, std::adopt_lock); // adopt_lock表示已经锁定std::lock_guard<std::mutex> lock2(mutex2, std::adopt_lock);std::cout << "Task 1 acquired both locks" << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));
}void task2() {std::lock(mutex2, mutex1); // 不同的锁定顺序,但使用std::lock避免死锁std::lock_guard<std::mutex> lock1(mutex1, std::adopt_lock);std::lock_guard<std::mutex> lock2(mutex2, std::adopt_lock);std::cout << "Task 2 acquired both locks" << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));
}void demo_deadlock_avoidance() {std::cout << "=== 死锁避免示例 ===" << std::endl;std::thread t1(task1);std::thread t2(task2);t1.join();t2.join();std::cout << "Both tasks completed without deadlock" << std::endl;
}
3. std::atomic - 原子操作
📖 理论讲解
原子操作是不可分割的操作,可以避免数据竞争而无需使用锁。std::atomic
提供了硬件级别的原子操作支持。
💡 基本原子操作
#include <atomic>// 1. 基本原子类型
std::atomic<int> atomic_counter{0};
std::atomic<bool> atomic_flag{false};void atomic_increment(int thread_id, int increments) {for (int i = 0; i < increments; ++i) {atomic_counter.fetch_add(1); // 原子递增// 或者简单地使用 ++atomic_counter;}std::cout << "Thread " << thread_id << " completed" << std::endl;
}void demo_atomic_basic() {std::cout << "=== 原子操作示例 ===" << std::endl;atomic_counter = 0; // 重置计数器std::vector<std::thread> threads;// 创建多个线程进行原子操作for (int i = 1; i <= 4; ++i) {threads.emplace_back(atomic_increment, i, 1000);}for (auto& t : threads) {t.join();}std::cout << "Final atomic counter: " << atomic_counter.load() << std::endl;
}
🚀 高级原子操作
// 1. 原子标志和自旋锁
class SpinLock {
private:std::atomic_flag flag_ = ATOMIC_FLAG_INIT;public:void lock() {while (flag_.test_and_set(std::memory_order_acquire)) {// 自旋等待std::this_thread::yield(); // 让出CPU时间片}}void unlock() {flag_.clear(std::memory_order_release);}
};SpinLock spin_lock;
int spin_protected_data = 0;void demo_spin_lock() {std::cout << "=== 自旋锁示例 ===" << std::endl;std::vector<std::thread> threads;for (int i = 0; i < 4; ++i) {threads.emplace_back([i]() {for (int j = 0; j < 100; ++j) {spin_lock.lock();++spin_protected_data;spin_lock.unlock();}std::cout << "Thread " << i << " completed" << std::endl;});}for (auto& t : threads) {t.join();}std::cout << "Final spin protected data: " << spin_protected_data << std::endl;
}// 2. Compare-and-swap操作
std::atomic<int> cas_value{0};void demo_compare_and_swap() {std::cout << "=== Compare-and-swap示例 ===" << std::endl;std::vector<std::thread> threads;for (int i = 0; i < 3; ++i) {threads.emplace_back([i]() {for (int attempt = 0; attempt < 10; ++attempt) {int expected = cas_value.load();int desired = expected + 1;// 原子比较并交换if (cas_value.compare_exchange_weak(expected, desired)) {std::cout << "Thread " << i << " successfully updated value to " << desired << std::endl;break;} else {std::cout << "Thread " << i << " CAS failed, expected " << (desired - 1) << " but got " << expected << std::endl;}std::this_thread::sleep_for(std::chrono::milliseconds(1));}});}for (auto& t : threads) {t.join();}
}// 3. 内存序示例
std::atomic<bool> ready{false};
std::atomic<int> data{0};void producer() {data.store(42, std::memory_order_relaxed); // 写入数据ready.store(true, std::memory_order_release); // 发布信号
}void consumer() {while (!ready.load(std::memory_order_acquire)) { // 等待信号std::this_thread::yield();}std::cout << "Consumer got data: " << data.load(std::memory_order_relaxed) << std::endl;
}void demo_memory_ordering() {std::cout << "=== 内存序示例 ===" << std::endl;ready = false;data = 0;std::thread producer_thread(producer);std::thread consumer_thread(consumer);producer_thread.join();consumer_thread.join();
}
4. 条件变量 - 线程间通信
📖 理论讲解
条件变量是一种同步原语,允许线程等待某个条件成立。它通常与mutex一起使用,提供了高效的线程间通信机制。
💡 生产者-消费者模式
#include <condition_variable>
#include <queue>// 1. 经典生产者-消费者问题
std::queue<int> buffer;
std::mutex buffer_mutex;
std::condition_variable buffer_cv;
const size_t MAX_BUFFER_SIZE = 5;void producer(int id, int items) {for (int i = 1; i <= items; ++i) {std::unique_lock<std::mutex> lock(buffer_mutex);// 等待缓冲区有空间buffer_cv.wait(lock, []() { return buffer.size() < MAX_BUFFER_SIZE; });int item = id * 100 + i;buffer.push(item);std::cout << "Producer " << id << " produced: " << item << " (buffer size: " << buffer.size() << ")" << std::endl;lock.unlock();buffer_cv.notify_one(); // 通知消费者std::this_thread::sleep_for(std::chrono::milliseconds(100));}
}void consumer(int id, int items) {for (int i = 0; i < items; ++i) {std::unique_lock<std::mutex> lock(buffer_mutex);// 等待缓冲区有数据buffer_cv.wait(lock, []() { return !buffer.empty(); });int item = buffer.front();buffer.pop();std::cout << "Consumer " << id << " consumed: " << item << " (buffer size: " << buffer.size() << ")" << std::endl;lock.unlock();buffer_cv.notify_one(); // 通知生产者std::this_thread::sleep_for(std::chrono::milliseconds(150));}
}void demo_producer_consumer() {std::cout << "=== 条件变量示例 ===" << std::endl;// 清空缓冲区std::queue<int> empty;buffer.swap(empty);std::vector<std::thread> threads;// 创建生产者和消费者线程threads.emplace_back(producer, 1, 10);threads.emplace_back(producer, 2, 10);threads.emplace_back(consumer, 1, 10);threads.emplace_back(consumer, 2, 10);for (auto& t : threads) {t.join();}std::cout << "Producer-Consumer demo completed" << std::endl;
}
🚀 高级条件变量应用
// 1. 线程池实现基础
template<typename T>
class ThreadSafeQueue {
private:mutable std::mutex mutex_;std::queue<T> queue_;std::condition_variable condition_;public:void push(const T& item) {std::lock_guard<std::mutex> lock(mutex_);queue_.push(item);condition_.notify_one();}bool try_pop(T& item) {std::lock_guard<std::mutex> lock(mutex_);if (queue_.empty()) {return false;}item = queue_.front();queue_.pop();return true;}void wait_and_pop(T& item) {std::unique_lock<std::mutex> lock(mutex_);condition_.wait(lock, [this] { return !queue_.empty(); });item = queue_.front();queue_.pop();}bool empty() const {std::lock_guard<std::mutex> lock(mutex_);return queue_.empty();}size_t size() const {std::lock_guard<std::mutex> lock(mutex_);return queue_.size();}
};// 2. 屏障同步
class Barrier {
private:std::mutex mutex_;std::condition_variable condition_;size_t count_;size_t waiting_;public:explicit Barrier(size_t count) : count_(count), waiting_(0) {}void wait() {std::unique_lock<std::mutex> lock(mutex_);++waiting_;if (waiting_ == count_) {waiting_ = 0;condition_.notify_all();} else {condition_.wait(lock, [this] { return waiting_ == 0; });}}
};void demo_barrier() {std::cout << "=== 屏障同步示例 ===" << std::endl;const int num_threads = 3;Barrier barrier(num_threads);std::vector<std::thread> threads;for (int i = 0; i < num_threads; ++i) {threads.emplace_back([i, &barrier]() {// 第一阶段工作std::cout << "Thread " << i << " doing phase 1 work" << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100 * (i + 1)));std::cout << "Thread " << i << " finished phase 1" << std::endl;// 等待所有线程完成第一阶段barrier.wait();// 第二阶段工作std::cout << "Thread " << i << " doing phase 2 work" << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(50));std::cout << "Thread " << i << " finished phase 2" << std::endl;});}for (auto& t : threads) {t.join();}std::cout << "All threads synchronized" << std::endl;
}
5. std::future和std::promise - 异步编程
📖 理论讲解
std::future
和std::promise
提供了线程间传递值的机制,std::async
则提供了便捷的异步任务执行接口。
💡 基本异步操作
#include <future>// 1. std::async基本使用
int calculate_sum(int start, int end) {std::cout << "Calculating sum from " << start << " to " << end << std::endl;int sum = 0;for (int i = start; i <= end; ++i) {sum += i;}std::this_thread::sleep_for(std::chrono::milliseconds(100));return sum;
}void demo_async_basic() {std::cout << "=== 异步编程和Future示例 ===" << std::endl;// 启动异步任务auto future1 = std::async(std::launch::async, calculate_sum, 1, 1000);auto future2 = std::async(std::launch::async, calculate_sum, 1001, 2000);auto future3 = std::async(std::launch::async, calculate_sum, 2001, 3000);std::cout << "Tasks started, doing other work..." << std::endl;// 获取结果int result1 = future1.get();int result2 = future2.get();int result3 = future3.get();std::cout << "Results: " << result1 << ", " << result2 << ", " << result3 << std::endl;std::cout << "Total: " << (result1 + result2 + result3) << std::endl;
}
🚀 promise和future的高级用法
// 1. std::promise和std::future配合使用
void worker_with_promise(std::promise<std::string> promise, int delay) {std::this_thread::sleep_for(std::chrono::milliseconds(delay));promise.set_value("Hello from worker thread!");
}void demo_promise_future() {std::cout << "=== Promise和Future示例 ===" << std::endl;std::promise<std::string> promise;std::future<std::string> future = promise.get_future();std::thread worker(worker_with_promise, std::move(promise), 200);std::cout << "Waiting for worker thread..." << std::endl;// 等待结果std::string result = future.get();std::cout << "Received: " << result << std::endl;worker.join();
}// 2. 异常处理
int risky_calculation(int value) {if (value < 0) {throw std::invalid_argument("Negative value not allowed");}return value * value;
}void demo_future_exception() {std::cout << "=== Future异常处理示例 ===" << std::endl;auto future1 = std::async(std::launch::async, risky_calculation, 5);auto future2 = std::async(std::launch::async, risky_calculation, -3);try {int result1 = future1.get();std::cout << "Result 1: " << result1 << std::endl;} catch (const std::exception& e) {std::cout << "Exception from future1: " << e.what() << std::endl;}try {int result2 = future2.get();std::cout << "Result 2: " << result2 << std::endl;} catch (const std::exception& e) {std::cout << "Exception from future2: " << e.what() << std::endl;}
}// 3. 超时和状态检查
void demo_future_timeout() {std::cout << "=== Future超时示例 ===" << std::endl;auto slow_task = std::async(std::launch::async, []() {std::this_thread::sleep_for(std::chrono::seconds(2));return 42;});// 检查任务状态auto status = slow_task.wait_for(std::chrono::milliseconds(100));switch (status) {case std::future_status::ready:std::cout << "Task completed immediately" << std::endl;break;case std::future_status::timeout:std::cout << "Task is still running..." << std::endl;break;case std::future_status::deferred:std::cout << "Task is deferred" << std::endl;break;}// 等待任务完成std::cout << "Waiting for task completion..." << std::endl;int result = slow_task.get();std::cout << "Task result: " << result << std::endl;
}
6. 线程池实现
📖 理论讲解
线程池是一种重要的并发模式,预先创建一定数量的线程来执行任务,避免频繁创建和销毁线程的开销。
🚀 简单线程池实现
class SimpleThreadPool {
private:std::vector<std::thread> workers_;ThreadSafeQueue<std::function<void()>> tasks_;std::atomic<bool> stop_;public:explicit SimpleThreadPool(size_t num_threads) : stop_(false) {for (size_t i = 0; i < num_threads; ++i) {workers_.emplace_back([this] {std::function<void()> task;while (!stop_.load()) {if (tasks_.try_pop(task)) {task();} else {std::this_thread::yield();}}});}}~SimpleThreadPool() {stop_.store(true);for (auto& worker : workers_) {if (worker.joinable()) {worker.join();}}}template<typename F>void enqueue(F&& f) {tasks_.push(std::forward<F>(f));}size_t size() const {return workers_.size();}
};void demo_thread_pool() {std::cout << "=== 线程池示例 ===" << std::endl;SimpleThreadPool pool(4);// 提交任务到线程池for (int i = 1; i <= 10; ++i) {pool.enqueue([i] {std::cout << "Task " << i << " executed by thread " << std::this_thread::get_id() << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));});}// 等待一段时间让任务完成std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "All tasks submitted to thread pool" << std::endl;
}
🧪 实践练习
练习1:线程安全的单例模式
// 实现线程安全的单例模式
template<typename T>
class ThreadSafeSingleton {
public:static T& instance() {// 使用call_once确保只初始化一次std::call_once(once_flag_, []() {instance_.reset(new T);});return *instance_;}private:static std::unique_ptr<T> instance_;static std::once_flag once_flag_;
};
练习2:并发计数器
// 实现一个支持多种操作的并发计数器
class ConcurrentCounter {
public:void increment();void decrement(); int get() const;void reset();bool compare_and_set(int expected, int new_value);private:// 选择合适的同步机制
};
练习3:并行排序算法
// 实现并行快速排序
template<typename Iterator>
void parallel_sort(Iterator first, Iterator last, size_t threshold = 1000);
📝 本章小结
并发编程是现代C++的重要组成部分:
🎯 核心概念回顾
- std::thread:基本的线程创建和管理
- 同步机制:mutex、lock_guard、unique_lock、shared_mutex
- std::atomic:无锁编程的基础
- std::condition_variable:高效的线程间通信
- std::future/promise:异步编程和任务管理
- 线程池:高效的任务执行模式
🚀 最佳实践
- 优先使用高级抽象:async、future优于直接使用thread
- 避免数据竞争:使用适当的同步机制
- 防止死锁:统一锁定顺序,使用std::lock
- 选择合适的同步原语:根据场景选择mutex、atomic等
- 异常安全:使用RAII管理资源
🎯 性能考虑
- 原子操作比锁更高效,但功能有限
- 读写锁适合读多写少的场景
- 线程池避免频繁创建销毁线程的开销
- 注意虚假唤醒和内存模型的影响
⚠️ 常见陷阱
- 数据竞争和未定义行为
- 死锁和活锁问题
- 性能开销和可扩展性问题
- 异常安全和资源管理
掌握并发编程让您能够充分利用现代多核处理器的性能,编写高效的并行程序。
🔗 相关资源
- C++ Reference - Thread support library
- C++ Reference - Atomic operations library
- C++ Concurrency in Action by Anthony Williams
- Effective Modern C++ - Items 35-40
📚 教程总结
恭喜您完成了现代C++学习教程的全部六个章节!您已经掌握了:
- 基础语法增强 - 现代C++的语法改进
- 智能指针 - 自动内存管理
- STL容器和算法增强 - 高效的数据结构和算法
- 高级特性 - Lambda、变参模板等强大特性
- 更多现代特性 - C++17/20的新增特性
- 并发编程 - 多线程和异步编程