C++学习:六个月从基础到就业——多线程编程:并发容器与无锁编程
C++学习:六个月从基础到就业——多线程编程:并发容器与无锁编程
本文是我C++学习之旅系列的第五十八篇技术文章,也是第四阶段"并发与高级主题"的第五篇,介绍C++中的并发容器和无锁编程技术。查看完整系列目录了解更多内容。
引言
在前几篇文章中,我们学习了基本的多线程工具,如互斥量、条件变量和future。虽然这些机制能有效确保线程安全,但在高性能场景下,传统的锁机制往往会导致性能瓶颈。本文将介绍两种先进的并发编程技术:并发容器和无锁编程。
并发容器是专为多线程环境设计的数据结构,能够安全高效地在多线程间共享。无锁编程则允许线程在不使用互斥锁的情况下安全共享数据,主要通过原子操作实现。这些技术能在保证线程安全的同时,显著提高并发程序的性能。
目录
- 多线程编程:并发容器与无锁编程
- 引言
- 目录
- 原子类型与操作
- std::atomic基础
- 原子操作类型
- 内存序
- 比较与交换
- 并发容器
- C++标准库的支持
- 第三方并发容器库
- 自定义线程安全容器
- 无锁数据结构
- 无锁编程原理
- ABA问题及解决方案
- 无锁栈实现
- 无锁队列实现
- 性能分析与实践
- 锁vs无锁:何时使用
- 并发计数器案例分析
- 多生产者-多消费者优化
- 总结
原子类型与操作
无锁编程的基础是原子操作,C++11引入了std::atomic
模板类来支持这一功能。
std::atomic基础
std::atomic
是一个模板类,可以包装几乎任何基本类型,提供对该类型的原子访问:
#include <atomic>
#include <thread>
#include <vector>
#include <iostream>void atomic_counter_demo() {std::atomic<int> counter(0); // 原子计数器// 创建10个线程,每个递增计数器10000次std::vector<std::thread> threads;for (int i = 0; i < 10; ++i) {threads.emplace_back([&counter]() {for (int j = 0; j < 10000; ++j) {counter++; // 原子递增,等同于counter.fetch_add(1)}});}// 等待所有线程完成for (auto& t : threads) {t.join();}std::cout << "最终计数: " << counter << std::endl; // 应该精确等于100000
}int main() {atomic_counter_demo();return 0;
}
使用std::atomic
的主要优势:
- 无需互斥锁即可安全访问共享数据
- 操作是原子的,不会被中断
- 通常比互斥锁更高效,特别是在争用较低的情况下
原子操作类型
std::atomic
提供了多种原子操作:
#include <atomic>
#include <iostream>void atomic_operations_demo() {std::atomic<int> value(0);// 存储操作value.store(10);// 加载操作int current = value.load();std::cout << "当前值: " << current << std::endl;// 原子交换int old_value = value.exchange(20);std::cout << "交换前值: " << old_value << ", 当前值: " << value.load() << std::endl;// 原子算术操作int prev = value.fetch_add(5); // 原子加法,返回操作前的值std::cout << "加法前值: " << prev << ", 加法后值: " << value.load() << std::endl;prev = value.fetch_sub(10); // 原子减法std::cout << "减法前值: " << prev << ", 减法后值: " << value.load() << std::endl;// 原子位操作std::atomic<int> flags(0x5); // 二进制: 0101flags.fetch_or(0x6); // 二进制: 0110, 结果应为 0111std::cout << "位或操作后: 0x" << std::hex << flags.load() << std::endl;flags.fetch_and(0x3); // 二进制: 0011, 应与0111相与得 0011std::cout << "位与操作后: 0x" << std::hex << flags.load() << std::dec << std::endl;
}
内存序
原子操作的一个关键方面是内存序(Memory Ordering),它决定了不同线程中的内存操作如何排序:
#include <atomic>
#include <thread>
#include <iostream>void memory_ordering_demo() {std::atomic<bool> ready(false);std::atomic<int> data(0);// 生产者线程std::thread producer([&]() {// 准备数据data.store(42, std::memory_order_relaxed);// 确保数据写入对其他线程可见,再设置ready标志ready.store(true, std::memory_order_release);});// 消费者线程std::thread consumer([&]() {// 等待ready信号,使用获取序while (!ready.load(std::memory_order_acquire)) {// 自旋等待std::this_thread::yield();}// 读取数据,此时数据写入已对此线程可见int value = data.load(std::memory_order_relaxed);std::cout << "消费者读取值: " << value << std::endl; // 保证输出42});producer.join();consumer.join();
}
C++提供了六种内存序选项:
- memory_order_relaxed: 最宽松,仅保证当前操作原子性
- memory_order_consume: 依赖于此操作结果的后续操作不会被重排(已弃用)
- memory_order_acquire: 读取操作,建立同步点,后续操作不会被重排到此前
- memory_order_release: 写入操作,建立同步点,前面操作不会被重排到此后
- memory_order_acq_rel: 结合获取和释放语义
- memory_order_seq_cst: 最严格,建立全序关系(默认)
在实际应用中,获取-释放序(acquire-release)是最常用的模式,可以高效建立线程间同步。
比较与交换
比较与交换(Compare-And-Swap,CAS)是无锁编程的基础操作:
#include <atomic>
#include <iostream>// 使用CAS实现线程安全计数器
class cas_counter {std::atomic<int> value{0};public:void increment() {int expected = value.load();while (!value.compare_exchange_weak(expected, expected + 1)) {// expected被失败的CAS操作自动更新}}int get() const {return value.load();}
};void cas_demo() {std::atomic<int> value(10);// 只有当值等于10时才更新为20int expected = 10;bool success = value.compare_exchange_strong(expected, 20);std::cout << "CAS " << (success ? "成功" : "失败") << std::endl;std::cout << "当前值: " << value.load() << std::endl;std::cout << "Expected值: " << expected << std::endl;// 尝试再次更新,应该失败,此时expected被更新为当前值20expected = 10;success = value.compare_exchange_strong(expected, 30);std::cout << "第二次CAS " << (success ? "成功" : "失败") << std::endl;std::cout << "当前值: " << value.load() << std::endl;std::cout << "Updated expected值: " << expected << std::endl;
}
compare_exchange_weak
和compare_exchange_strong
是两个CAS操作函数:
weak
版本在某些平台上可能出现伪失败(spurious failure),但性能更好strong
版本保证只在实际值不等于期望值时才失败
并发容器
虽然C++标准库没有提供完整的并发容器集合,但我们可以使用标准组件构建线程安全容器,或使用第三方库。
C++标准库的支持
标准库不直接提供线程安全容器,但提供了原子类型等基础构建块。使用std::atomic_flag
可以实现简单的自旋锁:
#include <atomic>
#include <thread>
#include <iostream>
#include <vector>// 使用atomic_flag实现简单自旋锁
class spinlock {std::atomic_flag flag = ATOMIC_FLAG_INIT;public:void lock() {while (flag.test_and_set(std::memory_order_acquire)) {// 自旋等待}}void unlock() {flag.clear(std::memory_order_release);}
};// 使用自旋锁保护STL容器
template<typename T>
class threadsafe_vector {std::vector<T> data;mutable spinlock lock;public:void push_back(const T& value) {lock.lock();data.push_back(value);lock.unlock();}size_t size() const {lock.lock();size_t result = data.size();lock.unlock();return result;}// 更多方法...
};
第三方并发容器库
在实际项目中,通常使用成熟的第三方库提供的并发容器:
-
Intel TBB (Threading Building Blocks):
concurrent_vector
: 允许安全并发追加元素concurrent_hash_map
: 高性能并发哈希表concurrent_queue
: 线程安全队列
-
Boost.Lockfree:
boost::lockfree::queue
: 无锁队列boost::lockfree::stack
: 无锁栈
-
folly (Facebook的开源库):
folly::ConcurrentHashMap
: 高性能并发哈希表
使用这些库的示例:
// Intel TBB示例
#include <tbb/concurrent_vector.h>
#include <tbb/concurrent_queue.h>
#include <iostream>
#include <thread>
#include <vector>void tbb_demo() {// 并发向量tbb::concurrent_vector<int> vec;// 并发队列tbb::concurrent_queue<int> queue;// 多线程向容器添加元素std::vector<std::thread> threads;for (int i = 0; i < 10; ++i) {threads.emplace_back([&, i]() {for (int j = 0; j < 100; ++j) {vec.push_back(i * 100 + j);queue.push(i * 100 + j);}});}for (auto& t : threads) {t.join();}std::cout << "向量大小: " << vec.size() << std::endl; // 应为1000int value;int count = 0;while (queue.try_pop(value)) {count++;}std::cout << "队列中元素数: " << count << std::endl; // 应为1000
}
自定义线程安全容器
有时我们需要实现自定义的线程安全容器。以下是一个基于细粒度锁的线程安全链表:
#include <memory>
#include <mutex>
#include <algorithm>template<typename T>
class threadsafe_list {
private:struct node {std::mutex m;std::shared_ptr<T> data;std::unique_ptr<node> next;node() : next(nullptr) {}node(T const& value) : data(std::make_shared<T>(value)), next(nullptr) {}};node head;public:threadsafe_list() {}~threadsafe_list() {remove_if([](const T&){ return true; });}// 禁止复制和赋值threadsafe_list(threadsafe_list const&) = delete;threadsafe_list& operator=(threadsafe_list const&) = delete;// 添加元素到链表头部void push_front(T const& value) {std::unique_ptr<node> new_node(new node(value));std::lock_guard<std::mutex> lk(head.m);new_node->next = std::move(head.next);head.next = std::move(new_node);}// 对每个元素应用函数template<typename Function>void for_each(Function f) {node* current = &head;std::unique_lock<std::mutex> lk(head.m);while (node* next = current->next.get()) {std::unique_lock<std::mutex> next_lk(next->m);lk.unlock();f(*next->data);current = next;lk = std::move(next_lk);}}// 查找满足条件的第一个元素template<typename Predicate>std::shared_ptr<T> find_first_if(Predicate p) {node* current = &head;std::unique_lock<std::mutex> lk(head.m);while (node* next = current->next.get()) {std::unique_lock<std::mutex> next_lk(next->m);lk.unlock();if (p(*next->data)) {return next->data;}current = next;lk = std::move(next_lk);}return std::shared_ptr<T>();}// 移除满足条件的元素template<typename Predicate>void remove_if(Predicate p) {node* current = &head;std::unique_lock<std::mutex> lk(head.m);while (node* next = current->next.get()) {std::unique_lock<std::mutex> next_lk(next->m);if (p(*next->data)) {std::unique_ptr<node> old_next = std::move(current->next);current->next = std::move(next->next);next_lk.unlock();} else {lk.unlock();current = next;lk = std::move(next_lk);}}}
};
这种实现的特点是:
- 使用细粒度锁,每个节点有自己的互斥量
- 链表操作时只锁定相关节点,而非整个链表
- 遍历时采用手递手锁定方式,提高并发性能
无锁数据结构
无锁编程原理
无锁编程的核心是使用原子操作(主要是CAS)替代互斥锁:
#include <atomic>
#include <memory>
#include <iostream>// 无锁计数器,不同实现方式的性能比较
class lock_free_counter {std::atomic<int> value{0};public:// 使用fetch_addvoid increment_fetch_add() {value.fetch_add(1, std::memory_order_relaxed);}// 使用CASvoid increment_cas() {int expected = value.load(std::memory_order_relaxed);while (!value.compare_exchange_weak(expected, expected + 1,std::memory_order_relaxed,std::memory_order_relaxed)) {// expected自动更新为当前值}}int get() const {return value.load(std::memory_order_relaxed);}
};
无锁编程的优势:
- 避免了互斥锁的上下文切换开销
- 减少了线程阻塞,改善了系统响应性
- 在高竞争环境下可能提供更好的性能
- 消除了死锁风险
缺点:
- 实现更复杂,需要深入理解内存模型
- 容易出现难以调试的错误
- 可能出现ABA问题和内存管理问题
ABA问题及解决方案
ABA问题是无锁编程中的经典问题:当一个值从A变为B,再变回A时,CAS操作会认为值未改变,而实际上可能已经发生了变化。
解决方案是使用版本号或指针标记:
#include <atomic>
#include <memory>
#include <iostream>// 带版本计数器的指针包装器
template<typename T>
class versioned_ptr {
private:struct counted_ptr {T* ptr;unsigned version;};std::atomic<counted_ptr> data;public:versioned_ptr() : data({nullptr, 0}) {}void store(T* new_ptr) {counted_ptr old_value = data.load();counted_ptr new_value{new_ptr, old_value.version + 1};data.store(new_value);}T* load() const {return data.load().ptr;}bool compare_exchange_strong(T*& expected_ptr, T* new_ptr) {counted_ptr old_value = data.load();if (old_value.ptr != expected_ptr) {expected_ptr = old_value.ptr;return false;}counted_ptr new_value{new_ptr, old_value.version + 1};bool success = data.compare_exchange_strong(old_value, new_value);if (!success) {expected_ptr = old_value.ptr;}return success;}
};
现代C++实现通常使用std::shared_ptr
和内存回收技术(如引用计数或回收器)来解决ABA问题。
无锁栈实现
下面是一个简单的无锁栈实现,使用CAS操作进行头节点的原子更新:
#include <atomic>
#include <memory>template<typename T>
class lock_free_stack {
private:struct node {std::shared_ptr<T> data;node* next;node(T const& data_) : data(std::make_shared<T>(data_)) {}};std::atomic<node*> head;public:lock_free_stack() : head(nullptr) {}void push(T const& data) {node* const new_node = new node(data);new_node->next = head.load();// 尝试CAS,如果失败则更新new_node->next并重试while (!head.compare_exchange_weak(new_node->next, new_node));}std::shared_ptr<T> pop() {node* old_head = head.load();while (old_head && !head.compare_exchange_weak(old_head, old_head->next));if (!old_head) {return std::shared_ptr<T>(); // 栈为空}std::shared_ptr<T> res(old_head->data); // 获取数据副本delete old_head; // 清理节点return res;}// 检查栈是否为空bool empty() const {return head.load() == nullptr;}
};
这个实现存在内存管理问题——弹出节点后立即删除可能导致其他线程访问已删除内存。实际应用中需要使用更复杂的内存管理技术,如引用计数或回收器。
无锁队列实现
Michael-Scott队列是经典的无锁队列算法,下面是其简化实现:
#include <atomic>
#include <memory>template<typename T>
class lock_free_queue {
private:struct node {std::shared_ptr<T> data;std::atomic<node*> next;node() : next(nullptr) {}};std::atomic<node*> head;std::atomic<node*> tail;public:lock_free_queue() {node* dummy = new node;head.store(dummy);tail.store(dummy);}~lock_free_queue() {// 清理队列中的节点while (node* const old_head = head.load()) {head.store(old_head->next);delete old_head;}}void push(T const& data) {// 创建新节点std::shared_ptr<T> new_data = std::make_shared<T>(data);node* p = new node;node* old_tail = tail.load();node* next;// 尝试更新尾节点do {// 检查尾指针是否过时while (old_tail != tail.load()) {old_tail = tail.load();}next = old_tail->next.load();if (next) {// 尾指针已过时,帮助更新tail.compare_exchange_weak(old_tail, next);continue;}// 尝试将新节点添加到队列末尾if (old_tail->next.compare_exchange_weak(next, p)) {p->data = new_data;break;}} while (true);// 尝试更新尾指针tail.compare_exchange_strong(old_tail, p);}std::shared_ptr<T> pop() {node* old_head = head.load();node* next;do {do {// 检查队列是否为空next = old_head->next.load();if (!next) {return std::shared_ptr<T>(); // 队列为空}} while (!head.compare_exchange_weak(old_head, next));// 此时已成功更新头指针std::shared_ptr<T> res = next->data;delete old_head; // 删除旧的头节点(哑节点)return res;} while (true);}
};
无锁队列实现比栈更复杂,需要处理尾指针过时和同步头尾指针等问题。上述实现仍存在一些简化,实际应用中需要更多安全保障。
性能分析与实践
锁vs无锁:何时使用
选择互斥锁还是无锁结构取决于多个因素:
-
竞争程度
- 高竞争:如果多个线程频繁访问同一数据,无锁结构可能优势更明显
- 低竞争:互斥锁可能已经足够高效,且实现更简单
-
操作复杂性
- 简单操作(如计数器):无锁通常更高效
- 复杂操作(大量数据修改):互斥锁可能更适合
-
硬件支持
- 现代CPU对原子操作提供良好支持,但旧硬件可能性能较差
-
安全需求
- 互斥锁的正确性更易保证
- 无锁编程需要更多专业知识和测试
并发计数器案例分析
比较不同计数器实现的性能:
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <atomic>
#include <chrono>// 使用互斥锁的计数器
class mutex_counter {int value = 0;std::mutex mtx;public:void increment() {std::lock_guard<std::mutex> lock(mtx);++value;}int get() const {std::lock_guard<std::mutex> lock(mtx);return value;}
};// 使用原子变量的计数器
class atomic_counter {std::atomic<int> value{0};public:void increment() {value.fetch_add(1, std::memory_order_relaxed);}int get() const {return value.load(std::memory_order_relaxed);}
};// 分区计数器(减少竞争)
class partitioned_counter {static const int NUM_COUNTERS = 16; // 假设16核心以内std::atomic<int> counters[NUM_COUNTERS]{};public:void increment(int thread_id) {counters[thread_id % NUM_COUNTERS].fetch_add(1, std::memory_order_relaxed);}int get() const {int sum = 0;for (int i = 0; i < NUM_COUNTERS; ++i) {sum += counters[i].load(std::memory_order_relaxed);}return sum;}
};// 性能测试函数
template<typename Counter>
void benchmark(Counter& counter, const std::string& name, int num_threads, int iterations_per_thread) {std::vector<std::thread> threads;auto start_time = std::chrono::high_resolution_clock::now();// 启动线程for (int i = 0; i < num_threads; ++i) {threads.emplace_back([&counter, iterations_per_thread, i]() {for (int j = 0; j < iterations_per_thread; ++j) {if constexpr (std::is_same_v<Counter, partitioned_counter>) {counter.increment(i); // 分区计数器需要线程ID} else {counter.increment();}}});}// 等待所有线程完成for (auto& t : threads) {t.join();}auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << name << ":" << std::endl;std::cout << " 线程数: " << num_threads << std::endl;std::cout << " 每线程操作次数: " << iterations_per_thread << std::endl;std::cout << " 执行时间: " << duration.count() << " ms" << std::endl;std::cout << " 最终结果: " << counter.get() << std::endl;std::cout << " 期望结果: " << (num_threads * iterations_per_thread) << std::endl;std::cout << std::endl;
}int main() {const int num_threads = 8;const int iterations = 1000000;{mutex_counter counter;benchmark(counter, "互斥锁计数器", num_threads, iterations);}{atomic_counter counter;benchmark(counter, "原子计数器", num_threads, iterations);}{partitioned_counter counter;benchmark(counter, "分区计数器", num_threads, iterations);}return 0;
}
在多核环境下,通常会观察到以下结果:
- 互斥锁计数器:最慢,因为线程需要等待锁
- 原子计数器:中等性能,但随着线程数增加,竞争会变得严重
- 分区计数器:最快,因为减少了线程间竞争
多生产者-多消费者优化
优化一个真实场景:多线程环境下的生产者-消费者模式:
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>// 基于互斥锁和条件变量的队列
template<typename T>
class blocking_queue {std::queue<T> queue;std::mutex mtx;std::condition_variable cv;public:void push(T item) {{std::lock_guard<std::mutex> lock(mtx);queue.push(std::move(item));}cv.notify_one();}bool try_pop(T& item) {std::lock_guard<std::mutex> lock(mtx);if (queue.empty()) {return false;}item = std::move(queue.front());queue.pop();return true;}T pop() {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [this]{ return !queue.empty(); });T item = std::move(queue.front());queue.pop();return item;}bool empty() const {std::lock_guard<std::mutex> lock(mtx);return queue.empty();}
};// 无锁队列及相关代码已在上文给出// 性能测试
template<typename Queue>
void producer_consumer_test(Queue& q, const std::string& name, int producers, int consumers, int items_per_producer) {std::atomic<int> total_produced(0);std::atomic<int> total_consumed(0);std::atomic<bool> producers_done(false);auto start_time = std::chrono::high_resolution_clock::now();// 创建生产者std::vector<std::thread> producer_threads;for (int i = 0; i < producers; ++i) {producer_threads.emplace_back([&, i]() {for (int j = 0; j < items_per_producer; ++j) {q.push(i * items_per_producer + j);total_produced.fetch_add(1);}});}// 创建消费者std::vector<std::thread> consumer_threads;for (int i = 0; i < consumers; ++i) {consumer_threads.emplace_back([&]() {int item;while (true) {if (q.try_pop(item)) {total_consumed.fetch_add(1);} else if (producers_done && total_consumed.load() >= total_produced.load()) {break; // 所有项目已消费} else {std::this_thread::yield(); // 让出CPU时间}}});}// 等待生产者完成for (auto& t : producer_threads) {t.join();}producers_done = true;// 等待消费者完成for (auto& t : consumer_threads) {t.join();}auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << name << ":" << std::endl;std::cout << " 生产者数量: " << producers << std::endl;std::cout << " 消费者数量: " << consumers << std::endl;std::cout << " 每个生产者产生项目数: " << items_per_producer << std::endl;std::cout << " 执行时间: " << duration.count() << " ms" << std::endl;std::cout << " 生产项目数: " << total_produced.load() << std::endl;std::cout << " 消费项目数: " << total_consumed.load() << std::endl;std::cout << std::endl;
}
总结
并发容器和无锁编程是构建高性能并发系统的关键技术。通过使用原子操作和精心设计的数据结构,我们可以在不依赖互斥锁的情况下实现线程安全的功能,从而减少同步开销,提高并发性能。
关键要点:
-
原子操作是无锁编程的基础,C++11引入的
std::atomic
提供了类型安全和平台无关的原子操作支持。 -
内存序决定了原子操作的同步语义,正确选择内存序对于性能和正确性至关重要。
-
CAS操作(比较与交换)是实现无锁数据结构的核心机制。
-
ABA问题是无锁编程中的常见陷阱,需要使用版本计数等技术解决。
-
无锁数据结构如栈和队列可以在高竞争环境中提供比互斥锁更好的性能,但实现更复杂。
-
性能权衡:在选择锁与无锁方案时,需根据竞争程度、操作复杂性和安全需求进行权衡。
-
分区技术可以减少竞争,是提高并发性能的有效策略。
虽然无锁编程具有性能优势,但并非所有场景都适合使用无锁技术。简单的场景可能互斥锁方案已经足够好,且更容易实现正确。实际应用中,应根据具体需求和环境选择合适的并发策略。
在下一篇文章中,我们将继续探索C++高级主题,介绍多线程设计模式和并发编程的最佳实践。
这是我C++学习之旅系列的第五十八篇技术文章。查看完整系列目录了解更多内容。