C++项目:仿muduo库高并发服务器-------connection模块
文章目录
- 前言
- 一、Connection模块
- 二、代码实现
前言
一、Connection模块
Connection模块是对Buffer模块,Socket模块,Channel模块的⼀个整体封装,实现了对⼀个通信套接字的整体的管理,每⼀个进行数据通信的套接字(也就是accept获取到的新连接)都会使用Connection进行管理。
- 功能:
- 是对通信连接进行整体管理的模块,连接相关操作均通过它执行
- 处理连接事件,因不知使用者事件处理逻辑,提供事件回调函数由使用者设置
- 意义:并非单独功能模块,而是对连接做管理的模块,增加了连接操作的灵活性
- 功能设计:
- 基础操作:关闭连接、发送数据、协议切换(本质就是重新设置回调函数)、启动非活跃连接超时释放、取消非活跃连接超时释放
- 回调函数设置:连接建立完成回调、新数据接收成功回调、连接关闭回调、任意事件回调
- 协议切换:连接接收数据后的业务处理取决于上下文以及数据的业务处理回调函数。
二、代码实现
// DISCONNECTED:连接处于关闭状态
// CONNECTING:连接刚建立完成,待处理
// CONNECTED:连接已建立完成,并且各种处理已设置// DISCONNECTING:待关闭状态
typedef enum{DISCONNECTED,CONNECTING,CONNECTED,DISCONNECTING
}ConnStatu;
class Connection;
using PtrConnection=std::shared_ptr<Connection>;
class Connection: public std::enable_shared_from_this<Connection>{
private:using ConnectedCallback=std::function<void(PtrConnection)>;using MessageCallback=std::function<void(PtrConnection,Buffer*buf)>;using ClosedCallback=std::function<void(PtrConnection)>;using AnyEventCallback=std::function<void(PtrConnection)>;
public:Connection(EventLoop*loop,uint64_t id,int fd):_conn_id(id),_loop(loop),_sockfd(fd),_socket(fd),_channel(loop,fd),_statu(CONNECTING),_enable_inactive_release(false){_channel.SetCloseCallback(std::bind(&Connection::HandleClose,this));_channel.SetErrorCallback(std::bind(&Connection::HandleError,this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent,this));_channel.SetReadCallback(std::bind(&Connection::HandleRead,this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite,this));}//获取连接iduint64_t Id(){return _conn_id;}//获取上下文数据Any*GetContext(){return &_context;}//获取连接描述符int GetFd(){return _sockfd;}//判断当前连接是否处于CONNECTED状态bool Connected(){return _statu==CONNECTED;}//设置上下文,连接建立完成时void SetContext(const Any&context){_context=context;}//连接建立完成后,进行channel回调设置,启动读监控,调用_connect_callbackvoid Established(){_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop,this));}//将数据发送至发送缓冲区启动写事件监控void Send(const void*data,int len){//不可以直接使用传入的数据,该数据可能存储在临时空间,而该函数有可能不会立即执行Buffer buf;buf.WriteAndPush(data,len);_loop->RunInLoop(std::bind(&Connection::SendInLoop,this,buf));//_loop->RunInLoop(std::bind(&Connection::SendInLoop,this,data,len));}//给组件使用者提供的关闭连接接口,并不实际关闭,需要判断是否数据需要处理----设为半关闭状态void Shutdown(){_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop,this));}//启动非活跃销毁,设定销毁时间void EnableInactiveRelease(int sec){_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop,this,sec));}//取消非活跃销毁void CancelInactiveRelease(){_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop,this));}//协议切换更新连接上下文void Upgrad(const Any&context,const ConnectedCallback&conn,const MessageCallback&mess,const ClosedCallback&close,const AnyEventCallback&any){//进行协议切换时,必须保证是在EventLoop中进行的,防止新事件触发后协议切换未执行_loop->AssertInLoop();_loop->RunInLoop(std::bind(&Connection::UpgradInLoop,this,context,conn,mess,close,any));}//这几个回调由组件使用者设置void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void SetSrvClosedCallback(const ClosedCallback&cb) { _server_closed_callback = cb; }private:uint64_t _conn_id;//connection标识符,便于查找int _sockfd;//连接关联的文件描述符bool _enable_inactive_release;//连接是否启动非活跃连接的标志//uint64_t _timer_id;简化使用conn_id代替ConnStatu _statu;//连接状态EventLoop*_loop;//连接关联的一个loop从后连接的操作均在loop中进行,保证线程安全Channel _channel;//管理连接事件Socket _socket;//套接字操作管理Buffer _in_buffer;//输入缓冲区,存放从socket中读取到的数据Buffer _out_buffer;//输出缓冲区,存放回复给socket的数据Any _context;//存放当前连接请求的上下文数据(应对请求不完整情况)//这几个回调由组件使用者设置ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;//组件内部调用的回调,用来清楚Server模块对连接的维护 ClosedCallback _server_closed_callback;//给channel提供的五个回调函数//描述符可读时间触发后,接收socket数据放到接收缓冲区中,调用_MessageCallbackvoid HandleRead(){char buf[65536]={0};int ret=_socket.NonBlockRecv(buf,65535);if(ret<0){ERR_LOG("HANDLEREAD FAIL!");//不能直接关闭需要处理内部资源return ShutdownInLoop();}//将数据存入输入缓冲区_in_buffer.WriteAndPush(buf,ret);if(_in_buffer.ReadAbleSize()>0){return _message_callback(shared_from_this(),&_in_buffer);}return; }//描述符可写事件触发后,将发送缓冲区的数据发送void HandleWrite(){int ret=_socket.NonBlockSend(_out_buffer.ReadPosition(),_out_buffer.ReadAbleSize());if(ret<0){ERR_LOG("HANDLEWRITE FAIL!");if(_in_buffer.ReadAbleSize()>0){_message_callback(shared_from_this(),&_in_buffer);}return ReleaseInLoop();}_out_buffer.MoveReadOffset(ret);//如果没有可写数据了,就关闭写监控,避免一直被触发//如果还要就不处理,等待下次可写事件触发if(_out_buffer.ReadAbleSize()==0){_channel.DisableWrite();//如果连接为待关闭状态就关闭if(_statu==DISCONNECTING){return ReleaseInLoop();}}return;}//描述符挂断事件触发后void HandleClose(){//如果读缓冲区中有数据就去处理数据if(_in_buffer.ReadAbleSize()>0){_message_callback(shared_from_this(),&_in_buffer);}ReleaseInLoop();}//描述符错误事件触发后void HandleError(){HandleClose();}//描述符任意事件触发void HandleEvent(){//如果非活跃连接启动if(_enable_inactive_release){_loop->TimerRefresh(_conn_id);}if(_event_callback){_event_callback(shared_from_this());}}//在Loop内执行保证线程安全//连接获取后,对链接状态进行各种设置(给channel设置事件回调,启动读监控)void EstablishedInLoop(){//对连接状态设置assert(_statu==CONNECTING);_statu=CONNECTED;//开启连接读事件监控// 一旦启动读事件监控就有可能会立即触发读事件,所以非活跃连接销毁要在事件监控前设置_channel.EnableRead();if(_connected_callback){_connected_callback(shared_from_this());}}//为内部提供的连接释放接口void ReleaseInLoop(){DBG_LOG(".......%ld %d",_conn_id,_channel.Fd());//修改连接状态_statu=DISCONNECTED;//移除事件监控_channel.Remove();//关闭描述符_socket.Close();//如果定时器任务中还要定时销毁任务就取消if(_loop->HasTimer(_conn_id)){CancelInactiveRelease();}//调用关闭处理函数,避免因先调用内部处理函数,导致connect被释放if(_closed_callback){_closed_callback(shared_from_this());}//清理server存储的connection信息if(_server_closed_callback){_server_closed_callback(shared_from_this());}}void SendInLoop(Buffer buf){if(_statu==DISCONNECTED)return;_out_buffer.WriteAsBufferAndPush(buf);//_out_buffer.WriteAndPush(data,len);//开启读事件监控if(_channel.WriteAble()==false){_channel.EnableWrite();}}void ShutdownInLoop(){//设置为半连接_statu=DISCONNECTING;if(_in_buffer.ReadAbleSize()>0){if(_message_callback){_message_callback(shared_from_this(),&_in_buffer);}}//如果发送缓冲区由数据,启动读事件监控,事件就绪就触发 HandleWrite处理if(_out_buffer.ReadAbleSize()>0){if(_channel.WriteAble()==false){_channel.EnableWrite();}}if(_out_buffer.ReadAbleSize()==0){ReleaseInLoop();}}void EnableInactiveReleaseInLoop(int sec){//启动超时销毁//DBG_LOG("----------------");_enable_inactive_release=true;//判断该连接的销毁任务是否已经存在,如果存在刷新,不存在设置if(_loop->HasTimer(_conn_id)){return _loop->TimerRefresh(_conn_id);}_loop->TimerAdd(_conn_id,sec,std::bind(&Connection::ReleaseInLoop,this));}void CancelInactiveReleaseInLoop(){//取消超时连接_enable_inactive_release=false;if(_loop->HasTimer(_conn_id)){_loop->TimerCancel(_conn_id);}}void UpgradInLoop(const Any&context,const ConnectedCallback&conn,const MessageCallback&mess,const ClosedCallback&close,const AnyEventCallback&any){_context=context;_connected_callback=conn;_message_callback=mess;_closed_callback=close;_event_callback=any;}
};