当前位置: 首页 > news >正文

[TcpConnection]

`TcpConnection`类是muduo最核心的类,这个类主要封装了一个已建立的TCP连接,以及控制该TCP连接的方法(连接建立和关闭和销毁),以及该连接发生的各种事件(读/写/错误/连接)对应的处理函数,以及这个TCP连接的服务端和客户端的套接字地址信息等。
 

成员变量

private:EventLoop*loop_;    //这里绝对不是Mainloop,因为tcpconnection都是在subloop里边管理的const string name_;atomic<int> state_;bool reading;unique_ptr<Socket> socket_;unique_ptr<Channle> channel_;const InetAddress localAddr_;const InetAddress peerAddr_;ConnectionCallback connectionCallback_;       // 有新连接回调                void(const TcpConnectionPtr&)MessageCallback messageCallback_;             //有读写消息回调   void(constTcpConnectionPtr&,Buffer*,Timestamp)WriteCompleteCallback writeCompleteCallback_  // 消息发送完成以后的回调       void(const TcpConnectionPtr&)CloseCallback closeCallbacl_;                 //void(const TcpConnectionPtr&)HighWaterMarkCallback highWaterMarkCallback_; //void(const TcpConnectionPtr&,size_t)size_t highWaterMark_;Buffer inputBuffer_;                          //接受缓冲区Buffer outputBuffer_;                         //发送缓冲区enum StateE{kDisconnected,kConnecting,kConnected,kDisconnecting}; 
  •  `loop_`该`Tcp`连接的Channel注册到了哪一个sub EventLoop上。这个loop_就是那一个sub EventLoop。
  • `highWaterMark_` 因为发送数据,应用写得快,内核发送数据慢,需要把待发送的数据写入缓冲区,且设置了水位回调,防止发送太快
  • `inputBuffer_  outputBuffer_`  输入输出缓冲区,在输出缓冲区是用于暂存那些暂时发送不出去的待发送数据。因为`Tcp`发送缓冲区是有大小限制的,假如达到了高水位线,就没办法把发送的数据通过send()直接拷贝到`Tcp`发送缓冲区,而是暂存在这个`outputBuffer_`中,等TCP发送缓冲区有空间了,触发可写事件了,再把`outputBuffer_`中的数据拷贝到`Tcp`发送缓冲区中。

重要成员函数

void TcpConnection::send(const std::string &buf) //直接引用buffer
{if(state_ == kConnected){if(loop_->isInLoopThread()){//string.c_str是Borland封装的String类中的一个函数,它返回当前字符串的首字符地址。sendInLoop(buf.c_str(),buf.size());}else{loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));}}
}
void TcpConnection::sendInLoop(const void* data, size_t len)
{ssize_t nwrote = 0;size_t remaining = len; //未发送的数据bool faultError = false; //记录是否产生错误//之前调用过connection的shutdown 不能在发送了if(state_ == kDisconnected){LOG_ERROR("disconnected,give up writing!");return ;}//channel 第一次开始写数据,且缓冲区没有待发送数据if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0){nwrote = ::write(channel_->fd(),data,len);if(nwrote >= 0){remaining = len - nwrote;if(remaining == 0 && writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));}}else{nwrote = 0;if(errno != EWOULDBLOCK) //用于非阻塞模式,不需要重新读或者写{LOG_ERROR("TcpConnection::sendInLoop");if(errno == EPIPE || errno == ECONNRESET) //SIGPIPE RESET{faultError = true;}}}}if(!faultError && remaining > 0) {//目前发送缓冲区剩余的待发送数据的长度size_t oldlen = outputBuffer_.readableBytes();if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_){loop_->queueInLoop(std::bind(highWaterMarkCallback_,shared_from_this(),oldlen + remaining));}outputBuffer_.append((char*)data + nwrote,remaining);if(!channel_->isWriting()){channel_->enableWriting(); //注册channel写事件,否则poller不会向channel通知epollout}}
}
  1. 发送数据要发送的数据长度是`len`,如果在`loop_`在当前的线程里面,就调用`sendInLoop`,`sendInLoop`内部实际上是调用了系统的`write`,如果一次性发送完了,就设置`writeCompleteCallback_`,表明不要再给channel设置`epollout`事件了
  2. 如果没有写完,先计算一下`oldlen`目前发送缓冲区剩余的待发送数据的长度。满足:`if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_)`   就会触发高水位回调
  3.  不满足以上的话,直接写入`outputBuffer_`
  4. 剩余的数据保存到缓冲区当中,要给给`channel`注册`epollout`事件(**切记,一定要注册channel的写事件,否则`poller`不会向channel通知`epollout`**),这样`poller`发现`tcp`发送缓冲区有空间,会通知相应的`socket-channel`调用相应的`writeCallback()`回调方法,也就是调用`TcpConnection::handleWrite`,把发送缓冲区中数据全部发送出去。
void TcpConnection::handleRead(TimeStamp receiveTime)
{int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);if(n > 0){      messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);}else if(n==0) //客户端断开{handleClose();} else{errno = savedErrno;LOG_ERROR("TcpConnection::hanleRead");handleError();}}

负责处理`Tcp`连接的可读事件,它会将客户端发送来的数据拷贝到用户缓冲区中`inputBuffer_`,然后再调用`connectionCallback_`保存的连接建立后的处理函数。

void TcpConnection::handleWrite()
{if(channel_->isWriting()){int savedErrno = 0;ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);if(n > 0){outputBuffer_.retrieve(n); //处理了n个if(outputBuffer_.readableBytes() == 0) //发送完成{channel_->disableWriting(); //不可写了if(writeCompleteCallback_){//唤醒loop对应的thread线程,执行回调loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));}if(state_ == kDisconnecting){shutdownInLoop();// 在当前loop中删除TcpConnection}}}else{LOG_ERROR("TcpConnection::handleWrite");}}else{LOG_ERROR("TcpConnection fd=%d is down, no more writing \n",channel_->fd());}
}
  1. 如果有可写,通过fd发送数据,直到发送完成
  2. 设置不可写,如果`writeCompleteCallback_`,唤醒`loop`对应的`thread`线程,执行回调
  3. 当前`tcp`正在断开连接,调用`shutdownInLoop`,在当前`loop`中删除`TcpConnection`
void TcpConnection::handleClose()
{LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);setState(kDisconnected);channel_->disableAll();TcpConnectionPtr connPtr(shared_from_this());connectionCallback_(connPtr); //执行连接关闭的回调closeCallback_(connPtr); //关闭连接的回调 TcpServer => TcpServer::removeConnection
}

处理逻辑就是将这个`TcpConnection`对象中的channel_从事件监听器中移除。然后调用`connectionCallback_`和`closeCallback_`保存的回调函数。`closeCallback_`在`TcpServer::newConnection()`为新连接新建`TcpConnection`时,已设为`TcpServer::removeConnection()`,而`removeConnection()`最终会调用`TcpConnection::connectDestroyed()`来销毁连接资源。

void TcpConnection::connectDestroyed()
{if(state_ == kConnected){setState(kDisconnected);channel_->disableAll(); //把channel所有感兴趣的事件,从poller中del掉connectionCallback_(shared_from_this());}channel_->remove();//把channel从poller中删除掉
}
 //关闭连接
void TcpConnection::shutdown()
{if(state_ == kConnected){setState(kDisconnecting);loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop,this));}
}void TcpConnection::shutdownInLoop()
{if(!channel_->isWriting()) //说明当前outputBuffer中的数据已经全部发送完成{socket_->shutdowmWrite(); // 关闭写端}
}

为什么只关闭了写端?

Muduo TcpConnection 没有提供 close,而只提供 shutdown ,这么做是为了收发数据的完整性。

TCP 是一个全双工协议,同一个文件描述符既可读又可写, shutdownWrite() 关闭了“写”方向的连接,保留了“读”方向,这称为 TCP half-close。如果直接 close(socket_fd),那么 socket_fd 就不能读或写了。

用 shutdown 而不用 close 的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么 muduo 不会漏收这些数据。换句话说,muduo 在 TCP 这一层面解决了“当你打算关闭网络连接的时候,如何得知对方有没有发了一些数据而你还没有收到?”这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。

等于说 muduo 把“主动关闭连接”这件事情分成两步来做,如果要主动关闭连接,它会先关本地“写”端,等对方关闭之后,再关本地“读”端。被动关闭(muduo 在 read() 返回 0 的时候会回调 connection callback,这样客户代码就知道对方断开连接了。)

Muduo 这种关闭连接的方式对对方也有要求,那就是对方 read() 到 0 字节之后会主动关闭连接(无论 shutdownWrite() 还是 close()),一般的网络程序都会这样,不是什么问题。当然,这么做有一个潜在的安全漏洞,万一对方故意不不关,那么 muduo 的连接就一直半开着,消耗系统资源。

完整的流程是:我们发完了数据,于是 shutdownWrite,发送 TCP FIN 分节,对方会读到 0 字节,然后对方通常会关闭连接,这样 muduo 会读到 0 字节,然后 muduo 关闭连接。

另外,如果有必要,对方可以在 read() 返回 0 之后继续发送数据,这是直接利用了 half-close TCP 连接。muduo 会收到这些数据,通过 message callback 通知客户代码。

那么 muduo 什么时候真正 close socket 呢?在 TcpConnection 对象析构的时候。TcpConnection 持有一个 Socket 对象,Socket 是一个 RAII handler,它的析构函数会 close(sockfd_)。这样,如果发生 TcpConnection 对象泄漏,那么我们从 /proc/pid/fd/ 就能找到没有关闭的文件描述符,便于查错。

muduo 在 read() 返回 0 的时候会回调 connection callback,然后把 TcpConnection 的引用计数减一,如果 TcpConnection 的引用计数降到零,它就会析构了。

相关文章:

  • 长沙营销型网站建设费用产品全网营销推广
  • wordpress mobile主题seo快速排名软件
  • 网站运营和维护搜索指数在线查询
  • 怎样找到工厂直招网站刷网站seo排名软件
  • 独立站建站详细步骤百度链接收录提交入口
  • 武汉营销网站建设公司手机如何创建网站
  • 融合聚类与分类的退役锂电智能分选技术:助力新能源汽车产业可持续发展
  • 深度学习实战112-基于大模型Qwen+RAG+推荐算法的作业互评管理系统设计与实现
  • 如何在 Manjaro Linux 上安装 Docker 容器
  • 记一次AWS 中RDS优化费用使用的案例
  • 用 Docker 构建你的第一个 Python Flask 程序
  • MiniMax-M1 混合专家模型与 DeepSeek 一体机的能效革命
  • 命名数据网络 | TLV 编码
  • 左神算法之有序二维矩阵中的目标值查找
  • Vue基础(16)_Vue侦听数据改变的原理(对象)、Vue.set/vm.$set方法的使用
  • 北斗导航 | 基于CNN-LSTM-PSO算法的接收机自主完好性监测算法
  • LeetCode 3258.统计满足K约束的子字符串数量1
  • 【智能协同云图库】智能协同云图库第二弹:用户管理系统后端设计与接口开发
  • AR美型SDK,重塑面部美学,开启智能美颜新纪元
  • 定制开发开源AI智能名片与S2B2C商城小程序的内容分发体系构建:基于“1+N“素材复用模型的创新实践
  • 如何在 Manjaro Linux 的图像界面上安装 Stremio 而不是使用命令行
  • 服务器不支持PUT,DELETE 的解决方案
  • C# Avalonia 的 Source Generators 用处
  • ZArchiver×亚矩云手机:云端文件管理的“超维解压”革命
  • 以太网基础②RGMII 与 GMII 转换电路设计
  • Spring AOP 中有多个切面时执行顺序是怎样的?