concurrentqueue:一个高并发高性能的C++无锁队列
目录
1.并发队列介绍
2.concurrentqueue简介
3.concurrentqueue示例
4.其它的并发队列
4.1. Intel TBB::concurrent_queue(并行编程库集成)
4.2.Boost.Lockfree::queue(Boost 生态,轻量级无锁)
4.3.其它库
4.4.库的选择建议
5.总结
1.并发队列介绍
在多线程编程中,concurrentqueue
(并发队列)是一种支持多线程安全访问的队列数据结构,主要用于解决多个线程同时进行 “入队(enqueue)” 和 “出队(dequeue)” 操作时的数据竞争问题,确保线程安全和数据一致性。
并发队列是多线程环境中线程间通信的核心组件,典型场景包括:
- 生产者 - 消费者模型:生产者线程往队列中写入数据,消费者线程从队列中读取数据,通过队列协调线程间的数据传递。
- 任务调度:线程池中的任务队列,多个工作线程从队列中获取任务执行。
实现方式
并发队列的实现需保证 “线程安全”,常见实现方式有两种:
1)基于锁的实现(Lock-based)
通过互斥锁(mutex
)保证同一时间只有一个线程操作队列,配合条件变量(condition_variable
)处理 “队列空时消费者等待” 或 “队列满时生产者等待” 的场景。
优点:实现简单,逻辑清晰,适合大多数场景。
缺点:高并发下锁竞争可能导致性能瓶颈。
C++ 示例(简单的有界并发队列):
#include <queue>
#include <mutex>
#include <condition_variable>
#include <stdexcept>template <typename T>
class BoundedConcurrentQueue {
private:std::queue<T> queue_;mutable std::mutex mutex_; // 保护队列的互斥锁std::condition_variable not_full_; // 队列不满时通知生产者std::condition_variable not_empty_; // 队列非空时通知消费者size_t max_size_; // 队列最大容量(有界)public:explicit BoundedConcurrentQueue(size_t max_size) : max_size_(max_size) {}// 入队(阻塞,直到队列有空间)void enqueue(const T& item) {std::unique_lock<std::mutex> lock(mutex_);// 等待队列不满not_full_.wait(lock, [this] { return queue_.size() < max_size_; });queue_.push(item);not_empty_.notify_one(); // 通知消费者队列非空}// 出队(阻塞,直到队列有元素)T dequeue() {std::unique_lock<std::mutex> lock(mutex_);// 等待队列非空not_empty_.wait(lock, [this] { return !queue_.empty(); });T item = queue_.front();queue_.pop();not_full_.notify_one(); // 通知生产者队列有空间return item;}// 尝试入队(非阻塞,失败返回false)bool try_enqueue(const T& item) {std::lock_guard<std::mutex> lock(mutex_);if (queue_.size() >= max_size_) {return false;}queue_.push(item);not_empty_.notify_one();return true;}// 尝试出队(非阻塞,失败返回false)bool try_dequeue(T& item) {std::lock_guard<std::mutex> lock(mutex_);if (queue_.empty()) {return false;}item = queue_.front();queue_.pop();not_full_.notify_one();return true;}size_t size() const {std::lock_guard<std::mutex> lock(mutex_);return queue_.size();}
};
2)无锁实现(Lock-free)
不依赖锁,而是通过原子操作(atomic operations)(如std::atomic
)和CAS(Compare-And-Swap) 机制实现线程安全。核心是通过原子指令保证操作的原子性,避免锁竞争。
优点:高并发下性能更好(无锁开销),适合对性能要求极高的场景(如高频交易、实时数据处理)。
缺点:实现复杂,需处理 “ABA 问题”“内存顺序” 等细节,调试难度大。
实现关键点:
- 用原子指针管理队列节点(如链表节点)。
- 通过 CAS 操作保证节点插入 / 删除的原子性。
- 处理多生产者 / 多消费者场景下的并发冲突。
2.concurrentqueue简介
ConcurrentQueue
是一个线程安全的先进先出(FIFO)集合,允许多个生产者线程和多个消费者线程同时操作。它基于 C++11 的特性实现,完全无锁(lock-free),这意味着它通过原子操作来保证线程安全,而不是依赖传统的互斥锁。这种设计不仅提高了性能,还减少了锁带来的开销。
主要特性有:
- 高性能:
ConcurrentQueue
是一个高性能的并发队列,特别是在多线程环境下表现优异。它支持快速的批量操作,例如批量入队和批量出队,这些操作的速度甚至可以接近非并发队列。 - 单头文件实现:整个队列的实现仅包含在一个头文件中,这使得它非常易于集成到任何项目中。
- 完全线程安全:
ConcurrentQueue
是一个完全无锁的队列,可以在任意数量的线程中安全地使用。 - 支持 C++11 的移动语义:在可能的情况下,队列会移动元素而不是复制它们,从而提高效率。
- 灵活的内存管理:内存可以一次性分配,也可以根据需要动态分配。
- 无限制的元素类型和数量:队列对元素类型或最大数量没有人为限制。
- 支持数据类型:支持任意类型元素(包括非可复制、非可移动类型)。
- 可移植性:跨平台(Windows、Linux、macOS),仅依赖 C++11 标准库。
- 支持模式:提供阻塞和非阻塞两种操作模式。
- 异常安全:即使在并发操作中,队列也能保证异常安全。
- 批量操作:队列支持批量入队和出队操作,这可以显著减少操作的开销。
- 自定义特性:可以通过自定义特性模板参数来调整队列的行为,例如内存分配和块大小。
- 无构造函数的类型出队:对于没有默认构造函数的类型,可以通过包装类来避免构造函数的调用。
仓库地址:https://github.com/cameron314/concurrentqueue
无锁队列测试效果统计:A Fast General Purpose Lock-Free Queue for C++
3.concurrentqueue示例
#include "concurrentqueue.h"
#include <thread>
#include <iostream>
#include <vector>// 定义并发队列(存储int类型)
moodycamel::ConcurrentQueue<int> q;// 生产者线程:往队列中入队数据
void producer(int id, int count) {for (int i = 0; i < count; ++i) {q.enqueue(i); // 非阻塞入队(成功返回true,失败返回false)// 也可使用enqueue_with_allocator(自定义内存分配)}std::cout << "Producer " << id << " finished\n";
}// 消费者线程:从队列中出队数据
void consumer(int id) {int item;size_t count = 0;// 非阻塞尝试出队,直到队列空且所有生产者完成(实际需配合结束标志)while (q.try_dequeue(item)) {++count;// 处理数据(示例中仅计数)}std::cout << "Consumer " << id << " processed " << count << " items\n";
}int main() {// 启动2个生产者,每个生产1000个数据std::thread p1(producer, 1, 1000);std::thread p2(producer, 2, 1000);// 启动2个消费者std::thread c1(consumer, 1);std::thread c2(consumer, 2);// 等待生产者完成p1.join();p2.join();// 等待消费者处理完剩余数据(实际场景需确保所有数据被消费)c1.join();c2.join();return 0;
}
编译方式:只需包含头文件(concurrentqueue.h
和blockingconcurrentqueue.h
),无需链接额外库,直接编译即可。
4.其它的并发队列
4.1. Intel TBB::concurrent_queue(并行编程库集成)
Intel TBB(Threading Building Blocks)库中的并发队列,属于 TBB 并行编程框架的一部分,支持多生产者 - 多消费者模型,实现成熟稳定。
特点:
- 基于锁的实现(但经过高度优化),兼顾易用性和性能;
- 支持 “阻塞出队”(
wait_and_pop
)和 “非阻塞出队”(try_pop
); - 与 TBB 的其他组件(如线程池、并行算法)无缝集成;
- 适合需要整体并行框架支持的场景。
仓库地址:https://github.com/oneapi-src/oneTBB(TBB 已并入 Intel oneAPI)
基本用法示例:
#include <tbb/concurrent_queue.h>
#include <thread>
#include <iostream>tbb::concurrent_queue<int> q;void producer(int id, int count) {for (int i = 0; i < count; ++i) {q.push(i); // 入队(非阻塞,总是成功,内部动态扩容)}std::cout << "Producer " << id << " finished\n";
}void consumer(int id) {int item;size_t count = 0;// 非阻塞尝试出队while (q.try_pop(item)) {++count;}std::cout << "Consumer " << id << " processed " << count << " items\n";
}int main() {std::thread p1(producer, 1, 1000);std::thread p2(producer, 2, 1000);std::thread c1(consumer, 1);std::thread c2(consumer, 2);p1.join();p2.join();// 等待队列空(TBB无内置阻塞等待,需手动处理或用额外同步)while (!q.empty()) {}c1.join();c2.join();return 0;
}
编译方式:需安装 TBB 库,编译时链接 TBB(如-ltbb
)。
4.2.Boost.Lockfree::queue(Boost 生态,轻量级无锁)
Boost 库中的无锁队列组件,属于boost.lockfree
模块,支持单生产者 - 单消费者(SPSC)或多生产者 - 多消费者(MPMC)模式,适合对轻量级和 Boost 生态依赖的场景。
特点有:
- 无锁实现,支持固定大小(有界)队列;
- 接口简洁,与 Boost 其他组件兼容;
- 需注意:MPMC 模式下性能略逊于 moodycamel,但稳定性高。
文档地址:Chapter 19. Boost.Lockfree
基本用法示例:
#include <boost/lockfree/queue.hpp>
#include <thread>
#include <iostream>// 定义有界队列(容量1024,int类型)
boost::lockfree::queue<int> q(1024);void producer(int id, int count) {for (int i = 0; i < count; ++i) {// 非阻塞入队(队列满时返回false,需循环尝试)while (!q.push(i)) {}}std::cout << "Producer " << id << " finished\n";
}void consumer(int id) {int item;size_t count = 0;// 非阻塞出队while (q.pop(item)) {++count;}std::cout << "Consumer " << id << " processed " << count << " items\n";
}int main() {std::thread p1(producer, 1, 1000);std::thread p2(producer, 2, 1000);std::thread c1(consumer, 1);std::thread c2(consumer, 2);p1.join();p2.join();// 等待队列空while (!q.empty()) {}c1.join();c2.join();return 0;
}
编译方式:需安装 Boost 库,编译时链接boost_system
等(如-lboost_system
)。
4.3.其它库
- folly::ConcurrentQueue:Facebook 开源的
folly
库中的并发队列,无锁实现,性能优异,但依赖folly
庞大的生态,适合大型项目。
Folly,一个强大的C++库_folly库-CSDN博客
- QPix::ConcurrentQueue:轻量级实现,支持阻塞 / 非阻塞操作,接口简洁,适合嵌入式或资源受限场景。
4.4.库的选择建议
场景需求 | 推荐库 | 理由 |
---|---|---|
高性能、多生产者多消费者 | moodycamel::ConcurrentQueue | 无锁设计,性能顶尖,无额外依赖 |
已使用 TBB 并行框架 | tbb::concurrent_queue | 与 TBB 组件无缝集成,开发效率高 |
依赖 Boost 生态、追求稳定性 | boost::lockfree::queue | 经过长期验证,兼容性好 |
大型项目、需要丰富工具链支持 | folly::ConcurrentQueue | 功能全面,适合 Facebook 生态项目 |
5.总结
在 C++ 中,虽然有一些标准库和第三方库提供了并发队列的实现,但它们往往存在一些限制。例如,Boost 的并发队列对对象的赋值运算符和析构函数有严格要求,而 Intel 的 TBB 队列则不是无锁的。ConcurrentQueue
不仅限制更少,而且经过了良好的测试,提供了更高级的特性。
C++ 并发队列库各有侧重,其中moodycamel::ConcurrentQueue以其无锁高性能、无依赖、易用性成为多数场景下的首选。实际开发中,应根据项目的并发规模、依赖限制、性能需求选择合适的库,避免重复实现线程安全逻辑。