当前位置: 首页 > news >正文

模仿muduo库——Eventloop

muduo库eventloop所需模块设计-CSDN博客

上篇博客我们已近把它所需的模块实现,现在来实现eventloop

文章目录

    • 概要
    • 整体架构流程
    • 技术名词解释
    • 技术细节
    • 小结

概要

EventLoop 是 muduo 实现高并发网络编程的核心调度器,简单说就是一个负责事件管理和任务的调度的模块,它负责事件的监控,事件的分发,事件的调度,什么时候调用什么任务和什么事件就绪了需要被调用。

而我们的任务调度需要要分线程,现在还体现不出来,只有定时任务结束了放进任务池,和别人调用我的事件时,才放进进程池。

比如:

你自己想要说活, 那么直接说就可以了,而别人想让你说活,他不能帮你说,只能告诉你,让你说,你把当前说的那句话说完再回应他让你说的。

// EventLoop模块:
// EventLoop模块可以理解就是我们上边所说的Reactor模块,它是对Poller模块,TimerQueue模块,
// Socket模块的一个整体封装,进行所有描述符的事件监控。
// EventLoop模块必然是一个对象对应一个线程的模块,线程内部的目的就是运行EventLoop的启动函
// 数。
// EventLoop模块为了保证整个服务器的线程安全问题,因此要求使用者对于Connection的所有操作一
// 定要在其对应的EventLoop线程内完成,不能在其他线程中进行(比如组件使用者使用Connection发
// 送数据,以及关闭连接这种操作)。
// EventLoop模块保证自己内部所监控的所有描述符,都要是活跃连接,非活跃连接就要及时释放避免
// 资源浪费。
// • EventLoop模块内部包含有一个eventfd:eventfd其实就是linux内核提供的一个事件fd,专门用于
// 事件通知。
// • EventLoop模块内部包含有一个Poller对象:用于进行描述符的IO事件监控。
// • EventLoop模块内部包含有一个TimerQueue对象:用于进行定时任务的管理。
// • EventLoop模块内部包含有一个PendingTask队列:组件使用者将对Connection进行的所有操作,
// 都加入到任务队列中,由EventLoop模块进行管理,并在EventLoop对应的线程中进行执行。
// • 每一个Connection对象都会绑定到一个EventLoop上,这样能保证对这个连接的所有操作都是在
// 一个线程中完成的。
// 具体操作流程:
// 1. 通过Poller模块对当前模块管理内的所有描述符进行IO事件监控,有描述符事件就绪后,通过描述
// 符对应的Channel进行事件处理。
// 2. 所有就绪的描述符IO事件处理完毕后,对任务队列中的所有操作顺序进行执行。
// 3. 由于epoll的事件监控,有可能会因为没有事件到来而持续阻塞,导致任务队列中的任务不能及时得
// 到执行,因此创建了eventfd,添加到Poller的事件监控中,用于实现每次向任务队列添加任务的时
// 候,通过向eventfd写入数据来唤醒epoll的阻塞。


整体架构流程

既然负责事件和任务的管理,那么就需要timerwheel(负责管理定时任务)和poller(负责fd对应事件的监控);
成员定义:

class EventLoop
{
private:using Functor = std::function<void()>;// 用于判断当前是哪个线程,调用的事件是不是在自己的线程内,在直接执行,不在放到对方的任务队列中std::thread::id _thread_id; // 线程IDint _event_fd;              // eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel;std::vector<Functor> _tasks; // 任务池std::mutex _mutex;           // 实现任务池操作的线程安全Poller _poller;              // 进行所有描述符的事件监控TimerWheel _timer_wheel;     // 定时器模块
};

因为后面是主线程负责套接字连接的管理,从线程负责连接fd和任务等管理,所以需要_thread_id;

构造函数

 // 执行任务池中的所有任务void RunAllTask(){// 将任务池里的任务全部取出来,然后一一执行std::vector<Functor> functor;{std::unique_lock<mutex> lock(_mutex);_tasks.swap(functor);}for (auto f : functor){f();}return;}static int CreateEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0){ERR_LOG("CREATE EVENTFD FAILED!!");abort(); // 让程序异常退出}return efd;}void ReadEventfd(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if (ret < 0){// EINTR -- 被信号打断;   EAGAIN -- 表示无数据可读if (errno == EINTR || errno == EAGAIN){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}// 存在的意义是触发poll wait,让其执行任务队列的事件// 在执行任务队列执行先执行HandleEvent的读,从红黑树中读走这个事件事件通知 然后ReadEventfdvoid WeakUpEventFd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if (ret < 0){if (errno == EINTR){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}
public:
EventLoop() : _thread_id(std::this_thread::get_id()), // 当前线程的id_event_fd(CreateEventFd()),             // eventfd,事件的通知_event_channel(new Channel(this, _event_fd)),_timer_wheel(this){// 给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));// 启动eventfd的读事件监控_event_channel->EnableRead();DBG_LOG("_event_fd:%d ",_event_fd);}

class EventLoop
{
private:using Functor = std::function<void()>;// 用于判断当前是哪个线程,调用的事件是不是在自己的线程内,在直接执行,不在放到对方的任务队列中std::thread::id _thread_id; // 线程IDint _event_fd;              // eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel;std::vector<Functor> _tasks; // 任务池std::mutex _mutex;           // 实现任务池操作的线程安全Poller _poller;              // 进行所有描述符的事件监控TimerWheel _timer_wheel;     // 定时器模块
public:// 执行任务池中的所有任务void RunAllTask(){// 将任务池里的任务全部取出来,然后一一执行std::vector<Functor> functor;{std::unique_lock<mutex> lock(_mutex);_tasks.swap(functor);}for (auto f : functor){f();}return;}static int CreateEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0){ERR_LOG("CREATE EVENTFD FAILED!!");abort(); // 让程序异常退出}return efd;}void ReadEventfd(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if (ret < 0){// EINTR -- 被信号打断;   EAGAIN -- 表示无数据可读if (errno == EINTR || errno == EAGAIN){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}// 存在的意义是触发poll wait,让其执行任务队列的事件// 在执行任务队列执行先执行HandleEvent的读,从红黑树中读走这个事件事件通知 然后ReadEventfdvoid WeakUpEventFd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if (ret < 0){if (errno == EINTR){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}public:EventLoop() : _thread_id(std::this_thread::get_id()), // 当前线程的id_event_fd(CreateEventFd()),             // eventfd,事件的通知_event_channel(new Channel(this, _event_fd)),_timer_wheel(this){// 给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));// 启动eventfd的读事件监控_event_channel->EnableRead();DBG_LOG("_event_fd:%d ",_event_fd);}// 三步走--事件监控-》就绪事件处理-》执行任务void Start(){while (1){// 1. 事件监控,// 用于将poll wait到的事件拿到手执行std::vector<Channel *> actives;_poller.Poll(&actives);// 2. 事件处理。// 执行poll wait到的事件,这个事件是用户及上层设置到channel里面的函数// 体现了回调函数的作用for (auto &channel : actives){channel->HandleEvent();}// 3. 执行任务RunAllTask();}}// 用于判断当前线程是否是EventLoop对应的线程;bool IsInLoop(){return (_thread_id == std::this_thread::get_id());}// 将操作压入任务池,只有连接关闭才调用,和其他线程任务void QueueInLoop(const Functor &cb){DBG_LOG("QueueInLoop,关闭任务");{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}// 唤醒有可能因为没有事件就绪,而导致的epoll阻塞;// 其实就是给eventfd写入一个数据,eventfd就会触发可读事件WeakUpEventFd();}void AssertInLoop(){assert(_thread_id == std::this_thread::get_id());}void RunInLoop(const Functor &cb){if (IsInLoop()){return cb();}return QueueInLoop(cb);}// 添加/修改描述符的事件监控,添加和修改红黑树void UpdateEvent(Channel *channel){return _poller.UpdateEvent(channel);}// 移除描述符的监控,从红黑树移除void RemoveEvent(Channel *channel){return _poller.RemoveEvent(channel);}// 提供给上层使用的,下层是timerwheel里面的  TimerAddvoid TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb){return _timer_wheel.TimerAdd(id, delay, cb);}// 提供给上层使用的,下层是timerwheel里面的 TimerRefreshvoid TimerRefresh(uint64_t id){return _timer_wheel.TimerRefresh(id);}// 提供给上层使用的,下层是timerwheel里面的TimerCancelvoid TimerCancel(uint64_t id){return _timer_wheel.TimerCancel(id);}// 提供给上层使用的,下层是timerwheel里面的hastimerbool HasTimer(uint64_t id){return _timer_wheel.HasTimer(id);}
};


tcpserver.cc

// 调试eventloop版本
#include<iostream>#include <time.h>
#include "../EventLoop.hpp"
#include"../Socket.hpp"
using namespace std;
void Close(Channel *ch);
void Read(Channel *ch)
{int fd = ch->Fd();INF_LOG( "触发了读事件sock: %d " , fd ); char buffer[1024] = {0};int ret = recv(fd, buffer, sizeof(buffer), 0);if (ret < 0){Close(ch);return;}std::cout << buffer << std::endl;ch->EnableWrite();
}
void Write(Channel *ch)
{int fd = ch->Fd();cout << "触发了写事件sock: " << fd << endl;char buffer[1024] = "天气不错";int ret = send(fd, buffer, sizeof(buffer), 0);ch->DisableWrite();if (ret < 0){Close(ch);return;}
}
void Close(Channel *ch)
{INF_LOG( "触发了关闭事件,由于长时间未得到连接" );ch->Remove();delete ch;
}
void Error(Channel *ch)
{Close(ch);
}
void Event(EventLoop *loop, Channel *channel, uint64_t timerid)
{int fd = channel->Fd();INF_LOG( "fd: %d  触发了event回调函数",fd);loop->TimerRefresh(timerid);
}void Accept(EventLoop *loop, Channel *channel)
{cout << "触发了listsock的读事件" << endl;int fd = channel->Fd();int newfd = accept(fd, nullptr, nullptr);cout << "从fd: " << fd << " 读新连接newfd: " << newfd << endl;if (newfd < 0){cout << "连接出错" << endl;return;}uint64_t timerid = rand() % 10000;// v2修改了bind的绑定错误,导致无法读取调用事件出错Channel *newchannel = new Channel(loop, newfd);newchannel->SetReadCallback(std::bind(Read, newchannel));newchannel->SetWriteCallback(std::bind(Write, newchannel));newchannel->SetCloseCallback(std::bind(Close, newchannel));newchannel->SetErrorCallback(std::bind(Error, newchannel));newchannel->SetEventCallback(std::bind(Event, loop, newchannel, timerid));// 非活跃连接超时任务loop->TimerAdd(timerid, 10, std::bind(Close, newchannel));newchannel->EnableRead();
}
int main()
{// 是这个是把// fd3 loop      fd4 epoll     fd5 timerfd     fd6 sockEventLoop loop; // 每一个EvenLoop绑定一个线程cout << "创建了loop" << endl;// EvenLoop() : _thread_id(std::this_thread::get_id()),//               _event_fd(CreateEventFd()),//             _event_channel(new Channel(this, _event_fd)),//           _timer_weel(this)// 创建了一个eventfd事件通知机制,channel(管道)事件集合,负责从下放网络层获取数据,交付给上端// timer_weel时间轮,创建定时器,1秒触发一次,剩下的看不出来,先向下看代码srand(time(nullptr));// 创建服务器Socket list_sock; // 创建套接字list_sock.CreateServer(8080);// 为list_sock第一channelChannel channel(&loop, list_sock.Fd()); // 没有管理事件// 把list_sock的事件、fd放到channel中,channel进行封装// 由于list_sock主要任务是读fd中连接的fd,从而获取连接,那么写一个函数,专门负责连接// 有了函数,为了效率我们不能主动去调用它,这样会浪费io,我们将他的事件通过epoll触发的方式调用channel.SetReadCallback(std::bind(Accept, &loop, &channel)); // 设置事件// 把它的读事件打开,打开不是只在channel中打开,channel只负责记录它是什么事件,不负责监控,要想监控它的事件就哟啊让poller知道// 只负责打开监控,但是不负责监控channel.EnableRead(); // 通过epoll_ctl将fd添加到epfd,op负责处理是添加还是修改,event负责监听什么事件参数// 创建客服端while (true){// 负责监控loop.Start();sleep(1);}return 0;
}


运行流程


小结

总而言之,eventloop就是一个事件管理和任务管理的管家,负责管理poller的监控事件和任务池里面任务的调用和任务入池等问题

http://www.dtcms.com/a/424669.html

相关文章:

  • 住房和城市建设部网站网站界面设计规则
  • 搜狗网站推广重庆网站推广计划
  • 改进的自制 VNA
  • android 自定义Dialog多种方式
  • 微网站免费注册电子商务建设与网站规划
  • 快递网站怎么做的加盟招商网站建设方案
  • Spring框架面试问题及详细回答
  • 前端如何做响应式网站wordpress数据清理插件
  • 免费网站服务器安全中国查公司的网站
  • 电商带货视频:商用音乐素材网站选择与参考
  • 哈尔滨网站建设方案外包免费的编程软件下载
  • 九、kubernetes 1.29 之 service-Endpoint
  • centos 如何建立网站网站建设公司线下推广
  • JavaWeb--day13--SpringBoot原理
  • 门户网站管理建设wordpress 信息分析
  • 网站建设市场报价godaddy空间建立wordpress
  • io的异步处理io_uring,实现io_uring_tcp_server
  • 网站分享平台免费制作网络商城网站
  • 网站开发费用会计分录做好档案整理及网站建设
  • webpack学习
  • 做360网站快速排名软件10分钟免费建网站
  • 找人做网站毕业设计聚美优品网站建设分析
  • YOLO入门教程(番外):为什么激活函数如此关键。
  • 东营网站建设课程定位优化品牌设计网站大全
  • wordpress建站seo商城网站 搭建
  • 手机网站设计教育类模板wordpress 当前用户所有评论
  • 宝塔 crontab 开机启动任务位置
  • 天津企业模板建站网站主页设计注意点
  • spyglass waive使用
  • SAMCO与印度理工学院德里分校签署合作备忘录