Linux 编程中的 I/O 复用
Linux 编程中的 I/O 复用
I/O 复用是 Linux 编程中处理多个文件描述符的高效方法,它允许程序同时监视多个文件描述符,当其中任何一个或多个文件描述符就绪(可读、可写或出现异常)时,程序就能得到通知并进行相应的处理。
主要的 I/O 复用机制
1. select
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
// 相关宏操作
FD_ZERO(fd_set *set); // 清空集合
FD_SET(int fd, fd_set *set); // 添加fd到集合
FD_CLR(int fd, fd_set *set); // 从集合移除fd
FD_ISSET(int fd, fd_set *set); // 检查fd是否在集合中
特点:
- 可移植性好,几乎所有平台都支持
- 有文件描述符数量限制(通常1024)
- 每次调用都需要重新设置fd_set
- 线性扫描所有fd,效率随fd数量增加而下降
2. poll
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
int fd; // 文件描述符
short events; // 等待的事件
short revents; // 实际发生的事件
};
特点:
- 没有文件描述符数量限制
- 使用链表存储,不需要每次重新初始化
- 仍然需要遍历所有fd来检查状态
- 比select稍高效
3. epoll (Linux特有)
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
struct epoll_event {
uint32_t events; // Epoll events
epoll_data_t data; // User data variable
};
特点:
- 高性能,特别适合大量连接
- 使用回调机制,不需要遍历所有fd
- 支持边缘触发(ET)和水平触发(LT)模式
- Linux特有,不具备可移植性
比较
特性 | select | poll | epoll |
---|---|---|---|
可移植性 | 好 | 较好 | Linux特有 |
最大连接数 | 有限制(1024) | 无限制 | 无限制 |
效率 | 低 | 中等 | 高 |
触发方式 | 水平触发 | 水平触发 | 支持边缘触发 |
内存拷贝 | 每次调用都拷贝 | 每次调用都拷贝 | 内核和用户空间共享 |
使用场景建议
- select:适合跨平台、连接数少的简单应用
- poll:适合连接数中等、需要更好可移植性的应用
- epoll:适合Linux平台、高并发连接的应用(如Web服务器)
示例代码
epoll 示例
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#define MAX_EVENTS 10
#define PORT 8080
int main() {
int server_fd, epoll_fd;
struct epoll_event ev, events[MAX_EVENTS];
struct sockaddr_in address;
// 创建socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定socket
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听
if (listen(server_fd, 10) < 0) {
perror("listen failed");
exit(EXIT_FAILURE);
}
// 创建epoll实例
epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
ev.events = EPOLLIN;
ev.data.fd = server_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
perror("epoll_ctl: server_fd");
exit(EXIT_FAILURE);
}
while (1) {
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
for (int n = 0; n < nfds; ++n) {
if (events[n].data.fd == server_fd) {
// 新连接
int new_socket = accept(server_fd, NULL, NULL);
if (new_socket == -1) {
perror("accept");
continue;
}
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = new_socket;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &ev) == -1) {
perror("epoll_ctl: new_socket");
close(new_socket);
}
} else {
// 处理客户端数据
char buffer[1024] = {0};
int valread = read(events[n].data.fd, buffer, 1024);
if (valread <= 0) {
// 连接关闭或出错
close(events[n].data.fd);
} else {
// 处理数据
printf("Received: %s\n", buffer);
// 回显
write(events[n].data.fd, buffer, valread);
}
}
}
}
close(server_fd);
return 0;
}
边缘触发(ET) vs 水平触发(LT)
-
水平触发(LT):
- 默认模式
- 只要文件描述符就绪,就会一直通知
- 编程更简单,不容易遗漏事件
-
边缘触发(ET):
- 需要设置EPOLLET标志
- 只在状态变化时通知一次
- 需要一次性处理完所有数据
- 效率更高,但编程更复杂
选择哪种模式取决于具体应用场景和性能需求。
Linux I/O 复用完整示例(select/poll/epoll)
下面是 select、poll 和 epoll 三种 I/O 复用机制的完整示例代码,均实现了一个简单的回显服务器功能。
1. select 示例
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#define MAX_CLIENTS 10
#define BUFFER_SIZE 1024
#define PORT 8080
int main() {
int server_fd, client_sockets[MAX_CLIENTS], max_sd, sd, activity;
fd_set readfds;
struct sockaddr_in address;
char buffer[BUFFER_SIZE] = {0};
// 初始化客户端socket数组
for (int i = 0; i < MAX_CLIENTS; i++) {
client_sockets[i] = 0;
}
// 创建socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定socket
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听
if (listen(server_fd, 3) < 0) {
perror("listen failed");
exit(EXIT_FAILURE);
}
printf("Select server listening on port %d...\n", PORT);
while (1) {
// 清空socket集合
FD_ZERO(&readfds);
// 添加主socket到集合
FD_SET(server_fd, &readfds);
max_sd = server_fd;
// 添加客户端socket到集合
for (int i = 0; i < MAX_CLIENTS; i++) {
sd = client_sockets[i];
if (sd > 0) {
FD_SET(sd, &readfds);
}
if (sd > max_sd) {
max_sd = sd;
}
}
// 等待活动(无限等待)
activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);
if ((activity < 0) && (errno != EINTR)) {
perror("select error");
}
// 如果是主socket活动,表示有新连接
if (FD_ISSET(server_fd, &readfds)) {
int new_socket;
if ((new_socket = accept(server_fd, NULL, NULL)) < 0) {
perror("accept");
exit(EXIT_FAILURE);
}
printf("New connection, socket fd is %d\n", new_socket);
// 添加新socket到数组
for (int i = 0; i < MAX_CLIENTS; i++) {
if (client_sockets[i] == 0) {
client_sockets[i] = new_socket;
break;
}
}
}
// 处理客户端数据
for (int i = 0; i < MAX_CLIENTS; i++) {
sd = client_sockets[i];
if (FD_ISSET(sd, &readfds)) {
int valread = read(sd, buffer, BUFFER_SIZE);
if (valread == 0) {
// 客户端断开连接
getpeername(sd, (struct sockaddr*)&address, (socklen_t*)&addrlen);
printf("Host disconnected, ip %s, port %d\n",
inet_ntoa(address.sin_addr), ntohs(address.sin_port));
close(sd);
client_sockets[i] = 0;
} else {
// 回显数据
buffer[valread] = '\0';
printf("Received from %d: %s\n", sd, buffer);
write(sd, buffer, strlen(buffer));
}
}
}
}
return 0;
}
2. poll 示例
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <poll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#define MAX_CLIENTS 10
#define BUFFER_SIZE 1024
#define PORT 8080
int main() {
int server_fd, client_sockets[MAX_CLIENTS];
struct pollfd fds[MAX_CLIENTS + 1]; // +1 for server socket
struct sockaddr_in address;
char buffer[BUFFER_SIZE] = {0};
// 初始化pollfd结构
for (int i = 0; i < MAX_CLIENTS + 1; i++) {
fds[i].fd = -1;
fds[i].events = POLLIN;
}
// 创建socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定socket
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听
if (listen(server_fd, 3) < 0) {
perror("listen failed");
exit(EXIT_FAILURE);
}
printf("Poll server listening on port %d...\n", PORT);
// 添加服务器socket到pollfd数组
fds[0].fd = server_fd;
fds[0].events = POLLIN;
while (1) {
// 等待活动(无限等待)
int ret = poll(fds, MAX_CLIENTS + 1, -1);
if (ret < 0) {
perror("poll error");
continue;
}
// 检查所有socket
for (int i = 0; i < MAX_CLIENTS + 1; i++) {
if (fds[i].revents & POLLIN) {
if (fds[i].fd == server_fd) {
// 新连接
int new_socket;
if ((new_socket = accept(server_fd, NULL, NULL)) < 0) {
perror("accept");
continue;
}
printf("New connection, socket fd is %d\n", new_socket);
// 添加新socket到pollfd数组
for (int j = 1; j < MAX_CLIENTS + 1; j++) {
if (fds[j].fd == -1) {
fds[j].fd = new_socket;
fds[j].events = POLLIN;
break;
}
}
} else {
// 客户端数据
int valread = read(fds[i].fd, buffer, BUFFER_SIZE);
if (valread == 0) {
// 客户端断开连接
printf("Client %d disconnected\n", fds[i].fd);
close(fds[i].fd);
fds[i].fd = -1;
} else {
// 回显数据
buffer[valread] = '\0';
printf("Received from %d: %s\n", fds[i].fd, buffer);
write(fds[i].fd, buffer, strlen(buffer));
}
}
}
}
}
return 0;
}
3. epoll 示例(边缘触发模式)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#define MAX_EVENTS 10
#define BUFFER_SIZE 1024
#define PORT 8080
// 设置非阻塞
void setnonblocking(int sockfd) {
int flags = fcntl(sockfd, F_GETFL, 0);
fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
}
int main() {
int server_fd, epoll_fd;
struct epoll_event ev, events[MAX_EVENTS];
struct sockaddr_in address;
char buffer[BUFFER_SIZE] = {0};
// 创建socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定socket
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听
if (listen(server_fd, 10) < 0) {
perror("listen failed");
exit(EXIT_FAILURE);
}
printf("Epoll server (ET mode) listening on port %d...\n", PORT);
// 创建epoll实例
epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
ev.events = EPOLLIN | EPOLLET; // 边缘触发模式
ev.data.fd = server_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
perror("epoll_ctl: server_fd");
exit(EXIT_FAILURE);
}
while (1) {
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
for (int n = 0; n < nfds; ++n) {
if (events[n].data.fd == server_fd) {
// 处理新连接
while (1) {
int new_socket = accept(server_fd, NULL, NULL);
if (new_socket == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 已经处理完所有新连接
break;
} else {
perror("accept");
break;
}
}
printf("New connection, socket fd is %d\n", new_socket);
// 设置非阻塞
setnonblocking(new_socket);
// 添加新socket到epoll
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = new_socket;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &ev) == -1) {
perror("epoll_ctl: new_socket");
close(new_socket);
}
}
} else {
// 处理客户端数据(边缘触发需要一次性读取所有数据)
while (1) {
ssize_t count = read(events[n].data.fd, buffer, BUFFER_SIZE);
if (count == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 已经读取完所有数据
break;
} else {
perror("read");
close(events[n].data.fd);
break;
}
} else if (count == 0) {
// 客户端断开连接
printf("Client %d disconnected\n", events[n].data.fd);
close(events[n].data.fd);
break;
}
// 处理数据
buffer[count] = '\0';
printf("Received from %d: %s\n", events[n].data.fd, buffer);
// 回显数据
write(events[n].data.fd, buffer, count);
}
}
}
}
close(server_fd);
return 0;
}
三种实现的关键区别
-
select:
- 需要每次重新构建fd_set
- 使用FD_ISSET检查就绪状态
- 有1024文件描述符限制
-
poll:
- 使用pollfd结构数组
- 没有文件描述符数量限制
- 检查revents字段确定就绪状态
-
epoll:
- 使用epoll_ctl管理监控列表
- 边缘触发模式需要非阻塞IO
- 性能最高,适合大量连接
选择建议
- 跨平台需求:选择select或poll
- 少量连接:select足够简单
- 中等规模:poll是更好的选择
- 高性能服务器:优先使用epoll(Linux专属)
所有示例都实现了基本的回显服务器功能,可以根据实际需求进行扩展和优化。