Select、Epoll 与 IOCP模型的介绍与区别
@TOC
一、I/O复用
IO 复用(I/O Multiplexing)是一种并发处理多个 I/O 操作的机制,允许一个进程或线程监视多个文件描述符(sockets、files、pipes等),并在其中任意一个文件描述符就绪(可读或可写)时进行操作。这种机制主要用于提高系统的性能和效率,特别是在处理大量并发连接或事件时非常有用。.
IO复用和多进程服务器区别:
二、总结
- select:通过轮询检测描述符状态,适用低并发或跨平台场景。
- epoll:基于事件驱动,支持高并发,是 Linux 下高性能服务的首选(如 Nginx、Redis)。
- IOCP:IOCP(I/O Completion Port,完成端口) 是 Windows 操作系统专为高并发 I/O 设计的高效异步模型;
select | epoll | IOCP | |
---|---|---|---|
平台 | 跨平台支持(Windows/Linux) | Linux | Windows |
文件描述符数量限制 | 基于 fd_set 位图,默认上限 1024(可修改宏但效率下降) | 无硬性限制,内核使用红黑树管理描述符 | 无硬性限制,内核通过 I/O 完成端口管理 |
时间复杂度 | 每次调用需遍历所有描述符,时间复杂度 O(n) | 仅处理就绪事件,时间复杂度 O(1)(与总描述符数无关) | 每次操作为 O(1),通过 I/O 完成端口高效管理任务 |
数据拷贝开销 | 每次调用需将描述符集合从用户空间拷贝到内核 | 通过 epoll_ctl 注册描述符后,内核与用户空间共享内存(mmap 实现),避免重复拷贝。 | 使用重叠 I/O 重叠结构避免重复拷贝 |
内核通知机制 | 内核遍历所有描述符,轮询检查就绪状态 | 内核通过回调机制将就绪事件加入就绪链表(eventpoll.rdlist),用户直接读取链表。 | 内核通过 I/O 完成端口将已完成的 I/O 操作报告给用户 |
并发数 | 低并发(<1k) | 高并发(>10k) | 高并发(>10k) |
触发模式 | 水平触发(默认) | 水平触发(默认),可设置边缘触发) | 完成触发, |
适用场景 | 小规模或中等规模的网络应用 | 高并发场景,如大规模网络服务、数据库连接池等 | 高性能 Windows 服务器(如数据库、游戏后端、大规模网络服务) |
二、Select详细介绍
I/O 多路复用接口,通过轮询检测文件描述符的就绪状态。步骤如图:
fd_set结构
通过轮询检测文件描述符的就绪状态,此时首先需要将要监视的文件描述符集中到 一起,集中时要按照io类型(接收、传输、异常)进行区分->引入fd_set数组(存有0和1的位数组):
在fd-set变量中注册或更改值的操作都由下列宏完成:
select函数
select 函数用来验证 3种类型IO操作监视项的变化情况;select
函数用于监视多个文件描述符,以查看是否有任何一个准备好进行 I/O 操作(如读取、写入或异常条件)指定监视范围
select 函数的第一个参数要求传入待监视的所有文件描述符中最大值加1。原因在于,文件描述符从 0 开始计数,所以只需取已注册到 fd_set 中的所有文件描述符的最大值,再加 1,作为该参数传递给 select。
超时时间
select 默认是阻塞操作,只有当监视的文件描述符状态发生变化时才返回。为了防止一直阻塞,我们可以通过最后一个参数设置超时时间。这个参数是一个指向 timeval 结构体的指针,通过设置结构体中的 tv_sec(秒)和 tv_usec(微秒)来指定等待时间。若在指定时间内没有任何变化,select 将返回 0。如果不需要超时功能,可以将此参数设为 NULL,使 select 一直阻塞直到有文件描述符发生变化。:
调用 select 函数后查看结果
select函数调用完成后,向其传递的fd_set中将发生变化, 原来为1的所有位均变为0 ,但发生变化的文件描述符对应位除外。 因此,可以认为值仍为 1的位置上的文件描述符发生了变化
使用select 函数时可以将多个文件描述符集中到起统一监视,项目(监视项称为事件)如下:
select实现IO复用的回声服务器端调用示例:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#define MAX_CLIENTS 1024
#define BUFFER_SIZE 1024
#define PORT 8080
int main() {
int listen_fd, client_fd, max_fd;
struct sockaddr_in server_addr;
fd_set read_fds;
char buffer[BUFFER_SIZE];
// 创建监听socket
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0) {
perror("socket");
exit(EXIT_FAILURE);
}
// 绑定地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);
if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
perror("bind");
close(listen_fd);
exit(EXIT_FAILURE);
}
// 监听
if (listen(listen_fd, 5) < 0) {
perror("listen");
close(listen_fd);
exit(EXIT_FAILURE);
}
// 初始化文件描述符集合
FD_ZERO(&read_fds);
FD_SET(listen_fd, &read_fds);
max_fd = listen_fd;
printf("Server running on port %d...\n", PORT);
while (1) {
fd_set tmp_fds = read_fds; // 复制集合(select会修改传入的fd_set)
// 调用select等待事件
int ret = select(max_fd + 1, &tmp_fds, NULL, NULL, NULL);
if (ret < 0) {
perror("select");
break;
}
// 遍历所有可能的文件描述符
for (int fd = 0; fd <= max_fd; fd++) {
if (FD_ISSET(fd, &tmp_fds)) {
// 处理新连接
if (fd == listen_fd) {
struct sockaddr_in client_addr;
socklen_t addr_len = sizeof(client_addr);
client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &addr_len);
if (client_fd < 0) {
perror("accept");
continue;
}
FD_SET(client_fd, &read_fds); // 将新连接加入集合
if (client_fd > max_fd) max_fd = client_fd;
printf("New client connected: fd=%d\n", client_fd);
}
// 处理客户端数据
else {
ssize_t n = read(fd, buffer, BUFFER_SIZE - 1);
if (n <= 0) { // 连接关闭或错误
close(fd);
FD_CLR(fd, &read_fds);
printf("Client fd=%d disconnected.\n", fd);
} else {
buffer[n] = '\0';
printf("Received from fd=%d: %s\n", fd, buffer);
// 回显数据(示例逻辑)
write(fd, buffer, n);
}
}
}
}
}
close(listen_fd);
return 0;
}```
select模型缺点
缺点:
- 遍历问题:调用 select 后,需要遍历所有注册的文件描述符,判断哪些发生了变化。这种遍历操作会带来一定的开销。
- 传递监视对象信息的问题: 每次调用 select 时,都必须将所有的监视对象信息(fd_set)传递给操作系统。由于 fd_set 在调用过程中会被修改,因此在每次调用前都需要重新设置,这样就需要频繁地在用户空间和内核空间之间传递数据。
- fd_set 的重置与重建:由于 select 调用后 fd_set 会被修改,程序需要在每次调用前重新设置监视对象,这也增加了额外开销。
那到底哪些因素是提高性能的更大障碍?是调用select函数后常见的针对所有文件描述符对象的循环语句?还是每次需要传递的监视对象信息?
只看代码的话很容易认为是循环。但相比于循环语句,每次都要将监视对象信息传递给操作系统的开销更大。因为这种数据传递是跨越用户空间与内核空间的,不能通过简单的代码优化来消除,往往会成为性能上的致命瓶颈。
“那为何需要把监视对象信息传递给操作系统呢?”
有些函数不需要操作系统的帮助就能完成功能,而有些则必须借助于操作系统。假设各位定义了四则运算相关函数,此时无需操作系统的帮助。但select函数与文件描述符有关,更准确地说,是监视套接字变化的函数。而套接字是由操作系统管理的,所以select函数绝对需要借助于操作系统才能完成功能。
改进方案:
通过只传递一次监视对象信息,并在监视范围或内容发生变化时仅通知变化的部分,可以避免每次调用 select 都进行大量数据传递。
- Linux 提供了 epoll
- Windows 提供了 IOCP
三、Epoll
Linux 特有的事件驱动模型,通过回调机制直接通知就绪事件;
epoll关键函数
**Epoll的三大函数:epoll_create,epoll_wait, epoll_ctl**
#include<sys/epoll.h>
一、创建epoll实例
**int epoll_create(int size);**
→成功时返回epoll文件描述符,失败时返回-1。
size:epoll实例的大小。
二、epoll_ctl用于向 epoll 实例注册、修改或删除感兴趣的文件描述符的事件。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
→成功时返回0,失败时返回-1。
●epfd 用于注册监视对象的epoll实例
●op表示要执行的操作,取值范围:
EPOLL_CTL_ADD:注册新的文件描述符到 epfd。
EPOLL_CTL_MOD:修改已经注册的文件描述符的监听事件。
EPOLL_CTL_DEL:从 epfd 中删除一个文件描述符。
● fd 需要注册的监视对象文件描述符。
● event 监视对象的事件类型。(也就是 fd 上发生什么事情时 epoll 应该通知你)
struct epoll_event {
uint32_t events; // 表示要监听的事件类型
epoll_data_t data; // 用户数据,可以是文件描述符或指针,当事件发生时,你需要通过 epoll_wait 获取到相关事件。在这个时候,epoll_wait 返回的只是一个事件数组,你并不知道哪些具体的 fd 触发了事件。你可以通过 event.data 来知道哪个文件描述符触发了事件。
};
EPOLLIN∶ 文件描述符上有数据可读(例如,套接字接收到数据,服务器套接字linsten到了客户端的请求连接也是EPOLLIN)
EPOLLOUT∶文件描述符可以写入数据(例如,套接字发送缓冲区有空间可用)
EPOLLPRI∶收到OOB数据的情况。
EPOLLRDHUP∶断开连接或半关闭的情况,这在边缘触发方式下非常有用。
EPOLLERR∶发生错误的情况。
EPOLLET∶以边缘触发的方式得到事件通知。通常默认是水平触发(Level Triggered)。
EPOLLONESHOT∶发生一次事件后,相应文件描述符不再收到事件通知。因此需要向
epoll_ctl函数的第二个参数传递
EPOLLCTL_MOD,再次设置事件。
三、epoll_wait:该函数用于等待事件发生。它会阻塞当前线程,直到有事件发生、超时或者被信号中断。
int epoll_wait(int epfd, struct epoll_event*events,int maxevents,int timeout);
→成功时返回发生事件的文件描述符数。返回值表示发生的事件数量,如果出错则返回 -1。
epfd 表示事件发生监视范围的epol例程的文件描述符
events 保存发生事件的文件描述符集合的结构体地址值。一个指向 epoll_event 结构体数组的指针,用于存储发生的事件。
maxevents 第二个参数中可以保存的最大事件数目。 events 数组的大小,即最多可以处理的事件数量。
Timeout:传递-1时,阻塞一直等待直到发生事件。超时时间,单位为毫秒。如果在这个时间内没有事件发生,epoll_wait 会返回 0。
边缘触发和条件触发
在epoll的应用中涉及到关于IO的读写,而读写的状态变化有4种:
- 可读:socket上有数据
- 不可读:socket上没有数据了
- 可写:socket上有空间可写
- 不可写:socket上无空间可
1、条件触发(Level-Triggered, LT)
工作原理:只要文件描述符(fd)处于就绪状态(例如可读或可写),epoll_wait 会持续通知应用程序,直到数据被处理完毕。
例如,若 socket 接收缓冲区有数据未读完,每次调用 epoll_wait 都会触发 EPOLLIN 事件 2 5。
特点:
默认模式,编程简单,无需一次性处理所有数据。
可能导致多次事件触发,适合对实时性要求不高的场景(如普通 HTTP 服务器)
2、边缘触发(Edge-Triggered, ET)
工作原理:仅在 fd 状态发生 变化时(如从不可读变为可读)通知一次。若数据未处理完,后续不再触发事件。
例如,客户端发送 100 字节数据,ET 模式下 epoll_wait 仅触发一次 EPOLLIN,需一次性读取全部数据 。
特点:
必须搭配 非阻塞 I/O,需循环读写直到返回 EAGAIN 错误。
减少事件触发次数,适合高并发场景(如实时通信、高频交易系统) 2 4
epoll服务端例子
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#define MAX_EVENTS 10
#define PORT 8080
#define BUFFER_SIZE 1024
// 将文件描述符设置为非阻塞模式
int set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) {
perror("fcntl get");
return -1;
}
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) == -1) {
perror("fcntl set");
return -1;
}
return 0;
}
int main() {
int listen_fd, conn_fd, nfds, epoll_fd;
struct sockaddr_in server_addr;
struct epoll_event ev, events[MAX_EVENTS];
char buffer[BUFFER_SIZE];
// 创建监听套接字
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1) {
perror("socket");
exit(EXIT_FAILURE);
}
// 设置监听套接字为非阻塞模式
if (set_nonblocking(listen_fd) == -1) {
exit(EXIT_FAILURE);
}
// 设置地址重用
int opt = 1;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
perror("setsockopt");
exit(EXIT_FAILURE);
}
// 绑定IP和端口
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY; // 绑定所有网卡IP
server_addr.sin_port = htons(PORT);
if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("bind");
exit(EXIT_FAILURE);
}
// 监听连接
if (listen(listen_fd, SOMAXCONN) == -1) {
perror("listen");
exit(EXIT_FAILURE);
}
// 创建 epoll 实例
epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
// 将监听套接字添加到 epoll 中进行监视
ev.events = EPOLLIN; // 监听可读事件(新连接到来)
ev.data.fd = listen_fd;//当事件发生时,epoll_wait 返回的事件数组,通过 event.data 来知道哪个文件描述符触发了事件。
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
perror("epoll_ctl: listen_fd");
exit(EXIT_FAILURE);
}
// 进入事件循环
while (1) {
nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
for (int i = 0; i < nfds; ++i) {
if (events[i].data.fd == listen_fd) {
// 有新连接到来,循环调用 accept(非阻塞模式下可能一次返回多个连接)
while ((conn_fd = accept(listen_fd, NULL, NULL)) != -1) {
// 设置新连接为非阻塞
if (set_nonblocking(conn_fd) == -1) {
close(conn_fd);
continue;
}
// 将新连接添加到 epoll 中,并使用边缘触发模式
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev) == -1) {
perror("epoll_ctl: conn_fd");
close(conn_fd);
}
}
// 如果 accept 返回 -1,则可能是因为没有更多连接了(EAGAIN/EWOULDBLOCK)
if (conn_fd == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
perror("accept");
}
} else {
// 处理已连接套接字的数据读取事件
int client_fd = events[i].data.fd;
int n;
while ((n = read(client_fd, buffer, BUFFER_SIZE)) > 0) {
// echo 服务器
write(client_fd, buffer, n);
}
if (n == 0) {
// 客户端关闭连接
close(client_fd);
} else if (n < 0 && errno != EAGAIN) {
perror("read");
close(client_fd);
}
}
}
}
// 清理工作(一般不会执行到这里)
close(listen_fd);
close(epoll_fd);
return 0;
}
四、IOCP
IOCP是什么?
● IOCP(I/O Completion Port,完成端口) 是 Windows 操作系统专为高并发 I/O 设计的高效异步模型,核心思想是通过 完成端口队列 和 线程池(IOCP一把和线程池配合使用) 管理大量套接字的异步操作。其特点包括:
○ 内核级异步 I/O:基于重叠 I/O(Overlapped I/O),允许同时投递多个 I/O 请求,由内核完成数据搬运后通知应用层。
○ 线程池优化:复用少量线程处理海量 I/O 事件,避免频繁线程创建销毁的开销。
○ 高扩展性:单 IOCP 实例可管理数万级并发连接,适用于高吞吐场景(如游戏服务器、Web 后端)
核心函数与流程?
//在 Windows 中创建一个 I/O 完成端口(IOCP) 处理多个并发的异步 I/O 操作
HANDLE CreateIoCompletionPort(
HANDLE FileHandle, // 句柄,关联 I/O 完成端口(或 INVALID_HANDLE_VALUE)
,在实际使用时,通常会传入一个有效的文件句柄(例如网络套接字、文件句柄等),以将 I/O 完成端口与该文件或设备的异步 I/O 操作关联。。
HANDLE ExistingCompletionPort, // 指定一个已存在的 I/O 完成端口。如果是 NULL,表示我们创建
一个全新的 I/O 完成端口,而不是将其附加到现有的完成端口上。
ULONG_PTR CompletionKey, // 完成键用于区分不同的 I/O 任务来源。当 GetQueuedCompletionStatus() 返回时,可以获取这个 Completion Key。如果你绑定的是 socket 句柄,通常传递 socket 本身的值,方便区分不同的客户端连接。
DWORD NumberOfConcurrentThreads // 处理 IOCP 请求的最大并发线程数。如果设为 0,系统会自动选择(通常是 CPU 核心数 × 2);
//多个线程可以同时从 I/O 完成端口中 获取请求并处理,每个线程在处理完一个请求后,会继续等待下一个请求的到来。这使得系统能够 高效地并发处理大量异步 I/O 操作。
);
//从 I/O 完成端口(IOCP) 获取已完成的 I/O 操作的信息
BOOL GetQueuedCompletionStatus(
HANDLE hCompletionPort, // 建立的完成端口
LPDWORD lpNumberOfBytesTransferred, // 返回传输的字节数,例如,在文件或网络 I/O 中,它返回实际读写的字节数
PULONG_PTR lpCompletionKey, // 返回与完成的 I/O 操作相关的完成键
LPOVERLAPPED *lpOverlapped, // 返回与完成的 I/O 操作相关的重叠结构
DWORD dwMilliseconds // 等待的超时时间,0 表示不等待,INFINITE 表示无限等待
);
//向 I/O 完成端口(IOCP) 发送完成通知。它将一个通知放入 I/O 完成端口队列,让等待的线程知道某个 I/O 操作已经完成。
BOOL PostQueuedCompletionStatus(
HANDLE hCompletionPort, // 目标 I/O 完成端口
DWORD dwNumberOfBytesTransferred, // 传输的字节数.指定已完成的 I/O 操作的字节数
ULONG_PTR dwCompletionKey, // 当将文件句柄、套接字等绑定到 I/O 完成端口时,会将一个标识符(完成键)与该文件句柄关联起来。此参数可以帮助线程识别哪个 I/O 操作已完成。
LPOVERLAPPED lpOverlapped // 与 I/O 操作关联的 OVERLAPPED 结构,OVERLAPPED 结构包含了与异步 I/O 操作相关的信息
);返回值TRUE:表示成功,函数已将完成通知成功放入 I/O 完成端口。
AcceptEx使用流程
// 1. 调用 AcceptEx 接收连接和数据
AcceptEx(listenSocket, acceptSocket, buffer, ...);
// 2. 解析地址
SOCKADDR_IN *pLocalAddr = nullptr, *pRemoteAddr = nullptr;
int localLen = 0, remoteLen = 0;
GetAcceptExSockaddrs(buffer, 0, sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16,
(SOCKADDR**)&pLocalAddr, &localLen,
(SOCKADDR**)&pRemoteAddr, &remoteLen);
// 3. 更新套接字状态
setsockopt(acceptSocket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&listenSocket, sizeof(listenSocket));
// 异步发送数据
int WSASend(
SOCKET s,//向指定套接字(Socket)异步发送数据,非阻塞模式下立即返回,实际发送操作由系统后台完成
LPWSABUF lpBuffers,//指向 WSABUF 结构数组的指针,每个结构描述一个数据缓冲区。
DWORD dwBufferCount,//WSABUF 数组的元素数量(最多 WSAMAXIOCP)。
LPDWORD lpNumberOfBytesSent,//输出参数,接收实际发送的字节数(同步模式下有效)
DWORD dwFlags,
LPWSAOVERLAPPED lpOverlapped,//重叠结构指针,用于异步操作(IOCP 必填)
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine//异步完成时的回调函数(IOCP 中通常设为 NULL)
);
成功:返回 0,且 lpNumberOfBytesSent 更新为实际发送的字节数(同步模式)。
异步操作:返回 SOCKET_ERROR,并调用 WSAGetLastError() 得到 WSA_IO_PENDING,表示操作已提交。
失败:返回 SOCKET_ERROR,错误码可通过 WSAGetLastError() 获取。```
● 步骤:
○ CreateIoCompletionPort创建IO完成端口,
○ GetQueuedCompletionStatus 关联套接字与 IOCP
○ 投递异步操作:使用 WSARecv、WSASend 等函数发起异步 I/O 请求,需传递 重叠结构(Overlapped) 和缓冲区
○ 工作线程通过 GetQueuedCompletionStatus 阻塞等待 I/O 事件完成
IO重叠结构
WSAOVERLAPPED 是Windows异步I/O的核心数据结构,用于标识和管理每个异步操作(如 WSASend/WSARecv)的上下文;确保每个异步操作都有独立的OVERLAPPED结构
OVERLAPPED 结构: 在 WinBase.h 头文件中,OVERLAPPED 结构定义如下 :
typedef struct _OVERLAPPED {
ULONG_PTR Internal; // 低级 I/O 状态
ULONG_PTR InternalHigh; // 传输的字节数
union {
struct {
DWORD Offset; // 低 32 位文件偏移(用于文件 I/O)
DWORD OffsetHigh; // 高 32 位文件偏移(用于文件 I/O)
};
PVOID Pointer; // 其他用途,例如网络 I/O
};
HANDLE hEvent; // I/O 完成后用于通知的事件句柄
} OVERLAPPED, *LPOVERLAPPED;```
IOCP + 重叠结构+线程池实现高并发异步IO
1、实现线程池
//线程
class CMyThread
{
public:
CMyThread() {
m_hThread = NULL;
m_bStatus = false;
}
~CMyThread() {
Stop();
}
//使用 _beginthread 开启线程,并将 this 作为参数传递给 ThreadEntry:true 表示成功 false表示失败
bool Start() {
m_bStatus = true;
m_hThread = (HANDLE)_beginthread(&CMyThread::ThreadEntry, 0, this);//(普通的全局或 static 函数,0,传给线程函数的参数);传入this的目的是为了静态函数能成功调用非静态成员函数
if (!IsValid()) {
m_bStatus = false;
}
return m_bStatus;
}
//返回true表示有效 返回false表示线程异常或者已经终止
bool IsValid() {
if (m_hThread == NULL || (m_hThread == INVALID_HANDLE_VALUE))return false;
return WaitForSingleObject(m_hThread, 0) == WAIT_TIMEOUT;//0-立即返回;WAIT_OBJECT_0(0):线程已结束。WAIT_TIMEOUT(258):线程仍在运行,超时未结束
}
//设置线程运行标志位false,阻塞等待线程结束,并把任务m_worker清空
bool Stop() {
if (m_bStatus == false)return true;
m_bStatus = false;
bool ret = WaitForSingleObject(m_hThread, INFINITE) == WAIT_OBJECT_0;//阻塞等待线程结束
UpdateWorker();
return ret;
}
//更新当前任务void UpdateWorker(const ::ThreadWorker& worker = ::ThreadWorker())
void UpdateWorker(const ::ThreadWorker& worker = ::ThreadWorker()) {
if (m_worker.load() != NULL && (m_worker.load() != &worker)) {
::ThreadWorker* pWorker = m_worker.load();
TRACE("delete pWorker = %08X m_worker = %08X\r\n", pWorker, m_worker.load());
m_worker.store(NULL);
delete pWorker;
}
if (m_worker.load() == &worker) return;
if (!worker.IsValid()) {
m_worker.store(NULL);
return;
}
::ThreadWorker* pWorker = new ::ThreadWorker(worker);
TRACE("new pWorker = %08X m_worker = %08X\r\n", pWorker, m_worker.load());
m_worker.store(new ::ThreadWorker(worker));
}
//true表示空闲 false表示已经分配了工作
bool IsIdle() {
if (m_worker == NULL)return true;
return !m_worker.load()->IsValid();//无效才是空闲
}
private:
//CMyThread 线程的 工作函数;inline:成员函数是在类内部定义,编译器通常会将其作为内联函数来处理。因为类定义和实现通常在同一头文件中,编译器可以很容易地将其内联,如果实现在cpp,一般不是inline
void ThreadWorker() {
while (m_bStatus) {
if (m_worker.load() == NULL) {
Sleep(1);
continue;
}
::ThreadWorker worker = *m_worker.load();
if (worker.IsValid()) {
if (WaitForSingleObject(m_hThread, 0) == WAIT_TIMEOUT) {
int ret = worker();
if (ret != 0) {
CString str;
str.Format(_T("thread found warning code %d\r\n"), ret);
OutputDebugString(str);
}
if (ret < 0) {
::ThreadWorker* pWorker = m_worker.load();
m_worker.store(NULL);
delete pWorker;
}
}
}
else {
Sleep(1);
}
}
}
//线程入口;静态成员函数,不能直接访问类的非静态成员,因此传入this实例,使得可以调用成员函数(非静态成员引用必须与特定对象引用)
static void ThreadEntry(void* arg) {
CMyThread* thiz = (CMyThread*)arg;
if (thiz) {
thiz->ThreadWorker();
}
_endthread();
}
private:
HANDLE m_hThread;
bool m_bStatus;//false 表示线程将要关闭 true 表示线程正在运行
std::atomic<::ThreadWorker*> m_worker;//存储了线程要执行的任务
};
//线程池
class CMyThreadPool
{
public:
CMyThreadPool(size_t size) {
m_threads.resize(size);
for (size_t i = 0; i < size; i++)
m_threads[i] = new CMyThread();
}
CMyThreadPool() {}
~CMyThreadPool() {
Stop();
for (size_t i = 0; i < m_threads.size(); i++)
{
CMyThread* pThread = m_threads[i];
m_threads[i] = NULL;
delete pThread;
}
m_threads.clear();
}
//启动线程池所有线程
bool Invoke() {
bool ret = true;
for (size_t i = 0; i < m_threads.size(); i++) {
if (m_threads[i]->Start() == false) {
ret = false;
break;
}
}
if (ret == false) {
for (size_t i = 0; i < m_threads.size(); i++) {
m_threads[i]->Stop();
}
}
return ret;
}
//停止线程池所有线程
void Stop() {
for (size_t i = 0; i < m_threads.size(); i++) {
m_threads[i]->Stop();
}
}
//返回-1 表示分配失败,所有线程都在忙 大于等于0,表示第n个线程分配来做这个worker任务
int DispatchWorker(const ThreadWorker& worker) {
int index = -1;
m_lock.lock();//多个线程同时调用DispatchWorker时候,不会导致同时使用一个工作线程
//这种方式是轮询线程池找空闲线程,可以搞个列表,只把空闲线程加进去
for (size_t i = 0; i < m_threads.size(); i++) {
if (m_threads[i] != NULL && m_threads[i]->IsIdle()) {
m_threads[i]->UpdateWorker(worker);
index = i;
break;
}
}
m_lock.unlock();
return index;
}
//检查第index个线程的有效性
bool CheckThreadValid(size_t index) {
if (index < m_threads.size()) {
return m_threads[index]->IsValid();
}
return false;
}
private:
std::mutex m_lock;
std::vector<CMyThread*> m_threads;//线程数组
};```
2、自定义重叠结构
一般需要扩展 Overlapped结构,添加自定义字段以跟踪I/O操作的上下文;例如下面这个:
/基类,封装了 重叠IO结构,用于不同的IO操作(如接受、接收、发送和错误处理)
class COverlapped :public ThreadFuncBase {
public:
OVERLAPPED m_overlapped;//m_overlapped 必须是第一个成员,保证通过 CONTAINING_RECORD 宏能正确反推父结构地址。
DWORD m_operator;//I/O 操作类型(EAccept, ERecv, ESend, EError)
std::vector<char> m_buffer;//缓冲区
ThreadWorker m_worker;//处理函数 用于处理该操作的任务
CMyServer* m_server;//服务器对象
CMyClient* m_client;//对应的客户端
WSABUF m_wsabuffer;//Windows网络I/O缓冲区,用于发送和接收数据
virtual ~COverlapped() {
m_client = NULL;
}
};
根据自己需求去更改自定义参数,但是OVERLAPPED 结构一定是成员变量第一个,并且在异步操作之前将预先定义好的重叠结构传给异步IO操作(例如AcceptEx,WRecv等);
3、服务端实现(具体步骤)
//1、服务端创建支持重叠结构的socket以及环境
WSADATA WSAData;
WSAStartup(MAKEWORD(2, 2), &WSAData);
m_sock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
int opt = 1;
setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt));
//2、服务端socket 进行bind。listen
bind(m_sock, (sockaddr*)&m_addr, sizeof(m_addr);
listen(m_sock, 3);
//3。创建IOCP
HANDLE hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
//4、IOCP绑定socket
CreateIoCompletionPort((HANDLE)m_sock, m_hIOCP, (ULONG_PTR)this, 0);
//5、唤醒线程池,另工作线程循环调用GetQueuedCompletionStatus
m_pool.Invoke();
m_pool.DispatchWorker(ThreadWorker(this, (FUNCTYPE)&EdoyunServer::threadIocp));
//6、服务端套接字调用异步IO操作
AcceptEx();
// AcceptEx 被调用后,将 OVERLAPPED 结构提交给 Windows 内核,内核会:分配资源 以等待新的客户端连接,存储 OVERLAPPED* 指针 以便在连接完成时使用
//当有客户端连接:内核完成 TCP 握手,建立连接;将预先传入的套接字与新连接绑定; 客户端地址(IP + 端口)和 服务端地址 复制到输出缓冲区(pClient->m_wsabuffer.buf)
若 dwReceiveDataLength > 0,还会将客户端发送的第一波数据存入缓冲区。
并投递完成事件到 IOCP队列
//7、此时工作线程会通过GetQueuedCompletionStatus从IOCP队列获取到完成事件,取出参数里的key和重叠结构后,得到任务类型进行处理;
//8、处理完成后,线程 再次调用 GetQueuedCompletionStatus 进入阻塞状态,等待下一个事件
特点
线程池线程数量:通常设置为 CPU 核心数 × 2(如 8 核用 16 线程),兼顾并发与缓存亲和性。
线程唤醒机制
■ 当IOCP队列中有新事件到达时,系统会按 后进先出(LIFO)顺序 唤醒最近空闲的线程处理事件 ;内核级调度:操作系统内核直接管理完成队列,优先选择最近活跃的线程,减少线程切换和CPU缓存失效的开销
线程池优化:复用少量线程处理海量 I/O 事件,避免频繁线程创建销毁的开销。
真正的异步IO
调用I/O函数时,函数立即返回,内核在后台完成I/O操作,线程池的线程通过get完成端口(IOCP)队列的 结果来获取任务。主线程无需阻塞,可继续处理其他任务;
高并发:
当 I/O 操作完成时,系统会将该操作的结果放入 I/O 完成端口队列。工作线程从该队列中获取结果并处理任务。通过这种机制,I/O 操作与任务处理是分离的,避免主线程的阻塞;线程池中的工作线程从 I/O 完成端口队列中异步地获取并处理完成的 I/O 操作,从而能够高效处理大量并发的 I/O 请求。