llfc项目TCP服务器笔记
ChatServer
一个TCP服务器必然会有连接的接收,维持,收发数据等逻辑。那我们就要基于asio完成这个服务的搭建。主服务是这个样子的
- #include "LogicSystem.h"
- #include <csignal>
- #include <thread>
- #include <mutex>
- #include "AsioIOServicePool.h"
- #include "CServer.h"
- #include "ConfigMgr.h"
- using namespace std;
- bool bstop = false;
- std::condition_variable cond_quit;
- std::mutex mutex_quit;
- int main()
- {
- try {
- auto &cfg = ConfigMgr::Inst();
- auto pool = AsioIOServicePool::GetInstance();
- boost::asio::io_context io_context;
- boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
- signals.async_wait([&io_context, pool](auto, auto) {
- io_context.stop();
- pool->Stop();
- });
- auto port_str = cfg["SelfServer"]["Port"];
- CServer s(io_context, atoi(port_str.c_str()));
- io_context.run();
- }
- catch (std::exception& e) {
- std::cerr << "Exception: " << e.what() << endl;
- }
- }
CServer类的声明
- #include <boost/asio.hpp>
- #include "CSession.h"
- #include <memory.h>
- #include <map>
- #include <mutex>
- using namespace std;
- using boost::asio::ip::tcp;
- class CServer
- {
- public:
- CServer(boost::asio::io_context& io_context, short port);
- ~CServer();
- void ClearSession(std::string);
- private:
- void HandleAccept(shared_ptr<CSession>, const boost::system::error_code & error);
- void StartAccept();
- boost::asio::io_context &_io_context;
- short _port;
- tcp::acceptor _acceptor;
- std::map<std::string, shared_ptr<CSession>> _sessions;
- std::mutex _mutex;
- };
构造函数中监听对方连接
- CServer::CServer(boost::asio::io_context& io_context, short port):_io_context(io_context), _port(port),
- _acceptor(io_context, tcp::endpoint(tcp::v4(),port))
- {
- cout << "Server start success, listen on port : " << _port << endl;
- StartAccept();
- }
接受连接的函数
- void CServer::StartAccept() {
- auto &io_context = AsioIOServicePool::GetInstance()->GetIOService();
- shared_ptr<CSession> new_session = make_shared<CSession>(io_context, this);
- _acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));
- }
AsioIOServicePool
从AsioIOServicePool中返回一个可用的iocontext构造Session,然后将接受的新链接的socket写入这个Session保管。
AsioIOServicePool已经在前面讲解很多次了,它的声明如下
- #include <vector>
- #include <boost/asio.hpp>
- #include "Singleton.h"
- class AsioIOServicePool:public Singleton<AsioIOServicePool>
- {
- friend Singleton<AsioIOServicePool>;
- public:
- using IOService = boost::asio::io_context;
- using Work = boost::asio::io_context::work;
- using WorkPtr = std::unique_ptr<Work>;
- ~AsioIOServicePool();
- AsioIOServicePool(const AsioIOServicePool&) = delete;
- AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;
- // 使用 round-robin 的方式返回一个 io_service
- boost::asio::io_context& GetIOService();
- void Stop();
- private:
- AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency());
- std::vector<IOService> _ioServices;
- std::vector<WorkPtr> _works;
- std::vector<std::thread> _threads;
- std::size_t _nextIOService;
- };
AsioIOServicePool具体实现
- #include "AsioIOServicePool.h"
- #include <iostream>
- using namespace std;
- AsioIOServicePool::AsioIOServicePool(std::size_t size):_ioServices(size),
- _works(size), _nextIOService(0){
- for (std::size_t i = 0; i < size; ++i) {
- _works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));
- }
- //遍历多个ioservice,创建多个线程,每个线程内部启动ioservice
- for (std::size_t i = 0; i < _ioServices.size(); ++i) {
- _threads.emplace_back([this, i]() {
- _ioServices[i].run();
- });
- }
- }
- AsioIOServicePool::~AsioIOServicePool() {
- std::cout << "AsioIOServicePool destruct" << endl;
- }
- boost::asio::io_context& AsioIOServicePool::GetIOService() {
- auto& service = _ioServices[_nextIOService++];
- if (_nextIOService == _ioServices.size()) {
- _nextIOService = 0;
- }
- return service;
- }
- void AsioIOServicePool::Stop(){
- //因为仅仅执行work.reset并不能让iocontext从run的状态中退出
- //当iocontext已经绑定了读或写的监听事件后,还需要手动stop该服务。
- for (auto& work : _works) {
- //把服务先停止
- work->get_io_context().stop();
- work.reset();
- }
- for (auto& t : _threads) {
- t.join();
- }
- }
CServer的处理连接逻辑
- void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error){
- if (!error) {
- new_session->Start();
- lock_guard<mutex> lock(_mutex);
- _sessions.insert(ma
