《Muduo网络库:TcpConnection类》
Acceptor类是运行在mainLoop中,负责监听新用户的连接,那么已经建立的连接该如何管理呢?TcpConnection类,是Reactor模型中管理单个已建立的TCP连接的生命周期、数据读写以及事件回调的关键类。它封装了Socket、Channel和缓冲区,确保所有操作都在正确的EventLoop中执行,避免线程安全问题。
TcpConnection的核心作用是“管理已建立的TCP连接”,具体职责包括:
- 绑定连接的Socket和Channel,将I/O事件(读/写/关闭/错误)与回调函数关联。
- 管理输入输出缓冲区,解决“应用读写速度不匹配”和“TCP粘包”问题。
- 维护连接状态(连接中/断开中/已连接/已断开),确保操作(发送/关闭)在合法状态下执行。
- 提供统一的回调注册接口,让上层模块(TcpServer)自定义连接、消息、关闭等事件处理逻辑。
Callbacks.h
#pragma once#include <memory>
#include <functional>class Buffer;
class TcpConnection;
class Timestamp;using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
using ConnectionCallback = std::function<void(const TcpConnectionPtr &)>;
using CloseCallback = std::function<void(const TcpConnectionPtr &)>;
using WriteCompleteCallback = std::function<void(const TcpConnectionPtr &)>;
using MessageCallback = std::function < void(const TcpConnectionPtr &,Buffer *, Timestamp)>;
using HighWaterMarkCallback = std::function<void(const TcpConnectionPtr &, size_t)>;TcpConnection.h
#pragma once#include "noncopyable.h"
#include "InetAddress.h"
#include "Callbacks.h"
#include "Buffer.h"
#include "Timestamp.h"#include <memory>
#include <string>
#include <atomic>class Channel;
class EventLoop;
class Socket;/*** TcpServer =》通过Acceptor 监听到有一个新用户连接,通过accept函数拿到connfd** =》connfd打包成一个TcpConnection =》TcpConnection设置回调** =》TcpConnection再把相应的回调设置给Channel =》Channel注册到Poller** =》Poller监听到Channel有事件发生之后,就调用Channel的回调*/class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
{
public:TcpConnection(EventLoop *loop,const std::string &nameArg,int sockfd,const InetAddress &localAddr,const InetAddress &peerAddr);~TcpConnection();EventLoop *getLoop() const { return loop_; }const std::string &name() const { return name_; }const InetAddress &localAddress() const { return localAddr_; }const InetAddress &peerAddress() const { return peerAddr_; }bool connected() const { return state_ == kConnected; }// 发送数据void send(const std::string& buf);// 关闭连接void shutdown();void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }void setHighWaterMarkCallback(const HighWaterMarkCallback &cb, size_t highWaterMark) { highWaterMarkCallback_ = cb, highWaterMark = highWaterMark_; }void setCloseCallback(const CloseCallback &cb) { closeCallback_ = cb; }// 连接建立void connectEstablished();// 连接销毁void connectDestroyed();private:enum StateE { kDisconnected, kConnecting,kConnected, kDisconnecting }; // 表示连接的状态void setState(StateE state) { state_ = state; }void handleRead(Timestamp receiveTime);void handleWrite();void handleClose();void handleError();void sendInLoop(const void *data, size_t len);void shutdownInLoop();EventLoop *loop_; // 这里绝对不是baseLoop,因为TcpConnection都是在subLoop里面管理的const std::string name_;std::atomic_int state_;bool reading_;// 和Acceptor类似 Acceptor=》mainLoop TcpConnection=》subLoopstd::unique_ptr<Socket> socket_;std::unique_ptr<Channel> channel_;const InetAddress localAddr_;const InetAddress peerAddr_;ConnectionCallback connectionCallback_; // 有新连接时的回调MessageCallback messageCallback_; // 有读写消息时的回调WriteCompleteCallback writeCompleteCallback_; // 消息发送完成以后的回调HighWaterMarkCallback highWaterMarkCallback_;CloseCallback closeCallback_;size_t highWaterMark_;Buffer inputBuffer_; // 接收数据的缓冲区Buffer outputBuffer_; // 发送数据的缓冲区
};TcpConnection.cc
#include "TcpConnection.h"
#include "Logger.h"
#include "Socket.h"
#include "Channel.h"
#include "EventLoop.h"#include <functional>
#include <errno.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <strings.h>
#include <string>
#include <unistd.h>static EventLoop *CheckNotNull(EventLoop *loop)
{if (loop == nullptr){LOG_FATAL("%s:%s:%d TcpConnection Loop is null! \n", __FILE__, __FUNCTION__, __LINE__);}return loop;
}TcpConnection::TcpConnection(EventLoop *loop,const std::string &nameArg,int sockfd,const InetAddress &localAddr,const InetAddress &peerAddr): loop_(loop), name_(nameArg), state_(kConnecting), socket_(new Socket(sockfd)), channel_(new Channel(loop,sockfd)), localAddr_(localAddr), peerAddr_(peerAddr), highWaterMark_(64*1024*1024) // 64M
{// TcpConnection给channel设置相应事件的回调,Poller给channel通知感兴趣的事件发生了,channel会回调相应的操作函数channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n", name_.c_str(), sockfd);socket_->setKeepAlive(true);
}TcpConnection::~TcpConnection()
{LOG_INFO("TcpConnection::dtor[%s] at fd=%d state=%d \n", name_.c_str(), channel_->fd(), (int)state_);
}// 发送数据,提供send接口,供上层发送数据
void TcpConnection::send(const std::string &buf)
{if (state_ == kConnected){if (loop_->isInLoopThread()){sendInLoop(buf.c_str(), buf.size());}else{loop_->runInloop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));}}
}/*** 发送数据 应用写的快,而内核发送数据慢,所以需要把待发送的数据写入缓冲区,并且设置了水位线*/
void TcpConnection::sendInLoop(const void *data, size_t len)
{ssize_t nwrote = 0; // 表示写了多少数据ssize_t remaining = len; // 表示写剩余的数据大小bool faultError = false;if (state_ == kDisconnected){LOG_ERROR("disconnected, give up writing!");return;}// 表示channel是第一次开始写(发送)数据,并且缓冲区没有待发送的数据(为空)if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0){nwrote = write(channel_->fd(), data, len);if (nwrote >= 0){remaining = len - nwrote;if (remaining == 0 && writeCompleteCallback_) // 数据全部发送完成,触发消息发送完成的回调{// 既然这里数据全部发送完成,就不用再给channel设置epollout事件了loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}}else // 写失败,非EWOULDBLOCK,标记错误{nwrote = 0;if (errno != EWOULDBLOCK){LOG_ERROR("TcpConnection::sendInLoop");if (errno == EPIPE || errno == ECONNRESET) // SIGPIPE RESET{faultError = true;}}}}/*** 说明当前这一次write,并没有把数据全部发送出去,剩余的数据需要保存到缓冲区当中,* 然后给channel注册epollout事件,Poller发现tcp的发送缓冲区有数据,会通知相应的sock-》channel,* 调用writeCallback_回调方法,也就是调用TcpConnection::handleWrite方法,把发送缓冲区中的数据全部发送完成*/if (!faultError && remaining > 0){size_t oldLen = outputBuffer_.readableBytes(); // 目前发送缓冲区中已有的发送数据的长度(可读数据),待发送的数据量// 存入前待发送的数据量没有超过水位线、待发送的数据量+存入本次剩余的数据量超过了水位线、上层设置了高水位线回调if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_){loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));}// 剩余数据存入输出缓冲区outputBuffer_.append((char *)data + nwrote, remaining);if (!channel_->isWriting()){channel_->enableWriting(); // 这里一定要注册channel的写事件,否则Poller不会给channel通知epollout}}
}// 关闭连接
void TcpConnection::shutdown()
{if (state_ == kDisconnected){setState(kDisconnecting); // 切换状态为断开中loop_->queueInLoop(std::bind(&TcpConnection::shutdownInLoop, this));}
}void TcpConnection::shutdownInLoop()
{if (!channel_->isWriting()) // 说明当前outputBuffer缓冲区中的数据已经全部发送完成{socket_->shutdownWrite(); // 关闭写端}
}// 连接建立
void TcpConnection::connectEstablished()
{setState(kConnected);channel_->tie(shared_from_this()); // 绑定自身生命周期,避免channel回调时TcpConnection对象已经被销毁channel_->enableReading(); // 向Poller注册channel的epollin读事件connectionCallback_(shared_from_this()); // 触发连接建立回调(上层业务处理)
}// 连接销毁
void TcpConnection::connectDestroyed()
{if (state_ == kConnected){setState(kDisconnected);channel_->disableAll(); // 把channel所有感兴趣的事件,从Poller中移除del掉connectionCallback_(shared_from_this());}channel_->remove(); // 把channel从Poller中删除
}// 当Poller检测到channel有事件可读时,通过channel回调此函数
void TcpConnection::handleRead(Timestamp receiveTime)
{int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);if (n > 0){// 已建立连接的用户,有可读事件发生了,触发消息到达回调,即用户传入的回调操作onMessage,上层可从inputBuffer_中读取数据messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);}else if (n == 0) // 对端关闭连接,触发handleClose{handleClose();}else{errno = savedErrno;LOG_ERROR("TcpConnection::handleRead");handleError();}
}// 当Poller检测到channel有事件可写时,通过channel回调此函数继续发送缓冲数据
void TcpConnection::handleWrite()
{if (channel_->isWriting()){int savedErrno = 0;ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno); // 从发送缓冲区向fd写数据if (n > 0){outputBuffer_.retrieve(n);if (outputBuffer_.readableBytes() == 0){// 缓冲区数据发送完,在Poller中取消channel的写事件channel_->disableWriting();if (writeCompleteCallback_){// 唤醒loop_对应的thread线程,执行回调loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}// 若处于断开中,发送完数据以后关闭连接if (state_ == kDisconnecting){shutdownInLoop();}}}else{LOG_ERROR("TcpConnection::handleWrite");}}else{LOG_ERROR("TcpConnection fd=%d is down, no more writing \n", channel_->fd());}
}void TcpConnection::handleClose()
{LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n", channel_->fd(), (int)state_);setState(kDisconnected);channel_->disableAll(); // 在Poller中取消该channel的所有注册的事件TcpConnectionPtr connPtr(shared_from_this());connectionCallback_(connPtr); // 执行连接关闭的回调,通知上层关闭连接closeCallback_(connPtr); // 关闭连接的回调,由TcpServer移除连接并销毁对象
}void TcpConnection::handleError()
{int optval;socklen_t optlen = sizeof optval;int err = 0;if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0){err = errno;}else{err = optval;}LOG_ERROR("TcpConnection::handleERROR name:%s - SO_ERROR:%d \n", name_.c_str(), err);
}上面就是TcpConnection类的实现代码,围绕TCP连接的完整生命周期展开,从连接建立、数据读写、连接关闭到资源销毁,全程基于Reactor事件驱动模型,且严格保证了线程安全。
下面我们按照TCP连接的生命周期阶段边拆解完整流程,边分析其代码设计。
一、初始化阶段:TcpConnection对象创建
TcpConnection对象并非主动创建,而是TcpServer通过Acceptor接受新连接后初始化,是被动创建的组件,流程如下:
1. 触发时机:Acceptor(运行在mainLoop)监听到客户端连接请求,掉用accept()获取新连接的sockfd。

2. 对象构造:TcpServer从EventLoopThreadPool线程池中,通过轮询选择一个subLoop,在该subLoop中创建TcpConnection对象,传入loop_(绑定的subLoop),sockfd(新连接的套接字),localAddr_/peerAddr_(本地服务端地址、客户端地址)。
3. 初始设置:
- 绑定sokfd到channel_以及socket_,初始状态设置为kConnecting(连接中)。
- 给channel_(封装sockfd的事件处理器)绑定4类事件回调:handleRead、handleWrite、handleClose、handleError。
- 调用socket_->setKeepAlive(true)开启Tcp保活机制,避免僵尸连接。
TcpServer通过Acceptor监听到有一个新用户的连接,通过accept拿到新连接的fd,将新连接的fd打包成一个TcpConnection,并设置相应的事件(读/写/错误/关闭)回调,TcpConnection再把相应的回调设置给Channel,Channel注册到Poller上,未来Poller监听到Channel上有事件发生时,就调用Channel的回调。
二、连接建立阶段:connectEstablished
TcpConnection创建后,需要调用connectEstablished()完成连接建立。
1. 状态切换:将连接状态state_从kConnecting(连接中)改为kConnected(已连接),标记连接已建立。
2. 生命周期绑定:调用channel_->tie(),将TcpConnection的生命周期与channel_绑定,确保channel_触发回调执行回调时,TcpConnection对象未被销毁(避免野指针),因为channel_执行的回调是TcpConnection对象设置的。
3. 注册读事件:调用channel_->enableReading(),将channel_注册到subLoop的Poller中,让Poller监听改sockfd的可读事件(等待客户端发送数据)。
4 .通知上层:触发connectionCallback_连接建立的回调(由上层TcpServer注册的回调),告知业务层“新连接已经建立”。
三、数据读写阶段:读(handleRead)+写(send/sendInLoop/handleWrite)
数据读写是TcpConnectin的核心功能,分为“接收客户端数据”和“向客户端发送数据”两类流程,均基于事件驱动。
1、接收数据:handleRead(被动读)
当客户端发送数据,Poller检测到sockfd可读,通过channel_调用它的可读事件回调,这个回调是TcpConnection给channel_绑定的handleRead,即channel_会回调handleRead。

1. 读取数据到缓冲区:调用inputBuffer_.readFd(channel_->fd(), &saveErrno),将sockfd中的数据读取到用户态的inputBuffer_(解决了TCP粘包问题,数据先存缓冲,再按协议解析)。
2. 分支处理:
- 若读取的字节数n>0:触发messageCallback_有消息时的回调(上层注册的回调),将inputBuffer_传递给业务层,让业务层按协议从inputBuffer_读取完整信息(如HTTP解析,自定义协议解包);
- 若n==0:表示客户端主动关闭连接(发送FIN包),调用handleClose()处理关闭逻辑;
- 若n<0:表示读取错误(如连接重置),调用handleError()记录错误信息(通过getsockopt获取具体错误码)。
2、发送数据:send -> sendInLoop -> handleWrite(主动写)

业务层调用send()向客户端发送数据,因为需要保证线程安全,所有操作都需要在subLoop的所属线程中完成,所以实际发送逻辑在sendInLoop中执行,流程分为两步:
(1)首次发送:sendInLoop
业务层调用send(const std::string& buf)后,先判断当前线程是否为subLoop线程:
- 若是:直接调用sendInLoop(buf.c_str(), buf.size());
- 若不是:通过loop_->runInLoop()将发送任务投递到任务队列中,让subLoop所属线程执行(避免多线程操作outputBuffer_)。

sendInLoop的核心逻辑(处理“应用写速>内核发速”的场景):
1. 尝试直接写内核:

若channel_未注册写事件且outputBuffer_输出缓冲区为空(无数据),表示是首次发送数据,直接调用write()向sockfd写数据。
- 若数据全部发送完(remaining剩余数据==0):触发writeCompleteCallback_消息写完时的回调(上层回调,进行后续处理),将该回调通过loop_->queueInLoop()放入任务队列,由subLoop的所属线程执行该回调,确保线程安全;
- 若数据未发送完(remaining剩余数据>0,如内核缓冲区满了):进入下一步。
2. 剩余缓冲区存缓冲:

检查“当前缓冲数据+未发送数据”是否超过了highWaterMark_水位线(设置为64M):若首次超过了阈值且注册了highWaterMarkCallback_高水位回调,触发该回调(上层可暂停接收新数据,避免缓冲溢出);
将未发完的reamining剩余数据通过outputBuffer_.append()存入outputBuffer_输出缓冲区;
调用channel_->enableWriting():向Poller注册写事件,等待内核缓冲区有空间时继续发送数据,未来内核缓冲区有空间可写了,Poller会通知channel_上有epollout可写事件。
(2):后续发送:handleWrite

当内核缓冲区有空间可写时,Poller检查到sockfd可写,通过channel_回调TcpConnection给其绑定的handleWrite回调:
1. 从缓冲写内核:调用outputBuffer_.writeFd(),将outputBuffer_中的剩余数据写入sockfd。
2. 清理与回调:调用outputBuffer_.retrieve()移除已发送的数据;
- 若缓冲区为空:表示数据已经发送完,调用channel_->disableWriting()在Poller中取消该channel_的写事件注册,避免Poller频繁通知),并触发writeCompleteCallback_回调;
- 若连接处于kDisconnecting(断开中):调用shutdownInLoop(),发送完缓冲数据后关闭连接。
四、连接关闭阶段:shutdown -> handleClose
连接关闭分为“主动关闭”(业务层触发)和“被动关闭”(客户端触发),最终都通过handleClose完成资源清理。
1、主动关闭:shutdown -> shutdownInLoop
业务层调用shutdown()主动关闭连接:

1. 状态切换:将state_从kConnected连接中改为kDisconnecting断开中,标记连接正在断开;
2. 投递关闭任务:通过loop_->queueInLoop()将shutdownInLoop投递到任务队列中,让subLoop所属线程执行;
3. 安全关闭:shutdownInLoop中,若channel_未注册写事件(缓冲数据已发完),调用socket_->shutdownWrite()关闭写端(发送FIN包,告知客户端“不再发数据”)。

2、被动关闭:handleRead(n==0)-> handleClose
客户端主动关闭连接,handleRead读取到n==0(接收FIN包),触发handleClose:

1. 状态切换:将state_改为kDisconnected;
2. 取消事件注册:调用channel_->disableAll(),从Poller中移除改channel_的所有感兴趣事件;
3. 通知上层:触发connectionCallback_(告知业务层“连接已关闭”,如清理业务数据、减少连接数);触发closeCallback_(上层TcpServer注册的回调,用于从ConnectionMap中移除当前连接,避免内存泄漏)。
五、资源销毁阶段:connectDestroyed
TcpServer收到closeCallback_后,调用connectDestroyed()完成最终的资源销毁:

1. 二次确认状态:若连接仍为kConnected,先切换为kDisconnected并取消事件注册;
2. 移除channel_:调用channel_->reamove(),将channel_从Poller中删除;
3. 资源释放:TcpConnection对象的shared_ptr引用计数减1,当引用计数为0时,自动析构socket_(关闭sockfd)和channel_,释放所有资源。
流程核心总结
TcpConnection的流程本质是“事件驱动+缓冲区缓冲+线程安全”的结合:
- 事件驱动:通过Channel和Poller监听IO事件,回调对应处理函数,避免轮询;
- 缓冲区:inputBuffer_解粘包,outputBuffer_平衡读写速度,减少系统调用;
- 线程安全:所有操作绑定到subLoop线程,通过runInLoop和queueInLoop投递任务,避免多线程竞争。
整个流程覆盖了TCP连接从建立到销毁的全场景,是高性能网络库如Muduo中“连接管理”的标准实现。
实现了TcpConnection之后,整个Muduo网络库只剩下一个来将这些组件联系起来的TcpServer类。
