多路转接 select
I/O 多路转接之 select
select可以一次性等待多个fd,一旦有其中任意一个就绪就会通知上层,告诉调用方,哪些fd已经可以IO了,select通过等待多个fd的一种就绪事件通知机制!
初识 select
系统提供 select 函数来实现多路复用输入/输出模型.select 系统调用是用来让我们的程序监视多个文件描述符的状态变化的,程序会停在 select 这里等待,直到被监视的文件描述符有一个或多个发生了状态改变。
select 函数原型
select 的函数原型如下:
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
参数解释:
- 参数 nfds 是需要监视的最大的文件描述符值+1;
- rdset,wrset,exset 分别对应于需要检测的可读文件描述符的集合,可写文件描述符的集 合及异常文件描述符的集合用位图表示,第几个比特位就表示第几文件描述符,比特位的内容01表示是否关心;
- 参数 timeout 为结构 timeval,用来设置 select()的等待时间,有三种方式可以设置
参数 timeout 取值:
- 阻塞,NULL:则表示 select()没有 timeout,select 将一直被阻塞,直到某个文件描述符上发生了事件;
- 非阻塞,0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生。
- 特定的时间值:如果在指定的时间段里没有事件发生,select 将超时返回。
timeout 输入时表示timeout 事件,返回时timeout 代表剩余时间,是一个输入输出型参数,select返回值表示有几个文件描述符就绪,0超时,小于零报错
关于timeval
struct timeval {
time_t tv_sec; // 秒数(seconds)
suseconds_t tv_usec; // 微秒数(microseconds,1秒 = 1,000,000微秒)
};
tv_sec
:类型为time_t
(通常是 32 位或 64 位整数),表示从 Unix 纪元时间(1970 年 1 月 1 日 00:00:00 UTC)到当前时间的总秒数。tv_usec
:类型为suseconds_t
(通常是 32 位有符号整数),表示秒数的小数部分,范围为 0 到 999,999(即不足 1 秒的微秒数)。
关于 fd_set 结构
是一个内核提供给用户的数据结构,一次性可以向fd_set里面添加多个fd,结构体里面套的数组
函数返回值:
- 执行成功则返回文件描述词状态已改变的个数
- 如果返回 0 代表在描述词状态改变前已超过 timeout 时间,没有返回
- 当有错误发生时则返回-1,错误原因存于 errno,此时参数 readfds,writefds,exceptfds 和timeout 的值变成不可预测。
错误值可能为:
- EBADF 文件描述词为无效的或该文件已关闭
- EINTR 此调用被信号所中断
- EINVAL 参数 n 为负值。
- ENOMEM 核心内存不足
fs_set readset;
FD_SET(fd,&readset);
select(fd+1,&readset,NULL,NULL,NULL);
if(FD_ISSET(fd,readset)){……}
其他位操作接口,向文件操作符集添加指定文件描述符,避免自己手动添加位图
echo select server
Common.hpp
#pragma once#include <iostream>
#include<functional>
#include <string>
#include <cstring>
#include <memory>
#include<unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
enum ExitCode
{OK = 0,USAGE_ERR,SOCKET_ERR,BIND_ERR,LISTEN_ERR,CONNECT_ERR,FORK_ERR
};class NoCopy
{public:
NoCopy(){}
~NoCopy(){}
NoCopy(const NoCopy&)=delete;
const NoCopy& operator= (const NoCopy &)=delete;
};#define CONV(addr) ((struct sockaddr *)&addr)
InetAddr.hpp
#pragma once
#include "Common.hpp"class InetAddr
{public:InetAddr(){}InetAddr(struct sockaddr_in &addr){ SetAddr(addr);}InetAddr(const std::string &ip,uint16_t port):_ip(ip),_port(port){//主机转网络memset(&_addr,0,sizeof(_addr));_addr.sin_family=AF_INET;inet_pton(AF_INET,_ip.c_str(),&_addr.sin_addr);_addr.sin_port=htons(_port);}InetAddr(uint16_t port):_port(port),_ip("0"){ //端口转memset(&_addr,0,sizeof(_addr));_addr.sin_family=AF_INET;_addr.sin_addr.s_addr=INADDR_ANY;_addr.sin_port = htons(_port);}uint16_t Port() { return _port; }std::string Ip() { return _ip; }void SetAddr(struct sockaddr_in &addr) { //网络转主机_addr=addr;_port = ntohs(addr.sin_port);//_ip = inet_ntoa(addr.sin_addr);char ipbuffer[64];inet_ntop(AF_INET,&_addr.sin_addr,ipbuffer,sizeof(_addr));_ip=ipbuffer;}const struct sockaddr_in &NetAddr() { return _addr; }const struct sockaddr *NetAddrPtr() { return CONV(_addr); }socklen_t NetAddrLen() { return sizeof(_addr); }bool operator==(const InetAddr &addr){return addr._ip == _ip && addr._port == _port;}std::string StringAddr(){return _ip + ":" + std::to_string(_port);}~InetAddr() {}
private:struct sockaddr_in _addr;std::string _ip;uint16_t _port;
};
Log.hpp
#ifndef __LOG_HPP__
#define __LOG_HPP__#include <iostream>
#include <string>
#include "Mutex.hpp"
#include <filesystem>
#include <fstream>
#include <memory>
#include <unistd.h>
#include <sstream>
#include<ctime>namespace LogModule
{const std::string sep = "\r\n";using namespace MutexModule ;// 2.刷新策略class LogStrategy{public:~LogStrategy() = default;virtual void SyncLog(const std::string &message) = 0;};// 显示器刷新日志的策略class ConsoleLogStrategy : public LogStrategy{public:ConsoleLogStrategy() {}~ConsoleLogStrategy() {}void SyncLog(const std::string &message) override{LockGuard lockguard(_mutex);std::cout << message << sep;}private:Mutex _mutex;};// 缺省文件路径以及文件本身const std::string defaultpath = "./log";const std::string defaultfile = "my.log";// 文件刷新日志的策略class FileLogStrategy : public LogStrategy{public:FileLogStrategy(const std::string &path = defaultpath, const std::string &file = defaultfile): _path(path), _file(file){LockGuard lockguard(_mutex);if (std::filesystem::exists(_path)) // 判断路径是否存在{return;}try{std::filesystem::create_directories(_path);}catch (const std::filesystem::filesystem_error &e){std::cerr << e.what() << '\n';}}void SyncLog(const std::string &message) override{LockGuard lockguard(_mutex);std::string filename = _path + (_path.back() == '/' ? "" : "/") + _file;std::ofstream out(filename, std::ios::app); // 追加写入if (!out.is_open()){return;}out << message << sep;out.close();}~FileLogStrategy() {}private:Mutex _mutex;std::string _path; // 日志文件的路径std::string _file; // 要打印的日志文件};// 形成日志等级enum class Loglevel{DEBUG,INIF,WARNING,ERROR,FATAL};std::string Level2Str(Loglevel level){switch (level){case Loglevel::DEBUG:return "DEBUG";case Loglevel::INIF:return "INIF";case Loglevel::WARNING:return "WARNING";case Loglevel::ERROR:return "ERROR";case Loglevel::FATAL:return "FATAL";default:return "UNKNOWN";}}std::string GetTimeStamp(){time_t cuur =time(nullptr);struct tm curr_tm;localtime_r(&cuur,&curr_tm);char buffer[128];snprintf(buffer,sizeof(buffer),"%4d-%02d-%02d %02d:%02d:%02d",curr_tm.tm_year+1900,curr_tm.tm_mon+1,curr_tm.tm_mday,curr_tm.tm_hour,curr_tm.tm_min,curr_tm.tm_sec);return buffer;}class Logger{public:Logger(){EnableConsoleLogStrategy();}// 选择某种策略// 1.文件void EnableFileLogStrategy(){_ffush_strategy = std::make_unique<FileLogStrategy>();}// 显示器void EnableConsoleLogStrategy(){_ffush_strategy = std::make_unique<ConsoleLogStrategy>();}// 表示的是未来的一条日志class LogMessage{public:LogMessage(Loglevel &level, std::string &src_name, int line_number, Logger &logger): _curr_time(GetTimeStamp()), _level(level), _pid(getpid()), _src_name(src_name), _line_number(line_number), _logger(logger){// 合并左半部分std::stringstream ss;ss << "[" << _curr_time << "] "<< "[" << Level2Str(_level) << "] "<< "[" << _pid << "] "<< "[" << _src_name << "] "<< "[" << _line_number << "] "<< "- ";_loginfo = ss.str();}template <typename T>LogMessage &operator<<(const T &info){// 右半部分,可变std::stringstream ss;ss << info;_loginfo += ss.str();return *this;}~LogMessage(){if (_logger._ffush_strategy){_logger._ffush_strategy->SyncLog(_loginfo);}}private:std::string _curr_time; // 日志时间Loglevel _level; // 日志状态pid_t _pid; // 进程pidstd::string _src_name; // 文件名称int _line_number; // 对应的行号std::string _loginfo; // 合并之后的一条完整信息Logger &_logger;};LogMessage operator()(Loglevel level, std::string src_name, int line_number){return LogMessage(level, src_name, line_number, *this);}~Logger() {}private:std::unique_ptr<LogStrategy> _ffush_strategy;};//全局日志对象Logger logger;//使用宏,简化用户操作,获取文件名和行号// __FILE__ 一个宏,替换完成后目标文件的文件名// __LINE__ 一个宏,替换完成后目标文件对应的行号#define LOG(level) logger(level,__FILE__,__LINE__) #define Enable_Console_Log_Strategy() logger.EnableConsoleLogStrategy()#define Enable_File_Log_Strategy() logger.EnableFileLogStrategy()}#endif
Main.cc
#include"SelectServer.hpp"int main(int argc,char *argv[])
{if(argc != 2){std::cout<<"Usage: "<<argv[0]<<" prot"<<std::endl;exit(USAGE_ERR);}Enable_Console_Log_Strategy();uint16_t port = std::stoi(argv[1]);std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(port);svr->Start();return 0;
}
Makefile
selectserver:Main.ccg++ -o $@ $^ -std=c++17
.PHONY:clean
clean:rm -f selectserver
Mutex.hpp
#pragma once
#include <pthread.h>
#include <iostream>
namespace MutexModule
{ class Mutex{public:Mutex(){pthread_mutex_init(&_mutex, nullptr);}void Lock(){int n = pthread_mutex_lock(&_mutex);(void)n;}void Unlock(){int n = pthread_mutex_unlock(&_mutex);(void)n;}~Mutex(){pthread_mutex_destroy(&_mutex);}pthread_mutex_t *get(){return &_mutex;}private:pthread_mutex_t _mutex;};class LockGuard{public:LockGuard(Mutex &mutex):_mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;};
}
SelectServer.hpp
#pragma once#include <iostream>
#include<unistd.h>
#include"Socket.hpp"
#include"Log.hpp"
#include<memory>using namespace SocketModule;
using namespace LogModule;class SelectServer
{const static int size = sizeof(fd_set)*8;const static int defaultfd = -1;
public:
SelectServer(int port)
:_listensock(std::make_unique<TcpSocket>())
{_listensock-> BUildTcpLIstenSocketMethod(port);_isrunning = false;for(int i=0;i<size;i++){_fd_arry[i]=defaultfd;}_fd_arry[0] = _listensock->Fd();
}void Start()
{ _isrunning = true;while(true){//accept是阻塞调用,将_listensock放入select中等待fd_set rfds;FD_ZERO(&rfds);//清空文件描述符集int maxfd = defaultfd;for(int i=0;i<size;i++){if(_fd_arry[i] ==defaultfd){continue;}FD_SET(_fd_arry[i],&rfds);if(maxfd <_fd_arry[i]){maxfd =_fd_arry[i];}}//struct timeval timeout = {2,0};Printfd();//最大fd是动态变化的,每次select之前都会对rfds重置int n = select(maxfd+1,&rfds,nullptr,nullptr,nullptr);switch(n){case -1:LOG(Loglevel::ERROR)<<"select error";break;case 0:LOG(Loglevel::INIF)<<" time out ...";break; default://有事件就绪LOG(Loglevel::DEBUG)<<"有事件就绪!n: "<<n;Dispatcher(rfds);break;}}_isrunning = false;
}
//IO处理器
void Recver(int fd,int pos)
{char buffer[1024];ssize_t n =recv(fd,buffer,sizeof(buffer)-1,0);if(n > 0 ){std::cout<<"client say@ "<<buffer<<std::endl;}else if(n == 0){LOG(Loglevel::INIF) << "client quit!";_fd_arry[pos] = defaultfd;close(fd);}else{LOG(Loglevel::ERROR)<<"recv error";close(fd);}
}
//链接管理器
void Accepter()
{InetAddr client;int sockfd = _listensock->Accept(&client);if(sockfd >= 0){LOG(Loglevel::INIF)<<"获取新连接成功,sockfd: "<<sockfd;}//每次select都清空了,引入一个数组来保存历史打开还未被关闭的文件描述符LOG(Loglevel::INIF)<<"获取新连接成功,sockfd:"<<sockfd<<",client is : "<<client.StringAddr();int pos = 0;for(;pos<size;pos++){if(_fd_arry[pos]==defaultfd)break;}if(pos == size){LOG(Loglevel::WARNING)<<"server 的select满了";close(sockfd);//满了,超过了elect的容量,无法转接,直接释放}else{//有合法的fd_fd_arry[pos]=sockfd;}
}
//事件派发器
void Dispatcher(fd_set &rfds)
{//在rfds里面的就是该fd就绪for(int i =0;i<size;i++){if(_fd_arry[i]==defaultfd){continue;}//合法但不一定就绪if(FD_ISSET(_fd_arry[i],&rfds)){//读就绪if(_fd_arry[i] == _listensock->Fd()){Accepter();}else{//普通读事件Recver(_fd_arry[i],i);}}}}void Printfd()
{std::cout<<"fd_array[]:";for(int i=0;i<size;i++){if(_fd_arry[i]==defaultfd) continue;std::cout<<_fd_arry[i]<<" ";}std::cout<<"\r\n";
}
void Stop(){_isrunning = false;}
~SelectServer(){}private:std::unique_ptr<Socket>_listensock;bool _isrunning;int _fd_arry[size];
};
Socket.hpp
#pragma once
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstdlib>
#include <unistd.h>
#include "Log.hpp"
#include "Common.hpp"
#include "InetAddr.hpp"namespace SocketModule
{using namespace LogModule;const static int gbacklog =16;const static int defaultfd =-1;// 基类socketclass Socket{public:virtual ~Socket() {}virtual void SocketOrDie() = 0;virtual void BindOrDie(uint16_t port) = 0;virtual void ListenOrDie(int backlog) = 0;virtual int Accept(InetAddr * client)= 0;virtual void Close()=0;virtual int Recv(std::string * out) = 0;virtual int Send(std::string &message) = 0;virtual int Connect(const std::string &server_ip ,uint16_t port) =0;virtual int Fd() = 0;public:void BuildTcpClientSocketMethod(){SocketOrDie();}void BUildTcpLIstenSocketMethod(uint16_t port,int backlog = gbacklog){SocketOrDie();BindOrDie(port);ListenOrDie(backlog);}// void BUildUdpSocketMethod()// {// SocketOrDie();// BindOrDie();// }};class TcpSocket : public Socket{public:TcpSocket():_sockfd(defaultfd){}TcpSocket(int fd): _sockfd(fd) {}~TcpSocket() {}void SocketOrDie() override{_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){LOG(Loglevel::FATAL) << "创建套接字失败!";exit(SOCKET_ERR);}LOG(Loglevel::INIF) << "创建套接字成功!";}void BindOrDie(uint16_t port) override{InetAddr localaddr(port);int n = ::bind(_sockfd, localaddr.NetAddrPtr(), localaddr.NetAddrLen());if (n < 0){LOG(Loglevel::FATAL) << "绑定失败!";exit(BIND_ERR);}LOG(Loglevel::INIF) << "绑定成功!";}void ListenOrDie(int backlog) override{int n = ::listen(_sockfd, backlog);if (n < 0){LOG(Loglevel::FATAL) << "监听失败!";exit(LISTEN_ERR);}LOG(Loglevel::INIF) << "监听成功!";}int Accept(InetAddr * client) override{struct sockaddr_in peer;socklen_t len = sizeof(peer);int fd =::accept(_sockfd,CONV(peer),&len);if (fd < 0){LOG(Loglevel::WARNING)<<"连接失败!";return -1;}LOG(Loglevel::WARNING)<<"连接成功!";client->SetAddr(peer);return fd;}void Close() override{if(_sockfd >=0){::close(_sockfd);}}int Recv(std::string * out) override{//流式读取,不关心读到的是什么char buffer[4096*2];ssize_t n =::recv(_sockfd,buffer,sizeof(buffer)-1,0);if (n >0){buffer[n]=0;*out+=buffer;return n;}return n;}int Send(std::string &message) override{return send(_sockfd,message.c_str(),message.size(),0);}int Connect(const std::string &server_ip ,uint16_t port) override{InetAddr server(server_ip,port);return ::connect(_sockfd,server.NetAddrPtr(),server.NetAddrLen()) ; }int Fd(){return _sockfd;}private:int _sockfd; //};// class UdpSocket : public Socket// {// };
}
理解 select 执行过程
理解 select 模型的关键在于理解 fd_set,为说明方便,取 fd_set 长度为 1 字节,fd_set中的每一 bit 可以对应一个文件描述符 fd。则 1 字节长的 fd_set 最大可以对应 8 个 fd.
- (1)执行 fd_set set; FD_ZERO(&set);则 set 用位表示是 0000,0000。
- (2)若 fd=5,执行 FD_SET(fd,&set);后 set 变为 0001,0000(第 5 位置为 1)
- (3)若再加入 fd=2,fd=1,则 set 变为 0001,0011
- (4)执行 select(6,&set,0,0,0)阻塞等待
- (5)若 fd=1,fd=2 上都发生可读事件,则 select 返回,此时 set 变为0000,0011。注意:没有事件发生的 fd=5 被清空。
socket 就绪条件
读就绪
- socket 内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于 0;
- socket TCP 通信中, 对端关闭连接, 此时对该 socket 读, 则返回 0;
- 监听的 socket 上有新的连接请求;
- socket 上有未处理的错误;
写就绪
- socket 内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记 SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于 0;
- socket 的写操作被关闭(close 或者 shutdown). 对一个写操作被关闭的 socket进行写操作, 会触发 SIGPIPE 信号;
- socket 使用非阻塞 connect 连接成功或失败之后;
- socket 上有未读取的错误;
异常就绪
- socket 上收到带外数据. 关于带外数据, 和 TCP 紧急模式相关(回忆 TCP 协议头中, 有一个紧急指针的字段), 同学们课后自己收集相关资料.
select 的特点
- 可监控的文件描述符个数取决于 sizeof(fd_set)的值. 我这边服务器上sizeof(fd_set)=512,每 bit 表示一个文件描述符,则我服务器上支持的最大文件描述符是 512*8=4096.
- 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array 保存放到 select监控集中的 fd,一是用于再 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判断。二是 select 返回后会把以前加入的但并无事件发生的 fd 清空,则每次开始select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO 最先),扫描 array 的同时取得 fd 最大值 maxfd,用于 select 的第一个参数。
备注:
fd_set 的大小可以调整,可能涉及到重新编译内核.
select 缺点
- 每次调用 select, 都需要手动设置 fd 集合, 从接口使用角度来说也非常不便.
- 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大
- 同时每次调用 select 都需要在内核遍历传递进来的所有 fd,这个开销在 fd 很多时也很大
- select 支持的文件描述符数量太小.