[TcpConnection]
`TcpConnection`类是muduo最核心的类,这个类主要封装了一个已建立的TCP连接,以及控制该TCP连接的方法(连接建立和关闭和销毁),以及该连接发生的各种事件(读/写/错误/连接)对应的处理函数,以及这个TCP连接的服务端和客户端的套接字地址信息等。
成员变量
private:EventLoop*loop_; //这里绝对不是Mainloop,因为tcpconnection都是在subloop里边管理的const string name_;atomic<int> state_;bool reading;unique_ptr<Socket> socket_;unique_ptr<Channle> channel_;const InetAddress localAddr_;const InetAddress peerAddr_;ConnectionCallback connectionCallback_; // 有新连接回调 void(const TcpConnectionPtr&)MessageCallback messageCallback_; //有读写消息回调 void(constTcpConnectionPtr&,Buffer*,Timestamp)WriteCompleteCallback writeCompleteCallback_ // 消息发送完成以后的回调 void(const TcpConnectionPtr&)CloseCallback closeCallbacl_; //void(const TcpConnectionPtr&)HighWaterMarkCallback highWaterMarkCallback_; //void(const TcpConnectionPtr&,size_t)size_t highWaterMark_;Buffer inputBuffer_; //接受缓冲区Buffer outputBuffer_; //发送缓冲区enum StateE{kDisconnected,kConnecting,kConnected,kDisconnecting};
- `loop_`该`Tcp`连接的Channel注册到了哪一个sub EventLoop上。这个loop_就是那一个sub EventLoop。
- `highWaterMark_` 因为发送数据,应用写得快,内核发送数据慢,需要把待发送的数据写入缓冲区,且设置了水位回调,防止发送太快
- `inputBuffer_ outputBuffer_` 输入输出缓冲区,在输出缓冲区是用于暂存那些暂时发送不出去的待发送数据。因为`Tcp`发送缓冲区是有大小限制的,假如达到了高水位线,就没办法把发送的数据通过send()直接拷贝到`Tcp`发送缓冲区,而是暂存在这个`outputBuffer_`中,等TCP发送缓冲区有空间了,触发可写事件了,再把`outputBuffer_`中的数据拷贝到`Tcp`发送缓冲区中。
重要成员函数
void TcpConnection::send(const std::string &buf) //直接引用buffer
{if(state_ == kConnected){if(loop_->isInLoopThread()){//string.c_str是Borland封装的String类中的一个函数,它返回当前字符串的首字符地址。sendInLoop(buf.c_str(),buf.size());}else{loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));}}
}
void TcpConnection::sendInLoop(const void* data, size_t len)
{ssize_t nwrote = 0;size_t remaining = len; //未发送的数据bool faultError = false; //记录是否产生错误//之前调用过connection的shutdown 不能在发送了if(state_ == kDisconnected){LOG_ERROR("disconnected,give up writing!");return ;}//channel 第一次开始写数据,且缓冲区没有待发送数据if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0){nwrote = ::write(channel_->fd(),data,len);if(nwrote >= 0){remaining = len - nwrote;if(remaining == 0 && writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));}}else{nwrote = 0;if(errno != EWOULDBLOCK) //用于非阻塞模式,不需要重新读或者写{LOG_ERROR("TcpConnection::sendInLoop");if(errno == EPIPE || errno == ECONNRESET) //SIGPIPE RESET{faultError = true;}}}}if(!faultError && remaining > 0) {//目前发送缓冲区剩余的待发送数据的长度size_t oldlen = outputBuffer_.readableBytes();if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_){loop_->queueInLoop(std::bind(highWaterMarkCallback_,shared_from_this(),oldlen + remaining));}outputBuffer_.append((char*)data + nwrote,remaining);if(!channel_->isWriting()){channel_->enableWriting(); //注册channel写事件,否则poller不会向channel通知epollout}}
}
- 发送数据要发送的数据长度是`len`,如果在`loop_`在当前的线程里面,就调用`sendInLoop`,`sendInLoop`内部实际上是调用了系统的`write`,如果一次性发送完了,就设置`writeCompleteCallback_`,表明不要再给channel设置`epollout`事件了
- 如果没有写完,先计算一下`oldlen`目前发送缓冲区剩余的待发送数据的长度。满足:`if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_)` 就会触发高水位回调
- 不满足以上的话,直接写入`outputBuffer_`
- 剩余的数据保存到缓冲区当中,要给给`channel`注册`epollout`事件(**切记,一定要注册channel的写事件,否则`poller`不会向channel通知`epollout`**),这样`poller`发现`tcp`发送缓冲区有空间,会通知相应的`socket-channel`调用相应的`writeCallback()`回调方法,也就是调用`TcpConnection::handleWrite`,把发送缓冲区中数据全部发送出去。
void TcpConnection::handleRead(TimeStamp receiveTime)
{int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);if(n > 0){ messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);}else if(n==0) //客户端断开{handleClose();} else{errno = savedErrno;LOG_ERROR("TcpConnection::hanleRead");handleError();}}
负责处理`Tcp`连接的可读事件,它会将客户端发送来的数据拷贝到用户缓冲区中`inputBuffer_`,然后再调用`connectionCallback_`保存的连接建立后的处理函数。
void TcpConnection::handleWrite()
{if(channel_->isWriting()){int savedErrno = 0;ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);if(n > 0){outputBuffer_.retrieve(n); //处理了n个if(outputBuffer_.readableBytes() == 0) //发送完成{channel_->disableWriting(); //不可写了if(writeCompleteCallback_){//唤醒loop对应的thread线程,执行回调loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));}if(state_ == kDisconnecting){shutdownInLoop();// 在当前loop中删除TcpConnection}}}else{LOG_ERROR("TcpConnection::handleWrite");}}else{LOG_ERROR("TcpConnection fd=%d is down, no more writing \n",channel_->fd());}
}
- 如果有可写,通过fd发送数据,直到发送完成
- 设置不可写,如果`writeCompleteCallback_`,唤醒`loop`对应的`thread`线程,执行回调
- 当前`tcp`正在断开连接,调用`shutdownInLoop`,在当前`loop`中删除`TcpConnection`
void TcpConnection::handleClose()
{LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);setState(kDisconnected);channel_->disableAll();TcpConnectionPtr connPtr(shared_from_this());connectionCallback_(connPtr); //执行连接关闭的回调closeCallback_(connPtr); //关闭连接的回调 TcpServer => TcpServer::removeConnection
}
处理逻辑就是将这个`TcpConnection`对象中的channel_从事件监听器中移除。然后调用`connectionCallback_`和`closeCallback_`保存的回调函数。`closeCallback_`在`TcpServer::newConnection()`为新连接新建`TcpConnection`时,已设为`TcpServer::removeConnection()`,而`removeConnection()`最终会调用`TcpConnection::connectDestroyed()`来销毁连接资源。
void TcpConnection::connectDestroyed()
{if(state_ == kConnected){setState(kDisconnected);channel_->disableAll(); //把channel所有感兴趣的事件,从poller中del掉connectionCallback_(shared_from_this());}channel_->remove();//把channel从poller中删除掉
}
//关闭连接
void TcpConnection::shutdown()
{if(state_ == kConnected){setState(kDisconnecting);loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop,this));}
}void TcpConnection::shutdownInLoop()
{if(!channel_->isWriting()) //说明当前outputBuffer中的数据已经全部发送完成{socket_->shutdowmWrite(); // 关闭写端}
}
为什么只关闭了写端?
Muduo TcpConnection 没有提供 close,而只提供 shutdown ,这么做是为了收发数据的完整性。
TCP 是一个全双工协议,同一个文件描述符既可读又可写, shutdownWrite() 关闭了“写”方向的连接,保留了“读”方向,这称为 TCP half-close。如果直接 close(socket_fd),那么 socket_fd 就不能读或写了。
用 shutdown 而不用 close 的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么 muduo 不会漏收这些数据。换句话说,muduo 在 TCP 这一层面解决了“当你打算关闭网络连接的时候,如何得知对方有没有发了一些数据而你还没有收到?”这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。
等于说 muduo 把“主动关闭连接”这件事情分成两步来做,如果要主动关闭连接,它会先关本地“写”端,等对方关闭之后,再关本地“读”端。被动关闭(muduo 在 read() 返回 0 的时候会回调 connection callback,这样客户代码就知道对方断开连接了。)
Muduo 这种关闭连接的方式对对方也有要求,那就是对方 read() 到 0 字节之后会主动关闭连接(无论 shutdownWrite() 还是 close()),一般的网络程序都会这样,不是什么问题。当然,这么做有一个潜在的安全漏洞,万一对方故意不不关,那么 muduo 的连接就一直半开着,消耗系统资源。
完整的流程是:我们发完了数据,于是 shutdownWrite,发送 TCP FIN 分节,对方会读到 0 字节,然后对方通常会关闭连接,这样 muduo 会读到 0 字节,然后 muduo 关闭连接。
另外,如果有必要,对方可以在 read() 返回 0 之后继续发送数据,这是直接利用了 half-close TCP 连接。muduo 会收到这些数据,通过 message callback 通知客户代码。
那么 muduo 什么时候真正 close socket 呢?在 TcpConnection 对象析构的时候。TcpConnection 持有一个 Socket 对象,Socket 是一个 RAII handler,它的析构函数会 close(sockfd_)。这样,如果发生 TcpConnection 对象泄漏,那么我们从 /proc/pid/fd/ 就能找到没有关闭的文件描述符,便于查错。
muduo 在 read() 返回 0 的时候会回调 connection callback,然后把 TcpConnection 的引用计数减一,如果 TcpConnection 的引用计数降到零,它就会析构了。