当前位置: 首页 > news >正文

手写muduo网络库(九):TcpConnection

一、引言

在网络编程中,TCP 连接是非常重要的一部分。TcpConnection 类在 muduo 网络库中承担着管理单个 TCP 连接的重任,它封装了与 TCP 连接相关的各种操作,如数据的读写、连接的建立与销毁等。本文将详细剖析 TcpConnection 类的代码,深入探讨其功能和实现逻辑。

二、功能概述

TcpConnection 类的主要功能包括:

  1. 管理 TCP 连接状态:跟踪连接的建立、连接中、已连接和断开连接等状态。
  2. 数据读写:提供数据发送和接收的接口,支持普通数据和文件的发送。
  3. 事件处理:处理连接上的读、写、关闭和错误事件。
  4. 回调机制:允许用户注册连接建立、消息接收、数据发送完成、高水位标记和连接关闭等回调函数。

三、代码结构与实现细节

3.1 头文件 TcpConnection.h

#pragma once#include "NonCopyable.h"
#include "InetAddress.h"
#include "Callbacks.h"
#include "Buffer.h"
#include "Timestamp.h"#include <memory>
#include <string>
#include <atomic>class Channel;
class EventLoop;
class Socket;class TcpConnection : NonCopyable, public std::enable_shared_from_this<TcpConnection>
{
public:TcpConnection(EventLoop *loop,const std::string &nameArg,int sockfd,const InetAddress &localAddr,const InetAddress &peerAddr);~TcpConnection();EventLoop *getLoop() const { return loop_; }const std::string &name() const { return name_; }const InetAddress &localAddress() const { return localAddr_; }const InetAddress &peerAddress() const { return peerAddr_; }bool connected() const { return state_ == kConnected; }// 发送数据void send(const std::string &buf);void sendFile(int fileDescriptor, off_t offset, size_t count); // 关闭半连接void shutdown();void setConnectionCallback(const ConnectionCallback &cb){ connectionCallback_ = cb; }void setMessageCallback(const MessageCallback &cb){ messageCallback_ = cb; }void setWriteCompleteCallback(const WriteCompleteCallback &cb){ writeCompleteCallback_ = cb; }void setCloseCallback(const CloseCallback &cb){ closeCallback_ = cb; }void setHighWaterMarkCallback(const HighWaterMarkCallback &cb, size_t highWaterMark){ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }// 连接建立void connectEstablished();// 连接销毁void connectDestroyed();private:enum StateE{kDisconnected,kConnecting,kConnected,kDisconnecting };void setState(StateE state) { state_ = state; }void handleRead(Timestamp receiveTime);void handleWrite();void handleClose();void handleError();void sendInLoop(const void *data, size_t len);void shutdownInLoop();void sendFileInLoop(int fileDescriptor, off_t offset, size_t count);EventLoop *loop_;const std::string name_;std::atomic_int state_;bool reading_;std::unique_ptr<Socket> socket_;std::unique_ptr<Channel> channel_;const InetAddress localAddr_;const InetAddress peerAddr_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;HighWaterMarkCallback highWaterMarkCallback_;CloseCallback closeCallback_;size_t highWaterMark_;Buffer inputBuffer_;Buffer outputBuffer_;
};
3.1.1 类定义与继承
  • TcpConnection 继承自 NonCopyable,这是为了防止该类被拷贝,因为 TCP 连接是唯一的,不应该被复制。
  • std::enable_shared_from_this<TcpConnection> 是一个模板类,它允许在 TcpConnection 对象内部安全地获取指向自身的 std::shared_ptr,这在回调函数中传递对象的所有权时非常有用。
3.1.2 成员变量
  • EventLoop *loop_:指向所属的事件循环,用于处理该连接上的事件。
  • const std::string name_:连接的名称,方便调试和日志记录。
  • std::atomic_int state_:连接的状态,使用 std::atomic 保证线程安全,避免多线程环境下的竞态条件。
  • std::unique_ptr<Socket> socket_ 和 std::unique_ptr<Channel> channel_:分别管理底层的套接字和事件通道,使用 std::unique_ptr 确保资源的自动释放。
  • const InetAddress localAddr_ 和 const InetAddress peerAddr_:分别表示本地地址和对端地址。
  • ConnectionCallbackMessageCallback 等各种回调函数:用于处理连接建立、消息接收等事件。
  • Buffer inputBuffer_ 和 Buffer outputBuffer_:分别用于存储接收到的数据和待发送的数据。
3.1.3 成员函数
  • 构造函数:初始化连接的各种属性,设置通道的回调函数,并开启保活机制。
  • 析构函数:输出日志信息,确保资源被正确释放。
  • 数据发送函数send 和 sendFile 提供了发送普通数据和文件的接口,它们会根据当前线程情况调用 sendInLoop 或 sendFileInLoop
  • 关闭连接函数shutdown 用于关闭半连接,它会调用 shutdownInLoop 实际执行关闭操作。
  • 回调设置函数:允许用户注册各种回调函数,以便在相应事件发生时执行自定义操作。
  • 连接建立和销毁函数connectEstablished 和 connectDestroyed 分别处理连接的建立和销毁过程。

3.2 源文件 TcpConnection.cpp

3.2.1 构造函数
TcpConnection::TcpConnection(EventLoop *loop,const std::string &nameArg,int sockfd,const InetAddress &localAddr,const InetAddress &peerAddr): loop_(CheckLoopNotNull(loop)), name_(nameArg), state_(kConnecting), reading_(true), socket_(new Socket(sockfd)), channel_(new Channel(loop, sockfd)), localAddr_(localAddr), peerAddr_(peerAddr), highWaterMark_(64 * 1024 * 1024) // 64M
{channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));LOG_INFO << "TcpConnection::ctor[" << name_.c_str() << "]at fd= " << sockfd;socket_->setKeepAlive(true);
}
  • CheckLoopNotNull 函数确保事件循环指针不为空,避免出现空指针异常。
  • 使用 std::bind 将通道的各种回调函数绑定到 TcpConnection 的成员函数上,这样当通道上发生相应事件时,会调用 TcpConnection 对应的处理函数。
  • 开启保活机制,防止连接在长时间无数据传输时被关闭。
3.2.2 数据发送函数
void TcpConnection::send(const std::string &buf)
{if (state_ == kConnected){if (loop_->isInLoopThread()){sendInLoop(buf.c_str(), buf.size());}else{loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));}}
}void TcpConnection::sendInLoop(const void *data, size_t len)
{ssize_t nwrote = 0;size_t remaining = len;bool faultError = false;if (state_ == kDisconnected){LOG_ERROR << "disconnected, give up writing";}if (!channel_->isWriting() || outputBuffer_.readableBytes() == 0){nwrote = ::write(channel_->fd(), data, len);if (nwrote >= 0){remaining = len - nwrote;if (remaining == 0 && writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}}else // nwrote < 0{nwrote = 0;if (errno != EWOULDBLOCK) // EWOULDBLOCK表示非阻塞情况下没有数据后的正常返回 等同于EAGAIN{LOG_ERROR << "TcpConnection::sendInLoop";if (errno == EPIPE || errno == ECONNRESET) // SIGPIPE RESET{faultError = true;}}}}if (!faultError && remaining > 0){size_t oldLen = outputBuffer_.readableBytes();if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_){loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));}outputBuffer_.append((char *)data + nwrote, remaining);if (!channel_->isWriting()){channel_->enableWriting(); // 这里一定要注册channel的写事件 否则poller不会给channel通知epollout}}
}
  • send 函数会根据当前线程是否是事件循环线程,决定是直接调用 sendInLoop 还是通过事件循环的 runInLoop 函数在事件循环线程中调用。
  • sendInLoop 函数首先尝试直接将数据写入套接字,如果写入成功且数据全部发送完,则调用 writeCompleteCallback_ 回调函数。如果写入失败且不是非阻塞无数据的正常情况,则记录错误信息。
  • 如果还有剩余数据未发送,则将其追加到 outputBuffer_ 中,并注册通道的写事件,以便后续继续发送。
3.2.3 关闭连接函数
void TcpConnection::shutdown()
{if (state_ == kConnected){setState(kDisconnecting);loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));}
}void TcpConnection::shutdownInLoop()
{if (!channel_->isWriting()) // 说明当前outputBuffer_的数据全部向外发送完成{socket_->shutdownWrite();}
}
  • shutdown 函数将连接状态设置为 kDisconnecting,并通过事件循环的 runInLoop 函数在事件循环线程中调用 shutdownInLoop
  • shutdownInLoop 函数检查 outputBuffer_ 中的数据是否全部发送完,如果是,则关闭写端。
3.2.4 连接建立和销毁函数
// 连接建立
void TcpConnection::connectEstablished()
{setState(kConnected);channel_->tie(shared_from_this());channel_->enableReading(); // 向poller注册channel的EPOLLIN读事件// 新连接建立 执行回调connectionCallback_(shared_from_this());
}
// 连接销毁
void TcpConnection::connectDestroyed()
{if (state_ == kConnected){setState(kDisconnected);channel_->disableAll(); // 把channel的所有感兴趣的事件从poller中删除掉connectionCallback_(shared_from_this());}channel_->remove(); // 把channel从poller中删除掉
}
  • connectEstablished 函数将连接状态设置为 kConnected,绑定通道和 TcpConnection 对象,注册通道的读事件,并调用 connectionCallback_ 回调函数。
  • connectDestroyed 函数将连接状态设置为 kDisconnected,禁用通道的所有事件,调用 connectionCallback_ 回调函数,最后从事件循环中移除通道。
3.2.5 事件处理函数
// 读是相对服务器而言的 当对端客户端有数据到达 服务器端检测到EPOLLIN 就会触发该fd上的回调 handleRead取读走对端发来的数据
void TcpConnection::handleRead(Timestamp receiveTime)
{int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);if (n > 0) // 有数据到达{// 已建立连接的用户有可读事件发生了 调用用户传入的回调操作onMessage shared_from_this就是获取了TcpConnection的智能指针messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);}else if (n == 0) // 客户端断开{handleClose();}else // 出错了{errno = savedErrno;LOG_ERROR << "TcpConnection::handleRead";handleError();}
}void TcpConnection::handleWrite()
{if (channel_->isWriting()){int savedErrno = 0;ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno);if (n > 0){outputBuffer_.retrieve(n);//从缓冲区读取reable区域的数据移动readindex下标if (outputBuffer_.readableBytes() == 0){channel_->disableWriting();if (writeCompleteCallback_){// TcpConnection对象在其所在的subloop中 向pendingFunctors_中加入回调loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}if (state_ == kDisconnecting){shutdownInLoop(); // 在当前所属的loop中把TcpConnection删除掉}}}else{LOG_ERROR << "TcpConnection::handleWrite";}}else{LOG_ERROR << "TcpConnection fd= " << channel_->fd() << " is down, no more writing";}
}void TcpConnection::handleClose()
{LOG_INFO << "TcpConnection::handleClose fd= " << channel_->fd() << " state= " << (int)state_;setState(kDisconnected);channel_->disableAll();TcpConnectionPtr connPtr(shared_from_this());connectionCallback_(connPtr); // 连接回调closeCallback_(connPtr);      // 执行关闭连接的回调 执行的是TcpServer::removeConnection回调方法   // must be the last line
}void TcpConnection::handleError()
{int optval;socklen_t optlen = sizeof optval;int err = 0;if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0){err = errno;}else{err = optval;}LOG_ERROR << "TcpConnection::handleError name: " << name_.c_str() << " - SO_ERROR: " << err;
}
  • handleRead 函数从套接字读取数据到 inputBuffer_ 中,如果有数据到达,则调用 messageCallback_ 回调函数;如果客户端断开连接,则调用 handleClose 函数;如果出错,则调用 handleError 函数。
  • handleWrite 函数将 outputBuffer_ 中的数据写入套接字,如果数据全部发送完,则禁用通道的写事件,调用 writeCompleteCallback_ 回调函数,并在需要时关闭连接。
  • handleClose 函数将连接状态设置为 kDisconnected,禁用通道的所有事件,调用 connectionCallback_ 和 closeCallback_ 回调函数。
  • handleError 函数获取套接字的错误信息,并记录错误日志。
3.2.6 零拷贝发送文件函数
// 新增的零拷贝发送函数
void TcpConnection::sendFile(int fileDescriptor, off_t offset, size_t count) 
{if (connected()) {if (loop_->isInLoopThread()) { sendFileInLoop(fileDescriptor, offset, count);}else{ loop_->runInLoop(std::bind(&TcpConnection::sendFileInLoop, shared_from_this(), fileDescriptor, offset, count));}} else {LOG_ERROR << "TcpConnection::sendFile - not connected";}
}// 在事件循环中执行sendfile
void TcpConnection::sendFileInLoop(int fileDescriptor, off_t offset, size_t count) 
{ssize_t bytesSent = 0; // 发送了多少字节数size_t remaining = count; // 还要多少数据要发送bool faultError = false; // 错误的标志位if (state_ == kDisconnecting) { // 表示此时连接已经断开就不需要发送数据了LOG_ERROR << "disconnected, give up writing";return;}// 表示Channel第一次开始写数据或者outputBuffer缓冲区中没有数据if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {bytesSent = sendfile(socket_->fd(), fileDescriptor, &offset, remaining);if (bytesSent >= 0) {remaining -= bytesSent;if (remaining == 0 && writeCompleteCallback_) {// remaining为0意味着数据正好全部发送完,就不需要给其设置写事件的监听。loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}} else{ // bytesSent < 0if (errno != EWOULDBLOCK) { // 如果是非阻塞没有数据返回错误这个是正常显现等同于EAGAIN,否则就异常情况LOG_ERROR << "TcpConnection::sendFileInLoop";}if (errno == EPIPE || errno == ECONNRESET) {faultError = true;}}}// 处理剩余数据if (!faultError && remaining > 0) {// 继续发送剩余数据loop_->queueInLoop(std::bind(&TcpConnection::sendFileInLoop, shared_from_this(), fileDescriptor, offset, remaining));}
}
  • sendFile 函数提供了发送文件的接口,它会根据当前线程情况调用 sendFileInLoop
  • sendFileInLoop 函数使用 sendfile 系统调用实现零拷贝发送文件,如果数据全部发送完,则调用 writeCompleteCallback_ 回调函数;如果还有剩余数据,则通过事件循环的 queueInLoop 函数继续发送。

四、设计思路与 C++ 语言特性细节

4.1 设计思路

  • 分层架构TcpConnection 类将底层的套接字操作和事件处理封装起来,提供了高层的接口,使得上层代码可以方便地使用 TCP 连接。
  • 回调机制:通过注册回调函数,用户可以自定义连接建立、消息接收等事件的处理逻辑,提高了代码的灵活性和可扩展性。
  • 事件驱动:使用事件循环和通道来处理连接上的事件,避免了轮询带来的性能开销。

4.2 C++ 语言特性细节

  • 智能指针:使用 std::unique_ptr 管理 Socket 和 Channel 对象,确保资源的自动释放;使用 std::shared_ptr 和 std::enable_shared_from_this 安全地传递对象的所有权。
  • std::bind 和 std::placeholders:用于绑定成员函数和参数,实现回调函数的注册。
  • std::atomic:保证连接状态的线程安全,避免多线程环境下的竞态条件。

五、总结

TcpConnection 类是 muduo 网络库中非常重要的一部分,它封装了 TCP 连接的各种操作,提供了灵活的回调机制和高效的事件处理方式。通过深入理解 TcpConnection 类的代码,我们可以学习到网络编程中的一些重要设计思路和 C++ 语言的高级特性。

相关文章:

  • 如何使用configure脚本安装PBS
  • 图形编辑器基于Paper.js教程29:基于图层的所有矢量图元的填充规则实现
  • 组策略关闭 Windows 防火墙指南(企业版/专业版)
  • SpringMVC系列(一)(介绍,简单应用以及路径位置通配符)
  • 机器学习实验报告5-K-means 算法
  • Linux--存储系统探秘:从块设备到inode
  • 影视剧学经典系列-梁祝-陶渊明《感士不遇赋并序》
  • Appium+python自动化(二十三)- Monkeyrunner与Monkey
  • React forwardRef 与 useImperativeHandle 深度解析
  • selenium点击元素出现的obscure问题
  • 设计模式精讲 Day 2:工厂方法模式(Factory Method Pattern)
  • 什么是敏捷中的迭代(Iteration)和 Sprint?
  • 计算机硬件——主板
  • 【旧题新解】第 9 集 带余除法
  • Java 常用类 Arrays:从零到实战的数组操作指南
  • ArkUI-X框架LogInterface使用指南
  • 安卓9.0系统修改定制化____深入解析安卓 9.0 各手机分区:功能、作用与差异 基础篇二
  • Java的DI依赖注入
  • 易采集EasySpider v0.6.3 便携版
  • HTML5+JS实现一个简单的SVG 贝塞尔曲线可视化设计器,通过几个点移动位置,控制曲线的方向
  • pjblog wordpress/seo品牌
  • 培训web网站开发/如何建立电商平台
  • 南京怎样做网站/网站排名监控工具
  • 福州网红景点/百度热搜seo
  • 利用路由器做网站/国家免费职业技能培训官网
  • 重庆建网站多少钱/如何优化