当前位置: 首页 > news >正文

《Muduo网络库:实现TcpServer类终章》

TcpServer类是Muduo库给用户提供的一个类,让用户只需要聚焦业务逻辑的处理(连接建立时做什么、收到消息时做什么、连接断开时做什么),而无需关注复杂底层。我们在使用Muduo库的时候也能看到这一点。

《Muduo网络库:基于Muduo的网络服务器编程示例》-CSDN博客

我们已经实现完成了Muduo网络库的核心组件,包括EventLoopThreadPool、Poller、Acceptor、TcpConnection等,那么TcpServer是如何将这些组件联系在一起交互的?

下面我们就来实现TcpServer类,来分析整个流程。


TcpServer.h

#pragma once#include "noncopyable.h"
#include "EventLoop.h"
#include "Acceptor.h"
#include "InetAddress.h"
#include "EventLoopThreadPool.h"
#include "Callbacks.h"
#include "TcpConnection.h"#include <functional>
#include <string>
#include <memory>
#include <atomic>
#include <unordered_map>// 对外的服务器编程使用的类
class TcpServer
{
public:using ThreadInitCallback = std::function<void(EventLoop *)>;using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;enum Option{kNoResuePort,kResuePort,};TcpServer(EventLoop *loop,const InetAddress &listenAddr,const std::string &nameArg,Option option = kNoResuePort);~TcpServer();// 这些回调接口是给用户提供的void setThreadInitCallback(const ThreadInitCallback &cb) { threadInitCallback_ = cb; }void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }// 设置线程池中线程数量,实际也就是设置subLoop的个数void setThreadNum(int numThreads);// 开启服务器监听void start();private:void newConnection(int sockfd, const InetAddress &peerAddr);void removeConnection(const TcpConnectionPtr &conn);void removeConnectionInLoop(const TcpConnectionPtr &conn);EventLoop *loop_; // the acceptor loop,即baseLoop(mainLoop),用户定义的loopconst std::string ipPort_;const std::string name_;std::unique_ptr<Acceptor> acceptor_; // 运行在mainLoop上,任务就是监听新连接事件std::shared_ptr<EventLoopThreadPool> threadPool_; // one loop per threadConnectionCallback connectionCallback_;       // 有新连接时的回调MessageCallback messageCallback_;             // 有读写消息时的回调WriteCompleteCallback writeCompleteCallback_; // 消息发送完成以后的回调ThreadInitCallback threadInitCallback_; // loop线程初始化的回调std::atomic_int started_;int nextConnId_; // 连接索引,保证连接唯一标识ConnectionMap connections_; // 保存所有的连接
};

TcpServer.cc

#include "TcpServer.h"
#include "Logger.h"#include <functional>
#include <strings.h>static EventLoop *CheckNotNull(EventLoop *loop)
{if (loop == nullptr){LOG_FATAL("%s:%s:%d mainLoop is null! \n", __FILE__, __FUNCTION__, __LINE__);}return loop;
}TcpServer::TcpServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg, Option option):loop_(loop),ipPort_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop,listenAddr,option==kResuePort)),threadPool_(new EventLoopThreadPool(loop,name_)),connectionCallback_(),messageCallback_(),nextConnId_(1)
{// 当有新用户连接时,会执行TcpServer::newConnectionacceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2));
}TcpServer::~TcpServer()
{for (auto &item : connections_){// 这个局部的shared_ptr智能指针对象,出右括号,可以自动释放new出来的TcpConnection对象资源了TcpConnectionPtr conn(item.second);item.second.reset();// 销毁连接conn->getLoop()->runInloop(std::bind(&TcpConnection::connectDestroyed, conn));}
}// 设置线程池中线程数量,实际也就是设置subLoop的个数
void TcpServer::setThreadNum(int numThreads)
{threadPool_->setThreadNum(numThreads);
}// 开启服务器监听
void TcpServer::start()
{if (started_++ == 0) // 防止一个TcpServer对象被启动多次{threadPool_->start(threadInitCallback_); // 启动loop线程池loop_->runInloop(std::bind(&Acceptor::listen, acceptor_.get()));}
}// 有一个新的客户端连接,acceptor会执行这个回调
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{// 轮询算法,选择一个subLoop,来管理channelEventLoop *ioLoop = threadPool_->getNextLoop();// 生成连接名char buf[64] = {0};snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;std::string connName = name_ + buf;LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n", name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());// 通过sockfd获取其绑定的本机的ip地址和端口号sockaddr_in local;bzero(&local, 0);socklen_t addrlen = sizeof local;if (::getsockname(sockfd, (sockaddr *)&local, &addrlen) < 0){LOG_ERROR("sockets::getLocalAddr");}// 创建TcpConnection对象InetAddress localAddr(local);TcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,  // Socket ChannellocalAddr,peerAddr));connections_[connName]=conn;// 下面的回调都是用户设置给TcpServer=》TcpConnection=》Channel=》Poller=》notify channel调用回调conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);// 设置了如何关闭连接的回调conn->setCloseCallback(std::bind(&TcpServer::removeConnection,this,std::placeholders::_1));// 直接调用TcpConnection::connectEstablishedioLoop->runInloop(std::bind(&TcpConnection::connectEstablished,conn));
}void TcpServer::removeConnection(const TcpConnectionPtr &conn)
{loop_->runInloop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn)
{LOG_INFO("TcpServer::removeConnectionInLoop [%s] - connection %s\n", name_.c_str(), conn->name().c_str());connections_.erase(conn->name());EventLoop *ioLoop = conn->getLoop();ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}

核心组件职责

EventLoop:事件循环(Reactor核心),负责监听I/O事件(内部封装Poller和Channel)并触发回调。

Acceptor:运行在主循环,负责监听新连接(调用listen和accept)。

EventLoopThreadPool:管理子循环线程池,实现“one loop per thread”。

TcpConnection:封装单个TCP连接,管理读写事件和回调。

ConnectionMap:保存所有活跃连接,通过连接名索引。

核心设计细节

1. 多线程模型(主从Reactor模型):

主循环(baseLoop)

  • 由用户传入,唯一且固定,负责处理新连接监听(通过Acceptor)。
  • 不处理具体I/O读写,仅负责连接分发。

子循环(subLoop)

  • 由EventLoopThreadPool创建,数量由用户通过setThreadNum设定(默认为0,即单线程模式)。
  • 每个子循环运行在独立的线程,负责处理已连接客户端的读写事件。
  •  负载均衡:新连接通过轮询算法threadPool_->getNextLoop()分配到subLoop。

2. 回调机制设计:

TcpServer提供多层回调接口,用户只需要关注业务逻辑,框架负责事件分发:

  • 连接回调(ConnectionCallback):连接建立/断开时触发(如记录连接状态)。
  • 消息回调(MessageCallback):收到客户端数据时触发(如解析协议、处理业务)。
  • 写完成回到(WriteCompleteCallback):数据发送完成后触发(如流量控制)。
  • 线程初始化回调(ThreadInitCallback):子循环线程启动时触发(如初始化线程局部资源)。

回调传递路径:用户设置回调 -> TcpServer -> TcpConnection -> Channel ->事件触发时执行

用户自己将回调设置给TcpServer,TcpServer再设置给TcpConnection,TcpConnection再设置给Channel,Channel往Poller上注册感兴趣的事件,未来Poller监听到Channel上有事件发生,就触发Channel设置的相应事件的回调。

->TcpServer的回调

->TcpConnection的回调

->Channel的回调

->Channel执行相应事件的回调

3. 资源管理与线程安全

智能指针:

  • Acceptor用unique_ptr管理(生命周期与TcpServer绑定)。
  • TcpConnection用shared_ptr管理(多线程引用,避免提前销毁)。

线程安全保障:

  • 所有对EventLoop的操作必须在其所属的线程执行,通过runInLoop/queueInLoop实现。
  • ConnectionMap的修改仅在主循环执行(removeConnectionInLoop),避免多线程竞争。

详细流程

1. 服务器初始化阶段

  • 绑定baseLoop,即用户的Loop。
  • 创建Acceptor对象acceptor_,以及EventLoopThreadPool线程池对象threadPool_。
  • 同时给acceptor_设置新连接回调TcpServer::newConnection(),当有新用户连接时会触发该回调。

解析:Acceptor对象内部封装了acceptSocket_以及acceptChannel_,acceptor_对象一经创建,初始化时就创建了非阻塞的listen fd,并绑定到acceptChannel_,同时给acceptChannel_绑定了读事件回调Acceptor::handleRead(),有新连接到来时触发handleRead(),handleRead()通过acceptSocket_.accept()获取新连接的fd,同时调用TcpServer给Acceptor设置的新连接回调TcpServer::newConnection()。

2. 服务器启动阶段(TcpServer::start())

  • 线程池启动:threadPool_->start(threadInitCallback_)会创建numThreads个子线程,每个线程初始化一个EventLoop并运行事件循环,同时触发threadInitCallback_

监听端口:主循环调用Acceptor::listen(),底层通过listen系统调用开始监听客户端连接,并将封装了listen fd的acceptChannel_的读事件注册到Poller上。

3. 新连接处理流程(TcpServer::newConnection())

当客户端发起连接,Acceptor会触发newConnection回调,流程如下:

  • 连接名生成:通过nextConnId_自增保证唯一性,用于ConnectionMap索引。
  • 创建连接对象:创建TcpConnection对象,同时通过轮询算法threadPool_->getNextLoop()获取一个subLoop,新创建的TcpConnection对象与选中的子循环绑定,后续I/O事件由该子循环处理。将该连接放入connections_中管理。
  • 给新连接设置回调:给新连接的TcpConnection对象,设置相应回调connectionCallback_、messageCallback_、writeCompleteCallback_以及绑定到关闭连接的回调std::bind(&TcpServer::removeConnection...)
  • 连接建立:调用TcpConnection::connectEstablished(),会激活Channel(关联sockfd),并触发用户设置的connectionCallback_

4. 连接关闭流程(TcpServer::removeConnection())

当客户端断开连接或服务器主动关闭,流程如下:

  • 异步执行:关闭操作通过runInLoop/queueInLoop确保在正确的线程执行,避免线程安全问题。
  • 资源释放:从connections_中移除该连接。同时TcpConnection::connectDestroyed()会销毁Channel并调用connectionCallback_,TcpConnection的引用计数减为0时自动销毁。

5. 消息处理流程

  • 数据读取:当客户端发送消息,子循环监听到读事件之后,由TcpConnection做相应数据读写,TcpConnection封装了输入/输出缓冲区(inputBuffer_/outputBuffer_),自动处理数据粘包/拆包问题。
  • 回调触发:用户通过setMessageCallback设置业务逻辑。

总结

TcpServer框架通过主从Reactor模型实现了高并发处理:

  • 主循环专注新连接监听,避免I/O处理阻塞。
  • 子循环处理已连接的I/O事件,充分利用多核CPU。
  • 回调机制解耦框架与业务逻辑,用户无需关注底层网络细节。
  • 线程安全设计确保多线程环境下的资源管理正确性。

随着TcpServer类的实现结束,整个Muduo网络库的核心代码就此全部实现完成,整个业务流程已经逐渐清晰,其设计细节与精髓也学习到了。

下面会进行整个Muduo库项目代码测试,测试完成以后,再来梳理整个Muduo网络库各模块的交互以及扩展。

http://www.dtcms.com/a/549011.html

相关文章:

  • 三数之和:用Java思路分析
  • 企业有没有必要自建一套培训考试
  • 测试开发话题06---测试分类(1)
  • 【Agentic RL专题】一、LLM agent 与 agentic RL
  • 使用Java做URL短连接还原长链接获取参数
  • 自己网站做电子签章有效么有哪些网站做汽车周边服务
  • 做网站去哪好在线表单 wordpress
  • 从信号零损耗到智能协同:高清混合矩阵全链路技术拆解,分布式可视化系统十大趋势重塑行业
  • 【超详细】MySQL事务面试题
  • Ubuntu(③vsftpd)
  • Ubuntu 25.10 发布,各种衍生版也发布
  • HUAWEI A800I A2 aarch64架构Ubuntu服务器鲲鹏920开启 IOMMU/SMMU 硬件虚拟化功能
  • GitHub 发布 Agent HQ:欢迎回家,智能体们
  • 使用 Python 将 PowerPoint 转换为 Word 文档
  • 怎么成立自己的网站公司网站建设案例
  • 做数码后期上数码网站企业推广文章
  • iOS 抓包工具实战 开发者的工具矩阵与真机排查流程
  • Spring Boot 整合第三方组件:Redis、MyBatis、Kafka 实战
  • 可视化图解算法66:两个数组的交集
  • 7 种方法:如何将视频从电脑传输到安卓手机
  • Qt GridLayout布局详解:从基础到高级技巧
  • BTreeMap 的 B-Tree 之心:性能与安全的 Rust 式演绎
  • 中国查公司的网站长沙 网站设计 公司
  • R 因子:深度解析其在统计学中的重要作用
  • Laravel 结合影刀 RPA 实现企业微信自动询单报价流程
  • Rust 入门之Rust 运算符全面解析:从基础到实战
  • Rust:借用 切片
  • 【Blender工具】
  • Spring Al学习6:嵌入模型 API
  • 坪山区住房和建设局网站wordpress能放视频