模仿muduo库——Eventloop
muduo库eventloop所需模块设计-CSDN博客
上篇博客我们已近把它所需的模块实现,现在来实现eventloop
文章目录
-
- 概要
- 整体架构流程
- 技术名词解释
- 技术细节
- 小结
概要
EventLoop
是 muduo 实现高并发网络编程的核心调度器,简单说就是一个负责事件管理和任务的调度的模块,它负责事件的监控,事件的分发,事件的调度,什么时候调用什么任务和什么事件就绪了需要被调用。
而我们的任务调度需要要分线程,现在还体现不出来,只有定时任务结束了放进任务池,和别人调用我的事件时,才放进进程池。
比如:
你自己想要说活, 那么直接说就可以了,而别人想让你说活,他不能帮你说,只能告诉你,让你说,你把当前说的那句话说完再回应他让你说的。
// EventLoop模块:
// EventLoop模块可以理解就是我们上边所说的Reactor模块,它是对Poller模块,TimerQueue模块,
// Socket模块的一个整体封装,进行所有描述符的事件监控。
// EventLoop模块必然是一个对象对应一个线程的模块,线程内部的目的就是运行EventLoop的启动函
// 数。
// EventLoop模块为了保证整个服务器的线程安全问题,因此要求使用者对于Connection的所有操作一
// 定要在其对应的EventLoop线程内完成,不能在其他线程中进行(比如组件使用者使用Connection发
// 送数据,以及关闭连接这种操作)。
// EventLoop模块保证自己内部所监控的所有描述符,都要是活跃连接,非活跃连接就要及时释放避免
// 资源浪费。
// • EventLoop模块内部包含有一个eventfd:eventfd其实就是linux内核提供的一个事件fd,专门用于
// 事件通知。
// • EventLoop模块内部包含有一个Poller对象:用于进行描述符的IO事件监控。
// • EventLoop模块内部包含有一个TimerQueue对象:用于进行定时任务的管理。
// • EventLoop模块内部包含有一个PendingTask队列:组件使用者将对Connection进行的所有操作,
// 都加入到任务队列中,由EventLoop模块进行管理,并在EventLoop对应的线程中进行执行。
// • 每一个Connection对象都会绑定到一个EventLoop上,这样能保证对这个连接的所有操作都是在
// 一个线程中完成的。
// 具体操作流程:
// 1. 通过Poller模块对当前模块管理内的所有描述符进行IO事件监控,有描述符事件就绪后,通过描述
// 符对应的Channel进行事件处理。
// 2. 所有就绪的描述符IO事件处理完毕后,对任务队列中的所有操作顺序进行执行。
// 3. 由于epoll的事件监控,有可能会因为没有事件到来而持续阻塞,导致任务队列中的任务不能及时得
// 到执行,因此创建了eventfd,添加到Poller的事件监控中,用于实现每次向任务队列添加任务的时
// 候,通过向eventfd写入数据来唤醒epoll的阻塞。
整体架构流程
既然负责事件和任务的管理,那么就需要timerwheel(负责管理定时任务)和poller(负责fd对应事件的监控);
成员定义:
class EventLoop
{
private:using Functor = std::function<void()>;// 用于判断当前是哪个线程,调用的事件是不是在自己的线程内,在直接执行,不在放到对方的任务队列中std::thread::id _thread_id; // 线程IDint _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel;std::vector<Functor> _tasks; // 任务池std::mutex _mutex; // 实现任务池操作的线程安全Poller _poller; // 进行所有描述符的事件监控TimerWheel _timer_wheel; // 定时器模块
};
因为后面是主线程负责套接字连接的管理,从线程负责连接fd和任务等管理,所以需要_thread_id;
构造函数
// 执行任务池中的所有任务void RunAllTask(){// 将任务池里的任务全部取出来,然后一一执行std::vector<Functor> functor;{std::unique_lock<mutex> lock(_mutex);_tasks.swap(functor);}for (auto f : functor){f();}return;}static int CreateEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0){ERR_LOG("CREATE EVENTFD FAILED!!");abort(); // 让程序异常退出}return efd;}void ReadEventfd(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if (ret < 0){// EINTR -- 被信号打断; EAGAIN -- 表示无数据可读if (errno == EINTR || errno == EAGAIN){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}// 存在的意义是触发poll wait,让其执行任务队列的事件// 在执行任务队列执行先执行HandleEvent的读,从红黑树中读走这个事件事件通知 然后ReadEventfdvoid WeakUpEventFd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if (ret < 0){if (errno == EINTR){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}
public:
EventLoop() : _thread_id(std::this_thread::get_id()), // 当前线程的id_event_fd(CreateEventFd()), // eventfd,事件的通知_event_channel(new Channel(this, _event_fd)),_timer_wheel(this){// 给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));// 启动eventfd的读事件监控_event_channel->EnableRead();DBG_LOG("_event_fd:%d ",_event_fd);}
class EventLoop
{
private:using Functor = std::function<void()>;// 用于判断当前是哪个线程,调用的事件是不是在自己的线程内,在直接执行,不在放到对方的任务队列中std::thread::id _thread_id; // 线程IDint _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel;std::vector<Functor> _tasks; // 任务池std::mutex _mutex; // 实现任务池操作的线程安全Poller _poller; // 进行所有描述符的事件监控TimerWheel _timer_wheel; // 定时器模块
public:// 执行任务池中的所有任务void RunAllTask(){// 将任务池里的任务全部取出来,然后一一执行std::vector<Functor> functor;{std::unique_lock<mutex> lock(_mutex);_tasks.swap(functor);}for (auto f : functor){f();}return;}static int CreateEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0){ERR_LOG("CREATE EVENTFD FAILED!!");abort(); // 让程序异常退出}return efd;}void ReadEventfd(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if (ret < 0){// EINTR -- 被信号打断; EAGAIN -- 表示无数据可读if (errno == EINTR || errno == EAGAIN){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}// 存在的意义是触发poll wait,让其执行任务队列的事件// 在执行任务队列执行先执行HandleEvent的读,从红黑树中读走这个事件事件通知 然后ReadEventfdvoid WeakUpEventFd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if (ret < 0){if (errno == EINTR){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}public:EventLoop() : _thread_id(std::this_thread::get_id()), // 当前线程的id_event_fd(CreateEventFd()), // eventfd,事件的通知_event_channel(new Channel(this, _event_fd)),_timer_wheel(this){// 给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));// 启动eventfd的读事件监控_event_channel->EnableRead();DBG_LOG("_event_fd:%d ",_event_fd);}// 三步走--事件监控-》就绪事件处理-》执行任务void Start(){while (1){// 1. 事件监控,// 用于将poll wait到的事件拿到手执行std::vector<Channel *> actives;_poller.Poll(&actives);// 2. 事件处理。// 执行poll wait到的事件,这个事件是用户及上层设置到channel里面的函数// 体现了回调函数的作用for (auto &channel : actives){channel->HandleEvent();}// 3. 执行任务RunAllTask();}}// 用于判断当前线程是否是EventLoop对应的线程;bool IsInLoop(){return (_thread_id == std::this_thread::get_id());}// 将操作压入任务池,只有连接关闭才调用,和其他线程任务void QueueInLoop(const Functor &cb){DBG_LOG("QueueInLoop,关闭任务");{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}// 唤醒有可能因为没有事件就绪,而导致的epoll阻塞;// 其实就是给eventfd写入一个数据,eventfd就会触发可读事件WeakUpEventFd();}void AssertInLoop(){assert(_thread_id == std::this_thread::get_id());}void RunInLoop(const Functor &cb){if (IsInLoop()){return cb();}return QueueInLoop(cb);}// 添加/修改描述符的事件监控,添加和修改红黑树void UpdateEvent(Channel *channel){return _poller.UpdateEvent(channel);}// 移除描述符的监控,从红黑树移除void RemoveEvent(Channel *channel){return _poller.RemoveEvent(channel);}// 提供给上层使用的,下层是timerwheel里面的 TimerAddvoid TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb){return _timer_wheel.TimerAdd(id, delay, cb);}// 提供给上层使用的,下层是timerwheel里面的 TimerRefreshvoid TimerRefresh(uint64_t id){return _timer_wheel.TimerRefresh(id);}// 提供给上层使用的,下层是timerwheel里面的TimerCancelvoid TimerCancel(uint64_t id){return _timer_wheel.TimerCancel(id);}// 提供给上层使用的,下层是timerwheel里面的hastimerbool HasTimer(uint64_t id){return _timer_wheel.HasTimer(id);}
};
tcpserver.cc
// 调试eventloop版本
#include<iostream>#include <time.h>
#include "../EventLoop.hpp"
#include"../Socket.hpp"
using namespace std;
void Close(Channel *ch);
void Read(Channel *ch)
{int fd = ch->Fd();INF_LOG( "触发了读事件sock: %d " , fd ); char buffer[1024] = {0};int ret = recv(fd, buffer, sizeof(buffer), 0);if (ret < 0){Close(ch);return;}std::cout << buffer << std::endl;ch->EnableWrite();
}
void Write(Channel *ch)
{int fd = ch->Fd();cout << "触发了写事件sock: " << fd << endl;char buffer[1024] = "天气不错";int ret = send(fd, buffer, sizeof(buffer), 0);ch->DisableWrite();if (ret < 0){Close(ch);return;}
}
void Close(Channel *ch)
{INF_LOG( "触发了关闭事件,由于长时间未得到连接" );ch->Remove();delete ch;
}
void Error(Channel *ch)
{Close(ch);
}
void Event(EventLoop *loop, Channel *channel, uint64_t timerid)
{int fd = channel->Fd();INF_LOG( "fd: %d 触发了event回调函数",fd);loop->TimerRefresh(timerid);
}void Accept(EventLoop *loop, Channel *channel)
{cout << "触发了listsock的读事件" << endl;int fd = channel->Fd();int newfd = accept(fd, nullptr, nullptr);cout << "从fd: " << fd << " 读新连接newfd: " << newfd << endl;if (newfd < 0){cout << "连接出错" << endl;return;}uint64_t timerid = rand() % 10000;// v2修改了bind的绑定错误,导致无法读取调用事件出错Channel *newchannel = new Channel(loop, newfd);newchannel->SetReadCallback(std::bind(Read, newchannel));newchannel->SetWriteCallback(std::bind(Write, newchannel));newchannel->SetCloseCallback(std::bind(Close, newchannel));newchannel->SetErrorCallback(std::bind(Error, newchannel));newchannel->SetEventCallback(std::bind(Event, loop, newchannel, timerid));// 非活跃连接超时任务loop->TimerAdd(timerid, 10, std::bind(Close, newchannel));newchannel->EnableRead();
}
int main()
{// 是这个是把// fd3 loop fd4 epoll fd5 timerfd fd6 sockEventLoop loop; // 每一个EvenLoop绑定一个线程cout << "创建了loop" << endl;// EvenLoop() : _thread_id(std::this_thread::get_id()),// _event_fd(CreateEventFd()),// _event_channel(new Channel(this, _event_fd)),// _timer_weel(this)// 创建了一个eventfd事件通知机制,channel(管道)事件集合,负责从下放网络层获取数据,交付给上端// timer_weel时间轮,创建定时器,1秒触发一次,剩下的看不出来,先向下看代码srand(time(nullptr));// 创建服务器Socket list_sock; // 创建套接字list_sock.CreateServer(8080);// 为list_sock第一channelChannel channel(&loop, list_sock.Fd()); // 没有管理事件// 把list_sock的事件、fd放到channel中,channel进行封装// 由于list_sock主要任务是读fd中连接的fd,从而获取连接,那么写一个函数,专门负责连接// 有了函数,为了效率我们不能主动去调用它,这样会浪费io,我们将他的事件通过epoll触发的方式调用channel.SetReadCallback(std::bind(Accept, &loop, &channel)); // 设置事件// 把它的读事件打开,打开不是只在channel中打开,channel只负责记录它是什么事件,不负责监控,要想监控它的事件就哟啊让poller知道// 只负责打开监控,但是不负责监控channel.EnableRead(); // 通过epoll_ctl将fd添加到epfd,op负责处理是添加还是修改,event负责监听什么事件参数// 创建客服端while (true){// 负责监控loop.Start();sleep(1);}return 0;
}
运行流程
小结
总而言之,eventloop就是一个事件管理和任务管理的管家,负责管理poller的监控事件和任务池里面任务的调用和任务入池等问题