【高并发服务器】十三、TcpServer服务器管理模块
文章目录
- Ⅰ. `TcpServer`服务器模块设计思想
 - Ⅱ. `TcpServer`服务器模块代码实现
 - Ⅲ. `NetWork`小模块
 - Ⅳ. 测试代码
 - 1、服务端测试代码
 
- Ⅴ. 基于`TcpServer`实现一个简单的回显服务器
 - Ⅵ. 简单的服务器压力测试
 
 
Ⅰ. TcpServer服务器模块设计思想
 
 该模块其实就是对前边所有子模块的整合模块,是提供给用户用于搭建一个高性能服务器的模块,目的就是为了让组件使用者可以更加轻便的完成一个服务器的搭建。
 其内部成员包括:
- 一个 
EventLoop对象:- 即主线程 
main_thread对应的EventLoop对象,实现对监听套接字的事件监控。这个对象是以备在超轻量使用场景中不需要LoopThreadPool线程池中的从属线程,而只需要在主线程中完成所有操作的情况。 
 - 即主线程 
 - 一个 
Acceptor对象:- 作为一个 
TcpServer服务器必然对应有一个监听套接字,能够完成获取客户端新连接,并处理任务。 
 - 作为一个 
 - 一个 
LoopThreadPool对象:- 其实就是 
LoopThread类型的线程池,也就是从属Reactor线程池。 
 - 其实就是 
 - 一个 
std::shared_ptr<Connection>类型的哈希表:- 这个哈希表保存了所有的新建连接对应的 
Connection对象,注意,所有的Connection使用智能指针shared_ptr进行管理,这样能够保证在哈希表中删除了Connection信息后,在shared_ptr计数器为0的情况下完成对Connection资源的释放操作,也就是利用了RAII思想! 
 - 这个哈希表保存了所有的新建连接对应的 
 - 还需要一些辅助的成员变量,比如当前 
Connection对象的id等等,这个具体看实现时候的主体框架! 
 其中功能接口的设计:
- 设置从属线程的数量
 - 启动服务器
 - 启动非活跃连接超时销毁功能
 - 添加定时任务功能
 - 设置各种回调函数(一个连接产生了一个事件,对于这个事件如何处理,只有组件使用者知道,因此一个事件的处理回调,一定是组件使用者,设置给 
TcpServer,然后由TcpServer模块设置给各个Connection连接):- 对于连接建立后的回调
 - 对于通信产生信息的回调
 - 对于连接关闭后的回调
 - 任意事件触发后的回调
 
 
 该模块大概的流程是这样子:
- 在 
TcpServer中实例化一个Acceptor对象,以及一个EventLoop对象(即main_loop) - 将 
Acceptor对象挂到mainloop上进行事件监控 - 一旦 
Acceptor对象就绪了可读事件,则执行可读事件回调函数获取新建连接 - 对获取的新建连接,创建一个 
Connection对象进行管理 - 对连接对应的 
Connection对象进行功能函数回调(连接建立回调、消息回调、关闭回调、任意事件回调) - 判断是否启动非活跃连接超时销毁功能,是的话则启动
 - 将新连接对应的 
Connection对象挂到从属线程中分配的EventLoop中去(如果没有从属线程的话则挂到主线程上去处理) - 一旦 
Connection对象的连接触发了可读事件,此时就会去执行其可读事件回调函数,读取数据,读取完毕之后就会调用TcpServer设置的消息回调 - 剩下的就是业务处理等过程……
 
 下面是服务器模块的主体框架:
class TcpServer
{
private:int _timeout;                  // 非活跃连接超时销毁的时间bool _enable_inactive_release; // 是否启动非活跃连接销毁功能,true表示开启,默认为falseuint16_t _port;      // 服务器端口号EventLoop _mainloop; // 主线程对应的EventLoop对象Acceptor _acceptor;  // 监听套接字管理对象,绑定到主线程上进行事件监控LoopThreadPool _pool; // 从属线程池uint64_t _next_id;                                     // 管理连接对象的keystd::unordered_map<uint64_t, ConnectionPtr> _conn_table; // 管理所有连接的shared_ptr对象// 下面是提供给组件使用者设置的回调函数ConnectedCallBack _connected_callback; // 连接建立之后的回调MessageCallBack _message_callback;     // 有消息之后的回调ClosedCallBack _closed_callback;       // 连接关闭之后的回调ArbitraryCallBack _arbitrary_callback; // 任意事件的回调
public:TcpServer();// 设置从属线程的数量void set_nums_of_subthread(int num);// 启动服务器(即打开主线程的事件监控)void start_server();// 启动非活跃连接超时销毁功能void enable_inactive_release(int timeout);// 添加定时任务功能void add_timer(int sec, const func_t& task);// 设置对应回调函数的接口void set_connected_callback(const ConnectedCallBack& conn) { _connected_callback = conn; }void set_message_callback(const MessageCallBack& msg) { _message_callback = msg; }void set_closed_callback(const ClosedCallBack& closed) { _closed_callback = closed; }void set_arbitrary_callback(const ArbitraryCallBack& event) { _arbitrary_callback = event; }    
private:// 添加定时任务功能的实际实现接口void add_timer_inloop(int sec, const func_t& task);// 监听套接字的可读事件处理函数,也就是为新连接构造一个Connection进行管理并进行设置等等void acceptor_handler(int fd);// 从管理Connection的哈希表中移除掉对其的引用,才能正确释放连接void release_connections(const ConnectionPtr& cptr);// 释放管理连接的实际实现接口void release_connections_inloop(const ConnectionPtr& cptr);
};
 
Ⅱ. TcpServer服务器模块代码实现
 
 在实现服务器过程要注意的点就是各个模块之间的初始化顺序,我们在外部使用 TcpServer 的时候是先实例化一个 TcpServer 对象,然后对其进行设置从属线程的数量,那么 从属线程池的初始化就不能放在 TcpServer 构造函数中初始化,因为此时从属线程个数还没被设置!
 还有就是要注意监听套接字在绑定主线程之前,要记得先设置其可读事件回调处理!
class TcpServer
{
private:int _timeout;                  // 非活跃连接超时销毁的时间bool _enable_inactive_release; // 是否启动非活跃连接销毁功能,true表示开启,默认为falseuint16_t _port;      // 服务器端口号EventLoop _mainloop; // 主线程对应的EventLoop对象Acceptor _acceptor;  // 监听套接字管理对象,绑定到主线程上进行事件监控LoopThreadPool _pool; // 从属线程池uint64_t _next_id;                                     // 管理连接对象的keystd::unordered_map<uint64_t, ConnectionPtr> _conn_table; // 管理所有连接的shared_ptr对象// 下面是提供给组件使用者设置的回调函数ConnectedCallBack _connected_callback; // 连接建立之后的回调MessageCallBack _message_callback;     // 有消息之后的回调ClosedCallBack _closed_callback;       // 连接关闭之后的回调ArbitraryCallBack _arbitrary_callback; // 任意事件的回调
public:TcpServer(uint16_t port): _port(port), _enable_inactive_release(false), _acceptor(&_mainloop, _port), _pool(&_mainloop), _next_id(0){// 设置监听套接字的回调处理,然后挂到主线程上_acceptor.set_accept_callback(std::bind(&TcpServer::acceptor_handler, this, std::placeholders::_1));_acceptor.start_listen();}// 设置从属线程的数量void set_nums_of_subthread(int num) { _pool.set_nums_of_subthread(num); }// 启动服务器(即打开主线程的事件监控)void start_server() { _pool.initialize(); // 先初始化一下从属线程池_mainloop.start(); }// 启动非活跃连接超时销毁功能void enable_inactive_release(int timeout){_timeout = timeout;_enable_inactive_release = true;}// 添加定时任务功能void add_timer(int sec, const func_t& task) { return _mainloop.run_in_thread(std::bind(&TcpServer::add_timer_inloop, this, sec, task)); }// 设置对应回调函数的接口void set_connected_callback(const ConnectedCallBack& conn) { _connected_callback = conn; }void set_message_callback(const MessageCallBack& msg) { _message_callback = msg; }void set_closed_callback(const ClosedCallBack& closed) { _closed_callback = closed; }void set_arbitrary_callback(const ArbitraryCallBack& event) { _arbitrary_callback = event; }    
private:// 添加定时任务功能的实际实现接口void add_timer_inloop(int sec, const func_t& task) { return _mainloop.add_timer(_next_id++, sec, task); }// 监听套接字的可读事件处理函数,也就是为新连接构造一个Connection进行管理并进行设置等等void acceptor_handler(int fd){// 用Connection包装该新链接,其中新连接的EventLoop由线程池模块提供ConnectionPtr cptr(new Connection(_pool.allocate_thread(), _next_id, fd));// 设置回调函数cptr->set_connected_callback(_connected_callback);cptr->set_message_callback(_message_callback);cptr->set_closed_callback(_closed_callback);cptr->set_arbitrary_callback(_arbitrary_callback);cptr->set_server_closed_callback(std::bind(&TcpServer::release_connections, this, std::placeholders::_1));// 启动非活跃销毁功能,并将连接设置为建立完成状态if(_enable_inactive_release == true)cptr->enable_inactive_release(_timeout);cptr->connecting_to_connceted();// 最后别忘了添加到服务器的连接管理表中_conn_table[_next_id++] = cptr;}// 从管理Connection的哈希表中移除掉对其的引用,才能正确释放连接void release_connections(const ConnectionPtr& cptr) { return _mainloop.run_in_thread(std::bind(&TcpServer::release_connections_inloop, this, cptr)); }// 释放管理连接的实际实现接口void release_connections_inloop(const ConnectionPtr& cptr) { _conn_table.erase(cptr->get_connection_id()); }
};
 
Ⅲ. NetWork小模块
 
 我们还需要将程序中的一些信号进行屏蔽,比如 SIGPIPE,防止当进程向一个已经关闭写端的管道写入数据时发生错误导致程序退出,如下所示:
// 该类用于构造一个对象的时候进行一些信号的忽略处理,防止因为不必要的信号而导致程序退出
class NetWork
{
public:NetWork(){/* 忽略SIGPIPE信号是防止当进程向一个已经关闭写端的管道写入数据时,内核会向进程发送SIGPIPE信号,或者当进程向一个已经关闭的socket连接写入数据时,内核也会向进程发送SIGPIPE信号。 */DLOG("SIGPIPE is ginored");signal(SIGPIPE, SIG_IGN);}
};static NetWork nw; // 实例化一个对象出来,这样子保证让其执行构造函数
 
Ⅳ. 测试代码
1、服务端测试代码
 客户端代码依旧不变,这里只需要修改一下服务端的代码即可,此时我们就是使用者,只需要设置我们需要的回调处理函数即可,如下所示:
#include "../source/server.hpp"void connected_handle(const ConnectionPtr& cptr)
{// 这里的连接建立处理,我们就简单的打印哪个连接建立即可DLOG("new connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());
}void message_handle(const ConnectionPtr& cptr, Buffer* buf)
{// 这里的消息事件处理,我们就做简单的打印以及回响即可DLOG("接收到:%s", buf->start_of_read());buf->push_reader_back(buf->get_sizeof_read());std::string str = "lirendada 你好啊!";cptr->send_data(str.c_str(), str.size());
}void closed_handle(const ConnectionPtr& cptr)
{// 就是将连接管理表中的该连接去掉DLOG("delete connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());
}int main()
{// 创建tcpserver对象,进行设置后直接启动服务器即可TcpServer server(8080);server.set_nums_of_subthread(3);server.enable_inactive_release(5);server.set_connected_callback(connected_handle);server.set_message_callback(message_handle);server.set_closed_callback(closed_handle);server.start_server();return 0;
}
 

Ⅴ. 基于TcpServer实现一个简单的回显服务器
 
 说白了就是对 TcpServer 设置接口的再次封装,使得调用起来更加的方便,其实也就是上面测试代码的一个封装罢了!
#include "../server.hpp"class EchoServer
{
private:TcpServer _server;
public:EchoServer(uint16_t port): _server(port){// 对服务器进行设置_server.set_nums_of_subthread(3);_server.enable_inactive_release(5);_server.set_connected_callback(std::bind(&EchoServer::connected_handle, this, std::placeholders::_1));_server.set_message_callback(std::bind(&EchoServer::message_handle, this, std::placeholders::_1, std::placeholders::_2));_server.set_closed_callback(std::bind(&EchoServer::closed_handle, this, std::placeholders::_1));}// 启动回显服务器接口void start() { _server.start_server(); }
private:void connected_handle(const ConnectionPtr& cptr){// 这里的连接建立处理,我们就简单的打印哪个连接建立即可DLOG("new connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());}void message_handle(const ConnectionPtr& cptr, Buffer* buf){// 这里的消息事件处理,我们就做简单的打印以及回响即可DLOG("接收到:%s", buf->start_of_read());buf->push_reader_back(buf->get_sizeof_read());std::string str = "lirendada 你好啊!";cptr->send_data(str.c_str(), str.size());}void closed_handle(const ConnectionPtr& cptr){// 就是将连接管理表中的该连接去掉DLOG("delete connection: %p,the id is:%d", cptr.get(), cptr->get_connection_id());
}
};
 
 测试代码就非常的简单了,如下所示:
#include "echoserver.hpp"int main()
{EchoServer echoserver(8080);echoserver.start();return 0;
}
 
 执行结果就不演示了,和上面是一样的!
Ⅵ. 简单的服务器压力测试
https://github.com/EZLippi/WebBench
 我们使用一个开源工具 webbench 来进行网站的压力测试,但其实因为我们是本地进行测试的,所以没有考虑实际的带宽问题,并且我们服务器的带宽其实是比较少的,不像公司那种级别的服务器,所以我们只是做个稍微能参考的测试就行!
 上面是该工具的链接,根据其说明直接克隆下来并且执行即可,然后我们只需要打开服务器,使用下述指令:
webbench -t60 -c500 http://127.0.0.1:8080/
 
 表示测试 60 秒,用 500 个进程去访问该服务器,我们来看看结果怎么样:
 
 可以看到其实性能还是挺不错的,每秒钟发送 2257 个比特,然后一共有 74 个请求失败。

