当前位置: 首页 > news >正文

线程池介绍,分类,实现(工作原理,核心组成,拒绝策略),固态线程池的实现+详细解释(支持超时取消机制和不同的拒绝策略)

目录

线程池

介绍

分类

实现

工作原理

核心组成

拒绝策略 

固态线程池

功能

std::future

实现

拒绝策略支持

提交任务

超时取消

用户检测取消

安全销毁

代码

测试


线程池

介绍

线程池(图解,本质,模拟实现代码),添加单例模式(懒汉思路+代码)_线程池单例-CSDN博客

  • 包括线程池介绍+使用pthread库接口

这里我们就使用c++11中的thread库来实现一下,并且引入更多特性

分类

固定大小线程池(Fixed-size Thread Pool)

  • 线程池中的线程数在初始化时被设定并保持固定,不会根据负载自动扩展或收缩。

  • 适用于负载较为平稳的场景,例如Web服务器、数据库连接池等。

动态线程池(DynamicThread Pool)

  • 线程池的线程数是动态变化的,根据任务的数量来增加或减少线程数。

  • 当线程池中没有任务时,线程会被回收(通常有一个最大线程数限制)。

  • 适用于任务量不稳定、并发变化较大的场景,如文件处理、短时间的批量任务等。

单线程化线程池(Single-threaded Thread Pool)

  • 线程池中只有一个线程,所有任务都会按顺序提交给这个线程执行。

  • 适用于串行化任务,如日志记录、事件驱动任务等。

调度线程池(Scheduled Thread Pool)

  • 该线程池支持任务的延迟执行和周期性执行。

  • 通常用于定时任务或周期性任务,例如定时清理缓存等。

实现

工作原理

  • 线程池初始化:线程池初始化时创建一定数量的工作线程,并使其处于“等待”状态,准备执行任务
  • 任务提交:当有新任务提交时,线程池将任务放入任务队列中
  • 任务执行:线程池中的空闲线程从任务队列中取出任务并执行
  • 任务完成:任务执行完后,线程回到等待状态,准备接收新的任务
  • 线程销毁:当线程池中的线程闲置超过一定时间,线程池可以销毁一些线程以释放系统资源(适用于动态线程池)

核心组成

一个线程池一般由以下几个核心组件组成:

线程工作线程集合

  • 一组预先创建的固定数量或动态伸缩(包括核心线程和临时创建)的线程
  • 每个线程循环从任务队列中获取任务执行

任务队列

  • 用于存放等待执行的任务,一般为线程安全的队列(如 std::queue + mutex),支持任务入队/出队

任务提交接口

  • 对外暴露的函数接口,用于将任务提交到线程池,如 submit()

同步机制

  • 用于保护共享资源和协调线程间关系(每个线程都要访问任务队列)
  • 常用 mutex, condition_variable, atomic

任务拒绝策略(Rejection Policy)

  • 当队列已满时,决定如何处理新任务(下面介绍)

生命周期管理(Shutdown & Destruction)

  • 控制线程池的启动、停止、销毁,确保线程安全退出并释放资源

可选的任务取消机制

  • 支持任务在执行中被取消(比如因当前任务执行超时等原因)

可选的任务超时机制

  • 为每个任务设置执行超时,到期未完成的任务自动取消或通知中断

拒绝策略 

抛出异常,拒绝执行(默认策略)

  • 抛出 RejectedExecutionException 异常,告知任务无法执行
  • 适合你希望及时发现问题并中止提交任务

由提交任务的线程执行

  • 将任务回退给调用者,即提交任务的线程执行任务,而不是交给线程池中的线程处理
  • 可以起到“削峰”作用,但影响主线程性能(会阻塞提交线程)

静默丢弃

  • 丢弃无法执行的任务,不抛出异常
  • 适合对任务可丢弃、无严重后果的场景(如日志)

丢弃队列中最旧的任务,然后尝试重新提交当前任务

  • 适合任务有时效性(如更新 UI、股票报价)

固态线程池

功能

支持:

  • 固定线程数: 构造时指定线程数,固定数量线程常驻执行任务

  • 有界队列: 支持设置任务队列最大容量,避免过载

  • 拒绝策略支持: 支持 BLOCK(等待)、DISCARD(丢弃)、THROW(抛异常)三种策略

  • 提交任务: 提交 void(exec_context) 类型任务,通过适配器转换为 void() 并存入任务队列,最终返回 std::future<void>

  • 超时取消: 自动按任务类型 (因为我这里是把自己写的搜索引擎项目中增设的线程池作为例子,所以这里的类型就分为建立索引,搜索,搜索联想) , 设置超时时间,到时通知任务取消

  • 用户检测取消: 任务内部可用 canceled() 检测取消请求并安全退出

  • 安全销毁: 析构时安全停止所有线程并等待退出

  • 线程安全: 所有关键资源由 mutexcondition_variable 保护,支持多线程并发提交任务

std::future

std::future<T> 是 C++11 引入的标准库类型

  • 可以异步获取一个任务的执行结果
  • 作用 -- 当你把一个任务提交给线程池时,这个任务可能要等一会儿才能执行(毕竟线程有限,排队中),那么你就需要一个东西来 “将来拿到结果”

实现

拒绝策略支持

定义了一个枚举类

    enum class RejectionPolicy{BLOCK,DISCARD,THROW};

当任务提交后,根据当前的任务队列使用情况和拒绝策略的设置,决定对任务的处理方式

{std::unique_lock<std::mutex> lock(mtx_);// 阻塞策略:等待有空间if (tasks_.size() >= max_queue_size_ && !stop_){if (reject_policy_ == RejectionPolicy::BLOCK){//因为这里是带谓词的wait,所以即使虚假唤醒也会在条件为真后返回cond_.wait(lock, [this]{ return tasks_.size() < max_queue_size_ || stop_; });}else if (reject_policy_ == RejectionPolicy::DISCARD){throw std::runtime_error("Task queue is full. Task was discarded.");}else if (reject_policy_ == RejectionPolicy::THROW){throw std::runtime_error("Task queue is full.");}}if (stop_){throw std::runtime_error("ThreadPool is stopping. Cannot accept new tasks.");}tasks_.emplace([taskWrapper](){ (*taskWrapper)(); });}

这里的静默丢弃

  • 虽然定义上是不抛出异常,但为了调用方知道任务没被接收,还是加上了

这里的while循环是为了防止虚假唤醒:

提交任务
    // 传入void(exec_context) ,因为内部也需要配合template <class F>std::future<void> submit(const std::string &taskType, F &&f){auto timeout = getTimeoutForTask(taskType);auto cancellableTask = std::make_shared<CancellableTask>();cancellableTask->func = std::forward<F>(f);// 管理任务执行结果auto taskWrapper = std::make_shared<std::packaged_task<void()>>([cancellableTask](){ (*cancellableTask)(); });// 从packaged_task中获取一个关联的future对象std::future<void> taskFuture = taskWrapper->get_future();{std::unique_lock<std::mutex> lock(mtx_);// 阻塞策略:等待有空间if (tasks_.size() >= max_queue_size_ && !stop_){if (reject_policy_ == RejectionPolicy::BLOCK){cond_.wait(lock, [this]{ return tasks_.size() < max_queue_size_ || stop_; });}else if (reject_policy_ == RejectionPolicy::DISCARD){throw std::runtime_error("Task queue is full. Task was discarded.");}else if (reject_policy_ == RejectionPolicy::THROW){throw std::runtime_error("Task queue is full.");}}if (stop_){throw std::runtime_error("ThreadPool is stopping. Cannot accept new tasks.");}tasks_.emplace([taskWrapper](){ (*taskWrapper)(); });}cond_.notify_one();// 启动一个后台线程监控超时取消std::thread([taskFuture = std::move(taskFuture),controller = cancellableTask->controller, timeout]() mutable{if (taskFuture.wait_for(timeout) == std::future_status::timeout) {controller->notify_cancel();std::cerr << "[Timeout] Task exceeded time limit and was cancelled.\n";} }).detach();return taskFuture;}

这里因为任务队列中使用的void()类型,而外部传入的是void(exec_context) (因为需要内部配合停止)

  • 所以需要对类型进行一个转换
  • 也就是通过cancellableTask 这个可取消的任务封装器,实际是一个无参可调用对象,内部实例化一个exec_context ,然后调用带参数版本的函数
  • 最后将 (*taskWrapper)() 放入队列,实际就是调用了cancellableTask的operator()()
  • 于是实现了将void(exec_context) -> void()
超时取消

首先,是定义了两个模块,分别的作用是 --  取消状态的设置 和 作为对外接口

// 控制标识符
struct exec_controller
{bool notify_cancel() { return _should_cancel.exchange(true); }bool should_cancel() const { return _should_cancel; }private:std::atomic<bool> _should_cancel{false};
};
// 判断是否被取消
struct exec_context
{exec_context(std::shared_ptr<exec_controller> impl): _impl(std::move(impl)) {}bool canceled() const { return _impl->should_cancel(); }private:std::shared_ptr<exec_controller> _impl;
};
  • 使用原子变量避免竞态条件
  • 读取控制器可以被多个任务共享 (通过智能指针控制生命周期)

其次,定义了一个可取消的任务封装器

struct CancellableTask
{// 封装一个取消控制器std::shared_ptr<exec_controller> controller =std::make_shared<exec_controller>();// 将取消上下文封装进普通任务中std::function<void(exec_context)> func;void operator()(){exec_context ctx{controller};func(ctx);}
};

然后,还定义了根据不同任务类型,返回对应超时时间的函数

 static std::chrono::millisecondsgetTimeoutForTask(const std::string &taskType){if (taskType == ns_helper::TASK_TYPE_BUILD_INDEX){return std::chrono::seconds(120);}else if (taskType == ns_helper::TASK_TYPE_PERSIST_INDEX){return std::chrono::seconds(4 * 3600);}else if (taskType == ns_helper::TASK_TYPE_SEARCH ||taskType == ns_helper::TASK_TYPE_AUTOCOMPLETE){return std::chrono::seconds(1);}else{return std::chrono::seconds(1);}}
  • 我这里因为是搜索引擎,并且没有做性能优化,所以设置的时间比较长(跪)

并且,启动了一个后台线程去监控任务状态

        // 启动一个后台线程监控超时取消std::thread([taskFuture = std::move(taskFuture),controller = cancellableTask->controller, timeout]() mutable{if (taskFuture.wait_for(timeout) == std::future_status::timeout) {controller->notify_cancel();std::cerr << "[Timeout] Task exceeded time limit and was cancelled.\n";} }).detach();
  • 这里就只讲一下lambda内部逻辑,传参会在下面的核心逻辑介绍
  • 总之,这个线程会等待任务完成timeout秒,如果未完成,则任务超时 -> 设置取消控制器的标识符
  • 这里还设置了调用了datach,让它和主线程分离(也就是后台运行)

最后,就是核心逻辑了

    // 传入void(exec_context) ,因为内部也需要配合template <class F>std::future<void> submit(const std::string &taskType, F &&f){auto timeout = getTimeoutForTask(taskType);auto cancellableTask = std::make_shared<CancellableTask>();cancellableTask->func = std::forward<F>(f);// 管理任务执行结果auto taskWrapper = std::make_shared<std::packaged_task<void()>>([cancellableTask](){ (*cancellableTask)(); });// 从packaged_task中获取一个关联的future对象std::future<void> taskFuture = taskWrapper->get_future();{std::unique_lock<std::mutex> lock(mtx_);// 阻塞策略:等待有空间if (tasks_.size() >= max_queue_size_ && !stop_){if (reject_policy_ == RejectionPolicy::BLOCK){cond_.wait(lock, [this]{ return tasks_.size() < max_queue_size_ || stop_; });}else if (reject_policy_ == RejectionPolicy::DISCARD){throw std::runtime_error("Task queue is full. Task was discarded.");}else if (reject_policy_ == RejectionPolicy::THROW){throw std::runtime_error("Task queue is full.");}}if (stop_){throw std::runtime_error("ThreadPool is stopping. Cannot accept new tasks.");}tasks_.emplace([taskWrapper](){ (*taskWrapper)(); });}cond_.notify_one();// 启动一个后台线程监控超时取消std::thread([taskFuture = std::move(taskFuture),controller = cancellableTask->controller, timeout]() mutable{if (taskFuture.wait_for(timeout) == std::future_status::timeout) {controller->notify_cancel();std::cerr << "[Timeout] Task exceeded time limit and was cancelled.\n";} }).detach();return taskFuture;}
  • 其实大头设计就是那些组件,把它们组合起来使用+任务代码中显式检测取消标识,就能实现超时取消机制  -- 在后台单独启动一个线程,监控提交的任务是否在规定的超时时间内完成,如果超时则通知取消
用户检测取消

传入的任务也需要配合(检测取消标识符,为真时结束执行)

void test_cancel_task(exec_context ctx)
{std::cout << "Task started.\n";for (int i = 0; i < 50; ++i){if (ctx.canceled()){std::cout << "Task detected cancellation, exiting early.\n";return;}std::this_thread::sleep_for(std::chrono::milliseconds(100));std::cout << "Task working... step " << i << "\n";}std::cout << "Task completed normally.\n";
}
安全销毁
 explicit FixedThreadPool(size_t thread_count, size_t max_queue_size = 1000,RejectionPolicy policy = RejectionPolicy::BLOCK): max_queue_size_(max_queue_size), reject_policy_(policy), stop_(false){for (size_t i = 0; i < thread_count; ++i){workers_.emplace_back([this]{while (true) {std::function<void()> task;{std::unique_lock<std::mutex> lock(mtx_);cond_.wait(lock, [this] { return stop_ || !tasks_.empty(); });if (stop_ && tasks_.empty())return;task = std::move(tasks_.front());tasks_.pop();}task();} });}}
~FixedThreadPool(){{std::unique_lock<std::mutex> lock(mtx_);stop_ = true;}cond_.notify_all();for (std::thread &worker : workers_){if (worker.joinable()){worker.join();}}}
  • 释放资源前,先设置停止标志,并唤醒所有线程,让线程检测到停止标识后,安全地将队列中的遗留任务处理完毕,然后释放线程资源

代码

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>#include "../code/assistance.hpp"// --- exec control logic ---struct exec_controller
{bool notify_cancel() { return _should_cancel.exchange(true); }bool should_cancel() const { return _should_cancel; }private:std::atomic<bool> _should_cancel{false};
};struct exec_context
{exec_context(std::shared_ptr<exec_controller> impl): _impl(std::move(impl)) {}bool canceled() const { return _impl->should_cancel(); }private:std::shared_ptr<exec_controller> _impl;
};// --- CancellableTask ---struct CancellableTask
{std::shared_ptr<exec_controller> controller =std::make_shared<exec_controller>();std::function<void(exec_context)> func;void operator()(){exec_context ctx{controller};func(ctx);}
};// --- FixedThreadPool ---class FixedThreadPool
{
public:enum class RejectionPolicy{BLOCK,DISCARD,THROW};explicit FixedThreadPool(size_t thread_count, size_t max_queue_size = 1000,RejectionPolicy policy = RejectionPolicy::BLOCK): max_queue_size_(max_queue_size), reject_policy_(policy), stop_(false){for (size_t i = 0; i < thread_count; ++i){workers_.emplace_back([this]{while (true) {std::function<void()> task;{std::unique_lock<std::mutex> lock(mtx_);cond_.wait(lock, [this] { return stop_ || !tasks_.empty(); });if (stop_ && tasks_.empty())return;task = std::move(tasks_.front());tasks_.pop();}task();} });}}~FixedThreadPool(){{std::unique_lock<std::mutex> lock(mtx_);stop_ = true;}cond_.notify_all();for (std::thread &worker : workers_){if (worker.joinable()){worker.join();}}}template <class F>std::future<void> submit(const std::string &taskType, F &&f){auto timeout = getTimeoutForTask(taskType);auto cancellableTask = std::make_shared<CancellableTask>();cancellableTask->func = std::forward<F>(f);auto taskWrapper = std::make_shared<std::packaged_task<void()>>([cancellableTask](){ (*cancellableTask)(); });std::future<void> taskFuture = taskWrapper->get_future();{std::unique_lock<std::mutex> lock(mtx_);// 阻塞策略:等待有空间if (reject_policy_ == RejectionPolicy::BLOCK){cond_.wait(lock,[this]{ return tasks_.size() < max_queue_size_ || stop_; });}// 丢弃策略:抛出异常(不再返回默认构造的 future)else if (reject_policy_ == RejectionPolicy::DISCARD){if (tasks_.size() >= max_queue_size_){throw std::runtime_error("Task queue is full. Task was discarded.");}}// 异常策略:同样抛出异常else if (reject_policy_ == RejectionPolicy::THROW){if (tasks_.size() >= max_queue_size_){throw std::runtime_error("Task queue is full.");}}if (stop_){throw std::runtime_error("ThreadPool is stopping. Cannot accept new tasks.");}tasks_.emplace([taskWrapper](){ (*taskWrapper)(); });}cond_.notify_one();// 启动一个后台线程监控超时取消std::thread([taskFuture = std::move(taskFuture),controller = cancellableTask->controller, timeout]() mutable{if (taskFuture.wait_for(timeout) == std::future_status::timeout) {controller->notify_cancel();std::cerr << "[Timeout] Task exceeded time limit and was cancelled.\n";} }).detach();return taskFuture;}private:static std::chrono::millisecondsgetTimeoutForTask(const std::string &taskType){if (taskType == ns_helper::TASK_TYPE_BUILD_INDEX){return std::chrono::seconds(120);}else if (taskType == ns_helper::TASK_TYPE_PERSIST_INDEX) // ✅ 修复这里{return std::chrono::seconds(4 * 3600);}else if (taskType == ns_helper::TASK_TYPE_SEARCH ||taskType == ns_helper::TASK_TYPE_AUTOCOMPLETE){return std::chrono::seconds(1);}else{return std::chrono::seconds(1);}}std::vector<std::thread> workers_;std::queue<std::function<void()>> tasks_;std::mutex mtx_;std::condition_variable cond_;bool stop_;size_t max_queue_size_;RejectionPolicy reject_policy_;
};
测试
void test_cancel_task(exec_context ctx)
{std::cout << "Task started.\n";for (int i = 0; i < 50; ++i){if (ctx.canceled()){std::cout << "Task detected cancellation, exiting early.\n";return;}std::this_thread::sleep_for(std::chrono::milliseconds(100));std::cout << "Task working... step " << i << "\n";}std::cout << "Task completed normally.\n";
}int main()
{FixedThreadPool fp(2, 10, FixedThreadPool::RejectionPolicy::BLOCK);auto future = fp.submit("search", test_cancel_task);try{if (future.valid()){future.get();}}catch (const std::exception &e){std::cout << "Task threw exception: " << e.what() << "\n";}return 0;
}

动态线程池会在另一篇讲(我还没写 1 - 1) ,大家也可以帮我纠一纠错(跪倒)

http://www.dtcms.com/a/208745.html

相关文章:

  • OBS 玩转你直播录视频
  • 【SSL部署与优化​】​​OCSP Stapling配置指南:减少证书验证延迟​​
  • ten-vad:低延迟、轻量化且高性能的流式语音活动检测系统
  • HTTP协议接口三种测试方法之-postman
  • vue pinia 独立维护,仓库统一导出
  • leetcode hot100刷题日记——11.相交链表
  • Linux 之 Ubuntu Server 安装
  • 【Bluedroid】蓝牙HID Host disconnect流程源码解析
  • 【Hexo】3.主题
  • AI-02a5a8.神经网络-与学习相关的技巧-超参数的验证
  • java对接全文检索MeiliSearch
  • Linux之 SPI 驱动框架- spi-mem 框架
  • 【深尚想!爱普特APT32F1023H8S6单片机重构智能电机控制新标杆】
  • 【Java高阶面经:消息队列篇】24、Kafka消息顺序保障:单分区与多分区的性能优化
  • Appium+python自动化(三)- SDK Manager
  • 古文时空重构:当AI把课本诗词做成4D电影
  • 亚马逊云科技推出Anthropic新一代模型
  • Anthropic公司近日发布了两款新一代大型语言模型Claude Opus 4与Claude Sonnet 4
  • 2025年开源大模型技术全景图
  • 算法学习路径
  • 第六章 进阶12 周报的妙用
  • 案例分享——牛路水库安全监测之倒垂线、双金属标、多点位移计安装
  • 《C++20新特性全解析:模块、协程与概念(Concepts)》
  • HardFault_Handler调试及问题方法
  • redis 基本命令-17 (KEYS、EXISTS、TYPE、TTL)
  • 【MySQL】第6节|深入理解Mysql事务隔离级别与锁机制
  • 智慧应急指挥调度系统:构建城市安全“防护罩”
  • 企业知识管理面临的挑战与飞书知识问答的解决方案
  • 软件中级考试之软件设计师下午篇ER图做题方法总结
  • Android帧抢占协议技术剖析:触摸事件与UI绘制的智能调度优化方案