【高级IO】多路转接之单线程Reactor
这里写目录标题
- 一.Epoll的两种工作模式
- 二.单线程Reactor
- 1.Connection模块
- 2.Reactor服务器模块
- 2.1初始化Init
- 2.2启动循环服务器Loop
- 2.3事件派发Dispatcher
- 2.4连接管理器Accepter
- 2.5事件管理器Receiver
- 2.6发送管理器Sender
- 3.上层业务模块
- 定制协议
- 业务处理
- 代码
一.Epoll的两种工作模式
Epoll有两种工作模式,一种是LT,一种是ET。
select,poll,epoll默认都是LT模式的。
LT模式是指底层数据没有被上层全部取走的话,那么底层就一值保持就绪状态,并不断通知上层拿走。
ET模式是指底层数据被上层取走一次之后,不管数据有没有全部被取走,底层都会转换为不就绪状态,只有当下次又来新数据时,才会再通知上层取走。
ET模式会逼迫上层要最好将数据一次性全部取走。不然就得等下次新数据到来才能读取上次没有取完的数据。
因此ET的效率要比LT的效率要高,因为ET减少了epoll_wait返回的次数。
【上层如何将数据全部取走呢?】
通过循环读取缓冲区中的数据,直到读取完毕。这需要将文件描述符设置为非阻塞状态,这样每次读取完后都会立刻返回,直到底层没有数据可读时,就会返回EWOULDBLOCK。
二.单线程Reactor
Reactor设计的思想主要是,服务器模块主要功能是处理IO问题,而处理数据则是由上层业务。
每一个套接字创建出来都会形成一个连接对象。连接对象里存储着输入输出缓冲区。
也就是每一个套接字它都会有一对独立的输入输出缓冲区。并且每个连接对象都存储着套接字的读写异常回调方法。
当套接字创建出来时,我们就知道是关心该套接字的什么事件。比如listen套接字被创建出来,我们就知道要关心它的读事件。一旦客户端发起连接,那么就要立刻执行对应的读取连接方法。如果是普通的套接字被创建出来,我们就知道要关心它的读,写,异常事件。一旦客户端发起数据,那么就要立刻执行对应的读取数据方法…。
这些方法我们都是需要提前知道的,并在为每个套接字创建Connection对象时就设置进去。当一旦对应的套接字的事件就绪了,就会立刻执行对应的方法。
所以每一个套接字还配备着对应的读/写/异常事件的执行方法,供外层回调。
Reactor服务器端使用unordered_map将套接字和它的连接对象映射起来,进而管理所有的连接对象。
1.每一个套接字都拥有自己的输入输出缓冲区。
2.每一个套接字都拥有自己的读/写/异常方法
3.服务器通过unordered_map将套接字和连接映射起来,进而管理所有连接。
1.Connection模块
每一个套接字都会形成一个连接对象。
该Connection对象里存储着对应的套接字,输入输出缓冲区,读/写/异常回调方法,该连接里还需要定义一个回指指针,用来找到reactor服务器。
还可以定义对应的客户端的信息,在listen套接字中会有用。在普通的套接字没什么用。
class Connection;
class TcpServer;
using func_t = std::function<void(std::weak_ptr<Connection>)>;
// 定义一个包装器,类型别名func_t
class Connection
{
public:Connection(int sock) : _sock(sock){}//将上层的方法设置到里面connection对象里void SetHandler(func_t recv, func_t send, func_t except){_recv_cb = recv;_send_cb = send;_except_cb = except;}//服务器将从临时数组中读取的内容最终全部放入套接字的输入缓冲区中void AppendInbuffer(const std::string &request){_inbuffer += request;}//服务器将要发送的数据全部放入套接字输出缓冲区中void AppendOutbuffer(const std::string &response){_outbuffer += response;}int Sock(){return _sock;}std::string &Inbuffer(){return _inbuffer;}std::string &Outbuffer(){return _outbuffer;}void SetWeakPtr(std::weak_ptr<TcpServer> tcp_server_ptr){_tcp_svr = tcp_server_ptr;}~Connection(){}public:func_t _recv_cb;func_t _send_cb;func_t _except_cb;// 定义三个回调指针,用来指明连接套接字就绪执行什么方法std::weak_ptr<TcpServer> _tcp_svr;// 再定义一个回指指针,用来找到tcpserver服务器std::string _clientip;uint32_t _clientport;private:int _sock; // 链接的套接字std::string _inbuffer; // 每个链接里的输入缓冲区std::string _outbuffer; // 每个连接里的输出缓冲区
};
2.Reactor服务器模块
2.1初始化Init
初始化工作:
1.创建listen套接字,绑定,设置监听。
2.Reactor是Epoll的ET模式,需要将所有套接字都设置为非阻塞的,所以需要将listen套接字设置为非阻塞。
3.需要将listen套接字设置到内核红黑树中,让内核关心listen套接字读事件。
4.为listen套接字创建对应的Connection对象,并将读取连接事件就绪时的执行方法Accepter设置到Connection对象中,最后将套接字和连接插入到map里管理起来
我们将后面两步封装为一个函数SetOS_BindConnectio
void Init(){ _listensock_ptr->Socket();SetNonBlock(_listensock_ptr->Fd());_listensock_ptr->Bind(_port);_listensock_ptr->Listen();lg(Info, "lsiten create sucess fd:%d", _listensock_ptr->Fd());// 首次要关心只有listen套接字,并且是我们是知道listen套接字一定要关心,并且关心的是读事件,读事件就绪后,就执行读方法SetOS_BindConnection(_listensock_ptr->Fd(),EVENT_IN, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);}void SetOS_BindConnection(int sock, uint32_t event, func_t recv, func_t send, func_t except){// 1.将套接字设置到内核里_epoll_ptr->EpollerUpdate(EPOLL_CTL_ADD, sock, event);// 2.将套接字和Connection关联起来std::shared_ptr<Connection> new_connect(new Connection(sock));// 2.1并设置对应套接字链接内部的回调方法,外部指明要关心的套接字事件就绪后执行什么方法,就传入什么方法new_connect->SetHandler(recv, send, except);// 3.将套接字和连接放入map里管理_conmap.insert(std::make_pair(sock, new_connect));lg(Info, "add a new connection success fd:%d", sock);}
2.2启动循环服务器Loop
将服务器启动并循环执行
void Loop(){_quit = false;while (!_quit){Dispatcher(-1);//-1传入代表阻塞式}_quit = true;}
2.3事件派发Dispatcher
开始等待内核关心的文件描述就绪。如果一旦有对应文件描述符的事件就绪,就会立刻返回,去执行事件,可能一次有n个事件就绪,就需要循环执行n次。根据事件的不同,执行不同的方法。
如果对应就绪的套接字,里面设置了响应方法,代表着它可以执行,就去执行。
比如是listen套接字读事件就绪了,那么listen套接字关联的Connection对象就设置了Accepter方法,用来接收连接。
比如是普通套接字读事件就绪了,那么普通套接字关联的Connection对象就设置了Receiver方法,用来读取数据。
比如是普通套接字的写事件就绪了,那么普通套接字关联的Connection对象就设置里Seneder方法,用来发送数据。
就绪的套接字在哪呢?就在Connection对象里,所以需要将Connection作为参数再传入函数中。
void Dispatcher(int timeout){int n = _epoll_ptr->EpollerWait(revent, max_event_num, timeout);// 一旦有事件就绪了,就将所有事件全部执行for (int i = 0; i < n; i++){uint32_t event = revent[i].events; // 什么事件就绪了int sock = revent[i].data.fd; // 哪个文件描述符就绪了if ((event & EVENT_IN) && isConnectionSafe(sock)){if (_conmap[sock]->_recv_cb)_conmap[sock]->_recv_cb(_conmap[sock]);// 如果是listen套接字,就绪,就回执行Accepter方法// 如果是普通套接字,就绪,就会执行Receiver方法// 不同类型的套接字设置了不同的方法}if ((event & EVENT_OUT) && isConnectionSafe(sock)){if (_conmap[sock]->_send_cb)_conmap[sock]->_send_cb(_conmap[sock]);}}}
2.4连接管理器Accepter
一旦llisten套接字就绪,就会立刻执行Accepter,从套接字关联的Connection对象中获取到就绪的套接字。
因为listen套接字设置为非阻塞,需要循环读取。这里可以直接进行accept,获取客户端的套接字和信息。
一旦获取成功,就表示又产生一个新的套接字了,这个新的套接字仍然需要设置为非阻塞,插入到内核红黑树中,创建对应的Connection对象,设置要执行的读写异常方法,最后插入到map中管理。
因为非阻塞,成功获取客户端连接后,底层没有连接了,循环再回来,就会读取就会返回EWOULDBLOCK。
本次连接结束。
void Accepter(std::weak_ptr<Connection> con){while (true){std::string clientip;uint16_t clientport;int sock = _listensock_ptr->Accept(&clientip, &clientport);//获取客户端的连接if (con.expired())return;auto connection = con.lock(); // 用来获取shared_ptr//获取客户端的信息connection->_clientip = clientip;connection->_clientport = clientport;if (sock > 0){lg(Debug, "get a new client, info[ip:%s] [port:%d],sock:%d", clientip.c_str(), clientport, sock);// 1.每个普通的套接字都需要设置为非阻塞的,因为现在是ET模式SetNonBlock(sock);// 2.将获取的套接字设置到内核红黑树上,对每个新的套接字再关联对应的连接对象// 普通套接字关心读,写,异常,所以需要将读方法,写方法,异常方法都设置到连接对象里SetOS_BindConnection(sock, EVENT_IN,std::bind(&TcpServer::Receiver, this, std::placeholders::_1),std::bind(&TcpServer::Sender, this, std::placeholders::_1),std::bind(&TcpServer::Excepter, this, std::placeholders::_1));}else{if (errno == EWOULDBLOCK)break;else if (errno == EINTR)continue;elsebreak;}}}
2.5事件管理器Receiver
Reactor服务器它是用来处理IO问题的,不是处理数据格式问题的,所以它不关心数据的格式正不正确,它只管全部读取,处理数据部分交给上层业务。
应不应该关心数据的格式???不应该!!服务器只要IO数据就可以,有没有读完,报文的格式细节,你不用管。
如果是普通套接字的读事件就绪了,那么就会立刻调用Recevier,读取套接字中的数据。就绪的套接字在哪里呢?在Connection对象里,所以Recevier也需要传入一个Connection对象。
因为Reactor是ET模式,要求就绪一次,要将底层所有的数据全部取走。
将读取到的数据全部放入到套接字的输入缓冲区中。
①n>0:表示读取成功,将读取到的数据放入套接字的输入缓冲区中。
②n==0:表示对端连接关闭,读取阻塞,就去执行该套接字的异常处理并退出。
③其他:有两种情况,一种是底层没有数据了,会返回EWOULDBLOCK,表示本次读取完毕,退出循环即可。一种试试连接出异常了,去执行异常处理并退出。
只有将底层数据全部读取出来后,最后再交给上层业务去处理。
void Receiver(std::weak_ptr<Connection> con){if (con.expired())return;auto connection = con.lock();int fd = connection->Sock();// epoll是ET模式,要求就绪一次,将所有数据全部读取走while (true){char buffer[buffer_size];memset(buffer, 0, sizeof(buffer));ssize_t n = recv(fd, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取if (n > 0) // 说明读取成功{// 读取成功就放入到该套接字的输入缓冲区中connection->AppendInbuffer(buffer);// std::cout << connection->Inbuffer() << endl;}else if (n == 0){lg(Info, "clien:%s|port:%d quit ....", connection->_clientip.c_str(), connection->_clientport);// 说明对方挂掉了,读不到,就去执行异常处理connection->_except_cb(connection);return;}else // 读取失败,有两种情况:1读取失败 2读取不就绪{// 读取不就绪就是底层没有数据可读了,全部读完了if (errno == EWOULDBLOCK)break;else if (errno == EINTR)continue;else{lg(Warning, "sock:%d ,client:%s %d recv erro", fd, connection->_clientip.c_str(), connection->_clientport);connection->_except_cb(connection);return;}}}// 将数据全部读取完后,再传递给上层业务处理// 数据是有了,但是不一定全,所以需要:1. 检测 2. 如果有完整报文,就处理_OnMessage(connection); // 你读到的sock所有的数据全在connection}
2.6发送管理器Sender
发送和读取不太一样,我们要正确的理解发送:
你要想发送的前提1是,你把对应的数据放入到来套接字的输出缓冲区中,前提2是底层的Tcp的输出缓冲区有空间。
1.epoll/select,因为写事件(发送缓冲区经常是有空间的)经常是就绪的。
2.如果对写事件设置关心,那么EPOLLOUT几乎每次都会有就绪,就会导致epoll经常返回。浪费CPU资源。
3.所以对于读事件,我们是设置常关心,而对于写事件,按需设置。
【做法】:一开始不设置关心,直接写入,如果写入完成,就结束。但如果写入完成,套接字的输出缓冲区还有数据,底层的发送缓冲区已经满了。这时候我们就需要对写事件进行关心了。然后如果一旦底层有空间了,就会接着发送数据,如果套接字的输出缓冲区写完了,就去掉对写事件的关心。
void Sender(std::weak_ptr<Connection> con){if (con.expired())return;auto connection = con.lock();auto &Outbuffer = connection->Outbuffer();while (true){ssize_t n = send(connection->Sock(), Outbuffer.c_str(), Outbuffer.size(), 0);if (n > 0){Outbuffer.erase(0, n);if (Outbuffer.empty())break;// 说明发送成功,且数据全部发送出去。}else if (n == 0){return;}else // 有两种情况:底层发送缓冲区不就绪 底层出问题{if (errno == EWOULDBLOCK){//什么问题呢,说明我们我一直发一直发一发可是最后底层缓冲区空间不够了。我上层可能还有数据,但底层不够了不够了,这时候就需要对写事件设置关心并直接breakif(!Outbuffer.empty()){EnableEvent(connection->Sock(),true,true);break;}else{EnableEvent(connection->Sock(),true,false);break;}}else if (errno == EINTR)continue;else{lg(Warning, "sock:%d ,client:%s %d recv erro", connection->Sock(), connection->_clientip.c_str(), connection->_clientport);connection->_except_cb(connection);return;}}}}//开启写事件关心,开启后,一旦底层的发送缓冲区有空间了,那么它就会立刻再回调Sender将套接字中的输出缓冲区数据发送,然后再去检测。void EnableEvent(int sock, bool readable, bool writeable){uint32_t events = 0;events |= ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);_epoll_ptr->EpollerUpdate(EPOLL_CTL_MOD, sock, events);}
原理:直接send,那么这次缓冲区的数据可能没有全部发完,(底层没有空间了),没发完怎么办?
没发完我们就开启对写事件关心,一旦我们缓冲区有空间了,那么会自动帮我们做事件派发呢,就会响应到sender,而这个sender呢,就是我们刚刚所设定的它它会继续再发,如果继续再发,还是没发,继续关心写事件,如果此时,它发完了,那么它会把写事件自动关掉。
3.上层业务模块
如何正确的读取到一个完整的报文,需要双方定制协议,遵循相同的协议来对数据进行处理。现在我们就定义一个简单的计算器协议。
比如通过添加报头来构建一个完整的报文。对端就需要解包来提取一个完整的有效载荷。
定制协议
定义双方都认识的结构体。
1.构建一个结构化request请求
2.序列化(对结构体序列化形成字符串)
2.添加报头(形成完整报文)
对端:
1.解包(将有效数据提取处理)
2.反序列化(将字符串反序列化为结构体)
3.对结构化数据进行处理
4.构建结构化响应response
5.再对结构化响应序列化
6.添加报头。
Protocol.hpp
#pragma once
//#define MySelf 1
// 在网络通信之前,我们服务器端和客户端都需要知道协议。我们也可以自己定制协议,这个协议要被双方都能识别
// 比如我们可以定制一个计数器协议。协议就是一种约定,除了数据本身还有其他的字段。
// 1.我们要求将数据以结构化的形式保存这样双方都可以识别这个结构体对象,但传入网络里时,需要转换成字符类型。这个过程就是序列化.序列化的过程就是在构建有效载荷
// 2.对方接收到字符串类型的数据时,想要用服务操作时,发现是不能操作的,是因为它不认识,这时还需要将字符类型转成结构体类型,这个过程叫做反序列化。
// 3.为了能让对方接收时,能接收读取到对方想要的完整报文时,我们采取添加报头的形式来解决。
// 4.所以在将报文传入到网络里时,还需要添加报文,当对端接收到报文时,想要对它进行处理之前,还需要将报文的报头解包才可以正确处理。
#include <iostream>
#include <jsoncpp/json/json.h>
#include <string>
const std::string blank_space = " ";
const std::string protocol_space="\n";
// 封包:报文在发送到网络之前需要添加一些报头,来达到一些要求
std::string Encode(const std::string &content)//content就是有效载荷
{//"x + y"------>"len"\n"x + y"\n" 添加了一个报文长度和两个\nstd::string packpage=std::to_string(content.size());packpage+=protocol_space;packpage+=content;packpage+=protocol_space;return packpage;
}// 解包:对端读取到报文(可能读取到的不是想要的,根据原先添加上去的报头来获取准确想要的报文),想要处理它,需要先解除报头才能处理
bool Decode(std::string &packpage, std::string *content)
{ //"len"\n"x + y"\n"---->"x + y"std::size_t pos=packpage.find(protocol_space);if(pos==std::string::npos)return false;std::string len_str=packpage.substr(0,pos);//判断一下是否读取的内容是全部的std::size_t len =std::stoi(len_str);std::size_t total_len=len_str.size()+len+2;if(packpage.size()<total_len)//说明不是一个完整的报文return false;*content=packpage.substr(pos+1,len);//为了真正的拿走报文,还需要将响应inbuffer里的报文移除erase,这样才是真正的拿走报文packpage.erase(0,total_len);return true;
}class Request
{
public:Request(){}Request(int data1, int data2, char op) : _x(data1), _y(data2), _op(op) // 最初形成结构化数据{}bool Serialize(std::string *out) // 序列化,单纯的就是将结构体转换成字符串{
#ifdef MySelf // 构建报文的有效载荷// struct==》"x + y"std::string s = std::to_string(_x);s += blank_space;s += _op;s += blank_space;s += std::to_string(_y);*out = s;return true;#elseJson::Value root;//定义一个万能对象,可以存储数据,k-v形式的结构体root["x"]=_x;root["y"]=_y;root["op"]=_op;//Json::FastWriter w;Json::StyledWriter w;*out=w.write(root);//序列化成字符串return true; #endif}bool Deserialize(std::string &in) // 反序列化,就单纯的将字符串类型转成结构体{
#ifdef MySelf //"x + y"==>struct//获取左操作数xstd::size_t left=in.find(blank_space);if(left==std::string::npos)return false;std::string part_x=in.substr(0,left);//获取右操作数ystd::size_t right=in.rfind(blank_space);if(right==std::string::npos)return false;std::string part_y=in.substr(right+1);//获取操作码opif(left+2!=right)return false;_op=in[left+1];_x=std::stoi(part_x);_y=std::stoi(part_y);return true;
#elseJson::Value root;//定义一个万能对象,将序列化的数据存储在里面Json::Reader r;r.parse(in,root);//将数据存到万能对象里后,我们就可以根据key值找到_x=root["x"].asInt();_y=root["y"].asInt();_op=root["op"].asInt();return true;
#endif }void DebugPrint(){std::cout<<"新请求构建完毕:"<<_x<<_op<<_y<<"=???"<<std::endl;}
public: // x + yint _x;int _y;char _op;
};
class Response
{
public:Response(int reslut, int code) : _result(reslut), _code(code){}Response(){}bool Serialize(std::string *out) // 序列化,单纯的就是将结构体转换成字符串{
#ifdef MySelf//"reslut code"//构建报文的有效载荷std::string s=std::to_string(_reslut);s+=blank_space;s+=std::to_string(_code);*out=s;return true;
#elseJson::Value root;root["reslut"]=_result;root["code"]=_code;//Json::FastWriter w;Json::StyledWriter w;*out=w.write(root);return true;
#endif }bool Deserialize(std::string &in){
#ifdef MySelf//"reslut code"-->结构体类型std::size_t pos=in.find(blank_space);if(pos==std::string::npos)return false;std::string part_left=in.substr(0,pos);std::string part_right=in.substr(pos+1);_reslut=std::stoi(part_left);_code=std::stoi(part_right);return true;
#elseJson::Value root;Json::Reader r;r.parse(in,root);//将字符串数据存到万能对象里_result=root["reslut"].asInt();_code=root["code"].asInt();return true;
#endif}void DebugPrint(){std::cout<<"结果响应完成,reslut: "<<_result<<",code: "<<_code<<std::endl;}public:int _result;int _code;
};
业务处理
上面定义好一个计算器的协议,双方都要遵循,现在我们就要实现具体的计算器业务,能实现加减乘除的简单计算。
主要功能是,对接收到的序列化数据,进行相关处理:
1.首先对序列化数据解包,获取有效数据
2.将序列化数据反序列化,形成结构化对象。
3.处理结构化对象,构建响应
4.将响应序列化,并添加报头。
#pragma once
#include <iostream>
#include "Protocol.hpp"enum
{Div_Zero = 1,Mod_Zero,Other_Oper
};// 上层业务
class Calculator
{
public:Calculator(){}Response CalculatorHelper(const Request &req){Response resp(0, 0);switch (req._op){case '+':resp._result = req._x + req._y;break;case '-':resp._result = req._x - req._y;break;case '*':resp._result = req._x * req._y;break;case '/':{if (req._y == 0)resp._code = Div_Zero;elseresp._result = req._x / req._y;}break;case '%':{if (req._y == 0)resp._code = Mod_Zero;elseresp._result = req._x % req._y;}break;default:resp._code = Other_Oper;break;}return resp;}// "len"\n"10 + 20"\nstd::string Handler(std::string &package){std::string content;bool r = Decode(package, &content); // "len"\n"10 + 20"\nif (!r)return "";// "10 + 20"Request req;r = req.Deserialize(content); // "10 + 20" ->x=10 op=+ y=20if (!r)return "";content = ""; //Response resp = CalculatorHelper(req); // result=30 code=0;resp.Serialize(&content); // "30 0"content = Encode(content); // "len"\n"30 0"return content;}~Calculator(){}
};
#include <iostream>
#include <memory>
#include "TcpServer.hpp"
#include "Calculator.hpp"Calculator calculator;
void DefaultOnMessage(std::weak_ptr<Connection> con)
{ if(con.expired())return;auto connection =con.lock();//对报文进行处理std::cout<<"上层获取到数据: "<<connection->Inbuffer()<<std::endl;std::string response_cstr=calculator.Handler(connection->Inbuffer());//上层处理流程:获得序列化数据->解包->反序列化->处理构建响应->响应序列化->封包if(response_cstr.empty())return;lg(Debug,"%s",response_cstr.c_str());connection->AppendOutbuffer(response_cstr);//将处理的完的响应放入到输出缓冲区中auto tcpserver=connection->_tcp_svr.lock();tcpserver->Sender(connection);//处理完后就要将序列化的响应发送给客户端}int main()
{std::unique_ptr<TcpServer> epoll_svr(new TcpServer(8888,DefaultOnMessage));epoll_svr->Init();epoll_svr->Loop();
}
最终上层处理好数据后,就将处理好的数据放入套接字输出缓冲区中,服务器对数据进行发送。
代码
服务区端:
#include <iostream>
#include <string>
#include <memory>
#include <cerrno>
#include <functional>
#include <unordered_map>
#include "Log.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"
#include "Socket.hpp"
#include "Comm.hpp"class Connection;
class TcpServer;
using func_t = std::function<void(std::weak_ptr<Connection>)>;
// 定义一个包装器,类型别名func_t
const uint16_t defaultport = 8888;
const int max_event_num = 64;
const static int buffer_size = 128;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);class Connection
{
public:Connection(int sock) : _sock(sock){}void SetHandler(func_t recv, func_t send, func_t except){_recv_cb = recv;_send_cb = send;_except_cb = except;}void AppendInbuffer(const std::string &request){_inbuffer += request;}void AppendOutbuffer(const std::string &response){_outbuffer += response;}int Sock(){return _sock;}std::string &Inbuffer(){return _inbuffer;}std::string &Outbuffer(){return _outbuffer;}void SetWeakPtr(std::weak_ptr<TcpServer> tcp_server_ptr){_tcp_svr = tcp_server_ptr;}~Connection(){}public:func_t _recv_cb;func_t _send_cb;func_t _except_cb;// 定义三个回调指针,用来指明连接套接字就绪执行什么方法std::weak_ptr<TcpServer> _tcp_svr;// 再定义一个回指指针,用来找到tcpserver服务器std::string _clientip;uint32_t _clientport;private:int _sock; // 链接的套接字std::string _inbuffer; // 每个链接里的输入缓冲区std::string _outbuffer; // 每个连接里的输出缓冲区
};class TcpServer : public nocopy
{public:TcpServer(int port , func_t Onmessage): _port(port),_epoll_ptr(new Epoller()),_listensock_ptr(new Sock()),_quit(true),_OnMessage(Onmessage){}void Init(){ _listensock_ptr->Socket();SetNonBlock(_listensock_ptr->Fd());_listensock_ptr->Bind(_port);_listensock_ptr->Listen();lg(Info, "lsiten create sucess fd:%d", _listensock_ptr->Fd());// 首次要关心只有listen套接字,并且是我们是知道listen套接字一定要关心,并且关心的是读事件,读事件就绪后,就执行读方法SetOS_BindConnection(_listensock_ptr->Fd(),EVENT_IN, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);}void SetOS_BindConnection(int sock, uint32_t event, func_t recv, func_t send, func_t except){// 1.将套接字设置到内核里_epoll_ptr->EpollerUpdate(EPOLL_CTL_ADD, sock, event);// 2.将套接字和Connection关联起来std::shared_ptr<Connection> new_connect(new Connection(sock));// 2.1并设置对应套接字链接内部的回调方法,外部指明要关心的套接字事件就绪后执行什么方法,就传入什么方法new_connect->SetHandler(recv, send, except);// 3.将套接字和连接放入map里管理_conmap.insert(std::make_pair(sock, new_connect));lg(Info, "add a new connection success fd:%d", sock);}//连接管理器void Accepter(std::weak_ptr<Connection> con){while (true){std::string clientip;uint16_t clientport;int sock = _listensock_ptr->Accept(&clientip, &clientport);if (con.expired())return;auto connection = con.lock(); // 用来获取shared_ptrconnection->_clientip = clientip;connection->_clientport = clientport;if (sock > 0){lg(Debug, "get a new client, info[ip:%s] [port:%d],sock:%d", clientip.c_str(), clientport, sock);// 1.每个普通的套接字都需要设置为非阻塞的,因为现在是ET模式SetNonBlock(sock);// 2.将获取的套接字设置到内核红黑树上,对每个新的套接字再关联对应的连接对象// 普通套接字关心读,写,异常,所以需要将读方法,写方法,异常方法都设置到连接对象里SetOS_BindConnection(sock, EVENT_IN,std::bind(&TcpServer::Receiver, this, std::placeholders::_1),std::bind(&TcpServer::Sender, this, std::placeholders::_1),std::bind(&TcpServer::Excepter, this, std::placeholders::_1));}else{if (errno == EWOULDBLOCK)break;else if (errno == EINTR)continue;elsebreak;}}}// 事件管理器// 应不应该关心数据的格式???不应该!!服务器只要IO数据就可以,有没有读完,报文的格式细节,你不用管。void Receiver(std::weak_ptr<Connection> con){if (con.expired())return;auto connection = con.lock();int fd = connection->Sock();// epoll是ET模式,要求就绪一次,将所有数据全部读取走while (true){char buffer[buffer_size];memset(buffer, 0, sizeof(buffer));ssize_t n = recv(fd, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取if (n > 0) // 说明读取成功{// 读取成功就放入到该套接字的输入缓冲区中connection->AppendInbuffer(buffer);// std::cout << connection->Inbuffer() << endl;}else if (n == 0){lg(Info, "clien:%s|port:%d quit ....", connection->_clientip.c_str(), connection->_clientport);// 说明对方挂掉了,读不到,就去执行异常处理connection->_except_cb(connection);return;}else // 读取失败,有两种情况:1读取失败 2读取不就绪{// 读取不就绪就是底层没有数据可读了,全部读完了if (errno == EWOULDBLOCK)break;else if (errno == EINTR)continue;else{lg(Warning, "sock:%d ,client:%s %d recv erro", fd, connection->_clientip.c_str(), connection->_clientport);connection->_except_cb(connection);return;}}}// 将数据全部读取完后,再传递给上层业务处理// 数据是有了,但是不一定全,所以需要:1. 检测 2. 如果有完整报文,就处理_OnMessage(connection); // 你读到的sock所有的数据connection}void Sender(std::weak_ptr<Connection> con){if (con.expired())return;auto connection = con.lock();auto &Outbuffer = connection->Outbuffer();while (true){ssize_t n = send(connection->Sock(), Outbuffer.c_str(), Outbuffer.size(), 0);if (n > 0){// 说明发送成功Outbuffer.erase(0, n);if (Outbuffer.empty())break;}else if (n == 0){return;}else // 有两种情况:底层发送缓冲区不就绪 底层出问题{if (errno == EWOULDBLOCK){if(!Outbuffer.empty()){EnableEvent(connection->Sock(),true,true);break;}else{EnableEvent(connection->Sock(),true,false);break;}}else if (errno == EINTR)continue;else{lg(Warning, "sock:%d ,client:%s %d recv erro", connection->Sock(), connection->_clientip.c_str(), connection->_clientport);connection->_except_cb(connection);return;}}}}void EnableEvent(int sock, bool readable, bool writeable){uint32_t events = 0;events |= ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);_epoll_ptr->EpollerUpdate(EPOLL_CTL_MOD, sock, events);}void Excepter(std::weak_ptr<Connection>){}void Loop(){_quit = false;while (!_quit){Dispatcher(-1);PrintConnection();}_quit = true;}void Dispatcher(int timeout){int n = _epoll_ptr->EpollerWait(revent, max_event_num, timeout);// 一旦有事件就绪了,就将所有事件全部执行for (int i = 0; i < n; i++){uint32_t event = revent[i].events; // 什么事件就绪了int sock = revent[i].data.fd; // 哪个文件描述符就绪了if ((event & EVENT_IN) && isConnectionSafe(sock)){if (_conmap[sock]->_recv_cb)_conmap[sock]->_recv_cb(_conmap[sock]);// 如果是listen套接字,就绪,就回执行Accepter方法// 如果是普通套接字,就绪,就会执行Receiver方法// 不同类型的套接字设置了不同的方法}if ((event & EVENT_OUT) && isConnectionSafe(sock)){if (_conmap[sock]->_send_cb)_conmap[sock]->_send_cb(_conmap[sock]);}}}bool isConnectionSafe(int sock){auto iter = _conmap.find(sock);if (iter == _conmap.end())return false;elsereturn true;}void PrintConnection(){std::cout << "_connections fd list: ";for (auto &connection : _conmap){std::cout << connection.second->Sock() << ", ";std::cout << "inbuffer: " << connection.second->Inbuffer().c_str();}std::cout << std::endl;}~TcpServer(){}private:std::shared_ptr<Epoller> _epoll_ptr;std::shared_ptr<Sock> _listensock_ptr;std::unordered_map<int, std::shared_ptr<Connection>> _conmap;// 将套接字和连接以kv绑定在一起,用map管理所有的连接uint16_t _port;bool _quit;struct epoll_event revent[max_event_num];func_t _OnMessage;
};
客户端:
#include <iostream>
#include <time.h>
#include <unistd.h>
#include <assert.h>
#include "Socket.hpp"
#include "Protocol.hpp"
void Usage(std::string proc)
{std::cout<<"\n\rUsage: "<<proc<<" port[1024+]\n"<<std::endl;
}
//./tcpclient ip port
int main(int args,char* argv[])
{if(args!=3){Usage(argv[0]);exit(1);}std::string serverip=argv[1];uint16_t serverport=std::stoi(argv[2]);Sock sockfd;sockfd.Socket();//创建套接字bool r=sockfd.Connect(serverip,serverport);//发起连接if(!r)return 1;srand(time(nullptr)^getpid());int cnt=1;std::string oper="+-*/%=$";std::string inbuffer_stream;while(cnt<=10){std::cout<<"========第"<<cnt<<"次测试"<<"============"<<std::endl;//1.开始构建请求int x=rand()%100+1;usleep(1234);int y=rand()%100;usleep(4321);char op=oper[rand()%oper.size()];Request req(x,y,op);//2.请求构建完毕req.DebugPrint();//3.数据序列化形成报文std::string content;req.Serialize(&content);//4.添加报头std::string packpage=Encode(content);//5.发送到网络里write(sockfd.Fd(),packpage.c_str(),packpage.size());//6.接收服务器端发送来的响应char buffer[128];ssize_t n=read(sockfd.Fd(),buffer,sizeof(buffer));//6.1处理读取if(n>0){buffer[n]=0;inbuffer_stream+=buffer;//接收到的是一个协议报文"len"\n"reslut code"\nstd::cout<<std::endl;std::cout<<"获取到的网络答应:"<<std::endl;std::cout<<inbuffer_stream<<std::endl;//将从网络里获取到的报文打印出来//7.首先需要解包检测std::string content;bool r =Decode(inbuffer_stream,&content);assert(r);//8.反序列化,将答应变成客户端可认识的形式Response resp;r=resp.Deserialize(content);assert(r);//9.结果响应完成resp.DebugPrint();}std::cout<<"============================="<<std::endl;sleep(1);cnt++;}sockfd.Close();
}