当前位置: 首页 > news >正文

现代C++——并发编程

第六章:并发编程

📚 章节概述

并发编程是现代C++的重要特性之一,随着多核处理器的普及,掌握并发编程变得越来越重要。本章将深入探讨C++11及后续版本引入的并发编程工具,包括线程管理、同步原语、原子操作、异步编程等内容。

🎯 学习目标

学完本章后,您将能够:

  • 掌握std::thread的创建和管理
  • 理解各种同步机制(mutex、lock、condition_variable)
  • 使用std::atomic进行无锁编程
  • 掌握std::futurestd::promise进行异步编程
  • 理解内存模型和内存序
  • 实现线程安全的数据结构
  • 设计和实现线程池
  • 掌握并行算法的使用

1. std::thread - 线程基础

📖 理论讲解

std::thread是C++11引入的线程类,提供了跨平台的线程创建和管理接口。每个std::thread对象代表一个执行线程。

💡 基本用法

#include <thread>
#include <iostream>
#include <vector>// 1. 简单的线程函数
void worker_function(int id) {std::cout << "Worker " << id << " is working" << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));std::cout << "Worker " << id << " finished" << std::endl;
}void demo_basic_threading() {std::cout << "=== 基础多线程示例 ===" << std::endl;// 创建多个线程std::vector<std::thread> threads;for (int i = 1; i <= 3; ++i) {threads.emplace_back(worker_function, i);}// 等待所有线程完成for (auto& t : threads) {if (t.joinable()) {t.join();}}std::cout << "All threads completed" << std::endl;
}

🚀 高级线程操作

// 1. 线程与Lambda表达式
void demo_lambda_threading() {std::cout << "=== Lambda线程示例 ===" << std::endl;int shared_data = 0;// 使用Lambda创建线程std::thread t1([&shared_data]() {for (int i = 0; i < 5; ++i) {shared_data++;std::cout << "Thread 1: " << shared_data << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(50));}});std::thread t2([&shared_data]() {for (int i = 0; i < 5; ++i) {shared_data++;std::cout << "Thread 2: " << shared_data << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(50));}});t1.join();t2.join();std::cout << "Final shared_data: " << shared_data << std::endl;
}// 2. 线程与函数对象
class ThreadWorker {
private:std::string name_;int iterations_;public:ThreadWorker(std::string name, int iterations) : name_(std::move(name)), iterations_(iterations) {}void operator()() {for (int i = 0; i < iterations_; ++i) {std::cout << name_ << " iteration " << i << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));}}
};void demo_functor_threading() {std::cout << "=== 函数对象线程示例 ===" << std::endl;ThreadWorker worker1("Worker-A", 3);ThreadWorker worker2("Worker-B", 3);std::thread t1(worker1);std::thread t2(worker2);t1.join();t2.join();
}// 3. 线程管理和异常安全
class ThreadGuard {
private:std::thread& thread_;public:explicit ThreadGuard(std::thread& t) : thread_(t) {}~ThreadGuard() {if (thread_.joinable()) {thread_.join();}}// 禁止拷贝ThreadGuard(const ThreadGuard&) = delete;ThreadGuard& operator=(const ThreadGuard&) = delete;
};void demo_thread_management() {std::cout << "=== 线程管理示例 ===" << std::endl;std::thread worker([]() {std::this_thread::sleep_for(std::chrono::milliseconds(200));std::cout << "Worker thread completed" << std::endl;});ThreadGuard guard(worker);  // RAII管理线程// 即使发生异常,ThreadGuard的析构函数也会确保线程被正确joinstd::cout << "Main thread continues..." << std::endl;// ThreadGuard析构时会自动join线程
}

2. 同步机制 - mutex和锁

📖 理论讲解

多线程编程中最重要的是同步,防止数据竞争。C++提供了多种同步原语,包括mutex、lock_guard、unique_lock等。

💡 基本同步

#include <mutex>
#include <shared_mutex>// 1. 基本mutex使用
std::mutex counter_mutex;
int shared_counter = 0;void increment_counter(int thread_id, int increments) {for (int i = 0; i < increments; ++i) {std::lock_guard<std::mutex> lock(counter_mutex);  // RAII锁管理++shared_counter;std::cout << "Thread " << thread_id << ": counter = " << shared_counter << std::endl;}
}void demo_mutex_basic() {std::cout << "=== 互斥量和锁示例 ===" << std::endl;shared_counter = 0;  // 重置计数器std::vector<std::thread> threads;// 创建多个线程同时访问共享资源for (int i = 1; i <= 3; ++i) {threads.emplace_back(increment_counter, i, 5);}for (auto& t : threads) {t.join();}std::cout << "Final counter value: " << shared_counter << std::endl;
}

🚀 高级锁机制

// 1. unique_lock的灵活性
std::mutex flexible_mutex;
int flexible_data = 0;void demo_unique_lock() {std::cout << "=== unique_lock示例 ===" << std::endl;std::vector<std::thread> threads;for (int i = 0; i < 3; ++i) {threads.emplace_back([i]() {std::unique_lock<std::mutex> lock(flexible_mutex);flexible_data += 10;std::cout << "Thread " << i << " incremented data to " << flexible_data << std::endl;// 可以提前释放锁lock.unlock();// 做一些不需要锁的工作std::this_thread::sleep_for(std::chrono::milliseconds(50));// 重新获取锁lock.lock();flexible_data += 5;std::cout << "Thread " << i << " final data: " << flexible_data << std::endl;// lock析构时自动释放锁});}for (auto& t : threads) {t.join();}
}// 2. 读写锁 (shared_mutex)
std::shared_mutex rw_mutex;
std::string shared_data = "Initial Data";void reader_thread(int id) {std::shared_lock<std::shared_mutex> lock(rw_mutex);  // 共享锁(读锁)std::cout << "Reader " << id << " reads: " << shared_data << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));
}void writer_thread(int id) {std::unique_lock<std::shared_mutex> lock(rw_mutex);  // 独占锁(写锁)shared_data = "Data modified by writer " + std::to_string(id);std::cout << "Writer " << id << " wrote: " << shared_data << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));
}void demo_shared_mutex() {std::cout << "=== 读写锁示例 ===" << std::endl;std::vector<std::thread> threads;// 创建多个读线程和写线程for (int i = 1; i <= 3; ++i) {threads.emplace_back(reader_thread, i);}threads.emplace_back(writer_thread, 1);for (int i = 4; i <= 6; ++i) {threads.emplace_back(reader_thread, i);}threads.emplace_back(writer_thread, 2);for (auto& t : threads) {t.join();}
}// 3. 死锁避免 - std::lock
std::mutex mutex1, mutex2;void task1() {std::lock(mutex1, mutex2);  // 同时锁定两个mutex,避免死锁std::lock_guard<std::mutex> lock1(mutex1, std::adopt_lock);  // adopt_lock表示已经锁定std::lock_guard<std::mutex> lock2(mutex2, std::adopt_lock);std::cout << "Task 1 acquired both locks" << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));
}void task2() {std::lock(mutex2, mutex1);  // 不同的锁定顺序,但使用std::lock避免死锁std::lock_guard<std::mutex> lock1(mutex1, std::adopt_lock);std::lock_guard<std::mutex> lock2(mutex2, std::adopt_lock);std::cout << "Task 2 acquired both locks" << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));
}void demo_deadlock_avoidance() {std::cout << "=== 死锁避免示例 ===" << std::endl;std::thread t1(task1);std::thread t2(task2);t1.join();t2.join();std::cout << "Both tasks completed without deadlock" << std::endl;
}

3. std::atomic - 原子操作

📖 理论讲解

原子操作是不可分割的操作,可以避免数据竞争而无需使用锁。std::atomic提供了硬件级别的原子操作支持。

💡 基本原子操作

#include <atomic>// 1. 基本原子类型
std::atomic<int> atomic_counter{0};
std::atomic<bool> atomic_flag{false};void atomic_increment(int thread_id, int increments) {for (int i = 0; i < increments; ++i) {atomic_counter.fetch_add(1);  // 原子递增// 或者简单地使用 ++atomic_counter;}std::cout << "Thread " << thread_id << " completed" << std::endl;
}void demo_atomic_basic() {std::cout << "=== 原子操作示例 ===" << std::endl;atomic_counter = 0;  // 重置计数器std::vector<std::thread> threads;// 创建多个线程进行原子操作for (int i = 1; i <= 4; ++i) {threads.emplace_back(atomic_increment, i, 1000);}for (auto& t : threads) {t.join();}std::cout << "Final atomic counter: " << atomic_counter.load() << std::endl;
}

🚀 高级原子操作

// 1. 原子标志和自旋锁
class SpinLock {
private:std::atomic_flag flag_ = ATOMIC_FLAG_INIT;public:void lock() {while (flag_.test_and_set(std::memory_order_acquire)) {// 自旋等待std::this_thread::yield();  // 让出CPU时间片}}void unlock() {flag_.clear(std::memory_order_release);}
};SpinLock spin_lock;
int spin_protected_data = 0;void demo_spin_lock() {std::cout << "=== 自旋锁示例 ===" << std::endl;std::vector<std::thread> threads;for (int i = 0; i < 4; ++i) {threads.emplace_back([i]() {for (int j = 0; j < 100; ++j) {spin_lock.lock();++spin_protected_data;spin_lock.unlock();}std::cout << "Thread " << i << " completed" << std::endl;});}for (auto& t : threads) {t.join();}std::cout << "Final spin protected data: " << spin_protected_data << std::endl;
}// 2. Compare-and-swap操作
std::atomic<int> cas_value{0};void demo_compare_and_swap() {std::cout << "=== Compare-and-swap示例 ===" << std::endl;std::vector<std::thread> threads;for (int i = 0; i < 3; ++i) {threads.emplace_back([i]() {for (int attempt = 0; attempt < 10; ++attempt) {int expected = cas_value.load();int desired = expected + 1;// 原子比较并交换if (cas_value.compare_exchange_weak(expected, desired)) {std::cout << "Thread " << i << " successfully updated value to " << desired << std::endl;break;} else {std::cout << "Thread " << i << " CAS failed, expected " << (desired - 1) << " but got " << expected << std::endl;}std::this_thread::sleep_for(std::chrono::milliseconds(1));}});}for (auto& t : threads) {t.join();}
}// 3. 内存序示例
std::atomic<bool> ready{false};
std::atomic<int> data{0};void producer() {data.store(42, std::memory_order_relaxed);        // 写入数据ready.store(true, std::memory_order_release);     // 发布信号
}void consumer() {while (!ready.load(std::memory_order_acquire)) {  // 等待信号std::this_thread::yield();}std::cout << "Consumer got data: " << data.load(std::memory_order_relaxed) << std::endl;
}void demo_memory_ordering() {std::cout << "=== 内存序示例 ===" << std::endl;ready = false;data = 0;std::thread producer_thread(producer);std::thread consumer_thread(consumer);producer_thread.join();consumer_thread.join();
}

4. 条件变量 - 线程间通信

📖 理论讲解

条件变量是一种同步原语,允许线程等待某个条件成立。它通常与mutex一起使用,提供了高效的线程间通信机制。

💡 生产者-消费者模式

#include <condition_variable>
#include <queue>// 1. 经典生产者-消费者问题
std::queue<int> buffer;
std::mutex buffer_mutex;
std::condition_variable buffer_cv;
const size_t MAX_BUFFER_SIZE = 5;void producer(int id, int items) {for (int i = 1; i <= items; ++i) {std::unique_lock<std::mutex> lock(buffer_mutex);// 等待缓冲区有空间buffer_cv.wait(lock, []() { return buffer.size() < MAX_BUFFER_SIZE; });int item = id * 100 + i;buffer.push(item);std::cout << "Producer " << id << " produced: " << item << " (buffer size: " << buffer.size() << ")" << std::endl;lock.unlock();buffer_cv.notify_one();  // 通知消费者std::this_thread::sleep_for(std::chrono::milliseconds(100));}
}void consumer(int id, int items) {for (int i = 0; i < items; ++i) {std::unique_lock<std::mutex> lock(buffer_mutex);// 等待缓冲区有数据buffer_cv.wait(lock, []() { return !buffer.empty(); });int item = buffer.front();buffer.pop();std::cout << "Consumer " << id << " consumed: " << item << " (buffer size: " << buffer.size() << ")" << std::endl;lock.unlock();buffer_cv.notify_one();  // 通知生产者std::this_thread::sleep_for(std::chrono::milliseconds(150));}
}void demo_producer_consumer() {std::cout << "=== 条件变量示例 ===" << std::endl;// 清空缓冲区std::queue<int> empty;buffer.swap(empty);std::vector<std::thread> threads;// 创建生产者和消费者线程threads.emplace_back(producer, 1, 10);threads.emplace_back(producer, 2, 10);threads.emplace_back(consumer, 1, 10);threads.emplace_back(consumer, 2, 10);for (auto& t : threads) {t.join();}std::cout << "Producer-Consumer demo completed" << std::endl;
}

🚀 高级条件变量应用

// 1. 线程池实现基础
template<typename T>
class ThreadSafeQueue {
private:mutable std::mutex mutex_;std::queue<T> queue_;std::condition_variable condition_;public:void push(const T& item) {std::lock_guard<std::mutex> lock(mutex_);queue_.push(item);condition_.notify_one();}bool try_pop(T& item) {std::lock_guard<std::mutex> lock(mutex_);if (queue_.empty()) {return false;}item = queue_.front();queue_.pop();return true;}void wait_and_pop(T& item) {std::unique_lock<std::mutex> lock(mutex_);condition_.wait(lock, [this] { return !queue_.empty(); });item = queue_.front();queue_.pop();}bool empty() const {std::lock_guard<std::mutex> lock(mutex_);return queue_.empty();}size_t size() const {std::lock_guard<std::mutex> lock(mutex_);return queue_.size();}
};// 2. 屏障同步
class Barrier {
private:std::mutex mutex_;std::condition_variable condition_;size_t count_;size_t waiting_;public:explicit Barrier(size_t count) : count_(count), waiting_(0) {}void wait() {std::unique_lock<std::mutex> lock(mutex_);++waiting_;if (waiting_ == count_) {waiting_ = 0;condition_.notify_all();} else {condition_.wait(lock, [this] { return waiting_ == 0; });}}
};void demo_barrier() {std::cout << "=== 屏障同步示例 ===" << std::endl;const int num_threads = 3;Barrier barrier(num_threads);std::vector<std::thread> threads;for (int i = 0; i < num_threads; ++i) {threads.emplace_back([i, &barrier]() {// 第一阶段工作std::cout << "Thread " << i << " doing phase 1 work" << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100 * (i + 1)));std::cout << "Thread " << i << " finished phase 1" << std::endl;// 等待所有线程完成第一阶段barrier.wait();// 第二阶段工作std::cout << "Thread " << i << " doing phase 2 work" << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(50));std::cout << "Thread " << i << " finished phase 2" << std::endl;});}for (auto& t : threads) {t.join();}std::cout << "All threads synchronized" << std::endl;
}

5. std::future和std::promise - 异步编程

📖 理论讲解

std::futurestd::promise提供了线程间传递值的机制,std::async则提供了便捷的异步任务执行接口。

💡 基本异步操作

#include <future>// 1. std::async基本使用
int calculate_sum(int start, int end) {std::cout << "Calculating sum from " << start << " to " << end << std::endl;int sum = 0;for (int i = start; i <= end; ++i) {sum += i;}std::this_thread::sleep_for(std::chrono::milliseconds(100));return sum;
}void demo_async_basic() {std::cout << "=== 异步编程和Future示例 ===" << std::endl;// 启动异步任务auto future1 = std::async(std::launch::async, calculate_sum, 1, 1000);auto future2 = std::async(std::launch::async, calculate_sum, 1001, 2000);auto future3 = std::async(std::launch::async, calculate_sum, 2001, 3000);std::cout << "Tasks started, doing other work..." << std::endl;// 获取结果int result1 = future1.get();int result2 = future2.get();int result3 = future3.get();std::cout << "Results: " << result1 << ", " << result2 << ", " << result3 << std::endl;std::cout << "Total: " << (result1 + result2 + result3) << std::endl;
}

🚀 promise和future的高级用法

// 1. std::promise和std::future配合使用
void worker_with_promise(std::promise<std::string> promise, int delay) {std::this_thread::sleep_for(std::chrono::milliseconds(delay));promise.set_value("Hello from worker thread!");
}void demo_promise_future() {std::cout << "=== Promise和Future示例 ===" << std::endl;std::promise<std::string> promise;std::future<std::string> future = promise.get_future();std::thread worker(worker_with_promise, std::move(promise), 200);std::cout << "Waiting for worker thread..." << std::endl;// 等待结果std::string result = future.get();std::cout << "Received: " << result << std::endl;worker.join();
}// 2. 异常处理
int risky_calculation(int value) {if (value < 0) {throw std::invalid_argument("Negative value not allowed");}return value * value;
}void demo_future_exception() {std::cout << "=== Future异常处理示例 ===" << std::endl;auto future1 = std::async(std::launch::async, risky_calculation, 5);auto future2 = std::async(std::launch::async, risky_calculation, -3);try {int result1 = future1.get();std::cout << "Result 1: " << result1 << std::endl;} catch (const std::exception& e) {std::cout << "Exception from future1: " << e.what() << std::endl;}try {int result2 = future2.get();std::cout << "Result 2: " << result2 << std::endl;} catch (const std::exception& e) {std::cout << "Exception from future2: " << e.what() << std::endl;}
}// 3. 超时和状态检查
void demo_future_timeout() {std::cout << "=== Future超时示例 ===" << std::endl;auto slow_task = std::async(std::launch::async, []() {std::this_thread::sleep_for(std::chrono::seconds(2));return 42;});// 检查任务状态auto status = slow_task.wait_for(std::chrono::milliseconds(100));switch (status) {case std::future_status::ready:std::cout << "Task completed immediately" << std::endl;break;case std::future_status::timeout:std::cout << "Task is still running..." << std::endl;break;case std::future_status::deferred:std::cout << "Task is deferred" << std::endl;break;}// 等待任务完成std::cout << "Waiting for task completion..." << std::endl;int result = slow_task.get();std::cout << "Task result: " << result << std::endl;
}

6. 线程池实现

📖 理论讲解

线程池是一种重要的并发模式,预先创建一定数量的线程来执行任务,避免频繁创建和销毁线程的开销。

🚀 简单线程池实现

class SimpleThreadPool {
private:std::vector<std::thread> workers_;ThreadSafeQueue<std::function<void()>> tasks_;std::atomic<bool> stop_;public:explicit SimpleThreadPool(size_t num_threads) : stop_(false) {for (size_t i = 0; i < num_threads; ++i) {workers_.emplace_back([this] {std::function<void()> task;while (!stop_.load()) {if (tasks_.try_pop(task)) {task();} else {std::this_thread::yield();}}});}}~SimpleThreadPool() {stop_.store(true);for (auto& worker : workers_) {if (worker.joinable()) {worker.join();}}}template<typename F>void enqueue(F&& f) {tasks_.push(std::forward<F>(f));}size_t size() const {return workers_.size();}
};void demo_thread_pool() {std::cout << "=== 线程池示例 ===" << std::endl;SimpleThreadPool pool(4);// 提交任务到线程池for (int i = 1; i <= 10; ++i) {pool.enqueue([i] {std::cout << "Task " << i << " executed by thread " << std::this_thread::get_id() << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));});}// 等待一段时间让任务完成std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "All tasks submitted to thread pool" << std::endl;
}

🧪 实践练习

练习1:线程安全的单例模式

// 实现线程安全的单例模式
template<typename T>
class ThreadSafeSingleton {
public:static T& instance() {// 使用call_once确保只初始化一次std::call_once(once_flag_, []() {instance_.reset(new T);});return *instance_;}private:static std::unique_ptr<T> instance_;static std::once_flag once_flag_;
};

练习2:并发计数器

// 实现一个支持多种操作的并发计数器
class ConcurrentCounter {
public:void increment();void decrement(); int get() const;void reset();bool compare_and_set(int expected, int new_value);private:// 选择合适的同步机制
};

练习3:并行排序算法

// 实现并行快速排序
template<typename Iterator>
void parallel_sort(Iterator first, Iterator last, size_t threshold = 1000);

📝 本章小结

并发编程是现代C++的重要组成部分:

🎯 核心概念回顾

  • std::thread:基本的线程创建和管理
  • 同步机制:mutex、lock_guard、unique_lock、shared_mutex
  • std::atomic:无锁编程的基础
  • std::condition_variable:高效的线程间通信
  • std::future/promise:异步编程和任务管理
  • 线程池:高效的任务执行模式

🚀 最佳实践

  1. 优先使用高级抽象:async、future优于直接使用thread
  2. 避免数据竞争:使用适当的同步机制
  3. 防止死锁:统一锁定顺序,使用std::lock
  4. 选择合适的同步原语:根据场景选择mutex、atomic等
  5. 异常安全:使用RAII管理资源

🎯 性能考虑

  • 原子操作比锁更高效,但功能有限
  • 读写锁适合读多写少的场景
  • 线程池避免频繁创建销毁线程的开销
  • 注意虚假唤醒和内存模型的影响

⚠️ 常见陷阱

  • 数据竞争和未定义行为
  • 死锁和活锁问题
  • 性能开销和可扩展性问题
  • 异常安全和资源管理

掌握并发编程让您能够充分利用现代多核处理器的性能,编写高效的并行程序。


🔗 相关资源

  • C++ Reference - Thread support library
  • C++ Reference - Atomic operations library
  • C++ Concurrency in Action by Anthony Williams
  • Effective Modern C++ - Items 35-40

📚 教程总结

恭喜您完成了现代C++学习教程的全部六个章节!您已经掌握了:

  1. 基础语法增强 - 现代C++的语法改进
  2. 智能指针 - 自动内存管理
  3. STL容器和算法增强 - 高效的数据结构和算法
  4. 高级特性 - Lambda、变参模板等强大特性
  5. 更多现代特性 - C++17/20的新增特性
  6. 并发编程 - 多线程和异步编程
http://www.dtcms.com/a/443239.html

相关文章:

  • Dotnet通过OpenAI方式接入DeepSeek
  • 第八课(零基础友好版)|第一次训练模型(Teachable Machine,超详细)
  • 定制制作网站公司建筑模型网站有哪些
  • 【多线程】无锁数据结构(Lock-Free Data Structures)是什么?
  • YOLO入门教程(番外):卷积神经网络—卷积神经网络(LeNet)
  • 哪里可以做寄生虫网站wordpress链接默认是什么样子
  • 网站开发常用插件怎样保存网站资料 做证据
  • 网站经常被挂码泉州seo排名扣费
  • 做网站需要做需求分析吗修改wordpress登录背景图片
  • 网站建设人员分布知名的摄影网站有哪些
  • MySQL 运维知识点(十四)---- 主从复制
  • Linux常用通配符大全含运行示例
  • 相城seo网站优化软件django企业级问答网站开发
  • (autode计算化学—atoms)AtomCollection原子集类
  • 主键索引和唯一性索引的区别与联系
  • 免备案域名是危险网站荣耀手机商城官方网
  • 电磁兼容试验标准:电快速脉冲EFT
  • 建立一个平台网站需要多少钱大学软件开发需要学什么
  • wordpress安装好了怎么登陆网站电影网站如何做长尾关键词
  • 苏州h5模板建站效果图
  • yy直播官网怀化网站优化多少钱
  • 浅聊一下TCP协议
  • 怎么建设微网站智能建造师证书
  • 网站询盘量制作网站需要什么语言
  • AI大事记8:深蓝与卡斯帕罗夫——符号主义 AI 的巅峰与局限(中)
  • 数据结构—双链表
  • 长沙公司网站建设上海专业网站制作公司
  • 黄冈网站免费投放平台免费下载计算机网站建设及管理
  • Tailwind CSS介绍(现代CSS框架,与传统CSS框架Bootstrap对比)Tailwind介绍
  • 网群企业网站管理系统哈尔滨快速建站专业定制