当前位置: 首页 > news >正文

WebServer实现:muduo库的主丛Reactor架构

前言

  作为服务器,核心自然是高效的处理来自client的多个连接啦,那问题在于,如何高效的处理client的连接呢?这里就介绍两种架构:单Reactor架构和主丛Reactor架构。

单Reactor架构

  单Reactor架构的核心为,由一个主线程监听包括ServerFd在内的所有fd,当某个fd 可读/可写的时候,就把它交给线程池中的某个线程去处理,核心流程大概是:
单Reactor模型

while(true) {epoll_wait();/*遍历所有监听到的event*/for(所有活跃的events的fd) {if(fd == listenfd) dealListen();	/*如果是listen fd可读 ,则accept建立新的连接*/if(fd == readfd) dealRead(); /*如果clientfd可读,就交给线程池处理可读*/if(fd == writefd) dealWrite(); /*如果可写 交给线程池处理可写*/}
}
/*伪代码*/
dealListen()
{do{int retFd = accept();if(retFd < 0)		/*没有待处理的fd了*/break;/*其它和retFd相关的处理*/}while(true);
}

  这看起来是个很合理的实现,但是在某些场景下是会遇到性能瓶颈的,试想这样一个场景:如果同时突然间出现了大量的client请求链接,那我们的epoll_wait监听到的应该是listenFd可读,调用dealListen()来处理listen事件,dealListen可能是调用多次accept函数直到返回值小于0,才表明没有新的连接了,这个过程很费时间,也就意味着:哪怕有些client已经建立连接了,并且触发可读/可写了,主线程都无法及时将事件交给线程池处理,而这就是它的性能瓶颈
  这就引入了我们的主丛Reactor架构,它的处理逻辑是:主线程的epoll只负责监听,监听到了就把这个fd交给其他的epoll去处理(监听它的可读可写),这样,在面对突发IO连接的时候,也能及时响应。
主丛Reactor
  其中主Reactor的核心是Acceptor中处理listenfd的可读/写操作,然后把监听到的clientfd分发给其他的sub_reactor,当client_fd可读可写的时候,由对应的sub_reactor的epoll监听到进行读写。
  有了思路,剩下的事情就是怎么做,我们来看看muduo库的实现。

Channel类—>对fd的封装

  对于每个fd,不管是clientfd还是listenfd也好,监听到可读/可写后,都应该调用自身对应的可读/写回调函数进行处理;同时对于每个fd,他也有自己对应的epoll(在这里用EventLoop对epoll和thread进一步的封装)

class Channel : noncopyable
{
public:/*只是起了个别名的作用*/using EventCallback = std::function<void()>; // muduo仍使用typedefusing ReadEventCallback = std::function<void(Timestamp)>;// read和时间挂钩了Channel(EventLoop *loop, int fd);~Channel();// fd得到Poller通知以后 处理事件 handleEvent在EventLoop::loop()中调用void handleEvent(Timestamp receiveTime);// 设置回调函数对象// 用move比直接赋值好// 直接赋值有两次拷贝操作(实参到形参 形参到类内对象) 但是move简化了第二次// 使用move的前提:移动的资源在堆上,且支持这样的操作void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }void setWriteCallback(EventCallback cb) { writeCallback_ = std::move(cb); }void setCloseCallback(EventCallback cb) { closeCallback_ = std::move(cb); }void setErrorCallback(EventCallback cb) { errorCallback_ = std::move(cb); }// 防止当channel被手动remove掉 channel还在执行回调操作void tie(const std::shared_ptr<void> &);int fd() const { return fd_; }int events() const { return events_; }void set_revents(int revt) { revents_ = revt; }// 设置fd相应的事件状态 相当于epoll_ctl add deletevoid enableReading() { events_ |= kReadEvent; update(); }void disableReading() { events_ &= ~kReadEvent; update(); }void enableWriting() { events_ |= kWriteEvent; update(); }void disableWriting() { events_ &= ~kWriteEvent; update(); }void disableAll() { events_ = kNoneEvent; update(); }/*只是可读*/void setReading() { events_ = kReadEvent; update(); }/*只是可写*/void setWriting() { events_ = kWriteEvent; update(); }// 返回fd当前的事件状态bool isNoneEvent() const { return events_ == kNoneEvent; }bool isWriting() const { return events_ & kWriteEvent; }bool isReading() const { return events_ & kReadEvent; }int index() { return index_; }void set_index(int idx) { index_ = idx; }// one loop per threadEventLoop *ownerLoop() { return loop_; }void remove();private:void update();void handleEventWithGuard(Timestamp receiveTime);static const int kNoneEvent;static const int kReadEvent;static const int kWriteEvent;EventLoop *loop_; // 事件循环const int fd_;    // fd,Poller监听的对象int events_;      // 注册fd感兴趣的事件(只是一个表示,通过update调用到epoller的ctl重新设置flag)int revents_;     // Poller返回的具体发生的事件(poller在wait到之后通过set_revents返回值)int index_;std::weak_ptr<void> tie_;       /*观察对象是否存在*/bool tied_;// 因为channel通道里可获知fd最终发生的具体的事件events,所以它负责调用具体事件的回调操作ReadEventCallback readCallback_;    /*具体可读函数得看具体实现是咋样的*/EventCallback writeCallback_;EventCallback closeCallback_;EventCallback errorCallback_;
};
/*核心处理函数,当可读/写/ERROR发生的时候 调用对应的回调*/
void Channel::handleEventWithGuard(Timestamp receiveTime)
{//LOG_INFO<<"channel handleEvent revents:"<<revents_;// LOG_INFO("channel handleEvent revents:[%d]", revents_);// 关闭if ((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN)) // 当TcpConnection对应Channel 通过shutdown 关闭写端 epoll触发EPOLLHUP{if (closeCallback_){closeCallback_();}}// 错误if (revents_ & EPOLLERR){if (errorCallback_){errorCallback_();}}// 读if (revents_ & (EPOLLIN | EPOLLPRI)){if (readCallback_){readCallback_(receiveTime);}}// 写if (revents_ & EPOLLOUT){if (writeCallback_){writeCallback_();}}
}/*update只是对epoll_ctl的进一步封装*//*只是可读*/void setReading() { events_ = kReadEvent; update(); }/*只是可写*/void setWriting() { events_ = kWriteEvent; update(); }

Epoller—>对Epoll的封装

  对于常用API进行了抽象和封装,epoll本身也归属于某个EventLoop。

EventLoop----“one loop per thread的体现”

  对于主Reactor也好,子Reactor也好,它们每个都是对应着某个线程,这也就是我们为什么抽象EventLoop这样一个类,那么思考EventLoop得有什么呢?

  1. 得有个线程处理函数func一直在执行
  2. fuc要做两件事:(1) 处理epoll_wait;(2) 处理和其它线程/资源交互的信息(比如主线程怎么把clientfd分发给其他线程)
      清楚了这两件事,那如何做呢?我们来看看muduo库的做法

线程间交互: eventfd + 回调函数机制

  如果想要和其它线程/资源交互,能想到的一点就是利用信号量/互斥量等操作,但是这样就有了另一个问题:我们的线程处理函数是要同时执行epoll_wait的,等不到的时候可是阻塞当前线程的,所以对于线程间的交互,处理的会不及时

while(true)
{epoll_wait(超时时间);for(){处理所有的事件};lock();	/*最坏情况得等到超时时间到了才能处理*/for(){处理线程间的交互事件}unlock();
}

  这里就引入了一个eventfd。我们给每一个EventLoop的Epoll,除了要监视clientfd之外,还需要额外监视一个fd----eventfd,关于它如果想多了解可以自行搜索,粗略的讲,eventfd里面有个计数器,每次write()写就会增加(此时会触发epoll可读),而每次读read()就会清零计数器。
  那以此来看其它线程可以怎么通过eventfd来和当前线程交互呢?—就是queueInLoop函数

// 把cb放入队列中 唤醒loop所在的线程执行cb(由其他线程或者本线程调用)
void EventLoop::queueInLoop(Functor cb)
{{std::unique_lock<std::mutex> lock(mutex_);pendingFunctors_.emplace_back(cb);/*存储了所有需要交互的函数*/}/*** || callingPendingFunctors的意思是 当前loop正在执行回调中 但是loop的pendingFunctors_中又加入了新的回调 需要通过wakeup写事件* 唤醒相应的需要执行上面回调操作的loop的线程 让loop()下一次poller_->poll()不再阻塞(阻塞的话会延迟前一次新加入的回调的执行),然后* 继续执行pendingFunctors_中的回调函数**/if (!isInLoopThread() || callingPendingFunctors_){wakeup(); // 唤醒loop所在线程,本质就是对eventfd进行写的操作}
}
// 当epoll触发的时候,eventfd对应的Channel绑定的回调函数
void EventLoop::handleRead()
{uint64_t one = 1;ssize_t n = read(wakeupFd_, &one, sizeof(one));	/*不清零就会一直epoll可读*/if (n != sizeof(one)){//LOG_ERROR<<"EventLoop::handleRead() reads"<<n<<"bytes instead of 8";LOG_ERROR("EventLoop::handleRead() reads[%d]bytes instead of 8",n);}
}

  依次就能写出这个EventLoop的loop函数的逻辑啦,eventfd确实帮了大忙简化了很多逻辑

EventLoop的核心----loop()函数

void EventLoop::loop()
{looping_ = true;quit_ = false;//LOG_INFO<<"EventLoop start looping";LOG_INFO("EventLoop start looping");while (!quit_){activeChannels_.clear();/*清空容器所有变量*/pollRetureTime_ = poller_->poll(kPollTimeMs, &activeChannels_);for (Channel *channel : activeChannels_){//LOG_INFO("active channel fd:[%d]",channel->fd());// Poller监听哪些channel发生了事件 然后上报给EventLoop 通知channel处理相应的事件// 包括cilentfd和eventfd两类(子Reactor)// 包括listenfd和eventfd两类(主Reactor)// 当然还有一个timefd,可以epoll + timefd处理一些超时事件// 感兴趣可以了解一下channel->handleEvent(pollRetureTime_);}/*** 执行当前EventLoop事件循环需要处理的回调操作 对于线程数 >=2 的情况 IO线程 mainloop(mainReactor) 主要工作:* accept接收连接 => 将accept返回的connfd打包为Channel => TcpServer::newConnection通过轮询将TcpConnection对象分配给subloop处理** mainloop调用queueInLoop将回调加入subloop(该回调需要subloop执行 但subloop还在poller_->poll处阻塞) queueInLoop通过wakeup将subloop唤醒**/doPendingFunctors();		/*调用vector保存的所有回调函数*/}//LOG_INFO<<"EventLoopstop looping";LOG_INFO("EventLoopstop looping")looping_ = false;
}

Acceptor:ListenFd的封装

  Acceptor里面的成员函数,就是主Reactor对应的线程的操作函数。我们来想Acceptor需要做什么:

  1. 得能开启listen
  2. 监听到可读事件后,需要调用对应的可读回调函数处理
  3. 在回调函数中,应该把clientfd分配给某个EventLoop

Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport): loop_(loop), acceptSocket_(createNonblocking()), acceptChannel_(loop, acceptSocket_.fd()), listenning_(false)
{//LOG_INFO("server id:[%d]",acceptSocket_.fd());//LOG_INFO("Acceptor make success");acceptSocket_.setReuseAddr(true);acceptSocket_.setReusePort(true);acceptSocket_.bindAddress(listenAddr);// TcpServer::start() => Acceptor.listen() 如果有新用户连接 要执行一个回调(accept => connfd => 打包成Channel => 唤醒subloop)// baseloop监听到有事件发生 => acceptChannel_(listenfd) => 执行该回调函数acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}Acceptor::~Acceptor()
{acceptChannel_.disableAll();    // 把从Poller中感兴趣的事件删除掉acceptChannel_.remove();        // 调用EventLoop->removeChannel => Poller->removeChannel 把Poller的ChannelMap对应的部分删除
}void Acceptor::listen()
{listenning_ = true;acceptSocket_.listen();         // 开启listenacceptChannel_.enableReading(); // acceptChannel_注册至Poller,要不怎么监听呢
}// listenfd有事件发生了,就是有新用户连接了
void Acceptor::handleRead()
{InetAddress peerAddr;do{int connfd = acceptSocket_.accept(&peerAddr);if(connfd < 0)break;//LOG_INFO("listen fd:[%d] success",connfd);//fcntl(connfd, F_SETFL, fcntl(connfd, F_GETFD, 0) | O_NONBLOCK);(不用 已经设计过了)if (connfd >= 0){if (NewConnectionCallback_) /*Tcp Server中调用的*/{/*这里实现看需求了,所以封装成回调函数的形式更灵活*/NewConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop 唤醒并分发当前的新客户端的Channel}else{::close(connfd);}}else{LOG_ERROR("accept Err");if (errno == EMFILE){LOG_ERROR("sockfd reached limit");}}} while (true);}

EventLoopThreadPool----对应subReactor的操作

#include "../MultiReactor/EventLoopThread.h"
#include "../MultiReactor/EventLoop.h"EventLoopThread::EventLoopThread(const ThreadInitCallback &cb,const std::string &name): loop_(nullptr), exiting_(false), thread_(std::bind(&EventLoopThread::threadFunc, this), name), mutex_(), cond_(), callback_(cb)
{
}
// 由谁调用呢
EventLoopThread::~EventLoopThread()
{exiting_ = true;if (loop_ != nullptr){loop_->quit();thread_.join();//调用EventLoopThread析构的线程会阻塞 直到thread_对应的线程运行完毕}
}/*是让主Reactor调用的*/
EventLoop *EventLoopThread::startLoop()
{thread_.start(); // 启用底层线程Thread类对象thread_中通过start()创建的线程EventLoop *loop = nullptr;{std::unique_lock<std::mutex> lock(mutex_);cond_.wait(lock, [this](){return loop_ != nullptr;});loop = loop_;}return loop;
}// 下面这个方法 是在单独的新线程里运行的
void EventLoopThread::threadFunc()
{// 给每个线程创建一个EventLoopEventLoop loop;if(callback_){callback_(&loop);//如果设置了回调函数}// 新线程调用回调就能获得自己的EventLoop了// 所有的Loop是由谁创建的呢?{std::unique_lock<std::mutex> lock(mutex_);loop_ = &loop;cond_.notify_one();}loop.loop();    // 执行EventLoop的loop() 开启了底层的Poller的poll()std::unique_lock<std::mutex> lock(mutex_);loop_ = nullptr;
}#include <memory>
#include "../MultiReactor/EventLoopThreadPool.h"
#include "../MultiReactor/EventLoopThread.h"
#include "../Log/log.h"EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg): baseLoop_(baseLoop), name_(nameArg), started_(false), numThreads_(12), next_(0)
{
}
/*析构就是啥也不做?不过好像确实这个是整个程序的生命周期*/
EventLoopThreadPool::~EventLoopThreadPool()
{// Don't delete loop, it's stack variable
}void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{started_ = true;for (int i = 0; i < numThreads_; ++i){char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);/*池子确定了回调函数是什么*//*这是一个创建成功的回调函数 新线程创建成功了通过callback就能获得自己的专属loop_*/EventLoopThread *t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));loops_.push_back(t->startLoop()); // 底层创建线程 绑定一个新的EventLoop 并返回该loop的地址}if (numThreads_ == 0 && cb) // 整个服务端只有一个线程运行baseLoop{cb(baseLoop_);}
}// 如果工作在多线程中,baseLoop_(mainLoop)会默认以轮询的方式分配Channel给subLoop
EventLoop *EventLoopThreadPool::getNextLoop()
{// 如果只设置一个线程 也就是只有一个mainReactor 无subReactor // 那么轮询只有一个线程 getNextLoop()每次都返回当前的baseLoop_EventLoop *loop = baseLoop_;    // 通过轮询获取下一个处理事件的loop// 如果没设置多线程数量,则不会进去,相当于直接返回baseLoopif(!loops_.empty())             {loop = loops_[next_];++next_;// 轮询if(next_ >= loops_.size()){next_ = 0;}}return loop;
}std::vector<EventLoop *> EventLoopThreadPool::getAllLoops()
{if (loops_.empty()){return std::vector<EventLoop *>(1, baseLoop_);}else{return loops_;}
}

相关文章:

  • 高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
  • JVM对象内存分配机制全解析
  • PCB板高速飞拍检测系统 助力电子制造自动化领域
  • 如何导出和迁移离线 Conda 环境
  • Cisco Nexus93240接口带宽显示异常高故障- bug
  • 【目标检测】什么是目标检测?应用场景与基本流程
  • [持续集成]
  • 【案例】性能优化在持续集成与持续交付中的应用
  • element-ui使用el-button去掉focus,删除el-button的focus效果
  • 魔音音乐 5.0.2 | 无损下载 同步网易云歌单UI美观
  • 数据分析对比图表-雷达图全面指南
  • 纯视觉SOTA!华科小米推出ReCogDrive:结合VLM和强化学习的端到端自动驾驶框架
  • 数据库中间件ShardingSphere5
  • 高性能群集部署技术-Nginx+Tomcat负载均衡群集
  • Zephyr 开发进阶:设备树 DTS、板卡 BSP 与驱动模型全解析
  • C++11 GC Interface:从入门到精通
  • window显示驱动开发—使用状态刷新回调函数
  • C# WPF程序界面美化方法与详细步骤
  • wpf DataTemplate 宽度和控件宽度一样
  • 在小程序中实现上下左右拖动表格
  • 做黑网站赚钱/舆情信息在哪里找
  • 帮别人做网站赚钱6/网店营销策划方案范文
  • 四川做网站优化价格/搜索引擎营销的特点有
  • 大连做网站绍兴厂商/seo优化工具哪个好
  • 怎样做元古建筑的网站结构图/可以免费领取会员的软件
  • 做一个网站flash收多少钱/在百度上怎么发布广告