当前位置: 首页 > news >正文

c++线程池的实现

目录

一、问题引出

二、优化建议与改进方案

1. 基础加固

2. 功能增强

3. 性能优化

三、完整优化代码示例

四、关键优化点总结

1.安全性增强:

2.功能扩展:

3.性能优化:

4.工业级特性:

五、测试验证建议


一、问题引出

在阿Q技术站看到一个c++线程池的实现。该线程池还有可以改进的地方。

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <iostream>

class ThreadPool{
public:
    ThreadPool(size_t num_threads) : stop(false) {
        for(size_t i = 0; i < num_threads; ++i) {
            workers.emplace_back([this] {
                while(true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    template<class F>
    void enqueue(F&& f) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            tasks.emplace(std::forward<F>(f));
        }
        condition.notify_one();
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for(std::thread &worker : workers)
            worker.join();
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

int main() {
    ThreadPool pool(4); // 创建4个线程的线程池
    for (int i = 0; i < 8; ++i) {
        pool.enqueue([i]() {
            std::cout << "Task " << i << " executed by thread " << std::this_thread::get_id() << "\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟工作
        });
    }
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待任务完成
    return 0;
}

逻辑问题:

  1. 在enqueue时,如果线程池已经停止(stop=true),仍然可以添加任务,这可能导致在析构后执行任务,引发未定义行为。需要在enqueue时检查stop标志。
  2. 异常处理:如果task()抛出异常,线程将终止,导致线程池中的线程数减少,影响后续任务执行。
  3. stop标志可以改用原子类型。

二、优化建议与改进方案

1. 基础加固

问题修复代码:

// 修改 enqueue 函数
template<class F>
bool enqueue(F&& f) {  // 返回是否提交成功
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        if(stop) return false;  // 池已关闭拒绝任务
        tasks.emplace(std::forward<F>(f));
    }
    condition.notify_one();
    return true;
}

2. 功能增强

(1) 支持任务返回值

template<class F>
auto enqueue(F&& f) -> std::future<decltype(f())> {
    using RetType = decltype(f());
    
    auto task = std::make_shared<std::packaged_task<RetType()>>(
        std::forward<F>(f)
    );
    
    std::future<RetType> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        if(stop) throw std::runtime_error("enqueue on stopped pool");
        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}

(2) 异常安全处理

// 修改工作线程循环
try {
    task();
} catch (...) {
    // 记录异常或传递异常
    std::lock_guard<std::mutex> lock(exception_mutex);
    exceptions.push_back(std::current_exception());
}

3. 性能优化

(1) 无锁队列替换

#include <boost/lockfree/queue.hpp> // 或实现无锁队列

class ThreadPool {
private:
    boost::lockfree::queue<std::function<void()>> tasks{1000}; // 初始化容量
    // 移除 mutex 和 condition_variable
};

(2) 动态线程调整

void resize(size_t new_size) {
    if (new_size < workers.size()) {
        // 减少线程
    } else if (new_size > workers.size()) {
        // 增加线程
    }
}

三、完整优化代码示例

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <stdexcept>
#include <functional>

class ThreadPool {
public:
    explicit ThreadPool(size_t threads, size_t max_tasks = 1000) 
        : stop(false), task_count(0) {
        if(threads == 0) threads = std::thread::hardware_concurrency();
        workers.reserve(threads);
        for(size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while(true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        condition.wait(lock, [this] {
                            return stop || !tasks.empty();
                        });
                        
                        if(stop && tasks.empty()) return;
                        
                        task = std::move(tasks.front());
                        tasks.pop();
                        --task_count;
                    }
                    
                    try {
                        task();
                    } catch (...) {
                        std::lock_guard<std::mutex> elock(exception_mutex);
                        exceptions.push_back(std::current_exception());
                    }
                }
            });
        }
    }

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;
        
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if(stop) throw std::runtime_error("enqueue on stopped pool");
            
            tasks.emplace([task](){ (*task)(); });
            ++task_count;
        }
        condition.notify_one();
        return res;
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for(std::thread &worker : workers)
            if(worker.joinable()) worker.join();
        
        if(!exceptions.empty()) {
            // 异常处理逻辑
        }
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    std::atomic_bool stop;
    std::atomic<size_t> task_count;
    
    std::mutex exception_mutex;
    std::vector<std::exception_ptr> exceptions;
};

四、关键优化点总结

1.安全性增强

  • 原子操作 stop 和 task_count

  • 异常捕获与传递机制

  • 入队前状态检查

2.功能扩展

  • 支持任务返回值(future/packaged_task

  • 动态线程数量调整预留接口

  • 任务上限保护(通过 max_tasks 参数)

3.性能优化

  • 原子计数器减少锁竞争

  • 任务批处理优化(示例未展示,可添加)

  • 无锁队列选项(需第三方库支持)

4.工业级特性

  • 硬件并发数自动检测

  • 资源释放保障(joinable() 检查)

  • 防止任务堆积的拒绝策略


五、测试验证建议

#include<cassert>
/*
前面的线程池代码放在这
*/

// 验证用例
void test_pool() {
    ThreadPool pool(4);
    auto future = pool.enqueue([](int a) { return a*a; }, 5);
    assert(future.get() == 25);
    
    // 压力测试
    for(int i=0; i<10000; ++i) {
        pool.enqueue([i]{ std::this_thread::sleep_for(std::chrono::microseconds(1)); });
    }
}

int main(int argc,char* argv[])
{
    test_pool();
    return 0;
}

通过静态分析工具(如Clang-Tidy)和动态测试(如ThreadSanitizer)确保线程安全。

相关文章:

  • 计算机毕业设计SpringBoot+Vue.js客户关系管理系统CRM(源码+文档+PPT+讲解)
  • 【区块链 + 绿色低碳】郑州数据交易中心双碳数据服务专区 | FISCO BCOS 应用案例
  • HashMap 的底层结构详解:原理、put和get示例
  • PMP项目管理—资源管理篇—3.获取资源
  • 文本处理Bert面试内容整理-BERT的基本原理是什么?
  • 03.购物单
  • IO进程思维导图和练习题
  • 防火墙虚拟系统
  • DeepSeek:大模型领域的创新力量
  • 自学嵌入式第27天------TCP和UDP,URL爬虫
  • 物联网感应层数据采集器实现协议转换 数据格式化
  • 在日常生活、工作中deepseek能帮我们解决哪些问题
  • 【JavaScript—前端快速入门】JavaScript 对象与函数
  • 计算机常用单词
  • 需求管理工具选型指南:Jama Connect +Jira vs Word/Excel+Jira
  • 网络安全检查漏洞内容回复 网络安全的漏洞
  • ArcGIS操作:13 生成最小外接矩阵
  • 基于STM32的环境监测系统(自制蓝牙APP)
  • Lua脚本使用教学指南:与Spring Boot项目集成示例
  • 动态内存分配
  • 拉萨网站建设价格/宁波网络营销推广公司
  • 乐平市网站建设/免费b站在线观看人数在哪儿
  • 搜索引擎网站推广定义/宁波正规优化seo公司
  • 长葛做网站/网页制作软件有哪些
  • 如果只做p2p种子搜索网站/nba新闻最新消息
  • 开发公司消防未移交物业/郑州seo培训