C++项目:仿muduo库高并发服务器---------LoopThreadPool模块和TcpServer模块的实现
文章目录
- 前言
- 一、LoopThreadPool模块设计
- 1.1 功能概述
- 1.2 代码实现
- 二、TcpServer模块设计
- 2.1 模块组成
- 2.2 功能概述
- 2.3 流程
- 2.4 代码实现
前言
本篇文章介绍了LoopThreadPool模块和TcpServer模块的实现其中:LoopThreadPool
模块是用来管理 LoopThread
而设计的,帮助我们给新建立的连接进行线程分配。TcpServer
模块用于整合所有相关模块,通过实例化该模块的对象,可简便搭建服务器。
一、LoopThreadPool模块设计
1.1 功能概述
LoopThreadPool
用于管理和分配所有 LoopThread
,支持可配置的线程数量(包括 0 个或多个),并能依据主从 Reactor 模型的逻辑,为新连接分配合适的线程(LoopThread
对应的 EventLoop
)以进行事件监控与处理。
- 线程数量可配置
线程池能设置包含 0 个或多个LoopThread
。当从属线程数量为 0 时,实现单 Reactor 服务器:一个线程既负责获取新连接,也负责连接的事件处理。
在一些轻量级场景下,一般不需要多线程执行任务
- 管理
LoopThread
对象
对 0 个或多个LoopThread
实例进行统一管理,包括创建、维护等操作。 - 线程分配功能
- 当主线程获取新连接后,需将其分配给对应线程的
EventLoop
以进行事件监控和处理。 - 若从属线程数量为 0,直接将新连接分配给主线程的
EventLoop
处理。 - 若存在多个从属线程,采用 RR(Round - Robin,轮转) 算法进行线程分配:轮询获取从属线程的
EventLoop
,并设置到对应的Connection
中。
- 当主线程获取新连接后,需将其分配给对应线程的
1.2 代码实现
class LoopEventPool{public:LoopEventPool(EventLoop*loop):_base_loop(loop),_next_index(0),_thread_count(0){}//设置线程个数void SetThreadCount(int sz){_thread_count=sz;}//创建从属线程void Create(){if(_thread_count>0){_threads.resize(_thread_count);_loops.resize(_thread_count);for(int i=0;i<_thread_count;i++){_threads[i]=new LoopServer();_loops[i]=_threads[i]->GetEventLoop();}}return;}//给新到来的连接获取loopEventLoop* NextLoop(){if(_thread_count==0){return _base_loop;}_next_index=(_next_index+1)%_thread_count;return _loops[_next_index];}private:int _thread_count;//创建从属线程线程个数int _next_index;//分配线程索引EventLoop* _base_loop;//主线程绑定的loopstd::vector<LoopServer*> _threads;//丛属线程std::vector<EventLoop*> _loops;//从属线程对应的loop
};
二、TcpServer模块设计
2.1 模块组成
Acceptor
对象:用于创建监听套接字。EventLoop
对象(baseloop
):实现对监听套接字的事件监控。std::unordered_map<uint64_t, PtrConnection> _conns
:对所有新建连接进行管理。LoopThreadPool
对象:创建线程池,对新建连接进行事件监控及处理。
2.2 功能概述
- 设置从属线程池数量。
- 启动服务器。
- 设置多种回调函数(连接建立完成、消息、关闭、任意等),用户设置给
TcpServer
,TcpServer
再设置给获取的新连接。 - 选择是否启动非活跃连接超时销毁功能。
- 具备添加定时任务功能。
2.3 流程
- 在
TcpServer
中实例化Acceptor
对象和EventLoop
对象(baseloop
)。 - 将
Acceptor
挂到baseloop
上进行事件监控。 - 当
Acceptor
对象就绪可读事件时,执行读事件回调函数获取新建连接。 - 对新连接,创建
Connection
进行管理。 - 对连接对应的
Connection
设置功能回调(连接完成回调、消息回调、关闭回调、任意事件回调)。 - 启动
Connection
的非活跃连接的超时销毁规则。 - 将新连接对应的
Connection
挂到LoopThreadPool
中的从属线程对应的EventLoop
中进行事件监控。 - 一旦
Connection
对应的连接就绪了可读事件,则执行读事件回调函数,读取数据,读取完毕后调用TcpServer
设置的消息回调。
2.4 代码实现
//对所有模块管理、快速搭建服务区
class TcpServer{//由组件使用者设置的事件回调using Functor=std::function<void()>;using ConnectedCallback=std::function<void(PtrConnection)>;using MessageCallback=std::function<void(PtrConnection,Buffer*buf)>;using ClosedCallback=std::function<void(PtrConnection)>;using AnyEventCallback=std::function<void(PtrConnection)>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;ClosedCallback _server_closed_callback;
private://为线连接创建Connection对象void NewConnection(int newfd){_next_id++;PtrConnection conn(new Connection(_threadpool.NextLoop(), _next_id, newfd));conn->SetMessageCallback(_message_callback);// conn->SetAnyEventCallback(std::bind(HandleRead));conn->SetConnectedCallback(_connected_callback);conn->SetClosedCallback(_closed_callback);conn->SetAnyEventCallback(_event_callback);//内部设置的清理连接信息回调conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection,this,std::placeholders::_1));_conns.insert({_next_id, conn});if(_enable_inactive_release)conn->EnableInactiveRelease(_timeout);conn->Established();DBG_LOG("NEW LINK THREAD");}//将连接信息从_conns中移除void RemoveConnection(const PtrConnection& conn){_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLooop,this,conn));}void RemoveConnectionInLooop(const PtrConnection& conn){uint64_t id=conn->Id();auto it=_conns.find(id);if(it!=_conns.end()){_conns.erase(id);}}void RunAfterInLoop(const Functor&task,int dalay){_next_id++;_baseloop.TimerAdd(_next_id,dalay,task);}
public:TcpServer(int port):_next_id(0),_port(port),_enable_inactive_release(false),_acceptor(&_baseloop,port),_threadpool(&_baseloop){_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection,this,std::placeholders::_1));}//设置从属线程个数void SetThreadCount(int sz){_threadpool.SetThreadCount(sz);}void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void SetSrvClosedCallback(const ClosedCallback&cb) { _server_closed_callback = cb; }//启动非活跃销毁void EnableInactiveRelease(int timeout){_timeout=timeout;_enable_inactive_release=true;}//添加定时任务void RunAfter(const Functor&task,int dalay){_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop,this,task,dalay));}void Start(){//创建从属线程_threadpool.Create();//创建从属线程_acceptor.Listen();//开始监听套接字std::cout<<"1"<<std::endl;_baseloop.Start();//开始监听处理事件}
private:uint64_t _next_id;//自增长的线程id;int _port;//监听套接字绑定的端口号int _timeout;//非活跃连接的超时时间bool _enable_inactive_release;//是否启用非活跃连接EventLoop _baseloop;//对监听套接字进行事件监控Acceptor _acceptor;//创建并管理监听套接字LoopEventPool _threadpool;//对从属线程进行管理std::unordered_map<uint64_t,PtrConnection> _conns;//存储连接信息};