C++手撕无锁线程池
背景
在实现自己的线程池的时候,小H发现需要频繁加锁、解锁,这中间是否有太多的时间消耗掉呢?是否可以实现一个无锁的线程池来提高性能,带着这些疑问,小H开始学习相关内容。
本文代码放置在该仓库(futureseek/Cpp_learn: learn some model about C++)的Infrastructure_Components目录下。
首先要说明的是,无锁≠无竞争,同样要做好同步等管理,否则就会出现各种并发上的问题。从下表也可以简单看到两者的区别。
维度 | 基于 mutex 的线程池 | 无锁线程池(本文) |
---|---|---|
竞争策略 | 内核阻塞 + 上下文切换 | 用户态自旋 + 退避 |
最坏延迟 | 毫秒级阻塞 | 微秒级忙等 |
吞吐 | 低 | 高(CPU 密集场景) |
在普通的mutex线程池中,我们可能只需要一个简单的vector来管理对象,但是这里我们要引入一个MPMC(多消费者多生产者)队列来负责承载我们的任务。
Vyukov MPMC queue
Vyukov MPMC queue 是一种由 Dmitry Vyukov 设计的多生产者多消费者(Multiple Producers, Multiple Consumers,MPMC)队列。以下是对它的简单说明(来自豆包):
- 基本概念:它是一种用于多线程编程的队列数据结构,允许多个生产者线程同时向队列中添加数据,多个消费者线程同时从队列中移除数据,并且能够保证线程安全。
- 实现特点
- 基于数组:通常采用基于数组的数据结构,这种结构可以提供连续的内存布局,有利于提高缓存利用率和访问速度。
- 无锁算法:虽然不是严格意义上的无锁算法,但它通过原子读 - 修改 - 写(RMW)操作来实现,主要使用比较并交换(CAS)指令,极大地减少了锁的使用,从而降低了线程间同步的开销。每个入队或出队操作的成本通常为一个 CAS 指令,操作高效且具有可预测性。
- 固定容量:具有固定的容量,当队列空间不足时,入队操作将简单地失败,而不会尝试使用垃圾回收器(GC)来进行空间回收,这有助于减少因 GC 造成的性能延迟。
- 因果 FIFO 顺序:保证了因果关系的先入先出(FIFO)顺序,即数据项的处理顺序会保持其因果关系,这对于保持程序逻辑的正确性非常重要。
- 生产者和消费者隔离:生产者和消费者在逻辑上是隔离的,各自操作不同的数据集合,避免了不必要的数据竞争。
成员变量
首先我们先看到其应该有的成员变量。作为一个环形队列,我们需要来有一个队伍头还有一个队尾,用来标识生产和消费到的位置,同时内部提供一个类似的node的节点对应一个槽位,槽位中应该有这个槽位的数据以及标识符(用来复用的)。
struct Cell {std::atomic<size_t> seq;T data;};const size_t size_;const size_t mask_;std::unique_ptr<Cell[]> buffer_;alignas(64) std::atomic<size_t> head_{0}; // producer reservationalignas(64) std::atomic<size_t> tail_{0}; // consumer reservation
这里buffer使用了智能指针来进行管理,自动释放内存;
小知识:alignas:避免「伪共享」——head_
和 tail_
分别被生产者和消费者高频修改,单独占一个 64 字节缓存行,防止互相干扰导致的缓存失效,提升性能,这里可能对Cell本身也要做这种内存对齐的操作,否则也可能出现共享失效的问题。
工作流程
以「生产者入队」和「消费者出队」为例,看数据结构的协作:
1. 生产者 push
流程
-
生产者调用
head_.fetch_add(1)
原子抢占一个pos
(如pos=5
); -
计算槽位索引:
idx = pos & mask_
(如mask_=1023
时,idx=5
),找到对应的Cell
; -
循环检查
Cell.seq
:直到seq == pos
(确认槽位空闲); -
写入
Cell.data = std::move(item)
,然后将Cell.seq
设为pos+1
(标记为「可被消费者读取」); -
入队完成。
2. 消费者 try_pop
流程
-
消费者读取
tail_
得到pos
(如pos=5
); -
计算槽位索引
idx = pos & mask_
,找到对应的Cell
; -
检查
Cell.seq
:若seq == pos+1
(确认槽位有数据),则用compare_exchange_weak
原子抢占tail_
(将tail_
从pos
改为pos+1
); -
读取
Cell.data
到out
,然后将Cell.seq
设为pos+size_
(标记为「可复用」); -
出队完成。
template<typename T>
class MPMCQueue {struct Cell {std::atomic<size_t> seq;T data;};const size_t size_;const size_t mask_;std::unique_ptr<Cell[]> buffer_;alignas(64) std::atomic<size_t> head_{0}; // producer reservationalignas(64) std::atomic<size_t> tail_{0}; // consumer reservationpublic:explicit MPMCQueue(size_t n = 1024): size_(n), mask_(n - 1), buffer_(new Cell[n]) {assert((n & (n - 1)) == 0 && "size must be power of 2");for (size_t i = 0; i < size_; ++i) buffer_[i].seq.store(i, std::memory_order_relaxed);}// push always succeeds (blocks via spin until slot available)// returns true on successbool push(T item) {const size_t pos = head_.fetch_add(1, std::memory_order_acq_rel);Cell &cell = buffer_[pos & mask_];// wait until seq == pos (slot free)int spins = 0;while (true) {size_t seq = cell.seq.load(std::memory_order_acquire);intptr_t dif = (intptr_t)seq - (intptr_t)pos;if (dif == 0) break; // free to write++spins;if ((spins & 0x3f) == 0) std::this_thread::sleep_for(std::chrono::microseconds(50));else std::this_thread::yield();}cell.data = std::move(item);std::atomic_thread_fence(std::memory_order_release);cell.seq.store(pos + 1, std::memory_order_release); // make visiblereturn true;}// try_pop: non-blocking. returns true and sets out on success, false if empty or lost race.bool try_pop(T &out) {size_t pos = tail_.load(std::memory_order_relaxed);for (;;) {Cell &cell = buffer_[pos & mask_];size_t seq = cell.seq.load(std::memory_order_acquire);intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);if (dif == 0) {// try to claim this slotif (tail_.compare_exchange_weak(pos, pos + 1, std::memory_order_acq_rel, std::memory_order_relaxed)) {out = std::move(cell.data);std::atomic_thread_fence(std::memory_order_release);cell.seq.store(pos + size_, std::memory_order_release); // mark slot reusablereturn true;}// CAS failed: pos updated with current tail, retry loop} else if (dif < 0) {// emptyreturn false;} else {// another consumer is ahead, advance local pospos = tail_.load(std::memory_order_relaxed);}}}// approx empty check (not 100% atomic for concurrent producers)bool empty() const {size_t h = head_.load(std::memory_order_acquire);size_t t = tail_.load(std::memory_order_acquire);return h == t;}// approximate size (may be slightly off under concurrency)size_t approximate_size() const {size_t h = head_.load(std::memory_order_acquire);size_t t = tail_.load(std::memory_order_acquire);return h - t;}
};
线程池
对于线程池的设计就是依赖无锁任务队列+多工作线程+原子状态来是实现同步,通过无锁结构减少线程间竞争,同时用原子变量和退避策略平衡性能与 CPU 占用。
成员变量
成员变量 | 类型 | 核心作用 |
---|---|---|
queue_ | MPMCQueue<Task> | 无锁多生产者多消费者队列,存储待执行的任务(Task 是 std::function<void()> ) |
workers_ | std::vector<std::thread> | 工作线程数组,每个线程运行 worker_loop 循环,从队列取任务执行 |
stop_ | std::atomic<bool> | 原子停止标记:控制线程池是否退出,避免多线程修改的竞争 |
active_ | std::atomic<int> | 原子活跃任务计数:记录当前正在执行的任务数,用于 wait 等待所有任务完成 |
工作流程
阶段 | 关键变量 | 说明 |
---|---|---|
构造 | workers_ 启动 | 所有线程跑 worker_loop() |
提交任务 | queue_.push() | 永不失败(阻塞自旋) |
等待完成 | wait() | active_ == 0 && queue_.empty() |
关闭 | stop_ = true | 线程发现空队列 + stop 后退出 |
work_loop
每个工作线程都在 worker_loop
中循环执行「取任务→执行任务」,同时处理「无任务时的退避策略」和「停机信号」,是线程池的 “执行大脑”。逻辑拆解如下:
-
取任务与执行:
-
先调用
queue_.try_pop(task)
非阻塞取任务,取到则更新active_
计数,执行任务(捕获所有异常避免线程崩溃); -
任务执行完后再次进入循环,持续取任务。
-
-
无任务时的退避策略(避免 CPU 空转):
-
若没取到任务,先检查「是否需要停机」(
stop_ == true
且队列空),是则退出循环; -
否则进入「自旋退避」:前 32 次循环(
spin_limit=32
)调用std::this_thread::yield()
让出 CPU 给其他线程,同时重试取任务; -
若自旋 32 次仍没任务,进入「深度睡眠」(
sleep_for(50us)
),进一步减少 CPU 占用(避免线程池空闲时浪费资源)。
-
-
异常安全:任务执行用
try-catch
捕获所有异常,防止单个任务抛异常导致整个工作线程退出。
其他部分就是简单的提交或者拿取任务,不做赘述。
测试
给出测试代码,从基础测试、竞争测试、并发压力测试几个方面入手,代码在后文。
结语
在本次实现的过程中,出现了不少关于同步的时候出现的问题,一开始可能只打算使用原子变量,但是出现了ABA的问题,后面才又引入了自旋等待以及seq确认的操作,实现高性能的关键在于减少CPU的‘忙等’,但是又要避免出现同步上的问题,需要自行斟酌取舍。
代码上如果有问题可以和博主说明,博主也是新手来着。
代码
#ifndef CPP_LEARN_NOLOCKTHREADPOOL_H
#define CPP_LEARN_NOLOCKTHREADPOOL_H#include <atomic>
#include <memory>
#include <iostream>
#include <functional>
#include <thread>
#include <vector>
#include <chrono>
#include <cassert>namespace NoLockThreadPool {// Vyukov MPMC queue (bounded, capacity must be power of two)
template<typename T>
class MPMCQueue {struct Cell {std::atomic<size_t> seq;T data;};const size_t size_;const size_t mask_;std::unique_ptr<Cell[]> buffer_;alignas(64) std::atomic<size_t> head_{0}; // producer reservationalignas(64) std::atomic<size_t> tail_{0}; // consumer reservationpublic:explicit MPMCQueue(size_t n = 1024): size_(n), mask_(n - 1), buffer_(new Cell[n]) {assert((n & (n - 1)) == 0 && "size must be power of 2");for (size_t i = 0; i < size_; ++i) buffer_[i].seq.store(i, std::memory_order_relaxed);}// push always succeeds (blocks via spin until slot available)// returns true on successbool push(T item) {const size_t pos = head_.fetch_add(1, std::memory_order_acq_rel);Cell &cell = buffer_[pos & mask_];// wait until seq == pos (slot free)int spins = 0;while (true) {size_t seq = cell.seq.load(std::memory_order_acquire);intptr_t dif = (intptr_t)seq - (intptr_t)pos;if (dif == 0) break; // free to write++spins;if ((spins & 0x3f) == 0) std::this_thread::sleep_for(std::chrono::microseconds(50));else std::this_thread::yield();}cell.data = std::move(item);std::atomic_thread_fence(std::memory_order_release);cell.seq.store(pos + 1, std::memory_order_release); // make visiblereturn true;}// try_pop: non-blocking. returns true and sets out on success, false if empty or lost race.bool try_pop(T &out) {size_t pos = tail_.load(std::memory_order_relaxed);for (;;) {Cell &cell = buffer_[pos & mask_];size_t seq = cell.seq.load(std::memory_order_acquire);intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);if (dif == 0) {// try to claim this slotif (tail_.compare_exchange_weak(pos, pos + 1, std::memory_order_acq_rel, std::memory_order_relaxed)) {out = std::move(cell.data);std::atomic_thread_fence(std::memory_order_release);cell.seq.store(pos + size_, std::memory_order_release); // mark slot reusablereturn true;}// CAS failed: pos updated with current tail, retry loop} else if (dif < 0) {// emptyreturn false;} else {// another consumer is ahead, advance local pospos = tail_.load(std::memory_order_relaxed);}}}// approx empty check (not 100% atomic for concurrent producers)bool empty() const {size_t h = head_.load(std::memory_order_acquire);size_t t = tail_.load(std::memory_order_acquire);return h == t;}// approximate size (may be slightly off under concurrency)size_t approximate_size() const {size_t h = head_.load(std::memory_order_acquire);size_t t = tail_.load(std::memory_order_acquire);return h - t;}
};// High-performance thread pool using the MPMCQueue.
// Workers use try_pop with exponential/backoff sleeping to reduce CPU burn.
class NoLockThreadPool {
public:using Task = std::function<void()>;explicit NoLockThreadPool(size_t threads = std::thread::hardware_concurrency(), size_t queue_capacity = 1024): queue_(queue_capacity), stop_(false), active_(0) {if (threads == 0) threads = 1;workers_.reserve(threads);for (size_t i = 0; i < threads; ++i) {workers_.emplace_back([this] { this->worker_loop(); });}}~NoLockThreadPool() {shutdown();}// submit a task, returns false if pool stoppedbool submit(Task t) {if (stop_.load(std::memory_order_acquire)) return false;return queue_.push(std::move(t));}// wait until all tasks processed (approx): no active tasks and queue emptyvoid wait() {while (true) {if (active_.load(std::memory_order_acquire) == 0 && queue_.empty()) break;std::this_thread::sleep_for(std::chrono::milliseconds(1));}}// graceful shutdown: signal and join workersvoid shutdown() {bool expected = false;if (!stop_.compare_exchange_strong(expected, true)) return;// join workers - workers will exit after observing stop_ and empty queuefor (auto &t : workers_) if (t.joinable()) t.join();workers_.clear();}private:void worker_loop() {Task task;// backoff parametersconst int spin_limit = 32;while (true) {if (queue_.try_pop(task)) {active_.fetch_add(1, std::memory_order_acq_rel);try { task(); } catch(...) {}active_.fetch_sub(1, std::memory_order_acq_rel);continue;}// no task foundif (stop_.load(std::memory_order_acquire) && queue_.empty()) break;// exponential backoffint spins = 0;while (spins < spin_limit) {++spins;if (queue_.try_pop(task)) break;std::this_thread::yield();}if (spins < spin_limit) {// got a taskactive_.fetch_add(1, std::memory_order_acq_rel);try { task(); } catch(...) {}active_.fetch_sub(1, std::memory_order_acq_rel);continue;}// deeper sleep to avoid burning CPU when idlestd::this_thread::sleep_for(std::chrono::microseconds(50));}}private:MPMCQueue<Task> queue_;std::vector<std::thread> workers_;std::atomic<bool> stop_;std::atomic<int> active_;
};///////////////////// 全面测试 /////////////////////// 测试wait()方法的竞态条件
inline void test_wait_race_condition() {std::cout << "=== 测试wait()竞态条件 ===\n";const int iterations = 10;int failed_count = 0;for (int iter = 0; iter < iterations; ++iter) {NoLockThreadPool pool(4, 128);std::atomic<int> counter{0};std::atomic<bool> submit_done{false};// 启动一个线程持续提交任务std::thread submitter([&]() {for (int i = 0; i < 100; ++i) {pool.submit([&counter]() {std::this_thread::sleep_for(std::chrono::microseconds(100));counter.fetch_add(1);});if (i == 50) {// 在中间时刻让主线程开始waitstd::this_thread::sleep_for(std::chrono::milliseconds(1));}}submit_done.store(true);});// 稍微延迟后开始waitstd::this_thread::sleep_for(std::chrono::milliseconds(5));pool.wait();submitter.join();pool.shutdown();if (counter.load() != 100) {std::cout << "第" << iter << "次迭代失败: 期望100个任务,实际" << counter.load() << "个\n";failed_count++;}}std::cout << "wait()竞态测试结果: " << (iterations - failed_count) << "/" << iterations << " 次成功\n";
}// 测试高并发压力
inline void test_high_concurrency() {std::cout << "=== 测试高并发压力 ===\n";const int num_threads = 8;const int tasks_per_thread = 1000;const int total_tasks = num_threads * tasks_per_thread;NoLockThreadPool pool(4, 512);std::atomic<int> counter{0};std::atomic<int> submitted{0};std::vector<std::thread> producers;auto start = std::chrono::high_resolution_clock::now();// 多个线程同时提交任务for (int t = 0; t < num_threads; ++t) {producers.emplace_back([&, t]() {for (int i = 0; i < tasks_per_thread; ++i) {bool success = pool.submit([&counter]() {counter.fetch_add(1);});if (success) {submitted.fetch_add(1);}}});}for (auto& p : producers) {p.join();}pool.wait();auto end = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);std::cout << "高并发测试结果:\n";std::cout << " 提交任务数: " << submitted.load() << "/" << total_tasks << "\n";std::cout << " 完成任务数: " << counter.load() << "/" << submitted.load() << "\n";std::cout << " 用时: " << duration.count() << "ms\n";pool.shutdown();
}// 简单的基础测试
inline void test_basic() {std::cout << "=== 基础功能测试 ===\n";NoLockThreadPool pool(4, 128);std::atomic<int> counter{0};const int task_count = 50;for (int i = 0; i < task_count; ++i) {int id = i;pool.submit([id, &counter]() {// 模拟一些工作std::this_thread::sleep_for(std::chrono::milliseconds(10));counter.fetch_add(1);});}pool.wait();std::cout << "完成任务数: " << counter.load() << "/" << task_count << "\n";pool.shutdown();std::cout << "=== 基础测试完成 ===\n";
}// 运行所有测试
inline void test() {test_basic();test_wait_race_condition();test_high_concurrency();std::cout << "\n=== 所有测试完成 ===\n";
}} // namespace NoLockThreadPool#endif //CPP_LEARN_NOLOCKTHREADPOOL_H