多线程异步日志系统与实现及 TCP/IP C/S 模型
一、多线程日志系统分析
多线程程序对日志库提出了新的需求:线程安全,即多个线程可以并发写日志,多个线程的日志消息不会出现交织。线程安全不难办到,简单的办法是用一个全局 mutex 保护 IO,或者每个线程单独写一个日志文件,但这两种做法的高效性就堪忧了。前者会造成全部线程抢一个锁,后者有可能让业务线程阻塞在写磁盘操作上。
每个进程中的多线程程序最好只写一个日志文件,这样分析日志更容易,不必在多个文件中跳来跳去。再说多线程写多个文件也不一定能提速。解决办法不难想到,用一个工作线程负责收集日志消息,并写入日志文件,其他业务线程只管往这个 "日志线程" 发送日志消息,这称为 "异步日志"。
二、多线程日志系统设计
日志系统采用的是双缓冲(double buffering) 技术,基本思路是准备两块 buffer:A 和 B,前端负责往 buffer A 填数据(日志消息),后端负责将 buffer B 的数据写入文件。当 buffer A 写满之后,交换 A 和 B,让后端将 buffer A 的数据写入文件,而前端则往 buffer B 填入新的日志消息,如此往复。
用两个 buffer 的好处是在新建日志消息的时候不必等待磁盘文件操作,也避免每条新日志消息都触发(唤醒)后端日志线程。换言之,前端不是将一条条日志消息分别传送给后端,而是将多条日志消息拼成一个大的 buffer 传送给后端,相当于批处理,减少了线程唤醒的频度,降低开销。另外,为了及时将日志消息写入文件,即便 buffer A 未满,日志库也会每 3 秒执行一次上述交换写入操作。
三、日志系统核心类实现
1. LogFile 类关键成员变量
count_(0), // 写数据次数计数, 超过限值checkEveryN_时清除, 然后重新计数
mutex_(threadSafe ? new mutex{} : nullptr), // 互斥锁指针, 根据是否需要线程安全来初始化
startOfPeriod_(0), // 本次写log周期的起始时间(秒)
lastRoll_(0), // 上次roll日志文件时间(秒)
lastFlush_(0) // 上次flush日志文件时间(秒)
{//assert(basename.find('/') == string::npos);rollFile();
}
LogFile::~LogFile() {}
2. AsyncLogging 类定义
#include<atomic>
#include<thread>
#include<memory>
#include<mutex>
#include<condition_variable>
#include<string>
#include<vector>
using namespace std;
//own
#include"LogFile.hpp"
namespace tulun
{class AsyncLogging{private:AsyncLogging(const AsyncLogging&) = delete;AsyncLogging& operator=(const AsyncLogging&) = delete;private:void workthreadfunc(); // 工作线程private:const int flushInterval_; // 定期(flushInterval_秒)将缓冲区的数据写到文件中std::atomic<bool> running_; // 是否正在运行const string basename_; // 日志filename名字const size_t rollSize_; // 回滚大小std::unique_ptr<std::thread> pthread_; // 执行该异步日志记录器的线程std::mutex mutex_; // 互斥锁std::condition_variable cond_; // 条件变量std::string currentBuffer_; // 当前的缓冲区std::vector<std::string> buffers_; // 数据缓冲区队列tulun::LogFile output_; // 定义日志文件对象 public:// rollSize //回滚大小//flushInterval = 3 ; // 刷新间隔,默认值3秒AsyncLogging(const string &basename,size_t rollSize,int flushInterval = 3);~AsyncLogging();void append(const string &info);void append(const char *info,int len);void start();void stop();void flush();};
}
3. AsyncLogging 类实现
#include"AsyncLogging.hpp"
namespace tulun
{static const int BufMaxLen = 4000;static const int BufQueueSize = 16;AsyncLogging::AsyncLogging(const std::string& basename, size_t rollSize, int flushInterval):flushInterval_(flushInterval), // 刷新间隔running_(false),rollSize_(rollSize),pthread_(nullptr),// latch_(1),output_(basename,rollSize,false) // 定义日志文件对象 {currentBuffer_.reserve(BufMaxLen);buffers_.reserve(BufQueueSize); // vector预定大小,避免自动增长(效率更高)}AsyncLogging::~AsyncLogging(){if (running_){stop();}}void AsyncLogging::start(){running_ = true;// 执行该异步日志记录器的线程pthread_.reset(new std::thread(&AsyncLogging::workthreadfunc,this));// latch_.wait();}void AsyncLogging::stop(){running_ = false;cond_.notify_all();pthread_->join();}void AsyncLogging::append(const string &info){append(info.c_str(),info.size());}/******************************************************************** Description : 前端在生成一条日志消息时,会调用AsyncLogging::append()。如果currentBuffer_够用,就把日志内容写入到currentBuffer_中,如果不够用(就认为其满了),就把currentBuffer_放到已满buffer_数组中,等待消费者线程(即后台线程)来取。则将预备好的另一块缓冲(nextBuffer_)移用为当前缓冲区(currentBuffer_)。*******************************************************************/void AsyncLogging::append(const char *info,int len){std::unique_lock<std::mutex> _lock(mutex_);if(currentBuffer_.size() >= BufMaxLen || (currentBuffer_.capacity() - currentBuffer_.size()) < len){buffers_.push_back(std::move(currentBuffer_));currentBuffer_.reserve(BufMaxLen);}else{currentBuffer_.append(info,len);}cond_.notify_all();}void AsyncLogging::workthreadfunc(){//tulun::LogFile output(basename_,rollSize_,false); //定义日志文件对象 std::vector<std::string> buffersToWrite;// 线程函数的局部队列// latch_.countDown();while (running_){//std::this_thread::sleep_for(std::chrono::milliseconds(5000));{std::unique_lock<std::mutex> _lock(mutex_);if (buffers_.empty()){cond_.wait_for(_lock, std::chrono::seconds(flushInterval_));// 时间点到达 ,还要获得mutex_ 才能从wait_for 函数返回;}// 无论cond是因何(一是超时,二是当前缓冲区写满了)而醒来,都要将currentBuffer_放到buffers_中。 // 如果是因为时间到(3秒)而醒,那么currentBuffer_还没满,此时也要将之写入LogFile中。// 如果已经有一个前端buffer满了,那么在前端线程中就已经把一个前端buffer放到buffers_中了。// 此时,还是需要把currentBuffer_放到buffers_中(注意,前后放置是不同的buffer,// 因为在前端线程中,currentBuffer_已经被换成nextBuffer_指向的buffer了)。buffers_.push_back(std::move(currentBuffer_));currentBuffer_.reserve(BufMaxLen);buffersToWrite.swap(buffers_);buffers_.reserve(BufQueueSize);// 释放mutex_ ;}////异步写文件// 如果将要写入文件的buffer列表中buffer的个数大于25,那么将多余数据删除。// 前端陷入死循环,拼命发送日志消息,超过后端的处理能力,这是典型的生产速度超过消费速度, // 会造成数据在内存中的堆积,严重时引发性能问题(可用内存不足)或程序崩溃(分配内存失败)。 if (buffersToWrite.size() > 25) { char buf[256]; snprintf(buf, sizeof buf, "Dropped log messages at larger buffers\n"); fputs(buf, stderr); // 丢掉多余日志,以腾出内存,仅保留两块缓冲区 buffersToWrite.erase(buffersToWrite.begin() + 2, buffersToWrite.end()); }// 将buffersToWrite的数据写入到日志文件中for (const auto& buffer : buffersToWrite){output_.append(buffer.c_str(),buffer.size());}buffersToWrite.clear();}output_.flush();}void AsyncLogging::flush(){std::vector<std::string> buffersToWrite;// std::unique_lock<std::mutex> _lock(mutex_);buffers_.push_back(std::move(currentBuffer_));buffersToWrite.swap(buffers_);for (const auto& buffer : buffersToWrite){output_.append(buffer.c_str(),buffer.size());}output_.flush();buffersToWrite.clear();}
}
四、日志系统测试代码
1. 基础测试
#include"Logger.hpp"
tulun::LogFile *plog = nullptr;
void writefile(const string &info)
{plog->append(info);
}
void flushfile()
{plog->flush();
}
int main()
{//tulun::LogFile logfile("yhping",1024*1024*1000);tulun::Logger::setOutput(writefile);tulun::Logger::setFlush(flushfile);plog = new tulun::LogFile("yhping",1024*1024*1000);for(int i = 0;i<1000000;++i){LOG_INFO<<"main"<<i;}return 0;
}
2. 异步日志单线程测试
#include"Logger.hpp"
tulun::AsyncLogging *asynclog = nullptr;
void asyncWriteFile(const string &info)
{asynclog->append(info);
}
void asyncFlushFile()
{asynclog->flush();
}
int main()
{asynclog = new tulun::AsyncLogging("yhping",1024*10);tulun::Logger::setOutput(asyncWriteFile);tulun::Logger::setFlush(asyncFlushFile);asynclog->start();for(int i = 0;i<1000;++i){LOG_INFO<<"main "<<i;std::this_thread::sleep_for(std::chrono::milliseconds(200));}return 0;
}
3. 异步日志多线程测试
#include"Logger.hpp"
tulun::AsyncLogging *asynclog = nullptr;
void asyncWriteFile(const string &info)
{asynclog->append(info);
}
void asyncFlushFile()
{asynclog->flush();
}
void func(char ch)
{for(int i = 0;i<1000;++i){LOG_INFO<<"main "<<i<<ch;std::this_thread::sleep_for(std::chrono::milliseconds(100));}
}
int main()
{asynclog = new tulun::AsyncLogging("yhping",1024*10);tulun::Logger::setOutput(asyncWriteFile);tulun::Logger::setFlush(asyncFlushFile);asynclog->start();std::thread tha(func,'a');std::thread thb(func,'b');std::thread thc(func,'c');tha.join();thb.join();thc.join();return 0;
}
五、倒计时同步工具类
1. CountDownLatch 类定义
#include<mutex>
#include<condition_variable>
using namespace std;
namespace tulun
{class CountDownLatch {public:explicit CountDownLatch(int count);void wait();void countDown();int getCount() const;private:mutable std::mutex mutex_;std::condition_variable condition_;int count_;};
}
2. CountDownLatch 类实现
#include"CountDownLatch.hpp"
namespace tulun
{CountDownLatch::CountDownLatch(int count):count_(count) {}void CountDownLatch::wait(){std::unique_lock<std::mutex> _lock(mutex_);while(count_ > 0){condition_.wait(_lock);}}void CountDownLatch::countDown(){std::unique_lock<std::mutex> _lock(mutex_);count_-=1;if(count_ == 0){condition_.notify_all();}}int CountDownLatch::getCount() const{std::unique_lock<std::mutex> _lock(mutex_);return count_;}
}
3. 集成 CountDownLatch 的 AsyncLogging 修改
在 AsyncLogging 类中添加倒计时对象:
tulun::CountDownLatch latch_; // 倒计时对象
构造函数初始化:
latch_(1),
修改 start () 和 workthreadfunc ():
void AsyncLogging::start()
{running_ = true;// 执行该异步日志记录器的线程pthread_.reset(new std::thread(&AsyncLogging::workthreadfunc,this));latch_.wait();
}void AsyncLogging::workthreadfunc()
{//tulun::LogFile output(basename_,rollSize_,false); //定义日志文件对象 std::vector<std::string> buffersToWrite;// 线程函数的局部队列latch_.countDown();while (running_){/// 原有逻辑}
}
六、TCP/IP 协议 C/S 模型实现
1. 服务器代码实现
#include<stdio.h>
#include<assert.h>
#include<netinet/in.h> // sockaddr_in
#include<string.h> // bzero
#include<arpa/inet.h> // inet_ntop; inet_pton
#include<sys/socket.h> // socket;
#include<sys/types.h> // uint16_t
#include<unistd.h> // read
#include<errno.h> // errno;
const uint16_t PORT = 12345;
const char *IP = "127.0.0.1";
const int LISTENSIZE = 10;
const int BUFFSIZE = 128;
int main(int argc,char *argv[])
{int listenfd = 0, connfd = 0;int ret = 0;char buff[BUFFSIZE]={};struct sockaddr_in seraddr,cliaddr;bzero(&seraddr,sizeof(seraddr)); seraddr.sin_family = AF_INET;inet_pton(AF_INET,IP,&seraddr.sin_addr);seraddr.sin_port = htons(PORT);listenfd = socket(PF_INET,SOCK_STREAM,0);assert(listenfd != -1);ret = bind(listenfd,(struct sockaddr*) &seraddr,sizeof(seraddr));assert(ret != -1);ret = listen(listenfd,LISTENSIZE);assert(ret != -1);while(1){socklen_t cliaddrlen = sizeof(cliaddr); // socklen_t => unsigned int;connfd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);assert(connfd != -1);//int len = read(connfd,buff,sizeof(buff));int len = recv(connfd,buff,sizeof(buff),0);if(len > 0){printf("client : data: %d %s \n",len ,buff);send(connfd,buff,len,0);close(connfd);}else if(len == 0){close(connfd);}else{printf("error %s \n",strerror(errno));close(connfd);}}close(listenfd);return 0;
}
2.客户端代码实现
#include<stdio.h>
#include<arpa/inet.h>
#include<sys/types.h>
#include<unistd.h>
#include<sys/socket.h>
#include<string.h>
#define MAXLEN 64
#define PORT 2344int main()
{struct sockaddr_in serveraddr;int connfd;char buf[MAXLEN] = "";// 创建socketconnfd = socket(AF_INET, SOCK_STREAM, 0);if (connfd < 0){perror("socket creation failed");return 1;}// 初始化服务器地址结构bzero(&serveraddr, sizeof(serveraddr));serveraddr.sin_family = AF_INET; // IPv4协议serveraddr.sin_port = htons(PORT); // 端口转换inet_pton(AF_INET, "127.0.0.1", &serveraddr.sin_addr); // IP地址转换// 连接服务器if (connect(connfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0){perror("connection failed");close(connfd);return 1;}// 循环读取用户输入并发送给服务器while (~scanf("%s", buf)){// 输入"quit"退出if (!strncmp("quit", buf, 4))break;// 发送数据write(connfd, buf, strlen(buf));// 接收服务器响应int retlen = read(connfd, buf, MAXLEN);if (retlen > 0){printf("Server response: %s\n", buf);}}// 发送退出信号并关闭连接write(connfd, "quit", 4);close(connfd);return 0;
}