Linux之线程池
一、线程池
1. 线程池的认识
线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能,而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程时的代价。线程池不仅可以保证内核的充分利用,还能防止过分调度。
线程池的应用场景:
需要大量的线程完成任务,且完成任务的时间比较短。比如WEB服务器完成网页请求这样的任务。
对性能要求苛刻的应用,比如:要求服务器迅速响应客户请求。
2. 线程池的实现
ThreadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <memory>
#include <time.h>
#include <unistd.h>
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"
#include "Logger.hpp"
#include "Task.hpp"const int default_thread_num = 5;template <typename T>
class ThreadPool
{
private:bool QueueIsEmpty(){return _q.empty();}void Routine(const std::string &name){while (true){T t;{// 把公共资源获取到线程私有LockGuard lockguard(&_lock);while (QueueIsEmpty() && _is_running){// 队列为空,线程需要去等待_wait_thread_num++;_cond.Wait(_lock);_wait_thread_num--;}if (!_is_running && QueueIsEmpty()){LOG(LoggerLevel::INFO) << " 线程池退出 && 任务队列为空, " << name << " 退出";break;}// 一定有任务// 获取任务,消耗历史任务t = _q.front();_q.pop();}t(); // 执行任务,不需要在临界区执行,临界区保护的是类内的资源LOG(LoggerLevel::DEBUG) << name << " handler task, " << t.ResultToString();}}public:ThreadPool(int thread_num = default_thread_num): _thread_num(thread_num), _is_running(false), _wait_thread_num(0){for (int i = 0; i < _thread_num; ++i){std::string name = "thread-" + std::to_string(i + 1);_threads.emplace_back([this](const std::string &_name){ this->Routine(_name); }, name);// Thread t([this]() {// this->hello();// }, name);// _threads.push_back(std::move(t));}LOG(LoggerLevel::INFO) << "ThreadPool obj create success";}void Start(){if (_is_running){return;}_is_running = true;for (auto &t : _threads){t.Start();}}// 核心思想:让线程走正常的唤醒逻辑退出// 线程池要退出// 1.如果被唤醒 && 任务队列里没有任务 = 线程退出// 2.如果被唤醒 && 任务队列里有任务 = 线程不能立即退出,应该让线程把任务处理完,在退出// 3.如果线程本身没有被休眠,我们应该让它把能处理的任务全部处理完成,在退出// 3 || 2 --> 1void Stop(){if (!_is_running)return;_is_running = false;if (_wait_thread_num > 0)_cond.NotifyAll();// if (!_is_running)// return;// _is_running = false;// for (auto &t : _threads)// {// t.Stop();// }LOG(LoggerLevel::INFO) << "thread pool stop success";}void Wait(){for (auto &t : _threads){t.Join();}LOG(LoggerLevel::INFO) << "thread pool wait success";}void EnQueue(const T &t){//线程池退出,不要让用户继续添加任务if(!_is_running)return;{LockGuard lockguard(&_lock);_q.push(t);if (_wait_thread_num > 0)_cond.NotifyOne();}}~ThreadPool(){}private:std::queue<T> _q;std::vector<Thread> _threads;int _thread_num;Mutex _lock;Cond _cond;bool _is_running;int _wait_thread_num;
};
Task.hpp
#pragma once
#include<iostream>
#include<sstream>
#include<functional>
#include<unistd.h>
class Task
{
public:Task(){}Task(int x, int y):a(x), b(y){}void Excute(){result = a + b;}void operator()(){// sleep(1);Excute();}std::string ResultToString(){std::stringstream ss;ss << a << " + " << b << " = " << result;return ss.str();}// void Print()// {// std::cout << a << " + " << b << " = " << result << std::endl;// }
private:int a;int b;int result;
};// using func_t = std::function<void()>;// void PrintLog()
// {
// std::cout << "hello 我是一个日志任务" << std::endl;
// }
Mutex.hpp
#pragma once
#include<iostream>
#include<mutex>
#include<pthread.h>class Mutex
{
public:Mutex(){pthread_mutex_init(&_lock, nullptr);}void Lock(){pthread_mutex_lock(&_lock);}pthread_mutex_t* Get(){return &_lock;}void Unlock(){pthread_mutex_unlock(&_lock);}~Mutex(){pthread_mutex_destroy(&_lock);}
private:pthread_mutex_t _lock;
};class LockGuard
{
public:LockGuard(Mutex* _mutex):_mutexp(_mutex){_mutexp->Lock();}~LockGuard(){_mutexp->Unlock();}
private:Mutex* _mutexp;
};
Cond.hpp
#include"Mutex.hpp"class Cond
{
public:Cond(){pthread_cond_init(&_cond, nullptr);}void Wait(Mutex& lock){pthread_cond_wait(&_cond, lock.Get());}void NotifyOne(){pthread_cond_signal(&_cond);}void NotifyAll(){pthread_cond_broadcast(&_cond);}~Cond(){pthread_cond_destroy(&_cond);}
private:pthread_cond_t _cond;
};
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include<iostream>
#include<vector>
#include<string>
#include<functional>
#include<pthread.h>
#include<unistd.h>
#include<sys/syscall.h>
#include"Logger.hpp"#define get_lwp_id() syscall(SYS_gettid)using func_t = std::function<void(const std::string& name)>;
std::string thread_name_default = "None_Name";class Thread
{
public:Thread(func_t func, std::string name = thread_name_default):_isrunning(false),_name(name),_func(func){}static void* start_routine(void* args){Thread* self = static_cast<Thread*>(args);self->_isrunning = true;self->_lwpid = get_lwp_id();self->_func(self->_name);pthread_exit((void*)0);}void Start(){int n = pthread_create(&_tid, nullptr, start_routine, this);if(n == 0){LOG(LoggerLevel::INFO) << "pthread_create success";}}void Stop(){int n = pthread_cancel(_tid);LOG(LoggerLevel::INFO) << "thread cancel success";(void)n;}void Join(){if(!_isrunning)return;int n = pthread_join(_tid, nullptr);if(n == 0){LOG(LoggerLevel::INFO) << "pthread_join success";}}~Thread(){}
private:bool _isrunning;pthread_t _tid;pid_t _lwpid;std::string _name;func_t _func;
};#endif
Logger.hpp
#pragma once
#include <iostream>
#include <filesystem>
#include <fstream>
#include <string>
#include <sstream>
#include <memory>
#include <unistd.h>
#include "Mutex.hpp"enum class LoggerLevel
{DEBUG,INFO,WARNING,ERROR,FATAL
};std::string LoggerLevelToString(LoggerLevel level)
{switch (level){case LoggerLevel::DEBUG:return "Debug";case LoggerLevel::INFO:return "Info";case LoggerLevel::WARNING:return "Warning";case LoggerLevel::ERROR:return "Error";case LoggerLevel::FATAL:return "Fatal";default:return "Unknown";}
}std::string GetCurrentTime()
{// 获取时间戳time_t timep = time(nullptr);// 把时间戳转化为时间格式struct tm currtm;localtime_r(&timep, &currtm);// 转化为字符串char buffer[64];snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d-%02d-%02d",currtm.tm_year + 1900, currtm.tm_mon + 1, currtm.tm_mday,currtm.tm_hour, currtm.tm_min, currtm.tm_sec);return buffer;
}class LogStrategy
{
public:virtual ~LogStrategy() = default;virtual void SyncLog(const std::string &logmessage) = 0;
};// 显示器刷新
class ConsoleLogStrategy : public LogStrategy
{
public:~ConsoleLogStrategy(){}virtual void SyncLog(const std::string &logmessage) override{{LockGuard lockguard(&_lock);std::cout << logmessage << std::endl;}}private:Mutex _lock;
};const std::string default_dir_path_name = "log";
const std::string default_filename = "test.log";
// 文件刷新
class FileLogStrategy : public LogStrategy
{
public:FileLogStrategy(const std::string dir_path_name = default_dir_path_name,const std::string filename = default_filename): _dir_path_name(dir_path_name), _filename(filename){if (std::filesystem::exists(_dir_path_name)){return;}try{std::filesystem::create_directories(_dir_path_name);}catch (const std::filesystem::filesystem_error &e){std::cerr << e.what() << "\r\n";}}~FileLogStrategy(){}virtual void SyncLog(const std::string &logmessage) override{{LockGuard lock(&_lock);std::string target = _dir_path_name;target += '/';target += _filename;std::ofstream out(target.c_str(), std::ios::app);if (!out.is_open()){return;}out << logmessage << "\n";out.close();}}private:std::string _dir_path_name;std::string _filename;Mutex _lock;
};class Logger
{
public:Logger(){}void EnableConsoleStrategy(){_strategy = std::make_unique<ConsoleLogStrategy>();}void EnableFileStrategy(){_strategy = std::make_unique<FileLogStrategy>();}class LogMessage{public:LogMessage(LoggerLevel level, std::string filename, int line, Logger& logger): _curr_time(GetCurrentTime()), _level(level), _pid(getpid()), _filename(filename), _line(line), _logger(logger){std::stringstream ss;ss << "[" << _curr_time << "] "<< "[" << LoggerLevelToString(_level) << "] "<< "[" << _pid << "] "<< "[" << _filename << "] "<< "[" << _line << "]"<< " - ";_loginfo = ss.str();}template <typename T>LogMessage &operator<<(const T &info){std::stringstream ss;ss << info;_loginfo += ss.str();return *this;}~LogMessage(){if (_logger._strategy){_logger._strategy->SyncLog(_loginfo);}}private:std::string _curr_time; // 时间戳LoggerLevel _level; // 日志等级pid_t _pid; // 进程pidstd::string _filename; // 文件名int _line; // 行号std::string _loginfo; // 一条合并完成的,完整的日志信息Logger &_logger; // 提供刷新策略的具体做法};LogMessage operator()(LoggerLevel level, std::string filename, int line){return LogMessage(level, filename, line, *this);}~Logger(){}private:std::unique_ptr<LogStrategy> _strategy;
};Logger logger;#define LOG(level) logger(level, __FILE__, __LINE__)
#define EnableConsoleStrategy() logger.EnableConsoleStrategy()
#define EnableFileStrategy() logger.EnableFileStrategy()
main.cc
#include"ThreadPool.hpp"int main()
{srand((unsigned int)time(nullptr));EnableConsoleStrategy();std::unique_ptr<ThreadPool<Task>> tq = std::make_unique<ThreadPool<Task>>(10);tq->Start();// sleep(5);int cnt = 10;while(cnt--){int x = rand() % 10 + 1;int y = rand() % 5 + 1;Task t(x, y);tq->EnQueue(t);sleep(1);}tq->Stop();tq->Wait();return 0;
}
Makefile
threadpool:main.ccg++ -o $@ $^ -std=c++17 -lpthread
.PHONY:clean
clean:rm -f threadpool
3. 单例模式
什么是单例模式呢?
某些类,只能具有一个对象(实例),就称之为单例
。
在很多的服务器开发场景中,经常需要让服务器加载很多的数据(上百G)到内存中,此时往往需要一个单例的类来管理这些数据
。
实现单例的两种方式:饿汉方式和懒汉方式
。
所谓饿汉方式就是吃完饭,立刻洗碗,这就是饿汉方式,因为下一顿吃的时候就可以立即拿着碗吃饭。
懒汉方式就是吃完饭,把碗放下,下次吃的时候再洗碗,这就是懒汉。
饿汉方式实现单例模式
template <typename T>
class Singleton
{static T data;
public:static T* GetInstance() {return &data;}
};
该类里面有一个静态成员,静态成员被编译在全局区,程序被编译时就已经有了虚拟地址,在逻辑上可以认为已经有了对象。
懒汉实现方式单例模式
template <typename T>
class Singleton
{static T* inst;
public:static T* GetInstance() {if (inst == NULL) {inst = new T();}return inst;}
};
这个类里面有一个静态指针,在编译时也有虚拟地址,但是是静态指针的地址,并不是成员的虚拟地址,而是当调用GetInstance函数时,才会动态开辟空间
。这就是懒汉模式。
现在,我们就来将刚才的线程池进行改造,设计出一个单例模式。
这个就是单例线程池。但是,这个安全吗?是不安全的,如果是多个线程调用 GetInstance,就有可能创建出多份对象,这是不安全的。
所以,我们应该加一把锁,保证多线程调用时也是安全的
。
4. 线程安全和重入问题
线程安全:多个线程在访问共享资源时,能够正确的执行,不会相互干扰或破坏彼此的执行结果
。
重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其它的执行流再次进入,称之为重入
。
到了现在,我们能够知道重入分为两种情况:
多线程重入函数
信号导致一个执行流重复进入函数
常见的线程不安全的情况:
不保护共享变量的函数
函数状态随着被调用,状态发生变化的函数
返回指向静态变量指针的函数
调用线程不安全函数的函数
常见线程安全的情况:
每个线程对全局变量和静态变量只有读取权限,而没有写入权限,一般来说这些线程是安全的
。
类或者接口对于线程来说都是原子性操作
。
多个线程之间的切换不会导致该接口的执行结果存在二义性
。
常见不可重入的情况:
调用了 malloc,free函数,因为 malloc函数底层是用全局链表来管理堆的
。
调用标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
。
可重入函数体内使用了静态的数据结构
。
常见可重入的情况:
不使用全局变量和静态变量
不使用 malloc 和new开辟出来的空间
不调用不可重入函数
不返回静态数据和全局数据,所有数据都由函数的调用者提供
使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
可重入与线程安全的联系:函数是可重入的,那就是线程安全的
。
可重入与线程安全的区别:线程安全不一定是可重入的,而可重入函数一定是线程安全的
。
比如说,主线程(只有一个线程)执行了信号捕捉方法,可因为信号的到来,也执行了信号捕捉方法,如果这个方法内部是加了锁的,那么当第一次进入函数内部申请锁之后,又因为信号的到来再一次申请锁,那不就把主线程挂载起来了吗!本来主线程就没有释放锁,又再一次申请锁,自己把自己挂载起来,不就出问题了,这不就造成死锁问题了吗!
二、常见锁概念
1. 死锁
什么是死锁呢?
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其它进程所占用不会释放的资源而处于一种永久等待的状态
。
比如,现在有两个线程A,B,任何一个线程要访问临界资源,都必须同时申请到两把锁。线程A申请到了锁1,线程B申请到了锁2,现在线程A要申请锁2,线程B要申请锁1,这时候就会出现死锁。因为线程A,线程B分别申请了锁1和锁2,但是没有释放自己的锁,所以再一次申请对方的锁就会被阻塞,如果线程A,线程B不释放自己的锁,那么就会一直被阻塞,循环等待。
2. 形成死锁的四个必要条件
.
互斥条件:一个资源每次只能被一个执行流使用
。
.
请求与保持条件:一个执行流因请求资源而被阻塞时,对已获得的资源保持不放
。
比如:线程A申请到了锁1,还想申请锁2,线程B将锁2申请走,还想申请锁1。
.
不剥夺条件:一个执行流已获得的资源,在未使用完之前,不能被强行剥夺
。
比如:线程A申请到了锁,访问临界资源,线程B此刻申请锁就会被阻塞,不会申请成功,OS不能直接把锁给线程B。
.
循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
。
形成死锁必须满足这四个全部条件。
3. 如何避免死锁
那么,形成死锁的条件我们已经知道了,该如何避免死锁呢?
只要破坏形成死锁的四个必要条件中的任何一个条件即可
。
.
破坏循环等待条件问题:资源一次性分配,使用超时机制,加锁顺序一致
。
.
避免锁未释放的场景
。