当前位置: 首页 > news >正文

Linux 编程中的 I/O 复用

Linux 编程中的 I/O 复用

I/O 复用是 Linux 编程中处理多个文件描述符的高效方法,它允许程序同时监视多个文件描述符,当其中任何一个或多个文件描述符就绪(可读、可写或出现异常)时,程序就能得到通知并进行相应的处理。

主要的 I/O 复用机制

1. select

#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);

// 相关宏操作
FD_ZERO(fd_set *set);        // 清空集合
FD_SET(int fd, fd_set *set); // 添加fd到集合
FD_CLR(int fd, fd_set *set); // 从集合移除fd
FD_ISSET(int fd, fd_set *set); // 检查fd是否在集合中

特点:

  • 可移植性好,几乎所有平台都支持
  • 有文件描述符数量限制(通常1024)
  • 每次调用都需要重新设置fd_set
  • 线性扫描所有fd,效率随fd数量增加而下降

2. poll

#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

struct pollfd {
    int fd;         // 文件描述符
    short events;   // 等待的事件
    short revents;  // 实际发生的事件
};

特点:

  • 没有文件描述符数量限制
  • 使用链表存储,不需要每次重新初始化
  • 仍然需要遍历所有fd来检查状态
  • 比select稍高效

3. epoll (Linux特有)

#include <sys/epoll.h>

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,
               int maxevents, int timeout);

struct epoll_event {
    uint32_t events;    // Epoll events
    epoll_data_t data;  // User data variable
};

特点:

  • 高性能,特别适合大量连接
  • 使用回调机制,不需要遍历所有fd
  • 支持边缘触发(ET)和水平触发(LT)模式
  • Linux特有,不具备可移植性

比较

特性selectpollepoll
可移植性较好Linux特有
最大连接数有限制(1024)无限制无限制
效率中等
触发方式水平触发水平触发支持边缘触发
内存拷贝每次调用都拷贝每次调用都拷贝内核和用户空间共享

使用场景建议

  1. select:适合跨平台、连接数少的简单应用
  2. poll:适合连接数中等、需要更好可移植性的应用
  3. epoll:适合Linux平台、高并发连接的应用(如Web服务器)

示例代码

epoll 示例

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define MAX_EVENTS 10
#define PORT 8080

int main() {
    int server_fd, epoll_fd;
    struct epoll_event ev, events[MAX_EVENTS];
    struct sockaddr_in address;
    
    // 创建socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);
    
    // 绑定socket
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }
    
    // 监听
    if (listen(server_fd, 10) < 0) {
        perror("listen failed");
        exit(EXIT_FAILURE);
    }
    
    // 创建epoll实例
    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }
    
    ev.events = EPOLLIN;
    ev.data.fd = server_fd;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
        perror("epoll_ctl: server_fd");
        exit(EXIT_FAILURE);
    }
    
    while (1) {
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            perror("epoll_wait");
            exit(EXIT_FAILURE);
        }
        
        for (int n = 0; n < nfds; ++n) {
            if (events[n].data.fd == server_fd) {
                // 新连接
                int new_socket = accept(server_fd, NULL, NULL);
                if (new_socket == -1) {
                    perror("accept");
                    continue;
                }
                
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = new_socket;
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &ev) == -1) {
                    perror("epoll_ctl: new_socket");
                    close(new_socket);
                }
            } else {
                // 处理客户端数据
                char buffer[1024] = {0};
                int valread = read(events[n].data.fd, buffer, 1024);
                if (valread <= 0) {
                    // 连接关闭或出错
                    close(events[n].data.fd);
                } else {
                    // 处理数据
                    printf("Received: %s\n", buffer);
                    // 回显
                    write(events[n].data.fd, buffer, valread);
                }
            }
        }
    }
    
    close(server_fd);
    return 0;
}

边缘触发(ET) vs 水平触发(LT)

  1. 水平触发(LT)

    • 默认模式
    • 只要文件描述符就绪,就会一直通知
    • 编程更简单,不容易遗漏事件
  2. 边缘触发(ET)

    • 需要设置EPOLLET标志
    • 只在状态变化时通知一次
    • 需要一次性处理完所有数据
    • 效率更高,但编程更复杂

选择哪种模式取决于具体应用场景和性能需求。

Linux I/O 复用完整示例(select/poll/epoll)

下面是 select、poll 和 epoll 三种 I/O 复用机制的完整示例代码,均实现了一个简单的回显服务器功能。

1. select 示例

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define MAX_CLIENTS 10
#define BUFFER_SIZE 1024
#define PORT 8080

int main() {
    int server_fd, client_sockets[MAX_CLIENTS], max_sd, sd, activity;
    fd_set readfds;
    struct sockaddr_in address;
    char buffer[BUFFER_SIZE] = {0};
    
    // 初始化客户端socket数组
    for (int i = 0; i < MAX_CLIENTS; i++) {
        client_sockets[i] = 0;
    }
    
    // 创建socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);
    
    // 绑定socket
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }
    
    // 监听
    if (listen(server_fd, 3) < 0) {
        perror("listen failed");
        exit(EXIT_FAILURE);
    }
    
    printf("Select server listening on port %d...\n", PORT);
    
    while (1) {
        // 清空socket集合
        FD_ZERO(&readfds);
        
        // 添加主socket到集合
        FD_SET(server_fd, &readfds);
        max_sd = server_fd;
        
        // 添加客户端socket到集合
        for (int i = 0; i < MAX_CLIENTS; i++) {
            sd = client_sockets[i];
            if (sd > 0) {
                FD_SET(sd, &readfds);
            }
            if (sd > max_sd) {
                max_sd = sd;
            }
        }
        
        // 等待活动(无限等待)
        activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);
        if ((activity < 0) && (errno != EINTR)) {
            perror("select error");
        }
        
        // 如果是主socket活动,表示有新连接
        if (FD_ISSET(server_fd, &readfds)) {
            int new_socket;
            if ((new_socket = accept(server_fd, NULL, NULL)) < 0) {
                perror("accept");
                exit(EXIT_FAILURE);
            }
            
            printf("New connection, socket fd is %d\n", new_socket);
            
            // 添加新socket到数组
            for (int i = 0; i < MAX_CLIENTS; i++) {
                if (client_sockets[i] == 0) {
                    client_sockets[i] = new_socket;
                    break;
                }
            }
        }
        
        // 处理客户端数据
        for (int i = 0; i < MAX_CLIENTS; i++) {
            sd = client_sockets[i];
            if (FD_ISSET(sd, &readfds)) {
                int valread = read(sd, buffer, BUFFER_SIZE);
                if (valread == 0) {
                    // 客户端断开连接
                    getpeername(sd, (struct sockaddr*)&address, (socklen_t*)&addrlen);
                    printf("Host disconnected, ip %s, port %d\n", 
                           inet_ntoa(address.sin_addr), ntohs(address.sin_port));
                    close(sd);
                    client_sockets[i] = 0;
                } else {
                    // 回显数据
                    buffer[valread] = '\0';
                    printf("Received from %d: %s\n", sd, buffer);
                    write(sd, buffer, strlen(buffer));
                }
            }
        }
    }
    
    return 0;
}

2. poll 示例

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <poll.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define MAX_CLIENTS 10
#define BUFFER_SIZE 1024
#define PORT 8080

int main() {
    int server_fd, client_sockets[MAX_CLIENTS];
    struct pollfd fds[MAX_CLIENTS + 1]; // +1 for server socket
    struct sockaddr_in address;
    char buffer[BUFFER_SIZE] = {0};
    
    // 初始化pollfd结构
    for (int i = 0; i < MAX_CLIENTS + 1; i++) {
        fds[i].fd = -1;
        fds[i].events = POLLIN;
    }
    
    // 创建socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);
    
    // 绑定socket
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }
    
    // 监听
    if (listen(server_fd, 3) < 0) {
        perror("listen failed");
        exit(EXIT_FAILURE);
    }
    
    printf("Poll server listening on port %d...\n", PORT);
    
    // 添加服务器socket到pollfd数组
    fds[0].fd = server_fd;
    fds[0].events = POLLIN;
    
    while (1) {
        // 等待活动(无限等待)
        int ret = poll(fds, MAX_CLIENTS + 1, -1);
        if (ret < 0) {
            perror("poll error");
            continue;
        }
        
        // 检查所有socket
        for (int i = 0; i < MAX_CLIENTS + 1; i++) {
            if (fds[i].revents & POLLIN) {
                if (fds[i].fd == server_fd) {
                    // 新连接
                    int new_socket;
                    if ((new_socket = accept(server_fd, NULL, NULL)) < 0) {
                        perror("accept");
                        continue;
                    }
                    
                    printf("New connection, socket fd is %d\n", new_socket);
                    
                    // 添加新socket到pollfd数组
                    for (int j = 1; j < MAX_CLIENTS + 1; j++) {
                        if (fds[j].fd == -1) {
                            fds[j].fd = new_socket;
                            fds[j].events = POLLIN;
                            break;
                        }
                    }
                } else {
                    // 客户端数据
                    int valread = read(fds[i].fd, buffer, BUFFER_SIZE);
                    if (valread == 0) {
                        // 客户端断开连接
                        printf("Client %d disconnected\n", fds[i].fd);
                        close(fds[i].fd);
                        fds[i].fd = -1;
                    } else {
                        // 回显数据
                        buffer[valread] = '\0';
                        printf("Received from %d: %s\n", fds[i].fd, buffer);
                        write(fds[i].fd, buffer, strlen(buffer));
                    }
                }
            }
        }
    }
    
    return 0;
}

3. epoll 示例(边缘触发模式)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>

#define MAX_EVENTS 10
#define BUFFER_SIZE 1024
#define PORT 8080

// 设置非阻塞
void setnonblocking(int sockfd) {
    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    int server_fd, epoll_fd;
    struct epoll_event ev, events[MAX_EVENTS];
    struct sockaddr_in address;
    char buffer[BUFFER_SIZE] = {0};
    
    // 创建socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);
    
    // 绑定socket
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }
    
    // 监听
    if (listen(server_fd, 10) < 0) {
        perror("listen failed");
        exit(EXIT_FAILURE);
    }
    
    printf("Epoll server (ET mode) listening on port %d...\n", PORT);
    
    // 创建epoll实例
    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }
    
    ev.events = EPOLLIN | EPOLLET; // 边缘触发模式
    ev.data.fd = server_fd;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
        perror("epoll_ctl: server_fd");
        exit(EXIT_FAILURE);
    }
    
    while (1) {
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            perror("epoll_wait");
            exit(EXIT_FAILURE);
        }
        
        for (int n = 0; n < nfds; ++n) {
            if (events[n].data.fd == server_fd) {
                // 处理新连接
                while (1) {
                    int new_socket = accept(server_fd, NULL, NULL);
                    if (new_socket == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                            // 已经处理完所有新连接
                            break;
                        } else {
                            perror("accept");
                            break;
                        }
                    }
                    
                    printf("New connection, socket fd is %d\n", new_socket);
                    
                    // 设置非阻塞
                    setnonblocking(new_socket);
                    
                    // 添加新socket到epoll
                    ev.events = EPOLLIN | EPOLLET;
                    ev.data.fd = new_socket;
                    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &ev) == -1) {
                        perror("epoll_ctl: new_socket");
                        close(new_socket);
                    }
                }
            } else {
                // 处理客户端数据(边缘触发需要一次性读取所有数据)
                while (1) {
                    ssize_t count = read(events[n].data.fd, buffer, BUFFER_SIZE);
                    if (count == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                            // 已经读取完所有数据
                            break;
                        } else {
                            perror("read");
                            close(events[n].data.fd);
                            break;
                        }
                    } else if (count == 0) {
                        // 客户端断开连接
                        printf("Client %d disconnected\n", events[n].data.fd);
                        close(events[n].data.fd);
                        break;
                    }
                    
                    // 处理数据
                    buffer[count] = '\0';
                    printf("Received from %d: %s\n", events[n].data.fd, buffer);
                    
                    // 回显数据
                    write(events[n].data.fd, buffer, count);
                }
            }
        }
    }
    
    close(server_fd);
    return 0;
}

三种实现的关键区别

  1. select:

    • 需要每次重新构建fd_set
    • 使用FD_ISSET检查就绪状态
    • 有1024文件描述符限制
  2. poll:

    • 使用pollfd结构数组
    • 没有文件描述符数量限制
    • 检查revents字段确定就绪状态
  3. epoll:

    • 使用epoll_ctl管理监控列表
    • 边缘触发模式需要非阻塞IO
    • 性能最高,适合大量连接

选择建议

  • 跨平台需求:选择select或poll
  • 少量连接:select足够简单
  • 中等规模:poll是更好的选择
  • 高性能服务器:优先使用epoll(Linux专属)

所有示例都实现了基本的回显服务器功能,可以根据实际需求进行扩展和优化。

http://www.dtcms.com/a/122345.html

相关文章:

  • Element UI 设置 el-table-column 宽度 width 为百分比无效
  • React九案例中
  • 敏捷迭代实战经验分享
  • leetcode_数组 189. 轮转数组
  • Odrive0.5.1-FOC电机控制 arm_cos_f32.cpp arm_sin_f32.cpp代码实现(一)
  • 中科岩创基坑自动化监测解决方案
  • 【11】数据结构之基于线性表的查找算法
  • 【消息队列kafka_中间件】一、快速入门分布式消息队列
  • Android 中Intent 相关问题
  • STM32CubeMX-H7-12-IIC读写MPU6050模块(中)-MPU6050模块详解以及软件IIC驱动
  • Node.js是js语言在服务器编译运行的环境,什么是IP和域名
  • Python包管理工具uv简单使用
  • nginx或tengine服务器,配置HTTPS下使用WebSocket的线上环境实践!
  • 【详细】MySQL 8 安装解压即用 (包含MySQL 5 卸载)
  • Python从入门到精通全套视频教程免费
  • UniApp基于xe-upload实现文件上传组件
  • 12. git merge
  • 【LeetCode 题解】数据库:1321.餐馆营业额变化增长
  • 使用RabbitMQ实现异步秒杀
  • 网络安全公司推荐:F5荣膺IDC全球Web应用与API防护领导者
  • 游戏引擎学习第212天
  • TimeDART:结合扩散去噪与自回归建模的时间序列自监督学习新框架
  • oracle 动态性能视图
  • CV - 目标检测
  • PyCharm显示主菜单和工具栏
  • 计算机视觉——图像金字塔与目标图像边缘检测原理与实践
  • 【人工智能】大语言模型多义词解析技术揭秘——以“项目“歧义消解为例
  • View UI (iview)表格拖拽排序
  • Dinky 和 Flink CDC 在实时整库同步的探索之路
  • 每日一题(小白)数组娱乐篇21