linux(sem信号量 + 线程池)
信号量 + 线程池
- 1 POSIX信号量
- 1.1 基于环形队列的⽣产消费模型
- 1.2 基于信号量的单生产者单消费者模型demo代码(环形队列)
- 1.3 重新理解信号量,重新理解互斥锁
- 1.4 基于信号量的多生产者多消费者模型demo代码(环形队列)
- 2 日志
- 2.1 ⽇志认识
- 3 线程池
- 3.1 有关线程池
- 3.2 demo代码
- 3.3 线程安全的单例模式
- 3.3.1 懒汉模式demo代码
- 3.3.2 饿汉模式demo代码
- 4 线程安全
- 5 常见锁概念
- 5.1 死锁
- 5.2 死锁四个必要条件
- 5.3 避免死锁
- 6 STL,智能指针和线程安全
- 7 加餐
1 POSIX信号量
(1)POSIX信号量和SystemV信号量作⽤相同,都是⽤于同步操作,达到⽆冲突的访问共享资源⽬的。但POSIX可以⽤于线程间同步。
(2)锁其实就是之前学的信号量的一个子集,也就是将资源整体使用,信号量其实就是资源的数目
1.1 基于环形队列的⽣产消费模型
(1)环形队列采⽤数组模拟,⽤模运算来模拟环状特性
(2)环形结构起始状态和结束状态都是⼀样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留⼀个空的位置,作为满的状态
(3)我们现在有信号量这个计数器,就很简单的进⾏多线程间的同步过程。
(4)P( - - 操作)、V(++操作)
(5)单生产者单消费者模型只需要两个信号量(1、sem_t data = 0 2、sem_t space = N)
1.2 基于信号量的单生产者单消费者模型demo代码(环形队列)
(1)makefile
(2)Sem.hpp
(3)RingBuffer.hpp
#pragma once#include <iostream>
#include <vector>
#include <semaphore.h>
#include "Sem.hpp"
#include <unistd.h>namespace RingBufferMoudle
{using namespace SemMoudle;template <typename T>class RingBuffer{public:RingBuffer(int cap): _ring(cap), _cap(cap), _p_step(0),_c_step(0), _datasem(0), _spacesem(cap){}void Equeue(const T &in) // 生产者{_spacesem.P(); // 申请空间_ring[_p_step] = in;_p_step++; // 不需要我们额外的判断满没满,因为sem信号量本身就是资源的数目_p_step %= _cap; // 维持环形属性_datasem.V(); // 增加资源}void Pop(T *out) // 消费者{_datasem.P(); // 减少一个数据*out = _ring[_c_step];_c_step++;_c_step %= _cap;_spacesem.V(); // 增加一个空间}~RingBuffer(){}private:std::vector<T> _ring; // 环形队列int _cap; // 容量int _p_step; // 生产者位置int _c_step; // 消费者位置Sem _datasem; // 数据信号量Sem _spacesem; // 空间信号量};
}
(4)Main.cc
1.3 重新理解信号量,重新理解互斥锁
(1)为什么上面的代码不需要判断?
信号量本身就是表示资源数目的,只要成功就一定有资源
(2)之前的代码需要判断是因为我们把资源当做整体使用
1.4 基于信号量的多生产者多消费者模型demo代码(环形队列)
(1)需要几把锁? 2把
从321原则中的3(生产和消费、生产和生产、消费和消费),生产和消费我们已经完成了同步与互斥,我们现在需要新增生产和生产,消费和消费之间的互斥关系,这里需要两把锁
(2)我们如何完成多生产和多消费
这里我们采取竞争法,也就是消费者之间竞争出一个来消费,生产者之间竞争出一个来生产
RingBuffer.hpp
#pragma once#include <iostream>
#include <vector>
#include <semaphore.h>
#include "Sem.hpp"
#include <unistd.h>
#include <pthread.h>
#include "mutex.hpp"namespace RingBufferMoudle
{using namespace SemMoudle;using namespace LockModule;template <typename T>class RingBuffer{public:RingBuffer(int cap): _ring(cap), _cap(cap), _p_step(0),_c_step(0), _datasem(0), _spacesem(cap){}void Equeue(const T &in) // 生产者{// pthread_mutex_lock(&_p_lock);_spacesem.P(); // 申请空间// 先锁在信号量和先信号量后锁是有区别的// 先锁的话只有一个线程能够拿到锁,也只有一个线程能够信号量// 先信号量后锁的话所有线程都能拿到信号量,但只有一个线程能够拿到锁(进行生产/消费){LockGuard lockguard(_p_lock);_ring[_p_step] = in;_p_step++; // 不需要我们额外的判断满没满,因为sem信号量本身就是资源的数目_p_step %= _cap; // 维持环形属性}_datasem.V(); // 增加资源// pthread_mutex_unlock(&_p_lock);}void Pop(T *out) // 消费者{// pthread_mutex_lock(&_c_lock);_datasem.P(); // 减少一个数据{LockGuard lockgurad(_c_lock);*out = _ring[_c_step];_c_step++;_c_step %= _cap;}_spacesem.V(); // 增加一个空间// pthread_mutex_unlock(&_c_lock);}~RingBuffer(){}private:std::vector<T> _ring; // 环形队列int _cap; // 容量int _p_step; // 生产者位置int _c_step; // 消费者位置Sem _datasem; // 数据信号量Sem _spacesem; // 空间信号量Mutex _p_lock; // 生产者之间的锁Mutex _c_lock; // 消费者之间的锁};
}
2 日志
2.1 ⽇志认识
计算机中的⽇志是记录系统和软件运⾏中发⽣事件的⽂件,主要作⽤是监控运⾏状态、记录异常信息,帮助快速定位问题并⽀持程序员进⾏问题修复。它是系统维护、故障排查和安全管理的重要⼯具。
⽇志格式以下⼏个指标是必须得有的
• 时间戳
• ⽇志等级
• ⽇志内容
以下⼏个指标是可选的
• ⽂件名⾏号
• 进程,线程相关id信息等
⽇志有现成的解决⽅案,如:spdlog、glog、Boost.Log、Log4cxx等等,我们依旧采⽤⾃定义⽇志的⽅式。
这⾥我们采⽤设计模式-策略模式来进⾏⽇志的设计,具体策略模式介绍,详情看代码和课程
代码如下:
log.hpp
#pragma once
#include <iostream>
#include <string>
#include <fstream>
#include <memory>
#include <ctime>
#include <sstream>
#include <filesystem> // C++17, 需要⾼版本编译器和-std=c++17
#include <unistd.h>
#include "mutex.hpp"namespace LogModule
{using namespace LockModule; // 使⽤我们⾃⼰封装的锁,也可以采⽤C++11的锁.// 默认路径和⽇志名称const std::string defaultpath = "./log/";const std::string defaultname = "log.txt";// ⽇志等级enum class LogLevel{DEBUG,INFO,WARNING,ERROR,FATAL};// ⽇志转换成为字符串std::string LogLevelToString(LogLevel level){switch (level){case LogLevel::DEBUG:return "DEBUG";case LogLevel::INFO:return "INFO";case LogLevel::WARNING:return "WARNING";case LogLevel::ERROR:return "ERROR";case LogLevel::FATAL:return "FATAL";default:return "UNKNOWN";}}std::string CurrentTime(){time_t time_stamp = ::time(nullptr); // 时间戳struct tm curr;localtime_r(&time_stamp, &curr); // 转化时间戳char buff[1024];snprintf(buff, sizeof(buff), "%4d-%02d-%02d %02d-%02d-%02d",curr.tm_year + 1900,curr.tm_mon + 1,curr.tm_mday,curr.tm_hour,curr.tm_min,curr.tm_sec);return buff;}// 策略模式,策略接⼝class LogStrategy{public:virtual ~LogStrategy() = default;// 策略的构造函数virtual void SyncLog(const std::string &message) = 0; // 不同模式核⼼是刷新⽅式的不同};// 控制台⽇志策略,就是⽇志只向显⽰器打印,⽅便我们debugclass ConsoleLogStrategy : public LogStrategy{public:void SyncLog(const std::string &message) override{LockGuard LockGuard(_mutex);std::cerr << message << std::endl;}~ConsoleLogStrategy(){// std::cout << "~ConsoleLogStrategy" << std::endl; // for debug}private:Mutex _mutex; // 显⽰器也是临界资源,保证输出线程安全};// ⽂件⽇志策略class FileLogStrategy : public LogStrategy{public:// 构造函数,建⽴出来指定的⽬录结构和⽂件结构FileLogStrategy(const std::string logpath = defaultpath, std::stringlogfilename = defaultname): _logpath(logpath), _logfilename(logfilename){LockGuard lockguard(_mutex);if (std::filesystem::exists(_logpath))return;try{std::filesystem::create_directories(_logpath);}catch (const std::filesystem::filesystem_error &e){std::cerr << e.what() << '\n';}}// 将⼀条⽇志信息写⼊到⽂件中void SyncLog(const std::string &message) override{LockGuard lockguard(_mutex);std::string log = _logpath + _logfilename;std::ofstream out(log.c_str(), std::ios::app); // 追加⽅式if (!out.is_open())return;out << message << "\n";out.close();}~FileLogStrategy(){// std::cout << "~FileLogStrategy" << std::endl; // for debug}public:std::string _logpath;std::string _logfilename;Mutex _mutex; // 保证输出线程安全,粗狂⽅式下,可以不⽤};// 具体的⽇志类class Logger{public:Logger(){// 默认使⽤显⽰器策略,如果⽤⼾⼆次指明了策略,会释放在申请,测试的时候注意析构次数UseConsoleStrategy();}~Logger(){}void UseConsoleStrategy(){_strategy = std::make_unique<ConsoleLogStrategy>();}void UseFileStrategy(){_strategy = std::make_unique<FileLogStrategy>();}// 内部类,实现RAII⻛格的⽇志格式化和刷新// 这个LogMessage,表⽰⼀条完整的⽇志对象class LogMessage{private:LogLevel _type;// ⽇志等级std::string _curr_time; // ⽇志时间pid_t _pid;// 写⼊⽇志的时间std::string _filename; // 对应的⽂件名int _line;// 对应的⽂件⾏号Logger &_logger;// 引⽤外部logger类, ⽅便使⽤策略进⾏刷新std::string _loginfo;// ⼀条合并完成的,完整的⽇志信息public:// RAII⻛格,构造的时候构建好⽇志头部信息LogMessage(LogLevel type, std::string &filename, int line, Logger &logger): _type(type),_curr_time(CurrentTime()),_pid(getpid()),_filename(filename),_line(line),_logger(logger){// stringstream不允许拷⻉,所以这⾥就当做格式化功能使⽤std::stringstream ssbuffer;ssbuffer << "[" << _curr_time << "] "<< "[" << LogLevelToString(type) << "] "<< "[" << _pid << "] "<< "[" << _filename << "] "<< "[" << _line << "]"<< " - ";_loginfo = ssbuffer.str();}// 重载 << ⽀持C++⻛格的⽇志输⼊,使⽤模版,表⽰⽀持任意类型template <typename T>LogMessage &operator<<(const T &info){std::stringstream ssbuffer;ssbuffer << info;_loginfo += ssbuffer.str();return *this; // 返回当前LogMessage对象,⽅便下次继续进⾏<<}// RAII⻛格,析构的时候进⾏⽇志持久化,采⽤指定的策略~LogMessage(){if (_logger._strategy){_logger._strategy->SyncLog(_loginfo);}// std::cout << "~LogMessage" << std::endl;}};// 故意拷⻉,形成LogMessage临时对象,后续在被<<时,会被持续引⽤,// 直到完成输⼊,才会⾃动析构临时LogMessage,⾄此也完成了⽇志的显⽰或者刷新// 同时,形成的临时对象内包含独⽴⽇志数据// 未来采⽤宏替换,进⾏⽂件名和代码⾏数的获取LogMessage operator()(LogLevel type, std::string filename, int line){return LogMessage(type, filename, line, *this);}private:std::unique_ptr<LogStrategy> _strategy; // 写⼊⽇志的策略};// 定义全局的logger对象Logger logger;
// 使⽤宏,可以进⾏代码插⼊,⽅便随时获取⽂件名和⾏号
#define LOG(type) logger(type, __FILE__, __LINE__)
// 提供选择使⽤何种⽇志策略的⽅法
#define ENABLE_CONSOLE_LOG_STRATEGY() logger.UseConsoleStrategy()
#define ENABLE_FILE_LOG_STRATEGY() logger.UseFileStrategy()
}
main.cc
3 线程池
3.1 有关线程池
3.2 demo代码
Threadpool.hpp
#pragma once#include <iostream>
#include <string>
#include "Thread.hpp"
#include "mutex.hpp"
#include "Cond.hpp"
#include "Log.hpp"
#include <memory>
#include <queue>
#include <vector>namespace ThreadPoolMoudle
{using namespace ThreadModule;using namespace LogModule;using namespace LockModule;using namespace CondMoudle;const static int defaultnum = 5; // 默认线程为5个using Thread_t = std::shared_ptr<Thread>;// 用来做测试的线程方法void DefaultTest(){while (1){LOG(LogLevel::DEBUG) << "我是一个测试方法";sleep(1);}}template <typename T>class ThreadPool{private:bool IsEmpty() { return _taskq.empty(); }void HandlerTask() // 拿取任务{while (true){// 1、拿任务T t;{LockGuard lockguard(_lock);while (IsEmpty() && _isrunning){_wait_num++;_cond.Wait(_lock);_wait_num--;}// 两种情况 1、任务队列非空 2、线程池退出了if (IsEmpty() && !_isrunning)break;t = _taskq.front();_taskq.pop();}// 2、处理任务t(); // 规定未来所有的任务处理,必须提供()方法}LOG(LogLevel::INFO) << "线程退出";}public:ThreadPool(int num = defaultnum): _num(num), _wait_num(0), _isrunning(false){for (int i = 0; i < _num; i++){_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this)));LOG(LogLevel::DEBUG) << "构建线程" << _threads.back()->Name();}}void Equeue(T &&in){LockGuard lockgurad(_lock);if (!_isrunning)return;_taskq.push(std::move(in));if (_wait_num > 0)_cond.Notify();}void Start(){if (_isrunning) return;_isrunning = true;//bugfor (auto &tptr : _threads){tptr->Start();LOG(LogLevel::INFO) << "启动线程" << tptr->Name();}}void Wait(){for (auto &tptr : _threads){tptr->Join();LOG(LogLevel::INFO) << "回收线程" << tptr->Name();}}void Stop(){LockGuard lockgurad(_lock);if (_isrunning){// 1、让线程自己退出(要唤醒)// 2、历史任务已经做完了// 3、不能在入任务了_isrunning = false;if (_wait_num > 0)_cond.NotifyAll(); // 让线程将历史任务处理完}}~ThreadPool() {}private:int _wait_num;int _num;std::queue<T> _taskq; // 临界资源std::vector<Thread_t> _threads; // 管理线程用vector即可Mutex _lock;Cond _cond;bool _isrunning; // 判断线程是否运行};
}
ThreadPool.cc
Task.hpp
3.3 线程安全的单例模式
某些类, 只应该具有⼀个对象(实例), 就称之为单例.
例如⼀个男⼈只能有⼀个媳妇.
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要⽤⼀个单例的类来管理这些数据
(1)吃完饭, ⽴刻洗碗, 这种就是饿汉⽅式. 因为下⼀顿吃的时候可以⽴刻拿着碗就能吃饭.
(2)吃完饭, 先把碗放下, 然后下⼀顿饭⽤到这个碗了再洗碗, 就是懒汉⽅式.
懒汉⽅式最核⼼的思想是 “延时加载”. 从⽽能够优化服务器的启动速度.
3.3.1 懒汉模式demo代码
template <typename T>
class Singleton
{static T* inst;
public:static T* GetInstance() {if (inst == NULL) {inst = new T();}return inst;}
};
存在⼀个严重的问题, 线程不安全.
第⼀次调⽤ GetInstance 的时候, 如果两个线程同时调⽤, 可能会创建出两份 T 对象的实例.
但是后续再次调⽤, 就没有问题了.
3.3.2 饿汉模式demo代码
template <typename T>
class Singleton {static T data;
public:static T* GetInstance() {return &data;}
};
只要通过 Singleton 这个包装类来使⽤ T 对象, 则⼀个进程中只有⼀个 T 对象的实例.
4 线程安全
总结:
可重入:描述函数
线程安全:描述线程
5 常见锁概念
5.1 死锁
• 死锁是指在⼀组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站⽤不会
释放的资源⽽处于的⼀种永久等待状态。
• 为了⽅便表述,假设现在线程A,线程B必须同时持有锁1和锁2,才能进⾏后续资源的访问
申请⼀把锁是原⼦的,但是申请两把锁就不⼀定了
造成的结果是
5.2 死锁四个必要条件
• 互斥条件:⼀个资源每次只能被⼀个执⾏流使⽤
• 请求与保持条件:⼀个执⾏流因请求资源⽽阻塞时,对已获得的资源保持不放
• 不剥夺条件:⼀个执⾏流已获得的资源,在末使⽤完之前,不能强⾏剥夺
• 循环等待条件:若⼲执⾏流之间形成⼀种头尾相接的循环等待资源的关系
5.3 避免死锁
(1)破坏死锁的四个必要条件(之一)
…破坏循环等待条件问题:资源⼀次性分配, 使⽤超时机制、加锁顺序⼀致
(2)避免锁未释放的场景