IO多路复用——Linux应用(8)
一、简介
一个服务端要想与多个客户端进行会话,可通过多线程服务端程序或者多进程,每个线程或进程需要对应一个客户端,线/进程的增加会增大CPU的开销。
IO多路复用,多路指多个连接,即与多个客户端进行会话,复用指仅通过一个线程完成。所以IO多路复用是指在一个线程中实现与多个客户端连接、会话。通常需要调用特定函数,让系统内核监视指定的文件描述符(包括网络通信中套接字),当文件描述符发生状态变化时通知线程对目标文件进行操作。
要想实现IO多路复用,通常使用的API有select()/poll()/epoll()
IO多路复用与多线程服务器的区别
多线程服务器:通过每连接一个客户端,就创建一个新的线程专用于处理这个客户端的会话(客户端越多、线程越多);
IO多路复用服务器(IO多路转接服务器):通过让系统内核监视指定的文件描述符集,当文件发生状态变化时,及时将变化的文件描述符反馈给服务器,然后服务器与对应的客户端进行会话(单个线程就可以实现)
二、select 函数
2.1 函数简介
头文件:#include <sys/select.h> #include <sys/time.h>
#include <sys/types.h>#include<unistd.h>
函数原型:int select(int nfds,fd_set *readfds, fd_set *writefds,fd_set *exceptfds,struct timeval *timeout);
函数功能:在规定超时时间内,对指定的文件描述符集进行监视,如果发生状态变化则函数立刻返回发生状态变化的文件描述符的个数;若达到了超时时间,则函数立刻返回
函数参数:
@param1:nfds 表示监视的文件个数,通常填待监视文件的最大描述符值+1,因为标准输入从0 开始 ,标准输出 为1 ,标准错误为 2 ,如果我们在打开一个fd=3,所以总共监视4个文件
@param2:readfds 指向一个fd_set结构,用于设置需要监视哪些文件描述符进入到可以读取状态;若此项无需监视,则填NULL
@param3:writefds 指向一个fd_set结构,用于设置需要监视哪些文件描述符进入到可以写入状态;若此项无需监视,则填NULL
@param4:exceptfds 指向一个fd_set结构,用于设置需要监视哪些文件描述符进入到可以异常状态;若此项无需监视,则填NULL
@param5:timeout 指向一个struct timeval结构,用于设置本次监视的超时时间,如果填NULL 则一直等待
返回值:若成功,则返回发生状态变化的文件描述符个数;若失败,返回-1
备注:select()函数执行时,将根据readfds、writefds、exceptfds指向的3个fd_set结构中哪些位被置位,来让系统内核监视相应的文件是否进入到对应的状态。之后再自动将readfds、writefds、exceptfds指向的3个fd_set结构先清零,若对应的文件在监视时间内发生了指定的状态变化,则自动将对应fd_set相应位置位typedef __kernel_fd_set fd_set;
#define __FD_SETSIZE 1024
typedef struct {
unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} __kernel_fd_set;
struct timeval {
long tv_sec; /* 秒 */
long tv_usec; /* 微秒 */};
fd_set 是一个结构体,该结构体包含一个数组,数组一共有1024位,对应进程中允许打开的最多1024个文件,这种实现方式称为bitmap,每个位对应一个文件。即第0位对应fd=0文件的状态
select相关函数
void FD_CLR(int fd, fd_set *set); 将一个fd_set结构中第fd位清零
int FD_ISSET(int fd, fd_set *set); 返回一个fd_set结构中第fd位的值
void FD_SET(int fd, fd_set *set); 将一个fd_set结构中第fd位置位
void FD_ZERO(fd_set *set); 将一个fd_set结构中所有位清零
2.1 操作流程
1.定义最多3个fd_set结构,分别用于监视文件是否进入到可以读取/可以写入/异常状态
fd_set readfds, wrtiefds, exceptfds;(若不需要监视文件进入到某种状态,则可以相应地省去定义操作) 比如:若只需要监视文件进入到可读状态。 fd_set readfds;
2.将fd_set结构清零,根据想要监视的文件和状态,将对应位置位
比如:若想要监视文件描述符值为3、4、5、6的文件是否进入到可以读取状态
FD_ZERO(&readfds);
FD_SET(3, &readfds);
FD_SET(4, &readfds);
FD_SET(5, &readfds);
FD_SET(6, &readfds);3.调用select(),将fd_set结构和超时时间传入,比如:监视上方指定的文件,设置超时时间2.5s。
int ret;
struct timeval tm = {
.tv_sec = 2,
.tv_usec = 500* 1000,
};
ret = select(7, &readfds, NULL, NULL, & tm);//7是因为最大文件描述符值是6,对于fd_set结构最多访问最低7位4.根据select()返回值,决定是否需要对相应文件操作
if(ret > 0)
{
if(FD_ISSET(3, &readfds))
{
//表示文件描述符值为3的文件进入到可以读取状态
}
if(FD_ISSET(4, &readfds))
{
//表示文件描述符值为4的文件进入到可以读取状态
}
if(FD_ISSET(5, &readfds))
{
//表示文件描述符值为5的文件进入到可以读取状态
}
if(FD_ISSET(6, &readfds))
{
//表示文件描述符值为6的文件进入到可以读取状态
}
}
2.3 基于select实现IO多路复用服务器
设计思路:
1.创建一个监视列表,初始只监视 监听套接字(用于等待客户端连线)每当一个客户端成功连线后,就往监视列表添加与该客户端通信的套接字
2.若监视期间,存在客户端发送数据,则可以通过select()检测到,然后开展与它的会话。
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/msg.h>
#include <sys/sem.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <dirent.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include <signal.h>
#include <semaphore.h>
#include <sqlite3.h>#define PORTNUM 12345
#define IPADDR "192.168.1.102"typedef struct sockaddr SA ;
typedef struct sockaddr_in SIN ;int fd_max;
//Server
//允许运行: ./main IP portnum 或 ./main IP
//比如: ./main 192.168.0.10 3333 或 ./main 192.168.0.10
int main(int argc, char *argv[])
{int listenSock ;//创建的套接字IDint clientSock ;int ret ;SIN seraddr ;//存储服务器方本地信息SIN cliaddr ;int addrlen ;char tbuf[1024] ;char rbuf[1024] ;/*1. 创建一个基于IPv4 + TCP协议的套接字*/listenSock = socket(AF_INET, SOCK_STREAM, 0);if(listenSock == -1){perror("socket calls error!\n");return -1;}/*2. 绑定本地信息和套接字*/bzero(&seraddr, sizeof(seraddr)) ;//清空变量等价memset(&seraddr, 0, sizeof(seraddr));seraddr.sin_family = AF_INET ;//IPv4协议seraddr.sin_port = htons(PORTNUM) ;//端口号seraddr.sin_addr.s_addr = inet_addr(IPADDR);//IPv4地址对应的32bit整数值if(argc >= 2){//取argv[1]指向的字符串作为IP地址seraddr.sin_addr.s_addr = inet_addr(argv[1]) ;}if(argc >= 3){//取argv[2]指向的字符串作为端口号seraddr.sin_port = htons( atoi(argv[2]) ) ;} ret = bind(listenSock, (SA*)&seraddr, sizeof(SA));if(ret == -1){perror("bind calls error!\n");return -2;}/*3. 创建监听队列*/ret = listen(listenSock, 5);if(ret == -1){perror("listn calls error!\n");return -3;}/* 为select()做准备 */fd_set readfds ;//作为select()参数fd_set readfds_save ;//记录需要监视哪些文件,不直接作为select()参数int nfds ;//用于记录select函数返回值struct timeval tm = {//超时时间2s.tv_sec = 2,.tv_usec = 0,};FD_ZERO(&readfds_save) ;//将结构体readfds_save位所有位清零FD_SET(listenSock, &readfds_save);//初始只需要监视listenSock监听套接字(连接客户端)fd_max = listenSock ;//当前最大套接字就是监听套接字while(1){readfds = readfds_save ;//需要读的套接字ID,也可以memcpy()nfds = select(fd_max + 1, &readfds, NULL, NULL, &tm) ;//返回值表示有多少个文件发生了状态变化 //循环检测readfds各个位,确认哪个位被置位--》确定可以读哪个文件//这个循环可以提前结束,前提处理完nfds个文件for(int fd = 3; (fd <= fd_max) && (nfds > 0) ; fd++){if(FD_ISSET(fd, &readfds)) //若文件描述符为fd的文件当前可以被读取{if(fd == listenSock){ //有客户端连线/*处理客户端连线*//*4. 等待客户端连接*/clientSock = accept(listenSock, (SA*)&cliaddr, &addrlen);nfds-- ;//处理了一个文件,for()循环可以提前结束//对cliaddr结构体,可以得到客户端的IP地址和端口号if(clientSock == -1){perror("accept calls error!\n");}else{ //成功连接一个客户端printf("one client has connected!\n");FD_SET(clientSock, &readfds_save) ;//需要将与该客户端的通信套接字添加到监视列表if(clientSock > fd_max){fd_max = clientSock ;//因为成功连接新的客户端,更新最大文件描述符值}}}else{//收到来自通信套接字为fd的客户端发送的数据/*5. 处理客户端发送的数据*/int r_len = recv(fd, rbuf, sizeof(rbuf), 0);nfds-- ;//处理了一个文件,for()循环可以提前结束if(r_len == 0){//客户端离线printf("one client is offline!\n") ;FD_CLR(fd, &readfds_save) ;//将与该客户端的通信套接字从监视列表中移除/*6. 关闭与客户端的会话*/ close(fd);continue;}send(fd, rbuf, r_len, 0);}} }}return 0;
}
三、poll函数
3.1 函数简介
头文件:#include <poll.h>
函数原型:int poll(struct pollfd *fds, nfds_t nfds, int timeout);
函数功能:监视数组fds的前nfds的元素(每个元素会关联一个文件、以及需要监视的事件)即监视若干个文件
函数参数:
@param1:fds 指向一个数组,这个数组用于存储需要监视哪些文件发生状态变化,同时若被监视的文件在监视期间若发生了状态变化,也可以通过该数组查询到。该数组每个元素指定一个文件描述符、一个待监视的状态和一个用于返回的状态。
@param2:nfds 通常填第一个参数fds指向的数组的元素个数
@param3:timeout 超时时间,单位是ms
返回值:若成功,返回规定时间内,发生指定事件的文件个数;若失败,返回-1
struct pollfd
{
int fd; /* 文件描述符 */
short events; /* 待监视的事件 */
short revents; /* 用于返回事件(在poll()返回后读取这个成员,确认文件有无发生事件) */
};
成员events通常填以下数值:
POLLIN 或 POLLRDNORM 已准备好读取数据
POLLOUT或 POLLWRNORM 已准备号写入数据
3.2 操作流程
示例:监视文件描述符值为3和4的文件是否可以读取数据。
1.根据待监视的文件个数,定义struct pollfd结构体数组。
struct pollfd sz[2];//想监视2个文件 sz 数组至少2个元素
2.对数组元素赋值(文件描述符、待监视的事件)
bzero(sz, sizeof(sz)); //先清零
//监视文件描述符值为3的文件,是否产生可以读取的事件
sz[0].fd = 3;
sz[0].events = POLLRDNORM;
//监视文件描述符值为4的文件,是否产生可以读取的事件
sz[1].fd = 4;
sz[1].events = POLLRDNORM;
3.调用poll进行监视
int nfds;
nfds = poll(sz, sizeof(sz)/sizeof(struct pollfd), 2000);
4.基于poll的返回值,读取数组,检测哪些文件发生了事件
for(int i = 0; (i < sizeof(sz)/sizeof(struct pollfd)) && (nfds > 0); i++ )
{
if(sz[i].revents & POLLRDNORM) //检测数组中第i个元素对应的文件是否进入到可读取状态
{
//读文件 read(……)
nfds--;
}
}
5.通常编程时,会将步骤2—4整合,放到while(1)结构中,以达到循环监视文件的效果
3.3 与select 区别
四、epoll函数
4.1 简介
在系统中通过使用红黑树结构实现监视、查找文件。因为红黑树具有较高的查找效率。
红黑树是一种自平衡的二叉查找树,是一种高效的查找树。它是由 Rudolf Bayer 于1978年发明,在当时被称为平衡二叉 B 树(symmetric binary B-trees)。后来,在1978年被 Leo J. Guibas 和 Robert Sedgewick 修改为如今的红黑树。红黑树具有良好的效率,它可在 O(logN) 时间内完成查找、增加、删除等操作。
4.2 相关API
头文件:#include <sys/epoll.h>
函数原型:int epoll_create(int size);
函数功能:创建一个epoll文件描述符(句柄)
函数参数:size 待监视的文件个数,但是从Linux2.6.8版本后,该参数只要大于0即可
返回值:若成功,返回一个epoll句柄(关联一个红黑树),若失败,返回-1
头文件:#include <sys/epoll.h>
函数原型:int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数功能:对epoll的控制操作(添加/移除 监视的文件等 à 对红黑树增/删节点)
函数参数:
@param1:epfd epoll句柄,通常填epoll_create()返回值
@param2:op 操作,三选一
EPOLL_CTL_ADD(添加监视的事件)
EPOLL_CTL_DEL(移除监视的事件)
EPOLL_CTL_MOD(修改监视的事件)
@param3:fd 填待操作(待监视/取消监视)文件的描述符
@param4:event 指向指向struct epoll_event
的指针,告诉内核监听哪些事件以及附带一个 64 bit 用户数据
返回值:0 成功;-1 失败,errno 指示原因typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;struct epoll_event
{
uint32_t events; /* 监视事件 通常填EPOLLIN (可以读取) 或 EPOLLOUT(可以写入)*/
epoll_data_t data; /* User data variable */
};
头文件:#include <sys/epoll.h>
函数原型:int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
函数功能:监视epoll红黑树上的文件,将发生变化的文件通过events指向数组返回
函数参数:
@param1:epfd epoll句柄,填epoll_create()返回值
@param2:events 指向一个数组,该数组用于记录哪些文件发生状态变化。数组元素个数应该不小于监视的文件数。监视期间发生变化的文件信息会存到这个数组中,存储规则从数组0号元素开始存储。
@param3:maxevents 通常填第二个参数events指向的数组的元素个数
@param4:timeout 超时时间,单位ms
返回值:若成功,返回发生监视事件的文件个数;若失败,返回-1
4.3 操作流程
示例:监视文件描述符值为3和4文件是否进入到可以读取的状态
1.根据预期监视文件数,创建epoll结构(红黑树),获取返回值
int epfd = epoll_create(2); //实际大于0即可,参数任意
2.将待监视的文件信息添加到红黑树中(无需重复执行)
struct epoll_event ev;
ev. events = EPOLLIN;
ev.data.fd = 3;
epoll_ctl(epfd, EPOLL_CTL_ADD, 3, & ev);
ev. events = EPOLLIN;
ev.data.fd = 4;
epoll_ctl(epfd, EPOLL_CTL_ADD, 4, & ev);
3.在开始监视文件前,准备一个用于返回监视结果的数组
struct epoll_event epoll_sz[2]; //元素个数通常不小于监视的文件总数
4.监视文件
int nfds;
nfds = epoll_wait(epfd, epoll_sz, sizeof(epoll_sz) / sizeof(struct epoll_event), 2000);
//nfds表示本次监视时有多少个文件发生了状态变化,且nfds个数据将写入到数组epoll_sz中
5.基于epoll_wait()返回值,处理发生变化的文件
for(int i = 0; i < nfds; i++)
{
if(epoll_sz[i].data.fd == 3)
{
//文件描述符值为3的文件发生了变
}
if(epoll_sz[i].data.fd == 4)
{
//文件描述符值为4的文件发生了变化
}
}
6.通常将步骤4、5放到while(1)。