原子操作:多线程编程
🎯 什么是原子操作
原子操作(Atomic Operation)是指在执行过程中不会被其他线程打断的操作,它要么完全执行,要么完全不执行,不存在中间状态。在多线程编程中,原子操作是实现线程安全的重要工具。
为什么需要原子操作?
考虑一个简单的递增操作:
// ❌ 非线程安全的操作
int counter = 0;// 线程A和线程B同时执行
counter++; // 这不是原子操作!
counter++
实际上包含三个步骤:
- 读取 counter 的值到寄存器
- 计算 寄存器值 + 1
- 写回 结果到 counter
如果两个线程同时执行,可能出现:
- 线程A读取 counter = 0
- 线程B读取 counter = 0
- 线程A计算并写回 counter = 1
- 线程B计算并写回 counter = 1
结果:期望值是2,实际值是1!
🔧 C++中的原子操作
std::atomic 基础用法
#include <atomic>
#include <thread>
#include <iostream>
#include <vector>class ThreadSafeCounter {
private:std::atomic<int> count{0}; // ✅ 原子整数public:void increment() {count++; // ✅ 原子递增操作}void decrement() {count--; // ✅ 原子递减操作}int get() const {return count.load(); // ✅ 原子读取}void set(int value) {count.store(value); // ✅ 原子写入}// 比较并交换bool compareAndSwap(int expected, int newValue) {return count.compare_exchange_weak(expected, newValue);}
};// 使用示例
void testAtomicCounter() {ThreadSafeCounter counter;std::vector<std::thread> threads;// 创建10个线程,每个线程递增1000次for (int i = 0; i < 10; ++i) {threads.emplace_back([&counter]() {for (int j = 0; j < 1000; ++j) {counter.increment();}});}// 等待所有线程完成for (auto& t : threads) {t.join();}std::cout << "Final count: " << counter.get() << std::endl; // 输出:10000
}
🎯 常用的原子操作类型
1. 基本算术操作
std::atomic<int> atomicInt{10};// 原子递增/递减
int oldValue = atomicInt++; // 后置递增,返回旧值
int newValue = ++atomicInt; // 前置递增,返回新值
atomicInt += 5; // 原子加法
atomicInt -= 3; // 原子减法
atomicInt *= 2; // 原子乘法
atomicInt /= 2; // 原子除法// fetch_xxx 系列(返回操作前的值)
int oldVal = atomicInt.fetch_add(10); // 加10,返回旧值
int oldVal2 = atomicInt.fetch_sub(5); // 减5,返回旧值
2. 比较并交换 (CAS - Compare And Swap)
std::atomic<int> atomicValue{5};int expected = 5;
int newValue = 10;// weak版本:可能会因为假失败而返回false
if (atomicValue.compare_exchange_weak(expected, newValue)) {std::cout << "Successfully changed from 5 to 10" << std::endl;
}// strong版本:只有在真的不相等时才返回false
expected = 10;
int anotherValue = 20;
if (atomicValue.compare_exchange_strong(expected, anotherValue)) {std::cout << "Successfully changed from 10 to 20" << std::endl;
} else {std::cout << "Current value is: " << expected << std::endl; // expected被更新为实际值
}
3. 内存序(Memory Ordering)
std::atomic<bool> ready{false};
std::atomic<int> data{0};// 生产者线程
void producer() {data.store(42, std::memory_order_relaxed); // 设置数据ready.store(true, std::memory_order_release); // 发布信号
}// 消费者线程
void consumer() {while (!ready.load(std::memory_order_acquire)) { // 等待信号// 自旋等待}std::cout << "Data: " << data.load(std::memory_order_relaxed) << std::endl; // 读取数据
}
📋 实际应用场景
场景1:线程安全的单例模式
class Singleton {
private:static std::atomic<Singleton*> instance;static std::mutex mutex;Singleton() = default;public:static Singleton* getInstance() {Singleton* tmp = instance.load(std::memory_order_acquire);if (tmp == nullptr) {std::lock_guard<std::mutex> lock(mutex);tmp = instance.load(std::memory_order_relaxed);if (tmp == nullptr) {tmp = new Singleton;instance.store(tmp, std::memory_order_release);}}return tmp;}
};std::atomic<Singleton*> Singleton::instance{nullptr};
std::mutex Singleton::mutex;
场景2:无锁队列(简化版)
template<typename T>
class LockFreeQueue {
private:struct Node {std::atomic<T*> data{nullptr};std::atomic<Node*> next{nullptr};};std::atomic<Node*> head{new Node};std::atomic<Node*> tail{head.load()};public:void enqueue(T item) {Node* newNode = new Node;T* data = new T(std::move(item));newNode->data.store(data);Node* prevTail = tail.exchange(newNode);prevTail->next.store(newNode);}bool dequeue(T& result) {Node* head_node = head.load();Node* next = head_node->next.load();if (next == nullptr) {return false; // 队列为空}T* data = next->data.load();if (data == nullptr) {return false;}result = *data;delete data;head.store(next);delete head_node;return true;}
};
场景3:线程池中的任务计数
class ThreadPool {
private:std::atomic<size_t> activeTasks{0};std::atomic<bool> shutdown{false};std::vector<std::thread> workers;public:void submitTask(std::function<void()> task) {if (shutdown.load()) return;activeTasks.fetch_add(1); // 原子递增任务计数// 提交任务给工作线程...auto wrappedTask = [this, task]() {try {task();} catch (...) {// 处理异常}activeTasks.fetch_sub(1); // 任务完成,原子递减};// 将 wrappedTask 放入队列...}void waitForAllTasks() {while (activeTasks.load() > 0) {std::this_thread::sleep_for(std::chrono::milliseconds(1));}}size_t getActiveTaskCount() const {return activeTasks.load();}
};
场景4:状态标志管理
class NetworkConnection {
private:enum State { DISCONNECTED, CONNECTING, CONNECTED, ERROR };std::atomic<State> connectionState{DISCONNECTED};std::atomic<bool> shouldStop{false};public:void connect() {State expected = DISCONNECTED;if (connectionState.compare_exchange_strong(expected, CONNECTING)) {// 执行连接逻辑...if (/* 连接成功 */) {connectionState.store(CONNECTED);} else {connectionState.store(ERROR);}}}void disconnect() {shouldStop.store(true);connectionState.store(DISCONNECTED);}bool isConnected() const {return connectionState.load() == CONNECTED;}bool shouldStopOperation() const {return shouldStop.load();}
};
🎯 Qt中的原子操作
Qt提供了自己的原子操作类:
#include <QAtomicInt>
#include <QAtomicPointer>class QtAtomicExample {
private:QAtomicInt counter{0};QAtomicPointer<QString> message{nullptr};public:void increment() {counter.fetchAndAddAcquire(1);}int getCount() const {return counter.loadAcquire();}void setMessage(QString* msg) {QString* old = message.fetchAndStoreRelease(msg);delete old; // 安全删除旧消息}QString* getMessage() const {return message.loadAcquire();}
};
⚡ 性能考虑
原子操作 vs 互斥锁
#include <chrono>
#include <mutex>// 性能测试:原子操作 vs 互斥锁
class PerformanceTest {
private:std::atomic<long long> atomicCounter{0};long long mutexCounter{0};std::mutex mtx;public:void testAtomic(int iterations) {auto start = std::chrono::high_resolution_clock::now();for (int i = 0; i < iterations; ++i) {atomicCounter.fetch_add(1);}auto end = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);std::cout << "Atomic operations: " << duration.count() << " μs" << std::endl;}void testMutex(int iterations) {auto start = std::chrono::high_resolution_clock::now();for (int i = 0; i < iterations; ++i) {std::lock_guard<std::mutex> lock(mtx);++mutexCounter;}auto end = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);std::cout << "Mutex operations: " << duration.count() << " μs" << std::endl;}
};
📝 最佳实践
1. 选择合适的内存序
// 对于简单的计数器,relaxed足够
std::atomic<int> counter{0};
counter.fetch_add(1, std::memory_order_relaxed);// 对于同步操作,使用acquire-release
std::atomic<bool> ready{false};
ready.store(true, std::memory_order_release); // 生产者
while (!ready.load(std::memory_order_acquire)) {} // 消费者
2. 避免ABA问题
// 使用版本号避免ABA问题
struct VersionedPointer {void* pointer;size_t version;
};std::atomic<VersionedPointer> versionedPtr;bool safeCompareExchange(void* expected, void* newPtr) {VersionedPointer current = versionedPtr.load();if (current.pointer == expected) {VersionedPointer newValue = {newPtr, current.version + 1};return versionedPtr.compare_exchange_weak(current, newValue);}return false;
}
3. 合理使用原子操作
// ✅ 适合使用原子操作的场景
std::atomic<bool> isReady{false}; // 简单标志
std::atomic<int> refCount{0}; // 引用计数
std::atomic<size_t> taskCount{0}; // 任务计数// ❌ 不适合使用原子操作的场景
// 复杂的数据结构更新应该使用锁
struct ComplexData {int value1;int value2;std::string description;
};
// 对整个结构的更新应该用mutex保护,而不是让每个字段都是原子的
原子操作的常见误区
🚫 误区一:万能论
错误认知:认为使用了原子变量就能解决所有并发问题。
实际情况:原子操作只保证单个操作的原子性,不能保证多个操作组合的原子性。比如"检查然后修改"的逻辑,即使每步都是原子的,整体仍可能出现竞态条件。
🚫 误区二:忽视内存重排序
错误认知:以为原子操作自动解决了内存可见性问题。
实际情况:编译器和处理器可能会重排序指令。如果不指定合适的内存序(memory ordering),普通变量的读写可能被重排到原子操作之后,导致数据不一致。
🚫 误区三:过度使用
错误认知:为了"安全",把所有变量都设为原子类型。
实际情况:原子操作有性能开销,且多个原子变量之间的操作仍需要额外同步。应该在真正需要无锁访问的场景才使用,复杂状态更新还是用锁更合适。
🚫 误区四:误解CAS行为
错误认知:以为compare_exchange失败后,期望值保持不变。
实际情况:CAS失败时,期望值会被更新为实际值。这是设计特性,方便重试,但很多人忽视了这点,可能导致逻辑错误。
🚫 误区五:ABA问题盲区
错误认知:CAS操作天然避免了竞态条件。
实际情况:ABA问题指值从A变为B再变回A,CAS会误认为没有变化。在指针操作或资源回收场景中特别危险,可能导致访问已释放的内存。
🚫 误区六:性能万能论
错误认知:原子操作总比锁快。
实际情况:在高竞争场景下,原子操作的CAS重试可能比锁更慢。原子操作适合低到中度竞争的场景,极高竞争时锁或其他策略可能更优。
🚫 误区七:混淆原子性和一致性
错误认知:原子变量的所有操作都能保证数据一致性。
实际情况:原子性只保证操作不被中断,不等于逻辑一致性。多个相关原子变量的更新仍可能被其他线程看到中间状态,需要额外的同步机制。
💡 核心要点
原子操作是强大的工具,但不是银弹。正确使用需要理解其局限性,结合具体场景选择合适的并发控制策略。简单的计数、标志位适合原子操作,复杂的业务逻辑仍需要锁或其他高级同步机制。
🎉 总结
原子操作是多线程编程中的重要工具,它提供了:
- 无锁编程的基础
- 高性能的线程同步机制
- 内存安全的数据访问方式
适用场景:
- 简单的计数器和标志位
- 无锁数据结构
- 性能敏感的同步操作
- 引用计数和状态管理
注意事项:
- 不是万能解决方案,复杂逻辑仍需要锁
- 需要理解内存序的概念
- 要考虑ABA问题等边界情况
合理使用原子操作,可以让你的多线程程序既高效又安全!