Linux线程池(上)(33)
文章目录
- 前言
- 一、线程池的概念
- 池化技术
- 线程池的优点
- 线程池的使用场景
- 二、线程池的实现
- v1版本
- v2版本
- 总结
前言
终于要结束了,写完这篇再来篇有关锁的文章,我们就可以结束 Linux系统编程 部分啦!
线程池是一种管理线程的机制,它可以在需要时自动创建和销毁线程,以及分配和回收线程资源。线程池的主要优点是减少了频繁创建和销毁线程所带来的开销,提高了系统的稳定性和可扩展性。
知乎优秀文章:什么是线程池?线程池 ThreadPoolExecutor 使用及其原理又是什么?
一、线程池的概念
池化技术
所谓的 线程池 就是 提前创建一批线程,当任务来临时,线程直接从任务队列中获取任务执行,可以提高整体效率;同时一批线程会被合理维护,避免调度时造成额外开销
像这种把未来会高频使用到,并且创建较为麻烦的资源提前申请好的技术称为 池化技术,池化技术 可以极大地提高性能,最典型的就是 线程池,常用于各种涉及网络连接相关的服务中,比如 MySQL 连接池、HTTP 连接池、Redis 连接池 等
除了线程池外还有内存池,比如 STL 中的容器在进行空间申请时,都是直接从 空间配置器 allocator 中获取的,并非直接使用系统调用来申请空间
其实未来我还打算在C++专栏中讲解 allocator 这个知识点
池化技术 的本质:空间换时间
池化技术 就好比你把钱从银行提前取出一部分放在支付宝中,可以随时使用,十分方便和高效,总不至于需要用钱时还得跑到银行排队取钱
线程池的优点
线程池 的优点在于 高效、方便
- 线程在使用前就已经创建好了,使用时直接将任务交给线程完成
- 线程会被合理调度,确保 任务与线程 间能做到负载均衡
线程池 中的线程数量不是越多越好,因为线程增多会导致调度变复杂,具体创建多少线程取决于具体业务场景,比如 处理器内核、剩余内存、网络中的 socket 数量等
线程池 还可以配合 「生产者消费者模型」 一起使用,做到 解耦与提高效率
可以把 任务队列 换成 「生产者消费者模型」
线程池的使用场景
- 存在大量且短小的任务请求,比如 Web 服务器中的网页请求,使用 线程池 就非常合适,因为网页点击量众多,并且大多都没有长时间连接访问
- 对性能要求苛刻,力求快速响应需求,比如游戏服务器,要求对玩家的操作做出快速响应
- 突发大量请求,但不至于使服务器产生过多的线程,短时间内,在服务器创建大量线程会使得内存达到极限,造成出错,可以使用 线程池 规避问题
二、线程池的实现
这块有点难,但是别怕,跟着我一步一步来!!!
v1版本
「朴素版」:实现最基本的线程池功能,直接使用系统提供的接口
先创建 ThreadPool.hpp 头文件
将 线程池 实现为一个类,提供接口供外部调用
首先要明白 线程池 的两大核心:一批线程 与 任务队列,客户端发出请求,新增任务,线程获取任务,执行任务,因此 ThreadPool.hpp 的大体框架如下
- 一批线程,通过容器管理
- 任务队列,存储就绪的任务
- 互斥锁
- 条件变量
互斥锁 的作用是 保证多个线程并发访问任务队列时的线程安全,而 条件变量 可以在 任务队列 为空时,让一批线程进入等待状态,也就是线程同步
注:为了方便实现,直接使用系统调用接口及容器,比如 pthread_t、vector、queue 等
#pragma once#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <unistd.h>
#include <pthread.h>#define THREAD_NUM 10template<class T>
class ThreadPool
{
public:ThreadPool(int num = THREAD_NUM):_threads(num), _num(num){// 初始化互斥锁和条件变量pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_cond, nullptr);}~ThreadPool(){// 互斥锁、条件变量pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_cond);}void init(){// 其他信息初始化(当前不需要)}void start(){// 启动线程池// ...}// 提供给线程的回调函数static void* threadRoutine(void *args){// 业务处理// ...}private:std::vector<pthread_t> _threads;int _num; // 线程数量std::queue<T> _tasks; // 利用 STL 自动扩容的特性,无需担心容量pthread_mutex_t _mtx;pthread_cond_t _cond;
};
注意:
- 需要提前给 vector 扩容,避免后面使用时发生越界访问
- 提供给线程的回调函数需要设置为静态,否则线程调不动(参数不匹配)
现在我们开始来填充函数体!
初始化线程池 init() — 位于 ThreadPool 类
当前场景只需要初始化 互斥锁 和 条件变量,在 构造函数 中完成就行了,所以这里的 init() 函数不需要补充
启动线程池 start() — 位于 ThreadPool 类
启动 线程池 需要先创建出一批线程,这里直接循环创建即可
void start()
{// 创建一批线程并启动for(int i = 0; i < _num; i++)pthread_create(&_threads[i], nullptr, threadRoutine, nullptr); // (存疑)
}
线程的回调函数 threadRoutine() — 位于 ThreadPool 类
这里进行简单测试,打印当前线程的线程 ID 就行了,并且直接 detach,主线程无需等待次线程运行结束
// 提供给线程的回调函数
static void* threadRoutine(void* args)
{// 避免等待线程,直接剥离pthread_detach(pthread_self());while (true){std::cout << "Thread Running... " << pthread_self() << std::endl;sleep(1);}
}
创建 main.cc 源文件,测试线程池的代码
#include "ThreadPool.hpp"
#include <memory>int main()
{std::unique_ptr<ThreadPool<int>> ptr(new ThreadPool<int>());ptr->init();ptr->start();// 还有后续动作return 0;
}
编译并运行代码,可以看到 确实创建了一批线程,当主线程退出后,其他次线程也就跟着终止了
线程池 还需要提供一个重要的接口 pushTask(),将用户需要执行的业务装载至 任务队列 中,等待线程执行
装载任务 pushTask() — 位于 ThreadPool 类
// 装载任务
void pushTask(const T& task)
{// 本质上就是在生产商品,需要加锁保护pthread_mutex_lock(&_mtx);_tasks.push(task);// 唤醒消费者进行消费pthread_cond_signal(&_cond);pthread_mutex_unlock(&_mtx);
}
装载任务的本质就是在生产任务,相当于用户充当生产者,通过这个接口将任务生产至任务队列中,而线程充当消费者,从任务队列中获取任务并消费
所以线程的回调函数需要从 任务队列 中获取任务,进行消费
- 检测是否有任务
- 有 -> 消费
- 没有 -> 等待
线程回调函数 threadRoutine() — 位于 ThreadPool 类
// 提供给线程的回调函数
static void *threadRoutine(void *args)
{// 避免等待线程,直接剥离pthread_detach(pthread_self());while (true){// 任务队列是临界资源,需要保护pthread_mutex_lock(&_mtx);// 等待条件满足while(_tasks.empty())pthread_cond_wait(&_cond, &_mtx);T task = _tasks.front();_tasks.pop();// task(); // 进行消费(存疑)pthread_mutex_unlock(&_mtx);}
}
注意: 判断任务队列是否为空需要使用 while,确保在多线程环境中不会出现问题
因为 任务队列、互斥锁、条件变量 是类内成员,而这里的 threadRoutine() 函数是一个静态函数,并没有 this 指针以访问类内成员,可以采取传递 this 指针的方式解决问题
启动线程池 start() — 位于 ThreadPool 类
void start()
{// 创建一批线程并启动for(int i = 0; i < _num; i++)pthread_create(&_threads[i], nullptr, threadRoutine, this); // 传递 this 指针
}
threadRoutine() 函数需要将参数 void* 转化为所在类对象的指针,并通过该指针访问类内成员
线程回调函数 threadRoutine() — 位于 ThreadPool 类
// 提供给线程的回调函数
static void* threadRoutine(void* args)
{// 避免等待线程,直接剥离pthread_detach(pthread_self());auto ptr = static_cast<ThreadPool<T>*>(args);while (true){// 任务队列是临界资源,需要保护pthread_mutex_lock(&ptr->_mtx);// 等待条件满足while(ptr->_tasks.empty())pthread_cond_wait(&ptr->_cond, &ptr->_mtx);T task = ptr->_tasks.front();ptr->_tasks.pop();//task(); // 进行消费(存疑)pthread_mutex_unlock(&ptr->_mtx);}
}
为了使得提高代码的可阅读性及可拓展性,这里还会封装一批接口,供函数调用
加锁、解锁 — 位于 ThreadPool 类
void lockQueue()
{pthread_mutex_lock(&_mtx);
}void unlockQueue()
{pthread_mutex_unlock(&_mtx);
}
等待、唤醒 — 位于 ThreadPool 类
void threadWait()
{pthread_cond_wait(&_cond, &_mtx);
}void threadWakeUp()
{pthread_cond_signal(&_cond);
}
判空、获取任务 — 位于 ThreadPool 类
bool isEmpty()
{return _tasks.empty();
}T popTask()
{T task = _tasks.front();_tasks.pop();return task;
}
接口封装完毕后,可以顺便修改之前的代码,比如 装载任务 pushTask()
装载任务 pushTask() — 位于 ThreadPool 类
// 装载任务
void pushTask(const T& task)
{// 本质上就是在生产商品,需要加锁保护lockQueue();_tasks.push(task);// 唤醒消费者进行消费threadWakeUp();unlockQueue();
}
以及 消费者 threadRountine()
线程回调函数 threadRoutine() — 位于 ThreadPool 类
// 提供给线程的回调函数
static void* threadRoutine(void* args)
{// 避免等待线程,直接剥离pthread_detach(pthread_self());auto ptr = static_cast<ThreadPool<T>*>(args);while (true){// 任务队列是临界资源,需要保护ptr->lockQueue();// 等待条件满足while(ptr->isEmpty())ptr->threadWait();T task = ptr->popTask();pthread_mutex_unlock(&ptr->_mtx);// 消费行为可以不用加锁(一个商品只会被一个线程消费)task(); }
}
细节1: 轮到线程执行任务时,不需要加锁,这就好比你买桶泡面回家,是不必担心别人会和你争抢,可以慢慢消费;同样的,你也不应该占用锁资源,主动让出锁资源以提高整体效率
细节2:task() 表示执行任务,这里实际是一个 operator()() 的重载
轮到 main.cc 进行操作了,逻辑很简单:创建线程池对象,初始化线程池,启动线程池,装载任务,等待运行结果
补充 main.cc
#include "ThreadPool.hpp"
#include <memory>typedef Task<int> type;int main()
{std::unique_ptr<ThreadPool<type>> ptr(new ThreadPool<type>());ptr->init();ptr->start();// 还有后续动作while(true){// 输入 操作数 操作数 操作符int x = 0, y = 0;char op = '+';std::cout << "输入 x: ";std::cin >> x;std::cout << "输入 y: ";std::cin >> y;std::cout << "输入 op: ";std::cin >> op;// 构建任务对象type task(x, y, op);// 装载任务ptr->pushTask(task);}return 0;
}
现在还有最后一个问题:如何获取计算结果?可以在 线程 执行完任务后,直接显示计算结果,也可以通过传入回调函数的方式,获取计算结果,前者非常简单,只需要在 threadRoutine() 中加入这行代码即可
线程回调函数 threadRoutine() — 位于 ThreadPool 类
void* threadRoutine(void* args)
{// ...// 显示计算结果std::cout << task.getResult() << std::endl;
}
除此之外,我们也可以通过 回调函数 的方式获取计算结果
目标:给线程传入一个回调函数,线程执行完任务后,将任务传给回调函数,回调函数结合业务逻辑,灵活处理结果
单纯打印的话,很容易就可以写出这个回调函数
回调函数 callBack() — 位于 main.cc 源文件
// 回调函数
void callBack(type& task)
{// 获取计算结果后打印std::string ret = task.getResult();std::cout << "计算结果为: " << ret;
}
为了能让 线程 在执行任务后能回调,需要将这个函数对象作为参数,传递给 ThreadPool 对象
main.cc
// ...int main()
{std::unique_ptr<ThreadPool<type>> ptr(new ThreadPool<type>(callBack));// ...
}
当然,这边传递了一个对象,那边就得接收此对象,为了存储该函数对象,ThreadPool 新增一个类成员:_func,函数对象类型为 void (T& )
修改 ThreadPool.hpp 头文件
// ...
#include <functional>#define THREAD_NUM 10template<class T>
class ThreadPool
{using func_t = std::function<void(T&)>; // 包装器public:ThreadPool(func_t func, int num = THREAD_NUM):_threads(num), _num(num), _func(func){// 初始化互斥锁和条件变量pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_cond, nullptr);}// ...private:// ...func_t _func;
};
修改完成后,创建 ThreadPool 对象时,支持传入一个类型为 void(T&) 的函数对象
获取函数对象后,需要让 线程 在执行完任务后进行回调,但又因为这玩意是一个类内成员,同样需要借助外部传入的 this 指针进行访问,这里直接封装成一个接口,顺便进行调用
回调函数对象 callBack() — 位于 ThreadPool 类
func_t callBack(T &task)
{_func(task);
}
线程回调函数 threadRoutine() — 位于 ThreadPool 类
// 提供给线程的回调函数
static void *threadRoutine(void *args)
{// ...task(); // 执行任务ptr->callBack(task); // 回调函数}
}
做完上述准备后,我们就可以开始测试运行结果了,当然了,其实我们一直会遇到打印问题,这是因为屏幕也是临界资源,而我们却没有加以保护
v2版本
「封装版」:引入自己封装实现的线程库 Thread.hpp,支持对线程做出更多操作
之前写的线程池代码不够优雅,所能展现的线程相关信息太少了,为此可以选择引入之前封装实现的 Thread.hpp
#pragma once#include <iostream>
#include <string>
#include <pthread.h>enum class Status
{NEW = 0, // 新建RUNNING, // 运行中EXIT // 已退出
};// 参数、返回值为 void 的函数类型
typedef void (*func_t)(void*);class Thread
{
public:Thread(int num = 0, func_t func = nullptr, void* args = nullptr):_tid(0), _status(Status::NEW), _func(func), _args(args){// 根据编号写入名字char name[128];snprintf(name, sizeof name, "thread-%d", num);_name = name;}~Thread(){}// 获取 IDpthread_t getTID() const{return _tid;}// 获取线程名std::string getName() const{return _name;}// 获取状态Status getStatus() const{return _status;}// 回调方法static void* runHelper(void* args){Thread* myThis = static_cast<Thread*>(args);// 很简单,回调用户传进来的 func 函数即可myThis->_func(myThis->_args);}// 启动线程void run(){int ret = pthread_create(&_tid, nullptr, runHelper, this);if(ret != 0){std::cerr << "create thread fail!" << std::endl;exit(1); // 创建线程失败,直接退出}_status = Status::RUNNING; // 更改状态为 运行中}// 线程等待void join(){int ret = pthread_join(_tid, nullptr);if(ret != 0){std::cerr << "thread join fail!" << std::endl;exit(1); // 等待失败,直接退出}_status = Status::EXIT; // 更改状态为 退出}private:pthread_t _tid; // 线程 IDstd::string _name; // 线程名Status _status; // 线程状态func_t _func; // 线程回调函数void* _args; // 传递给回调函数的参数
};
不再直接使用原生线程库,转而使用自己封装的线程库
创建 ThreadPool.hpp 头文件
#define THREAD_NUM 10template<class T>
class ThreadPool
{using func_t = std::function<void(T&)>; // 包装器public:ThreadPool(func_t func, int num = THREAD_NUM):_num(num), _func(func){// 初始化互斥锁和条件变量pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_cond, nullptr);}~ThreadPool(){// 等待线程退出for(auto &t : _threads)t.join();// 互斥锁、条件变量pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_cond);}void init(){// 创建一批线程for(int i = 0; i < _num; i++)_threads.push_back(Thread(i, threadRoutine, this));}void start(){// 启动线程for(auto &t : _threads)t.run();}// 提供给线程的回调函数(已修改返回类型为 void)static void threadRoutine(void *args){// 避免等待线程,直接剥离pthread_detach(pthread_self());auto ptr = static_cast<ThreadPool<T>*>(args);while (true){// 任务队列是临界资源,需要保护ptr->lockQueue();// 等待条件满足while(ptr->isEmpty())ptr->threadWait();T task = ptr->popTask();ptr->unlockQueue();task();ptr->callBack(task); // 回调函数}}// 装载任务void pushTask(const T& task){// 本质上就是在生产商品,需要加锁保护lockQueue();_tasks.push(task);// 唤醒消费者进行消费threadWakeUp();unlockQueue();}protected:void lockQueue(){pthread_mutex_lock(&_mtx);}void unlockQueue(){pthread_mutex_unlock(&_mtx);}void threadWait(){pthread_cond_wait(&_cond, &_mtx);}void threadWakeUp(){pthread_cond_signal(&_cond);}bool isEmpty(){return _tasks.empty();}T popTask(){T task = _tasks.front();_tasks.pop();return task;}func_t callBack(T &task){_func(task);}private:std::vector<Thread> _threads;std::queue<T> _tasks; // 利用 STL 自动扩容的特性,无需担心容量pthread_mutex_t _mtx;pthread_cond_t _cond;func_t _func;
};
涉及修改的内容:
- _threads 类型由 vector<pthread_t> 变为 vector
- init() 函数用于创建线程,注册线程信息
- start() 函数用于启动线程
- ~ThreadPool() 中新增等待线程退出
- 线程回调函数 threadRoutinue() 返回值改为 void
- 新增函数对象 _func
总结
麻了,好难,特别是那个条件变量,真的需要多看几次,哎~