并发编程的守护者:信号量与日志策略模式解析
一、信号量
关于信号量的介绍在深入Linux内核:IPC资源管理揭秘
这篇文章当中已经做了初步的介绍了,相信大家对于信号量已经有了初步的认知了。
今天,我们就来探讨如何实现信号量。
1. 信号量的接口
//初始化信号量
//成功了,返回0,失败了,返回-1并且设置错误码
//sem初始化的信号量
//pshared设置为0,代表线程间使用
//value信号量的初始值
int sem_init(sem_t* sem, int pshared, unsigned int value);
//销毁信号量
//成功返回0,失败返回-1并且设置错误码
int sem_destroy(sem_t* sem);
//减少信号量
//成功返回0,失败返回-1并且设置错误码
int sem_wait(sem_t* sem);
//增加信号量
//成功返回0,失败返回-1并且设置错误码
int sem_post(sem_t* sem);
2. 信号量实现的一些细节问题
信号量的接口就了解到这里。我们实现的信号量是基于一个环形队列实现的(数组)
。
接下来,我们了解实现的一些细节。
队列的容量是有限的,刚开始时,队列为空,一定是生产者先运行。此时生产者和消费者访问同一个位置,生产者还没生产数据,消费者就开始消费数据,这是不行的,所以,必须等到生产者生产数据之后,消费者才可以消费数据。所以,生产者和消费者之间需要维护互斥与同步的关系。
当队列为满时,必须让消费者先运行。此时生产者,消费者又指向了同一个位置,当消费者拿取数据时,生产者是不能立即生产数据的,要不然消费者还没有获取到数据,生产者已经把数据覆盖了,不就导致数据错乱了吗!所以,这个过程不仅需要生产者和消费者互斥的获取数据,还需要同步。
当队列不为空,不为满时,生产者和消费者肯定不是指向同一个位置的,所以,生产者和消费者不就可以并发执行了。
3. 信号量的实现
Sem.hpp
#pragma once
#include<iostream>
#include<vector>
#include<unistd.h>
#include<semaphore.h>class Sem
{
public:Sem(int num):_initnum(num){sem_init(&_sem, 0, _initnum);}void P(){int n = sem_wait(&_sem);}void V(){int n = sem_post(&_sem);}~Sem(){sem_destroy(&_sem);}
private:sem_t _sem;int _initnum;
};
RingQueue.hpp
#include"Sem.hpp"int gcap = 5;
template<typename T>
class RingQueue
{
public:RingQueue(int cap = gcap):_ring_queue(cap),_cap(cap),_space_sem(cap),_data_sem(0),_c_step(0),_p_step(0){}void EnQueue(const T& in){//先申请空间信号量,对资源的一种预定机制_space_sem.P();//生产数据_ring_queue[_p_step++] = in;_p_step %= _cap;_data_sem.V();}void Pop(T* out){//先申请数据信号量_data_sem.P();//消费数据*out = _ring_queue[_c_step++];_c_step %= _cap;_space_sem.V();}~RingQueue(){}
private:std::vector<T> _ring_queue;int _cap;Sem _space_sem;Sem _data_sem;int _c_step;int _p_step;
};
main.cc
#include"RingQueue.hpp"void* consumer(void* args)
{RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);while(true){int data = 0;rq->Pop(&data);std::cout << "消费者消费了一个数据" << data << std::endl;}
}void* productor(void* args)
{RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);int data = 1;while(true){sleep(1);rq->EnQueue(data);std::cout << "生产者生产了一个数据" << data << std::endl;data++;}
}
int main()
{RingQueue<int>* rq = new RingQueue<int>();pthread_t c, p;pthread_create(&c, nullptr, consumer, (void*)rq);pthread_create(&p, nullptr, productor, (void*)rq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}
Makefile
ringqueue:main.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f ringqueue
4. 信号量实现过程中的一些疑问
这是一个基于单生产者,单消费者的信号量。
问题1:我们在申请信号量的过程当中并没有用到锁,难道就不怕数据不安全吗?
刚开始时,队列为空,生产者先申请信号量,生产数据,然后在V操作,唤醒消费者,消费者才能消费数据。这个过程本身就已经完成了生产者和消费者之间的互斥与同步关系
。
当队列为满时,生产者申请信号量失败,就被阻塞住,此时消费者申请信号量,消费数据,然后再唤醒生产者,生产者才能生产数据,所以这个过程本身也完成了生产者与消费者之间的互斥与同步关系
。
而队列不为空也不为满时,生产者和消费者可以并发执行
。
问题2:我们怎么没有在临界区内部,判断资源是否就绪呢?
信号量本身就是一把计数器,是对于资源的一种预定机制,对信号量进行P操作的时候,虽然是申请信号量,但本质就是对资源是否就绪进行判断。有多少资源就可以预定多少资源,绝不会预定出的资源比实际资源多,也就是说有多少资源就可以有多少个生产者线程
。
重新理解信号量。
我们把信号量设置为5,如果信号量设置为1呢?不就是二元信号量,一个线程申请信号量之后就不可能再有第二个线程成功申请信号量,信号量就变为了0,这不就是一把锁
吗!控制着线程的开关。
重新理解一下锁:不就是认为自己的资源只有一份,申请锁不就类似于二元信号量,信号量P操作,释放锁不就是V操作
。
所以,锁是信号量的一种特殊情况。
二、日志与策略模式
什么是日志呢?
计算机中的日志是记录系统和软件运行中发生事件的文件,主要作用是监控运行状态,记录异常信息,帮助快速定位问题并支持程序员进行问题修复,它是系统维护,故障排查和安全管理的重要工具。
我们设计的日志格式主要包含以下几个指标:
时间戳、日志等级、日志内容、文件名、行号,进程线程相关 id 信息。
//获取时间戳
//tloc设置为nullptr
time_t time(time_t* tloc);
//timep获取到的时间戳
//result输出型参数
struct tm* localtime_r(const time_t* timep, struct tm* result);
struct tm
{int tm_sec; /* Seconds (0-60) */int tm_min; /* Minutes (0-59) */int tm_hour; /* Hours (0-23) */int tm_mday; /* Day of the month (1-31) */int tm_mon; /* Month (0-11) */int tm_year; /* Year - 1900 */int tm_wday; /* Day of the week (0-6, Sunday = 0) */int tm_yday; /* Day in the year (0-365, 1 Jan = 0) */int tm_isdst; /* Daylight saving time */
};
Logger.hpp
#pragma once
#include <iostream>
#include <filesystem>
#include <fstream>
#include <string>
#include <sstream>
#include <memory>
#include <unistd.h>
#include "Mutex.hpp"enum class LoggerLevel
{DEBUG,INFO,WARNING,ERROR,FATAL
};std::string LoggerLevelToString(LoggerLevel level)
{switch (level){case LoggerLevel::DEBUG:return "Debug";case LoggerLevel::INFO:return "Info";case LoggerLevel::WARNING:return "Warning";case LoggerLevel::ERROR:return "Error";case LoggerLevel::FATAL:return "Fatal";default:return "Unknown";}
}std::string GetCurrentTime()
{// 获取时间戳time_t timep = time(nullptr);// 把时间戳转化为时间格式struct tm currtm;localtime_r(&timep, &currtm);// 转化为字符串char buffer[64];snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d-%02d-%02d",currtm.tm_year + 1900, currtm.tm_mon + 1, currtm.tm_mday,currtm.tm_hour, currtm.tm_min, currtm.tm_sec);return buffer;
}class LogStrategy
{
public:virtual ~LogStrategy() = default;virtual void SyncLog(const std::string &logmessage) = 0;
};// 显示器刷新
class ConsoleLogStrategy : public LogStrategy
{
public:~ConsoleLogStrategy(){}virtual void SyncLog(const std::string &logmessage) override{{LockGuard lockguard(&_lock);std::cout << logmessage << std::endl;}}private:Mutex _lock;
};const std::string default_dir_path_name = "log";
const std::string default_filename = "test.log";
// 文件刷新
class FileLogStrategy : public LogStrategy
{
public:FileLogStrategy(const std::string dir_path_name = default_dir_path_name,const std::string filename = default_filename): _dir_path_name(dir_path_name), _filename(filename){if (std::filesystem::exists(_dir_path_name)){return;}try{std::filesystem::create_directories(_dir_path_name);}catch (const std::filesystem::filesystem_error &e){std::cerr << e.what() << "\r\n";}}~FileLogStrategy(){}virtual void SyncLog(const std::string &logmessage) override{{LockGuard lock(&_lock);std::string target = _dir_path_name;target += '/';target += _filename;std::ofstream out(target.c_str(), std::ios::app);if (!out.is_open()){return;}out << logmessage << "\n";out.close();}}private:std::string _dir_path_name;std::string _filename;Mutex _lock;
};class Logger
{
public:Logger(){}void EnableConsoleStrategy(){_strategy = std::make_unique<ConsoleLogStrategy>();}void EnableFileStrategy(){_strategy = std::make_unique<FileLogStrategy>();}class LogMessage{public:LogMessage(LoggerLevel level, std::string filename, int line, Logger& logger): _curr_time(GetCurrentTime()), _level(level), _pid(getpid()), _filename(filename), _line(line), _logger(logger){std::stringstream ss;ss << "[" << _curr_time << "] "<< "[" << LoggerLevelToString(_level) << "] "<< "[" << _pid << "] "<< "[" << _filename << "] "<< "[" << _line << "]"<< " - ";_loginfo = ss.str();}template <typename T>LogMessage &operator<<(const T &info){std::stringstream ss;ss << info;_loginfo += ss.str();return *this;}~LogMessage(){if (_logger._strategy){_logger._strategy->SyncLog(_loginfo);}}private:std::string _curr_time; // 时间戳LoggerLevel _level; // 日志等级pid_t _pid; // 进程pidstd::string _filename; // 文件名int _line; // 行号std::string _loginfo; // 一条合并完成的,完整的日志信息Logger &_logger; // 提供刷新策略的具体做法};LogMessage operator()(LoggerLevel level, std::string filename, int line){return LogMessage(level, filename, line, *this);}~Logger(){}private:std::unique_ptr<LogStrategy> _strategy;
};Logger logger;#define LOG(level) logger(level, __FILE__, __LINE__)
#define EnableConsoleStrategy() logger.EnableConsoleStrategy()
#define EnableFileStrategy() logger.EnableFileStrategy()
Mutex.hpp
#pragma once
#include<iostream>
#include<mutex>
#include<pthread.h>class Mutex
{
public:Mutex(){pthread_mutex_init(&_lock, nullptr);}void Lock(){pthread_mutex_lock(&_lock);}void Unlock(){pthread_mutex_unlock(&_lock);}~Mutex(){pthread_mutex_destroy(&_lock);}
private:pthread_mutex_t _lock;
};class LockGuard
{
public:LockGuard(Mutex* _mutex):_mutexp(_mutex){_mutexp->Lock();}~LockGuard(){_mutexp->Unlock();}
private:Mutex* _mutexp;
};
main.cc
#include"Logger.hpp"int main()
{EnableConsoleStrategy();LOG(LoggerLevel::ERROR) << "hello linux" << ", 6.66 " << 123;LOG(LoggerLevel::WARNING) << "hello linux" << ", 6.66 " << 123;LOG(LoggerLevel::ERROR) << "hello linux" << ", 6.66 " << 123;LOG(LoggerLevel::ERROR) << "hello linux" << ", 6.66 " << 123;LOG(LoggerLevel::ERROR) << "hello linux" << ", 6.66 " << 123;// std::string test = "hello world, hello log";// std::unique_ptr<LogStrategy> logger_ptr = std::make_unique<ConsoleLogStrategy>();// // logger_ptr->SyncLog(test);// // std::unique_ptr<LogStrategy> logger_ptr = std::make_unique<FileLogStrategy>();// logger_ptr->SyncLog(GetCurrentTime());// sleep(1);// logger_ptr->SyncLog(GetCurrentTime());// sleep(1);// logger_ptr->SyncLog(GetCurrentTime());// sleep(1);// logger_ptr->SyncLog(GetCurrentTime());// sleep(1);// logger_ptr->SyncLog(GetCurrentTime());return 0;
}
Makefile
logger_test:main.ccg++ -o $@ $^ -std=c++17 -lpthread
.PHONY:clean
clean:rm -f logger_test
这里的构思非常的巧妙,本来想要输出一条完整的日志信息,需要很复杂的操作,现在利用这样的做法就可以用一行代码输出一条完整的日志信息。
下面我们就来看看是怎样的做法呢?
我们在外部类 Logger 里重载了运算符(),返回了一个 LogMessage 类的临时对象
。
EnableConsoleStrategy();这个其实就是一个宏,这个宏是对于 Logger 类里面的两个函数的简便操作,LOG也是一个宏,是对于()运算符的重载函数的简便操作
。
所以,当调用了LOG宏之后会返回一个临时对象,<<运算符重载函数是LogMessage类的一个成员函数,返回的是临时对象的引用,因为,LOG宏返回一个LogMessage类的临时对象,这个临时对象又继续调用了 << 运算符函数,继续返回临时对象的引用,以此类推,直到调用结束
。
临时对象是具有常性的,它的生命周期在一条语句之后结束,所以可以返回临时对象的引用。
今天的文章分享到此结束,觉得不错的伙伴给个一键三连吧。