【高并发服务器】十一、Acceptor监听套接字模块 LoopThreadPool线程池模块
文章目录
- Ⅰ. `Acceptor`监听套接字模块设计思想
- Ⅱ. 具体实现
- Ⅲ. 测试代码
- 1、服务端代码
- 2、客户端代码
- 3、执行结果
- Ⅳ. 线程模块的意义
- Ⅴ. **`LoopThread`** 线程模块
- 1、设计思想
- 2、完整实现
- 3、服务端测试代码
- Ⅵ. `LoopThreadPool`线程管理模块
- 1、设计思想
- 2、完整实现
- 3、服务端测试代码
Ⅰ. Acceptor监听套接字模块设计思想
细心的同学该发现,该模块就是之前测试代码中监听套接字负责的模块,我们要将其单独拎出来进行管理!
该模块是对 Socket 模块(实现监听套接字的操作)、Channel 模块(实现套接字 IO 事件管理以及事件触发后的回调处理)的一个整体封装,实现了 对一个监听套接字的整体的管理。
具体处理流程如下:
- 创建一个监听套接字。
- 启动读事件监控。
- 等到读事件触发后,就获取新连接。
- 然后调用新连接获取成功后的回调函数进行处理。(需要注意的是,事件回调处理函数的设置是由服务器来指定的,该模块就是负责提供设置回调函数的接口!)

下面给出该模块的主体框架:
using AcceptCallBack = std::function<void(int)>;
class Acceptor
{
private:Socket _socket; // 监听套接字的描述符EventLoop* _loop; // 用于对监听套接字进行事件监控Channel _channel; // 用于对监听套接字进行事件管理AcceptCallBack _accept_callback; // 获取到新连接之后的回调处理函数
public:Acceptor();// 启动监听套接字的可读事件监控void start_listen();// 设置监听套接字的可读事件回调处理函数void set_accept_callback(const AcceptCallBack& cb);
private:// 监听套接字的可读事件回调处理函数--即获取新连接,调用_accept_callback函数进行新连接处理void read_handle();// 包装一下创建套接字的过程int create_socket(uint16_t port);
};
Ⅱ. 具体实现
实现其实并不难,最主要的一个细节就是 启动监听套接字的可读事件监控不能放在构造函数中进行!因为此时可能还没设置回调函数就已经有新连接到来,那么这些新连接是得不到处理的,因为回调函数还没设置,就会造成内存泄漏问题,所以我们必须搞一个单独的函数 start_listen() 来启动监听套接字的可读事件监控。
using AcceptCallBack = std::function<void(int)>;
class Acceptor
{
private:Socket _socket; // 监听套接字的描述符EventLoop* _loop; // 用于对监听套接字进行事件监控Channel _channel; // 用于对监听套接字进行事件管理AcceptCallBack _accept_callback; // 获取到新连接之后的回调处理函数
public:Acceptor(EventLoop* loop, uint16_t port): _socket(create_socket(port)), _loop(loop), _channel(_socket.get_fd(), _loop){// 设置监听套接字的可读事件监控// 但是注意不能先启动可读事件监控,因为此时外部可能还没设置_accept_callback函数的回调!!!_channel.set_read_callback(std::bind(&Acceptor::read_handle, this));}// 启动监听套接字的可读事件监控void start_listen(){// 启动可读事件监控应该由外部控制顺序,必须在set_accept_callback()调用之后再去启动// 如果在构造函数中启动的话,此时如果还没设置回调函数就已经有新连接到来// 那么这些新连接是得不到处理的,因为回调函数还没设置,就会造成内存泄漏问题!_channel.enable_read();}// 设置监听套接字的可读事件回调处理函数void set_accept_callback(const AcceptCallBack& cb) { _accept_callback = cb; }
private:// 监听套接字的可读事件回调处理函数--即获取新连接,调用_accept_callback函数进行新连接处理void read_handle(){int newfd = _socket.Accept();if(newfd < 0)return;if(_accept_callback)_accept_callback(newfd);}// 包装一下创建套接字的过程int create_socket(uint16_t port){bool ret = _socket.create_server(port);assert(ret == true); // 直接断言,如果创建监听套接字失败了,那么其它的都没得说!return _socket.get_fd();}
};
Ⅲ. 测试代码
1、服务端代码
有了监听套接字模块之后,我们就不需要自己创建一个 Socket 和 Channel 对象了,而是直接使用 Acceptor 对象即可!
#include "../source/server.hpp"uint64_t id = 1; // 连接id
std::unordered_map<uint64_t, ConnectionPtr> connections; // 连接管理表
EventLoop loop; // 一个EventLoop对象,后面会用线程池来代替,这里作为全局变量使用即可void connected_handle(const ConnectionPtr& cptr)
{// 这里的连接建立处理,我们就简单的打印哪个连接建立即可DLOG("new connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());
}void message_handle(const ConnectionPtr& cptr, Buffer* buf)
{// 这里的消息事件处理,我们就做简单的打印以及回响即可DLOG("接收到:%s", buf->start_of_read());buf->push_reader_back(buf->get_sizeof_read());std::string str = "lirendada 你好啊!";cptr->send_data(str.c_str(), str.size());
}void closed_handle(const ConnectionPtr& cptr)
{// 就是将连接管理表中的该连接去掉DLOG("delete connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());connections.erase(cptr->get_connection_id());
}void acceptor_callback(int sockfd)
{// 用Connection包装该新链接,并且设置回调函数ConnectionPtr cptr(new Connection(&loop, id, sockfd));cptr->set_connected_callback(std::bind(connected_handle, std::placeholders::_1));cptr->set_message_callback(std::bind(message_handle, std::placeholders::_1, std::placeholders::_2));cptr->set_server_closed_callback(std::bind(closed_handle, std::placeholders::_1)); // 注意这里是服务器模块的关闭回调,也就是去掉与该连接的联系// 启动非活跃销毁功能,并将连接设置为建立完成状态cptr->enable_inactive_release(3);cptr->connecting_to_connceted();// 最后别忘了添加到服务器的连接管理表中connections[id++] = cptr;
}int main()
{// 创建监听套接字,然后利用bind函数设置获取新连接之后的回调函数,并且启动可读监控Acceptor acceptor(&loop, 8080);acceptor.set_accept_callback(std::bind(acceptor_callback, std::placeholders::_1));acceptor.start_listen();// 启动事件监控loop.start();return 0;
}
2、客户端代码
客户端的代码和之前一样不变!
#include "../source/server.hpp"int main()
{// 创建客户端套接字Socket client_sock;client_sock.create_client(8080, "127.0.0.1");// 做五次简单的发送和回响,所以会刷新五次连接for(int i = 0; i < 5; ++i){std::string str = "lirendada";client_sock.Send(str.c_str(), str.size());char buf[1024] = { 0 };client_sock.Recv(buf, sizeof(buf) - 1);DLOG("%s", buf);sleep(1);}// 进入死循环while(1) sleep(1);return 0;
}
3、执行结果

Ⅳ. 线程模块的意义
首先,我们不可能只是让一个线程去完成所有的事情,那样子的话效率是不高的,所以肯定是要进行多线程并发处理问题的!
此外,我们在项目介绍的时候还强调,一个 EventLoop 对象只能在一个线程中执行,这样子才不会有线程安全问题,当 EventLoop 模块实例化一个对象的时候,其构造函数也会初始化一个线程标识 _tid,如下所示(可以翻阅前面的笔记):
class EventLoop
{
private:std::thread::id _tid; // EventLoop对应的线程ID// ...
public:// ...
}
当我们在一个运行一个操作的时候,会先判断当前线程是否处于与当前对应 EventLoop 线程中,就是通过该 _tid 进行判断的!
但是有一个问题,就是如果同时创建了多个 EventLoop 对象,此时我们会分配各自的 _tid 给每个 EventLoop 对象,然后再从线程池中调出空闲的线程让 EventLoop 与其绑定,但此时其实是线程不安全的,相当于这个空窗期期间, _tid 是不可控的,所以我们 不能先创建 EventLoop 对象再分配线程!
解决方法其实很简单,我们只需要包装一下线程池中的线程即可,当需要创建一个 EventLoop 的时候,首先 创建线程或者从线程池中调出线程,然后通过该线程模块的入口函数实例化对应的 EventLoop 对象,这样子才能保证没有空窗期!
所以我们才需要有该线程池管理模块,它分为两部分:线程模块 LoopThread(单个)、线程池管理模块 LoopThreadPool(对多个线程模块进行管理)。下面我们先从单个线程管理开始设计!
Ⅴ. LoopThread 线程模块
1、设计思想
该模块其实思想很简单,就是将 thread 与 EventLoop 整合在一起,如下所示:
- 创建线程。
- 在线程中实例化
EventLoop对象。
然后我们只需要提供一个向外返回该实例化的 EventLoop 对象即可!下面给出其主体框架:
class LoopThread
{
private: EventLoop* _loop; // 当前EventLoop的指针,需要在线程内实例化std::thread _td; // 当前EventLoop对应的线程/* 需要有互斥锁和条件变量来防止_loop还为空的时候被获取 */std::mutex _mtx; std::condition_variable _cv;
public:// 构造函数,创建线程,设定线程的入口函数LoopThread();// 返回当前线程关联的EventLoop指针EventLoop* get_loop();
private:// 线程入口函数,负责实例化关联的EventLoopvoid thread_entry();
};
2、完整实现
下面需要注意的就是 thread_entry() 函数中实例化 EventLoop 对象的时候,如果我们是用动态内存开辟的话,那么后面就得想办法保管好内存,防止内存泄漏问题,但是更简单的操作就是直接生成一个临时对象即可,因为我们对于 EventLoop 线程,最重要的就是执行其 start() 函数,其内部是一个死循环,当该循环结束了之后出了函数,该线程自动就被销毁,无需我们关心其是否释放!
class LoopThread
{
private: EventLoop* _loop; // 当前EventLoop的指针,需要在线程内实例化std::thread _td; // 当前EventLoop对应的线程/* 需要有互斥锁和条件变量来防止_loop还为空的时候被获取 */std::mutex _mtx; // 互斥锁std::condition_variable _cv; // 条件变量
public:// 构造函数,创建线程,设定线程的入口函数LoopThread(): _td(std::thread(&LoopThread::thread_entry, this)), _loop(nullptr){}// 返回当前线程关联的EventLoop指针EventLoop* get_loop() {// 需要不为空才能返回,否则阻塞住直到不为空std::unique_lock<std::mutex> lock(_mtx);while(_loop == nullptr) _cv.wait(lock);return _loop;}
private:// 线程入口函数,负责实例化关联的EventLoopvoid thread_entry(){EventLoop tmp; // 使用临时对象而不是new的话就不用考虑指针何时去释放的问题了{// 需要进行加锁,并且创建完毕后唤醒get_loop()std::unique_lock<std::mutex> lock(_mtx);_loop = &tmp;_cv.notify_all();}// 启动EventLoop进行事件监控tmp.start();}
};
3、服务端测试代码
下面我们做个小测试,改动的内容就是服务端的代码,客户端都是一样的,所以这里就不展示了!
服务端无非就是让监听套接字连接作为主线程,然后获取到新连接之后,就从简单的线程池中挑出对应的线程也就是对应的 EventLoop 进行关联,其后执行内容都是在各自线程中执行的!服务端代码如下所示:
#include "../source/server.hpp"uint64_t id = 1; // 连接id
std::unordered_map<uint64_t, ConnectionPtr> connections; // 连接管理表EventLoop server_loop; // 用于监听套接字的EventLoop对象
std::vector<LoopThread> threadpool(2); // 模拟简单的线程池
int thread_id = 0;void connected_handle(const ConnectionPtr& cptr)
{// 这里的连接建立处理,我们就简单的打印哪个连接建立即可DLOG("new connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());
}void message_handle(const ConnectionPtr& cptr, Buffer* buf)
{// 这里的消息事件处理,我们就做简单的打印以及回响即可DLOG("接收到:%s", buf->start_of_read());buf->push_reader_back(buf->get_sizeof_read());std::string str = "lirendada 你好啊!";cptr->send_data(str.c_str(), str.size());
}void closed_handle(const ConnectionPtr& cptr)
{// 就是将连接管理表中的该连接去掉DLOG("delete connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());connections.erase(cptr->get_connection_id());
}void acceptor_callback(int sockfd)
{DLOG("我是服务器主线程,用于监听新连接!");// 用Connection包装该新链接,并且设置回调函数,其中新连接的EventLoop由线程池中的提供thread_id = (thread_id + 1) % 2;ConnectionPtr cptr(new Connection(threadpool[thread_id].get_loop(), id, sockfd));cptr->set_connected_callback(std::bind(connected_handle, std::placeholders::_1));cptr->set_message_callback(std::bind(message_handle, std::placeholders::_1, std::placeholders::_2));cptr->set_server_closed_callback(std::bind(closed_handle, std::placeholders::_1)); cptr->enable_inactive_release(3);cptr->connecting_to_connceted();connections[id++] = cptr;
}int main()
{// 创建监听套接字,然后利用bind函数设置获取新连接之后的回调函数,并且启动可读监控Acceptor acceptor(&server_loop, 8080);acceptor.set_accept_callback(std::bind(acceptor_callback, std::placeholders::_1));acceptor.start_listen();// 启动事件监控server_loop.start();return 0;
}

Ⅵ. LoopThreadPool线程管理模块
1、设计思想
该模块的思想也很简单,就是管理所有的 LoopThread 线程对象,并且进行线程的分配工作,也就是让哪个 Connection 对象关联上哪个 EventLoop 线程中去!其所需功能如下所示:
- 可配置线程数量
- 需要注意的是,在我们设计的服务器中,主线程负责新连接的获取,从属线程负责新连接的事件监控以及处理。
- 对所有线程进行管理
- 提供线程分配的功能
- 当主线程获取到了一个新连接,就需要将新连接挂到从属线程上进行事件监控以及处理!这时候有两种选择:
- 当前没有从属线程的话,则直接分配给主线程进行处理。
- 如果存在多个从属线程的话,则采用轮转分配的思想,进行线程的分配,也就是将对应的线程的
EventLoop指针交给对应的连接也就是Connection对象进行关联!
- 当主线程获取到了一个新连接,就需要将新连接挂到从属线程上进行事件监控以及处理!这时候有两种选择:
下面给出该模块的主体框架:
class LoopThreadPool
{
private:EventLoop* _mainloop; // 主线程:用于监听新连接(或者没有从属线程的话,也将连接交给主线程处理)int _nums_of_subthread; // 当前从属线程的数量std::vector<LoopThread*> _subthreads; // 存放从属线程的数组std::vector<EventLoop*> _loops; // 存放从属线程各自绑定的EventLoop*int _next_loop_id; // 指定下一个轮转到也就是要分配的EventLoop*的下标
public:LoopThreadPool();// 设置从属线程的数量void set_nums_of_subthread(int num);// 初始化线程数组和_loops数组void initialize();// 返回一个EventLoop*,表示分配到该EventLoop*对应的线程上EventLoop* allocate_thread();
};
2、完整实现
实现起来并不难,唯一要注意的就是 allocate_thread() 函数中对 _next_loop_id 的累加要进行取模操作,不然会出现越界的情况,其他的没说需要注意的!代码如下所示:
class LoopThreadPool
{
private:EventLoop* _mainloop; // 主线程:用于监听新连接(或者没有从属线程的话,也将连接交给主线程处理)int _nums_of_subthread; // 当前从属线程的数量std::vector<LoopThread*> _subthreads; // 存放从属线程的数组std::vector<EventLoop*> _loops; // 存放从属线程各自绑定的EventLoop*int _next_loop_id; // 指定下一个轮转到也就是要分配的EventLoop*的下标
public:LoopThreadPool(EventLoop* mainloop): _mainloop(mainloop), _nums_of_subthread(0), _next_loop_id(0){}// 设置从属线程的数量void set_nums_of_subthread(int num) { _nums_of_subthread = num; }// 初始化线程数组和_loops数组void initialize(){for(int i = 0; i < _nums_of_subthread; ++i){_subthreads.push_back(new LoopThread());_loops.push_back(_subthreads[i]->get_loop());}}// 返回一个EventLoop*,表示分配到该EventLoop*对应的线程上EventLoop* allocate_thread(){// 如果没有从属线程的话,则直接分配到主线程中处理即可if(_nums_of_subthread == 0)return _mainloop;// 否则的话返回当前轮到的从属EventLoop线程即可EventLoop* ret = _loops[_next_loop_id];_next_loop_id = (_next_loop_id + 1) % _nums_of_subthread;return ret;}
};
3、服务端测试代码
下面我们创建三个从属线程,进行测试,代码无需大改动,只需要小小的调整,如下所示:
#include "../source/server.hpp"uint64_t id = 1; // 连接id
std::unordered_map<uint64_t, ConnectionPtr> connections; // 连接管理表EventLoop server_loop;
LoopThreadPool threadpool(&server_loop);void connected_handle(const ConnectionPtr& cptr)
{// 这里的连接建立处理,我们就简单的打印哪个连接建立即可DLOG("new connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());
}void message_handle(const ConnectionPtr& cptr, Buffer* buf)
{// 这里的消息事件处理,我们就做简单的打印以及回响即可DLOG("接收到:%s", buf->start_of_read());buf->push_reader_back(buf->get_sizeof_read());std::string str = "lirendada 你好啊!";cptr->send_data(str.c_str(), str.size());
}void closed_handle(const ConnectionPtr& cptr)
{// 就是将连接管理表中的该连接去掉DLOG("delete connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());connections.erase(cptr->get_connection_id());
}void acceptor_callback(int sockfd)
{DLOG("我是服务器主线程,用于监听新连接!");// 用Connection包装该新链接,并且设置回调函数,其中新连接的EventLoop由线程池模块提供ConnectionPtr cptr(new Connection(threadpool.allocate_thread(), id, sockfd));cptr->set_connected_callback(std::bind(connected_handle, std::placeholders::_1));cptr->set_message_callback(std::bind(message_handle, std::placeholders::_1, std::placeholders::_2));cptr->set_server_closed_callback(std::bind(closed_handle, std::placeholders::_1)); // 启动非活跃销毁功能,并将连接设置为建立完成状态cptr->enable_inactive_release(3);cptr->connecting_to_connceted();// 最后别忘了添加到服务器的连接管理表中connections[id++] = cptr;
}int main()
{// 初始化一下线程池管理模块threadpool.set_nums_of_subthread(3);threadpool.initialize();// 创建监听套接字,然后利用bind函数设置获取新连接之后的回调函数,并且启动可读监控Acceptor acceptor(&server_loop, 8080);acceptor.set_accept_callback(std::bind(acceptor_callback, std::placeholders::_1));acceptor.start_listen();// 启动事件监控server_loop.start();return 0;
}


