c++线程池的实现
目录
一、问题引出
二、优化建议与改进方案
1. 基础加固
2. 功能增强
3. 性能优化
三、完整优化代码示例
四、关键优化点总结
1.安全性增强:
2.功能扩展:
3.性能优化:
4.工业级特性:
五、测试验证建议
一、问题引出
在阿Q技术站看到一个c++线程池的实现。该线程池还有可以改进的地方。
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <iostream>
class ThreadPool{
public:
ThreadPool(size_t num_threads) : stop(false) {
for(size_t i = 0; i < num_threads; ++i) {
workers.emplace_back([this] {
while(true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
}
template<class F>
void enqueue(F&& f) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker : workers)
worker.join();
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
int main() {
ThreadPool pool(4); // 创建4个线程的线程池
for (int i = 0; i < 8; ++i) {
pool.enqueue([i]() {
std::cout << "Task " << i << " executed by thread " << std::this_thread::get_id() << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟工作
});
}
std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待任务完成
return 0;
}
逻辑问题:
- 在enqueue时,如果线程池已经停止(stop=true),仍然可以添加任务,这可能导致在析构后执行任务,引发未定义行为。需要在enqueue时检查stop标志。
- 异常处理:如果task()抛出异常,线程将终止,导致线程池中的线程数减少,影响后续任务执行。
- stop标志可以改用原子类型。
二、优化建议与改进方案
1. 基础加固
问题修复代码:
// 修改 enqueue 函数
template<class F>
bool enqueue(F&& f) { // 返回是否提交成功
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop) return false; // 池已关闭拒绝任务
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
return true;
}
2. 功能增强
(1) 支持任务返回值
template<class F>
auto enqueue(F&& f) -> std::future<decltype(f())> {
using RetType = decltype(f());
auto task = std::make_shared<std::packaged_task<RetType()>>(
std::forward<F>(f)
);
std::future<RetType> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop) throw std::runtime_error("enqueue on stopped pool");
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
(2) 异常安全处理
// 修改工作线程循环
try {
task();
} catch (...) {
// 记录异常或传递异常
std::lock_guard<std::mutex> lock(exception_mutex);
exceptions.push_back(std::current_exception());
}
3. 性能优化
(1) 无锁队列替换
#include <boost/lockfree/queue.hpp> // 或实现无锁队列
class ThreadPool {
private:
boost::lockfree::queue<std::function<void()>> tasks{1000}; // 初始化容量
// 移除 mutex 和 condition_variable
};
(2) 动态线程调整
void resize(size_t new_size) {
if (new_size < workers.size()) {
// 减少线程
} else if (new_size > workers.size()) {
// 增加线程
}
}
三、完整优化代码示例
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <stdexcept>
#include <functional>
class ThreadPool {
public:
explicit ThreadPool(size_t threads, size_t max_tasks = 1000)
: stop(false), task_count(0) {
if(threads == 0) threads = std::thread::hardware_concurrency();
workers.reserve(threads);
for(size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
while(true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] {
return stop || !tasks.empty();
});
if(stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
--task_count;
}
try {
task();
} catch (...) {
std::lock_guard<std::mutex> elock(exception_mutex);
exceptions.push_back(std::current_exception());
}
}
});
}
}
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop) throw std::runtime_error("enqueue on stopped pool");
tasks.emplace([task](){ (*task)(); });
++task_count;
}
condition.notify_one();
return res;
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker : workers)
if(worker.joinable()) worker.join();
if(!exceptions.empty()) {
// 异常处理逻辑
}
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
std::atomic_bool stop;
std::atomic<size_t> task_count;
std::mutex exception_mutex;
std::vector<std::exception_ptr> exceptions;
};
四、关键优化点总结
1.安全性增强:
-
原子操作
stop
和task_count
-
异常捕获与传递机制
-
入队前状态检查
2.功能扩展:
-
支持任务返回值(
future
/packaged_task
) -
动态线程数量调整预留接口
-
任务上限保护(通过
max_tasks
参数)
3.性能优化:
-
原子计数器减少锁竞争
-
任务批处理优化(示例未展示,可添加)
-
无锁队列选项(需第三方库支持)
4.工业级特性:
-
硬件并发数自动检测
-
资源释放保障(
joinable()
检查) -
防止任务堆积的拒绝策略
五、测试验证建议
#include<cassert>
/*
前面的线程池代码放在这
*/
// 验证用例
void test_pool() {
ThreadPool pool(4);
auto future = pool.enqueue([](int a) { return a*a; }, 5);
assert(future.get() == 25);
// 压力测试
for(int i=0; i<10000; ++i) {
pool.enqueue([i]{ std::this_thread::sleep_for(std::chrono::microseconds(1)); });
}
}
int main(int argc,char* argv[])
{
test_pool();
return 0;
}
通过静态分析工具(如Clang-Tidy)和动态测试(如ThreadSanitizer)确保线程安全。