《Muduo网络库:实现EventLoop事件循环》
已经完成了EventLoop事件循环的两大重要模块,Channel通道模块和Poller抽象基类模块、以及具体Epoll的实现,理清EventLoop、Channel、Poller三者之间的关系,下面我们实现EventLoop事件循环类。
Muduo网络库的Reactor模型如下:
说明:
- Muduo网络库使用的是主从多线程模型,即一个mainLoop(mainReactor),多个subLoop(subReactor)。
- one loop per thread,一个线程一个EventLoop事件循环。
- 主线程mainLoop负责监听新连接fd,将新连接通过轮询算法分发给某个子线程subLoop。子线程subLoop处理已分配连接的IO事件(读写数据)和执行连接相关的业务逻辑。
- 主线程同时会为每一个fd创建对应的Channel,Channel包含了fd及其感兴趣的事件以及实际Poller返回的具体发生的事件。
先看代码,再分析其设计的细节与思路。
EventLoop.h
#pragma once#include "noncopyable.h"
#include "Timestamp.h"
#include "CurrentThread.h"#include <functional>
#include <vector>
#include <atomic>
#include <memory>
#include <mutex>class Channel;
class Poller;// 事件循环类 主要包含了两个大模块 Channel Poller(epoll的抽象)
class EventLoop : noncopyable
{
public:using Functor = std::function<void()>;EventLoop();~EventLoop();// 开启事件循环void loop();// 退出事件循环void quit();Timestamp pollReturnTime() const { return pollReturnTime_; }// 在当前loop中执行回调cbvoid runInloop(Functor cb);// 把cb放入队列中,唤醒loop所在的线程,执行cbvoid queueInLoop(Functor cb);// 用来唤醒loop所在的线程的void wakeup();// EventLoop的方法 =》 Poller的方法void updateChannel(Channel *channel);void removeChannel(Channel *channel);bool hasChannel(Channel *channel);// 判断EventLoop对象是否在自己的线程里面bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }private:void handleRead(); // waked upvoid doPendingFunctors(); // 执行回调using ChannelList = std::vector<Channel *>;std::atomic_bool looping_; // 原子操作的布尔值,底层CAS实现std::atomic_bool quit_; // 标识退出loop循环const pid_t threadId_; // 记录当前loop所在线程的idTimestamp pollReturnTime_; // poller返回发生事件的channels的时间点std::unique_ptr<Poller> poller_;int wakeupFd_; // 主要作用是:当mainLoop获取一个新用户的channel,通过轮询算法选择一个subLoop,通过该成员唤醒subLoop处理channelstd::unique_ptr<Channel> wakeupChannel_;ChannelList activeChannels_; // 活跃的channel列表,存储poll调用返回的有事件发生的channelstd::atomic_bool callingPendingFunctors_; // 标识当前loop是否有需要执行的回调操作std::vector<Functor> pendingFunctors_; // 存储loop需要执行的所有的回调操作std::mutex mutex_; // 互斥锁,用来保护上面vector容器的线程安全操作
};
EventLoop.cc
#include "EventLoop.h"
#include "Logger.h"
#include "Poller.h"
#include "Channel.h"#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <memory>// 防止一个线程创建多个Eventloop
__thread EventLoop *t_loopInThisThread = nullptr;// 定义默认的Poller IO复用接口的超时时间
const int kPollTimeMs = 10000;// 创建wakeupfd,用来notify唤醒subReactor处理新来的channel
int createEventfd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0){LOG_FATAL("eventfd error:%d \n", errno);}return evtfd;
}EventLoop::EventLoop(): looping_(false), quit_(false), callingPendingFunctors_(false), threadId_(CurrentThread::tid()), poller_(Poller::newDefaultPoller(this)), wakeupFd_(createEventfd()), wakeupChannel_(new Channel(this, wakeupFd_))
{LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);if (t_loopInThisThread){LOG_FATAL("Another EventLoop %p exists in this therad %d \n", t_loopInThisThread, threadId_);}else{t_loopInThisThread = this;}// 设置wakeupfd的事件类型以及发生事件后的回调操作wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));// 每一个eventloop都将监听wakeupchannel的EPOLLIN读事件了wakeupChannel_->enableReading();
}
EventLoop::~EventLoop()
{wakeupChannel_->disableAll();wakeupChannel_->remove();::close(wakeupFd_);t_loopInThisThread = nullptr;
}// 开启事件循环
void EventLoop::loop()
{looping_ = true;quit_ = false;LOG_INFO("EventLoop %p start looping. \n", this);while (!quit_){activeChannels_.clear();// EventLoop可以想象成poll poll监听两类fd,一种是client通信的fd,一种是唤醒subLoop的wakeupFdpollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);for (Channel *channel : activeChannels_){// Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理事件channel->handleEvent(pollReturnTime_);}// 执行当前EventLoop事件循环需要处理的回调操作/*** eg:* IO线程(主线程mainLoop负责accept fd监听新连接,打包成一个channel),如果是已经发生事件的channel,就要将channel注册分发给工作线程subLoop* 但是mainLoop不能直接操作subLoop的Poller,只能委托subLoop自己处理* 主线程调用subLoop的queueInLoop()方法,将"注册Channel"的回调加入pendingFunctors_队列* 主线程再调用wakeup()唤醒subLoop(subLoop在poll阻塞)* subLoop线程被唤醒之后,在doPendingFunctors()中,执行之前mainLoop注册的"注册Channel"回调操作* 这时候操作的是自己的Poller,不会有线程安全问题*/doPendingFunctors();}LOG_INFO("EventLoop %p stop looping. \n", this);looping_ = false;
}// 退出事件循环
/*** 退出eventloop有两种情况 1.loop在自己的线程中调用quit 2.在非loop的线程中,调用loop的quit* mainLoop** waupFd* no =============== 生产者-消费者的线程安全队列 mainLoop向队列写channel subLoop从队列取channel** subLoop1 subLoop2 subLoop3* 比如:subLoop1在subLoop2中调用quit,想退出subLoop2;或者在mainLoop中调用quit,想退出mainLoop都需要wakeupFd先唤醒另一个线程*/
void EventLoop::quit()
{quit_ = true;// EventLoop的quit()可能被其他线程调用,比如mainLoop调用subLoop的quit()退出subLoop,需要先唤醒subLoopif (!isInLoopThread()){wakeup();}
}// 在当前loop中执行回调cb
void EventLoop::runInloop(Functor cb)
{if (isInLoopThread()) // 在当前的loop线程中,执行cb{cb();}else // 在非当前的loop线程中执行cb,就需要唤醒loop所在的线程,执行cb。eg:subLoop1需要执行subLoop2的runInLoop,subLoop1就要唤醒subLoop2{queueInLoop(cb);}
}
// 把cb放入队列中,唤醒loop所在的线程,执行cb
void EventLoop::queueInLoop(Functor cb)
{// 加锁允许其他线程安全的提交需要执行的操作(回调函数){std::unique_lock<std::mutex> lock(mutex_);pendingFunctors_.emplace_back(cb);}// 唤醒相应的,需要执行上面回调操作的loop的线程/*** || callingPendingFunctors_的意思是:当前loop正在执行回调,但是loop自己又给自己添加了新的回调,也需要wakeup()唤醒,* 保证当前loop也能处理这个新的回调。因为执行dopendingFunctors()开始时已经通过swap将当时所有的回调取走了* 因此需要wakeup()唤醒自己,让EventLoop执行完当前回调之后,不进入poll阻塞,直接再次执行dopendingFunctors()处理新添加的回调,避免新回调延迟*/if (!isInLoopThread() || callingPendingFunctors_){wakeup(); // 唤醒loop所在的线程}
}void EventLoop::handleRead()
{uint64_t one = 1;ssize_t n = read(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR("EventLoop::handleRead() reads %d bytes instad of 8", n);}
}// 用来唤醒loop所在的线程的 向wakeupFd_上写一个数据,wakeupChannel就发生了读事件,当前loop线程就会被唤醒
void EventLoop::wakeup()
{uint64_t one = 1; ssize_t n = write(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);}
}// EventLoop的方法 =》 Poller的方法
void EventLoop::updateChannel(Channel *channel)
{poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel *channel)
{poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel *channel)
{return poller_->hasChannel(channel);
}// 执行回调
void EventLoop::doPendingFunctors()
{/*** 定义一个新的functors容器,和存储回调操作的pendingFunctors_进行交换。为什么不直接加锁遍历pendingFuncotrs_?* 锁竞争激烈。执行回调的过程可能很慢,锁被长时间占有,其他线程调用queueInloop()添加新的回调时会阻塞在加锁步骤,导致新回调无法及时添加* 定义一个新functor,和pendingFunctors_交换目的是:* 通过swap()瞬间完成回调的转移,将持有锁的时间压缩到最短* 用新容器存储待执行的回调,其他线程可以继续添加回调到pendingFuncotrs_,使添加回调和执行回调可以并发进行* 平衡了线程安全和执行效率,避免回调处理成为性能瓶颈*/std::vector<Functor> functors;callingPendingFunctors_ = true;{std::unique_lock<std::mutex> lock(mutex_);functors.swap(pendingFunctors_);}for (const Functor &functor : functors){functor(); // 执行当前loop需要执行的回调操作}callingPendingFunctors_ = false;
}
更改DefaultPoller.cc
更改Channel.cc
以上实现的基于事件驱动的EventLoop类,是典型的Reactor模式实现,也是Muduo的多线程Reactor模型的核心代码。
思路分析
mainLoop监听到一个新连接的Channel,要将这个Channel分发给某个subLoop进行管理,那么如何让subLoop管理这个Channel?
主线程mainLoop不能直接操作子线程subLoop(其他EventLoop)的Poller(比如,调用poller_->updateChannel())。核心原因是因为EventLoop内部有很多非线程安全的状态和操作,这些状态如果被多线程同时操作,可能会发生数据竞争、状态不一致等线程安全问题,最终导致程序崩溃、逻辑错乱或死锁。
一、直接操作会破坏Poller的线程安全
每个EventLoop都持有一个Poller(封装epoll的IO多路复用机制),Poller是完全非线程安全的组件,它内部维护两个关键状态:
- 内核态:与OS内核交互的epoll_fd(epoll的核心文件描述符),通过epoll_ctl注册/删除/修改/事件。
- 用户态:Channel与fd的映射表(std::unordered_map<int, Channel*>),用于快速查找fd对应的Channel。
如果主线程直接操作子线程subLoop的Poller,比如调用poller_->updateChannel()注册/修改事件、poller_->removeChannel()移除事件,会触发两个致命问题:
问题1:内核态操作冲突
epoll_ctl本身不是线程安全的,不支持多线程并发调用,如果主线程和子线程同时对一个epoll_fd调用epoll_ctl,会导致内核中epoll的事件表混乱:
比如主线程想注册fd=10的读事件,子线程同时想删除fd=10的事件,内核可能只执行其中一个操作,或把事件表搞坏,最终导致fd=10的事件完全无法被监听,更严重的是,这种冲突可能直接触发内核断言失败,导致程序崩溃。
问题2:用户态映射表竞争
Poller的用户态映射表unordered_map<int, Channel*>是普通的STL容器,没有任何线程安全的保护。如果主线程直接修改这个映射表(比如添加新Channel),而子线程同时在遍历这个映射表(子线程在epoll_wait返回后,可能会遍历映射表比如批量处理事件时),会导致:
- 迭代器失效:子线程遍历到一半时,主线程插入/删除元素,会导致STL容器迭代器失效,直接触发程序崩溃。
- 数据不一致:主线程可能先在映射表中添加了fd->Channel的记录,但还没来得及调用epoll_ctl(EPOLL_CTL_ADD)注册到内核,而子线程此时可能通过映射表认为该fd已经被管理,导致事件漏处理。
- 反向不一致:主线程可能先调用epoll_ctl(EPOLL_CTL_DEL)从内核中删除了事件,但还没来得及从映射表中移除fd记录,子线程可能继续处理该fd事件,导致对无效Channel的操作。
二、直接操作会破坏EventLoop的核心循环逻辑
子线程的subLoop一直在执行loop()循环,这个循环是单线程串行执行的,逻辑是:
// 开启事件循环
void EventLoop::loop()
{while (!quit_){activeChannels_.clear(); // 清空活跃的Channel列表activeChannels_pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); // 阻塞等待事件,有事件发生放入activeChannels_for (Channel *channel : activeChannels_) // 遍历activeChannels_处理活跃的Channel{channel->handleEvent(pollReturnTime_); }doPendingFunctors(); // 执行回调}
}
在事件循环中,activeChannels_会被频繁修改和遍历,每次循环开始调用activeChannels_.clear()清空,然后由poller_->poll()填充新的活跃的Channel,接着for循环遍历activeChannels_处理活跃的Channel事件。
如果此时主线程直接操作subLoop的activeChannels_(比如添加/删除Channel)会打断这个串行逻辑:
- 遍历过程中元素数量变化,引发迭代器失效,程序崩溃
- 同时读写导致数据不一致(主线程添加的Channel可能被重复处理,子线程刚清空,主线程就插入,导致下一次循环重复处理;或完全不处理,主线程刚添加Channel,子线程就调用了clear)。
主线程mainLoop并不能直接操作其他子线程subLoop,本质是“非线程安全的组件不能被多线程并发修改”。
那么,主线程应该如何操作其他EventLoop对象呢?
------ 线程归属检查:限制危险操作只能在EventLoop所属的线程执行 ------
即在多线程环境中,如何安全的操作其他EventLoop对象? 首先就必须要有明确的线程归属检查操作,检查当前正在执行代码的线程,是否就是创建这个EventLoop对象的线程。
// 判断EventLoop对象是否在自己的线程里面bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
简单来说,就是操作EventLoop对象(执行EventLoop的核心方法)的当前线程,是创建这个EventLoop对象的线程,还是其他的线程。
- 如果当前线程是创建这个EventLoop对象的线程,可以直接操作这个EventLoop对象,不存在线程安全问题。
- 如果当前线程是其他的线程,不能直接操作这个EventLoop对象,会导致非线程安全状态被并发修改。
主线程并不能直接操作子线程的Poller,那么mainLoop如何将Channel轮询分发给subLoop,让subLoop管理(subLoop需要将该Channel注册进Poller)呢?mainLoop应该如何安全的操作子线程的EventLoop对象呢?
------- 线程安全的任务队列:让其他线程“间接提交”操作 ------
主线程“委托”子线程自己来执行该操作(比如“注册Channel”到subLoop的Poller)。
主线程将需要执行的操作(比如“注册Channel”)封装成回调函数(using Functor = std::function<void()>)。
EventLoop通过pendingFunctors_(任务队列)和互斥锁,允许其他线程安全的提交需要执行的操作(回调函数)。
- 如果当前线程是EventLoop所属线程,直接执行该回调操作,不涉及线程安全问题。
- 如果当前线程是其他线程,调用queueInLoop(),将该回调安全放进任务队列pendingFunctors_(加锁保证安全)里,子线程自己从任务队列中取回调操作来执行。
子线程自己来执行回调,此时操作的就是自己的EventLoop。所有对EventLoop的非线程安全状态的修改都由EventLoop所属的线程完成。
通过“委托”,就把“跨线程操作”转化为“子线程单线程内的串行操作”。这种设计通过“跨线程提交任务”,“本线程处理任务”的模式,彻底避免了线程安全问题。
------ 线程唤醒机制:确保EventLoop能够及时处理新任务 ------
EventLoop的loop()方法大部分时间都阻塞在poller_->poll()(等待IO事件),如果其他线程提交了新任务(比如,如果主线程监听到一个Channel事件,将“注册Channel”的回调操作提交到了任务队列里,子线程要自己执行该回调,将Channel注册到自己的Poller中),但是,此时子线程subLoop还在一直阻塞监听的状态,如何及时地执行该回调任务,而避免任务的延迟处理呢?
#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
参数1:内核计数器的初始值,通常为0。
参数2:控制eventfd行为的标志,通常有两个:
- EFD_NONBLOCK:将FD设为非阻塞模式(避免读/写操作阻塞线程)。
- EFD_CLOEXEC:设置FD的FD_CLOEXEC标志。
返回值:成功返回eventfd对应的文件描述符(非负整数),失败返回-1并设置errno。
eventfd()系统调用是一个轻量级的跨线程/进程通信机制,专门用于“事件通知”场景,通过一个特殊的文件描述符(FD)传递简单的数值信号,实现线程间的高效唤醒。
本质:一个内核维护的“计数器+文件描述符”。创建eventfd后,会得到一个FD,对这个FD的读写操作,本质是操作内核中的计数器。
工作原理:
- 创建:调用eventfd()创建一个eventfd对象,返回一个文件描述符。
- 写入:使用write()写入一个64位无符号整数(uint64_t),会将该值加到计数器上。
- 读取:使用read()读取计数器的值,默认模式下会将计数器清0;信号量模式下会将计数器减1
- 通知机制:当计数器值大于0时,文件描述符变为可读状态,可被I/O多路复用机制检测到。
优势:
- 相比传统的“管道(pipe)”唤醒,eventfd更轻量(仅需一个FD,而pipe需要两个)。
- 更高效(传递的是64位数值,仅用于通知,而pipe传递的是字节流,需处理复杂的缓冲区)。
- 且专门为“事件通知”设计,适用于线程/进程间“唤醒通知,而pipe适用的是通用字节流通信。
其他线程通过写eventfd发送可读事件,子线程监听到读eventfd事件从阻塞态被唤醒,就可以取任务队列里的回调执行了。
具体场景回顾:
- 主线程给子线程分配新连接时,会调用subLoop的queueInLoop()提交“注册Channel”的回调,将该回调放入pendingFunctors_队列中;
- 为了让subLoop线程尽快的处理这个回调,主线程会调用subLoop的wakeup(),向wakeupFd_写入数据;
- subLoop原本阻塞在poller_->poll()(等待IO事件),但wakeupFd_有了读事件后,poll()会立即返回;
- subLoop处理完wakeupFd_的读事件后,会执行doPendingFunctors(),从pendingFunctors_队列中取回调,处理主线程提交的回调(注册新Channel);
- subLoop此时操作的是自己的EventLoop对象的updateChannel()方法,不存线程安全问题。
如果没有eventfd,subLoop会一直阻塞在poll()中(直到超时或有其他IO事件),主线程提交的回调可能很久才被处理,导致新连接无法及时响应。
这种设计保证了跨线程任务的实时性,需要处理任务时可立即被唤醒。
核心功能
- 事件循环管理:通过loop()方法开启事件循环,quit()方法退出事件循环
- 事件分发:使用Poller(IO多路复用)监听文件描述符
- 跨线程任务调度:提供runInLoop()和queueInLoop()方法在事件循环线程中执行任务
- 线程唤醒机制:使用eventfd实现高效的线程间唤醒
关键分析
1、线程唯一性
- 使用__thread EventLoop* t_loopInThisThread确保每个线程最多只有一个EventLoop实例
- 在构造函数中检查并设置这个线程局部变量
2、事件循环流程
- 在loop()方法中,不断调用poller_->poll()等待事件发生
- 对活跃的Channel调用handleEvent()处理事件
- 执行等待中的回调函数doPendingFunctors()
3、线程安全的任务队列
- 使用pendingFunctors_存储需要在事件循环线程执行的回调
- 通过互斥锁mutex_保证队列操作的线程安全
- 在doPendingFunctors()中通过交换队列的方式减少锁的持有时间
4、线程唤醒机制
- 通过createEventfd()创建wakeupFd_用于线程间通信
- wakeup()方法向wakeupFd_写入数据,触发读事件
- handleRead()处理唤醒事件,读取wakeupFd_中的数据
5、Channel管理
- 间接通过Poller管理Channel的注册、更新和移除
- 确保Channel只属于一个EventLoop
设计亮点
1、高效的线程唤醒:
使用eventfd而非传统的pipe,更高效且适合事件通知
2、减少锁竞争:
在处理回调时通过交换向量的方式,缩短了持有锁的时间,且添加回调和执行回调可以并发执行,效率高
3、线程安全设计:
明确的线程归属检查、线程安全的任务队列、跨线程操作时的唤醒机制保证
4、分层设计:
EventLoop专注于事件循环调度,具体的IO复用由Poller实现