【Linux高级全栈开发】2.1.2 事件驱动reactor的原理与实现
【Linux高级全栈开发】2.1.2 事件驱动reactor的原理与实现
高性能网络学习目录
基础内容(两周完成):
-
2.1网络编程
- 2.1.1多路复用select/poll/epoll
- 2.1.2事件驱动reactor
- 2.1.3http服务器的实现
-
2.2网络原理
- 百万并发
- PosixAPI
- QUIC
-
2.3协程库
- NtyCo的实现
-
2.4dpdk
- 用户态协议栈的实现
-
2.5高性能异步io机制
项目内容(两周完成):
- 9.1 KV存储项目
- 9.2 RPC项目
- 9.3 DPDK项目
2.1.2 事件驱动reactor的原理与实现
1 基础知识
1.1 什么是事件驱动reactor
事件驱动 Reactor 模式是一种用于处理 I/O 多路复用的设计模式,特别适合构建高性能、高并发的网络服务器。它的核心思想是将事件检测和事件处理分离,通过一个事件多路分离器 (Event Demultiplexer) 来监听多个 I/O 通道上的事件,并将事件分发给对应的事件处理器 (Event Handler) 进行处理。
- Reactor 模式的基本组件
- 事件多路分离器:使用操作系统提供的机制(如 select、poll、epoll 等)来监听多个文件描述符上的 IO 事件
- 事件处理器:为不同的 IO 事件注册回调函数
- Reactor:负责管理事件多路分离器和事件处理器,当有事件发生时,调用对应的回调函数处理事件
1.2 reactor针对业务实现的优点
-
高性能:
- 通过 I/O 多路复用技术,高效处理大量并发连接,
- 避免了多线程的同步开销和复杂性
-
可拓展性:
- 可以根据需要添加不同的事件处理器
- 实现了事件分离(读写逻辑分离)避免频繁创建资源
- 支持多线程版本的 Reactor 模式(如主从 Reactor 模式)
-
资源利用率高:
- 单线程 Reactor 模式可以在单个线程中处理大量连接,减少内存占用
- 通过异步 I/O 和事件驱动机制,充分利用 CPU 和 I/O 资源
1.3 epoll封装 send_cb/recv_cb/accept_cb
epoll
被用作事件多路分离器,结合回调函数(send_cb
/recv_cb
/accept_cb
)实现高效的事件驱动编程。这种模式就是典型的 Reactor 模式:
-
epoll
是 Linux 下高性能的 I/O 多路复用机制,通过epoll_create
、epoll_ctl
和epoll_wait
三个系统调用实现:-
epoll_create
:创建一个 epoll 实例(如您代码中的epfd = epoll_create(MAX_EPOLLSIZE)
)。 -
epoll_ctl
:注册 / 修改 / 删除对特定文件描述符(FD)的事件监听(如EPOLLIN
/EPOLLOUT
)。 -
epoll_wait
:阻塞等待事件发生,返回就绪的 FD 列表。
-
-
定义了三个核心回调函数:
-
accept_cb(int fd)
- 触发条件:当监听套接字(
listen fd
)上有新连接进入时,epoll
返回EPOLLIN
事件。 - 职责:
- 调用
accept()
接受新连接,得到客户端套接字(client fd
)。 - 将
client fd
设置为非阻塞模式(ntySetNonblock
)。 - 为
client fd
注册EPOLLIN
事件到 epoll,绑定recv_cb
回调。
- 调用
- 触发条件:当监听套接字(
-
recv_cb(int fd)
- 触发条件:当客户端套接字(
client fd
)上有数据可读时,epoll
返回EPOLLIN
事件。 - 职责:
- 调用
recv()
读取客户端数据。 - 处理数据(如解析 HTTP/WebSocket 请求)。
- 将
fd
的事件类型从EPOLLIN
修改为EPOLLOUT
(通过epoll_ctl
),准备发送响应。
- 调用
- 触发条件:当客户端套接字(
-
send_cb(int fd)
- 触发条件:当客户端套接字(
client fd
)可写时,epoll
返回EPOLLOUT
事件。 - 职责:
- 调用
send()
向客户端发送响应数据。 - 将
fd
的事件类型从EPOLLOUT
修改回EPOLLIN
,继续等待下一次请求。
- 调用
- 触发条件:当客户端套接字(
-
-
主循环通过
epoll_wait
等待事件,并根据事件类型调用对应回调:- 监听套接字(
listen fd
):仅监听EPOLLIN
,触发accept_cb
。 - 客户端套接字(
client fd
):- 初始监听
EPOLLIN
,触发recv_cb
。 recv_cb
处理完请求后,修改为监听EPOLLOUT
,触发send_cb
。send_cb
发送响应后,再改回监听EPOLLIN
,形成循环。
- 初始监听
- 监听套接字(
-
使用
conn_list
数组存储每个连接的状态:- 索引优化:直接使用 FD 作为数组索引(如
conn_list[fd]
),避免查找开销。 - 状态流转:通过
status
字段或事件类型切换(EPOLLIN
/EPOLLOUT
)管理连接生命周期
- 索引优化:直接使用 FD 作为数组索引(如
-
Reactor 模式的优势体现在:
- 高性能:单线程处理大量连接(34 万 +),避免线程切换开销。
- 事件驱动:仅在事件发生时执行回调,资源利用率高。
- 可扩展性:通过回调函数解耦业务逻辑(如 HTTP/WebSocket 处理)。
- 非阻塞 I/O:
send()
/recv()
立即返回,避免阻塞等待。
总结
代码通过 epoll
+ 回调函数实现了经典的 Reactor 模式,核心在于:
- 事件多路分离:
epoll
监听多个 FD 的事件。 - 回调注册:为不同类型的 FD(监听 / 客户端)注册不同回调。
- 状态流转:通过修改监听的事件类型(
EPOLLIN
/EPOLLOUT
)实现读写状态切换。
2 「代码实现」Reactor设计模式实现epoll
2.1 「单请求」实现过程
核心逻辑:代码实现了一个基于 epoll 的高性能网络服务器,支持 HTTP 和 WebSocket 协议。下面我将逐行解释代码,并介绍事件驱动 reactor 模式的原理。
-
Reactor 模式的实现如下:
-
事件多路分离器:使用 Linux 的 epoll 机制实现
-
Reactor:由
main
函数中的事件循环实现 -
事件处理器:三个回调函数
accept_cb
、recv_cb
和send_cb
分别处理连接接受、数据读取和数据发送事件 -
事件注册:通过
event_register
函数将事件处理器与特定的事件类型关联
-
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/time.h>
#include "server.h"
// 发送信息buffer大小
#define BUFFER_LENGTH 1024
// fd大小
#define CONNECTION_SIZE 1024
// 定义回调函数
typedef int (*RCALLBACK)(int fd);// 设置epoll事件为全局的
int epfd = 0;
// 声明三个回调函数
int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);struct conn {// 文件描述符int fd;// 读写缓冲区\缓冲区长度char rbuffer[BUFFER_LENGTH];int rlength;char wbuffer[BUFFER_LENGTH];int wlength;// 每一个IO与对应的回调函数RCALLBACK send_callback;// 因为这两个回调函数逻辑上是「或」的关系,所以用联合扩起来,以节省内存union {RCALLBACK recv_callback;RCALLBACK accept_callback;} r_action;int status;
};
// 连接列表,使用文件描述符作为索引
struct conn conn_list[CONNECTION_SIZE] = {0};// 设置epoll事件,添加(1)或修改(0)epoll监听的事件
int set_event(int fd, int event, int flag) {if (flag) { // non-zero addstruct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);} else { // zero modstruct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}
}// 注册事件,初始化连接结构体并设置事件监听
int event_register(int fd, int event) {if (fd < 0) return -1;conn_list[fd].fd = fd;conn_list[fd].r_action.recv_callback = recv_cb;conn_list[fd].send_callback = send_cb;// 数据置零,以防覆写memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);conn_list[fd].rlength = 0;memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);conn_list[fd].wlength = 0;set_event(fd, event, 1);
}// listenfd(sockfd) --> EPOLLIN --> accept_cb
int accept_cb(int fd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);//printf("accept finshed: %d\n", clientfd);if (clientfd < 0) {printf("accept errno: %d --> %s\n", errno, strerror(errno));return -1;}// 读事件注册event_register(clientfd, EPOLLIN); // | EPOLLET// 每接受1000个连接打印一次统计信息if ((clientfd % 1000) == 0) {struct timeval current;gettimeofday(¤t, NULL);int time_used = TIME_SUB_MS(current, begin);memcpy(&begin, ¤t, sizeof(struct timeval));printf("accept finshed: %d, time_used: %d\n", clientfd, time_used);}return 0;
}int recv_cb(int fd) {memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH );int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);if (count == 0) { // disconnectprintf("client disconnect: %d\n", fd);close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // unfinishedreturn 0;} else if (count < 0) { // printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);return 0;}conn_list[fd].rlength = count;//printf("RECV: %s\n", conn_list[fd].rbuffer);#if 0 // echoconn_list[fd].wlength = conn_list[fd].rlength;memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);printf("[%d]RECV: %s\n", conn_list[fd].rlength, conn_list[fd].rbuffer);#elif 0http_request(&conn_list[fd]);#elsews_request(&conn_list[fd]);#endifset_event(fd, EPOLLOUT, 0);return count;
}int send_cb(int fd) {#if 0http_response(&conn_list[fd]);#elsews_response(&conn_list[fd]);#endifint count = 0;#if 0if (conn_list[fd].status == 1) {//printf("SEND: %s\n", conn_list[fd].wbuffer);count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);set_event(fd, EPOLLOUT, 0);} else if (conn_list[fd].status == 2) {set_event(fd, EPOLLOUT, 0);} else if (conn_list[fd].status == 0) {if (conn_list[fd].wlength != 0) {count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);}set_event(fd, EPOLLIN, 0);}
#elseif (conn_list[fd].wlength != 0) {count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);}set_event(fd, EPOLLIN, 0);#endif//set_event(fd, EPOLLOUT, 0);return count;
}// 初始化TCP server
int init_server(unsigned short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0servaddr.sin_port = htons(port); // 0-1023, if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {printf("bind failed: %s\n", strerror(errno));}listen(sockfd, 10);//printf("listen finshed: %d\n", sockfd); // 3 return sockfd;}int main() {unsigned short port = 2000;epfd = epoll_create(1);for (int i = 0;i < MAX_PORTS;i ++) {int sockfd = init_server(port + i);// 为每个监听套接字分配连接对象conn_list[sockfd].fd = sockfd;// 将 accept_cb 回调函数注册到 r_action 中conn_list[sockfd].r_action.recv_callback = accept_cb;// 通过 set_event 函数将监听套接字注册到 epoll,监听 EPOLLIN 事件(可读事件)set_event(sockfd, EPOLLIN, 1);}// 记录服务器启动时间,在后续的 accept_cb 中会定期打印连接建立的时间统计gettimeofday(&begin, NULL);while (1) { // mainloopstruct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;// EPOLLIN和EPOLLOUT事件被视为互斥的
#if 0 if (events[i].events & EPOLLIN) {conn_list[connfd].r_action.recv_callback(connfd);} else if (events[i].events & EPOLLOUT) {conn_list[connfd].send_callback(connfd);}// EPOLLIN和EPOLLOUT事件会被独立处理
#else if (events[i].events & EPOLLIN) {conn_list[connfd].r_action.recv_callback(connfd);} if (events[i].events & EPOLLOUT) {conn_list[connfd].send_callback(connfd);}
#endif}}}
- 为什么选择独立处理模式而不是互斥处理模式?
- 提高并发效率:在高并发场景下,读写操作可能同时就绪。独立处理模式允许同时响应两种事件,减少事件处理延迟。
- 非阻塞 IO 适配:现代网络编程通常使用非阻塞 IO,读写操作互不影响。例如:
- 读取数据时不影响发送缓冲区的填充
- 发送数据时不阻塞新数据的接收
- 避免逻辑遗漏:使用
else if
可能导致某些事件被意外忽略。例如,当EPOLLIN
和EPOLLOUT
同时发生时,else if
会跳过其中一个事件的处理。
if (events[i].events & EPOLLIN) {// 处理可读事件
} else if (events[i].events & EPOLLOUT) {// 处理可写事件
}
if (events[i].events & EPOLLIN) {// 处理可读事件
}
if (events[i].events & EPOLLOUT) {// 处理可写事件
}
-
events[i].events
存储了某个文件描述符的事件集合,events[i].events
中的每个位都对应一种事件类型。当events[i].events
中的EPOLLIN
位被设置为 1 时,安位与为true -
events
字段是一个位掩码,不同的事件类型通过按位或(|
)组合在一起,通过按位与(&
)操作检查特定事件是否存在:if (events[i].events & EPOLLIN) {// 处理可读事件(如读取数据) } if (events[i].events & EPOLLOUT) {// 处理可写事件(如发送数据) } if (events[i].events & (EPOLLERR | EPOLLHUP)) {// 处理错误或挂起事件(如关闭连接) }
-
如果需要查看某个网络端口的服务有没有启动,可以用
netstat -anop | grep 2000(端口号)
来查询该服务有没有启动- 3306 mysql端口
- 6709 redis端口
cv@ubuntu:~$ netstat -anop | grep 2000
(Not all processes could be identified, non-owned process infowill not be shown, you would have to be root to see it all.)
tcp 0 0 0.0.0.0:2000 0.0.0.0:* LISTEN - off (0.00/0/0)
tcp 80 0 192.168.21.129:2000 192.168.21.1:57037 ESTABLISHED - off (0.00/0/0)
初代代码实现了一个简单的 TCP 服务器,它创建套接字、绑定到本地端口 2000 并监听连接,接受一个客户端连接后接收其发送的数据并原样返回,最后等待用户输入才退出程序。需要解决的问题:
-
现象观察:如果此时再启动一个服务器段的,network程序连接网关,会发现端口占用:
bind failed: Address already in use
如果此时再用网络助手连接2000端口,会出现以下现象,端口没有被占用,连接成功:
cv@ubuntu:~$ netstat -anop | grep 2000 (Not all processes could be identified, non-owned process infowill not be shown, you would have to be root to see it all.) tcp 0 0 0.0.0.0:2000 0.0.0.0:* LISTEN - off (0.00/0/0) tcp 80 0 192.168.21.129:2000 192.168.21.1:57037 ESTABLISHED - off (0.00/0/0)
-
原因是一个端口在同一时刻只能被一个进程绑定,当服务器在某个端口上进行监听时,它可以同时接受多个客户端的连接。
-
每当有一个客户端请求连接到服务器的指定端口时,服务器就会创建一个新的连接套接字(在代码中通常用新的文件描述符表示)来与该客户端进行通信,而服务器监听的端口仍然保持监听状态,继续接受其他客户端的连接请求。
-
-
程序优化:端口被绑定以后,不能再次被绑定。(如何在一个端口建立多个连接)
-
因此建立一个while循环,建立一次连接,就创建一个新的fd。
while (1) {printf("accept\n");int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);printf("accept finshed\n");char buffer[1024] = {0};int count = recv(clientfd, buffer, 1024, 0);printf("RECV: %s\n", buffer);count = send(clientfd, buffer, count, 0);printf("SEND: %d\n", count);}
-
-
程序优化:进入listen可以被连接,需要马上收发一次,然后再建立新的连接
-
可以每次建立连接时新开一个线程,专门处理这个线程内的连接
while (1) {printf("accept\n");int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);printf("accept finished\n");pthread_t pthread_id;pthread_create(&pthread_id, NULL, client_thread, &clientfd);}
-
-
程序优化:发送消息后只能收发一次
-
recv处加上一个while循环
void *client_thread(void *arg) {int clientfd = *(int*)arg;while (1) {char buffer[1024] = {0};int count = recv(clientfd, buffer, 1024, 0);if (count == 0) { // disconnectprintf("client disconnect: %d\n", clientfd);close(clientfd);break;}// parserprintf("RECV: %s\n", buffer);count = send(clientfd, buffer, count, 0);printf("SEND: %d\n", count);} }
-
-
程序优化:客户端断开后,程序进入死循环
-
加入处理断开
recv()返回0
的逻辑void *client_thread(void *arg) {int clientfd = *(int*)arg;while (1) {char buffer[1024] = {0};int count = recv(clientfd, buffer, 1024, 0);// 加入处理断开 `recv()返回0` 的逻辑if (count == 0) { // disconnectprintf("client disconnect: %d\n", clientfd);close(clientfd);break;}// parserprintf("RECV: %s\n", buffer);count = send(clientfd, buffer, count, 0);printf("SEND: %d\n", count);} }
-
-
现象观察:文件描述符fd依次递增
cv@ubuntu:~/share/0voice/2.High_Performance_Network/2.1.1Network_Io$ sudo ./network listen finished: 3 accept accept finished: 4 accept accept finished: 5 accept accept finished: 6 accept RECV: Welcome to NetAssist SEND: 20
ls /dev/fd
目录下的文件是文件描述符的符号链接,输出为0 1 2
,分别代表标准输入、标准输出和标准错误输出,它们是系统默认的文件描述符, 通过ls /dec/stdin -l
可以查看他们的信息- 因为文件描述符fd的数量是有限制的,所以实现百万并发的时候需要设置
open files
的数量,用ulimit -a
查看
完整代码与解读:
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/time.h>
// #include "server.h"
// 发送信息buffer大小
#define BUFFER_LENGTH 1024
// fd大小
#define CONNECTION_SIZE 1048576 // 1024 * 1024
// 端口数量
#define MAX_PORTS 20
// 定义回调函数
typedef int (*RCALLBACK)(int fd);// 设置epoll事件为全局的
int epfd = 0;
// extern struct timeval begin;
// 注意可以用begin来实现
struct timeval begin={0};
// 计算两个时间点之间的毫秒差
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)// 声明三个回调函数
int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);struct conn {// 文件描述符int fd;// 读写缓冲区\缓冲区长度char rbuffer[BUFFER_LENGTH];int rlength;char wbuffer[BUFFER_LENGTH];int wlength;// 每一个IO与对应的回调函数RCALLBACK send_callback;// 因为这两个回调函数逻辑上是「或」的关系,所以用联合扩起来,以节省内存union {RCALLBACK recv_callback;RCALLBACK accept_callback;} r_action;int status;
};
// 连接列表,使用文件描述符作为索引
struct conn conn_list[CONNECTION_SIZE] = {0};// 设置epoll事件,添加(1)或修改(0)epoll监听的事件
int set_event(int fd, int event, int flag) {if (flag) { // non-zero addstruct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);} else { // zero modstruct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}
}// 注册事件,初始化连接结构体并设置事件监听
int event_register(int fd, int event) {if (fd < 0) return -1;conn_list[fd].fd = fd;// 在注册事件里把回调函数注册为recv_cbconn_list[fd].r_action.recv_callback = recv_cb;conn_list[fd].send_callback = send_cb;// 数据置零,以防覆写memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);conn_list[fd].rlength = 0;memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);conn_list[fd].wlength = 0;set_event(fd, event, 1);
}// listenfd(sockfd) --> EPOLLIN --> accept_cb
int accept_cb(int fd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);//printf("accept finshed: %d\n", clientfd);if (clientfd < 0) {printf("accept errno: %d --> %s\n", errno, strerror(errno));return -1;}// 读事件注册event_register(clientfd, EPOLLIN); // | EPOLLET// 每接受1000个连接打印一次统计信息
#if 1if ((clientfd % 1000) == 0) {struct timeval current;gettimeofday(¤t, NULL);int time_used = TIME_SUB_MS(current, begin);memcpy(&begin, ¤t, sizeof(struct timeval));printf("accept finshed: %d, time_used: %d\n", clientfd, time_used);}
#endifreturn 0;
}int recv_cb(int fd) {memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH );int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);if (count == 0) { // disconnectprintf("client disconnect: %d\n", fd);close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // unfinishedreturn 0;} else if (count < 0) { // printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);return 0;}conn_list[fd].rlength = count;// printf("RECV: %s\n", conn_list[fd].rbuffer);#if 1 // echo,把读到的数据再发出去conn_list[fd].wlength = conn_list[fd].rlength;memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);printf("[%d]RECV: %s\n", conn_list[fd].rlength, conn_list[fd].rbuffer);
#elif 0http_request(&conn_list[fd]);
#elif 0ws_request(&conn_list[fd]);
#endif// 设置为修改EPOLLset_event(fd, EPOLLOUT, 0);return count;
}int send_cb(int fd) {#if 0http_response(&conn_list[fd]);
#elif 0ws_response(&conn_list[fd]);
#endifint count = 0;
#if 0if (conn_list[fd].status == 1) {//printf("SEND: %s\n", conn_list[fd].wbuffer);count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);set_event(fd, EPOLLOUT, 0);} else if (conn_list[fd].status == 2) {set_event(fd, EPOLLOUT, 0);} else if (conn_list[fd].status == 0) {if (conn_list[fd].wlength != 0) {count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);}set_event(fd, EPOLLIN, 0);}
#elseif (conn_list[fd].wlength != 0) {count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);}set_event(fd, EPOLLIN, 0);#endif//set_event(fd, EPOLLOUT, 0);return count;
}// 初始化TCP server
int init_server(unsigned short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0servaddr.sin_port = htons(port); // 0-1023, if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {printf("bind failed: %s\n", strerror(errno));}listen(sockfd, 10);//printf("listen finshed: %d\n", sockfd); // 3 return sockfd;}int main() {unsigned short port = 2000;epfd = epoll_create(1);// 解决客户端不能请求地址,5元组不够的问题,增加端口for (int i = 0;i < MAX_PORTS;i ++) {int sockfd = init_server(port + i);// 为每个监听套接字分配连接对象conn_list[sockfd].fd = sockfd;// 将 accept_cb 回调函数注册到 r_action 中conn_list[sockfd].r_action.recv_callback = accept_cb;// 通过 set_event 函数将监听套接字注册到 epoll,监听 EPOLLIN 事件(可读事件)set_event(sockfd, EPOLLIN, 1);}// 记录服务器启动时间,在后续的 accept_cb 中会定期打印连接建立的时间统计gettimeofday(&begin, NULL);while (1) { // mainloopstruct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;// EPOLLIN和EPOLLOUT事件被视为互斥的
#if 0 if (events[i].events & EPOLLIN) {conn_list[connfd].r_action.recv_callback(connfd);} else if (events[i].events & EPOLLOUT) {conn_list[connfd].send_callback(connfd);}// EPOLLIN和EPOLLOUT事件会被独立处理
#else // 用于判断 events[i].events 所表示的事件集合中是否包含 EPOLLIN 事件if (events[i].events & EPOLLIN) {conn_list[connfd].r_action.recv_callback(connfd);} if (events[i].events & EPOLLOUT) {conn_list[connfd].send_callback(connfd);}
#endif}}}
-
epoll
是 Linux 下高性能的 I/O 多路复用机制,通过epoll_create
、epoll_ctl
和epoll_wait
三个系统调用实现:-
epoll_create
:创建一个 epoll 实例(如您代码中的epfd = epoll_create(MAX_EPOLLSIZE)
)。 -
epoll_ctl
:注册 / 修改 / 删除对特定文件描述符(FD)的事件监听(如EPOLLIN
/EPOLLOUT
)。 -
epoll_wait
:阻塞等待事件发生,返回就绪的 FD 列表。
-
-
定义了三个核心回调函数:
-
accept_cb(int fd)
- 触发条件:当监听套接字(
listen fd
)上有新连接进入时,epoll
返回EPOLLIN
事件。 - 职责:
- 调用
accept()
接受新连接,得到客户端套接字(client fd
)。 - 将
client fd
设置为非阻塞模式(ntySetNonblock
)。 - 为
client fd
注册EPOLLIN
事件到 epoll,绑定recv_cb
回调。
- 调用
- 触发条件:当监听套接字(
-
recv_cb(int fd)
- 触发条件:当客户端套接字(
client fd
)上有数据可读时,epoll
返回EPOLLIN
事件。 - 职责:
- 调用
recv()
读取客户端数据。 - 处理数据(如解析 HTTP/WebSocket 请求)。
- 将
fd
的事件类型从EPOLLIN
修改为EPOLLOUT
(通过epoll_ctl
),准备发送响应。
- 调用
- 触发条件:当客户端套接字(
-
send_cb(int fd)
- 触发条件:当客户端套接字(
client fd
)可写时,epoll
返回EPOLLOUT
事件。 - 职责:
- 调用
send()
向客户端发送响应数据。 - 将
fd
的事件类型从EPOLLOUT
修改回EPOLLIN
,继续等待下一次请求。
- 调用
- 触发条件:当客户端套接字(
-
-
主循环通过
epoll_wait
等待事件,并根据事件类型调用对应回调:- 监听套接字(
listen fd
):仅监听EPOLLIN
,触发accept_cb
。 - 客户端套接字(
client fd
):- 初始监听
EPOLLIN
,触发recv_cb
。 recv_cb
处理完请求后,修改为监听EPOLLOUT
,触发send_cb
。send_cb
发送响应后,再改回监听EPOLLIN
,形成循环。
- 初始监听
- 监听套接字(
-
使用
conn_list
数组存储每个连接的状态:- 索引优化:直接使用 FD 作为数组索引(如
conn_list[fd]
),避免查找开销。 - 状态流转:通过
status
字段或事件类型切换(EPOLLIN
/EPOLLOUT
)管理连接生命周期
- 索引优化:直接使用 FD 作为数组索引(如
-
Reactor 模式的优势体现在:
- 高性能:单线程处理大量连接(34 万 +),避免线程切换开销。
- 事件驱动:仅在事件发生时执行回调,资源利用率高。
- 可扩展性:通过回调函数解耦业务逻辑(如 HTTP/WebSocket 处理)。
- 非阻塞 I/O:
send()
/recv()
立即返回,避免阻塞等待。
总结
代码通过 epoll
+ 回调函数实现了经典的 Reactor 模式,核心在于:
- 事件多路分离:
epoll
监听多个 FD 的事件。 - 回调注册:为不同类型的 FD(监听 / 客户端)注册不同回调。
- 状态流转:通过修改监听的事件类型(
EPOLLIN
/EPOLLOUT
)实现读写状态切换。
2.2 「百万连接的TCP」实现过程
核心逻辑: 通过 select
监听套接字 sockfd
的可读事件,当有数据可读时(如客户端连接或数据到达),select
返回并通知程序处理。
客户端代码,mul_port_client_epoll.c
,这段代码实现了一个基于 epoll 的高性能 TCP 客户端程序,通过非阻塞 I/O 和事件驱动机制,可在单个进程中同时维护大量(最多 34 万个)并发 TCP 连接,并通过复用端口的方式突破本地端口数量限制,向指定服务器持续发送和接收数据,同时统计连接建立速度和处理效率:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/time.h>
#include <unistd.h>// 定义常量
#define MAX_BUFFER 128 // 缓冲区大小
#define MAX_EPOLLSIZE (384*1024) // 最大epoll监听数量
#define MAX_PORT 1 // 最大端口复用数量// 计算两个时间点之间的毫秒差
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)int isContinue = 0; // 控制是否继续创建新连接的标志// 设置文件描述符为非阻塞模式
static int ntySetNonblock(int fd) {int flags;flags = fcntl(fd, F_GETFL, 0);if (flags < 0) return flags;flags |= O_NONBLOCK;if (fcntl(fd, F_SETFL, flags) < 0) return -1;return 0;
}// 设置套接字选项,允许地址重用
static int ntySetReUseAddr(int fd) {int reuse = 1;return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}int main(int argc, char **argv) {if (argc <= 2) {printf("Usage: %s ip port\n", argv[0]);exit(0);}const char *ip = argv[1]; // 服务器IP地址int port = atoi(argv[2]); // 服务器端口int connections = 0; // 当前连接数char buffer[128] = {0}; // 数据缓冲区int i = 0, index = 0; // 循环计数器和端口索引struct epoll_event events[MAX_EPOLLSIZE]; // epoll事件数组int epoll_fd = epoll_create(MAX_EPOLLSIZE); // 创建epoll实例strcpy(buffer, " Data From MulClient\n"); // 默认发送数据struct sockaddr_in addr; // 服务器地址结构memset(&addr, 0, sizeof(struct sockaddr_in));addr.sin_family = AF_INET;addr.sin_addr.s_addr = inet_addr(ip); // 设置服务器IPstruct timeval tv_begin;gettimeofday(&tv_begin, NULL); // 记录开始时间int sockfd = 0;while (1) {if (++index >= MAX_PORT) index = 0; // 循环使用端口struct epoll_event ev;// 创建新连接直到达到最大连接数或被暂停if (connections < 340000 && !isContinue) {sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd == -1) {perror("socket");goto err;}//ntySetReUseAddr(sockfd);addr.sin_port = htons(port+index); // 设置服务器端口// 连接服务器if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {perror("connect");goto err;}ntySetNonblock(sockfd); // 设置为非阻塞模式ntySetReUseAddr(sockfd); // 允许地址重用// 发送数据到服务器sprintf(buffer, "Hello Server: client --> %d\n", connections);send(sockfd, buffer, strlen(buffer), 0);// 注册epoll事件,同时监听读写事件ev.data.fd = sockfd;ev.events = EPOLLIN | EPOLLOUT;epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);connections ++; // 连接数递增}// 每创建1000个连接或达到最大连接数时,进行一次epoll_waitif (connections % 1000 == 999 || connections >= 340000) {struct timeval tv_cur;memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));gettimeofday(&tv_begin, NULL);int time_used = TIME_SUB_MS(tv_begin, tv_cur);printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);// 等待事件发生,超时时间100msint nfds = epoll_wait(epoll_fd, events, connections, 100);for (i = 0; i < nfds; i++) {int clientfd = events[i].data.fd;// 处理可写事件if (events[i].events & EPOLLOUT) {// 发送数据send(sockfd, buffer, strlen(buffer), 0);} // 处理可读事件else if (events[i].events & EPOLLIN) {char rBuffer[MAX_BUFFER] = {0}; ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);if (length > 0) {// 收到服务器响应"quit"时,停止创建新连接if (!strcmp(rBuffer, "quit")) {isContinue = 0;} } else if (length == 0) {// 连接关闭printf(" Disconnect clientfd:%d\n", clientfd);connections --;close(clientfd);} else {// 处理接收错误if (errno == EINTR || errno == EAGAIN || errno == ENOTSOCK) continue;printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);close(clientfd);}} else {// 处理其他事件printf(" clientfd:%d, errno:%d\n", clientfd, errno);close(clientfd);}}}usleep(500); // 短暂休眠,避免CPU占用过高}return 0;
err:printf("error : %s\n", strerror(errno));return 0;}
-
出现问题1:FD最大数量限制1024
too many open files
-
ulimit -a
发现open files只有1024,使用ulinit -n 1048576
设置 -
也可以进入
vim /etc/security/limits.conf
,永久设置* soft nofile 1048576 * hard nofile 1048576
-
在ubuntu16.04中需要修改两次,进入
vim /etc/sysctl.conf
,增加fs.file-max = 1048576 net.nf_conntrack_max = 1048576
修改完后执行一下
sudo systemctl -p
-
-
出现问题2:不能分配请求地址
connect: Cannot assign requested address
- 原因:五元组不够:
(sip, dip, sport, dport, proto)
,localip
已确定,remoteip
已确定,localport
为 1024-65535,remoteport
可以增加为20个 - 解决方法:服务端建立多个ip端口
- 原因:五元组不够:
-
出现问题3:
Error:/proc/sys/net/nf conntrack max no such file or directory
- 解决方法:
sudo modprobe ip conntrack
设置网络防火墙
- 解决方法:
-
出现问题4:
Out of memory
- 解决方法:没办法,把内存设大一点
- 拓展:并发量(网络并发),QPS(每秒处理次数),时延,测试用例——是服务器开发最重要的
-
代码解释:
int nready = select(maxfd+1, &rset, NULL, NULL, NULL);
- 调用
select
函数监听文件描述符集合rset
中的可读事件。 - 参数说明:
maxfd+1
:指定监听的文件描述符范围(从 0 到maxfd
)。&rset
:监听可读事件的文件描述符集合。NULL
:不监听可写事件。NULL
:不监听异常事件。NULL
:阻塞模式,直到有文件描述符就绪。
- 返回值
nready
:就绪的文件描述符总数。 select
返回后,需要遍历文件描述符集合检查哪些就绪
- 调用
-
FD_ZERO
- 用法:
FD_ZERO(fd_set *set)
。 - 作用:将
fd_set
类型的集合set
初始化为空集,即把集合中表示各个文件描述符的位都清零 ,确保集合中不包含任何文件描述符。
- 用法:
-
FD_SET
- 用法:
FD_SET(int fd, fd_set *set)
。 - 作用:把指定的文件描述符
fd
添加到集合set
中 ,也就是将集合中对应fd
的位设置为 1 ,表示该文件描述符在集合内,后续可对其进行相关状态检测。
- 用法:
-
FD_CLR
- 用法:
FD_CLR(int fd, fd_set *set)
。 - 作用:从集合
set
中移除指定的文件描述符fd
,即将集合中对应fd
的位设置为 0 ,表示该文件描述符不在集合内了。
- 用法:
-
FD_ISSET
- 用法:
FD_ISSET(int fd, fd_set *set)
。 - 作用:用于检测文件描述符
fd
是否在集合set
中。如果fd
在集合set
中,返回值为非零(表示真) ;如果不在集合中,返回值为 0(表示假) 。常配合select
等函数使用,在select
返回后,判断哪些文件描述符满足了相应条件。
- 用法:
-
fd_set
是一种用于在多路复用 I/O 操作中存储文件描述符集合的数据结构 ,常与select
函数配合使用。fd_set
是一个 bit 位集合,它采用类似位图(Bitmap)的方式,其中每一位对应一个文件描述符。若某一位被置为 1 ,代表对应的文件描述符在集合内;若为 0 ,则表示不在集合内。- 比如系统中文件描述符范围是 0 - 1023 ,
fd_set
就有 1024 个位与之对应 ,某位为 1 代表对应文件描述符在集合内,为 0 则不在。
-
缺点:
-
单个进程能够监视的文件描述符的数量存在最大限制,在 Linux 系统上一般为 1024。
-
每次调用
select
时都需要将文件描述符集合fd_set
从用户空间拷贝到内核空间,开销较大。 -
当
select
返回后,需要遍历所有文件描述符fd_set
来找到就绪的那些,效率较低。
-
下一章:2.1.3 http服务器的实现
https://github.com/0voice