当前位置: 首页 > news >正文

【高并发服务器】十一、Acceptor监听套接字模块 LoopThreadPool线程池模块

文章目录

  • Ⅰ. `Acceptor`监听套接字模块设计思想
  • Ⅱ. 具体实现
  • Ⅲ. 测试代码
    • 1、服务端代码
    • 2、客户端代码
    • 3、执行结果
  • Ⅳ. 线程模块的意义
  • Ⅴ. **`LoopThread`** 线程模块
    • 1、设计思想
    • 2、完整实现
    • 3、服务端测试代码
  • Ⅵ. `LoopThreadPool`线程管理模块
    • 1、设计思想
    • 2、完整实现
    • 3、服务端测试代码

在这里插入图片描述

Ⅰ. Acceptor监听套接字模块设计思想

​ 细心的同学该发现,该模块就是之前测试代码中监听套接字负责的模块,我们要将其单独拎出来进行管理!

​ 该模块是对 Socket 模块(实现监听套接字的操作)、Channel 模块(实现套接字 IO 事件管理以及事件触发后的回调处理)的一个整体封装,实现了 对一个监听套接字的整体的管理

​ 具体处理流程如下:

  1. 创建一个监听套接字。
  2. 启动读事件监控。
  3. 等到读事件触发后,就获取新连接。
  4. 然后调用新连接获取成功后的回调函数进行处理。(需要注意的是,事件回调处理函数的设置是由服务器来指定的,该模块就是负责提供设置回调函数的接口!)

在这里插入图片描述

​ 下面给出该模块的主体框架:

using AcceptCallBack = std::function<void(int)>;
class Acceptor
{
private:Socket _socket;   // 监听套接字的描述符EventLoop* _loop; // 用于对监听套接字进行事件监控Channel _channel; // 用于对监听套接字进行事件管理AcceptCallBack _accept_callback; // 获取到新连接之后的回调处理函数
public:Acceptor();// 启动监听套接字的可读事件监控void start_listen();// 设置监听套接字的可读事件回调处理函数void set_accept_callback(const AcceptCallBack& cb);
private:// 监听套接字的可读事件回调处理函数--即获取新连接,调用_accept_callback函数进行新连接处理void read_handle();// 包装一下创建套接字的过程int create_socket(uint16_t port);
};

Ⅱ. 具体实现

​ 实现其实并不难,最主要的一个细节就是 启动监听套接字的可读事件监控不能放在构造函数中进行!因为此时可能还没设置回调函数就已经有新连接到来,那么这些新连接是得不到处理的,因为回调函数还没设置,就会造成内存泄漏问题,所以我们必须搞一个单独的函数 start_listen() 来启动监听套接字的可读事件监控。

using AcceptCallBack = std::function<void(int)>;
class Acceptor
{
private:Socket _socket;   // 监听套接字的描述符EventLoop* _loop; // 用于对监听套接字进行事件监控Channel _channel; // 用于对监听套接字进行事件管理AcceptCallBack _accept_callback; // 获取到新连接之后的回调处理函数
public:Acceptor(EventLoop* loop, uint16_t port): _socket(create_socket(port)), _loop(loop), _channel(_socket.get_fd(), _loop){// 设置监听套接字的可读事件监控// 但是注意不能先启动可读事件监控,因为此时外部可能还没设置_accept_callback函数的回调!!!_channel.set_read_callback(std::bind(&Acceptor::read_handle, this));}// 启动监听套接字的可读事件监控void start_listen(){// 启动可读事件监控应该由外部控制顺序,必须在set_accept_callback()调用之后再去启动// 如果在构造函数中启动的话,此时如果还没设置回调函数就已经有新连接到来// 那么这些新连接是得不到处理的,因为回调函数还没设置,就会造成内存泄漏问题!_channel.enable_read();}// 设置监听套接字的可读事件回调处理函数void set_accept_callback(const AcceptCallBack& cb) { _accept_callback = cb; }
private:// 监听套接字的可读事件回调处理函数--即获取新连接,调用_accept_callback函数进行新连接处理void read_handle(){int newfd = _socket.Accept();if(newfd < 0)return;if(_accept_callback)_accept_callback(newfd);}// 包装一下创建套接字的过程int create_socket(uint16_t port){bool ret = _socket.create_server(port);assert(ret == true); // 直接断言,如果创建监听套接字失败了,那么其它的都没得说!return _socket.get_fd();}
};

Ⅲ. 测试代码

1、服务端代码

​ 有了监听套接字模块之后,我们就不需要自己创建一个 SocketChannel 对象了,而是直接使用 Acceptor 对象即可!

#include "../source/server.hpp"uint64_t id = 1; // 连接id
std::unordered_map<uint64_t, ConnectionPtr> connections; // 连接管理表
EventLoop loop; // 一个EventLoop对象,后面会用线程池来代替,这里作为全局变量使用即可void connected_handle(const ConnectionPtr& cptr)
{// 这里的连接建立处理,我们就简单的打印哪个连接建立即可DLOG("new connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());
}void message_handle(const ConnectionPtr& cptr, Buffer* buf)
{// 这里的消息事件处理,我们就做简单的打印以及回响即可DLOG("接收到:%s", buf->start_of_read());buf->push_reader_back(buf->get_sizeof_read());std::string str = "lirendada 你好啊!";cptr->send_data(str.c_str(), str.size());
}void closed_handle(const ConnectionPtr& cptr)
{// 就是将连接管理表中的该连接去掉DLOG("delete connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());connections.erase(cptr->get_connection_id());
}void acceptor_callback(int sockfd)
{// 用Connection包装该新链接,并且设置回调函数ConnectionPtr cptr(new Connection(&loop, id, sockfd));cptr->set_connected_callback(std::bind(connected_handle, std::placeholders::_1));cptr->set_message_callback(std::bind(message_handle, std::placeholders::_1, std::placeholders::_2));cptr->set_server_closed_callback(std::bind(closed_handle, std::placeholders::_1)); // 注意这里是服务器模块的关闭回调,也就是去掉与该连接的联系// 启动非活跃销毁功能,并将连接设置为建立完成状态cptr->enable_inactive_release(3);cptr->connecting_to_connceted();// 最后别忘了添加到服务器的连接管理表中connections[id++] = cptr;
}int main()
{// 创建监听套接字,然后利用bind函数设置获取新连接之后的回调函数,并且启动可读监控Acceptor acceptor(&loop, 8080);acceptor.set_accept_callback(std::bind(acceptor_callback, std::placeholders::_1));acceptor.start_listen();// 启动事件监控loop.start();return 0;
}

2、客户端代码

​ 客户端的代码和之前一样不变!

#include "../source/server.hpp"int main()
{// 创建客户端套接字Socket client_sock;client_sock.create_client(8080, "127.0.0.1");// 做五次简单的发送和回响,所以会刷新五次连接for(int i = 0; i < 5; ++i){std::string str = "lirendada";client_sock.Send(str.c_str(), str.size());char buf[1024] = { 0 };client_sock.Recv(buf, sizeof(buf) - 1);DLOG("%s", buf);sleep(1);}// 进入死循环while(1) sleep(1);return 0;
}

3、执行结果

在这里插入图片描述

Ⅳ. 线程模块的意义

​ 首先,我们不可能只是让一个线程去完成所有的事情,那样子的话效率是不高的,所以肯定是要进行多线程并发处理问题的!

​ 此外,我们在项目介绍的时候还强调,一个 EventLoop 对象只能在一个线程中执行,这样子才不会有线程安全问题,当 EventLoop 模块实例化一个对象的时候,其构造函数也会初始化一个线程标识 _tid,如下所示(可以翻阅前面的笔记):

class EventLoop
{
private:std::thread::id _tid; // EventLoop对应的线程ID// ...
public:// ...
}

​ 当我们在一个运行一个操作的时候,会先判断当前线程是否处于与当前对应 EventLoop 线程中,就是通过该 _tid 进行判断的!

​ 但是有一个问题,就是如果同时创建了多个 EventLoop 对象,此时我们会分配各自的 _tid 给每个 EventLoop 对象,然后再从线程池中调出空闲的线程让 EventLoop 与其绑定,但此时其实是线程不安全的,相当于这个空窗期期间, _tid 是不可控的,所以我们 不能先创建 EventLoop 对象再分配线程

​ 解决方法其实很简单,我们只需要包装一下线程池中的线程即可,当需要创建一个 EventLoop 的时候,首先 创建线程或者从线程池中调出线程,然后通过该线程模块的入口函数实例化对应的 EventLoop 对象,这样子才能保证没有空窗期!

​ 所以我们才需要有该线程池管理模块,它分为两部分:线程模块 LoopThread(单个)、线程池管理模块 LoopThreadPool(对多个线程模块进行管理)。下面我们先从单个线程管理开始设计!

Ⅴ. LoopThread 线程模块

1、设计思想

​ 该模块其实思想很简单,就是将 threadEventLoop 整合在一起,如下所示:

  1. 创建线程。
  2. 在线程中实例化 EventLoop 对象。

​ 然后我们只需要提供一个向外返回该实例化的 EventLoop 对象即可!下面给出其主体框架:

class LoopThread
{
private:    EventLoop* _loop; // 当前EventLoop的指针,需要在线程内实例化std::thread _td;  // 当前EventLoop对应的线程/* 需要有互斥锁和条件变量来防止_loop还为空的时候被获取 */std::mutex _mtx;             std::condition_variable _cv;
public:// 构造函数,创建线程,设定线程的入口函数LoopThread();// 返回当前线程关联的EventLoop指针EventLoop* get_loop();
private:// 线程入口函数,负责实例化关联的EventLoopvoid thread_entry();
};

2、完整实现

​ 下面需要注意的就是 thread_entry() 函数中实例化 EventLoop 对象的时候,如果我们是用动态内存开辟的话,那么后面就得想办法保管好内存,防止内存泄漏问题,但是更简单的操作就是直接生成一个临时对象即可,因为我们对于 EventLoop 线程,最重要的就是执行其 start() 函数,其内部是一个死循环,当该循环结束了之后出了函数,该线程自动就被销毁,无需我们关心其是否释放!

class LoopThread
{
private:    EventLoop* _loop; // 当前EventLoop的指针,需要在线程内实例化std::thread _td;  // 当前EventLoop对应的线程/* 需要有互斥锁和条件变量来防止_loop还为空的时候被获取 */std::mutex _mtx;             // 互斥锁std::condition_variable _cv; // 条件变量
public:// 构造函数,创建线程,设定线程的入口函数LoopThread(): _td(std::thread(&LoopThread::thread_entry, this)), _loop(nullptr){}// 返回当前线程关联的EventLoop指针EventLoop* get_loop() {// 需要不为空才能返回,否则阻塞住直到不为空std::unique_lock<std::mutex> lock(_mtx);while(_loop == nullptr) _cv.wait(lock);return _loop;}
private:// 线程入口函数,负责实例化关联的EventLoopvoid thread_entry(){EventLoop tmp; // 使用临时对象而不是new的话就不用考虑指针何时去释放的问题了{// 需要进行加锁,并且创建完毕后唤醒get_loop()std::unique_lock<std::mutex> lock(_mtx);_loop = &tmp;_cv.notify_all();}// 启动EventLoop进行事件监控tmp.start();}
};

3、服务端测试代码

​ 下面我们做个小测试,改动的内容就是服务端的代码,客户端都是一样的,所以这里就不展示了!

​ 服务端无非就是让监听套接字连接作为主线程,然后获取到新连接之后,就从简单的线程池中挑出对应的线程也就是对应的 EventLoop 进行关联,其后执行内容都是在各自线程中执行的!服务端代码如下所示:

#include "../source/server.hpp"uint64_t id = 1; // 连接id
std::unordered_map<uint64_t, ConnectionPtr> connections; // 连接管理表EventLoop server_loop;                 // 用于监听套接字的EventLoop对象
std::vector<LoopThread> threadpool(2); // 模拟简单的线程池
int thread_id = 0;void connected_handle(const ConnectionPtr& cptr)
{// 这里的连接建立处理,我们就简单的打印哪个连接建立即可DLOG("new connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());
}void message_handle(const ConnectionPtr& cptr, Buffer* buf)
{// 这里的消息事件处理,我们就做简单的打印以及回响即可DLOG("接收到:%s", buf->start_of_read());buf->push_reader_back(buf->get_sizeof_read());std::string str = "lirendada 你好啊!";cptr->send_data(str.c_str(), str.size());
}void closed_handle(const ConnectionPtr& cptr)
{// 就是将连接管理表中的该连接去掉DLOG("delete connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());connections.erase(cptr->get_connection_id());
}void acceptor_callback(int sockfd)
{DLOG("我是服务器主线程,用于监听新连接!");// 用Connection包装该新链接,并且设置回调函数,其中新连接的EventLoop由线程池中的提供thread_id = (thread_id + 1) % 2;ConnectionPtr cptr(new Connection(threadpool[thread_id].get_loop(), id, sockfd));cptr->set_connected_callback(std::bind(connected_handle, std::placeholders::_1));cptr->set_message_callback(std::bind(message_handle, std::placeholders::_1, std::placeholders::_2));cptr->set_server_closed_callback(std::bind(closed_handle, std::placeholders::_1)); cptr->enable_inactive_release(3);cptr->connecting_to_connceted();connections[id++] = cptr;
}int main()
{// 创建监听套接字,然后利用bind函数设置获取新连接之后的回调函数,并且启动可读监控Acceptor acceptor(&server_loop, 8080);acceptor.set_accept_callback(std::bind(acceptor_callback, std::placeholders::_1));acceptor.start_listen();// 启动事件监控server_loop.start();return 0;
}

在这里插入图片描述

Ⅵ. LoopThreadPool线程管理模块

1、设计思想

​ 该模块的思想也很简单,就是管理所有的 LoopThread 线程对象,并且进行线程的分配工作,也就是让哪个 Connection 对象关联上哪个 EventLoop 线程中去!其所需功能如下所示:

  • 可配置线程数量
    • 需要注意的是,在我们设计的服务器中,主线程负责新连接的获取,从属线程负责新连接的事件监控以及处理
  • 对所有线程进行管理
  • 提供线程分配的功能
    • 当主线程获取到了一个新连接,就需要将新连接挂到从属线程上进行事件监控以及处理!这时候有两种选择:
      1. 当前没有从属线程的话,则直接分配给主线程进行处理。
      2. 如果存在多个从属线程的话,则采用轮转分配的思想,进行线程的分配,也就是将对应的线程的 EventLoop 指针交给对应的连接也就是 Connection 对象进行关联!

​ 下面给出该模块的主体框架:

class LoopThreadPool
{
private:EventLoop* _mainloop; // 主线程:用于监听新连接(或者没有从属线程的话,也将连接交给主线程处理)int _nums_of_subthread;               // 当前从属线程的数量std::vector<LoopThread*> _subthreads; // 存放从属线程的数组std::vector<EventLoop*> _loops;       // 存放从属线程各自绑定的EventLoop*int _next_loop_id; // 指定下一个轮转到也就是要分配的EventLoop*的下标
public:LoopThreadPool();// 设置从属线程的数量void set_nums_of_subthread(int num);// 初始化线程数组和_loops数组void initialize();// 返回一个EventLoop*,表示分配到该EventLoop*对应的线程上EventLoop* allocate_thread();
};

2、完整实现

​ 实现起来并不难,唯一要注意的就是 allocate_thread() 函数中对 _next_loop_id 的累加要进行取模操作,不然会出现越界的情况,其他的没说需要注意的!代码如下所示:

class LoopThreadPool
{
private:EventLoop* _mainloop; // 主线程:用于监听新连接(或者没有从属线程的话,也将连接交给主线程处理)int _nums_of_subthread;               // 当前从属线程的数量std::vector<LoopThread*> _subthreads; // 存放从属线程的数组std::vector<EventLoop*> _loops;       // 存放从属线程各自绑定的EventLoop*int _next_loop_id; // 指定下一个轮转到也就是要分配的EventLoop*的下标
public:LoopThreadPool(EventLoop* mainloop): _mainloop(mainloop), _nums_of_subthread(0), _next_loop_id(0){}// 设置从属线程的数量void set_nums_of_subthread(int num) { _nums_of_subthread = num; }// 初始化线程数组和_loops数组void initialize(){for(int i = 0; i < _nums_of_subthread; ++i){_subthreads.push_back(new LoopThread());_loops.push_back(_subthreads[i]->get_loop());}}// 返回一个EventLoop*,表示分配到该EventLoop*对应的线程上EventLoop* allocate_thread(){// 如果没有从属线程的话,则直接分配到主线程中处理即可if(_nums_of_subthread == 0)return _mainloop;// 否则的话返回当前轮到的从属EventLoop线程即可EventLoop* ret = _loops[_next_loop_id];_next_loop_id = (_next_loop_id + 1) % _nums_of_subthread;return ret;}
};

3、服务端测试代码

​ 下面我们创建三个从属线程,进行测试,代码无需大改动,只需要小小的调整,如下所示:

#include "../source/server.hpp"uint64_t id = 1; // 连接id
std::unordered_map<uint64_t, ConnectionPtr> connections; // 连接管理表EventLoop server_loop;
LoopThreadPool threadpool(&server_loop);void connected_handle(const ConnectionPtr& cptr)
{// 这里的连接建立处理,我们就简单的打印哪个连接建立即可DLOG("new connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());
}void message_handle(const ConnectionPtr& cptr, Buffer* buf)
{// 这里的消息事件处理,我们就做简单的打印以及回响即可DLOG("接收到:%s", buf->start_of_read());buf->push_reader_back(buf->get_sizeof_read());std::string str = "lirendada 你好啊!";cptr->send_data(str.c_str(), str.size());
}void closed_handle(const ConnectionPtr& cptr)
{// 就是将连接管理表中的该连接去掉DLOG("delete connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());connections.erase(cptr->get_connection_id());
}void acceptor_callback(int sockfd)
{DLOG("我是服务器主线程,用于监听新连接!");// 用Connection包装该新链接,并且设置回调函数,其中新连接的EventLoop由线程池模块提供ConnectionPtr cptr(new Connection(threadpool.allocate_thread(), id, sockfd));cptr->set_connected_callback(std::bind(connected_handle, std::placeholders::_1));cptr->set_message_callback(std::bind(message_handle, std::placeholders::_1, std::placeholders::_2));cptr->set_server_closed_callback(std::bind(closed_handle, std::placeholders::_1)); // 启动非活跃销毁功能,并将连接设置为建立完成状态cptr->enable_inactive_release(3);cptr->connecting_to_connceted();// 最后别忘了添加到服务器的连接管理表中connections[id++] = cptr;
}int main()
{// 初始化一下线程池管理模块threadpool.set_nums_of_subthread(3);threadpool.initialize();// 创建监听套接字,然后利用bind函数设置获取新连接之后的回调函数,并且启动可读监控Acceptor acceptor(&server_loop, 8080);acceptor.set_accept_callback(std::bind(acceptor_callback, std::placeholders::_1));acceptor.start_listen();// 启动事件监控server_loop.start();return 0;
}

在这里插入图片描述

在这里插入图片描述

http://www.dtcms.com/a/540733.html

相关文章:

  • uniapp vue3 点击跳转外部网页
  • 基于“开源AI智能名片链动2+1模式S2B2C商城小程序”的会员制培养策略研究
  • 做家居网站设计o2o型网站
  • IDEA推送github,身份认证错误:Cannot assign requested address: getsockopt 解决方法
  • Rust Actix-Web框架源码解析:基于Actor模型的高性能Web开发
  • LLM辅助轻量级MES编排系统低代码开发方案介绍
  • 网站国际网络备案号百度收录提交之后如何让网站更快的展示出来
  • 学习Linux——组管理
  • 文件批量重命名(办公)脚本
  • 学习日记22:Adaptive Rotated Convolution for Rotated Object Detection
  • 十二要素应用
  • 同步、异步、阻塞、非阻塞的区别
  • 网站建设技术培训电商基地推广
  • 使用注解将日志存入Elasticsearch
  • 【STM32】WDG看门狗
  • 无锡市建设安全监督网站全国统一核酸检测价格
  • 做网站购买备案域名织梦高端html5网站建设工作室网络公司网站模板
  • 2.CSS3.(4).html
  • 记一次诡异的“偶发 404”排查:CDN 回源到 OSS 导致 REST API 失败
  • C++笔记(面向对象)类模板
  • Selenium IDE下载和安装教程(附安装包)
  • Quartz框架实现根据设置的cron表达式进行定时任务执行
  • linux20 线程同步--信号量
  • 内核的文件预取逻辑及blockdev的相关配置
  • [特殊字符] Web 字体裁剪优化实践:把 42MB 字体包瘦到 1.6MB
  • 平滑过渡,破解多库并存:浙人医基于金仓KFS的医疗信创实战解析
  • 做经营性的网站需要注册什么条件网站构思
  • Answer企业社区实战:零成本搭建技术问答平台,远程协作效率提升300%!
  • “听书”比“看书”更省力?
  • 大连 手机网站案例网站定位方案