当前位置: 首页 > news >正文

concurrentqueue:一个高并发高性能的C++无锁队列

目录

1.并发队列介绍

2.concurrentqueue简介  

3.concurrentqueue示例

4.其它的并发队列

4.1. Intel TBB::concurrent_queue(并行编程库集成)

4.2.Boost.Lockfree::queue(Boost 生态,轻量级无锁)

4.3.其它库

4.4.库的选择建议

5.总结


1.并发队列介绍

        在多线程编程中,concurrentqueue(并发队列)是一种支持多线程安全访问的队列数据结构,主要用于解决多个线程同时进行 “入队(enqueue)” 和 “出队(dequeue)” 操作时的数据竞争问题,确保线程安全和数据一致性。

        并发队列是多线程环境中线程间通信的核心组件,典型场景包括:

  • 生产者 - 消费者模型:生产者线程往队列中写入数据,消费者线程从队列中读取数据,通过队列协调线程间的数据传递。
  • 任务调度:线程池中的任务队列,多个工作线程从队列中获取任务执行。

实现方式

并发队列的实现需保证 “线程安全”,常见实现方式有两种:

1)基于锁的实现(Lock-based)

通过互斥锁(mutex)保证同一时间只有一个线程操作队列,配合条件变量(condition_variable)处理 “队列空时消费者等待” 或 “队列满时生产者等待” 的场景。

优点:实现简单,逻辑清晰,适合大多数场景。
缺点:高并发下锁竞争可能导致性能瓶颈。

C++ 示例(简单的有界并发队列):

#include <queue>
#include <mutex>
#include <condition_variable>
#include <stdexcept>template <typename T>
class BoundedConcurrentQueue {
private:std::queue<T> queue_;mutable std::mutex mutex_;       // 保护队列的互斥锁std::condition_variable not_full_;  // 队列不满时通知生产者std::condition_variable not_empty_; // 队列非空时通知消费者size_t max_size_;                // 队列最大容量(有界)public:explicit BoundedConcurrentQueue(size_t max_size) : max_size_(max_size) {}// 入队(阻塞,直到队列有空间)void enqueue(const T& item) {std::unique_lock<std::mutex> lock(mutex_);// 等待队列不满not_full_.wait(lock, [this] { return queue_.size() < max_size_; });queue_.push(item);not_empty_.notify_one();  // 通知消费者队列非空}// 出队(阻塞,直到队列有元素)T dequeue() {std::unique_lock<std::mutex> lock(mutex_);// 等待队列非空not_empty_.wait(lock, [this] { return !queue_.empty(); });T item = queue_.front();queue_.pop();not_full_.notify_one();   // 通知生产者队列有空间return item;}// 尝试入队(非阻塞,失败返回false)bool try_enqueue(const T& item) {std::lock_guard<std::mutex> lock(mutex_);if (queue_.size() >= max_size_) {return false;}queue_.push(item);not_empty_.notify_one();return true;}// 尝试出队(非阻塞,失败返回false)bool try_dequeue(T& item) {std::lock_guard<std::mutex> lock(mutex_);if (queue_.empty()) {return false;}item = queue_.front();queue_.pop();not_full_.notify_one();return true;}size_t size() const {std::lock_guard<std::mutex> lock(mutex_);return queue_.size();}
};

2)无锁实现(Lock-free)

不依赖锁,而是通过原子操作(atomic operations)(如std::atomic)和CAS(Compare-And-Swap) 机制实现线程安全。核心是通过原子指令保证操作的原子性,避免锁竞争。

优点:高并发下性能更好(无锁开销),适合对性能要求极高的场景(如高频交易、实时数据处理)。
缺点:实现复杂,需处理 “ABA 问题”“内存顺序” 等细节,调试难度大。

实现关键点

  • 用原子指针管理队列节点(如链表节点)。
  • 通过 CAS 操作保证节点插入 / 删除的原子性。
  • 处理多生产者 / 多消费者场景下的并发冲突。

2.concurrentqueue简介  

        ConcurrentQueue 是一个线程安全的先进先出(FIFO)集合,允许多个生产者线程和多个消费者线程同时操作。它基于 C++11 的特性实现,完全无锁(lock-free),这意味着它通过原子操作来保证线程安全,而不是依赖传统的互斥锁。这种设计不仅提高了性能,还减少了锁带来的开销。

    主要特性有:

  • 高性能ConcurrentQueue 是一个高性能的并发队列,特别是在多线程环境下表现优异。它支持快速的批量操作,例如批量入队和批量出队,这些操作的速度甚至可以接近非并发队列。
  • 单头文件实现:整个队列的实现仅包含在一个头文件中,这使得它非常易于集成到任何项目中。
  • 完全线程安全ConcurrentQueue 是一个完全无锁的队列,可以在任意数量的线程中安全地使用。
  • 支持 C++11 的移动语义:在可能的情况下,队列会移动元素而不是复制它们,从而提高效率。
  • 灵活的内存管理:内存可以一次性分配,也可以根据需要动态分配。
  • 无限制的元素类型和数量:队列对元素类型或最大数量没有人为限制。
  • 支持数据类型:支持任意类型元素(包括非可复制、非可移动类型)。
  • 可移植性:跨平台(Windows、Linux、macOS),仅依赖 C++11 标准库。
  • 支持模式:提供阻塞和非阻塞两种操作模式。
  • 异常安全:即使在并发操作中,队列也能保证异常安全。
  • 批量操作:队列支持批量入队和出队操作,这可以显著减少操作的开销。
  • 自定义特性:可以通过自定义特性模板参数来调整队列的行为,例如内存分配和块大小。
  • 无构造函数的类型出队:对于没有默认构造函数的类型,可以通过包装类来避免构造函数的调用。

 仓库地址:https://github.com/cameron314/concurrentqueue

无锁队列测试效果统计:A Fast General Purpose Lock-Free Queue for C++

3.concurrentqueue示例

#include "concurrentqueue.h"
#include <thread>
#include <iostream>
#include <vector>// 定义并发队列(存储int类型)
moodycamel::ConcurrentQueue<int> q;// 生产者线程:往队列中入队数据
void producer(int id, int count) {for (int i = 0; i < count; ++i) {q.enqueue(i);  // 非阻塞入队(成功返回true,失败返回false)// 也可使用enqueue_with_allocator(自定义内存分配)}std::cout << "Producer " << id << " finished\n";
}// 消费者线程:从队列中出队数据
void consumer(int id) {int item;size_t count = 0;// 非阻塞尝试出队,直到队列空且所有生产者完成(实际需配合结束标志)while (q.try_dequeue(item)) {++count;// 处理数据(示例中仅计数)}std::cout << "Consumer " << id << " processed " << count << " items\n";
}int main() {// 启动2个生产者,每个生产1000个数据std::thread p1(producer, 1, 1000);std::thread p2(producer, 2, 1000);// 启动2个消费者std::thread c1(consumer, 1);std::thread c2(consumer, 2);// 等待生产者完成p1.join();p2.join();// 等待消费者处理完剩余数据(实际场景需确保所有数据被消费)c1.join();c2.join();return 0;
}

编译方式:只需包含头文件(concurrentqueue.hblockingconcurrentqueue.h),无需链接额外库,直接编译即可。

4.其它的并发队列

4.1. Intel TBB::concurrent_queue(并行编程库集成)

        Intel TBB(Threading Building Blocks)库中的并发队列,属于 TBB 并行编程框架的一部分,支持多生产者 - 多消费者模型,实现成熟稳定。

      特点:

  • 基于锁的实现(但经过高度优化),兼顾易用性和性能;
  • 支持 “阻塞出队”(wait_and_pop)和 “非阻塞出队”(try_pop);
  • 与 TBB 的其他组件(如线程池、并行算法)无缝集成;
  • 适合需要整体并行框架支持的场景。

仓库地址:https://github.com/oneapi-src/oneTBB(TBB 已并入 Intel oneAPI)

基本用法示例:

#include <tbb/concurrent_queue.h>
#include <thread>
#include <iostream>tbb::concurrent_queue<int> q;void producer(int id, int count) {for (int i = 0; i < count; ++i) {q.push(i);  // 入队(非阻塞,总是成功,内部动态扩容)}std::cout << "Producer " << id << " finished\n";
}void consumer(int id) {int item;size_t count = 0;// 非阻塞尝试出队while (q.try_pop(item)) {++count;}std::cout << "Consumer " << id << " processed " << count << " items\n";
}int main() {std::thread p1(producer, 1, 1000);std::thread p2(producer, 2, 1000);std::thread c1(consumer, 1);std::thread c2(consumer, 2);p1.join();p2.join();// 等待队列空(TBB无内置阻塞等待,需手动处理或用额外同步)while (!q.empty()) {}c1.join();c2.join();return 0;
}

编译方式:需安装 TBB 库,编译时链接 TBB(如-ltbb)。

4.2.Boost.Lockfree::queue(Boost 生态,轻量级无锁)

        Boost 库中的无锁队列组件,属于boost.lockfree模块,支持单生产者 - 单消费者(SPSC)或多生产者 - 多消费者(MPMC)模式,适合对轻量级和 Boost 生态依赖的场景。

        特点有:

  • 无锁实现,支持固定大小(有界)队列;
  • 接口简洁,与 Boost 其他组件兼容;
  • 需注意:MPMC 模式下性能略逊于 moodycamel,但稳定性高。

文档地址:Chapter 19. Boost.Lockfree

基本用法示例:

#include <boost/lockfree/queue.hpp>
#include <thread>
#include <iostream>// 定义有界队列(容量1024,int类型)
boost::lockfree::queue<int> q(1024);void producer(int id, int count) {for (int i = 0; i < count; ++i) {// 非阻塞入队(队列满时返回false,需循环尝试)while (!q.push(i)) {}}std::cout << "Producer " << id << " finished\n";
}void consumer(int id) {int item;size_t count = 0;// 非阻塞出队while (q.pop(item)) {++count;}std::cout << "Consumer " << id << " processed " << count << " items\n";
}int main() {std::thread p1(producer, 1, 1000);std::thread p2(producer, 2, 1000);std::thread c1(consumer, 1);std::thread c2(consumer, 2);p1.join();p2.join();// 等待队列空while (!q.empty()) {}c1.join();c2.join();return 0;
}

编译方式:需安装 Boost 库,编译时链接boost_system等(如-lboost_system)。

4.3.其它库

  • folly::ConcurrentQueue:Facebook 开源的folly库中的并发队列,无锁实现,性能优异,但依赖folly庞大的生态,适合大型项目。

Folly,一个强大的C++库_folly库-CSDN博客

  • QPix::ConcurrentQueue:轻量级实现,支持阻塞 / 非阻塞操作,接口简洁,适合嵌入式或资源受限场景。

4.4.库的选择建议

场景需求推荐库理由
高性能、多生产者多消费者moodycamel::ConcurrentQueue无锁设计,性能顶尖,无额外依赖
已使用 TBB 并行框架tbb::concurrent_queue与 TBB 组件无缝集成,开发效率高
依赖 Boost 生态、追求稳定性boost::lockfree::queue经过长期验证,兼容性好
大型项目、需要丰富工具链支持folly::ConcurrentQueue功能全面,适合 Facebook 生态项目

5.总结

        在 C++ 中,虽然有一些标准库和第三方库提供了并发队列的实现,但它们往往存在一些限制。例如,Boost 的并发队列对对象的赋值运算符和析构函数有严格要求,而 Intel 的 TBB 队列则不是无锁的。ConcurrentQueue 不仅限制更少,而且经过了良好的测试,提供了更高级的特性。

        C++ 并发队列库各有侧重,其中moodycamel::ConcurrentQueue以其无锁高性能、无依赖、易用性成为多数场景下的首选。实际开发中,应根据项目的并发规模、依赖限制、性能需求选择合适的库,避免重复实现线程安全逻辑。

http://www.dtcms.com/a/318929.html

相关文章:

  • Oracle exp imp expdp impdp 命令详解
  • 无人机光伏巡检效率提升68%!陌讯动态融合算法实战解析
  • 模拟-38.外观数列-力扣(LeetCode)
  • 成就非凡:如何识别并服务那些注定成功的软件客户-优雅草卓伊凡
  • 正向矩阵(DCT)变换后还是一个矩阵,怎么减少存储空间
  • 软件加密工具-DSProtector使用说明
  • Adobe最新+MAC系统+系统+教程 软件+课程
  • 计算机视觉(opencv)——图像本质、数字矩阵、RGB + 基本操作(实战一)
  • 酉矩阵(Unitary Matrix)和随机矩阵
  • 在开发板上画出一个2048棋盘的矩阵
  • Jenkins全链路教程——Jenkins用户权限矩阵配置
  • 什么是键值缓存?让 LLM 闪电般快速
  • 面向远程智能终端的超低延迟RTSP|RTMP视频SDK架构与实践指南
  • 动手学深度学习(pytorch版):第一节——引言
  • web前端结合Microsoft Office Online 在线预览,vue实现(PPT、Word、Excel、PDF等)
  • 美食广场: 城市胃的便利店
  • JAVA,Maven继承
  • 开源大模型实战:GPT-OSS本地部署与全面测评
  • Postman接口测试详解
  • SpringBoot微头条实战项目
  • OpenCV入门:图像处理基础教程
  • 【题解】洛谷P3768 简单的数学题[杜教筛]+两种欧反公式解析
  • UDP网络编程chat
  • CompletableFuture的基础用法介绍
  • 技术优势铸就行业标杆:物联网边缘计算网关凭何引领智能变革?
  • 施耐德 Easy Altivar ATV310 变频器:高效电机控制的理想选择(含快速调试步骤及常见故障代码)
  • Flutter 局部刷新方案对比:ValueListenableBuilder vs. GetBuilder vs. Obx
  • 齐护机器人小智AI_MCP图形化编程控制Arduino_ESP32
  • 亚远景-ISO 42001:汽车AI安全的行业标准新趋势
  • 网站 博客遭遇DDoS,CC攻击应该怎样应对?