当前位置: 首页 > news >正文

C++项目:仿muduo库高并发服务器---------LoopThreadPool模块和TcpServer模块的实现


文章目录

  • 前言
  • 一、LoopThreadPool模块设计
  • 1.1 功能概述
    • 1.2 代码实现
  • 二、TcpServer模块设计
    • 2.1 模块组成
    • 2.2 功能概述
    • 2.3 流程
    • 2.4 代码实现


前言

本篇文章介绍了LoopThreadPool模块和TcpServer模块的实现其中:LoopThreadPool 模块是用来管理 LoopThread而设计的,帮助我们给新建立的连接进行线程分配。TcpServer 模块用于整合所有相关模块,通过实例化该模块的对象,可简便搭建服务器。


一、LoopThreadPool模块设计

1.1 功能概述

LoopThreadPool 用于管理和分配所有 LoopThread,支持可配置的线程数量(包括 0 个或多个),并能依据主从 Reactor 模型的逻辑,为新连接分配合适的线程(LoopThread 对应的 EventLoop)以进行事件监控与处理。

  1. 线程数量可配置
    线程池能设置包含 0 个或多个 LoopThread。当从属线程数量为 0 时,实现单 Reactor 服务器:一个线程既负责获取新连接,也负责连接的事件处理。

在一些轻量级场景下,一般不需要多线程执行任务

  1. 管理 LoopThread 对象
    对 0 个或多个 LoopThread 实例进行统一管理,包括创建、维护等操作。
  2. 线程分配功能
    • 当主线程获取新连接后,需将其分配给对应线程的 EventLoop 以进行事件监控和处理。
    • 若从属线程数量为 0,直接将新连接分配给主线程的 EventLoop 处理。
    • 若存在多个从属线程,采用 RR(Round - Robin,轮转) 算法进行线程分配:轮询获取从属线程的 EventLoop,并设置到对应的 Connection 中。

1.2 代码实现

class LoopEventPool{public:LoopEventPool(EventLoop*loop):_base_loop(loop),_next_index(0),_thread_count(0){}//设置线程个数void SetThreadCount(int sz){_thread_count=sz;}//创建从属线程void Create(){if(_thread_count>0){_threads.resize(_thread_count);_loops.resize(_thread_count);for(int i=0;i<_thread_count;i++){_threads[i]=new LoopServer();_loops[i]=_threads[i]->GetEventLoop();}}return;}//给新到来的连接获取loopEventLoop* NextLoop(){if(_thread_count==0){return _base_loop;}_next_index=(_next_index+1)%_thread_count;return _loops[_next_index];}private:int _thread_count;//创建从属线程线程个数int _next_index;//分配线程索引EventLoop* _base_loop;//主线程绑定的loopstd::vector<LoopServer*> _threads;//丛属线程std::vector<EventLoop*> _loops;//从属线程对应的loop
};

二、TcpServer模块设计

2.1 模块组成

  1. Acceptor 对象:用于创建监听套接字。
  2. EventLoop 对象(baseloop):实现对监听套接字的事件监控。
  3. std::unordered_map<uint64_t, PtrConnection> _conns:对所有新建连接进行管理。
  4. LoopThreadPool 对象:创建线程池,对新建连接进行事件监控及处理。

2.2 功能概述

  1. 设置从属线程池数量。
  2. 启动服务器。
  3. 设置多种回调函数(连接建立完成、消息、关闭、任意等),用户设置给 TcpServerTcpServer 再设置给获取的新连接。
  4. 选择是否启动非活跃连接超时销毁功能。
  5. 具备添加定时任务功能。

2.3 流程

  1. TcpServer 中实例化 Acceptor 对象和 EventLoop 对象(baseloop)。
  2. Acceptor 挂到 baseloop 上进行事件监控。
  3. Acceptor 对象就绪可读事件时,执行读事件回调函数获取新建连接。
  4. 对新连接,创建 Connection 进行管理。
  5. 对连接对应的 Connection 设置功能回调(连接完成回调、消息回调、关闭回调、任意事件回调)。
  6. 启动 Connection 的非活跃连接的超时销毁规则。
  7. 将新连接对应的 Connection 挂到 LoopThreadPool 中的从属线程对应的 EventLoop 中进行事件监控。
  8. 一旦 Connection 对应的连接就绪了可读事件,则执行读事件回调函数,读取数据,读取完毕后调用 TcpServer 设置的消息回调。

2.4 代码实现

//对所有模块管理、快速搭建服务区
class TcpServer{//由组件使用者设置的事件回调using Functor=std::function<void()>;using ConnectedCallback=std::function<void(PtrConnection)>;using MessageCallback=std::function<void(PtrConnection,Buffer*buf)>;using ClosedCallback=std::function<void(PtrConnection)>;using AnyEventCallback=std::function<void(PtrConnection)>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;ClosedCallback _server_closed_callback;
private://为线连接创建Connection对象void NewConnection(int newfd){_next_id++;PtrConnection conn(new Connection(_threadpool.NextLoop(), _next_id, newfd));conn->SetMessageCallback(_message_callback);// conn->SetAnyEventCallback(std::bind(HandleRead));conn->SetConnectedCallback(_connected_callback);conn->SetClosedCallback(_closed_callback);conn->SetAnyEventCallback(_event_callback);//内部设置的清理连接信息回调conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection,this,std::placeholders::_1));_conns.insert({_next_id, conn});if(_enable_inactive_release)conn->EnableInactiveRelease(_timeout);conn->Established();DBG_LOG("NEW LINK THREAD");}//将连接信息从_conns中移除void RemoveConnection(const PtrConnection& conn){_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLooop,this,conn));}void RemoveConnectionInLooop(const PtrConnection& conn){uint64_t id=conn->Id();auto it=_conns.find(id);if(it!=_conns.end()){_conns.erase(id);}}void RunAfterInLoop(const Functor&task,int dalay){_next_id++;_baseloop.TimerAdd(_next_id,dalay,task);}
public:TcpServer(int port):_next_id(0),_port(port),_enable_inactive_release(false),_acceptor(&_baseloop,port),_threadpool(&_baseloop){_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection,this,std::placeholders::_1));}//设置从属线程个数void SetThreadCount(int sz){_threadpool.SetThreadCount(sz);}void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void SetSrvClosedCallback(const ClosedCallback&cb) { _server_closed_callback = cb; }//启动非活跃销毁void EnableInactiveRelease(int timeout){_timeout=timeout;_enable_inactive_release=true;}//添加定时任务void RunAfter(const Functor&task,int dalay){_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop,this,task,dalay));}void Start(){//创建从属线程_threadpool.Create();//创建从属线程_acceptor.Listen();//开始监听套接字std::cout<<"1"<<std::endl;_baseloop.Start();//开始监听处理事件}
private:uint64_t _next_id;//自增长的线程id;int _port;//监听套接字绑定的端口号int _timeout;//非活跃连接的超时时间bool _enable_inactive_release;//是否启用非活跃连接EventLoop _baseloop;//对监听套接字进行事件监控Acceptor _acceptor;//创建并管理监听套接字LoopEventPool _threadpool;//对从属线程进行管理std::unordered_map<uint64_t,PtrConnection> _conns;//存储连接信息};
http://www.dtcms.com/a/407017.html

相关文章:

  • S7-200 SMART GET/PUT 指令深度解析:从参数到故障排查(S7 协议的客户端 - 服务器通信)上
  • C++11之异常
  • 网站开发软硬件网站建设应注意什么
  • wordpress全站注明国外代理ip地址 免费
  • LightDM 深度解析:图形登录管理器的机制、TTY 映射与嵌入式调试实战
  • Dlib库 人脸应用实例 疲劳监测
  • 11.2. Linux 防火墙管理
  • VMware+RockyLinux+ikuai+docker+cri-docker+k8s 自用 实践笔记(三)
  • 基于全基因组做UGT基因家族,发Top期刊(纯生信)
  • 网店网站模板wordpress get_pages
  • 自己做视频网站的流程关键词排名点击软件首页
  • h5免费建站网站自动化采集
  • C语言字符串函数详解:字符处理、strlen、strcpy、strcat等使用与模拟实现
  • Alibaba Cloud Linux 3 +Docker 部署 ThinkPHP6 (宝塔环境)
  • ps免费素材网站有哪些wordpress虚拟3d网站
  • springBoot 集成Neo4j 实战演示
  • 深圳专业网站建设平台网页深圳龙华区发达吗
  • 正点原子【第四期】Linux之驱动开发学习笔记-6.1 pinctrl和gpio子系统
  • 网站要跟换域名怎么做wordpress支持大数据处理
  • 【LeetCode 每日一题】165. 比较版本号
  • 实验一 中断功能实验
  • 网站建设合同细节品牌注册号是哪个
  • 用Requests+BeautifulSoup实现天气预报数据采集:从入门到实战
  • 叫别人做网站要给什么东西长春哪家网站做的好
  • 怎样给网站做流量二级域名做网站域名
  • C语言⽂件操作讲解(1)
  • 记录commandlinerunner错误解决
  • Next.js + TanStack Query 架构中三种常见的数据请求模式
  • ISO 雨晨 26200.6588 Windows 11 企业版 LTSC 25H2 自用 edge 140.0.3485.81
  • 企业手机app开发公司网站优化策划方案