Linux POSIX信号量与线程池
一.条件变量的封装
为了后续我们使用方便以及锻炼封装思维,我们对原生的条件变量进行封装。
1.Cond.hpp
条件变量要求我们传入一把锁,目的是在线程被阻塞等待进入队列时释放当前锁,交给其他线程竞争。
#pragma once#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"using namespace MutexModule;namespace CondModule
{class Cond{public:Cond(){pthread_cond_init(&_cond, nullptr);}void Wait(Mutex &mutex){int n = pthread_cond_wait(&_cond, mutex.Get());(void)n;}void Signal(){// 唤醒在条件变量下等待的一个线程int n = pthread_cond_signal(&_cond);(void)n;}void Broadcast(){// 唤醒所有在条件变量下等待的线程int n = pthread_cond_broadcast(&_cond);(void)n;}~Cond(){pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;};
};二.POSIX信号量
1.回顾信号量
信号量:本质是一个计数器,用来表明临界资源的数量有多少,本质是对特定资源的预定机制
一共会有两个使用资源的场景:
1.将目标资源整体使用——mutex+二元信号量
2.将目标资源按不同的块分批使用——信号量
对于条件变量,就是把阻塞队列当作整体的量使用,所以需要加锁(互斥)。
所有的线程都需要看到信号量,那么信号量就是临界资源,因此对信号量的操作(PV)一定是原子的。
2.基于环形队列的生产者消费者模型
1.环形队列基本思想
数据结构层面上的环形队列,我们已经很熟悉。本节我们将使用环形队列实现生产者消费者模型,体会由底层实现带来的不同特性。
首先我们要完善对环形队列的基本认识:
队列空时头尾指针位置相同,队列满时位置也相同
那么自然就会引出一个问题:既然队列空与队列满指针情况相同,如何解决这个问题?
方案1:加计数器count
方案2:空一个位置,比如容量为10,存储9各数据。此时头尾指针相同时为空;判断tail的下一个位置是否为head,如果是就满,不再放数据。

实际实现对环形队列的封装:底层使用vector(不过这里会规定长度让它不扩容),并通过模运算实现环形队列的“回绕”效果。
环形队列容量:N
head++,head%=N
tail++,tail%=N
那么在生产者消费者模型中,这个环形队列如何工作?
生产者入队尾,消费者出队头。
假如生产者和消费者同时来,初始状态时它们会指向同一个空间,此时让生产者生产。
1.队列空,则生产者先运行
2.队列满,则消费者先运行
3.生产者不能把消费者套圈
4.消费者不能超过生产者
根据它奇特的工作方式,我们不妨思考:
1.只要生产者和消费者不同时访问同一个位置,他们就能同时运行!
2.什么时候生产者和消费者会在同一位置?队列空或满!
3.队列为空:只能互斥,生产者先同步运行;
队列为满:只能互斥,消费者先同步运行。
上面环形队列的工作方式,由谁保障?信号量可以解决。假如队列为满,此时生产者的空位置资源sem_blank为0,申请信号量就会失败,只允许消费者进行消费,同理队列为空。
对于生产者,它关注资源和空位置,而对于消费者,它只关心数据。
那么对于这个模型,我们就可以初步构建出以下设定:
生产者相关:空位置资源——sem_blank=N,初始进入队列位置int p_step=0
消费者相关:数据资源——sem_data=N,初始进入队列位置 int c_step=0
2.POSIX信号量接口使用
POSIX信号量和SystemV信号量作⽤相同,都是⽤于同步操作,达到⽆冲突的访问共享资源⽬的。但POSIX可以⽤于线程间同步。
1.初始化信号量
NAMEsem_init - initialize an unnamed semaphoreSYNOPSIS#include <semaphore.h>int sem_init(sem_t *sem, int pshared, unsigned int value);Link with -pthread.pshared:0表⽰线程间共享,⾮零表⽰进程间共享
value:信号量初始值
2.销毁信号量
NAMEsem_destroy - destroy an unnamed semaphoreSYNOPSIS#include <semaphore.h>int sem_destroy(sem_t *sem);Link with -pthread.
3.等待信号量:即P操作,代表当前线程拿到资源,令信号量内计数器- -
NAMEsem_wait, sem_timedwait, sem_trywait - lock a semaphoreSYNOPSIS#include <semaphore.h>int sem_wait(sem_t *sem);
4.发布信号量:即V操作,表示当前线程已使用完资源,令信号量内计数器++
NAMEsem_post - unlock a semaphoreSYNOPSIS#include <semaphore.h>int sem_post(sem_t *sem);Link with -pthread.
3.封装信号量
#include <iostream>
#include <semaphore.h>
#include <pthread.h>namespace SemModule
{const int defaultvalue = 1;class Sem{public:Sem(unsigned int sem_value = defaultvalue){sem_init(&_sem, 0, sem_value);}void P(){int n = sem_wait(&_sem); // 原子的(void)n;}void V(){int n = sem_post(&_sem); // 原子的}~Sem(){sem_destroy(&_sem);}private:sem_t _sem;};
}4.基于环形队列的生产者消费者模型
RingQueue.hpp
#pragma once#include <iostream>
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"static const int gcap = 5; // for debugusing namespace SemModule;
using namespace MutexModule;template <typename T>
class RingQueue
{
public:RingQueue(int cap = gcap): _cap(cap),_rq(cap),_blank_sem(cap),_p_step(0),_data_sem(0),_c_step(0){}void Equeue(const T &in){// 生产者// 1. 申请信号量,空位置信号量_blank_sem.P();{LockGuard lockguard(_pmutex);// 2. 生产_rq[_p_step] = in;// 3. 更新下标++_p_step;// 4. 维持环形特性_p_step %= _cap;}_data_sem.V();}void Pop(T *out){// 消费者// 1. 申请信号量,数据信号量_data_sem.P();{LockGuard lockguard(_cmutex);// 2. 消费*out = _rq[_c_step];// 3. 更新下标++_c_step;// 4. 维持环形特性_c_step %= _cap;}_blank_sem.V();}private:std::vector<T> _rq;int _cap;// 生产者Sem _blank_sem; // 空位置int _p_step;// 消费者Sem _data_sem; // 数据int _c_step;// 维护多生产,多消费, 2把锁Mutex _cmutex;Mutex _pmutex;
};特别注意:对于多生产者和多消费者的改良中,为了实现生产者之间和消费者之间的互斥,我们需要对生产方法和消费方法加锁。
问题1:应该如何加锁?

从原理上讲,这两种加锁方式都是可行的。但是我们采用先获取信号量再加锁的方式,因为:多生产者获取信号量和加锁的过程就类似于:我们要去电影院看电影,是先竞争入大门的钥匙再买票,还是大家都进入大门后,再排队买票的区别。显然后者效率更高,因为即使竞争获得了锁,也只能由当前线程获取一个信号量;不如先让线程把信号量瓜分完,再决定由谁进行生产活动。
问题2:为什么信号量实现的生产者消费者,需要循环判断临界资源是否满足,而环形队列不用?
因为:信号量是用来描述资源数量的,申请信号量失败,就代表临界资源不满足!信号量把对临界资源是否存在等条件以原子性的方式呈现在访问临界资源之前就判断!
tips:
如果资源可拆分,就考虑用信号量sem
如果资源是整体使用,就考虑用锁mutex。
例如环形队列,sem_blank=1,此时队列只有一个元素,就退化成了阻塞队列
二元信号量对一个资源进行互相通知
一个2消费者,3生产者的用例:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "RingQueue.hpp"struct threaddata
{RingQueue<int> *rq;std::string name;
};void *consumer(void *args)
{threaddata *td = static_cast<threaddata*>(args);while (true){sleep(3);// 1. 消费任务int t = 0;td->rq->Pop(&t);// 2. 处理任务 -- 处理任务的时候,这个任务,已经被拿到线程的上下文中了,不属于队列了std::cout << td->name << " 消费者拿到了一个数据: " << t << std::endl;// t();}
}int data = 1;void *productor(void *args)
{threaddata *td = static_cast<threaddata*>(args);while (true){sleep(1);// sleep(2);// 1. 获得任务// std::cout << "生产了一个任务: " << x << "+" << y << "=?" << std::endl;std::cout << td->name << " 生产了一个任务: " << data << std::endl;// 2. 生产任务td->rq->Equeue(data);data++;}
}int main()
{// 申请阻塞队列RingQueue<int> *rq = new RingQueue<int>();// 构建生产和消费者// 如果我们改成多生产多消费呢??// 单单: cc, pp -> 互斥关系不需要维护,互斥与同步// 多多:cc, pp -> 之间的互斥关系!pthread_t c[2], p[3];threaddata *td = new threaddata();td->name = "cthread-1";td->rq = rq;pthread_create(c, nullptr, consumer, td);threaddata *td2 = new threaddata();td2->name = "cthread-2";td2->rq = rq;pthread_create(c + 1, nullptr, consumer, td2);threaddata *td3 = new threaddata();td3->name = "pthread-3";td3->rq = rq;pthread_create(p, nullptr, productor, td3);threaddata *td4 = new threaddata();td4->name = "pthread-4";td4->rq = rq;pthread_create(p + 1, nullptr, productor, td4);threaddata *td5 = new threaddata();td5->name = "pthread-5";td5->rq = rq;pthread_create(p + 2, nullptr, productor, td5);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);return 0;
}三.线程池
对于线程池,我们这里仅做最简单的任务生产和执行,以及打印日志的功能。因此我们在实现线程池前,需要完成以下前置工作:
1.线程封装
2.锁的封装和条件变量封装
3.引入日志,对线程进行封装
池化技术:为了提高效率,可以参考预制菜的原理
申请内存,需要向OS要内存,就要调用系统调用。而系统调用是有成本的。内存池可以减少系统调用的次数从而提高效率。而线程池,也是一种池化技术。
1.基于策略模式的日志Log.hpp
首先我们来完成对日志相关代码的编写。
日志:程序的日志,在关键的结点输出信息,方便程序员对代码debug。
我们期望的日志输出风格:
时间——日志等级——进程pid——打印的消息日志对应的文件名与行号——消息内容
[2025-10-22 14:27:26] [DEBUG] [202399] [main.cc] [19] - hello world日志具体实现的功能:
1.形成完整日志
2.刷新日志到目标文件——基于策略模式
1.基类LogStrategy
class LogStrategy {
public:virtual void SyncLog(const std::string &message) = 0;
};纯虚函数:定义日志刷新的统一接口,由子类继承实现具体刷新策略
策略模式:允许动态切换日志输出方式
2.具体策略实现
ConsoleLogStrategy - 控制台输出:
void SyncLog(const std::string &message) override {LockGuard lockguard(_mutex); // 线程安全std::cout << message << gsep;
}FileLogStrategy - 文件输出:
FileLogStrategy(const std::string &path = defaultpath, const std::string &file = defaultfile) {// 自动创建目录std::filesystem::create_directories(_path);
}3.Logger类
函数调用运算符重载
LogMessage operator()(LogLevel level, std::string name, int line) {return LogMessage(level, name, line, *this);
}语法细节:
operator():使Logger对象可以像函数一样被调用返回值优化:返回临时对象,编译器会进行RVO优化
参数传递:接受日志级别、文件名、行号
4.LogMessage内部类
这个类是Logger的内部类,具体实现了一条日志的拼接,并利用析构函数进行刷新。
构造函数
传入一条日志的具体信息,并通过stringstream拼接成字符串_loginfo
LogMessage(LogLevel &level, std::string &src_name, int line_number, Logger &logger): _curr_time(GetTimeStamp()), // 初始化时间戳_level(level), // 日志级别_pid(getpid()), // 进程ID_src_name(src_name), // 源文件名_line_number(line_number), // 行号_logger(logger) // 引用外部Logger
{// 构建日志前缀格式std::stringstream ss;ss << "[" << _curr_time << "] "<< "[" << Level2Str(_level) << "] "<< "[" << _pid << "] "<< "[" << _src_name << "] "<< "[" << _line_number << "] "<< "- ";_loginfo = ss.str();
}流插入运算符重载
template <typename T>
LogMessage &operator<<(const T &info) {std::stringstream ss;ss << info;_loginfo += ss.str();return *this; // 返回引用支持链式调用
}关键语法细节:
模板函数:
template <typename T>支持任意类型参数返回引用:
LogMessage &支持链式调用:log << a << b << c类型安全:通过
stringstream自动类型转换const引用参数:避免不必要的拷贝
析构函数 - 自动刷新机制
这是一种典型的RAII设计,在日志对象析构时通过刷新到文件保证其持久化,是一种简洁优雅的设计。
~LogMessage() {if (_logger._fflush_strategy) {_logger._fflush_strategy->SyncLog(_loginfo);}
}5.完整日志Log实现
#ifndef __LOG_HPP__
#define __LOG_HPP__#include <iostream>
#include <cstdio>
#include <string>
#include <filesystem> //C++17
#include <sstream>
#include <fstream>
#include <memory>
#include <ctime>
#include <unistd.h>
#include "Mutex.hpp"namespace LogModule
{using namespace MutexModule;const std::string gsep = "\r\n";// 策略模式,C++多态特性// 2. 刷新策略 a: 显示器打印 b:向指定的文件写入// 刷新策略基类class LogStrategy{public:~LogStrategy() = default;virtual void SyncLog(const std::string &message) = 0;};// 显示器打印日志的策略 : 子类class ConsoleLogStrategy : public LogStrategy{public:ConsoleLogStrategy(){}void SyncLog(const std::string &message) override{LockGuard lockguard(_mutex);std::cout << message << gsep;}~ConsoleLogStrategy(){}private:Mutex _mutex;};// 文件打印日志的策略 : 子类const std::string defaultpath = "./log";const std::string defaultfile = "my.log";class FileLogStrategy : public LogStrategy{public:FileLogStrategy(const std::string &path = defaultpath, const std::string &file = defaultfile): _path(path),_file(file){LockGuard lockguard(_mutex);if (std::filesystem::exists(_path)){return;}try{std::filesystem::create_directories(_path);}catch (const std::filesystem::filesystem_error &e){std::cerr << e.what() << '\n';}}void SyncLog(const std::string &message) override{LockGuard lockguard(_mutex);std::string filename = _path + (_path.back() == '/' ? "" : "/") + _file; // "./log/" + "my.log"std::ofstream out(filename, std::ios::app); // 追加写入的 方式打开if (!out.is_open()){return;}out << message << gsep;out.close();}~FileLogStrategy(){}private:std::string _path; // 日志文件所在路径std::string _file; // 日志文件本身Mutex _mutex;};// 形成一条完整的日志&&根据上面的策略,选择不同的刷新方式// 1. 形成日志等级enum class LogLevel{DEBUG,INFO,WARNING,ERROR,FATAL};std::string Level2Str(LogLevel level){switch (level){case LogLevel::DEBUG:return "DEBUG";case LogLevel::INFO:return "INFO";case LogLevel::WARNING:return "WARNING";case LogLevel::ERROR:return "ERROR";case LogLevel::FATAL:return "FATAL";default:return "UNKNOWN";}}std::string GetTimeStamp(){time_t curr = time(nullptr);struct tm curr_tm;localtime_r(&curr, &curr_tm);char timebuffer[128];snprintf(timebuffer, sizeof(timebuffer),"%4d-%02d-%02d %02d:%02d:%02d",curr_tm.tm_year+1900,curr_tm.tm_mon+1,curr_tm.tm_mday,curr_tm.tm_hour,curr_tm.tm_min,curr_tm.tm_sec);return timebuffer;}// 1. 形成日志 && 2. 根据不同的策略,完成刷新class Logger{public:Logger(){EnableConsoleLogStrategy();}void EnableFileLogStrategy(){_fflush_strategy = std::make_unique<FileLogStrategy>();}void EnableConsoleLogStrategy(){_fflush_strategy = std::make_unique<ConsoleLogStrategy>();}// 表示的是未来的一条日志class LogMessage{public:LogMessage(LogLevel &level, std::string &src_name, int line_number, Logger &logger): _curr_time(GetTimeStamp()),_level(level),_pid(getpid()),_src_name(src_name),_line_number(line_number),_logger(logger){// 日志的左边部分,合并起来std::stringstream ss;ss << "[" << _curr_time << "] "<< "[" << Level2Str(_level) << "] "<< "[" << _pid << "] "<< "[" << _src_name << "] "<< "[" << _line_number << "] "<< "- ";_loginfo = ss.str();}// LogMessage() << "hell world" << "XXXX" << 3.14 << 1234template <typename T>LogMessage &operator<<(const T &info){// a = b = c =d;// 日志的右半部分,可变的std::stringstream ss;ss << info;_loginfo += ss.str();return *this;}~LogMessage(){if (_logger._fflush_strategy){_logger._fflush_strategy->SyncLog(_loginfo);}}private:std::string _curr_time;LogLevel _level;pid_t _pid;std::string _src_name;int _line_number;std::string _loginfo; // 合并之后,一条完整的信息Logger &_logger;};// 这里故意写成返回临时对象LogMessage operator()(LogLevel level, std::string name, int line){return LogMessage(level, name, line, *this);}~Logger(){}private:std::unique_ptr<LogStrategy> _fflush_strategy;};// 全局日志对象Logger logger;// 使用宏,简化用户操作,获取文件名和行号#define LOG(level) logger(level, __FILE__, __LINE__)#define Enable_Console_Log_Strategy() logger.EnableConsoleLogStrategy()#define Enable_File_Log_Strategy() logger.EnableFileLogStrategy()
}#endif6.一些细节
1.LogMessage中的Logger对象
从上面的代码我们可以看出,Logger对象主要用于策略模式的转换。在LogMessage的析构函数中我们要把日志进行刷新:
~LogMessage() {if (_logger._fflush_strategy) {_logger._fflush_strategy->SyncLog(_loginfo);}
}在使用时我们可以:灵活切换策略进行日志输出
// 场景1:默认控制台输出
Logger logger;
LOG(LogLevel::INFO) << "This goes to console";// 场景2:运行时切换到文件输出
logger.EnableFileLogStrategy();
LOG(LogLevel::ERROR) << "This goes to file"; // 这里LogMessage通过引用访问到新的策略// 场景3:再切换回控制台
logger.EnableConsoleLogStrategy();
LOG(LogLevel::DEBUG) << "Back to console";为什么需要引用而不是值:
动态策略切换:用户可能在运行时调用
EnableFileLogStrategy()或EnableConsoleLogStrategy()来切换策略多态调用:通过基类指针调用
SyncLog(),实现运行时多态避免拷贝:
Logger对象可能包含复杂状态,引用避免不必要的拷贝
2.运算符重载的作用
重载():能够让我们把Logger类像函数一样调用,传参时更优雅。
LogMessage operator()(LogLevel level, std::string name, int line) {return LogMessage(level, name, line, *this);
}示例:
// 传统方式(不优雅)
Logger::LogMessage msg = logger.createMessage(level, file, line);// 运算符重载方式(优雅)
logger(level, file, line);重载流插入<<:重载后的stringstream允许任意类型的输出,并且返回值允许了链式调用。
template <typename T>
LogMessage &operator<<(const T &info) {std::stringstream ss;ss << info;_loginfo += ss.str();return *this;
}示例:
LOG(LogLevel::INFO) << "Hello" << 123 << "World";// 步骤1:operator() 创建LogMessage对象
LogMessage temp = logger(LogLevel::INFO, "file.cpp", 42);// 步骤2:operator<< 链式构建内容
temp.operator<<("Hello").operator<<(123).operator<<("World");// 步骤3:临时对象析构,自动输出
// ~LogMessage() -> 调用策略输出完整日志3.宏定义简化调用
#define LOG(level) logger(level, __FILE__, __LINE__)
#define Enable_Console_Log_Strategy() logger.EnableConsoleLogStrategy()预定义宏:
__FILE__,__LINE__自动获取文件名和行号使用简便:用户只需关注日志级别和内容
4.时间戳生成(转化为年月日的形式)
std::string GetTimeStamp() {time_t curr = time(nullptr);struct tm curr_tm;localtime_r(&curr, &curr_tm); // 线程安全版本char timebuffer[128];snprintf(timebuffer, sizeof(timebuffer), "%4d-%02d-%02d %02d:%02d:%02d",curr_tm.tm_year+1900, curr_tm.tm_mon+1, ...);return timebuffer;
}2.单例模式
某些类之应该有一个对象,这些类就称之为单例。
1.饿汉方式实现单例模式
假如一个人吃完饭立马就洗碗,这种就是饿汉方式,因为下一顿要吃的时候可以立即拿着碗吃。 对于一个类来说,它定义static的成员。static成员属于整个类而不是某个对象,为所有对象共享,因此这些成员在创建类时就会直接被创建出来。
template <typename T>
class Singleton {static T data;
public:static T* GetInstance() {
return &data;}
}2.懒汉方式实现单例模式
假如一个人吃完饭,先把碗放下,直到下次吃饭时用到这个碗才会洗。对于一个类来说,定义的是static类型的指针成员,在使用这个成员时才会创建空间,本质是一种延迟加载。
template <typename T>
class Singleton {static T* inst;
public:static T* GetInstance() {if (inst == NULL) {inst = new T();
}return inst;}
}在操作系统中,懒汉方式最为常用。因此在实现线程池时,我们使用懒汉式单例模式实现。
3.单例模式实现的线程池
1.核心线程管理成员
std::vector<Thread> _threads; // 存储所有工作线程
int _num; // 线程池中线程的数量
bool _isrunning; // 线程池运行状态标志
int _sleepernum; // 当前休眠的线程数量逻辑作用:
_threads容器管理所有工作线程的生命周期_num控制线程池的规模,通过构造函数参数配置_isrunning是线程池的"总开关",控制整个线程池的启停_sleepernum是关键的性能指标,用于智能唤醒机制
2.任务队列与同步成员
std::queue<T> _taskq; // 任务队列
Cond _cond; // 条件变量
Mutex _mutex; // 互斥锁,保护共享资源逻辑作用:
_taskq作为生产者-消费者模式中的缓冲区_cond和_mutex配合实现线程间的高效同步
3.单例模式核心成员
static ThreadPool<T> *inc; // 单例指针
static Mutex _lock; // 静态互斥锁,保护单例创建逻辑作用:
inc保存唯一的单例实例_lock确保多线程环境下单例创建的线程安全
4.单例获取函数
static ThreadPool<T> *GetInstance()
{if (inc == nullptr) // 第一次检查:避免不必要的锁竞争{LockGuard lockguard(_lock); // 加锁保护LOG(LogLevel::DEBUG) << "获取单例....";if (inc == nullptr) // 第二次检查:确保单例只创建一次{LOG(LogLevel::DEBUG) << "首次使用单例, 创建之....";inc = new ThreadPool<T>(); // 创建单例inc->Start(); // 启动线程池}}return inc;
}单例模式的精髓——双重检查锁定模式:
第一次检查(无锁):如果实例已存在,直接返回,避免锁开销
加锁:确保多线程环境下的线程安全
第二次检查(有锁):防止多个线程同时通过第一次检查后重复创建
5.任务处理核心接口
void HandlerTask()
{char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while (true){T t;{LockGuard lockguard(_mutex);// 等待条件:队列为空且线程池在运行while (_taskq.empty() && _isrunning){_sleepernum++; // 增加休眠计数_cond.Wait(_mutex); // 等待条件变量_sleepernum--; // 减少休眠计数}// 退出条件:线程池停止且队列为空if (!_isrunning && _taskq.empty()){LOG(LogLevel::INFO) << name << " 退出了, 线程池退出&&任务队列为空";break;}// 获取任务t = _taskq.front();_taskq.pop();} // 锁作用域结束,释放锁t(); // 在锁外执行任务,提高并发性}
}关键设计:
锁粒度控制:只在访问共享数据时加锁,任务执行在锁外
条件变量使用:避免忙等待,节省CPU资源
优雅退出:确保所有任务完成后线程才退出
6.任务入队与线程唤醒
bool Enqueue(const T &in)
{if (_isrunning){LockGuard lockguard(_mutex);_taskq.push(in);if (_threads.size() == _sleepernum) // 智能唤醒条件WakeUpOne();return true;}return false;
}智能唤醒机制:
只有当所有线程都在休眠时才唤醒一个线程
避免不必要的唤醒操作,减少线程上下文切换
7.一些细节
对于单例模式在这个线程池中的体现:
1.资源的全局唯一性:确保系统资源(线程)不被重复创建,避免资源竞争和浪费。
// 整个应用程序中,对于同一类型T,只有一个线程池实例
ThreadPool<MyTaskType>* pool = ThreadPool<MyTaskType>::GetInstance();2.延迟初始化:懒汉模式的典型特点
// 只有在第一次调用GetInstance()时才创建线程池
inc = new ThreadPool<T>(); // 按需创建
inc->Start(); // 按需启动避免程序启动时就创建所有资源,提高启动速度。
3.线程安全创建
static Mutex _lock; // 静态锁,所有模板实例共享?4.封装与访问控制
private:ThreadPool(int num = gnum); // 构造函数私有化ThreadPool(const ThreadPool<T> &) = delete; // 禁止拷贝ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // 禁止赋值通过访问控制,强制使用者通过统一的接口获取实例。
8.完整代码实现
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Log.hpp"
#include "Thread.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"// .hpp header onlynamespace ThreadPoolModule
{using namespace ThreadModlue;using namespace LogModule;using namespace CondModule;using namespace MutexModule;static const int gnum = 5;template <typename T>class ThreadPool{private:void WakeUpAllThread(){LockGuard lockguard(_mutex);if (_sleepernum)_cond.Broadcast();LOG(LogLevel::INFO) << "唤醒所有的休眠线程";}void WakeUpOne(){_cond.Signal();LOG(LogLevel::INFO) << "唤醒一个休眠线程";}ThreadPool(int num = gnum) : _num(num), _isrunning(false), _sleepernum(0){for (int i = 0; i < num; i++){_threads.emplace_back([this](){HandlerTask();});}}void Start(){if (_isrunning)return;_isrunning = true;for (auto &thread : _threads){thread.Start();LOG(LogLevel::INFO) << "start new thread success: " << thread.Name();}}ThreadPool(const ThreadPool<T> &) = delete;ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;public:static ThreadPool<T> *GetInstance(){if (inc == nullptr){LockGuard lockguard(_lock);LOG(LogLevel::DEBUG) << "获取单例....";if (inc == nullptr){LOG(LogLevel::DEBUG) << "首次使用单例, 创建之....";inc = new ThreadPool<T>();inc->Start();}}return inc;}void Stop(){if (!_isrunning)return;_isrunning = false;// 唤醒所有的线层WakeUpAllThread();}void Join(){for (auto &thread : _threads){thread.Join();}}void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while (true){T t;{LockGuard lockguard(_mutex);// 1. a.队列为空 b. 线程池没有退出while (_taskq.empty() && _isrunning){_sleepernum++;_cond.Wait(_mutex);_sleepernum--;}// 2. 内部的线程被唤醒if (!_isrunning && _taskq.empty()){LOG(LogLevel::INFO) << name << " 退出了, 线程池退出&&任务队列为空";break;}// 一定有任务t = _taskq.front(); // 从q中获取任务,任务已经是线程私有的了!!!_taskq.pop();}t(); // 处理任务,需/要在临界区内部处理吗?1 0}}bool Enqueue(const T &in){if (_isrunning){LockGuard lockguard(_mutex);_taskq.push(in);if (_threads.size() == _sleepernum)WakeUpOne();return true;}return false;}~ThreadPool(){}private:std::vector<Thread> _threads;int _num; // 线程池中,线程的个数std::queue<T> _taskq;Cond _cond;Mutex _mutex;bool _isrunning;int _sleepernum;// bug??static ThreadPool<T> *inc; // 单例指针static Mutex _lock;};template <typename T>ThreadPool<T> *ThreadPool<T>::inc = nullptr;template <typename T>Mutex ThreadPool<T>::_lock;}四.线程安全
1.线程安全与重入
线程安全:就是多个线程在访问共享资源时,能够正确地执⾏,不会相互⼲扰或破坏彼此的执⾏结果。⼀般⽽⾔,多个线程并发同⼀段只有局部变量的代码时,不会出现不同的结果。但是对全局变量或者静态变量进⾏操作,并且没有锁保护的情况下,容易出现该问题。
重⼊:同⼀个函数被不同的执⾏流调⽤,当前⼀个流程还没有执⾏完,就有其他的执⾏流再次进⼊,我们称之为重⼊。⼀个函数在重⼊的情况下,运⾏结果不会出现任何不同或者任何问题,则该函数被称为可重⼊函数,否则,是不可重⼊函数。
到目前为之,我们可以将重入分为两种情况:
1.多线程重入函数(比如上面的HandlerTask)
2.信号导致的单个线程重复进入函数
1.常见的线程不安全
• 不保护共享变量的函数
• 函数状态随着被调⽤,状态发⽣变化的函数 • 返回指向静态变量指针的函数• 调⽤线程不安全函数的函数
2.常见的线程安全
• 每个线程对全局变量或者静态变量只有读取的权限,⽽没有写⼊的权限,⼀般来说这些
线程是安全的 • 类或者接⼝对于线程来说都是原⼦操作 • 多个线程之间的切换不会导致该接⼝的执⾏结果存在⼆义性
注:线程安全并不代表可重入。例如上面线程池中的单例获取函数:若一个单线程接收到异常中断切换至内核处理信号,结果信号的处理结果又让这个进程重入这个函数,而此时进程是拿着锁被中断的,再重入时无法申请到锁,就引发了死锁问题。
static ThreadPool<T> *GetInstance()
{if (inc == nullptr) // 第一次检查:避免不必要的锁竞争{LockGuard lockguard(_lock); // 加锁保护LOG(LogLevel::DEBUG) << "获取单例....";if (inc == nullptr) // 第二次检查:确保单例只创建一次{LOG(LogLevel::DEBUG) << "首次使用单例, 创建之....";inc = new ThreadPool<T>(); // 创建单例inc->Start(); // 启动线程池}}return inc;
}3.常见不可重入
• 调⽤了malloc/free函数,因为malloc函数是⽤全局链表来管理堆的
• 调⽤了标准I/O库函数,标准I/O库的很多实现都以不可重⼊的⽅式使⽤全局数据结构• 可重⼊函数体内使⽤了静态的数据结构
4.常见可重入
• 不使⽤全局变量或静态变量
• 不使⽤⽤malloc或者new开辟出的空间
• 不调⽤不可重⼊函数
• 不返回静态或全局数据,所有数据都有函数的调⽤者提供 • 使⽤本地数据,或者通过制作全局数据的本地拷⻉来保护全局数据
实际上,线程安全和重入是一体两面的关系。如果函数可重入,那么一定线程安全;如果一个函数不可重入,那么他就会引发线程安全问题,需要引入同步互斥机制。
• 函数是可重⼊的,那就是线程安全的
• 函数是不可重⼊的,那就不能由多个线程使⽤,有可能引发线程安全问题• 如果⼀个函数中有全局变量,那么这个函数既不是线程安全也不是可重⼊的。
可重⼊函数是线程安全函数的⼀种
线程安全不⼀定是可重⼊的,⽽可重⼊函数则⼀定是线程安全的。 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重⼊函数若锁还未释放则会产⽣死锁,因此是不可重⼊的。
2.常见的锁概念
1.死锁
• 死锁是指在⼀组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所占⽤不会
释放的资源⽽处于的⼀种永久等待状态。
• 为了⽅便表述,假设现在线程A,线程B必须同时持有锁1和锁2,才能进⾏后续资源的访问

申请一把锁是原子的,但申请第二把就不一定了。

造成的结果如下:

2.产生死锁的四个必要条件
1.互斥条件:只有在进行资源的互斥时才会出现死锁问题。
2.请求与保持条件:一个执行流因请求资源而阻塞,并不释放自己已经持有的资源。

3.不剥夺条件:一个执行流已获得的资源,在未使用完之前,不能强行剥夺——非抢占资源。

4.循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系。

3.STL与智能指针的线程安全
1.STL的容器是否线程安全?
原因是, STL 的设计初衷是将性能挖掘到极致, ⽽⼀旦涉及到加锁保证线程安全, 会对性能造成巨⼤的影响.
⽽且对于不同的容器, 加锁⽅式的不同, 性能可能也不同(例如hash表的锁表和锁桶).因此 STL 默认不是线程安全. 如果需要在多线程环境下使⽤, 往往需要调⽤者⾃⾏保证线程安全.
2.智能指针是否线程安全?
对于 unique_ptr, 由于只是在当前代码块范围内⽣效, 因此不涉及线程安全问题.
对于 shared_ptr, 多个对象需要共⽤⼀个引⽤计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原⼦操作(CAS)的⽅式保证 shared_ptr 能够⾼效, 原⼦的操作引⽤计数.
