当前位置: 首页 > news >正文

《Muduo网络库:实现EventLoop事件循环》

已经完成了EventLoop事件循环的两大重要模块,Channel通道模块和Poller抽象基类模块、以及具体Epoll的实现,理清EventLoop、Channel、Poller三者之间的关系,下面我们实现EventLoop事件循环类。

Muduo网络库的Reactor模型如下:

说明:

  • Muduo网络库使用的是主从多线程模型,即一个mainLoop(mainReactor),多个subLoop(subReactor)。
  • one loop per thread,一个线程一个EventLoop事件循环。
  • 主线程mainLoop负责监听新连接fd,将新连接通过轮询算法分发给某个子线程subLoop。子线程subLoop处理已分配连接的IO事件(读写数据)和执行连接相关的业务逻辑。
  • 主线程同时会为每一个fd创建对应的Channel,Channel包含了fd及其感兴趣的事件以及实际Poller返回的具体发生的事件。

先看代码,再分析其设计的细节与思路。

EventLoop.h

#pragma once#include "noncopyable.h"
#include "Timestamp.h"
#include "CurrentThread.h"#include <functional>
#include <vector>
#include <atomic>
#include <memory>
#include <mutex>class Channel;
class Poller;// 事件循环类 主要包含了两个大模块 Channel Poller(epoll的抽象)
class EventLoop : noncopyable
{
public:using Functor = std::function<void()>;EventLoop();~EventLoop();// 开启事件循环void loop();// 退出事件循环void quit();Timestamp pollReturnTime() const { return pollReturnTime_; }// 在当前loop中执行回调cbvoid runInloop(Functor cb);// 把cb放入队列中,唤醒loop所在的线程,执行cbvoid queueInLoop(Functor cb);// 用来唤醒loop所在的线程的void wakeup();// EventLoop的方法 =》 Poller的方法void updateChannel(Channel *channel);void removeChannel(Channel *channel);bool hasChannel(Channel *channel);// 判断EventLoop对象是否在自己的线程里面bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }private:void handleRead();        // waked upvoid doPendingFunctors(); // 执行回调using ChannelList = std::vector<Channel *>;std::atomic_bool looping_; // 原子操作的布尔值,底层CAS实现std::atomic_bool quit_;    // 标识退出loop循环const pid_t threadId_;     // 记录当前loop所在线程的idTimestamp pollReturnTime_; // poller返回发生事件的channels的时间点std::unique_ptr<Poller> poller_;int wakeupFd_; // 主要作用是:当mainLoop获取一个新用户的channel,通过轮询算法选择一个subLoop,通过该成员唤醒subLoop处理channelstd::unique_ptr<Channel> wakeupChannel_;ChannelList activeChannels_;  // 活跃的channel列表,存储poll调用返回的有事件发生的channelstd::atomic_bool callingPendingFunctors_; // 标识当前loop是否有需要执行的回调操作std::vector<Functor> pendingFunctors_;    // 存储loop需要执行的所有的回调操作std::mutex mutex_;                        // 互斥锁,用来保护上面vector容器的线程安全操作
};

EventLoop.cc

#include "EventLoop.h"
#include "Logger.h"
#include "Poller.h"
#include "Channel.h"#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <memory>// 防止一个线程创建多个Eventloop
__thread EventLoop *t_loopInThisThread = nullptr;// 定义默认的Poller IO复用接口的超时时间
const int kPollTimeMs = 10000;// 创建wakeupfd,用来notify唤醒subReactor处理新来的channel
int createEventfd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0){LOG_FATAL("eventfd error:%d \n", errno);}return evtfd;
}EventLoop::EventLoop(): looping_(false), quit_(false), callingPendingFunctors_(false), threadId_(CurrentThread::tid()), poller_(Poller::newDefaultPoller(this)), wakeupFd_(createEventfd()), wakeupChannel_(new Channel(this, wakeupFd_))
{LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);if (t_loopInThisThread){LOG_FATAL("Another EventLoop %p exists in this therad %d \n", t_loopInThisThread, threadId_);}else{t_loopInThisThread = this;}// 设置wakeupfd的事件类型以及发生事件后的回调操作wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));// 每一个eventloop都将监听wakeupchannel的EPOLLIN读事件了wakeupChannel_->enableReading();
}
EventLoop::~EventLoop()
{wakeupChannel_->disableAll();wakeupChannel_->remove();::close(wakeupFd_);t_loopInThisThread = nullptr;
}// 开启事件循环
void EventLoop::loop()
{looping_ = true;quit_ = false;LOG_INFO("EventLoop %p start looping. \n", this);while (!quit_){activeChannels_.clear();// EventLoop可以想象成poll poll监听两类fd,一种是client通信的fd,一种是唤醒subLoop的wakeupFdpollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);for (Channel *channel : activeChannels_){// Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理事件channel->handleEvent(pollReturnTime_);}// 执行当前EventLoop事件循环需要处理的回调操作/*** eg:* IO线程(主线程mainLoop负责accept fd监听新连接,打包成一个channel),如果是已经发生事件的channel,就要将channel注册分发给工作线程subLoop* 但是mainLoop不能直接操作subLoop的Poller,只能委托subLoop自己处理* 主线程调用subLoop的queueInLoop()方法,将"注册Channel"的回调加入pendingFunctors_队列* 主线程再调用wakeup()唤醒subLoop(subLoop在poll阻塞)* subLoop线程被唤醒之后,在doPendingFunctors()中,执行之前mainLoop注册的"注册Channel"回调操作* 这时候操作的是自己的Poller,不会有线程安全问题*/doPendingFunctors();}LOG_INFO("EventLoop %p stop looping. \n", this);looping_ = false;
}// 退出事件循环
/*** 退出eventloop有两种情况 1.loop在自己的线程中调用quit 2.在非loop的线程中,调用loop的quit*                              mainLoop**                              waupFd*                                                                          no =============== 生产者-消费者的线程安全队列 mainLoop向队列写channel subLoop从队列取channel**              subLoop1        subLoop2         subLoop3* 比如:subLoop1在subLoop2中调用quit,想退出subLoop2;或者在mainLoop中调用quit,想退出mainLoop都需要wakeupFd先唤醒另一个线程*/
void EventLoop::quit()
{quit_ = true;// EventLoop的quit()可能被其他线程调用,比如mainLoop调用subLoop的quit()退出subLoop,需要先唤醒subLoopif (!isInLoopThread()){wakeup();}
}// 在当前loop中执行回调cb
void EventLoop::runInloop(Functor cb)
{if (isInLoopThread()) // 在当前的loop线程中,执行cb{cb();}else // 在非当前的loop线程中执行cb,就需要唤醒loop所在的线程,执行cb。eg:subLoop1需要执行subLoop2的runInLoop,subLoop1就要唤醒subLoop2{queueInLoop(cb);}
}
// 把cb放入队列中,唤醒loop所在的线程,执行cb
void EventLoop::queueInLoop(Functor cb)
{// 加锁允许其他线程安全的提交需要执行的操作(回调函数){std::unique_lock<std::mutex> lock(mutex_);pendingFunctors_.emplace_back(cb);}// 唤醒相应的,需要执行上面回调操作的loop的线程/*** || callingPendingFunctors_的意思是:当前loop正在执行回调,但是loop自己又给自己添加了新的回调,也需要wakeup()唤醒,* 保证当前loop也能处理这个新的回调。因为执行dopendingFunctors()开始时已经通过swap将当时所有的回调取走了* 因此需要wakeup()唤醒自己,让EventLoop执行完当前回调之后,不进入poll阻塞,直接再次执行dopendingFunctors()处理新添加的回调,避免新回调延迟*/if (!isInLoopThread() || callingPendingFunctors_){wakeup(); // 唤醒loop所在的线程}
}void EventLoop::handleRead()
{uint64_t one = 1;ssize_t n = read(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR("EventLoop::handleRead() reads %d bytes instad of 8", n);}
}// 用来唤醒loop所在的线程的 向wakeupFd_上写一个数据,wakeupChannel就发生了读事件,当前loop线程就会被唤醒
void EventLoop::wakeup()
{uint64_t one = 1; ssize_t n = write(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);}
}// EventLoop的方法 =》 Poller的方法
void EventLoop::updateChannel(Channel *channel)
{poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel *channel)
{poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel *channel)
{return poller_->hasChannel(channel);
}// 执行回调
void EventLoop::doPendingFunctors()
{/*** 定义一个新的functors容器,和存储回调操作的pendingFunctors_进行交换。为什么不直接加锁遍历pendingFuncotrs_?* 锁竞争激烈。执行回调的过程可能很慢,锁被长时间占有,其他线程调用queueInloop()添加新的回调时会阻塞在加锁步骤,导致新回调无法及时添加* 定义一个新functor,和pendingFunctors_交换目的是:* 通过swap()瞬间完成回调的转移,将持有锁的时间压缩到最短* 用新容器存储待执行的回调,其他线程可以继续添加回调到pendingFuncotrs_,使添加回调和执行回调可以并发进行* 平衡了线程安全和执行效率,避免回调处理成为性能瓶颈*/std::vector<Functor> functors;callingPendingFunctors_ = true;{std::unique_lock<std::mutex> lock(mutex_);functors.swap(pendingFunctors_);}for (const Functor &functor : functors){functor(); // 执行当前loop需要执行的回调操作}callingPendingFunctors_ = false;
}

更改DefaultPoller.cc

更改Channel.cc

以上实现的基于事件驱动的EventLoop类,是典型的Reactor模式实现,也是Muduo的多线程Reactor模型的核心代码。

思路分析

mainLoop监听到一个新连接的Channel,要将这个Channel分发给某个subLoop进行管理,那么如何让subLoop管理这个Channel?

主线程mainLoop不能直接操作子线程subLoop(其他EventLoop)的Poller(比如,调用poller_->updateChannel())。核心原因是因为EventLoop内部有很多非线程安全的状态和操作,这些状态如果被多线程同时操作,可能会发生数据竞争、状态不一致等线程安全问题,最终导致程序崩溃、逻辑错乱或死锁。

一、直接操作会破坏Poller的线程安全

每个EventLoop都持有一个Poller(封装epoll的IO多路复用机制),Poller是完全非线程安全的组件,它内部维护两个关键状态:

  • 内核态:与OS内核交互的epoll_fd(epoll的核心文件描述符),通过epoll_ctl注册/删除/修改/事件。
  • 用户态:Channel与fd的映射表(std::unordered_map<int, Channel*>),用于快速查找fd对应的Channel。

如果主线程直接操作子线程subLoop的Poller,比如调用poller_->updateChannel()注册/修改事件、poller_->removeChannel()移除事件,会触发两个致命问题:

问题1:内核态操作冲突

epoll_ctl本身不是线程安全的,不支持多线程并发调用,如果主线程和子线程同时对一个epoll_fd调用epoll_ctl,会导致内核中epoll的事件表混乱:

比如主线程想注册fd=10的读事件,子线程同时想删除fd=10的事件,内核可能只执行其中一个操作,或把事件表搞坏,最终导致fd=10的事件完全无法被监听,更严重的是,这种冲突可能直接触发内核断言失败,导致程序崩溃。

问题2:用户态映射表竞争

Poller的用户态映射表unordered_map<int, Channel*>是普通的STL容器,没有任何线程安全的保护。如果主线程直接修改这个映射表(比如添加新Channel),而子线程同时在遍历这个映射表(子线程在epoll_wait返回后,可能会遍历映射表比如批量处理事件时),会导致:

  • 迭代器失效:子线程遍历到一半时,主线程插入/删除元素,会导致STL容器迭代器失效,直接触发程序崩溃。
  • 数据不一致:主线程可能先在映射表中添加了fd->Channel的记录,但还没来得及调用epoll_ctl(EPOLL_CTL_ADD)注册到内核,而子线程此时可能通过映射表认为该fd已经被管理,导致事件漏处理。
  • 反向不一致:主线程可能先调用epoll_ctl(EPOLL_CTL_DEL)从内核中删除了事件,但还没来得及从映射表中移除fd记录,子线程可能继续处理该fd事件,导致对无效Channel的操作。

二、直接操作会破坏EventLoop的核心循环逻辑

子线程的subLoop一直在执行loop()循环,这个循环是单线程串行执行的,逻辑是:

// 开启事件循环
void EventLoop::loop()
{while (!quit_){activeChannels_.clear();  // 清空活跃的Channel列表activeChannels_pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);  // 阻塞等待事件,有事件发生放入activeChannels_for (Channel *channel : activeChannels_)  // 遍历activeChannels_处理活跃的Channel{channel->handleEvent(pollReturnTime_);  }doPendingFunctors();  // 执行回调}
}

在事件循环中,activeChannels_会被频繁修改和遍历,每次循环开始调用activeChannels_.clear()清空,然后由poller_->poll()填充新的活跃的Channel,接着for循环遍历activeChannels_处理活跃的Channel事件。

如果此时主线程直接操作subLoop的activeChannels_(比如添加/删除Channel)会打断这个串行逻辑:

  • 遍历过程中元素数量变化,引发迭代器失效,程序崩溃
  • 同时读写导致数据不一致(主线程添加的Channel可能被重复处理,子线程刚清空,主线程就插入,导致下一次循环重复处理;或完全不处理,主线程刚添加Channel,子线程就调用了clear)。

主线程mainLoop并不能直接操作其他子线程subLoop,本质是“非线程安全的组件不能被多线程并发修改”。

那么,主线程应该如何操作其他EventLoop对象呢?

------ 线程归属检查:限制危险操作只能在EventLoop所属的线程执行 ------

即在多线程环境中,如何安全的操作其他EventLoop对象? 首先就必须要有明确的线程归属检查操作,检查当前正在执行代码的线程,是否就是创建这个EventLoop对象的线程。

 // 判断EventLoop对象是否在自己的线程里面bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

简单来说,就是操作EventLoop对象(执行EventLoop的核心方法)的当前线程,是创建这个EventLoop对象的线程,还是其他的线程。

  • 如果当前线程是创建这个EventLoop对象的线程,可以直接操作这个EventLoop对象,不存在线程安全问题。
  • 如果当前线程是其他的线程,不能直接操作这个EventLoop对象,会导致非线程安全状态被并发修改。

主线程并不能直接操作子线程的Poller,那么mainLoop如何将Channel轮询分发给subLoop,让subLoop管理(subLoop需要将该Channel注册进Poller)呢?mainLoop应该如何安全的操作子线程的EventLoop对象呢?

------- 线程安全的任务队列:让其他线程“间接提交”操作 ------

主线程“委托”子线程自己来执行该操作(比如“注册Channel”到subLoop的Poller)。

主线程将需要执行的操作(比如“注册Channel”)封装成回调函数(using Functor = std::function<void()>)。

EventLoop通过pendingFunctors_(任务队列)互斥锁,允许其他线程安全的提交需要执行的操作(回调函数)。

  • 如果当前线程是EventLoop所属线程,直接执行该回调操作,不涉及线程安全问题。
  • 如果当前线程是其他线程,调用queueInLoop(),将该回调安全放进任务队列pendingFunctors_(加锁保证安全)里,子线程自己从任务队列中取回调操作来执行。

子线程自己来执行回调,此时操作的就是自己的EventLoop。所有对EventLoop的非线程安全状态的修改都由EventLoop所属的线程完成。

通过“委托”,就把“跨线程操作”转化为“子线程单线程内的串行操作”。这种设计通过“跨线程提交任务”“本线程处理任务”的模式,彻底避免了线程安全问题

------ 线程唤醒机制:确保EventLoop能够及时处理新任务 ------

EventLoop的loop()方法大部分时间都阻塞在poller_->poll()(等待IO事件),如果其他线程提交了新任务(比如,如果主线程监听到一个Channel事件,将“注册Channel”的回调操作提交到了任务队列里,子线程要自己执行该回调,将Channel注册到自己的Poller中),但是,此时子线程subLoop还在一直阻塞监听的状态,如何及时地执行该回调任务,而避免任务的延迟处理呢?

#include <sys/eventfd.h>

int eventfd(unsigned int initval, int flags);

参数1:内核计数器的初始值,通常为0。

参数2:控制eventfd行为的标志,通常有两个:

  • EFD_NONBLOCK:将FD设为非阻塞模式(避免读/写操作阻塞线程)。
  • EFD_CLOEXEC:设置FD的FD_CLOEXEC标志。

返回值:成功返回eventfd对应的文件描述符(非负整数),失败返回-1并设置errno。

eventfd()系统调用是一个轻量级的跨线程/进程通信机制,专门用于“事件通知”场景,通过一个特殊的文件描述符(FD)传递简单的数值信号,实现线程间的高效唤醒

本质:一个内核维护的“计数器+文件描述符”。创建eventfd后,会得到一个FD,对这个FD的读写操作,本质是操作内核中的计数器。

工作原理:

  • 创建:调用eventfd()创建一个eventfd对象,返回一个文件描述符。
  • 写入:使用write()写入一个64位无符号整数(uint64_t),会将该值加到计数器上。
  • 读取:使用read()读取计数器的值,默认模式下会将计数器清0;信号量模式下会将计数器减1
  • 通知机制:当计数器值大于0时,文件描述符变为可读状态,可被I/O多路复用机制检测到。

优势:

  • 相比传统的“管道(pipe)”唤醒,eventfd更轻量(仅需一个FD,而pipe需要两个)。
  • 更高效(传递的是64位数值,仅用于通知,而pipe传递的是字节流,需处理复杂的缓冲区)。
  • 且专门为“事件通知”设计,适用于线程/进程间“唤醒通知,而pipe适用的是通用字节流通信。

其他线程通过写eventfd发送可读事件,子线程监听到读eventfd事件从阻塞态被唤醒,就可以取任务队列里的回调执行了。

具体场景回顾:

  1. 主线程给子线程分配新连接时,会调用subLoop的queueInLoop()提交“注册Channel”的回调,将该回调放入pendingFunctors_队列中;
  2. 为了让subLoop线程尽快的处理这个回调,主线程会调用subLoop的wakeup(),向wakeupFd_写入数据;
  3. subLoop原本阻塞在poller_->poll()(等待IO事件),但wakeupFd_有了读事件后,poll()会立即返回;
  4. subLoop处理完wakeupFd_的读事件后,会执行doPendingFunctors(),从pendingFunctors_队列中取回调,处理主线程提交的回调(注册新Channel);
  5. subLoop此时操作的是自己的EventLoop对象的updateChannel()方法,不存线程安全问题。

如果没有eventfd,subLoop会一直阻塞在poll()中(直到超时或有其他IO事件),主线程提交的回调可能很久才被处理,导致新连接无法及时响应。

这种设计保证了跨线程任务的实时性,需要处理任务时可立即被唤醒。

核心功能

  1. 事件循环管理:通过loop()方法开启事件循环,quit()方法退出事件循环
  2. 事件分发:使用Poller(IO多路复用)监听文件描述符
  3. 跨线程任务调度:提供runInLoop()和queueInLoop()方法在事件循环线程中执行任务
  4. 线程唤醒机制:使用eventfd实现高效的线程间唤醒

关键分析

1、线程唯一性

  • 使用__thread EventLoop* t_loopInThisThread确保每个线程最多只有一个EventLoop实例
  • 在构造函数中检查并设置这个线程局部变量

2、事件循环流程

  • 在loop()方法中,不断调用poller_->poll()等待事件发生
  • 对活跃的Channel调用handleEvent()处理事件
  • 执行等待中的回调函数doPendingFunctors()

3、线程安全的任务队列

  • 使用pendingFunctors_存储需要在事件循环线程执行的回调
  • 通过互斥锁mutex_保证队列操作的线程安全
  • 在doPendingFunctors()中通过交换队列的方式减少锁的持有时间

4、线程唤醒机制

  • 通过createEventfd()创建wakeupFd_用于线程间通信
  • wakeup()方法向wakeupFd_写入数据,触发读事件
  • handleRead()处理唤醒事件,读取wakeupFd_中的数据

5、Channel管理

  • 间接通过Poller管理Channel的注册、更新和移除
  • 确保Channel只属于一个EventLoop

设计亮点

1、高效的线程唤醒:

使用eventfd而非传统的pipe,更高效且适合事件通知

2、减少锁竞争:

在处理回调时通过交换向量的方式,缩短了持有锁的时间,且添加回调和执行回调可以并发执行,效率高

3、线程安全设计:

明确的线程归属检查、线程安全的任务队列、跨线程操作时的唤醒机制保证

4、分层设计:

EventLoop专注于事件循环调度,具体的IO复用由Poller实现

http://www.dtcms.com/a/479513.html

相关文章:

  • 企业网站推荐微信公众号人工服务电话
  • 国家电网智能车载终端TBOX
  • 南宁网站建设升上去proxy网站
  • chatgpt-codex使用显示Error starting conversation
  • 搜狐网站建设的建议网站开发工资多少
  • Java中关于HashMap的元素遍历的顺序问题
  • 南通江苏网站建设建设淘宝网站的市场分析
  • 网站备案容易通过吗深入挖掘wordpress
  • 网站程序基础wordpress安装无法连接数据库
  • 淘宝天猫优惠卷网站建设网站建设 好发信息网
  • 柔性制造的终极答案:富唯智能机器人如何以“全能感官”实现一机多能
  • Zabbix 配置钉钉告警
  • 网站建设最快多长时间苏州做网站公司精选苏州聚尚网络
  • Docker 镜像版本Alpine、Slim、Bookworm、Bullseye、Stretch、Jessie
  • 网上网站代码可以下载吗转移网站如何转数据库
  • 企业网站示例网站建设方案撰写
  • 数据结构:哈希基础、6种哈希函数构造方法、4种解决哈希冲突的方法和哈希扩展(一致性哈希+虚拟节点+布隆过滤器)
  • 网站做哪种推广好建设银行租房网站湖北
  • 网站建设公司基本流程无锡专业做网站
  • 工商注册网站官网网站和app可以做充值余额功能
  • 东莞网站开发后缀微营销论文
  • 【AI论文】Paper2Video:从科学论文自动生成视频
  • Pod的进阶
  • 汽车4s店网站建设方案自己设计建房子的软件
  • 玉器哪家网站做的好中小企业信息网官网
  • Node.js zlib模块所有 API 详解 + 常用 API + 使用场景
  • wordpress怎么加入站长统计聊城网站建设策划建设公司
  • 优化网站的软件wordpress文章全部展示
  • 大型网站建设行情吉林长春建设工程信息网站
  • 在线网站做品牌网站怎么样