Linux网络编程05:IO多路转接(万字图文解析)
05-IO多路转接
一、五种网络IO模型
1、数据传输过程
服务器从网络接收的大致流程如下:
- 数据通过计算机网络传送到网卡
- 把网卡的数据复制到内核socket缓冲区
- 把内核socket缓冲区复制到用户缓冲区,之后应用程序就可以使用数据了
核心就是两次拷贝读取操作,五种网络IO模型的不同之处也就在于这两个读取操作怎么交互
2、阻塞/非阻塞 与 同步/异步
阻塞/非阻塞关注的是用户态进程/线程的状态,其要访问的数据是否就绪,进程/线程是否需要等待。当前接口数据还未准备就绪时,线程是否被阻塞挂起。所谓阻塞挂起,就是当前线程还处于CPU时间片当中,调用了阻塞的方法,由于数据未准备就绪,则时间片还未到就让出CPU。而非阻塞就是当前接口数据还未准备就绪时,线程不会被阻塞挂起,可以不断轮询请求接口,看看数据是否已经准备就绪。
同步/异步关注的是消息通信机制。所谓同步,就是在发出一个调用时,自己需要参与等待结果的过程,则为同步。同步需要主动读写数据,在读写数据的过程中还是会阻塞。异步IO,则指出发出调用以后到数据准备完成,自己都未参与,则为异步。异步只需要关注IO操作完成的通知,并不主动读写数据,由操作系统内核完成数据的读写。
3、IO模型
1)阻塞式IO
应用调用recvfrom读取数据时,其系统调用直到数据包到达且被复制到应用缓冲区中或者发送错误时才返回,在此期间一直会等待,进程从调用到返回这段时间内都是被阻塞的。在内核将数据准备好之前,系统调用会一直等待。所有的套接字, 默认都是阻塞方式。
- 应用进程向内核发起recfrom读取数据
- 内核进行准备数据报(此时应用进程阻塞)
- 内核将数据从内核复制到应用空间
- 复制完成后,返回成功提示
2)非阻塞式IO
当应用进程发起读取数据申请时,如果内核数据没有准备好会即刻告诉应用进程,不会让应用进程在这里等待,如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回EWOULDBLOCK错误码。非阻塞IO往往需要开发者循环的方式反复尝试读写文件描述符。
- 应用进程向内核发起recvfrom读取数据
- 内核数据报没有准备好,即刻返回EWOULDBLOCK错误码
- 应用进程再次向内核发起recvfrom读取数据
- 内核如果已有数据包准备好就进行下一步骤,否则还是返回错误码
- 内核将数据拷贝到用户空间
- 完成后,返回成功提示
3)信号驱动式IO
信号驱动IO是在调用sigaction时候建立一个SIGIO的信号联系,当内核准备好数据之后再通过SIGIO信号通知线程,此文件描述符准备就绪;当线程收到可读信号后,此时再向内核发起recvfrom读取数据的请求,因为信号驱动IO的模型下,应用线程在发出信号监控后即可返回,不会阻塞,所以一个应用线程也可以同时监控多个文件描述符。
- 应用进程向内核发起recvfrom读取数据
- 内核进行准备数据报,即刻返回
- 内核倘若已有数据包准备好则通知应用线程
- 应用进程向内核发起recvfrom读取数据
- 内核将数据拷贝到用户空间
- 完成后,返回成功提示
4)异步IO
应用进程只需要向内核发送一个读取请求,告诉内核它要读取数据后即刻返回;内核收到请求后会建立一个信号联系,当数据准备就绪,内核会主动把数据从内核空间复制到用户空间,等所有操作都完成之后,内核会发起一个通知告诉应用进程。
- 应用进程向内核发起recvfrom读取数据
- 内核进行准备数据报,即刻返回
- 内核收到后会建立一个信号联系,如果已有数据包准备好,内核将数据拷贝到用户空间
- 完成后,返回成功提示
5)IO多路复用
由一个线程监控多个网络请求(linux系统把所有网络请求以一个文件描述符来标识),来完成数据状态询问的操作,当有数据准备就绪之后再分配对应的线程去读取数据。
- 应用进程向内核发起recvfrom读取数据
- 内核进行准备数据报(此时应用进程阻塞)
- 内核倘若已有数据包准备好则通知应用线程
- 内核将数据拷贝到用户空间
- 完成后,返回成功提示
二、fcntl
fcntl(File Control)用于对已打开的文件描述符进行各种控制操作。它提供了对文件描述符的低级控制能力。
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
- fd: 要操作的文件描述符,通常是通过
open()
,socket()
等系统调用获得的。 - cmd: 控制命令,指示要执行的操作类型,例如获取/设置标志操作等。
- arg: 可选参数,其类型和含义取决于
cmd
。例如,可能是整数、指向struct flock
的指针等。
fcntl 的功能非常丰富,主要通过 cmd
参数来控制。以下是其主要操作类型:
命令类型 | 常用命令 | 功能描述 |
---|---|---|
复制文件描述符 | F_DUPFD | 复制文件描述符,新描述符共享文件表项但可能有独立的标志。 |
文件描述符标志 | F_GETFD , F_SETFD | 获取/设置文件描述符标志,如 FD_CLOEXEC (控制 exec 时是否关闭)。 |
文件状态标志 | F_GETFL , F_SETFL | 获取/设置文件状态标志,如 O_NONBLOCK (非阻塞)、O_APPEND (追加)。 |
异步 I/O 所有权 | F_GETOWN , F_SETOWN | 获取/设置接收 SIGIO 和 SIGURG 信号的进程或进程组ID。 |
- 示例:设置文件描述符为非阻塞模式
#include <fcntl.h>
#include <unistd.h>int SetNonblock(int fd)
{int flags = fcntl(fd, F_GETFL, 0); // 获取当前标志if (flags == -1) {return -1;}flags |= O_NONBLOCK; // 添加非阻塞标志if (fcntl(fd, F_SETFL, flags) == -1) // 设置新标志{ return -1;}return 0;
}
设置后,read
, write
, accept
等调用在无法立即完成时会返回 -1 并将 errno
设置为 EAGAIN
或 EWOULDBLOCK
。
- open和recv也有flags,但是不常用,一般使用fcntl:
三、IO多路转接
* 原理
多进程与多线程并发服务器,不经常使用这种作为大型服务器开发的原因是,所有的监听与访问请求都由服务器操作。可以使用多路IO转接服务器(也叫多任务IO服务器),思想:不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件。
1、select
select
是 Linux中一种经典的I/O多路复用机制。它的核心作用是:让单个进程或线程能够同时监视多个文件描述符(file descriptors)的状态变化,等待其中任何一个或多个变得“就绪”(可读、可写或出现异常),从而避免阻塞在单个 I/O 操作上。
1)select的好处
传统的 I/O 模型面临两个问题:
- 阻塞 I/O:调用read()或accept()会一直阻塞,直到数据到来。单线程只能处理一个连接,效率极低。
- 多线程/多进程模型:为每个连接创建一个线程或进程。虽然能并发处理,但资源开销巨大(内存、上下文切换),可扩展性差。
select提供了一个高效的解决方案:
- 单线程处理多连接:一个线程可以监听成百上千个连接。
- 资源消耗低:远低于多线程/多进程模型。
2)函数接口
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
参数介绍:
- nfds:监控的文件描述符集里最大文件描述符+1,因为此参数会告诉内核检测前多少个文件描述符的状态。
- readfs/writes/exceptfds:监控有读数据/写数据/异常发生到达文件描述符集合,三个都是传入传
- fd_set:文件描述符集合
select
使用一个位图(bitmap) 的结构来管理文件描述符集合,每一位(bit)对应一个文件描述符编号。如果某一位被设置为 1,就表示对应的文件描述符正在被监视。
对fd_set
的操作必须使用下面的一组宏:
| 宏 | 功能说明 |
| :— | :— |
|FD_ZERO(fd_set *set)
| 清空集合。在每次调用select
前,必须用此宏初始化集合。 |
|FD_SET(int fd, fd_set *set)
| 添加一个文件描述符fd
到集合中。 |
|FD_CLR(int fd, fd_set *set)
| 从集合中移除一个文件描述符fd
。 |
|FD_ISSET(int fd, fd_set *set)
| 检查文件描述符fd
是否在集合中(通常在select
返回后使用,判断哪个描述符就绪)。 |
重要限制:fd_set
的大小是固定的,由常量 FD_SETSIZE
决定(通常为 1024)。这意味着 select
能监视的文件描述符数量有上限(默认最多 1024 个),这是它的一个主要缺陷。
- timeout:
可以传入struct timeval结构体:
struct timeval
{long tv_sec; /* seconds */long tv_usec; /* microseconds */
};
timeout
参数的三种用法:
- 永久阻塞:
timeout = NULL
。select
会一直阻塞,直到有描述符就绪。 - 非阻塞(轮询):
timeout->tv_sec = 0; timeout->tv_usec = 0;
。select
立即返回,检查后不管结果如何都返回。 - 限时等待:设置具体的秒和微秒值。
select
最多等待这么长时间,超时后返回 0。
- 返回值:
返回值 | 含义 |
---|---|
> 0 | 成功,返回就绪的文件描述符的总个数。 |
0 | 超时,在超时时间到达前没有任何描述符就绪。 |
-1 | 出错,并设置相应的errno。 |
- 错误码:
EBADF 文件描述符为无效的或该文件已关闭
EINTR 此调用被信号所中断
EINVAL 参数n 为负值。
ENOMEM 核心内存不足
3)就绪条件
读就绪
- socket内核中,接收缓冲区中的字节数,大于等于低水位标记SO_RCVLOWAT。此时可以无阻塞的读该文件描述符,并且read/recv返回值大于0;
- socket TCP通信中,对端关闭连接,此时对该socket读,read/recv返回0;
- 监听的socket上有新的连接请求;
- socket上有未处理的错误;
写就绪:
- socket内核中,发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小),大于等于低水位标记SO_SNDLOWAT,此时可以无阻塞的写,并且返回值大于0;
- socket的写操作被关闭(close或者shutdown)。对一个写操作被关闭的socket进行写操作,会触发SIGPIPE信号;
- socket使用非阻塞connect连接成功或失败之后;
- socket上有未读取的错误;
异常就绪
- socket上收到带外数据(与TCP的urg报文有关)。
4)示例代码
[shenalex@VM-8-16-centos 01-select]$ tree .
.
|-- err.hpp
|-- log.hpp
|-- main.cc
|-- makefile
|-- selectServer.hpp
`-- sock.hpp0 directories, 6 files
selectServer.hpp
#include <sys/types.h>
#include <sys/select.h>
#include <cerrno>
#include <string>
#include <functional>
#include "sock.hpp"namespace select_ns
{class SelectServer{static const uint16_t defaultPort = 8080; // 默认端口号static const int defaultFd = -1; // 表示无效的文件描述符static const int fdnum = sizeof(fd_set) * 8; // fd_set能容纳的最大文件描述符数量, fd_set本质是位图结构,每个字节可表示8个fdusing func_t = std::function<std::string(std::string)>;public:SelectServer(func_t fun, const uint16_t &port) : _func(fun), _port(port), _listensock(-1), _fdarray(nullptr){}void init(){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 初始化文件描述符管理数组, 用于记录所有需要监控的fd_fdarray = new int[fdnum];for (int i = 0; i < fdnum; i++)_fdarray[i] = defaultFd; // 初始化为默认无效的fd_fdarray[0] = _listensock; // 监听套接字存放在_fdarry[0]}void start(){for (;;){// 1.初始化文件描述符集fd_set rfds; // 读文件描述符集,每次循环都需要重新初始化(select会修改该集合, 因为是"输入输出型"参数)FD_ZERO(&rfds); // 清空集合int maxfd = _fdarray[0]; // 找到当前最大的文件描述符,加1之后用于select的第一个参数for (int i = 0; i < fdnum; i++){if (_fdarray[i] == defaultFd) // 跳过无效文件描述符continue;FD_SET(_fdarray[i], &rfds); // 合法文件描述符全部添加到读文件描述符集中if (maxfd < _fdarray[i])maxfd = _fdarray[i]; // 找到最大的文件描述符}logMessage(NORMAL, "max fd is :%d", maxfd);// 2.设置超时// timeout=0为非阻塞模式(轮询模式),timeout > 0, 等待一段时间后再阻塞,timeout为nullptr是阻塞struct timeval timeout = {1, 0}; // 1s+0us = 1.0s// 3.调用selectint n = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);// 内核会 线性 遍历传入的整个文件描述符集合。// 4.分析返回值switch (n){case 0:logMessage(NORMAL, "timeout..."); // 返回值为0意为超时是正常情况,比如等待了给定的一段时间后还没等到(timeout参数大于0时)break;case -1:logMessage(WARNING, "select error, code:%d, err string:%s", errno, strerror(errno)); // 出错时打印错误码break;default:// 有事件就绪了,对于这里来说, 由于我们没有设置写事件和异常事件(select第3,4参数),就意味着是 读事件就绪了logMessage(NORMAL, "have event ready");handlerReadEvent(rfds);break;}}}~SelectServer(){if (_listensock >= 0)close(_listensock);if (_fdarray != nullptr)delete[] _fdarray;}private:void handlerReadEvent(fd_set &rfds){for (int i = 0; i < fdnum; i++){// 过滤掉非法的if (_fdarray[i] == defaultFd)continue;// 5.检查就绪描述符// 下面可能是监听事件(我们认为是一种特殊的读事件)就绪也可能是其他读事件if (_fdarray[i] == _listensock && FD_ISSET(_fdarray[i], &rfds)) // 把_fdarray[i] == _listensock写在&&之前,减少内核遍历rfdsAccepter();else if (FD_ISSET(_fdarray[i], &rfds))Recver(_fdarray[i], i);}}void Accepter(){// 到此处,accept一定不会阻塞,因为select已经告知事件已经就绪std::string clientIp;uint16_t clientPort;int sock = Sock::Accept(_listensock, &clientIp, &clientPort);if (sock < 0)return;logMessage(NORMAL, "accept success [%s:%d]", clientIp.c_str(), clientPort);// 我们得到的sock套接字不能够直接recv/read, 我们设计为只有select才有资格检测事件是否就绪// 因此我们要把新的sock托管给select, 即添加到fdarray数组中int i = 0;for (; i < fdnum; i++){if (_fdarray[i] != defaultFd)continue;break;}// 检查是否有空闲位置if (i == fdnum){// 服务器连接数已满,关闭新连接logMessage(WARNING, "server full, please wait");close(sock);}else{// 将新套接字加入数组,随着函数退回,再次到start函数的for循环里面,新的套接字由select检测事件是否就绪_fdarray[i] = sock;}}void Recver(int sock, int pos){// FIXME 因为tcp协议中传输的字节流,直接从recv中读取,可能读不完整,这样读有问题char buffer[1024];ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0);if (s > 0){buffer[s] = 0;logMessage(NORMAL, "client# %s", buffer);}else if (s == 0){close(sock);_fdarray[pos] = defaultFd;logMessage(NORMAL, "client quit");return;}else{close(sock);_fdarray[pos] = defaultFd;logMessage(ERROR, "client quit, error string: %s", strerror(errno));return;}// 2. 处理请求std::string response = _func(buffer);// 3. 响应报文// FIXME write bugwrite(sock, response.c_str(), response.size());}private:uint16_t _port;int _listensock;int *_fdarray; // 用于管理所有需要监控的文件描述符的数组func_t _func; // 回调函数};
}
sock.hpp
#pragma once
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>
#include <cstdlib>
#include <memory>
#include <string>
#include "log.hpp"
#include "err.hpp"class Sock
{static const int backlog = 32;public:static int Socket(){int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){logMessage(FATAL, "create socket failed");exit(SOCKET_ERR);}logMessage(NORMAL, "create socket success");int opt = 1;setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));return sock;}static void Bind(int sock, uint16_t port){struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_addr.s_addr = htonl(INADDR_ANY);local.sin_family = AF_INET;local.sin_port = htons(port);if (bind(sock, reinterpret_cast<const sockaddr *>(&local), sizeof(local)) == -1){logMessage(FATAL, "bind failed");exit(BIND_ERR);}logMessage(NORMAL, "bind success");}static void Listen(int sock){if (listen(sock, backlog) == -1){logMessage(FATAL, "listen socket failed");exit(LISTEN_ERR);}logMessage(NORMAL, "listen socket success");}static int Accept(int listensock, std::string *clientIp, uint16_t *clientPort){struct sockaddr_in peer;memset(&peer, 0, sizeof(peer));socklen_t len = sizeof(peer);int sock = accept(listensock, reinterpret_cast<sockaddr *>(&peer), &len);if (sock == -1)logMessage(ERROR, "accept failed, next one");*clientPort = ntohs(peer.sin_port);*clientIp = inet_ntoa(peer.sin_addr);return sock;}
};
err.hpp
#pragma onceenum
{USAGE_ERR = 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR
};
main.cc
#include <string>
#include <memory>
#include "selectServer.hpp"std::string handle(std::string s)
{s += "[server echo]";return s;
}int main(int argc, char *argv[])
{uint16_t port = 8080;if (argc == 2){port = atoi(argv[1]);}std::unique_ptr<select_ns::SelectServer> sv(new select_ns::SelectServer(handle, port));sv->init();sv->start();return 0;
}
- log.hpp:略
- makefile:略
5)select的缺陷
- 文件描述符数量限制:受
FD_SETSIZE
(通常 1024)限制,不适合高并发场景。 - 性能随描述符数量线性下降:每次调用都需要在内核态和用户态之间复制整个描述符集合,并且
select
返回后,应用程序需要遍历所有被监视的描述符才能发现哪些就绪。当描述符很多但活跃度不高时,效率很低。 - 内核态轮询:
select
的内部实现本质上是一种轮询机制,内核需要线性扫描所有被监视的描述符,开销较大。
2、poll
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
1)参数介绍
- fds:struct pollfd数组。
- nfds:监控数组中有多少文件描述符需要被监控。
- timeout:毫秒级等待
-1:阻塞等待
0:立即返回,不阻塞进程
大于0:等待指定毫秒数 - struct pollfd:
struct pollfd
{int fd; /* file descriptor */short events; /* requested events */short revents; /* returned events */
};
// events:是应用程序设置的 “关注事件”(输入参数),告诉内核 “我想监控这个文件描述符的哪些事件”(如 POLLIN 表示关注读事件)。
// revents:是内核填充的 “就绪事件”(输出参数),告诉应用程序 “这个文件描述符上实际发生了哪些事件”。
- events和revents常见取值:
POLLIN:有数据可读(如 TCP 套接字收到数据、管道有数据、终端输入)。
POLLOUT:可以写入数据(如内核发送缓冲区有空闲空间)。
POLLERR:发生错误(如对方关闭连接后仍尝试写入)。
POLLHUP:连接被挂断(如 TCP 客户端主动关闭连接)。
POLLNVAL:文件描述符无效(如未打开的 fd 被加入监控)。
- 返回值:
| 返回值 | 含义 |
| :— | :— |
| > 0 | 成功,返回就绪的文件描述符的总个数。 |
| 0 | 超时,在超时时间到达前没有任何描述符就绪。 |
| -1 | 出错,并设置相应的errno。 |
2)优缺点
优点:
- 突破文件描述符1024的上限
- 监听与返回的集合分离
- 搜索范围变小(已经知道是那几个数组)
缺点:
- 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符
- 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中
- 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降
- 查看一个进程可以打开的文件描述符的上限数:
[shenalex@VM-8-16-centos 01-select]$ cat /proc/sys/fs/file-max
198973
- sudo vi /etc/security/limits.conf 在文件尾部写入以下配置,soft软限制,hard硬限制:
soft nofile 65536
hard nofile 100000
3)示例代码
[shenalex@VM-8-16-centos 02-poll]$ tree .
.
|-- err.hpp
|-- log.hpp
|-- main.cc
|-- makefile
|-- pollServer.hpp
`-- sock.hpp0 directories, 6 files
pollServer.hpp
#include <sys/types.h>
#include <poll.h>
#include <cerrno>
#include <string>
#include <functional>
#include "sock.hpp"namespace poll_ns
{class PollServer{static const uint16_t defaultPort = 8080;static const int defaultFd = -1; // 表示无效的文件描述符(未使用的位置)static const int num = 2048; // 最大可管理的文件描述符数量using func_t = std::function<std::string(std::string)>;public:PollServer(func_t fun, const uint16_t &port) : _func(fun), _port(port), _listensock(-1), _rfds(nullptr){}void init(){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 初始化pollfd数组:用于管理所有需要监控的文件描述符及其事件_rfds = new struct pollfd[num];for (int i = 0; i < num; i++)ResetItem(i); // 将每个元素初始化为无效状态// 初始化监听套接字的pollfd项->监控读事件_rfds[0].fd = _listensock;_rfds[0].events = POLLIN;}void start(){int timeout = -1;for (;;){int n = poll(_rfds, num, timeout);switch (n){case 0:logMessage(NORMAL, "timeout...");break;case -1:logMessage(WARNING, "poll error, code:%d, error string:%s", errno, strerror(errno));break;default:logMessage(NORMAL, "have event ready");handlerReadEvent();break;}}}~PollServer(){if (_listensock >= 0)close(_listensock);if (_rfds != nullptr)delete[] _rfds;}private:void ResetItem(int i){_rfds[i].fd = defaultFd;_rfds[i].events = 0; // 不监控任何事件_rfds[i].revents = 0; // 清除事件状态(由内核设置)}void handlerReadEvent(){for (int i = 0; i < num; i++){// 过滤掉非法的if (_rfds[i].fd == defaultFd)continue;if (!(_rfds[i].events & POLLIN))continue;if (_rfds[i].fd == _listensock && (_rfds[i].revents & POLLIN))Accepter();else if (_rfds[i].revents & POLLIN)Recver(i);}}void Accepter(){// 到此处,accept一定不会阻塞,因为poll已经告知listen监听接收新连接事件已经就绪std::string clientIp;uint16_t clientPort;int sock = Sock::Accept(_listensock, &clientIp, &clientPort);if (sock < 0)return;logMessage(NORMAL, "accept success [%s:%d]", clientIp.c_str(), clientPort);int i = 0;for (; i < num; i++){if (_rfds[i].fd != defaultFd)continue;break;}if (i == num){logMessage(WARNING, "server full, please wait");close(sock);}else{_rfds[i].fd = sock;_rfds[i].events = POLLIN;_rfds[i].revents = 0;}}void Recver(int pos){// FIXME: 因为tcp协议中传输的字节流,直接从recv中读取,可能读不完整,这样读有问题char buffer[1024];ssize_t s = recv(_rfds[pos].fd, buffer, sizeof(buffer) - 1, 0);if (s > 0){buffer[s] = 0;logMessage(NORMAL, "client# %s", buffer);}else if (s == 0){close(_rfds[pos].fd);ResetItem(pos);logMessage(NORMAL, "client quit");return;}else{close(_rfds[pos].fd);ResetItem(pos);logMessage(ERROR, "client quit, error string: %s", strerror(errno));return;}// 处理请求std::string response = _func(buffer);// 响应报文// FIXME: write bugwrite(_rfds[pos].fd, response.c_str(), response.size());}private:uint16_t _port;int _listensock;func_t _func; // 处理客户端请求的回调函数(业务逻辑)struct pollfd *_rfds; // 管理所有监控的文件描述符和事件};
}
main.cc
#include <string>
#include <memory>
#include "pollServer.hpp"std::string handle(std::string s)
{s += "[server echo]";return s;
}int main(int argc, char *argv[])
{uint16_t port = 8080;if (argc == 2){port = atoi(argv[1]);}std::unique_ptr<poll_ns::PollServer> sv(new poll_ns::PollServer(handle, port));sv->init();sv->start();return 0;
}
- err.hpp:略
- log.hpp:略
- makefile:略
- sock.hpp:略
3、epoll
epoll是Linux下IO多路复用接口select/poll的增强版本,能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不是迫使开发者每次等待事件之前都必须重新准备要侦听的文件描述符集合,另一个原因是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历哪些被内核IO事件唤醒而加入Ready队列的描述符集合就行了。
1)三个系统调用
- epoll_create:创建一个 epoll 实例,返回一个文件描述符。自从linux2.6.8之后,size参数是被忽略的,但为了兼容性仍需传入一个大于零的值。用完之后,必须调用close()关闭。
#include <sys/epoll.h>
int epoll_create(int size);
- epoll_ctl:用于向 epoll 实例(由
epfd
指定)中注册、修改或删除需要监控的文件描述符(由fd
指定)及其关注的事件(由event
指定)。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数讲解:
- op:
EPOLL_CTL_ADD
(添加)、EPOLL_CTL_MOD
(修改)、EPOLL_CTL_DEL
(删除)。 - event:
event
参数是一个指向struct epoll_event
的指针,其结构如下:
typedef union epoll_data
{void *ptr;int fd;uint32_t u32;uint64_t u64;
} epoll_data_t;struct epoll_event
{uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;
-
event可以是以下事件组合:
EPOLLIN
: 文件描述符可读。
EPOLLOUT
: 文件描述符可写。
EPOLLERR
: 文件描述符发生错误。
EPOLLHUP
: 文件描述符被挂起(对端关闭连接)。
EPOLLET
: 设置边缘触发(Edge-Triggered)模式。
EPOLLONESHOT
: 设置一次性触发模式,事件通知后该文件描述符会被禁用,需重新用EPOLL_CTL_MOD
启用。 -
删除时,系统调用并不关心传入的event事件。在2.6.8内核之前必须传入一个struct epoll_event(即使不关心),但2.6.8之后可以传入null
- epoll_wait:等待在 epoll 实例中注册的文件描述符上的事件发生。
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int
timeout);
参数讲解:
- events:用来存内核得到事件的集合(这里是个传出参数)
- maxevents:告知内核这个events有多大,这个maxevents的值不能大于创建epoll_create时的size(Linux 2.6.8后忽略)
- timeout:指定超时时间(毫秒),-1表示阻塞直到有事件发生,0表示立即返回。
- 返回值:函数返回就绪的事件数量,并通过
events
数组返回这些就绪事件的详细信息,出错返回-1。
2)epoll 的核心工作流程
epoll_create
:创建一个 epoll 实例(内核中的eventpoll
结构体),返回一个文件描述符epfd
。该结构体内部包含一棵红黑树 (用于高效管理所有待监测的文件描述符) 和一个就绪链表 (用于存放有事件发生的文件描述符,无需从红黑树拷贝节点到就绪链表,只需将就绪的每个节点直接链接起来)。epoll_ctl
:向epfd
添加、修改或删除需要监控的文件描述符(如 socket)及其感兴趣的事件(如可读EPOLLIN
、可写EPOLLOUT
)。此过程本质上是将用户进程“委托”给内核:内核会将 epoll 对象(而非用户进程本身)加入到指定 socket 的等待队列中,并设置一个回调函数 (ep_poll_callback
)。epoll_wait
: 等待事件发生。这是用户进程可能发生阻塞的地方,但它的作用是等待内核返回就绪链表中已有的事件,而非主动轮询所有 socket。一旦有 socket 发生 I/O 事件(如数据到达),网卡的中断处理程序或内核协议栈会最终调用事先注册的ep_poll_callback
函数。这个回调函数会将该 socket 对应的结构体(epitem
)放入就绪链表。epoll_wait
只需检查这个就绪链表是否非空,并将其中的事件拷贝到用户空间,然后返回给用户进程。用户进程收到这些就绪事件后,即可进行相应的 I/O 操作。
struct eventpoll
{ .... /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/ struct rb_root rbr; /*双链表。存放着将要通过epoll_wait返回给用户的满足条件的事件*/ struct list_head rdlist; ....
};
// 当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可
// 如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户,这个操作的时间复杂度是O(1)
struct epitem
{ struct rb_node rbn;//红黑树节点 struct list_head rdllink;//双向链表节点 struct epoll_filefd ffd; //事件句柄信息 struct eventpoll *ep; //指向其所属的eventpoll对象 struct epoll_event event; //期待发生的事件类型
}
3)触发模式:LT 与 ET
epoll 提供了两种事件触发模式:
- 水平触发(Level-Triggered, LT):这是epoll默认模式。只要文件描述符关联的读内核缓冲区非空,有数据可以读取,就一直发出可读信号进行通知;当文件描述符关联的内核写缓冲区不满,有空间可以写入,就一直发出可写信号进行通知LT模式支持阻塞和非阻塞两种方式。
- 边缘触发(Edge-Triggered, ET):需要通过
EPOLLET
标志显式设置。只有当监控的文件描述符状态发生变化时(例如套接字接收缓冲区从空变为非空,即有新数据到达),才会通知一次。如果应用程序第一次没有读完所有数据,且没有新的数据到达,后续的epoll_wait
调用将不会再次通知该事件。因此,在 ET 模式下,必须使用非阻塞 I/O(non-blocking IO),并且循环读取或写入数据,直到系统调用返回EAGAIN
或EWOULDBLOCK
错误,以确保完全处理所有数据,避免事件丢失。ET 模式减少了重复通知,效率更高,但编程复杂性也增加。
3)高效性的原理
epoll 的高性能主要源于其内核实现机制
-
红黑树与就绪链表: 内核使用一颗红黑树来存储所有通过
epoll_ctl
注册的文件描述符,这使得增、删、改操作的效率很高(时间复杂度为 O(log n))。此外,内核维护一个就绪链表。当某个被监控的文件描述符上有事件发生时,内核会通过回调机制将其添加到就绪链表中。 -
避免无效遍历: 当调用
epoll_wait
时,内核只需检查就绪链表中是否有元素即可,无需像 select/poll 那样线性扫描所有文件描述符集合。这就使得epoll_wait
的效率仅与就绪的文件描述符数量相关,而与总共监控的文件描述符数量无关,这在连接数巨大但活跃连接较少的场景下优势极其明显。 -
❌
内存映射(mmap): epoll 使用了内存映射(mmap)技术,减少了内核态和用户态之间数据拷贝的开销。
这种说法是不准确的,我们定义的struct epoll_event是我们在用户空间中分配好的内存,势必还是需要将内核的数据拷贝到这个用户空间的内存中,用户进程并不能直接从内核中读取就绪链表。
4)select、poll与epoll对比
特性 | select | poll | epoll |
---|---|---|---|
最大连接数 | 有限制(通常 1024) | 无硬性限制 | 无硬性限制 |
工作效率 | O(n)。每次调用都需线性扫描所有fd,性能随fd数量增加线性下降 | O(n)。同select,性能随fd数量增加线性下降 | O(1)。仅通知就绪的fd,性能与活跃fd数相关,与总fd数无关 |
数据拷贝 | 每次调用都需将整个fd集从用户态拷贝到内核态 | 同select | 调用EPOLL_CTL_ADD将文件描述符拷贝到内核中,该操作相比于select和poll并不频繁;调用epoll_wait时还是需要将数据从内核拷贝至用户空间 |
触发模式 | 仅支持水平触发(LT) | 仅支持水平触发(LT) | 支持水平触发(LT)和边缘触发(ET) |
编程复杂度 | 低 | 中 | 相对较高(尤其是ET模式) |
5)实例代码(LT模式)
[shenalex@VM-8-16-centos 03-epoll]$ tree .
.
|-- epollServer.hpp
|-- err.hpp
|-- log.hpp
|-- main.cc
|-- makefile
`-- sock.hpp0 directories, 6 files
epollServer.hpp
#pragma once
#include <sys/types.h>
#include <sys/epoll.h>
#include <errno.h>
#include <functional>
#include <string>
#include "sock.hpp"
namespace epoll_ns
{static const uint16_t defaultPort = 8080;static const int defaultVal = -1;static const int defaultNum = 64;static const int defaultBufferSize = 1024;using func_t = std::function<std::string(const std::string &)>;class EpollServer{public:EpollServer(func_t func, const uint16_t &port = defaultPort, const int &num = defaultNum) : _func(func), _port(port), _num(num), _listensock(defaultVal){struct epoll_event s;}void init(){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 创建epoll实例// epoll默认是LT模式,如果要改为ET模式,则需要把套接字都设置为非阻塞模式(此处使用默认LT模式)_epfd = epoll_create(1); // linux 2.6.8之后, epoll_create传入的size已经被忽略了,但仍然需要传入一个大于0的数if (-1 == _epfd){logMessage(FATAL, "epoll_create failed, code:%d, code string:%s", errno, strerror(errno));exit(EPOLL_CREATE_ERR);}logMessage(NORMAL, "epoll_create success, _epfd = %d", _epfd);// 将监听套接字添加到epoll监控struct epoll_event ev;ev.data.fd = _listensock;ev.events = EPOLLIN;epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);// 分配事件数组,用于epoll_wait返回就绪事件_revs = new struct epoll_event[_num];logMessage(NORMAL, "init server success");}void start(){int timeout = -1; // epoll_wait超时时间,-1表示永久阻塞for (;;){int n = epoll_wait(_epfd, _revs, _num, timeout);switch (n){case 0:// 超时,没有事件就绪logMessage(NORMAL, "[_epfd:%d]timeout", _epfd);break;case -1:// 出错logMessage(WARNING, "epoll_wait failed, code:%d, code string:%d", errno, strerror(errno));break;default:// 有n个事件就绪,处理这些事件logMessage(NORMAL, "have event ready");handler(n);break;}}}~EpollServer(){if (_listensock >= 0)close(_listensock);if (_epfd >= 0)close(_epfd);if (_revs)delete[] _revs;}private:/*** @brief 处理就绪事件* @param readyNum 就绪事件的数量*/void handler(int readyNum){// 遍历所有就绪事件for (int i = 0; i < readyNum; i++){uint32_t events = _revs[i].events; // 事件类型int sock = _revs[i].data.fd; // 事件关联的文件描述符if (sock == _listensock && (events & EPOLLIN)){std::string clientIp;uint16_t clientPort;// 接受新连接int fd = Sock::Accept(_listensock, &clientIp, &clientPort);if (fd < 0)continue;// 将新连接添加到epoll监控struct epoll_event ev;ev.data.fd = fd;ev.events = EPOLLIN;epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev); // 监控读事件(客户端发送数据)}else if (events & EPOLLIN) // 处理客户端socket的读事件(收到数据){// FIXME: 读取有问题,不能保证读到完整报文// 实际应用中需要考虑粘包/拆包问题,可能需要应用层协议char buffer[defaultBufferSize];ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0);if (s > 0){buffer[s] = 0; // 添加字符串结束符logMessage(DEBUG, "client# %s", buffer);std::string response = _func(buffer);// FIXME: 发送同样也有问题,不能保证发送完整报文// 实际应用中需要循环发送直到全部数据发送完成send(sock, response.c_str(), response.size(), 0);}else if (s == 0){// 先从epoll移除, 才close(fd)// 删除时,系统调用并不关心传入的ev。在2.6.8内核之前必须传入一个struct epoll_event(即使不关心), 但2.6.8之后可以传入nullepoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(NORMAL, "client quit");}else{epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(ERROR, "recv error, code:%d, code string:%s", errno, strerror(errno));}}else{// TODO: 其他情况,如写事件等// 可以处理EPOLLOUT等事件,用于非阻塞发送数据}}}private:int _listensock;uint16_t _port;int _epfd; // epoll实例文件描述符struct epoll_event *_revs; // 存储就绪事件的数组int _num; // 最大事件数量func_t _func;};
}
main.cc
#include <string>
#include <memory>
#include "epollServer.hpp"std::string handle(std::string s)
{s += "[server echo]";return s;
}int main(int argc, char *argv[])
{uint16_t port = 8080;if (argc == 2){port = atoi(argv[1]);}std::unique_ptr<epoll_ns::EpollServer> sv(new epoll_ns::EpollServer(handle, port));sv->init();sv->start();return 0;
}
err.hpp
#pragma onceenum
{USAGE_ERR = 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR,EPOLL_CREATE_ERR
};
- log.hpp:略
- sock.hpp:略
- makefile:略
四、Reactor设计模式
基于 epoll 的 Reactor 模式是一种事件驱动的高性能网络编程模型,它通过 I/O 多路复用技术高效管理海量连接,将事件监听与业务处理解耦,是现代服务器应对高并发挑战的核心架构之一。
1、Reactor模式的核心思想
Reactor 模式的核心是事件循环(Event Loop)和回调机制:
- 注册事件:应用程序将需要监听的文件描述符(fd)及其对应的事件类型(如可读、可写)和回调函数注册到 Reactor。
- 事件监听:Reactor 通过底层的 I/O 多路复用器(如 epoll)同步等待这些事件的发生。
- 事件分发:一旦有事件就绪,多路复用器返回,Reactor 将事件分发给预先注册好的回调函数(Handler)进行处理。
这种设计实现了事件驱动,将“等待事件”和“处理事件”清晰分离,使得应用程序无需为每个连接创建独立线程,从而大幅降低系统资源消耗。
2、Reactor原理图
3、类图设计
核心类部分
线程池部分
工具类部分
4、时序图设计
5、示例代码(网络版计算器)
[shenalex@VM-8-16-centos 04-reactor]$ tree .
.
|-- Acceptor.cc
|-- Acceptor.h
|-- Client
| |-- client.cc
| |-- Connector.cc
| |-- Connector.h
| |-- InetAddress.cc
| |-- InetAddress.h
| |-- makefile
| |-- Protocol.hpp
| |-- Service.cc
| |-- Service.h
| |-- Socket.cc
| |-- Socket.h
| |-- SocketIO.cc
| |-- SocketIO.h
| |-- TcpClient.cc
| `-- TcpClient.h
|-- EventLoop.cc
|-- EventLoop.h
|-- InetAddress.cc
|-- InetAddress.h
|-- Log.hpp
|-- makefile
|-- server.cc
|-- Socket.cc
|-- Socket.h
|-- SocketIO.cc
|-- SocketIO.h
|-- TcpConnection.cc
|-- TcpConnection.h
|-- TcpServer.cc
|-- TcpServer.h
|-- ThreadPool.cc
|-- ThreadPool.h
|-- Utils.hpp
|-- WorkQueue.cc
`-- WorkQueue.h1 directory, 37 files
1) 网络接口封装
Socket
- Socket.h
#ifndef __SOCKET_H__
#define __SOCKET_H__class Socket
{
public:Socket();explicit Socket(int fd);~Socket();int fd() const;private:Socket(const Socket &) = delete;Socket &operator=(const Socket &) = delete;private:int _fd;
};#endif
- Socket.cc
#include "Socket.h"
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>Socket::Socket()
{_fd = ::socket(AF_INET, SOCK_STREAM, 0);if (-1 == _fd){perror("socket faild");return;}
}Socket::Socket(int fd) : _fd(fd)
{
}Socket::~Socket()
{close(_fd);
}int Socket::fd() const
{return _fd;
}
SocketIO
- SocketIO.h
#ifndef __SOCKETIO_H__
#define __SOCKETIO_H__
#include <vector>class Buffer;class SocketIO
{
public:/*** @brief Construct a new Socket I O object** @param fd 文件描述符*/explicit SocketIO(int fd);~SocketIO();/*** @brief 从内核缓冲区中固定读取n个字节数据** @param buffer 用户缓冲区* @param n 读取的字节数* @return int 实际读取的字节数*/int readn(char *buffer, int n);/*** @brief 从内核缓冲区读取一行数据,换行符为'\n'** @param buffer 用户缓冲区* @param n 最大读取的字节数* @return int 实际读取的字节数*/int readLine(char *buffer, int n);/*** @brief 向内核缓冲区中固定写入n个字节数据** @param buffer 用户缓冲区* @param n 写入字节数* @return int 实际读取的字节数*/int writen(const char *buffer, int n);/*** @brief 把内核缓冲区数据循环读取到用户缓冲区** @param pos 当前缓冲区可以从pos位置开始写入,[0, pos)已有数据* @param inBuffer 输入输出型参数* @return 成功读取的字节数, 返回-1意为出错*/int readFromKernel(Buffer &buffer);/*** @brief 把用户输出缓冲区数据循环发送到内核缓冲区** @param pos 当前缓冲区可以从pos位置开始读取,[0, pos)已有数据* @param outBuffer 输入输出型参数* @return int*/int writeToKernel(Buffer &buffer);private:int _fd;
};#endif
- SocketIO.cc
#include "SocketIO.h"
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <cerrno>
#include <cstdio>
#include "Utils.hpp"
#include "Log.hpp"SocketIO::SocketIO(int fd) : _fd(fd)
{
}SocketIO::~SocketIO()
{// SocketIO只负责对某个文件描述符执行读写操作,但并不关心文件描述符关闭// 文件描述符的关闭,交给Socket类的析构函数,以免对同个文件描述符重复关闭// close(_fd);
}int SocketIO::readn(char *buffer, int n)
{if (!buffer || n <= 0){errno = EINVAL; // 无效参数perror("read failed(invalid input)");return -1;}char *pstr = buffer;int rest = n;while (rest > 0){ssize_t s = read(_fd, pstr, rest);if (-1 == s){if (errno == EINTR)continue; // 被信号意外中断,下一轮续读perror("read failed(read error)"); // 其他读取失败情况,直接返回-1return -1;}else if (0 == s) // 对端关闭,退出循环{break;}else if (s > 0){pstr += s;rest -= s;}}return n - rest; // 返回实际读取数,如果正常读完返回就是n
}int SocketIO::readLine(char *buffer, int n)
{if (!buffer || n <= 0){errno = EINVAL; // 无效参数perror("readLine failed(invalid input)");return -1;}char *pstr = buffer;int rest = n - 1; // 预留一个位置最后填充'\0'while (rest > 0){ssize_t s = recv(_fd, pstr, rest, MSG_PEEK); // 设置为MSG_PEEK后,意味着内核缓冲区的数据不会被清除,而只是被拷贝到用户区// 相当于recv我们并不跟之前一样拷贝到用户缓冲区后直接清除内核缓冲区的数据,而是先"拿出来"看一下,在后面再调用readn()拷贝与清除if (-1 == s){if (errno == EINTR)continue;perror("readLine failed(read error)");return -1;}else if (0 == s){break; // 对端关闭,直接退出循环}else if (s > 0){int i;for (i = 0; i < s; i++){if (pstr[i] == '\n'){int size = i + 1;readn(pstr, size); // 再调用一次readn(), 拷贝并清除内核缓冲区数据pstr += size;*pstr = '\0'; // c语言风格字符串结尾rest -= size;return n - rest; // 在这次读取中找到了换行符,也就是读到了一行,在这这里终结即可}}readn(pstr, s);pstr += s;rest -= s;}}// 此处应该是缓冲区不足或者s==0的情况,此处在末尾处添加\0即可*pstr = '\0';return n - rest;
}int SocketIO::writen(const char *buffer, int n)
{if (buffer == nullptr || n <= 0){errno = EINVAL;perror("write failed(invalid input)");return -1;}int rest = n;int pos = 0;while (rest > 0){ssize_t s = write(_fd, buffer + pos, rest);if (-1 == s){if (errno == EINTR)continue;perror("write failded(write error)");return -1;}else if (0 == s){break;}else if (s > 0){pos += s;rest -= s;}}return n - rest;
}int SocketIO::readFromKernel(Buffer &buffer)
{logMessage(DEBUG, "SocketIO::readFromKernel, oldLength = %d", buffer.length());int oldLength = buffer.length();int rest = buffer.remain();while (true){ssize_t s = ::read(_fd, buffer.space(), rest);if (-1 == s){if (errno == EINTR){continue;}else if (errno == EAGAIN || errno == EWOULDBLOCK) // 如果errno是EAGAIN或EWOULDBLOCK说明数据已经读完(前提是文件描述符fd已经设置为非阻塞状态!!!){logMessage(DEBUG, "SocketIO::readFromKernel/ while/ -1 == s/ errno == EAGAIN, EWOULDBLOCk");break;}else // 其它情况说明出错{perror("SocketIO::readFromKernel failed");return -1;}}else if (s > 0){logMessage(DEBUG, "SocketIO::readFromKernel/ while/ s > 0");rest -= s;buffer.next(s); // 读到数据立马修改缓冲区指向if (rest == 0){buffer.enlarge(); // 从内核缓冲区读到有效数据字节数s与当前缓冲区大小相同,有理由推断可能缓冲区不够,所以扩容rest = buffer.remain(); // 更新扩容后的rest}}// 不会有s == 0的情况,因为我们在EventLoop::handlerMessage已经做了isClosed判断,如果连接关闭,将被截止,不会进入本函数}logMessage(DEBUG, "out SocketIO::readFromKernel, return %d", buffer.length() - oldLength);logMessage(DEBUG, "current Buffer = [%s]", std::string(buffer.start(), buffer.length()).c_str());return buffer.length() - oldLength; // 返回读取字节数
}int SocketIO::writeToKernel(Buffer &buffer)
{int total = 0;while (true){ssize_t s = ::write(_fd, buffer.start(), buffer.length());if (-1 == s){if (errno == EINTR){continue;}else if (errno == EAGAIN || errno == EWOULDBLOCK){logMessage(DEBUG, "SocketIO::writeToKernel/ while/ -1 == s/ errno == EAGAIN, EWOULDBLOCk");break;}else{perror("SocketIO::writeToKernel failed");return -1;}}else if (s > 0){buffer.erase(s);total += s;// 缓冲区数据发送完毕,直接退出循环if (buffer.length() == 0)break;}else if (0 == s){// Tcp连接已经断开,不应该写入数据,未写入的不管了,返回-1return -1;}}return total;
}
InetAddress
- InetAddress.h
#ifndef __INETADDRESS_H__
#define __INETADDRESS_H__
#include <arpa/inet.h>
#include <string>/*** @brief 封装sockaddr_in,使得相关网络字节序转换过程对调用者透明**/
class InetAddress
{
public:// 提供string类型点分十进制ip地址和uint16_t的端口号即可构造一个对象explicit InetAddress(const std::string &ip, uint16_t port);explicit InetAddress(const struct sockaddr_in &addr);~InetAddress();// 点分十进制string类型ip地址std::string ip() const;// 16位无符号端口号uint16_t port() const;// 获取原始对象sockaddr_in类型的指针const struct sockaddr_in *getInetAddrPtr() const;private:struct sockaddr_in _addr;
};#endif
- InetAddress.cc
#include "InetAddress.h"
#include <cstring>InetAddress::InetAddress(const std::string &ip, uint16_t port)
{memset(&_addr, 0, sizeof(_addr));_addr.sin_family = AF_INET;_addr.sin_addr.s_addr = inet_addr(ip.c_str());_addr.sin_port = htons(port);
}InetAddress::InetAddress(const struct sockaddr_in &addr) : _addr(addr)
{
}InetAddress::~InetAddress()
{
}std::string InetAddress::ip() const
{return std::string(inet_ntoa(_addr.sin_addr));
}
uint16_t InetAddress::port() const
{return ntohs(_addr.sin_port);
}
const struct sockaddr_in *InetAddress::getInetAddrPtr() const
{return &_addr;
}
2) 工具与协议
Utils.hpp
#ifndef __UTILS_HPP__
#define __UTILS_HPP__
/*** @file Utils.hpp* @brief 有四个类。1.协议方法类,两个静态方法用于构建和拆分协议报文; 2.请求报文类; 3.响应报文类; 4.缓冲区类*/#include <cstring>
#include <string>
#include <sstream>
#include "Log.hpp"// 需要执行sudo yum -y install jsoncpp-devel安装json序列化反序列化方式
// 如果上述命令执行后显示没有"No package available"意味着软件包不在官方仓库中,
// 可以先安装EPEL仓库(内含很多官方仓库没有的软件包), 再安装, 执行: sudo yum -y install epel-release
#include <jsoncpp/json/json.h>#define SEP " "
#define SEP_LEN strlen(SEP) // 不用sizeof, 它会加上末尾\0
#define LINE_SEP "\r\n"
#define LINE_SEP_LEN strlen(LINE_SEP)/*** exitcode枚举*/
enum
{OK = 0,DIV_ZERO,MOD_ZERO
};/*** @brief 缓冲区类,作为TcpConnection连接的读入用户级缓冲区和写出用户级缓冲区* 优化了直接使用vector<char>直接使用erase效率低下问题, 优化了erase时机与策略* 不过由于其底层使用的仍然是vector,有的时候仍需要使用erase擦除,效率较低(使用双端队列+内存块将是最好的方法,此处暂不使用)*/
class Buffer
{
public:Buffer(int initSize = 1024) : _M_buffer(initSize, 0), _M_start(0), _M_continue(0){}~Buffer(){}/*** @brief 返回缓冲区可用区域(可以接着填充)的起始位置* @return 返回char*类型, buffer可用区域起始指针*/char *space(){return _M_buffer.data() + _M_continue;}/*** @brief 返回缓冲区有效区域的起始位置* @return 返回char*类型, buffer有效区域起始指针*/char *start(){return _M_buffer.data() + _M_start;}/*** @brief 缓冲区有效区域第i个元素*/char &operator[](size_t i){return _M_buffer[_M_start + i];}/*** @brief 调整可用区域起始位置, 将下一次写入缓冲区时的起始位置后移动num位** @return true 修改成功* @return false 修改失败*/bool next(size_t num){logMessage(DEBUG, "in Utils::Buffer::next, num = %d", num);if (_M_continue + num >= _capacity())return false;_M_continue += num;return true;}/*** @brief 缓冲区有效区域大小*/size_t length() const{return _M_continue - _M_start;}/*** @brief 缓冲区可用区域大小*/size_t remain() const{return _M_buffer.size() - _M_continue;}/*** @brief** @param size 擦除用户视角下缓冲区[0, size - 1]的缓冲区数据, 也就是相对于_start的有效区域的大小* @return 擦除成功返回true* @return 擦除失败返回false*/bool erase(size_t size){if (size > length())return false;_M_start += size; // 更改有效区域起始位置,前面size字节已经被擦除// 关键:如果有效区域起始位置与可用区域起始位置相同,说明有效区域为空,前面都是无效区域,直接从头开始if (_M_start == _M_continue){_M_start = _M_continue = 0;}// 我们设计erase结束时整理底层vector, 如果可用区域小于总容量的1/10时,就重整一下底层_bufferif (remain() <= (_capacity() / 10))_reorganize();return true;}/*** @brief 扩充缓冲区可用区域大小*/void enlarge(){size_t oldRemainSize = remain();// 先调用重整函数_reorganize();// 经过重整后可用区域已经是原来的2倍且大于有效区域大小,就不管了if (remain() >= 2 * oldRemainSize && remain() > length())return;// 如果不满足我们假定的要求,就要调用vector.resize()了// 前面经过重整,可以保证有效缓冲区已经是[0, length() - 1]_M_buffer.resize(_capacity() * 2, 0); // 我们选择直接把底层vector扩大两倍}private:void _reorganize(){// 如果缓冲区已经没有可用区域了,此时没办法了,只能调用vector的erase函数整理底层_bufferif (_M_start > 0){_M_buffer.erase(_M_buffer.begin(), _M_buffer.begin() + _M_start);_M_continue -= _M_start;_M_start = 0;}}/*** @brief 缓冲区底层结构容量大小*/size_t _capacity() const{return _M_buffer.size();}private:std::vector<char> _M_buffer;// 通过_start和_continue划分为3个区域:// [0, _start): 已经被erase擦除的无效区域// [_start, _continue): 当前缓冲区有效区域// [_continue, _buffer.size() - 1]: 缓冲区可用区域(未使用过区域)size_t _M_start; // 有效区域开始的位置size_t _M_continue; // 缓冲区可用区域开始的位置
};/*** 协议方法类*/
// 我们规定我们的协议是这样:
// "x op y" -> "content_len"\r\n"x op y"\r\n
class Protocol final
{
public:// 增加报文static void enLength(const std::string &text, std::string *out){std::string send_string = std::to_string(text.size());send_string += LINE_SEP;send_string += text;send_string += LINE_SEP;*out = send_string;}// 去掉报文static bool deLength(const std::string &package, std::string *text){auto pos = package.find(LINE_SEP);if (pos == std::string::npos)return false;std::string text_len_string = package.substr(0, pos);int text_len = std::stoi(text_len_string);*text = package.substr(pos + LINE_SEP_LEN, text_len);return true;}static bool ParseOnePackage(Buffer &buffer, std::string *out){std::string tmp(buffer.start(), buffer.length());logMessage(DEBUG, "in ParseOnePackage, tmp = [%s]", tmp.c_str());auto it = tmp.find(LINE_SEP); // 借助string的find函数寻找行分界符号if (it == std::string::npos){logMessage(DEBUG, "out ParseOnePackage Fatal");return false;}else{// 找到了LINE_SEP就认为至少有一个报文int count_len = it; // 找到的it就是LINE_SEP位置,也就是count_lenint text_len = std::stoi(tmp.substr(0, count_len)); // 直接转为数字int total_len = count_len + LINE_SEP_LEN + text_len + LINE_SEP_LEN; // 计算好完整报文大小// 在用户缓冲区擦除数据buffer.erase(total_len);logMessage(DEBUG, "ing ParseOnePackage, after erase, buffer = [%s]", std::string(buffer.start(), buffer.length()).c_str());// *out输出一个完整报文*out = tmp.substr(0, total_len);logMessage(DEBUG, "out ParseOnePackage Success, *out = [%s]", out->c_str());return true;}}private:Protocol() = delete;~Protocol() = delete;
};class Request
{
public:Request() : _x(0), _y(0), _op(0){}Request(int x, int y, char op) : _x(x), _y(y), _op(op){}public:int _x;int _y;char _op;
};// "exitcode result" -> "content_len"\r\n"exitcode result"\r\n
class Response
{
public:Response(int exitcode, int result) : _exitcode(exitcode), _result(result){}Response() : _exitcode(0), _result(0){}public:int _exitcode; // 0表示计算成功,非0表示计算失败int _result; // 存储计算结果
};/*** 序列化方法类: Request和Response序列化*/
class Serializable final
{
public:static void serialize(const Request &request, std::string *out){Json::Value root;root["first"] = request._x;root["second"] = request._y;root["operator"] = request._op;Json::StreamWriterBuilder writerBuilder;writerBuilder["indentation"] = ""; // 关键设置:禁用缩进(同时也禁用了末尾换行)*out = Json::writeString(writerBuilder, root);}static void serialize(const Response &response, std::string *out){Json::Value root;root["exitcode"] = response._exitcode;root["result"] = response._result;Json::StreamWriterBuilder writerBuilder;writerBuilder["indentation"] = "";*out = Json::writeString(writerBuilder, root);}static void deserialize(Request *request, const std::string &in){Json::Value root;Json::CharReaderBuilder readerBuilder;std::istringstream iss(in); // 将string转换为流Json::parseFromStream(readerBuilder, iss, &root, nullptr);request->_x = root["first"].asInt();request->_y = root["second"].asInt();request->_op = root["operator"].asInt();}static void deserialize(Response *response, const std::string &in){Json::Value root;Json::CharReaderBuilder readerBuilder;std::istringstream iss(in);Json::parseFromStream(readerBuilder, iss, &root, nullptr);response->_exitcode = root["exitcode"].asInt();response->_result = root["result"].asInt();}private:Serializable() = delete;~Serializable() = delete;
};#endif
Log.hpp
#ifndef __LOG_HPP__
#define __LOG_HPP__#include <cstdio>
#include <ctime>
#include <cstdarg>
#include <iostream>
#include <iomanip>
#include <unistd.h>#ifndef __DEBUG__
#define LOG_NORMAL "log.txt"
#define LOG_ERR "log.error"
#endif#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4inline const char *to_levelstr(int level)
{switch (level){case DEBUG:return "DEBUG";case NORMAL:return "NORMAL";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return nullptr;}
}inline void logMessage(int level, const char *format, ...) // 补充:形参实例化,压栈顺序从右到左
{
#define NUM 1024char logprefix[NUM];time_t rawtime;struct tm *timeinfo;char time_str[80];time(&rawtime);timeinfo = localtime(&rawtime);strftime(time_str, sizeof(time_str), "%Y-%m-%d %H:%M:%S", timeinfo);snprintf(logprefix, sizeof(logprefix), "[%s][%s][pid:%d]", to_levelstr(level), time_str, getpid());char logcontent[NUM];va_list arg;va_start(arg, format);vsnprintf(logcontent, sizeof(logcontent), format, arg);#ifndef __DEBUG__FILE *log = fopen(LOG_NORMAL, "a");FILE *err = fopen(LOG_ERR, "a");
#elseFILE *log = stdout;FILE *err = stderr;
#endifif (log != nullptr && err != nullptr){FILE *cur = nullptr;if (level == DEBUG)
#ifdef __DEBUG__cur = log;
#elsecur = nullptr;
#endifif (level == NORMAL || level == WARNING)cur = log;if (level == ERROR || level == FATAL)cur = err;if (cur)fprintf(cur, "%s %s\n", logprefix, logcontent);#ifndef __DEBUG__fclose(log);fclose(err);
#endif}
}#endif
3) 线程池
WorkQueue
- WorkQueue.h
#ifndef __WORKQUEUE_H__
#define __WORKQUEUE_H__#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>/*** 线程池的任务队列*/
using Runnable = std::function<void(void)>; // 任务
class WorkQueue
{
public:/*** @brief Construct a new Work Queue object** @param size 任务队列大小*/explicit WorkQueue(size_t size);~WorkQueue();// 从任务队列中取出一个任务Runnable get();// 放入一个任务到任务队列中void put(Runnable &&rb);// 任务队列是否为空bool empty() const;// 任务队列是否已满bool full() const;// 用于线程池退出时唤醒所有在条件变量等待的子线程void wakeupAll();private:size_t _size; // 任务队列大小std::queue<Runnable> _queue; // 任务队列std::mutex _mtx; // 互斥锁std::condition_variable _space; // 空余空间同步变量std::condition_variable _data; // 有效数据同步变量bool _toWakeFlag; // 唤醒非空条件变量(_data)上等待的线程所用标志位,详见【code:4】
};#endif
- WorkQueue.cc
#include "WorkQueue.h"WorkQueue::WorkQueue(size_t size) : _size(size), _toWakeFlag(false)
{
}WorkQueue::~WorkQueue()
{
}Runnable WorkQueue::get()
{std::unique_lock<std::mutex> ul(_mtx);while (empty() && !_toWakeFlag) // 【code:4】线程池退出时唤醒所有在条件变量等待的线程{_data.wait(ul);}if (_toWakeFlag) // 走到此处说明线程池要退出了,且任务队列也没有任务了,直接返回nullreturn nullptr;Runnable rb = std::move(_queue.front()); // 移动构造, function支持移动构造_queue.pop();_space.notify_one(); // 同理(见【code:1】),此处可以不解锁,跳出get后会自动解锁,而这条语句解锁前解锁后都可return rb;
}void WorkQueue::put(Runnable &&rb)
{std::unique_lock<std::mutex> ul(_mtx);while (full()){_space.wait(ul);}_queue.push(std::move(rb));ul.unlock(); // 【code:1】此处也可以不解锁,这样过了ul的作用域之后会自动解锁_data.notify_one(); // 该语句可以在解锁后也可以在解锁前
}bool WorkQueue::empty() const
{return _queue.empty();
}bool WorkQueue::full() const
{return _queue.size() == _size;
}void WorkQueue::wakeupAll()
{_toWakeFlag = true;_data.notify_all();
}
ThreadPool
- ThreadPool.h
#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__#include <vector>
#include <thread>
#include "WorkQueue.h"/*** 基于C++11线程库实现的线程池*/
class ThreadPool
{
public:/*** @brief Construct a new Thread Pool object** @param corePoolSize 核心线程数* @param queueSize 任务队列容量*/explicit ThreadPool(size_t corePoolSize, size_t queueSize);~ThreadPool();// 线程池的启动void start();// 关闭线程池(等待任务执行完毕后)void shutdown();// 外部调用传参交给线程池一个任务void excute(Runnable &&rb);private:// 线程执行任务void doing();private:size_t _corePoolSize; // 线程数std::vector<std::thread> _threads;WorkQueue _workQueue; // 任务队列bool _isRunning; // 标记:线程池是否正在运行
};#endif
- ThreadPool.cc
#include "ThreadPool.h"ThreadPool::ThreadPool(size_t corePoolSize, size_t queueSize): _corePoolSize(corePoolSize), _workQueue(queueSize), _isRunning(false)
{
}ThreadPool::~ThreadPool()
{
}void ThreadPool::start()
{_isRunning = true; // 设置标志位,线程池正在执行// 线程启动后就马上创建一批线程for (size_t i = 0; i < _corePoolSize; i++){_threads.push_back(std::thread(&ThreadPool::doing, this));}
}void ThreadPool::shutdown()
{// 【注意1】需要保证任务全部执行完毕,线程池才能退出,否则应该卡在此处while (!_workQueue.empty()) // 【code:2】{// 防止CPU空转,判断完任务队列还有任务后自己休眠1秒钟,让出CPU使用权std::this_thread::sleep_for(std::chrono::seconds(1));}// FIXME:即使任务队列为空,并不意味着任务全部执行,因为有的任务执行时间可能比较久// 任务全部执行完才将标志设置为false, 然后等待线程_isRunning = false;_workQueue.wakeupAll(); // join等待子线程前先唤醒所有意外等待在条件变量的子线程,详见【code:3】for (size_t i = 0; i < _corePoolSize; i++){_threads[i].join();}
}void ThreadPool::excute(Runnable &&rb)
{if (rb != nullptr && _isRunning) // cb不为空且线程池正在运行才能添加任务到任务队列中_workQueue.put(std::move(rb));
}void ThreadPool::doing()
{// 只要线程池在运行,就不断地从中取任务执行任务while (_isRunning) // 【code:3】{// 【注意2】线程池退出时,等待所有任务执行完毕后,while退出循环条件---任务队列为空满足(见【code:2】),// 将退出while循环,并将_isRunning设置为false// 不过!!!可能线程池主线程来不及将_isRunning设置为false, 子线程就已经进入本循环(见【code:3】),// 这样子进程将被阻塞在下面的_workQueue.get(),导致主线程不能join等待线程成功,使得线程池不能退出// 【结论】线程池调用shutdown退出时,在设置了标志位_isRunning为false,等待子线程之前,应该先唤醒所有正在等待子线程Runnable rb = _workQueue.get(); // 从任务队列中取出一个任务if (rb)rb(); // 如果任务不为空,就执行该任务}
}
4) 核心模块
*解决一个问题:线程间通信
系统调用eventfd用于进程或者线程间通信(如通知/等待机制的实现)。
#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
参数:
- initval: 初始化计数器值,该值保存在内核。
- flags: 如果是2.6.26或之前版本的内核,flags 必须设置为0。
- flags支持以下标志位:
- EFD_NONBLOCK 类似于使用O_NONBLOCK标志设置文件描述符。
- EFD_CLOEXEC 类似open以O_CLOEXEC标志打开, O_CLOEXEC 应该表示执行exec()时,之前通过open()打开的文件描述符会自动关闭。
**返回值:**函数返回一个文件描述符,与打开的其他文件一样,可以进行读写操作。
使用:
eventfd系统调用返回的是文件描述符,该文件描述符与其他文件描述符一样,可以读、写、监听。
- 调用read函数:如果计数器A的值不为0时,读取成功,获得到该值;如果A的值为0,非阻塞模式时,会直接返回失败,并把error置为EINVAL;如果为阻塞模式,一直会阻塞到A为非0为止。
- 调用write函数:将缓冲区写入的8字节整形值加到内核计数器上,即会增加8字节的整数在计数器A上,如果其值达到
0xfffffffffffffffe
时,就会阻塞(在阻塞模式下),直到A的值被read。 - 调用select/poll/epoll:支持被IO多路复用监听。当内核计数器的值发生变化时,就会触发事件。通过对eventfd函数返回的文件描述符进行通信。一个进程或者线程A执行read操作,如果内核计数器的值为0,并且是阻塞模式,那么A就会阻塞;另外一个进程或者线程B执行write操作,就会向内核计数器写,那么阻塞的A发现内核计数器的值不为0,就会被触发,那么两个进程或者线程A与B就达到通信的目的了。
Accetor
- Acceptor.h
#ifndef __ACCEPTOR_H__
#define __ACCEPTOR_H__#include <string>
#include "Socket.h"
#include "InetAddress.h"/*** @brief 接收器,创建监听套接字,监听新的Tcp连接***/
class Acceptor
{static const int gbacklog = 16; // 半连接队列和全连接队列的长度之和-1public:Acceptor(uint16_t port, const std::string &ip);~Acceptor();// 初始化监听套接字, 包括sock, bind, listen系统调用void ready();// 接收新连接int accept();// 获取监听文件描述符int fd() const;private:// 绑定端口和ipvoid bind();// 设置为监听状态void listen();// ip地址复用void setIpReuse();// 端口复用void setPortReuse();private:Socket _sock;InetAddress _inetaddr;
};#endif
- Acceptor.cc
#include "Acceptor.h"
#include "Log.hpp"Acceptor::Acceptor(uint16_t port, const std::string &ip) : _sock(), _inetaddr(ip, port)
{
}Acceptor::~Acceptor()
{
}void Acceptor::ready()
{setIpReuse();setPortReuse();bind();listen();
}int Acceptor::accept()
{logMessage(DEBUG, "in accept");struct sockaddr_in peer;socklen_t len;int connfd = ::accept(_sock.fd(), reinterpret_cast<sockaddr *>(&peer), &len);logMessage(DEBUG, "ing accept, connfd = %d", connfd);if (-1 == connfd){// ET模式下返回-1可能是正常情况,具体要看errno的结果,交给调用accept的EventLoop::handlerConnction判断// perror("accept failed");return -1;}return connfd;
}int Acceptor::fd() const
{return _sock.fd();
}void Acceptor::bind()
{int ret = ::bind(_sock.fd(), reinterpret_cast<const struct sockaddr *>(_inetaddr.getInetAddrPtr()), sizeof(struct sockaddr_in));if (-1 == ret){perror("bind failed");return;}
}void Acceptor::listen()
{int ret = ::listen(_sock.fd(), gbacklog);if (-1 == ret){perror("listen failed");return;}
}void Acceptor::setIpReuse()
{int opt = 1; // 1表示希望下面setsockopt有效,如果是0则是无效int ret = setsockopt(_sock.fd(), SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));if (-1 == ret){perror("setsockopt failed");return;}
}void Acceptor::setPortReuse()
{int opt = 1;int ret = setsockopt(_sock.fd(), SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));if (-1 == ret){perror("setsockopt failed");return;}
}
TcpConnection
- TcpConnection.h
#ifndef __TCPCONNECTION_H__
#define __TCPCONNECTION_H__
#include <memory>
#include "InetAddress.h"
#include "Socket.h"
#include "SocketIO.h"
#include "Utils.hpp"class EventLoop; // 前向声明// TcpConnection类继承自std::enable_shared_from_this<TcpConnection>模板类
// 【作用】让TcpConnection类的对象能够在自身成员函数中,安全地获取指向自己的std::shared_ptr智能指针,而不会导致智能指针引用计数错误或内存管理问题。
// 具体来说,当一个类继承自std::enable_shared_from_this<自身类型>后,该类的对象在被std::shared_ptr管理时,
// 就可以通过成员函数shared_from_this()来获取一个指向自身的std::shared_ptr实例。
class TcpConnection : public std::enable_shared_from_this<TcpConnection>
{using TcpConnectionPtr = std::shared_ptr<TcpConnection>; // std::shared_ptr指针管理Tcp连接指针对象using TcpConnectionCallback = std::function<void(const TcpConnectionPtr &)>; // 三个事件回调函数using Functor = std::function<void(void)>; // 执行的任务public:/*** @brief Construct a new Tcp Connection object** @param fd 对应Tcp连接的文件描述符* @param loop Tcp连接所属EventLoop对象指针*/explicit TcpConnection(int fd, EventLoop *loop);~TcpConnection();// 将Tcp连接对象TcpConnection的发送缓冲区_outBuffer数据发送给客户端void send();// 接收客户端发送的数据到Tcp连接对象TcpConnection的接收缓冲区_inBuffervoid receive();// 用于判断是否断开bool isClosed() const;// 获取本Tcp连接的服务器ip地址和端口号InetAddress getLocalAddr();// 获取本Tcp连接的客户端ip地址和端口号InetAddress getPeerAddr();// 自己实现的Tcp连接转为可视化字符串的方法std::string toString();// 将需要执行的任务发送给Tcp连接所属EventLoop, 借助EventLoop发送任务给线程池执行void sendInput(Functor &&fc);// 将任务执行结果发送给Tcp连接所属EventLoop, 借助EventLoop发送任务结果给客户端void sendOutput(const std::string &msg);// 由EventLoop给每个Tcp连接注册三个回调函数, TcpConnection对象本身是临时对象,断开连接后将被销毁,// 因而不能跟EventLoop一样使用移动语义抢夺回调函数(std::funcion对象)资源,因此此处使用const TcpConnectionCallback&void setOnConnection(const TcpConnectionCallback &cb);void setOnMessage(const TcpConnectionCallback &cb);void setOnClose(const TcpConnectionCallback &cb);// 三个回调函数执行void handleOnConnecionCallback();void handleOnMessageCallback();void handleOnCloseCallback();public:// Tcp连接的用户一级缓冲区,存放直接从内核缓冲区拷贝到用户缓冲区的数据// 使用我们自己构建的Buffer类, Buffer类底层用的是char类型,目前网络发送的数据都是字符串, 没问题,// 如果是图片音频可能要将Buffer底层改用unsigned charBuffer _inBuffer; // 读取缓冲区(接收客户端发送的数据)Buffer _outBuffer; // 写出缓冲区(发送数据给客户端)private:Socket _sock; // 该Tcp连接对应的文件描述符(服务器端), 用于isClosed函数检测连接是否已断开SocketIO _socketIO;// 类初始化顺序与初始化列表顺序无关,而是与类中声明顺序有关,// 由于_localaddr与_peeraddr初始化需要_sock.fd(),因而必须在其后声明InetAddress _localaddr;InetAddress _peeraddr;TcpConnectionCallback _onConnection; // 连接建立TcpConnectionCallback _onMessage; // 消息到来TcpConnectionCallback _onClose; // 连接断开EventLoop *_loop; // 每个Tcp连接都要将收到的信息发送给eventLoop处理,因而需要与EventLoop关联(使用引用指针均可)
};#endif
- TcpConnection.cc
#include "TcpConnection.h"
#include <sstream>
#include <cstring>
#include "EventLoop.h"
#include "Log.hpp"TcpConnection::TcpConnection(int fd, EventLoop *loop): _socketIO(fd),_sock(fd),_localaddr(getLocalAddr()),_peeraddr(getPeerAddr()),_loop(loop)
{
}TcpConnection::~TcpConnection()
{
}void TcpConnection::send()
{logMessage(DEBUG, "in TcpConnection::send, connfd = %d", _sock.fd());if (-1 == _socketIO.writeToKernel(_outBuffer)){logMessage(ERROR, "TcpConnecion::send failed, connfd = %d", _sock.fd());return; // 无论只是写入出错还是对端关闭,通通取消写入数据}if (_outBuffer.length() > 0) // 前面循环拷贝数据到内核后发现缓冲区还有数据没发送完{// 将其添加到epoll监听写事件_loop->pushWriteEvent(_sock.fd());}else if (_outBuffer.length() == 0){// 取消对该文件描述符写事件的关心_loop->popWriteEvent(_sock.fd());}logMessage(DEBUG, "out TcpConnection::send");
}void TcpConnection::receive()
{logMessage(DEBUG, "in TcpConnection::receive, connfd = %d", _sock.fd());if (-1 == _socketIO.readFromKernel(_inBuffer))logMessage(ERROR, "TcpConnection::receive failed, connfd = %d", _sock.fd());logMessage(DEBUG, "out TcpConnection::receive");
}bool TcpConnection::isClosed() const
{// 如果resv返回值为0,说明已经断开char buffer[4] = {0};return (0 == ::recv(_sock.fd(), buffer, sizeof(buffer), MSG_PEEK)); // 设置为MSG_PEEK,只是尝试能不能读到数据,不需要清除内核缓冲区数据
}std::string TcpConnection::toString()
{std::ostringstream oss;oss << "[TcpConnection]"<< _localaddr.ip() << ":" << _localaddr.port()<< "--->" << _peeraddr.ip() << ":" << _peeraddr.port();return oss.str();
}void TcpConnection::sendInput(Functor &&fc)
{logMessage(DEBUG, "in sendInput, adding task to thread pool");if (_loop)_loop->runInPool(std::move(fc)); // 发给线程池执行任务
}void TcpConnection::sendOutput(const std::string &msg)
{logMessage(DEBUG, "in sendOutput, msg: [%s]", msg.c_str());if (!_loop)return;if (_outBuffer.remain() >= msg.size()) // 先判断输出缓冲区是否足够大_outBuffer.enlarge(); // 不够大先扩容// 把string类型的msg拷贝到输出缓冲区中msg.copy(_outBuffer.space(), msg.size());_outBuffer.next(msg.size()); // buffer可用区域起始位置后移_loop->runInLoop(std::bind(&TcpConnection::send, shared_from_this())); // 绑定将Tcp连接缓冲区发送任务给EventLoop处理logMessage(DEBUG, "out sendOutput, task added to event loop");
}InetAddress TcpConnection::getLocalAddr()
{struct sockaddr_in local;socklen_t len = sizeof(struct sockaddr_in);memset(&local, 0, len);int ret = getsockname(_sock.fd(), reinterpret_cast<sockaddr *>(&local), &len); // 获取本端地址端口号等信息if (ret == -1){perror("getsockname failed");}return InetAddress(local);
}
InetAddress TcpConnection::getPeerAddr()
{struct sockaddr_in peer;socklen_t len = sizeof(struct sockaddr_in);memset(&peer, 0, len);int ret = getpeername(_sock.fd(), reinterpret_cast<sockaddr *>(&peer), &len); // 获取对端地址端口号等信息if (ret == -1){perror("getpeername failed");}return InetAddress(peer);
}void TcpConnection::setOnConnection(const TcpConnectionCallback &cb)
{_onConnection = cb;
}
void TcpConnection::setOnMessage(const TcpConnectionCallback &cb)
{_onMessage = cb;
}
void TcpConnection::setOnClose(const TcpConnectionCallback &cb)
{_onClose = cb;
}void TcpConnection::handleOnConnecionCallback()
{if (_onConnection)_onConnection(shared_from_this());
}
void TcpConnection::handleOnMessageCallback()
{if (_onMessage)_onMessage(shared_from_this());
}
void TcpConnection::handleOnCloseCallback()
{if (_onClose)_onClose(shared_from_this());
}
EventLoop
- EventLoop.h
#ifndef __EVENTLOOP_H__
#define __EVENTLOOP_H__#include <memory>
#include <functional>
#include <vector>
#include <map>
#include <mutex>// 前向声明
class ThreadPool;
class Acceptor;
class TcpConnection;/*** @brief 封装使用ET模式的epoll系统调用**/
class EventLoop
{static const int timeout = 3000; // epoll超时时间参数static const int eventNum = 10; // epoll监听事件数最大值using TcpConnectionPtr = std::shared_ptr<TcpConnection>; // std::shared_ptr指针管理Tcp连接指针对象using TcpConnectionCallback = std::function<void(const TcpConnectionPtr &)>; // 三个事件回调函数using Functor = std::function<void(void)>; // 任务, 包括交给线程池执行计算的任务以及TcpConnection交给EventLoop发送结果给客户端的任务public:/*** @brief Construct a new Event Loop object** @param acceptor 接收器* @param pool 线程池*/explicit EventLoop(Acceptor &acceptor, ThreadPool &pool);~EventLoop();// 开启事件监听循环void loop();// 结束循环void unloop();// 注册三个回调函数,用于传给每一个TcpConnection连接// 使用移动语义,EventLoop抢夺回调函数资源// 因为TcpConnectionCallback本质是std::function(可调用对象的容器),本身可能占用内存,所以资源可以被移动,而裸指针本身则不占用动态空间void setOnConnection(TcpConnectionCallback &&cb);void setOnMessage(TcpConnectionCallback &&cb);void setOnClose(TcpConnectionCallback &&cb);// 把业务逻辑任务交给线程池执行void runInPool(Functor &&fc);// 把线程池处理的结果, 需要发送给客户端的任务交由EventLoop执行void runInLoop(Functor &&fc);// 对文件描述符追加写事件监听,此调用会添加(如果本身没有关心读事件)epoll对读事件的监听void pushWriteEvent(int fd);// 对文件描述符取消写事件关心,此调用会添加(如果本身没有关心读事件)epoll对读事件的监听void popWriteEvent(int fd);private:// 把文件描述符设置为非阻塞状态bool setNonBlock(int fd);// 创建epoll文件描述符int createEpfd();// 把文件描述符fd读事件添加到epoll红黑树中,开始监听读事件void addEpReadfd(int fd);// 把文件描述符fd所有事件从epoll红黑树移除,取消监听void delEpEventfd(int fd);// 等待事件就绪void waitEpfd();// 处理收到客户端发来的数据void handlerMessage(int fd);// 处理接收新连接管理void handlerConnection();// 处理剩余未发送的任务的发送void handlerRemainSend(int fd);// 创建eventfdint createEvfd();// 封装eventFd文件描述符的读事件和写事件// EventLoop的主线程执行_pendings集合里的所有任务时void readEvfd();// 线程添加结果任务到_pendings集合时void writeEvfd();// 执行_pendings的任务void doPendings();private:int _epfd; // epoll fd 用于监听事件,实现IO多路复用bool _isLooping;ThreadPool &_pool;Acceptor &_acceptor;std::vector<struct epoll_event> _events; // 就绪事件数组std::map<int, TcpConnectionPtr> _connections; // Tcp连接池TcpConnectionCallback _onConnection; // 连接建立TcpConnectionCallback _onMessage; // 消息到来TcpConnectionCallback _onClose; // 连接断开int _evfd; // 特殊的文件描述符,用于线程间通信std::vector<Functor> _pendings; // 存放EventLoop需要执行的发送结果给客户端的任务std::mutex _mutex; // 互斥锁,用于_pendings的互斥访问
};#endif
- EventLoop.cc
#include "EventLoop.h"
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <fcntl.h>
#include "Acceptor.h"
#include "ThreadPool.h"
#include "TcpConnection.h"
#include "Log.hpp"EventLoop::EventLoop(Acceptor &acceptor, ThreadPool &pool): _epfd(createEpfd()), _events(eventNum), _isLooping(true), _acceptor(acceptor), _pool(pool), _evfd(createEvfd())
{// ET模式必须先把文件描述符设置为非阻塞模式setNonBlock(_acceptor.fd());setNonBlock(_evfd);addEpReadfd(_acceptor.fd());addEpReadfd(_evfd); // 用于线程通信的文件描述符也加入epoll红黑树
}
EventLoop::~EventLoop()
{close(_epfd);close(_evfd);
}bool EventLoop::setNonBlock(int fd)
{int fl = fcntl(fd, F_GETFL);if (fl < 0)return false;fcntl(fd, F_SETFL, fl | O_NONBLOCK);return true;
}int EventLoop::createEpfd()
{_epfd = ::epoll_create(1);if (-1 == _epfd){perror("epoll_create failed");return -1;}return _epfd;
}void EventLoop::addEpReadfd(int fd)
{logMessage(DEBUG, "in addEPReadfd, fd = %d", fd);struct epoll_event ev;ev.data.fd = fd;ev.events = EPOLLIN | EPOLLET;// 将文件描述符fd加入epoll内核红黑树数据结构监听int ret = ::epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);if (-1 == ret){perror("epoll_ctl failded(EPOLL_CTL_ADD:EPOLLIN)");return;}logMessage(DEBUG, "out addEPReadfd, fd = %d", fd);
}void EventLoop::pushWriteEvent(int fd)
{logMessage(DEBUG, "in EventLoop::pushWriteEvent, fd = %d", fd);struct epoll_event ev;ev.data.fd = fd;ev.events = EPOLLIN | EPOLLOUT | EPOLLET; // 关心写事件一般还是会关心读事件if (-1 == ::epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &ev)){perror("epoll_ctl failed(EPOLL_CTL_MOD:EPOLLOUT)");return;}logMessage(DEBUG, "out EventLoop::pushWriteEvent, fd = %d", fd);
}void EventLoop::popWriteEvent(int fd)
{logMessage(DEBUG, "in EventLoop::popWriteEvent, fd = %d", fd);struct epoll_event ev;ev.data.fd = fd;ev.events = EPOLLIN | EPOLLET; // 取消写事件关心if (-1 == ::epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &ev)){perror("epoll_ctl failed(EPOLL_CTL_MOD:EPOLLOUT)");return;}logMessage(DEBUG, "out EventLoop::popWriteEvent, fd = %d", fd);
}void EventLoop::delEpEventfd(int fd)
{// 将文件描述符fd从epoll内核红黑树中移除,取消监听int ret = ::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr); // Linux 2.6.8后,删除时可以传入nullptrif (-1 == ret){perror("epoll_ctl failed(EPOLLOUT)");return;}
}void EventLoop::waitEpfd()
{int nready = 0; // 就绪事件个数do{nready = ::epoll_wait(_epfd, &_events[0], _events.size(), timeout);} while (-1 == nready && errno == EINTR);if (-1 == nready){perror("epoll_wait failed");return;}else if (0 == nready){// 返回0是正常行为,在timeout时间内没有事件就绪logMessage(DEBUG, "epoll_wait timeout...");}else{if (nready == _events.size()){_events.resize(2 * nready); // 给_event扩容,允许容纳更多就绪事件}for (int i = 0; i < nready; i++){logMessage(DEBUG, "wait one success, fd = %d", _events[i].data.fd);if (_events[i].data.fd == _acceptor.fd()){handlerConnection(); // 如果是监听文件描述符就绪,意味着需要处理新连接}else if (_events[i].data.fd == _evfd) // 说明是用于通知的文件描述符就绪,也就是可以EventLoop可以执行发送结果给客户端的任务{readEvfd();doPendings();}else if (_events[i].events & EPOLLIN){handlerMessage(_events[i].data.fd); // 其他文件描述符就绪,意味着需要处理接收的信息}else if (_events[i].events & EPOLLOUT){handlerRemainSend(_events[i].data.fd);}else{// TODO:还有更多事件的处理...目前只关心读写事件}}}
}void EventLoop::handlerConnection()
{logMessage(DEBUG, "in handlerConnecion");// 对于连接任务也是,epfd必须循环获取全部新连接while (true){logMessage(DEBUG, "ing EventLoop::handlerConnecion/ while ");int connfd = _acceptor.accept();if (-1 == connfd){if (errno == EINTR){logMessage(DEBUG, "EventLoop::handerConnection /errno == ETINR");continue;}else if (errno == EAGAIN || errno == EWOULDBLOCK){logMessage(DEBUG, "EventLoop::handerConnection /errno == EAGAIN, EWOULDBLOCK");break;}else{logMessage(WARNING, "EventLoop::handerConnection /accept a new link failed");return;}}else if (connfd > 0){if (!setNonBlock(connfd)){logMessage(ERROR, "EventLoop::setNonBlock failed, connfd = %d", connfd);close(connfd);return;}// 建立连接后,创建TcpConnecion对象管理TcpConnectionPtr tp(new TcpConnection(connfd, this));// 给TcpConnecion对象注册三个事件tp->setOnConnection(_onConnection);tp->setOnMessage(_onMessage);tp->setOnClose(_onClose);_connections.insert(std::make_pair(connfd, tp)); // 加入Tcp连接池中logMessage(NORMAL, "add TcpConnecion[fd = %d] in _connections", connfd);addEpReadfd(connfd); // 加入epoll监听tp->handleOnConnecionCallback(); // 连接就可以执行连接建立事件}else{break;}}logMessage(DEBUG, "out handlerConnecion");
}void EventLoop::handlerMessage(int fd)
{logMessage(DEBUG, "in handlerMessage, fd = %d", fd);auto it = _connections.find(fd);if (it != _connections.end()) // 没有到末尾就说明找到了{// 通过文件描述符取到TcpConnection连接对象TcpConnectionPtr tp = _connections[fd];if (tp->isClosed()){// 连接已经断开了tp->handleOnCloseCallback(); // 调用断开事件delEpEventfd(fd); // 从epoll中取消监听_connections.erase(fd); // 从连接池中清除logMessage(NORMAL, "erase TcpConnecion fd = %d from _connections", fd);return;}else{// 没有断开意味着文件描述符可读tp->handleOnMessageCallback();}}else{// 在连接池中找不到logMessage(WARNING, "fd = %d not in _connections", fd);}logMessage(DEBUG, "out handlerMessage, fd = %d", fd);
}void EventLoop::handlerRemainSend(int fd)
{// 线程池执行完任务后,会把发送结果给客户端的任务交给服务器执行(放置到EventLoop::_pendings),// 并利用eventfd发送"信号"给EventLoop(线程间通信), 如此EventLoop会等待eventfd读事件就绪,// 会立马执行所有_pendings的所有任务(TcpConnecion写出缓冲区的数据全部发送出去),// 但是可能执行doPendings()后还有部分任务因意外没执行完, 则TcpConnection会调用EventLoop的pushWriteEvent函数,添加对写事件关心// 当epoll发现该TcpConnection对应文件描述符的写事件就绪时,就会调用本函数(EventLoop::handlerRemainSend)logMessage(DEBUG, "in handlerRemainSend, fd = %d", fd);auto it = _connections.find(fd);if (it != _connections.end()){TcpConnectionPtr tp = _connections[fd];tp->send(); // 调用该Tcp连接的send任务,继续将缓冲区数据发送给客户端}logMessage(DEBUG, "out handlerRemainSend, fd = %d", fd);
}void EventLoop::loop()
{_isLooping = true;while (_isLooping){waitEpfd();}
}void EventLoop::unloop()
{_isLooping = false;
}void EventLoop::setOnConnection(TcpConnectionCallback &&cb)
{_onConnection = std::move(cb);
}
void EventLoop::setOnMessage(TcpConnectionCallback &&cb)
{_onMessage = std::move(cb);
}
void EventLoop::setOnClose(TcpConnectionCallback &&cb)
{_onClose = std::move(cb);
}void EventLoop::runInPool(Functor &&fc)
{_pool.excute(std::move(fc)); // 调用线程池的excute接口执行任务fc
}void EventLoop::runInLoop(Functor &&fc) // 将执行发送结果给客户端的任务添加到_pendings中,给EventLoop执行
{{std::lock_guard<std::mutex> lg(_mutex);_pendings.push_back(std::move(fc));}writeEvfd();
}int EventLoop::createEvfd()
{int fd = ::eventfd(10, 0);if (-1 == fd){perror("eventfd failed");}return fd;
}void EventLoop::readEvfd()
{uint64_t one = 1;int ret = ::read(_evfd, &one, sizeof(one));if (-1 == ret){perror("read evfd failed");}
}void EventLoop::writeEvfd()
{uint64_t one = 1;int ret = ::write(_evfd, &one, sizeof(one));if (-1 == ret){perror("write evfd failed");}
}void EventLoop::doPendings()
{logMessage(DEBUG, "in doPendings");std::vector<Functor> tmp;{std::lock_guard<std::mutex> lg(_mutex);std::swap(tmp, _pendings); // 先把_pendings里的任务拿出来,然后直接释放锁}logMessage(DEBUG, "executing %d pending tasks", tmp.size());// 到这个地方再慢慢去执行任务,以免执行任务过程中还长期持有锁。使锁的粒度更低for (auto &e : tmp){e(); // 执行任务}
}
5) Tcp服务器封装与运行
TcpServer
- TcpServer.h
#ifndef __TCPSERVER_H__
#define __TCPSERVER_H__
#include <functional>
#include "ThreadPool.h"
#include "Acceptor.h"
#include "EventLoop.h"
#include "TcpConnection.h"class TcpServer
{using TcpConnectionPtr = std::shared_ptr<TcpConnection>; // std::shared_ptr指针管理Tcp连接指针对象using TcpConnectionCallback = std::function<void(const TcpConnectionPtr &)>; // 三个事件回调函数static const std::string defaultIp; // Tcp服务器默认Ip地址,类内声明类外定义static const size_t defaultCorePoolSize = 4; // 服务器线程池默认核心线程数static const size_t defaultWorkQueueSize = 10; // 服务器线程池任务队列最大容量public:explicit TcpServer(uint16_t port, const std::string &ip = defaultIp, size_t corePoolSize = defaultCorePoolSize, size_t workqueueSize = defaultWorkQueueSize);~TcpServer();void start();void stop();void setAllCallBacks(TcpConnectionCallback &&onConnectionCb, TcpConnectionCallback &&onMessageCb, TcpConnectionCallback &&onCloseCb);private:TcpServer &operator=(const TcpServer &) = delete;TcpServer(const TcpServer &) = delete;private:ThreadPool _pool; // 服务器线程池Acceptor _acceptor; // 服务器接收连接接收器EventLoop _eventLoop; // 服务器主循环逻辑
};#endif
- TcpServer.cc
#include "TcpServer.h"// [1]defaultIp是静态成员变量,在编译时就要确定,但是编译阶段并没有给defaultIp分配内存空间,因此不能赋值
// 到链接阶段时,由链接器分配常量区内存(静态变量、全局变量)给defaultIp,此时才可以定义数值
// [2]为什么是静态成员变量:因为构造函数在程序运行时要求使用defaultIp,如果是普通成员变量,则属于类对象,
// 对象未创建出来,当然不能使用对象成员变量
const std::string TcpServer::defaultIp = "0.0.0.0"; // 类内声明,类外定义。【详见TcpServer.h】TcpServer::TcpServer(uint16_t port, const std::string &ip, size_t corePoolSize, size_t workQueueSize): _acceptor(port, ip),_pool(corePoolSize, workQueueSize),_eventLoop(_acceptor, _pool)
{
}TcpServer::~TcpServer()
{
}void TcpServer::start()
{_pool.start();_acceptor.ready();_eventLoop.loop();
}void TcpServer::stop()
{_eventLoop.unloop();
}void TcpServer::setAllCallBacks(TcpConnectionCallback &&onConnectionCb,TcpConnectionCallback &&onMessageCb,TcpConnectionCallback &&onCloseCb)
{_eventLoop.setOnConnection(std::move(onConnectionCb));_eventLoop.setOnMessage(std::move(onMessageCb));_eventLoop.setOnClose(std::move(onCloseCb));
}
server.cc
#include <string>
#include <iostream>
#include "TcpServer.h"
#include "TcpConnection.h"
#include "Log.hpp"
#include "Utils.hpp"using namespace std;
using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
class MyTask
{
public:MyTask(const string &msg, TcpConnectionPtr tp) : _msg(msg), _tp(tp){}void operator()(){service();end();}void service(){// 业务逻辑logMessage(DEBUG, "Mytask::service::_msg = [%s]", _msg.c_str());// 1.先去掉请求报头并反序列化为Request对象string tmp;Protocol::deLength(_msg, &tmp);Request rq;Serializable::deserialize(&rq, tmp);// 2.构建Response响应报文Response rp;calculate(rq, &rp);Serializable::serialize(rp, &tmp);Protocol::enLength(tmp, &tmp);_msg = "";_msg = tmp; // 结果保存_msg中,待EventLoop发送给客户端logMessage(DEBUG, "MyTask::service tmp=[%s]", tmp.c_str());}bool calculate(const Request &req, Response *rep){rep->_exitcode = OK;switch (req._op){case '+':rep->_result = req._x + req._y;break;case '-':rep->_result = req._x - req._y;break;case '*':rep->_result = req._x * req._y;break;case '/':if (req._y != 0)rep->_result = req._x / req._y;elserep->_exitcode = DIV_ZERO;break;case '%':if (req._y != 0)rep->_result = req._x % req._y;elserep->_exitcode = MOD_ZERO;break;}}private:void end(){// 前面业务逻辑处理完后,需要将任务执行的结果经TcpConnection类发送给客户端_tp->sendOutput(_msg);}private:string _msg;TcpConnectionPtr _tp;
};void OnConnecion(const TcpConnectionPtr &tp)
{logMessage(NORMAL, "%s has connected", tp->toString().c_str());
}
void OnMessage(const TcpConnectionPtr &tp)
{logMessage(DEBUG, "in OnMessage");tp->receive();std::string package;while (Protocol::ParseOnePackage(tp->_inBuffer, &package)){MyTask task(package, tp); // 收到的信息构建任务tp->sendInput(std::move(task)); // 将任务交给线程池处理};
}void OnClose(const TcpConnectionPtr &tp)
{logMessage(NORMAL, "%s has closed", tp->toString().c_str());
}void test()
{std::unique_ptr<TcpServer> tpsv(new TcpServer(8080));tpsv->setAllCallBacks(std::move(OnConnecion), std::move(OnMessage), std::move(OnClose));tpsv->start();
}int main()
{test();return 0;
}
makfile
- 线程的使用依赖pthread库,json序列化方法依赖jsoncpp库
server:server.ccg++ -o $@ *.cc -std=c++11 -lpthread -ljsoncpp #-D__DEBUG__
.PHONY:clean
clean:rm -f server
.PHONY:reset
reset:rm -f server log.txt log.error
6) 客户端代码
[shenalex@VM-8-16-centos Client]$ tree .
.
|-- client.cc
|-- Connector.cc
|-- Connector.h
|-- InetAddress.cc
|-- InetAddress.h
|-- makefile
|-- Protocol.hpp
|-- Service.cc
|-- Service.h
|-- Socket.cc
|-- Socket.h
|-- SocketIO.cc
|-- SocketIO.h
|-- TcpClient.cc
`-- TcpClient.h0 directories, 15 files
- Connector
#ifndef __CONNECTOR_H__
#define __CONNECTOR_H__#include "Socket.h"
#include "InetAddress.h"class Connector
{
public:explicit Connector(std::string ip, uint16_t port);~Connector();bool connect();int fd() const;private:Socket _sock;InetAddress _serverAddr;
};#endif
#include "Connector.h"
Connector::Connector(std::string ip, uint16_t port) : _serverAddr(ip, port)
{
}Connector::~Connector()
{
}bool Connector::connect()
{if (-1 == ::connect(fd(), reinterpret_cast<const sockaddr*>(_serverAddr.getInetAddrPtr()), sizeof(struct sockaddr_in))){perror("connect failed");return false;}return true;
}int Connector::fd() const
{return _sock.fd();
}
- Service
#ifndef __SERVICE_H__
#define __SERVICE_H__#include "SocketIO.h"
#include "Protocol.hpp"
class Connector;class Service
{static const int defaultSize = 1024;public:explicit Service(Connector &connector);~Service();void run();private:bool buildRequest(Request *request, const std::string msg);bool isOperator(char c);int readPackage(std::string *buffer); // 调用SocketIO, 基于自定义协议读取一段完整的报文private:Connector &_connector;SocketIO _sockIO;
};#endif
#include "Service.h"
#include <iostream>
#include "Connector.h"
#include <string>Service::Service(Connector &connector) : _connector(connector), _sockIO(_connector.fd())
{
}
Service::~Service()
{
}
void Service::run()
{std::cout << "+-------------------------------------------------------------------+" << std::endl;std::cout << "| WELCOME TO USE INTERNET CACULATOR |" << std::endl;std::cout << "| NOTES: You can only type in positive number and only one operator |" << std::endl;std::cout << "+---------+---------------------------------------------------------+" << std::endl;std::cout << "| Example | |" << std::endl;std::cout << "+---------+---------------------------------------------------------+" << std::endl;std::cout << "| Enter(q for quie)# 20-4 |" << std::endl; std::cout << "| [server echo]Calculate Success! |" << std::endl;std::cout << "| result = 16 |" << std::endl;std::cout << "+-------------------------------------------------------------------+" << std::endl;while (true){// 1.输入linestd::cout << "Enter(q for quit)# ";std::string line;std::getline(std::cin, line);if (line.size() == 1 && line[0] == 'q'){std::cout << "quit client!" << std::endl;return;}// 2. 构建报文Request rq;if (!buildRequest(&rq, line))continue;std::string tmp;Serializable::serialize(rq, &tmp); // 请求结构序列化Protocol::enLength(tmp, &tmp); // 构建完整的请求报文// 3.发送完整请求报文给服务器_sockIO.writen(tmp.c_str(), tmp.size());// 4.接收服务器响应报文readPackage(&tmp); // 重复使用tmp, 前面构建的完整报文保存在原来tmp已经发送出去了Protocol::deLength(tmp, &tmp);Response rp;Serializable::deserialize(&rp, tmp);if (rp._exitcode == OK)tmp = "Calculate Success!";else if (rp._exitcode == DIV_ZERO || rp._exitcode == MOD_ZERO)tmp = "Error: Divisor cannot be 0!";std::cout << "[server echo]" << tmp << "\n\tresult = " << rp._result << std::endl;std::cout << "-------------------------------" << std::endl;}
}bool Service::isOperator(char c)
{if (c == '+' || c == '-' || c == '*' || c == '/' || c == '%')return true;elsereturn false;
}// 构建请求Request对象
bool Service::buildRequest(Request *request, const std::string msg)
{std::string first;char op;std::string second;int step = 0; // 标记当前是哪个数字,是first还是secondint flag = false; // 标记运算符是否已经确定(修复用户输入多个运算符程序崩溃的情况,比如"78+-3")for (int i = 0; i < msg.size(); i++){char c = msg[i];if (isblank(c)){continue;}else if (isdigit(c)){if (step == 0)first += c;else if (step == 1)second += c;}else if (isOperator(c)){if (flag){std::cerr << "invalid input" << std::endl;return false;}op = c;step++; // 遇到了操作符就意味着接下来是第二个数字flag = true; // 运算符已经确定了}else // 意外输入其它字符返回false{std::cerr << "invalid input" << std::endl;return false;}}*request = Request(std::stoi(first), std::stoi(second), op);return true;
}int Service::readPackage(std::string *out)
{// FIXME:其实设置一个用户缓冲区,将内核缓冲区数据全部读取到用户缓冲区中,再进行分析会更好,现在使用MSG_PEEK要多次进行系统调用开销较大char buffer[defaultSize] = {0};int rest = sizeof(buffer);char *pstr = buffer;while (rest > 0){ssize_t s = recv(_connector.fd(), pstr, rest, MSG_PEEK); // 先复制内核缓冲区中的数据到space(不擦除)if (-1 == s){if (errno == EINTR)continue;perror("receive failed(recv error)");return -1;}else if (0 == s){break;}else{std::string tmp(pstr, s);auto it = tmp.find(LINE_SEP); // 借助string的find函数寻找行分界符号if (it == std::string::npos){pstr += s;rest -= s;continue;}else{// 找到了LINE_SEP就认为至少有一个报文// 先把pstr置为LINE_SEP所在位置pstr += it;int count_len = pstr - buffer;int text_len = std::stoi(std::string(buffer, count_len)); // 直接转为数字int total_len = count_len + LINE_SEP_LEN + text_len + LINE_SEP_LEN; // 计算好完整报文大小// 调用_sockIO的readn方法从内核缓冲区中读取固定字节数据_sockIO.readn(buffer, total_len);buffer[total_len] = 0; // buffer的有效数据是[0, total_len), 后面可能有前面尝试读取时留下的不需要的数据break;}}}*out = std::string(buffer);return out->size();
}
- TcpClient
#ifndef __TCPCLIENT_H__
#define __TCPCLIENT_H__
#include "Connector.h"
#include "Service.h"
class TcpClient
{
public:explicit TcpClient(std::string ip, uint16_t port);~TcpClient();void start();private:Connector _connector;Service _service;
};#endif
#include "TcpClient.h"
#include <iostream>
TcpClient::TcpClient(std::string ip, uint16_t port) :_connector(ip, port), _service(_connector)
{
}
TcpClient::~TcpClient()
{
}void TcpClient::start()
{if(!_connector.connect()){std::cerr << "connect to server failed, please try again" << std::endl;return;}_service.run();
}
- client.cc
#include <string>
#include <iostream>
#include <memory>
#include "TcpClient.h"using namespace std;
int main()
{{unique_ptr<TcpClient> pclient(new TcpClient("127.0.0.1", 8080));pclient->start();}return 0;
}
- Socket: 略
- SocketIO: 略
- Protocol: 略
- InetAddress: 略
- makefile: 略
参考资料
- 游双. (2013). Linux高性能服务器编程. 北京: 机械工业出版社.