当前位置: 首页 > news >正文

muduo库源码分析: One Loop Per Thread

One Loop Per Thread的含义就是,一个EventLoop和一个线程唯一绑定,和这个EventLoop有关的,被这个EventLoop管辖的一切操作都必须在这个EventLoop绑定线程中执行

1.在MainEventLoop中,负责新连接建立的操作都要在MainEventLoop线程中运行。

2.已建立的连接分发到某个SubEventLoop上,这个已建立连接的任何操作,比如接收数据发送数据,连接断开等事件处理都必须在这个SubEventLoop线程上运行,还不准跑到别的SubEventLoop线程上运行。

一.主要成员

二.wakeupFd_

调用函数eventfd()会创建一个eventfd对象,或者也可以理解打开一个eventfd类型的文件,类似普通文件的open操作。

eventfd的在内核空间维护一个无符号64位整型计数器, 初始化为initval的值。

#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);

flags是以下三个标志位OR结果

  • EFD_CLOEXEC(2.6.27~) : eventfd()返回一个文件描述符,如果该进程被fork的时候,这个文件描述符也会被复制过去,这个时候就会有多个描述符指向同一个eventfd对象,如果设置了这个标志,则子进程在执行exec的时候,会自动清除掉父进程的这个文件描述符
  • EFD_NONBLOCK(2.6.27~):文件描述符会被设置为O_NONBLOCK,如果没有设置这个标志位,read操作的时候将会阻塞直到计数器中有值,如果设置了这个这个标志位,计数器没有值得时候也会立刻返回-1
  • EFD_SEMAPHORE(2.6.30~): 这个标志位会影响read操作。

三.EventLoop对象和一个线程唯一绑定

/***** EventLoop.cc *****/
__thread EventLoop *t_loopInThisThread = nullptr;

EventLoop::EventLoop() : 
    wakeupFd_(createEventfd()), //生成一个eventfd,每个EventLoop对象,都会有自己的eventfd
	wakeupChannel_(new Channel(this, wakeupFd_))
{
    LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);
    if(t_loopInThisThread) //如果当前线程已经绑定了某个EventLoop对象了,那么该线程就无法创建新的EventLoop对象了
        LOG_FATAL("Another EventLoop %p exits in this thread %d \n", t_loopInThisThread, threadId_);
    else
        t_loopInThisThread = this;
    wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
    wakeupChannel_->enableReading(); 
}

介绍一下这个__thread,这个__thread是一个关键字,被这个关键字修饰的全局变量 t_loopInThisThread会具备一个属性,每个线程私有一份

在EventLoop对象的构造函数中,如果当前线程没有绑定EventLoop对象,那么t_loopInThisThread为nullptr,然后就让该指针变量指向EventLoop对象的地址。如果t_loopInThisThread不为nullptr,说明当前线程已经绑定了一个EventLoop对象了,这时候EventLoop对象构造失败!

四. EventLoop的线程只运行该Loop的操作

我们来描绘一个情景,我们知道每个EventLoop线程主要就是在执行其EventLoop对象的loop函数(该函数就是一个while循环,循环的获取事件监听器的结果以及调用每一个发生事件的Channel的事件处理函数)。此时SubEventLoop上注册的Tcp连接都没有任何动静,整个SubEventLoop线程就阻塞在epoll_wait()上。

EventLoop* ioLoop = threadPool_->getNextLoop();

ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));

此时MainEventLoop接受了一个新连接请求,并把这个新连接封装成一个TcpConnection对象,并且希望在SubEventLoop线程中执行TcpConnection::connectEstablished()函数,因为该函数的目的是将TcpConnection注册到SubEventLoop的事件监听器上,并且调用用户自定义的连接建立后的处理函数。当该TcpConnection对象注册到SubEventLoop之后,这个TcpConnection对象的任何操作(包括调用用户自定义的连接建立后的处理函数。)都必须要在这个SubEventLoop线程中运行,所以TcpConnection::connectEstablished()函数必须要在SubEventLoop线程中运行。

那么我们怎么在MainEventLoop线程中通知SubEventLoop线程起来执行TcpConnection::connectEstablished()函数呢?这里就要好好研究一下EventLoop::runInLoop()函数了。

void EventLoop::runInLoop(Functor cb)
{//该函数保证了cb这个函数对象一定是在其EventLoop线程中被调用。
    if(isInLoopThread())//如果当前调用runInLoop的线程正好是EventLoop的运行线程,则直接执行此函数
        cb();
    else//否则调用 queueInLoop 函数
        queueInLoop(cb);
}
void EventLoop::queueInLoop(Functor cb)
{
    {
        unique_lock<mutex> lock(mutex_);
        pendingFunctors_.emplace_back(cb);
    }
    if(!isInLoopThread() || callingPendingFunctors_)
        wakeup(); 
}

void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_, &one, sizeof(one));
    if(n != sizeof(n))
        LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
}

EventLoop* ioLoop = threadPool_->getNextLoop();

ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));

此时的Loop为线程池EventLoopThreadPool中取出的一个从Reactor

ioLoop调用runInLoop,将cb放入queue(pendingFunctors)中,调用wakeup唤醒ioLoop线程

void EventLoop::loop()
{ //EventLoop 所属线程执行
    looping_ = true;
    quit_ = false;
    LOG_INFO("EventLoop %p start looping \n", this);    
    while(!quit_)
    {
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);//此时activeChannels已经填好了事件发生的channel
        for(Channel *channel : activeChannels_)
            channel->HandlerEvent(pollReturnTime_);
        doPendingFunctors(); //执行当前EventLoop事件循环需要处理的回调操作。
    }
}
void EventLoop::doPendingFunctors()
{
   std::vector<Functor> functors;
   callingPendingFunctors_ = true;
   {
       unique_lock<mutex> lock(mutex_);
       functors.swap(pendingFunctors_); //这里的swap其实只是交换的vector对象指向的内存空间的指针而已。
   }
   for(const Functor &functor:functors)
   {
       functor();
   }
   callingPendingFunctors_ = false;
}

从Reactor的loop过程中,处理activeChannels_时,会包含wakeup_channel,此时可以接管TcpConnection,(已经建立且就绪的事件,就调用相应的回调函数即可)

相关文章:

  • 解决windows server 2012服务器注册表删除Grace Period报错无法删除 GracePeriod: 删除项时出错
  • 数据结构day05
  • Centos7配置本地Yum源以及网络YUM源(保姆级)
  • HTTP:四.HTTP连接
  • 【软考系统架构设计师】系统配置与性能评价知识点
  • 解决 Elasticsearch 分页查询性能瓶颈——从10分钟到秒级的优化实践
  • Java基础知识全面复习指南
  • Linux:线程理解和控制
  • vue周边库安装与开发者工具(vue系列二)
  • JavaScript 简单类型与复杂类型-复杂类型传参
  • Oracle 排除交集数据 MINUS
  • AOSP14 Launcher3——手势上滑关键类AbsSwipeHandler解析
  • NDT和ICP构建点云地图 |【点云建图、Ubuntu、ROS】
  • 【特权FPGA】之UART串口
  • ZYNQ笔记书签
  • 高等数学同步测试卷 同济7版 试卷部分 上 做题记录 第三章微分中值定理与导数的应用同步测试卷 A 卷
  • Blender安装基础使用教程
  • 仓储物流园区可视化平台,推动物流高效运转
  • Android 14 、15动态申请读写权限实现 (Java)
  • spark RDD相关概念和运行架构
  • 电商设计师常用的网站/中国进入一级战备状态了吗
  • 济南中京网站建设公司/中国营销型网站有哪些
  • 海城网站制作建设/百度热搜榜排名昨日
  • 东台网站网站建设/三亚百度推广地址
  • seo优化范畴/seo博客网址
  • 浙江省建设监理协会官方网站/如何在google上免费推广