手写muduo网络库(九):TcpConnection
一、引言
在网络编程中,TCP 连接是非常重要的一部分。TcpConnection
类在 muduo 网络库中承担着管理单个 TCP 连接的重任,它封装了与 TCP 连接相关的各种操作,如数据的读写、连接的建立与销毁等。本文将详细剖析 TcpConnection
类的代码,深入探讨其功能和实现逻辑。
二、功能概述
TcpConnection
类的主要功能包括:
- 管理 TCP 连接状态:跟踪连接的建立、连接中、已连接和断开连接等状态。
- 数据读写:提供数据发送和接收的接口,支持普通数据和文件的发送。
- 事件处理:处理连接上的读、写、关闭和错误事件。
- 回调机制:允许用户注册连接建立、消息接收、数据发送完成、高水位标记和连接关闭等回调函数。
三、代码结构与实现细节
3.1 头文件 TcpConnection.h
#pragma once#include "NonCopyable.h"
#include "InetAddress.h"
#include "Callbacks.h"
#include "Buffer.h"
#include "Timestamp.h"#include <memory>
#include <string>
#include <atomic>class Channel;
class EventLoop;
class Socket;class TcpConnection : NonCopyable, public std::enable_shared_from_this<TcpConnection>
{
public:TcpConnection(EventLoop *loop,const std::string &nameArg,int sockfd,const InetAddress &localAddr,const InetAddress &peerAddr);~TcpConnection();EventLoop *getLoop() const { return loop_; }const std::string &name() const { return name_; }const InetAddress &localAddress() const { return localAddr_; }const InetAddress &peerAddress() const { return peerAddr_; }bool connected() const { return state_ == kConnected; }// 发送数据void send(const std::string &buf);void sendFile(int fileDescriptor, off_t offset, size_t count); // 关闭半连接void shutdown();void setConnectionCallback(const ConnectionCallback &cb){ connectionCallback_ = cb; }void setMessageCallback(const MessageCallback &cb){ messageCallback_ = cb; }void setWriteCompleteCallback(const WriteCompleteCallback &cb){ writeCompleteCallback_ = cb; }void setCloseCallback(const CloseCallback &cb){ closeCallback_ = cb; }void setHighWaterMarkCallback(const HighWaterMarkCallback &cb, size_t highWaterMark){ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }// 连接建立void connectEstablished();// 连接销毁void connectDestroyed();private:enum StateE{kDisconnected,kConnecting,kConnected,kDisconnecting };void setState(StateE state) { state_ = state; }void handleRead(Timestamp receiveTime);void handleWrite();void handleClose();void handleError();void sendInLoop(const void *data, size_t len);void shutdownInLoop();void sendFileInLoop(int fileDescriptor, off_t offset, size_t count);EventLoop *loop_;const std::string name_;std::atomic_int state_;bool reading_;std::unique_ptr<Socket> socket_;std::unique_ptr<Channel> channel_;const InetAddress localAddr_;const InetAddress peerAddr_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;HighWaterMarkCallback highWaterMarkCallback_;CloseCallback closeCallback_;size_t highWaterMark_;Buffer inputBuffer_;Buffer outputBuffer_;
};
3.1.1 类定义与继承
TcpConnection
继承自NonCopyable
,这是为了防止该类被拷贝,因为 TCP 连接是唯一的,不应该被复制。std::enable_shared_from_this<TcpConnection>
是一个模板类,它允许在TcpConnection
对象内部安全地获取指向自身的std::shared_ptr
,这在回调函数中传递对象的所有权时非常有用。
3.1.2 成员变量
EventLoop *loop_
:指向所属的事件循环,用于处理该连接上的事件。const std::string name_
:连接的名称,方便调试和日志记录。std::atomic_int state_
:连接的状态,使用std::atomic
保证线程安全,避免多线程环境下的竞态条件。std::unique_ptr<Socket> socket_
和std::unique_ptr<Channel> channel_
:分别管理底层的套接字和事件通道,使用std::unique_ptr
确保资源的自动释放。const InetAddress localAddr_
和const InetAddress peerAddr_
:分别表示本地地址和对端地址。ConnectionCallback
、MessageCallback
等各种回调函数:用于处理连接建立、消息接收等事件。Buffer inputBuffer_
和Buffer outputBuffer_
:分别用于存储接收到的数据和待发送的数据。
3.1.3 成员函数
- 构造函数:初始化连接的各种属性,设置通道的回调函数,并开启保活机制。
- 析构函数:输出日志信息,确保资源被正确释放。
- 数据发送函数:
send
和sendFile
提供了发送普通数据和文件的接口,它们会根据当前线程情况调用sendInLoop
或sendFileInLoop
。 - 关闭连接函数:
shutdown
用于关闭半连接,它会调用shutdownInLoop
实际执行关闭操作。 - 回调设置函数:允许用户注册各种回调函数,以便在相应事件发生时执行自定义操作。
- 连接建立和销毁函数:
connectEstablished
和connectDestroyed
分别处理连接的建立和销毁过程。
3.2 源文件 TcpConnection.cpp
3.2.1 构造函数
TcpConnection::TcpConnection(EventLoop *loop,const std::string &nameArg,int sockfd,const InetAddress &localAddr,const InetAddress &peerAddr): loop_(CheckLoopNotNull(loop)), name_(nameArg), state_(kConnecting), reading_(true), socket_(new Socket(sockfd)), channel_(new Channel(loop, sockfd)), localAddr_(localAddr), peerAddr_(peerAddr), highWaterMark_(64 * 1024 * 1024) // 64M
{channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));LOG_INFO << "TcpConnection::ctor[" << name_.c_str() << "]at fd= " << sockfd;socket_->setKeepAlive(true);
}
CheckLoopNotNull
函数确保事件循环指针不为空,避免出现空指针异常。- 使用
std::bind
将通道的各种回调函数绑定到TcpConnection
的成员函数上,这样当通道上发生相应事件时,会调用TcpConnection
对应的处理函数。 - 开启保活机制,防止连接在长时间无数据传输时被关闭。
3.2.2 数据发送函数
void TcpConnection::send(const std::string &buf)
{if (state_ == kConnected){if (loop_->isInLoopThread()){sendInLoop(buf.c_str(), buf.size());}else{loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));}}
}void TcpConnection::sendInLoop(const void *data, size_t len)
{ssize_t nwrote = 0;size_t remaining = len;bool faultError = false;if (state_ == kDisconnected){LOG_ERROR << "disconnected, give up writing";}if (!channel_->isWriting() || outputBuffer_.readableBytes() == 0){nwrote = ::write(channel_->fd(), data, len);if (nwrote >= 0){remaining = len - nwrote;if (remaining == 0 && writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}}else // nwrote < 0{nwrote = 0;if (errno != EWOULDBLOCK) // EWOULDBLOCK表示非阻塞情况下没有数据后的正常返回 等同于EAGAIN{LOG_ERROR << "TcpConnection::sendInLoop";if (errno == EPIPE || errno == ECONNRESET) // SIGPIPE RESET{faultError = true;}}}}if (!faultError && remaining > 0){size_t oldLen = outputBuffer_.readableBytes();if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_){loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));}outputBuffer_.append((char *)data + nwrote, remaining);if (!channel_->isWriting()){channel_->enableWriting(); // 这里一定要注册channel的写事件 否则poller不会给channel通知epollout}}
}
send
函数会根据当前线程是否是事件循环线程,决定是直接调用sendInLoop
还是通过事件循环的runInLoop
函数在事件循环线程中调用。sendInLoop
函数首先尝试直接将数据写入套接字,如果写入成功且数据全部发送完,则调用writeCompleteCallback_
回调函数。如果写入失败且不是非阻塞无数据的正常情况,则记录错误信息。- 如果还有剩余数据未发送,则将其追加到
outputBuffer_
中,并注册通道的写事件,以便后续继续发送。
3.2.3 关闭连接函数
void TcpConnection::shutdown()
{if (state_ == kConnected){setState(kDisconnecting);loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));}
}void TcpConnection::shutdownInLoop()
{if (!channel_->isWriting()) // 说明当前outputBuffer_的数据全部向外发送完成{socket_->shutdownWrite();}
}
shutdown
函数将连接状态设置为kDisconnecting
,并通过事件循环的runInLoop
函数在事件循环线程中调用shutdownInLoop
。shutdownInLoop
函数检查outputBuffer_
中的数据是否全部发送完,如果是,则关闭写端。
3.2.4 连接建立和销毁函数
// 连接建立
void TcpConnection::connectEstablished()
{setState(kConnected);channel_->tie(shared_from_this());channel_->enableReading(); // 向poller注册channel的EPOLLIN读事件// 新连接建立 执行回调connectionCallback_(shared_from_this());
}
// 连接销毁
void TcpConnection::connectDestroyed()
{if (state_ == kConnected){setState(kDisconnected);channel_->disableAll(); // 把channel的所有感兴趣的事件从poller中删除掉connectionCallback_(shared_from_this());}channel_->remove(); // 把channel从poller中删除掉
}
connectEstablished
函数将连接状态设置为kConnected
,绑定通道和TcpConnection
对象,注册通道的读事件,并调用connectionCallback_
回调函数。connectDestroyed
函数将连接状态设置为kDisconnected
,禁用通道的所有事件,调用connectionCallback_
回调函数,最后从事件循环中移除通道。
3.2.5 事件处理函数
// 读是相对服务器而言的 当对端客户端有数据到达 服务器端检测到EPOLLIN 就会触发该fd上的回调 handleRead取读走对端发来的数据
void TcpConnection::handleRead(Timestamp receiveTime)
{int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);if (n > 0) // 有数据到达{// 已建立连接的用户有可读事件发生了 调用用户传入的回调操作onMessage shared_from_this就是获取了TcpConnection的智能指针messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);}else if (n == 0) // 客户端断开{handleClose();}else // 出错了{errno = savedErrno;LOG_ERROR << "TcpConnection::handleRead";handleError();}
}void TcpConnection::handleWrite()
{if (channel_->isWriting()){int savedErrno = 0;ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno);if (n > 0){outputBuffer_.retrieve(n);//从缓冲区读取reable区域的数据移动readindex下标if (outputBuffer_.readableBytes() == 0){channel_->disableWriting();if (writeCompleteCallback_){// TcpConnection对象在其所在的subloop中 向pendingFunctors_中加入回调loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}if (state_ == kDisconnecting){shutdownInLoop(); // 在当前所属的loop中把TcpConnection删除掉}}}else{LOG_ERROR << "TcpConnection::handleWrite";}}else{LOG_ERROR << "TcpConnection fd= " << channel_->fd() << " is down, no more writing";}
}void TcpConnection::handleClose()
{LOG_INFO << "TcpConnection::handleClose fd= " << channel_->fd() << " state= " << (int)state_;setState(kDisconnected);channel_->disableAll();TcpConnectionPtr connPtr(shared_from_this());connectionCallback_(connPtr); // 连接回调closeCallback_(connPtr); // 执行关闭连接的回调 执行的是TcpServer::removeConnection回调方法 // must be the last line
}void TcpConnection::handleError()
{int optval;socklen_t optlen = sizeof optval;int err = 0;if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0){err = errno;}else{err = optval;}LOG_ERROR << "TcpConnection::handleError name: " << name_.c_str() << " - SO_ERROR: " << err;
}
handleRead
函数从套接字读取数据到inputBuffer_
中,如果有数据到达,则调用messageCallback_
回调函数;如果客户端断开连接,则调用handleClose
函数;如果出错,则调用handleError
函数。handleWrite
函数将outputBuffer_
中的数据写入套接字,如果数据全部发送完,则禁用通道的写事件,调用writeCompleteCallback_
回调函数,并在需要时关闭连接。handleClose
函数将连接状态设置为kDisconnected
,禁用通道的所有事件,调用connectionCallback_
和closeCallback_
回调函数。handleError
函数获取套接字的错误信息,并记录错误日志。
3.2.6 零拷贝发送文件函数
// 新增的零拷贝发送函数
void TcpConnection::sendFile(int fileDescriptor, off_t offset, size_t count)
{if (connected()) {if (loop_->isInLoopThread()) { sendFileInLoop(fileDescriptor, offset, count);}else{ loop_->runInLoop(std::bind(&TcpConnection::sendFileInLoop, shared_from_this(), fileDescriptor, offset, count));}} else {LOG_ERROR << "TcpConnection::sendFile - not connected";}
}// 在事件循环中执行sendfile
void TcpConnection::sendFileInLoop(int fileDescriptor, off_t offset, size_t count)
{ssize_t bytesSent = 0; // 发送了多少字节数size_t remaining = count; // 还要多少数据要发送bool faultError = false; // 错误的标志位if (state_ == kDisconnecting) { // 表示此时连接已经断开就不需要发送数据了LOG_ERROR << "disconnected, give up writing";return;}// 表示Channel第一次开始写数据或者outputBuffer缓冲区中没有数据if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {bytesSent = sendfile(socket_->fd(), fileDescriptor, &offset, remaining);if (bytesSent >= 0) {remaining -= bytesSent;if (remaining == 0 && writeCompleteCallback_) {// remaining为0意味着数据正好全部发送完,就不需要给其设置写事件的监听。loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}} else{ // bytesSent < 0if (errno != EWOULDBLOCK) { // 如果是非阻塞没有数据返回错误这个是正常显现等同于EAGAIN,否则就异常情况LOG_ERROR << "TcpConnection::sendFileInLoop";}if (errno == EPIPE || errno == ECONNRESET) {faultError = true;}}}// 处理剩余数据if (!faultError && remaining > 0) {// 继续发送剩余数据loop_->queueInLoop(std::bind(&TcpConnection::sendFileInLoop, shared_from_this(), fileDescriptor, offset, remaining));}
}
sendFile
函数提供了发送文件的接口,它会根据当前线程情况调用sendFileInLoop
。sendFileInLoop
函数使用sendfile
系统调用实现零拷贝发送文件,如果数据全部发送完,则调用writeCompleteCallback_
回调函数;如果还有剩余数据,则通过事件循环的queueInLoop
函数继续发送。
四、设计思路与 C++ 语言特性细节
4.1 设计思路
- 分层架构:
TcpConnection
类将底层的套接字操作和事件处理封装起来,提供了高层的接口,使得上层代码可以方便地使用 TCP 连接。 - 回调机制:通过注册回调函数,用户可以自定义连接建立、消息接收等事件的处理逻辑,提高了代码的灵活性和可扩展性。
- 事件驱动:使用事件循环和通道来处理连接上的事件,避免了轮询带来的性能开销。
4.2 C++ 语言特性细节
- 智能指针:使用
std::unique_ptr
管理Socket
和Channel
对象,确保资源的自动释放;使用std::shared_ptr
和std::enable_shared_from_this
安全地传递对象的所有权。 std::bind
和std::placeholders
:用于绑定成员函数和参数,实现回调函数的注册。std::atomic
:保证连接状态的线程安全,避免多线程环境下的竞态条件。
五、总结
TcpConnection
类是 muduo 网络库中非常重要的一部分,它封装了 TCP 连接的各种操作,提供了灵活的回调机制和高效的事件处理方式。通过深入理解 TcpConnection
类的代码,我们可以学习到网络编程中的一些重要设计思路和 C++ 语言的高级特性。