当前位置: 首页 > news >正文

【Linux】线程池模拟

目录

一. 什么是线程池

二. 线程池模拟

1. Mutex 接口封装

2. Cond 接口分装

3. Thread 线程封装

4. Task 任务模拟

5. Log 日志模拟

6. ThreadPool 线程池模拟

7. 主函数运行


一. 什么是线程池

线程池是线程的一种管理模式,它会预先创建好线程放在容器中待命,当有任务时会分配线程完成任务,而不是当有任务时再创建新的线程。执行完任务后会重新回到线程池中进行等待。

一个简单的线程池需要包含任务队列,线程管理,任务处理,日志打印等。

二. 线程池模拟

1. Mutex 接口封装

我们首先对 pthread_mutex 接口进行类封装,需要包含 上锁解锁动态锁销毁锁的获取等。

紧接着我们再将锁进行 LockGuard 再次封装,我们就无需进行开锁解锁操作,类将帮助我们自动完成。

#pragma once
#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(){pthread_mutex_init(&_mutex, nullptr);}void Lock(){int n = pthread_mutex_lock(&_mutex);(void)n;}void Unlock(){int n = pthread_mutex_unlock(&_mutex);(void)n;}~Mutex(){pthread_mutex_destroy(&_mutex);}pthread_mutex_t *Get(){return &_mutex;}private:pthread_mutex_t _mutex;
};class LockGuard
{
public:LockGuard(Mutex &mutex) : _mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;
};

此处的 Get 函数是为了解决存在需要指针传递的参数问题而做的接口


2. Cond 接口分装

线程条件变量封装接口主要有,条件变量初始化,条件等待,单线程唤醒,所有线程唤醒,动态销毁等

#pragma once#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"class Cond
{
public:Cond(){pthread_cond_init(&_cond, nullptr);}void Wait(Mutex &mutex){int n = pthread_cond_wait(&_cond, mutex.Get());(void)n;}void Signal(){// 唤醒在条件变量下等待的一个线程int n = pthread_cond_signal(&_cond);(void)n;}void Broadcast(){// 唤醒所有在条件变量下等待的线程int n = pthread_cond_broadcast(&_cond);(void)n;}~Cond(){pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;
};

3. Thread 线程封装

首先一个线程需要包含它自身的 名字,线程 tid,执行函数,是否运行,是否分离等。

有了这些成员变量,我们需要实现相关函数接口,线程开始停止等待分离。

不能直接使用外部函数直接传递给 pthread_create ,因为其包含了 this 指针,不符合 pthread 库本来的要求。

由于线程的接口是不包含this指针的,所以我们需要在类内部使用 静态成员函数解决函数传递问题。但是静态成员函数无法调用内部成员变量。所有我们将 args 对象传递给 self 指针,让 self 指针完成成员间接访问。用 static 内部函数将外部的 func 函数变为了内部静态没有this指针的函数。

#ifndef _THREAD_H_
#define _THREAD_H_#include <iostream>
#include <string>
#include <pthread.h>
#include <cstdio>
#include <cstring>
#include <functional>
#include "Log.hpp"static uint32_t number = 1; // bugclass Thread
{using func_t = std::function<void()>; 
private:void EnableDetach(){_isdetach = true;}void EnableRunning(){_isrunning = true;}static void *Routine(void *args) // 属于类内的成员函数,默认包含this指针!{Thread *self = static_cast<Thread *>(args);self->EnableRunning();if (self->_isdetach)self->Detach();pthread_setname_np(self->_tid, self->_name.c_str());self->_func(); // 回调处理return nullptr;}// bug
public:Thread(func_t func): _tid(0),_isdetach(false),_isrunning(false),res(nullptr),_func(func){_name = "thread-" + to_string(number++);}void Detach(){if (_isdetach)return;if (_isrunning)pthread_detach(_tid);EnableDetach();}std::string Name(){return _name;}bool Start(){if (_isrunning)return false;int n = pthread_create(&_tid, nullptr, Routine, this);if (n != 0){return false;}else{return true;}}bool Stop(){if (_isrunning){int n = pthread_cancel(_tid);if (n != 0){return false;}else{_isrunning = false;return true;}}return false;}void Join(){if (_isdetach){return;}int n = pthread_join(_tid, &res);if (n != 0){LOG(DEBUG) << "Join线程失败";}else{LOG(DEBUG) << "Join线程成功";}}~Thread(){}private:pthread_t _tid;string _name;bool _isdetach;bool _isrunning;void *res;//func_t _func;//执行功能
};#endif

4. Task 任务模拟

创建一个任务模拟

#pragma once
#include <iostream>
#include <unistd.h>
#include <functional>
#include "Log.hpp"using task_t = std::function<void()>;void Download()
{LOG(DEBUG) << "我是一个下载任务...";
}class Task
{
public:Task(){}Task(int x, int y):_x(x), _y(y){}void Execute(){_result = _x + _y;}int X() { return _x; }int Y() { return _y; }int Result(){return _result;}
private:int _x;int _y;int _result;
};

5. Log 日志模拟

日志需要包含日期,线程id,行号,文件名,状态,以及打印内容。

#ifndef __LOG_HPP__
#define __LOG_HPP__#include <iostream>
#include <unistd.h>
#include <string>
#include <filesystem>
#include <cstdio>
#include <memory>
#include <ctime>
#include <sstream>
#include <fstream>
#include "Mutex.hpp"
using namespace std;const string gresp = "\r\n";class LogStrategy
{
public:~LogStrategy() {}virtual void SyncLog(const string &message) = 0;
};class ConsoleLogStrategy : public LogStrategy
{
public:ConsoleLogStrategy(){}void SyncLog(const string &messages){LockGuard lockguard(_mutex);cout << messages << gresp;}~ConsoleLogStrategy(){}private:Mutex _mutex;
};const string defaultpath = "./log";
const string defaultfile = "my.log";class FileLogStrategy : public LogStrategy
{
public:FileLogStrategy(const string &path = defaultpath, const string &file = defaultfile): _path(path), _file(file){LockGuard lockguard(_mutex);if (filesystem::exists(_path)){return;}try{filesystem::create_directories(_path);}catch (const filesystem::filesystem_error &e){cerr << e.what() << endl;}}void SyncLog(const string &messages){LockGuard lockguard(_mutex);string filename = _path + (_path.back() == '/' ? "" : "/") + _file;ofstream out(filename, ios::app);if (!out.is_open()){return;}out << messages << gresp;out.close();}private:string _path;string _file;Mutex _mutex;
};enum LogLevel
{DEBUG,INFO,WARNING,FATAL,ERROR
};
string Levelstr(LogLevel level)
{switch (level){case LogLevel::DEBUG:return "DEBUG";case LogLevel::INFO:return "INFO";case LogLevel::WARNING:return "WARNING";case LogLevel::FATAL:return "FATAL";case LogLevel::ERROR:return "ERROR";default:return "UNKNOWN";}
}
string GettimeStamp()
{time_t curr = time(nullptr);struct tm curr_m;localtime_r(&curr, &curr_m);char buffer[128];snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d-%02d-%02d",curr_m.tm_year + 1900,curr_m.tm_mon + 1,curr_m.tm_mday,curr_m.tm_hour,curr_m.tm_min,curr_m.tm_sec);return buffer;
}class Logger
{
public:Logger(){EnableConsoleLogStrategy();}void EnableFileLogStrategy(){_fflush_strategy = make_unique<FileLogStrategy>();}void EnableConsoleLogStrategy(){_fflush_strategy = make_unique<ConsoleLogStrategy>();}class LogMessage{public:LogMessage(LogLevel &level, string &src_name, int line_num, Logger &logger): _curr_time(GettimeStamp()), _line_num(line_num), _pid(getpid()), _src_name(src_name), _level(level), _log(logger){stringstream ss;ss << "[" << _curr_time << "]"<< "[" << Levelstr(_level) << "]"<< "[" << _pid << "]"<< "[" << _src_name << "]"<< "[" << _line_num << "]"<< "-";_log_info = ss.str();}template <typename T>LogMessage &operator<<(const T &info){stringstream ss;ss << info;_log_info += ss.str();return *this;}~LogMessage(){if (_log._fflush_strategy){_log._fflush_strategy->SyncLog(_log_info);}}private:string _curr_time;int _line_num;pid_t _pid;string _src_name;string _log_info;LogLevel _level;Logger &_log;};LogMessage operator()(LogLevel level, string name, int line){return LogMessage(level, name, line, *this);}~Logger(){}private:unique_ptr<LogStrategy> _fflush_strategy;
};Logger logger;#define LOG(level) logger(level, __FILE__, __LINE__)
#define Enable_Console_Log_Strategy() logger.EnableConsoleLogStrategy()
#define Enable_File_Log_Strategy() logger.EnableFileLogStrategy()#endif

需要注意,我们重载运算符(),是为了实现 LOG()的方式调用日志,这样更加优雅,所以此处重载运算符返回一个临时对象传给 Logger 。

6. ThreadPool 线程池模拟

线程池当中,我们需要包含存储线程的容器 _threads,线程的个数,管理任务的任务队列,条件变量和锁,运行状态,线程休眠个数,指向线程池的单例指针和锁。

为了避免线程池被随意的创建,容易导致资源浪费,所以我们采用 GetInstance 的接口,保证每次只有一个线程池可使用。

用静态指针 inc 指向 GetInstance ,实现了单例的唯一性。套两层锁,可以降低系统开锁解锁的消耗,若已经拥有 线程池 ,那么就不进入,若当前没有线程池,进入内部上锁,此处保证了内部是没有锁的,若多个线程进入就会同时争夺锁资源,所以再进行一次判断。紧接着进行线程池初始化,向 线程容器 vector 内部插入线程,此处用 lambda 表达式执行 HandlerTask 函数,从任务队列中取函数并执行。Thread 初始化会会将 lambda 表达式中的函数执行当做参数传递给 pthread_init 。

#pragma once
#include "Thread.hpp"
#include "Log.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"
#include <iostream>
#include <string>
#include <vector>
#include <queue>
using namespace std;static const int num = 5;
template <typename T>
class ThreadPool
{
private:void WakeUpAllThread(){LockGuard lockguard(_mutex);if (_sleepnum)_cond.Broadcast();LOG(INFO) << "唤醒所有线程";}void WakeUpOne(){_cond.Signal();LOG(INFO) << "唤醒一个线程";}ThreadPool(int defaultnum = num): _num(defaultnum), _isrunning(false), _sleepnum(0){for (int i = 0; i < _num; i++){_threads.emplace_back([this](){HandlerTask();});}}void Start()//{if (_isrunning)return;_isrunning = true;for (auto e : _threads){e.Start();//LOG(INFO) << "start new thread success" << e.Name();}}ThreadPool(const ThreadPool<T> &) = delete;ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;public:static ThreadPool<T> *GetInstance(){if (inc == nullptr){LockGuard lockguard(_lock);if (inc == nullptr){LOG(DEBUG) << "首次使用单例";inc = new ThreadPool<T>();//这块!!!!!!!!!!!1inc->Start();//}}return inc;}void Stop(){if (!_isrunning)return;_isrunning = false;WakeUpAllThread();}void Join(){for (auto &e : _threads){e.Join();}}void HandlerTask()//执行任务队列{char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while (true){T t;{LockGuard lockguard(_mutex);while (_taskq.empty() && _isrunning){_sleepnum++;_cond.Wait(_mutex);_sleepnum--;}if (!_isrunning && _taskq.empty()){LOG(INFO) << name << "退出了,线程池退出,任务队列为空";break;}t = _taskq.front();_taskq.pop();}t();//取任务队列,执行函数}}bool Enqueue(const T &in){if (_isrunning){LockGuard lockguard(_mutex);_taskq.push(in);if (_threads.size() == _sleepnum){WakeUpOne();}return true;}return false;}~ThreadPool(){}private:vector<Thread> _threads;queue<T> _taskq;int _num;Cond _cond;Mutex _mutex;bool _isrunning;int _sleepnum;static ThreadPool<T> *inc;static Mutex _lock;
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::inc = nullptr;template <typename T>
Mutex ThreadPool<T>::_lock;

7. 主函数运行

#include "Log.hpp"
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <memory>int main()
{Enable_Console_Log_Strategy();int count = 10;while (count){sleep(1);ThreadPool<task_t>::GetInstance()->Enqueue(Download);count--;}ThreadPool<task_t>::GetInstance()->Stop();ThreadPool<task_t>::GetInstance()->Join();// Enable_File_Log_Strategy();return 0;
}

运行结果:


感谢大家支持


文章转载自:

http://iRDkye1J.jrqbr.cn
http://3NDvnbeN.jrqbr.cn
http://j1A5Raoi.jrqbr.cn
http://QDyQ0WaH.jrqbr.cn
http://KVLU6z5d.jrqbr.cn
http://tPwRdM5N.jrqbr.cn
http://d6RZGvUb.jrqbr.cn
http://XtOsCil7.jrqbr.cn
http://451bvDL8.jrqbr.cn
http://LYDpc1d5.jrqbr.cn
http://QzaANQ8U.jrqbr.cn
http://qi1SaYaD.jrqbr.cn
http://kg2pJRH4.jrqbr.cn
http://0HbTZbM1.jrqbr.cn
http://JIG2kxrY.jrqbr.cn
http://ZOk7mwOJ.jrqbr.cn
http://doD5qoSD.jrqbr.cn
http://Y4246S0W.jrqbr.cn
http://MAOmKD8u.jrqbr.cn
http://aFXy805K.jrqbr.cn
http://XSQhQ8AU.jrqbr.cn
http://UuIbj0mV.jrqbr.cn
http://BDMlgDhb.jrqbr.cn
http://N4YVkE4g.jrqbr.cn
http://ctYsD1kQ.jrqbr.cn
http://nSQIzUn6.jrqbr.cn
http://uLYJIrcW.jrqbr.cn
http://c4j0PJ1I.jrqbr.cn
http://YyvvXTa6.jrqbr.cn
http://GAJptbSl.jrqbr.cn
http://www.dtcms.com/a/383312.html

相关文章:

  • TensorRT 10.13.3: Limitations
  • RK3568编写自启动脚本
  • AI 伦理争议背后:算法偏见如何产生?又该如何规避?
  • C++ 中使用 iterator 中注意事项和优化技巧(2)
  • 【MySQL|第八篇】事务与索引
  • OD C卷 - 小明找位置
  • JavaScript与jQuery:从入门到面试的完整指南
  • 最长上升子序列(LIS)全解析:从基础到进阶(基础讲解篇)
  • 海盗王64位dx9客户端修改篇之七
  • 【c++进阶系列】:map和set的模拟实现(附模拟实现的源码)
  • Redis的RedLock
  • AutoGen——自定义Agent
  • 第5节-连接表-Natural-Join
  • CentOS Docker 环境下安装 HertzBeat 并配置 VictoriaMetrics 时序数据库指南
  • 【Linux】 存储分级的秘密
  • GitAgent-面壁智能联合清华大学发布的大模型智能体应用框架
  • 《基于国产Linux的机房终端安全重构方案》
  • JavaWeb-Servlet总结及JSP
  • 《黑神话:悟空》Xbox版本性能模式画质分析
  • 支持向量机:从理论到实践
  • 软件体系结构——发展脉络
  • 【C++】队列queue的使用
  • 对网络通信领域的“活化石”与“瑞士军刀”—— `telnet`
  • 迭代器和生成器的区别与联系
  • 如何解决 pip install 安装报错 ModuleNotFoundError: No module named ‘numpy’ 问题
  • ffplay数据结构分析
  • 我爱学算法之—— 位运算(上)
  • LeetCode 分类刷题:2187. 完成旅途的最少时间
  • Redis持久化之AOF:日志记录的艺术,数据安全保障详解
  • 应急响应-事件处理学习大纲(1)