C++项目:仿muduo库高并发服务器------EventLoop模块的设计
文章目录
- 前言
- 一、 EventLoop模块
- 二、EventLoop模块设计思路
- 2.1 事件通知机制
- 2.2 事件处理
- 三、代码实现
前言
本篇文章介绍的是仿muduo库高并发服务器中的EventLoop模块的设计思路,及代码实现。
一、 EventLoop模块
- 功能:
- 事件监控管理模块,是“one thread one loop”里的loop、reactor
- 一个模块对应一个线程
- 意义:
- 负责服务器所有事件
- 每个Connection连接绑定一个EventLoop模块和线程,连接操作需在对应线程执行
- 思想:
- 监控所有连接事件,事件触发后调用回调函数处理
- 连接操作放到EventLoop线程执行
- 功能设计:
- 连接操作任务入队
- 定时任务增、刷、删
EventLoop模块综合了其他模块的功能,是一个比较重要的模块
二、EventLoop模块设计思路
2.1 事件通知机制
这里我们直接使用库中提供的eventfd机制作为事件通知机制
-
eventfd 机制:是一种事件通知机制,通过创建描述符实现事件通知,本质是内核管理的计数器,创建时会在内核中生成计数器结构。
-
操作方式:向 eventfd 写入数值表示事件通知次数,可用
read
读取通知次数,读取后计数清 0;也通过read
、write
、close
操作,且进行 IO 时数据只能是 8 字节。 -
函数
int eventfd(unsigned int initval, int flags);
功能:创建 eventfd 对象,实现事件通知。
参数:initval
:计数初值(一般设置为0)。flags
:可选项为EFD_CLOEXEC
(禁止进程复制)、EFD_NONBLOCK
(启动非阻塞属性)。
-
返回值:返回用于操作的文件描述符。
-
用处:在 EventLoop 模块中实现线程间的事件通知功能。
这里不做详细介绍可自行了解
2.2 事件处理
EventLoop模块在进行事件处理时,面临着下列问题:
- 线程安全问题:若一个描述符在多个线程中触发事件并处理,会存在线程安全问题,所以需将一个连接的事件监控、处理及其他操作放在同一线程。
如:一个连接触发事件后,A线程执行操作,执行过程中该连接又在B线程中触发了事件,就会造成线程安全问题。
要解决这个问题就需要将一个连接的监控与一个线程绑定起来,使得该连接的事件只能在对于线程中执行,但是直接与线程绑定是做不到的,我们可以通过和EventLoop对象绑定,达到和线程绑定的目的。
- 保证操作在对应线程的方案:给 EventLoop 模块添加任务队列,对连接的所有操作进行封装,当作任务添加到任务队列而非直接执行。
应对使用者使用线程池分发处理任务,我们通过包装任务,将任务具体操作存入EventLoop中的任务队列中,当线程处理完就绪事件,再将任务从任务队列中取出,统一执行任务
- EventLoop 处理流程:
- 在线程中对描述符进行事件监控。
- 若有描述符就绪,对其进行事件处理(需保证处理回调函数中的操作在线程中)。
- 所有就绪事件处理完后,执行任务队列中的所有任务。
这样能保证连接的所有操作在一个线程中进行,不涉及线程安全问题。但是任务队列的操作存在线程安全问题,我们只需要给任务(task)的操作加一把锁即可。
通过上面的设计,我们可以保证一个连接绑定一个线程(对连接的事件监控、对就绪事件的操作)巧妙的避免了对锁的大量使用
三、代码实现
```cpp
class EventLoop{
private:using Functor=std::function<void()>;
public:EventLoop():_thread_id(std::this_thread::get_id())//封装的线程id,_event_fd(CreateEventfd()),_eventfd_channel(new Channel(this,_event_fd)){//获取事件通知_eventfd_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd,this));_eventfd_channel->EnableRead();}//启动EventLoop//1.添加事件监控//2.就绪事件处理//3.执行任务void Start(){std::vector<Channel*>actives;_poller.Poll(&actives);for(auto &it:actives){it->HandleEvent();}RunAllTask();}//判断将要执行的操作 是否处于当前线程,如果是就执行不是就压入任务池void RunInLoop(const Functor&cb){if(IsInLoop()){cb();}else{QueueInLoop(cb);}}//将任务压入任务池void QueueInLoop(const Functor&cb){{std::unique_lock<std::mutex> lock(_mutex);_task.push_back(cb);}//唤醒epoll,epoll可能因为没有事件就绪而阻塞WeakUpEventFd();//---------------------------------这里为什么要唤醒,epoll阻塞就阻塞了唤醒干嘛}//判断当前线程是否是EventLoop对应的线程bool IsInLoop(){return _thread_id==std::this_thread::get_id();}//添加/修改描述符监控void UpdateEvent(Channel*channel){_poller.UpdateEvent(channel);}//移除描述符监控void RemoveEvent(Channel*channel){_poller.UpdateEvent(channel);}//执行任务池中的所有任务void RunAllTask(){std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex);std::swap(functor,_task);}for(auto &f:functor){f();}return;}//创建事件通知描述符static int CreateEventfd(){// int eventfd(unsigned int initval, int flags);int evfd=eventfd(0,EFD_CLOEXEC|EFD_NONBLOCK);//防止拷贝、非阻塞if(evfd<0){ERR_LOG("EVENTFD FAIL %s",strerror(errno));abort();}return evfd;}void ReadEventfd(){uint64_t val=0;int ret=read(_event_fd,&val,sizeof(val));if(ret<0){if(errno==EINTR||errno==EAGAIN){return;}ERR_LOG("READ FAIL %s",strerror(errno));abort();}}void WeakUpEventFd(){uint64_t val=0;int ret=write(_event_fd,&val,sizeof(val));if(ret<0){ERR_LOG("WRITE FAIL %s",strerror(errno));abort();}}
private:std::thread::id _thread_id;//线程idint _event_fd;//eventfd事件通知,唤醒阻塞的I/O事件Poller _poller;//进行所有描述符的事件监std::unique_ptr<Channel> _eventfd_channel;//对eventfd进行事件监控std::vector<Functor> _task;//任务池std::mutex _mutex;//任务锁,保证任务池操作线程安全};
这里我们就可以将Channel模块、Poller模块、EventLoop模块进行联合调试了,后面对调试内容补充