当前位置: 首页 > news >正文

《Muduo网络库:TcpConnection类》

Acceptor类是运行在mainLoop中,负责监听新用户的连接,那么已经建立的连接该如何管理呢?TcpConnection类,是Reactor模型中管理单个已建立的TCP连接的生命周期、数据读写以及事件回调的关键类。它封装了Socket、Channel和缓冲区,确保所有操作都在正确的EventLoop中执行,避免线程安全问题。


TcpConnection的核心作用是“管理已建立的TCP连接”,具体职责包括:

  1. 绑定连接的Socket和Channel,将I/O事件(读/写/关闭/错误)与回调函数关联。
  2. 管理输入输出缓冲区,解决“应用读写速度不匹配”和“TCP粘包”问题。
  3. 维护连接状态(连接中/断开中/已连接/已断开),确保操作(发送/关闭)在合法状态下执行。
  4. 提供统一的回调注册接口,让上层模块(TcpServer)自定义连接、消息、关闭等事件处理逻辑。

Callbacks.h

#pragma once#include <memory>
#include <functional>class Buffer;
class TcpConnection;
class Timestamp;using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
using ConnectionCallback = std::function<void(const TcpConnectionPtr &)>;
using CloseCallback = std::function<void(const TcpConnectionPtr &)>;
using WriteCompleteCallback = std::function<void(const TcpConnectionPtr &)>;
using MessageCallback = std::function < void(const TcpConnectionPtr &,Buffer *, Timestamp)>;
using HighWaterMarkCallback = std::function<void(const TcpConnectionPtr &, size_t)>;

TcpConnection.h

#pragma once#include "noncopyable.h"
#include "InetAddress.h"
#include "Callbacks.h"
#include "Buffer.h"
#include "Timestamp.h"#include <memory>
#include <string>
#include <atomic>class Channel;
class EventLoop;
class Socket;/*** TcpServer =》通过Acceptor 监听到有一个新用户连接,通过accept函数拿到connfd** =》connfd打包成一个TcpConnection =》TcpConnection设置回调** =》TcpConnection再把相应的回调设置给Channel =》Channel注册到Poller** =》Poller监听到Channel有事件发生之后,就调用Channel的回调*/class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
{
public:TcpConnection(EventLoop *loop,const std::string &nameArg,int sockfd,const InetAddress &localAddr,const InetAddress &peerAddr);~TcpConnection();EventLoop *getLoop() const { return loop_; }const std::string &name() const { return name_; }const InetAddress &localAddress() const { return localAddr_; }const InetAddress &peerAddress() const { return peerAddr_; }bool connected() const { return state_ == kConnected; }// 发送数据void send(const std::string& buf);// 关闭连接void shutdown();void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }void setHighWaterMarkCallback(const HighWaterMarkCallback &cb, size_t highWaterMark) { highWaterMarkCallback_ = cb, highWaterMark = highWaterMark_; }void setCloseCallback(const CloseCallback &cb) { closeCallback_ = cb; }// 连接建立void connectEstablished();// 连接销毁void connectDestroyed();private:enum StateE { kDisconnected, kConnecting,kConnected, kDisconnecting };  // 表示连接的状态void setState(StateE state) { state_ = state; }void handleRead(Timestamp receiveTime);void handleWrite();void handleClose();void handleError();void sendInLoop(const void *data, size_t len);void shutdownInLoop();EventLoop *loop_; // 这里绝对不是baseLoop,因为TcpConnection都是在subLoop里面管理的const std::string name_;std::atomic_int state_;bool reading_;// 和Acceptor类似 Acceptor=》mainLoop  TcpConnection=》subLoopstd::unique_ptr<Socket> socket_;std::unique_ptr<Channel> channel_;const InetAddress localAddr_;const InetAddress peerAddr_;ConnectionCallback connectionCallback_;       // 有新连接时的回调MessageCallback messageCallback_;             // 有读写消息时的回调WriteCompleteCallback writeCompleteCallback_; // 消息发送完成以后的回调HighWaterMarkCallback highWaterMarkCallback_;CloseCallback closeCallback_;size_t highWaterMark_;Buffer inputBuffer_;  // 接收数据的缓冲区Buffer outputBuffer_;  // 发送数据的缓冲区
};

TcpConnection.cc

#include "TcpConnection.h"
#include "Logger.h"
#include "Socket.h"
#include "Channel.h"
#include "EventLoop.h"#include <functional>
#include <errno.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <strings.h>
#include <string>
#include <unistd.h>static EventLoop *CheckNotNull(EventLoop *loop)
{if (loop == nullptr){LOG_FATAL("%s:%s:%d TcpConnection Loop is null! \n", __FILE__, __FUNCTION__, __LINE__);}return loop;
}TcpConnection::TcpConnection(EventLoop *loop,const std::string &nameArg,int sockfd,const InetAddress &localAddr,const InetAddress &peerAddr): loop_(loop), name_(nameArg), state_(kConnecting), socket_(new Socket(sockfd)), channel_(new Channel(loop,sockfd)), localAddr_(localAddr), peerAddr_(peerAddr), highWaterMark_(64*1024*1024)  // 64M
{// TcpConnection给channel设置相应事件的回调,Poller给channel通知感兴趣的事件发生了,channel会回调相应的操作函数channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n", name_.c_str(), sockfd);socket_->setKeepAlive(true);
}TcpConnection::~TcpConnection()
{LOG_INFO("TcpConnection::dtor[%s] at fd=%d state=%d \n", name_.c_str(), channel_->fd(), (int)state_);
}// 发送数据,提供send接口,供上层发送数据
void TcpConnection::send(const std::string &buf)
{if (state_ == kConnected){if (loop_->isInLoopThread()){sendInLoop(buf.c_str(), buf.size());}else{loop_->runInloop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));}}
}/*** 发送数据 应用写的快,而内核发送数据慢,所以需要把待发送的数据写入缓冲区,并且设置了水位线*/
void TcpConnection::sendInLoop(const void *data, size_t len)
{ssize_t nwrote = 0;      // 表示写了多少数据ssize_t remaining = len; // 表示写剩余的数据大小bool faultError = false;if (state_ == kDisconnected){LOG_ERROR("disconnected, give up writing!");return;}// 表示channel是第一次开始写(发送)数据,并且缓冲区没有待发送的数据(为空)if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0){nwrote = write(channel_->fd(), data, len);if (nwrote >= 0){remaining = len - nwrote;if (remaining == 0 && writeCompleteCallback_) // 数据全部发送完成,触发消息发送完成的回调{// 既然这里数据全部发送完成,就不用再给channel设置epollout事件了loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}}else  // 写失败,非EWOULDBLOCK,标记错误{nwrote = 0;if (errno != EWOULDBLOCK){LOG_ERROR("TcpConnection::sendInLoop");if (errno == EPIPE || errno == ECONNRESET) // SIGPIPE RESET{faultError = true;}}}}/*** 说明当前这一次write,并没有把数据全部发送出去,剩余的数据需要保存到缓冲区当中,* 然后给channel注册epollout事件,Poller发现tcp的发送缓冲区有数据,会通知相应的sock-》channel,* 调用writeCallback_回调方法,也就是调用TcpConnection::handleWrite方法,把发送缓冲区中的数据全部发送完成*/if (!faultError && remaining > 0){size_t oldLen = outputBuffer_.readableBytes(); // 目前发送缓冲区中已有的发送数据的长度(可读数据),待发送的数据量// 存入前待发送的数据量没有超过水位线、待发送的数据量+存入本次剩余的数据量超过了水位线、上层设置了高水位线回调if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_){loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));}// 剩余数据存入输出缓冲区outputBuffer_.append((char *)data + nwrote, remaining);if (!channel_->isWriting()){channel_->enableWriting(); // 这里一定要注册channel的写事件,否则Poller不会给channel通知epollout}}
}// 关闭连接
void TcpConnection::shutdown()
{if (state_ == kDisconnected){setState(kDisconnecting);  // 切换状态为断开中loop_->queueInLoop(std::bind(&TcpConnection::shutdownInLoop, this));}
}void TcpConnection::shutdownInLoop()
{if (!channel_->isWriting()) // 说明当前outputBuffer缓冲区中的数据已经全部发送完成{socket_->shutdownWrite(); // 关闭写端}
}// 连接建立
void TcpConnection::connectEstablished()
{setState(kConnected);channel_->tie(shared_from_this()); // 绑定自身生命周期,避免channel回调时TcpConnection对象已经被销毁channel_->enableReading();         // 向Poller注册channel的epollin读事件connectionCallback_(shared_from_this()); // 触发连接建立回调(上层业务处理)
}// 连接销毁
void TcpConnection::connectDestroyed()
{if (state_ == kConnected){setState(kDisconnected);channel_->disableAll(); // 把channel所有感兴趣的事件,从Poller中移除del掉connectionCallback_(shared_from_this());}channel_->remove(); // 把channel从Poller中删除
}// 当Poller检测到channel有事件可读时,通过channel回调此函数
void TcpConnection::handleRead(Timestamp receiveTime)
{int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);if (n > 0){// 已建立连接的用户,有可读事件发生了,触发消息到达回调,即用户传入的回调操作onMessage,上层可从inputBuffer_中读取数据messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);}else if (n == 0)  // 对端关闭连接,触发handleClose{handleClose();}else{errno = savedErrno;LOG_ERROR("TcpConnection::handleRead");handleError();}
}// 当Poller检测到channel有事件可写时,通过channel回调此函数继续发送缓冲数据
void TcpConnection::handleWrite()
{if (channel_->isWriting()){int savedErrno = 0;ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno);  // 从发送缓冲区向fd写数据if (n > 0){outputBuffer_.retrieve(n);if (outputBuffer_.readableBytes() == 0){// 缓冲区数据发送完,在Poller中取消channel的写事件channel_->disableWriting();if (writeCompleteCallback_){// 唤醒loop_对应的thread线程,执行回调loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}// 若处于断开中,发送完数据以后关闭连接if (state_ == kDisconnecting){shutdownInLoop();}}}else{LOG_ERROR("TcpConnection::handleWrite");}}else{LOG_ERROR("TcpConnection fd=%d is down, no more writing \n", channel_->fd());}
}void TcpConnection::handleClose()
{LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n", channel_->fd(), (int)state_);setState(kDisconnected);channel_->disableAll();  // 在Poller中取消该channel的所有注册的事件TcpConnectionPtr connPtr(shared_from_this());connectionCallback_(connPtr); // 执行连接关闭的回调,通知上层关闭连接closeCallback_(connPtr);      // 关闭连接的回调,由TcpServer移除连接并销毁对象
}void TcpConnection::handleError()
{int optval;socklen_t optlen = sizeof optval;int err = 0;if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0){err = errno;}else{err = optval;}LOG_ERROR("TcpConnection::handleERROR name:%s - SO_ERROR:%d \n", name_.c_str(), err);
}

上面就是TcpConnection类的实现代码,围绕TCP连接的完整生命周期展开,从连接建立、数据读写、连接关闭到资源销毁,全程基于Reactor事件驱动模型,且严格保证了线程安全。

下面我们按照TCP连接的生命周期阶段边拆解完整流程,边分析其代码设计。

一、初始化阶段:TcpConnection对象创建

TcpConnection对象并非主动创建,而是TcpServer通过Acceptor接受新连接后初始化,是被动创建的组件,流程如下:

1. 触发时机:Acceptor(运行在mainLoop)监听到客户端连接请求,掉用accept()获取新连接的sockfd。

2. 对象构造:TcpServer从EventLoopThreadPool线程池中,通过轮询选择一个subLoop,在该subLoop中创建TcpConnection对象,传入loop_(绑定的subLoop),sockfd(新连接的套接字),localAddr_/peerAddr_(本地服务端地址、客户端地址)。

3. 初始设置:

  • 绑定sokfd到channel_以及socket_,初始状态设置为kConnecting(连接中)。
  • 给channel_(封装sockfd的事件处理器)绑定4类事件回调:handleRead、handleWrite、handleClose、handleError。
  • 调用socket_->setKeepAlive(true)开启Tcp保活机制,避免僵尸连接。

TcpServer通过Acceptor监听到有一个新用户的连接,通过accept拿到新连接的fd,将新连接的fd打包成一个TcpConnection,并设置相应的事件(读/写/错误/关闭)回调,TcpConnection再把相应的回调设置给Channel,Channel注册到Poller上,未来Poller监听到Channel上有事件发生时,就调用Channel的回调。

二、连接建立阶段:connectEstablished

TcpConnection创建后,需要调用connectEstablished()完成连接建立。

1. 状态切换:将连接状态state_从kConnecting(连接中)改为kConnected(已连接),标记连接已建立。

2. 生命周期绑定:调用channel_->tie(),将TcpConnection的生命周期与channel_绑定,确保channel_触发回调执行回调时,TcpConnection对象未被销毁(避免野指针),因为channel_执行的回调是TcpConnection对象设置的。

3. 注册读事件:调用channel_->enableReading(),将channel_注册到subLoop的Poller中,让Poller监听改sockfd的可读事件(等待客户端发送数据)。

4 .通知上层:触发connectionCallback_连接建立的回调(由上层TcpServer注册的回调),告知业务层“新连接已经建立”。

三、数据读写阶段:读(handleRead)+写(send/sendInLoop/handleWrite)

数据读写是TcpConnectin的核心功能,分为“接收客户端数据”“向客户端发送数据”两类流程,均基于事件驱动。

1、接收数据:handleRead(被动读)

当客户端发送数据,Poller检测到sockfd可读,通过channel_调用它的可读事件回调,这个回调是TcpConnection给channel_绑定的handleRead,即channel_会回调handleRead。

1. 读取数据到缓冲区:调用inputBuffer_.readFd(channel_->fd(), &saveErrno),将sockfd中的数据读取到用户态的inputBuffer_(解决了TCP粘包问题,数据先存缓冲,再按协议解析)。

2. 分支处理:

  • 若读取的字节数n>0:触发messageCallback_有消息时的回调(上层注册的回调),将inputBuffer_传递给业务层,让业务层按协议从inputBuffer_读取完整信息(如HTTP解析,自定义协议解包);
  • 若n==0:表示客户端主动关闭连接(发送FIN包),调用handleClose()处理关闭逻辑;
  • 若n<0:表示读取错误(如连接重置),调用handleError()记录错误信息(通过getsockopt获取具体错误码)。

2、发送数据:send -> sendInLoop -> handleWrite(主动写)

业务层调用send()向客户端发送数据,因为需要保证线程安全,所有操作都需要在subLoop的所属线程中完成,所以实际发送逻辑在sendInLoop中执行,流程分为两步:

(1)首次发送:sendInLoop

业务层调用send(const std::string& buf)后,先判断当前线程是否为subLoop线程:

  • 若是:直接调用sendInLoop(buf.c_str(), buf.size());
  • 若不是:通过loop_->runInLoop()将发送任务投递到任务队列中,让subLoop所属线程执行(避免多线程操作outputBuffer_)。

sendInLoop的核心逻辑(处理“应用写速>内核发速”的场景):

1. 尝试直接写内核:

若channel_未注册写事件且outputBuffer_输出缓冲区为空(无数据),表示是首次发送数据,直接调用write()向sockfd写数据。

  • 若数据全部发送完(remaining剩余数据==0):触发writeCompleteCallback_消息写完时的回调(上层回调,进行后续处理),将该回调通过loop_->queueInLoop()放入任务队列,由subLoop的所属线程执行该回调,确保线程安全;
  • 若数据未发送完(remaining剩余数据>0,如内核缓冲区满了):进入下一步。

2. 剩余缓冲区存缓冲:

检查“当前缓冲数据+未发送数据”是否超过了highWaterMark_水位线(设置为64M):若首次超过了阈值且注册了highWaterMarkCallback_高水位回调,触发该回调(上层可暂停接收新数据,避免缓冲溢出);

将未发完的reamining剩余数据通过outputBuffer_.append()存入outputBuffer_输出缓冲区;

调用channel_->enableWriting():向Poller注册写事件,等待内核缓冲区有空间时继续发送数据,未来内核缓冲区有空间可写了,Poller会通知channel_上有epollout可写事件。

(2):后续发送:handleWrite

当内核缓冲区有空间可写时,Poller检查到sockfd可写,通过channel_回调TcpConnection给其绑定的handleWrite回调:

1. 从缓冲写内核:调用outputBuffer_.writeFd(),将outputBuffer_中的剩余数据写入sockfd。

2. 清理与回调:调用outputBuffer_.retrieve()移除已发送的数据;

  • 若缓冲区为空:表示数据已经发送完,调用channel_->disableWriting()在Poller中取消该channel_的写事件注册,避免Poller频繁通知),并触发writeCompleteCallback_回调;
  • 若连接处于kDisconnecting(断开中):调用shutdownInLoop(),发送完缓冲数据后关闭连接。

四、连接关闭阶段:shutdown -> handleClose

连接关闭分为“主动关闭”(业务层触发)和“被动关闭”(客户端触发),最终都通过handleClose完成资源清理。

1、主动关闭:shutdown -> shutdownInLoop

业务层调用shutdown()主动关闭连接:

1. 状态切换:将state_从kConnected连接中改为kDisconnecting断开中,标记连接正在断开;

2. 投递关闭任务:通过loop_->queueInLoop()将shutdownInLoop投递到任务队列中,让subLoop所属线程执行;

3. 安全关闭:shutdownInLoop中,若channel_未注册写事件(缓冲数据已发完),调用socket_->shutdownWrite()关闭写端(发送FIN包,告知客户端“不再发数据”)。

2、被动关闭:handleRead(n==0)-> handleClose

客户端主动关闭连接,handleRead读取到n==0(接收FIN包),触发handleClose:

1. 状态切换:将state_改为kDisconnected;

2. 取消事件注册:调用channel_->disableAll(),从Poller中移除改channel_的所有感兴趣事件;

3. 通知上层:触发connectionCallback_(告知业务层“连接已关闭”,如清理业务数据、减少连接数);触发closeCallback_(上层TcpServer注册的回调,用于从ConnectionMap中移除当前连接,避免内存泄漏)。

五、资源销毁阶段:connectDestroyed

TcpServer收到closeCallback_后,调用connectDestroyed()完成最终的资源销毁:

1. 二次确认状态:若连接仍为kConnected,先切换为kDisconnected并取消事件注册;

2. 移除channel_:调用channel_->reamove(),将channel_从Poller中删除;

3. 资源释放:TcpConnection对象的shared_ptr引用计数减1,当引用计数为0时,自动析构socket_(关闭sockfd)和channel_,释放所有资源。

流程核心总结

TcpConnection的流程本质是“事件驱动+缓冲区缓冲+线程安全”的结合:

  • 事件驱动:通过Channel和Poller监听IO事件,回调对应处理函数,避免轮询;
  • 缓冲区:inputBuffer_解粘包,outputBuffer_平衡读写速度,减少系统调用;
  • 线程安全:所有操作绑定到subLoop线程,通过runInLoop和queueInLoop投递任务,避免多线程竞争。

整个流程覆盖了TCP连接从建立到销毁的全场景,是高性能网络库如Muduo中“连接管理”的标准实现。


实现了TcpConnection之后,整个Muduo网络库只剩下一个来将这些组件联系起来的TcpServer类。

http://www.dtcms.com/a/540273.html

相关文章:

  • 网站详情页怎么做的好看的网页设计作品欣赏
  • 线扫相机上位机开发——如何提高问题排查效率
  • 计算机网络自顶向下方法10——应用层 HTTP/2 成帧 响应报文优先次序和服务器推
  • 孝感网站的建设网页设计一般一个月工资多少
  • 什么是持续集成(CI)和持续交付(CD)?测试在其中扮演什么角色?
  • 利用机器学习优化CPU调度的一些思路案例
  • Kafka 消息顺序消费深度解析:原理、实现方案与全局有序可行性分析
  • 数据结构初识,与算法复杂度
  • 网站色彩搭配中国纪检监察报社官网
  • (六)策略梯度算法 and Actor-Critic 框架
  • 基于萤火虫算法(FA)优化支持向量机(SVM)参数的分类实现
  • 【C++】C++11出来之后,到目前为止官方都做了些什么更新?
  • 公司网站建设及推广淮南网云小镇怎么样
  • UE C++ 离线安装 经验
  • Smart SVG Viewer,一款免费的SVG 图像查看器
  • 基於 MAC 的模型算力估算方法
  • VoxCPM macOS 安装部署
  • 【Linux篇】ELF文件与程序加载:理解链接过程中的静态库,动态库及目标文件
  • 做体育直播网站做数据权威的网站
  • 《因为独特》不畏惧与众不同 王宁泡泡玛特的独特之道:低风险创业的人性解码与产品设计指南
  • 【打靶日记】VulNyx 之 Lower3
  • DomainNet 数据集下载
  • 6.1.2.1 大数据方法论与实践指南-离线任务分类
  • wordpress密码忘了怎么找回郑州网站优化网络建设有限公司
  • AI隐式标识‌中的红绿名单水印技术通俗讲解
  • idea能怎么对比2个文件
  • 纠删码(erasure coding,EC)技术现状
  • 使用mybatis 实现量表关联,并且统计数据量
  • 哈希表的HashMap 和 HashSet
  • 从编程语言出发如何考虑投入研发RAG智能体