五种IO模型 阻塞IO 多路转接之select 多路转接之poll
目录
一. 五种IO模型
1.1 阻塞IO
1.2 非阻塞IO
1.3 信号驱动IO
1.4 多路转接IO
1.5 异步IO
二. 高级IO概念
2.1同步通信 vs 异步通信
2.2 阻塞 vs 非阻塞
三. 非阻塞IO
fcntl
四. 初识select
返回值
关于time_val
关于fd_set
五.select 相关代码
编辑
select特点
select缺点
六. poll
poll 的介绍
poll的优点
poll的缺点
七. poll的代码
一. 五种IO模型
1.1 阻塞IO
阻塞 IO: 在内核将数据准备好之前, 系统调用会一直等待. 所有的套接字, 默认 都是阻塞方式.
1.2 非阻塞IO
非阻塞 IO: 如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回 EWOULDBLOCK 错误码.
非阻塞 IO 往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询. 这 对 CPU 来说是较大的浪费, 一般只有特定场景下才使用.
1.3 信号驱动IO
信号驱动 IO: 内核将数据准备好的时候, 使用 SIGIO 信号通知应用程序进行 IO 操作.
1.4 多路转接IO
IO 多路转接: 虽然从流程图上看起来和阻塞 IO 类似. 实际上最核心在于 IO 多 路转接能够同时等待多个文件描述符的就绪状态.
1.5 异步IO
异步 IO: 由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序 何时可以开始拷贝数据).
• 任何 IO 过程中, 都包含两个步骤. 第一是等待, 第二是拷贝. 而且在实际的应用 场景中, 等待消耗的时间往往都远远高于拷贝的时间. 让 IO 更高效, 最核心的办法就 是让等待的时间尽量少.
二. 高级IO概念
2.1同步通信 vs 异步通信
同步和异步关注的是消息通信机制.
• 所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回. 但是一旦调用返回,就得到返回值了; 换句话说,就是由调用者主动等待这个调用 的结果;
• 异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结 果; 换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果; 而是在调用 发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用.
这里的同步跟进程线程中的同步完全不同
• 进程/线程同步也是进程/线程之间直接的制约关系
• 是为完成某种任务而建立的两个或多个线程,这个线程需要在某些位置上协调 他们的工作次序而等待、传递信息所产生的制约关系. 尤其是在访问临界资源的时 候.
参与了就是同步,不参与是异步
2.2 阻塞 vs 非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.
• 阻塞调用是指调用结果返回之前,当前线程会被挂起. 调用线程只有在得到结 果之后才会返回.
• 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程.
三. 非阻塞IO
以前使用的方式是使用系统调用函数中的flags标记位,设置非阻塞的方式。
recv的标记位也是这样:
fcntl
一个文件描述符, 默认都是阻塞 IO.
fcntl 函数有 5 种功能:
• 复制一个现有的描述符(cmd=F_DUPFD).
• 获得/设置文件描述符标记(cmd=F_GETFD 或 F_SETFD).
• 获得/设置文件状态标记(cmd=F_GETFL 或 F_SETFL).
• 获得/设置异步 I/O 所有权(cmd=F_GETOWN 或 F_SETOWN).
• 获得/设置记录锁(cmd=F_GETLK,F_SETLK 或 F_SETLKW).
void SetNoBlock(int fd)
{int fl = fcntl(fd, F_GETFL);if (fl < 0) {perror("fcntl");return;}fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
• 使用 F_GETFL 将当前的文件描述符的属性取出来(这是一个位图).
• 然后再使用 F_SETFL 将文件描述符设置回去. 设置回去的同时, 加上一个 O_NONBLOCK 参数
demo:
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <iostream>
void SetNoBlock(int fd)
{int fl = fcntl(fd, F_GETFL);if (fl < 0){perror("fcntl");return;}fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
int main()
{SetNoBlock(0);while (1){char buf[1024] = {0};ssize_t read_size = read(0, buf, sizeof(buf) - 1);if (read_size < 0){// perror("read");// sleep(1);// 是-1不一定就是出错了,可能是底层数据没准备好if (errno == EAGAIN || errno == EWOULDBLOCK) // 错误码{std::cout << "数据没有准备好" << std::endl;sleep(1);//做其他事情//...}else if(errno == EINTR)//被信号中断了{continue;}else{//这里才是出错了}}else if (read_size == 0){std::cout << "读取完毕,退出" << std::endl;break;}else{std::cout << "input: " << buf << std::endl;}}return 0;
}
输出:
四. 初识select
• select 系统调用是用来让我们的程序监视多个文件描述符的状态变化的;
• 程序会停在 select 这里等待,直到被监视的文件描述符有一个或多个发生了状 态改变;
• 参数 nfds 是需要监视的最大的文件描述符值+1;
• rdset,wrset,exset 分别对应于需要检测的可读文件描述符的集合,可写文件描 述符的集 合及异常文件描述符的集合;
• 参数 timeout 为结构 timeval,用来设置 select()的等待时间
参数 timeout 取值
• NULL:则表示 select()没有 timeout,select 将一直被阻塞,直到某个文件 描述符上发生了事件;
• 0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生。
• 特定的时间值:如果在指定的时间段里没有事件发生,select 将超时返回。假如5s的时间,2s的时候等到了,返回的时候timeout会被返回为3s,它是输入输出型参数
返回值
• 执行成功则返回文件描述词状态已改变的个数
• 如果返回 0 代表在描述词状态改变前已超过 timeout 时间,没有返回
• 当有错误发生时则返回-1,错误原因存于 errno,此时参数 readfds,writefds, exceptfds 和 timeout 的值变成不可预测。
错误值可能为:
• EBADF 文件描述词为无效的或该文件已关闭
• EINTR 此调用被信号所中断
• EINVAL 参数 n 为负值。
• ENOMEM 核心内存不足
关于time_val
timeval 结构用于描述一段时间长度,如果在这个时间内,需要监视的描述符没有事件 发生则函数返回,返回值为 0。
关于fd_set
其实这个结构就是一个整数数组, 更严格的说, 是一个 "位图". 使用位图中对应的位来表 示要监视的文件描述符.
提供了一组操作 fd_set 的接口, 来比较方便的操作位图
void FD_CLR(int fd, fd_set *set); // 用来清除描述词组 set 中相关fd 的位int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组 set 中相关fd 的位是否为真void FD_SET(int fd, fd_set *set); // 用来设置描述词组 set 中相关fd 的位void FD_ZERO(fd_set *set); // 用来清除描述词组 set 的全部位
五.select 相关代码
selectServer.hpp:
#pragma once#include <iostream>
#include <memory>
#include <unistd.h>
#include <cstring>
#include "Socket.hpp"
#include "Common.hpp"using namespace SocketModule;class SelectServer
{const static int size = sizeof(fd_set) * 8;const int defaultfd = -1;public:SelectServer(int port) : _listensock(std::make_unique<TcpSocket>()), _isruning(false){_listensock->BuildTcpSocketMethod(port);for (int i = 0; i < size; i++){_fd_array[i] = defaultfd;}_fd_array[0] = _listensock->Fd(); // 刚开始只有listensocket的fd}void Accepter(){InetAddr client;int sockfd = _listensock->Accept(&client); // 这里一定不会阻塞,等和拷贝分离了if (sockfd >= 0){// 获取新链接成功LOG(LogLevel::INFO) << "get a new link , sockfd: " << sockfd << "client is: " << client.StringAddr();// 就需要把新链接放进辅助数组里int i = 0;for (; i < size; i++){if (_fd_array[i] == defaultfd){break;}}if (i == size){LOG(LogLevel::WARING) << "select server full";close(sockfd);}else{_fd_array[i] = sockfd;}}}void Recver(int fd, int pos){// recv的时候肯定也不会阻塞char buffer[1024];ssize_t n = recv(fd, buffer, sizeof(buffer) - 1, 0);//这样写是有bug的,tcp是面向字节流的if (n > 0){buffer[n] = 0;std::cout << "client say& " << buffer << std::endl;}else if (n == 0){LOG(LogLevel::DEBUG) << "client quit";// 不让select关心这个fd了_fd_array[pos] = defaultfd;close(fd);}else{LOG(LogLevel::ERROR) << "recv error";// 不让select关心这个fd了_fd_array[pos] = defaultfd;close(fd);}}void Dispatcher(fd_set &rfds){// 虽然知道是有数据到来了,但是还要分辨出是新链接到来,还是普通的读事件就绪for (int i = 0; i < size; i++){if (_fd_array[i] == defaultfd)continue;if (FD_ISSET(_fd_array[i], &rfds)){// 等于listensock fd的话就是新链接到来if (_fd_array[i] == _listensock->Fd()){Accepter();}else{Recver(_fd_array[i], i);}}}}void Start(){_isruning = true;while (true){// 注意千万不能直接去accept,accept本身就是阻塞的io,我们写的是select!// accept关注的就是listensocket的读事件,交给select去等待// 我们把新链接到来,也称作是一种读事件就绪fd_set rfds;FD_ZERO(&rfds);int maxfd = -1;// FD_SET(_listensock->Fd(), &rfds);// 1.设置到rfds中,每次的select之前,都要对rfds进行重置for (int i = 0; i < size; i++){if (_fd_array[i] == defaultfd)continue;FD_SET(_fd_array[i], &rfds);// 2.最大的fd,一定是变化的,找到最大的fdif (_fd_array[i] > maxfd)maxfd = _fd_array[i];}int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);switch (n){case -1:LOG(LogLevel::ERROR) << "select error";break;case 0:LOG(LogLevel::INFO) << "time out...";break;default:LOG(LogLevel::DEBUG) << "有事件就绪 n:" << n;Dispatcher(rfds);//事件派发器break;}}}~SelectServer(){}private:std::unique_ptr<Socket> _listensock;bool _isruning;int _fd_array[size]; // 辅助数组
};
Main.cc
#include "SelectServer.hpp"int main(int argc, char* argv[])
{if(argc != 2){std::cout << "usage: " << argv[0] << " port" << std::endl;exit(USAGE_ERR);}Enable_Console_Log_Strtegy();int port = std::stoi(argv[1]);std::unique_ptr<SelectServer> srv = std::make_unique<SelectServer>(port);srv->Start();return 0;
}
select特点
• 可监控的文件描述符个数取决于 sizeof(fd_set)的值. 我这边服务器上 sizeof(fd_set)=1024,每 bit 表示一个文件描述符,则我服务器上支持的最大文件描述 符是 1024*8.
• 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array 保存放到 select 监控集中的 fd,
○ 一是用于再 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判 断。
○ 二是 select 返回后会把以前加入的但并无事件发生的 fd 清空,则每次开始 select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO 最先),扫描 array 的同时 取得 fd 最大值 maxfd,用于 select 的第一个参数。
备注: fd_set 的大小可以调整,可能涉及到重新编译内核.
select缺点
• 每次调用 select, 都需要手动设置 fd 集合, 从接口使用角度来说也非常不便.
• 同时每次调用 select 都需要在内核遍历传递进来的所有 fd,这个开销在 fd 很 多时也很大
• select 支持的文件描述符数量太小.
六. poll
poll 的介绍
poll的结构:
events事件:
其他的返回结果什么的,跟上面的select完全一样。
poll的优点
不同于 select 使用三个位图来表示三个 fdset 的方式,poll 使用一个 pollfd 的指针实现.
• pollfd 结构包含了要监视的 event 和发生的 event,不再使用 select“参数-值”传 递的方式. 接口使用比 select 更方便.
• poll 并没有最大数量限制 (但是数量过大后性能也是会下降).
poll的缺点
poll 中监听的文件描述符数目增多时
• 和 select 函数一样,poll 返回后,需要轮询 pollfd 来获取就绪的描述符.
• 每次调用 poll 都需要把大量的 pollfd 结构从用户态拷贝到内核中.
• 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视 的描述符数量的增长, 其效率也会线性下降.
七. poll的代码
PollServer.hpp
#pragma once#include <iostream>
#include <memory>
#include <unistd.h>
#include <cstring>
#include <sys/poll.h>
#include "Socket.hpp"
#include "Common.hpp"using namespace SocketModule;class PollServer
{const static int size = 4096;const int defaultfd = -1;public:PollServer(int port) : _listensock(std::make_unique<TcpSocket>()), _isruning(false){_listensock->BuildTcpSocketMethod(port);for(int i = 0;i < size;i++){_fds[i].fd = defaultfd;_fds[i].events = 0;_fds[i].revents = 0;}_fds[0].fd = _listensock->Fd();_fds[0].events = POLLIN;}void Accepter(){InetAddr client;int sockfd = _listensock->Accept(&client); // 这里一定不会阻塞,等和拷贝分离了if (sockfd >= 0){// 获取新链接成功LOG(LogLevel::INFO) << "get a new link , sockfd: " << sockfd << "client is: " << client.StringAddr();// 就需要把新链接放进辅助数组里int i = 0;for (; i < size; i++){if (_fds[i].fd == defaultfd){break;}}if (i == size){LOG(LogLevel::WARING) << "poll server full";close(sockfd);}else{_fds[i].fd = sockfd;_fds[i].events |= POLLIN;_fds[i].revents = 0;}}}void Recver(int pos){// recv的时候肯定也不会阻塞char buffer[1024];ssize_t n = recv(_fds[pos].fd, buffer, sizeof(buffer) - 1, 0);//这样写是有bug的,tcp是面向字节流的if (n > 0){buffer[n] = 0;std::cout << "client say& " << buffer << std::endl;}else if (n == 0){LOG(LogLevel::DEBUG) << "client quit";close(_fds[pos].fd);//先关闭,在设置为-1// 不让select关心这个fd了_fds[pos].fd = defaultfd;_fds[pos].events = 0;_fds[pos].revents = 0;}else{LOG(LogLevel::ERROR) << "recv error";close(_fds[pos].fd);// 不让select关心这个fd了_fds[pos].fd = defaultfd;_fds[pos].events = 0;_fds[pos].revents = 0;}}void Dispatcher(){// 虽然知道是有数据到来了,但是还要分辨出是新链接到来,还是普通的读事件就绪for (int i = 0; i < size; i++){if (_fds[i].fd == defaultfd)continue;if (_fds[i].revents & POLLIN){// 等于listensock fd的话就是新链接到来if (_fds[i].fd == _listensock->Fd()){Accepter();}else{Recver(i);}}}}void Start(){int timeout = -1;_isruning = true;while (true){int n = poll(_fds,size,timeout);switch (n){case -1:LOG(LogLevel::ERROR) << "poll error";break;case 0:LOG(LogLevel::INFO) << "time out...";break;default:LOG(LogLevel::DEBUG) << "有事件就绪 n:" << n;Dispatcher();//事件派发器break;}}}~PollServer(){}private:std::unique_ptr<Socket> _listensock;bool _isruning;struct pollfd _fds[size];
};