C++ —— 无锁队列
什么是无锁队列?
无锁队列是一种并发数据结构,允许多个线程同时进行入队和出队操作,而不需要使用传统的互斥锁来保护共享数据。
核心特征:
- 非阻塞:一个线程的失败或挂起不会导致其他线程阻塞
- 基于原子操作:使用CAS(Compare-And-Swap)等原子指令
- 避免锁竞争:减少线程上下文切换和等待时间
无锁队列的优势
- 高性能:在高并发场景下,避免了锁竞争带来的性能瓶颈
- 可扩展性:线程数量增加时,性能下降较缓慢
- 避免死锁:没有锁,自然避免了死锁问题
- 实时性:适合实时系统,操作有确定的时间上限
关键实现要点
- CAS操作:使用
compare_exchange_weak
确保原子性 - 内存序:默认使用
memory_order_seq_cst
,可根据需要优化 - ABA问题:通过指针和计数器组合解决
- 内存回收:生产环境需使用危险指针或引用计数
适用场景
- 高并发消息传递系统
- 实时数据处理
- 高性能计算
- 游戏服务器等低延迟场景
注意事项
无锁编程虽然性能优越,但实现复杂且容易出错。在实际项目中,建议优先使用成熟的并发库(如Intel TBB
、Boost.Lockfree
等),而非自行实现。
C++代码实现
#include <iostream>
#include <atomic>
#include <memory>
#include <thread>template <typename T>
class LockFreeQueue {
private:struct Node {std::shared_ptr<T> data;std::atomic<Node *> next;Node() : next(nullptr) {}Node(T value) : data(std::make_shared<T>(std::move(value))), next(nullptr) {}};std::atomic<Node*> head;std::atomic<Node*> tail;public:LockFreeQueue() {Node *dummy = new Node();head.store(dummy);tail.store(dummy);}~LockFreeQueue() {while(Node* old_head = head.load()) {head.store(old_head->next.load());delete old_head;}}// 禁止拷贝和赋值LockFreeQueue(const LockFreeQueue&) = delete;LockFreeQueue& operator=(const LockFreeQueue&) = delete;// 入队void enqueue(T value) {Node *new_node = new Node(std::move(value));while(true) {Node *old_tail = tail.load();Node *next = old_tail->next.load();// 检查tail是否仍指向我们看到的节点(其他线程可能在这个时候操作tail)if(old_tail == tail.load()) {// 如果tail是最后一个节点if(next == nullptr) {// 尝试将新节点连接到链表末尾if(old_tail->next.compare_exchange_weak(next, new_node)) {tail.compare_exchange_weak(old_tail, new_node);return;}} else {// tail不是最后一个节点,将其往后移动// 采用weak版本配合循环使用,性能更好tail.compare_exchange_weak(old_tail, next);}} }}// 出队bool dequeue(std::shared_ptr<T>& result) {while(true) {Node *old_head = head.load();Node *old_tail = tail.load();Node *next = old_head->next.load();// 检查一致性if(old_head == head.load()) {// 队列为空或者tail需要更新if(old_head == old_tail) {if(next == nullptr) {// 队列为空return false;}// tail不在末尾tail.compare_exchange_weak(old_tail, next);} else {// 读取数据result = next->data;// 尝试移动head指针if(head.compare_exchange_weak(old_head, next)) {delete old_head;return true;}}}}}bool empty() const {return head.load()->next.load() == nullptr;}
};int main()
{LockFreeQueue<int> queue;// 生产者auto producer = [&queue]() {for(int i = 0; i < 10; ++i) {queue.enqueue(i);}};// 消费者auto consumer = [&queue]() {std::shared_ptr<int> value;int count = 0;while(count < 10) {if(queue.dequeue(value)) {std::cout << *value << " ";++count;}}std::cout << std::endl;};std::thread t1(producer);std::thread t2(consumer);t1.join();t2.join();return 0;
}
参考:DeepSeek