Linux事件循环——高效处理多任务(高并发)
Linux事件循环是一个非常核心的概念,是构建高性能网络服务器、图形界面和应用框架的基石。简单来说,事件循环就是一个程序结构,它等待并分派来自多个事件源的事件或消息。可以把它想象成一个无限的 while 循环,其核心工作流程如下:
while (1) {// 1. 等待事件发生events = wait_for_events();// 2. 遍历所有发生的事件for (e in events) {// 3. 根据事件类型,调用相应的处理函数(回调函数)handle_event(e);}
}
核心思想是:当没有什么事情可做时就等待,当有事情发生时(比如网络连接来了、数据可读了、定时器超时了),就迅速地去处理它。
在传统的同步(阻塞)I/O模型中,如果你想从网络套接字读取数据,你的线程会一直卡在 read() 函数那里,直到数据到达。如果你要同时处理多个连接,就需要为每个连接创建一个线程或进程。这在连接数非常多(C10K问题)时,会消耗大量系统资源(内存、上下文切换开销)。
事件循环解决了这个问题,它使用非阻塞 I/O 和多路复用技术,使得单个线程就可以高效地管理成千上万的网络连接或文件描述符。
核心优势:
高性能和高并发:在 I/O 密集型应用中,可以只用少量线程(甚至一个线程)处理大量并发连接。
资源高效:避免了多线程/多进程带来的内存开销和上下文切换成本。
清晰的编程模型:通过回调、Promise 等方式,可以让代码逻辑更清晰,避免了“回调地狱”(虽然处理不当也会导致回调地狱)。
核心组件
非阻塞I/O
大前提。通过 fcntl(fd, F_SETFL, O_NONBLOCK) 将文件描述符(如 Socket)设置为非阻塞模式。这样,当对它进行 read 或 write 操作时,如果数据没有就绪,函数会立即返回一个错误(如 EAGAIN 或 EWOULDBLOCK),而不是阻塞线程。
I/O多路复用
事件循环的引擎。它允许一个线程监听多个文件描述符上的事件(可读、可写、异常等),并在任何一个描述符就绪时通知应用程序。
Linux 上有几种主要的 I/O 多路复用机制,它们都是事件循环的核心(具体可以参考往期对于多路复用的介绍https://blog.csdn.net/weixin_56228133/article/details/144373893):
select
工作方式: 传入三个文件描述符集合(读、写、异常),内核会轮询这些集合,并在有事件发生时修改集合并返回。
缺点:
文件描述符数量有限制(通常是1024)。
每次调用都需要在用户态和内核态之间拷贝整个描述符集合。
内核需要线性扫描所有描述符来找出就绪的,效率低。
poll
工作方式: 与 select 类似,但使用一个 pollfd 结构体数组,没有文件描述符数量的硬限制。
缺点:
仍然需要在内核中线性扫描所有描述符。
大量连接时,性能仍然会下降。
epoll (最重要、最常用)
工作方式:
epoll_create:创建一个 epoll 实例。
epoll_ctl:向 epoll 实例中添加、修改或删除要监听的文件描述符和事件。
epoll_wait:等待事件发生。它只返回已经就绪的事件,而不是所有被监听的描述符。
优点:
高效: 无需每次调用都传递整个描述符集合,内核通过一个红黑树管理描述符。
可扩展: 事件通知是 O(1) 复杂度的,性能不会随着监听描述符数量的增加而线性下降。
边缘触发和水平触发: 提供了更精细的控制。
定时器
事件循环通常还需要处理定时任务,比如“5秒后执行某个函数”。这通常是通过一个最小堆来实现的,堆顶是下一个将要超时的定时器。在调用 epoll_wait 时,可以设置一个超时时间,这个时间就是当前最近的一个定时器到期时间。这样,epoll_wait 既可以响应 I/O 事件,也可以响应定时器事件。
回调函数
事件循环的灵魂。当某个事件(如“socket可读”)发生时,事件循环会调用预先为该事件注册好的函数,这个函数就是回调函数。https://blog.csdn.net/weixin_56228133/article/details/149884545
Epoll 事件循环用例
设置监听/事件注册 -> 事件循环 -> 等待事件 -> 处理事件
#include <sys/epoll.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>#define MAX_EVENTS 10int main() {int epoll_fd = epoll_create1(0);if (epoll_fd == -1) {perror("epoll_create1");return 1;}// 假设 server_socket 是一个已经创建并监听的socket// 将其添加到 epoll 实例中,监听读事件(新连接)struct epoll_event ev, events[MAX_EVENTS];ev.events = EPOLLIN; // 监听可读事件ev.data.fd = server_socket; // 携带的数据,通常是文件描述符epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_socket, &ev);// 事件循环开始while (1) {// 等待事件,超时时间设为 -1(无限等待)int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);if (nfds == -1) {perror("epoll_wait");break;}// 处理所有就绪的事件for (int i = 0; i < nfds; i++) {if (events[i].data.fd == server_socket) {// 服务器socket可读,表示有新连接int client_socket = accept(server_socket, NULL, NULL);// 将新连接的socket设为非阻塞并加入epoll监听fcntl(client_socket, F_SETFL, O_NONBLOCK);ev.events = EPOLLIN | EPOLLET; // 监听读事件,边缘触发模式ev.data.fd = client_socket;epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_socket, &ev);} else {// 客户端socket可读,处理数据int client_fd = events[i].data.fd;char buffer[1024];int n = read(client_fd, buffer, sizeof(buffer));if (n > 0) {// 处理收到的数据printf("Received: %.*s\n", n, buffer);// ... (可以回写数据等)} else if (n == 0 || (n < 0 && errno != EAGAIN)) {// 连接关闭或出错,从epoll中移除并关闭socketepoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);close(client_fd);}// 如果 n < 0 且 errno == EAGAIN,表示数据已读完,非阻塞返回}}}close(epoll_fd);return 0;
}
常见的使用事件循环的库和应用:
Libuv: Node.js 使用的跨平台异步 I/O 库,是事件循环的典范实现。
libevent / libev: 老牌的高性能事件库。
glib Main Loop: GTK+ 和 GNOME 应用使用的事件循环。
Boost.Asio (C++): 一个强大的跨平台网络库,其核心也是基于 Proactor 或 Reactor 模式的事件循环。
Nginx / Redis: 这些高性能服务器本身就是一个事件循环的经典应用。
应用场景:网络服务器、GUI 应用、游戏服务器、任何需要高效处理大量并发 I/O 的地方。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>
#include <stdbool.h>#define MAX_EVENTS 10
#define BUFFER_SIZE 1024// 回调函数类型定义
typedef void (*timer_callback_t)(void* data);
typedef void (*io_callback_t)(int fd, void* data);// 定时器结构
struct timer_info {int timer_fd;timer_callback_t callback;void* user_data;char* description;
};// IO 事件结构
struct io_info {int fd;io_callback_t callback;void* user_data;
};// 全局变量
struct epoll_event events[MAX_EVENTS];
struct timer_info* timers[10];
struct io_info* io_handlers[100];
int timer_count = 0;// 设置定时器
int setup_timer(long sec, long nsec, timer_callback_t callback, void* data, const char* desc) {// 创建 timerfdint timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);if (timer_fd == -1) {perror("timerfd_create");return -1;}// 设置定时器参数struct itimerspec timer_spec;timer_spec.it_value.tv_sec = sec; // 首次超时时间timer_spec.it_value.tv_nsec = nsec;timer_spec.it_interval.tv_sec = sec; // 间隔时间timer_spec.it_interval.tv_nsec = nsec;if (timerfd_settime(timer_fd, 0, &timer_spec, NULL) == -1) {perror("timerfd_settime");close(timer_fd);return -1;}// 保存定时器信息struct timer_info* timer = malloc(sizeof(struct timer_info));timer->timer_fd = timer_fd;timer->callback = callback;timer->user_data = data;timer->description = strdup(desc);timers[timer_count++] = timer;printf("定时器创建成功: %s (fd: %d)\n", desc, timer_fd);return timer_fd;
}// 定时器回调函数示例
void periodic_task(void* data) {static int count = 0;count++;// 读取定时器数据(必须读,否则会持续触发)uint64_t expirations;read(((struct timer_info*)data)->timer_fd, &expirations, sizeof(expirations));time_t now = time(NULL);printf("[定时器 %s] 触发次数: %d, 当前时间: %s", ((struct timer_info*)data)->description, count, ctime(&now));if (count >= 5 && strcmp(((struct timer_info*)data)->description, "5秒定时器") == 0) {printf("5秒定时器已完成5次触发,现在停止它\n");// 停止定时器struct itimerspec stop_spec = {0};timerfd_settime(((struct timer_info*)data)->timer_fd, 0, &stop_spec, NULL);}
}void one_shot_timer(void* data) {uint64_t expirations;read(((struct timer_info*)data)->timer_fd, &expirations, sizeof(expirations));printf("[一次性定时器] 触发! 数据: %s\n", (char*)data);// 一次性定时器触发后关闭close(((struct timer_info*)data)->timer_fd);
}// IO 回调函数
void handle_client_data(int fd, void* data) {char buffer[BUFFER_SIZE];ssize_t n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0) {buffer[n] = '\0';printf("收到客户端数据: %s", buffer);// 回显给客户端write(fd, buffer, n);} else if (n == 0 || (n < 0 && errno != EAGAIN)) {printf("客户端断开连接 (fd: %d)\n", fd);close(fd);}
}void handle_new_connection(int server_fd, void* data) {int epoll_fd = *(int*)data;struct sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);int client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &client_len);if (client_fd == -1) {perror("accept");return;}// 设置为非阻塞fcntl(client_fd, F_SETFL, O_NONBLOCK);// 添加到 epollstruct epoll_event ev;ev.events = EPOLLIN | EPOLLET;ev.data.fd = client_fd;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {perror("epoll_ctl: client_fd");close(client_fd);return;}// 保存 IO 处理器struct io_info* io_handler = malloc(sizeof(struct io_info));io_handler->fd = client_fd;io_handler->callback = handle_client_data;io_handler->user_data = NULL;io_handlers[client_fd] = io_handler;printf("新客户端连接: fd=%d\n", client_fd);
}// 创建服务器 socket
int create_server_socket(int port) {int server_fd = socket(AF_INET, SOCK_STREAM, 0);if (server_fd == -1) {perror("socket");exit(EXIT_FAILURE);}// 设置 SO_REUSEADDRint opt = 1;if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {perror("setsockopt");exit(EXIT_FAILURE);}struct sockaddr_in server_addr = {.sin_family = AF_INET,.sin_addr.s_addr = INADDR_ANY,.sin_port = htons(port)};if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {perror("bind");exit(EXIT_FAILURE);}if (listen(server_fd, 10) < 0) {perror("listen");exit(EXIT_FAILURE);}// 设置为非阻塞fcntl(server_fd, F_SETFL, O_NONBLOCK);printf("服务器启动在端口 %d\n", port);return server_fd;
}int main() {int epoll_fd, server_fd;// 创建 epoll 实例epoll_fd = epoll_create1(0);if (epoll_fd == -1) {perror("epoll_create1");exit(EXIT_FAILURE);}// 创建服务器 socketserver_fd = create_server_socket(8080);// 添加服务器 socket 到 epollstruct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = server_fd;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {perror("epoll_ctl: server_fd");exit(EXIT_FAILURE);}// 保存服务器 IO 处理器struct io_info* server_handler = malloc(sizeof(struct io_info));server_handler->fd = server_fd;server_handler->callback = handle_new_connection;server_handler->user_data = &epoll_fd;io_handlers[server_fd] = server_handler;// 创建多个定时器printf("\n=== 创建定时器 ===\n");setup_timer(5, 0, periodic_task, NULL, "5秒定时器");setup_timer(2, 0, periodic_task, NULL, "2秒定时器");// 一次性定时器(10秒后触发)setup_timer(10, 0, one_shot_timer, "这是一次性定时器的数据", "一次性定时器");printf("\n=== 事件循环开始 ===\n");printf("服务器运行在: http://localhost:8080\n");printf("定时器已启动,可以使用 telnet 或 curl 测试连接\n\n");// 事件循环while (true) {int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);if (nfds == -1) {perror("epoll_wait");break;}for (int i = 0; i < nfds; i++) {int fd = events[i].data.fd;// 检查是否是定时器事件bool is_timer = false;for (int j = 0; j < timer_count; j++) {if (timers[j]->timer_fd == fd) {// 调用定时器回调函数timers[j]->callback(timers[j]);is_timer = true;break;}}if (!is_timer) {// IO 事件 - 调用相应的回调函数if (io_handlers[fd] != NULL && (events[i].events & EPOLLIN)) {io_handlers[fd]->callback(fd, io_handlers[fd]->user_data);}}}}// 清理资源close(server_fd);close(epoll_fd);for (int i = 0; i < timer_count; i++) {close(timers[i]->timer_fd);free(timers[i]->description);free(timers[i]);}return 0;
}
测试网络连接:
# 编译
gcc -o event_loop event_loop.c# 运行
./event_loop# 在另一个终端执行
telnet localhost 8080
# 或者
curl http://localhost:8080
观察定时器输出,程序会自动显示定时器的触发情况,包括:2秒定时器每2秒触发一次、5秒定时器触发5次后自动停止、10秒一次性定时器触发一次。
定时器管理:使用 timerfd 创建内核定时器,timer_info 结构保存定时器信息和回调函数,支持周期性和一次性定时器。
I/O 事件管理:io_info 结构保存文件描述符和对应的回调函数,支持网络连接和数据读写。事件注册:将所有需要监听的文件描述符(socket、timerfd)添加到 epoll。