当前位置: 首页 > news >正文

Select、Epoll 与 IOCP模型的介绍与区别

@TOC

一、I/O复用

IO 复用(I/O Multiplexing)是一种并发处理多个 I/O 操作的机制,允许一个进程或线程监视多个文件描述符(sockets、files、pipes等),并在其中任意一个文件描述符就绪(可读或可写)时进行操作。这种机制主要用于提高系统的性能和效率,特别是在处理大量并发连接或事件时非常有用。.

IO复用和多进程服务器区别:
在这里插入图片描述

二、总结

  1. select:通过轮询检测描述符状态,适用低并发或跨平台场景。
  2. epoll:基于事件驱动,支持高并发,是 Linux 下高性能服务的首选(如 Nginx、Redis)。
  3. IOCP:IOCP(I/O Completion Port,完成端口) 是 Windows 操作系统专为高并发 I/O 设计的高效异步模型;
selectepollIOCP
平台跨平台支持(Windows/Linux)LinuxWindows
文件描述符数量限制基于 fd_set 位图,默认上限 1024(可修改宏但效率下降)无硬性限制,内核使用红黑树管理描述符无硬性限制,内核通过 I/O 完成端口管理
时间复杂度每次调用需遍历所有描述符,时间复杂度 O(n)仅处理就绪事件,时间复杂度 O(1)(与总描述符数无关)每次操作为 O(1),通过 I/O 完成端口高效管理任务
数据拷贝开销每次调用需将描述符集合从用户空间拷贝到内核通过 epoll_ctl 注册描述符后,内核与用户空间共享内存(mmap 实现),避免重复拷贝。使用重叠 I/O 重叠结构避免重复拷贝
内核通知机制内核遍历所有描述符,轮询检查就绪状态内核通过回调机制将就绪事件加入就绪链表(eventpoll.rdlist),用户直接读取链表。内核通过 I/O 完成端口将已完成的 I/O 操作报告给用户
并发数低并发(<1k)高并发(>10k)高并发(>10k)
触发模式水平触发(默认)水平触发(默认),可设置边缘触发)完成触发,
适用场景小规模或中等规模的网络应用高并发场景,如大规模网络服务、数据库连接池等高性能 Windows 服务器(如数据库、游戏后端、大规模网络服务)

二、Select详细介绍

I/O 多路复用接口,通过轮询检测文件描述符的就绪状态。步骤如图:
在这里插入图片描述

fd_set结构

通过轮询检测文件描述符的就绪状态,此时首先需要将要监视的文件描述符集中到 一起,集中时要按照io类型(接收、传输、异常)进行区分->引入fd_set数组(存有0和1的位数组):
在这里插入图片描述在fd-set变量中注册或更改值的操作都由下列宏完成:
在这里插入图片描述

select函数

select 函数用来验证 3种类型IO操作监视项的变化情况;select 函数用于监视多个文件描述符,以查看是否有任何一个准备好进行 I/O 操作(如读取、写入或异常条件)在这里插入图片描述指定监视范围

select 函数的第一个参数要求传入待监视的所有文件描述符中最大值加1。原因在于,文件描述符从 0 开始计数,所以只需已注册到 fd_set 中的所有文件描述符的最大值,再加 1,作为该参数传递给 select。

超时时间

select 默认是阻塞操作,只有当监视的文件描述符状态发生变化时才返回。为了防止一直阻塞,我们可以通过最后一个参数设置超时时间。这个参数是一个指向 timeval 结构体的指针,通过设置结构体中的 tv_sec(秒)和 tv_usec(微秒)来指定等待时间。若在指定时间内没有任何变化,select 将返回 0。如果不需要超时功能,可以将此参数设为 NULL,使 select 一直阻塞直到有文件描述符发生变化。:在这里插入图片描述

调用 select 函数后查看结果
在这里插入图片描述select函数调用完成后,向其传递的fd_set中将发生变化, 原来为1的所有位均变为0 ,但发生变化的文件描述符对应位除外。 因此,可以认为值仍为 1的位置上的文件描述符发生了变化

使用select 函数时可以将多个文件描述符集中到起统一监视,项目(监视项称为事件)如下:

select实现IO复用的回声服务器端调用示例

select流程

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>

#define MAX_CLIENTS 1024
#define BUFFER_SIZE 1024
#define PORT 8080

int main() {
    int listen_fd, client_fd, max_fd;
    struct sockaddr_in server_addr;
    fd_set read_fds;
    char buffer[BUFFER_SIZE];

    // 创建监听socket
    listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd < 0) {
        perror("socket");
        exit(EXIT_FAILURE);
    }

    // 绑定地址
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(PORT);

    if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        perror("bind");
        close(listen_fd);
        exit(EXIT_FAILURE);
    }

    // 监听
    if (listen(listen_fd, 5) < 0) {
        perror("listen");
        close(listen_fd);
        exit(EXIT_FAILURE);
    }

    // 初始化文件描述符集合
    FD_ZERO(&read_fds);
    FD_SET(listen_fd, &read_fds);
    max_fd = listen_fd;

    printf("Server running on port %d...\n", PORT);

    while (1) {
        fd_set tmp_fds = read_fds;  // 复制集合(select会修改传入的fd_set)

        // 调用select等待事件
        int ret = select(max_fd + 1, &tmp_fds, NULL, NULL, NULL);
        if (ret < 0) {
            perror("select");
            break;
        }

        // 遍历所有可能的文件描述符
        for (int fd = 0; fd <= max_fd; fd++) {
            if (FD_ISSET(fd, &tmp_fds)) {
                // 处理新连接
                if (fd == listen_fd) {
                    struct sockaddr_in client_addr;
                    socklen_t addr_len = sizeof(client_addr);
                    client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &addr_len);
                    if (client_fd < 0) {
                        perror("accept");
                        continue;
                    }

                    FD_SET(client_fd, &read_fds);  // 将新连接加入集合
                    if (client_fd > max_fd) max_fd = client_fd;
                    printf("New client connected: fd=%d\n", client_fd);
                } 
                // 处理客户端数据
                else {
                    ssize_t n = read(fd, buffer, BUFFER_SIZE - 1);
                    if (n <= 0) {  // 连接关闭或错误
                        close(fd);
                        FD_CLR(fd, &read_fds);
                        printf("Client fd=%d disconnected.\n", fd);
                    } else {
                        buffer[n] = '\0';
                        printf("Received from fd=%d: %s\n", fd, buffer);
                        // 回显数据(示例逻辑)
                        write(fd, buffer, n);
                    }
                }
            }
        }
    }

    close(listen_fd);
    return 0;
}```

select模型缺点

缺点

  1. 遍历问题:调用 select 后,需要遍历所有注册的文件描述符,判断哪些发生了变化。这种遍历操作会带来一定的开销。
  2. 传递监视对象信息的问题: 每次调用 select 时,都必须将所有的监视对象信息(fd_set)传递给操作系统。由于 fd_set 在调用过程中会被修改,因此在每次调用前都需要重新设置,这样就需要频繁地在用户空间和内核空间之间传递数据。
  3. fd_set 的重置与重建:由于 select 调用后 fd_set 会被修改,程序需要在每次调用前重新设置监视对象,这也增加了额外开销。

那到底哪些因素是提高性能的更大障碍?是调用select函数后常见的针对所有文件描述符对象的循环语句?还是每次需要传递的监视对象信息?

只看代码的话很容易认为是循环。但相比于循环语句,每次都要将监视对象信息传递给操作系统的开销更大。因为这种数据传递是跨越用户空间与内核空间的,不能通过简单的代码优化来消除,往往会成为性能上的致命瓶颈。

“那为何需要把监视对象信息传递给操作系统呢?”

有些函数不需要操作系统的帮助就能完成功能,而有些则必须借助于操作系统。假设各位定义了四则运算相关函数,此时无需操作系统的帮助。但select函数与文件描述符有关,更准确地说,是监视套接字变化的函数。而套接字是由操作系统管理的,所以select函数绝对需要借助于操作系统才能完成功能。

改进方案:

通过只传递一次监视对象信息,并在监视范围或内容发生变化时仅通知变化的部分,可以避免每次调用 select 都进行大量数据传递。

  • Linux 提供了 epoll
  • Windows 提供了 IOCP

三、Epoll

Linux 特有的事件驱动模型,通过回调机制直接通知就绪事件

epoll关键函数

**Epoll的三大函数:epoll_create,epoll_wait, epoll_ctl**
#include<sys/epoll.h>

一、创建epoll实例
**int epoll_create(int size);**
→成功时返回epoll文件描述符,失败时返回-1。
size:epoll实例的大小。
    

二、epoll_ctl用于向 epoll 实例注册、修改或删除感兴趣的文件描述符的事件。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
→成功时返回0,失败时返回-1。
●epfd 用于注册监视对象的epoll实例
●op表示要执行的操作,取值范围:
		EPOLL_CTL_ADD:注册新的文件描述符到 epfd。
		EPOLL_CTL_MOD:修改已经注册的文件描述符的监听事件。
		EPOLL_CTL_DEL:从 epfd 中删除一个文件描述符。		
● fd 需要注册的监视对象文件描述符。
● event 监视对象的事件类型。(也就是 fd 上发生什么事情时 epoll 应该通知你)
    struct epoll_event {
    uint32_t events;  // 表示要监听的事件类型
    epoll_data_t data;  // 用户数据,可以是文件描述符或指针,当事件发生时,你需要通过 epoll_wait 获取到相关事件。在这个时候,epoll_wait 返回的只是一个事件数组,你并不知道哪些具体的 fd 触发了事件。你可以通过 event.data 来知道哪个文件描述符触发了事件。
};
		EPOLLIN∶ 文件描述符上有数据可读(例如,套接字接收到数据,服务器套接字linsten到了客户端的请求连接也是EPOLLIN)
		EPOLLOUT∶文件描述符可以写入数据(例如,套接字发送缓冲区有空间可用)
		EPOLLPRI∶收到OOB数据的情况。
		EPOLLRDHUP∶断开连接或半关闭的情况,这在边缘触发方式下非常有用。
		EPOLLERR∶发生错误的情况。
		EPOLLET∶以边缘触发的方式得到事件通知。通常默认是水平触发(Level Triggered)。
		EPOLLONESHOT∶发生一次事件后,相应文件描述符不再收到事件通知。因此需要向
		epoll_ctl函数的第二个参数传递
		EPOLLCTL_MOD,再次设置事件。


    
三、epoll_wait:该函数用于等待事件发生。它会阻塞当前线程,直到有事件发生、超时或者被信号中断。

int epoll_wait(int epfd, struct epoll_event*events,int maxevents,int timeout);
→成功时返回发生事件的文件描述符数。返回值表示发生的事件数量,如果出错则返回 -1。
		epfd 表示事件发生监视范围的epol例程的文件描述符
		events 保存发生事件的文件描述符集合的结构体地址值。一个指向 epoll_event 结构体数组的指针,用于存储发生的事件。
		maxevents 第二个参数中可以保存的最大事件数目。 events 数组的大小,即最多可以处理的事件数量。
		Timeout:传递-1时,阻塞一直等待直到发生事件。超时时间,单位为毫秒。如果在这个时间内没有事件发生,epoll_wait 会返回 0

边缘触发和条件触发

在epoll的应用中涉及到关于IO的读写,而读写的状态变化有4种:

  1. 可读:socket上有数据
  2. 不可读:socket上没有数据了
  3. 可写:socket上有空间可写
  4. 不可写:socket上无空间可

1、条件触发(Level-Triggered, LT)
工作原理:只要文件描述符(fd)处于就绪状态(例如可读或可写),epoll_wait 会持续通知应用程序,直到数据被处理完毕。
例如,若 socket 接收缓冲区有数据未读完,每次调用 epoll_wait 都会触发 EPOLLIN 事件 2 5。
特点
默认模式,编程简单,无需一次性处理所有数据。
可能导致多次事件触发,适合对实时性要求不高的场景(如普通 HTTP 服务器)

2、边缘触发(Edge-Triggered, ET)

工作原理:仅在 fd 状态发生 变化时(如从不可读变为可读)通知一次。若数据未处理完,后续不再触发事件。
例如,客户端发送 100 字节数据,ET 模式下 epoll_wait 仅触发一次 EPOLLIN,需一次性读取全部数据 。
特点
必须搭配 非阻塞 I/O,需循环读写直到返回 EAGAIN 错误。
减少事件触发次数,适合高并发场景(如实时通信、高频交易系统) 2 4

epoll服务端例子

流程

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>

#define MAX_EVENTS 10
#define PORT 8080
#define BUFFER_SIZE 1024

// 将文件描述符设置为非阻塞模式
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl get");
        return -1;
    }
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1) {
        perror("fcntl set");
        return -1;
    }
    return 0;
}

int main() {
    int listen_fd, conn_fd, nfds, epoll_fd;
    struct sockaddr_in server_addr;
    struct epoll_event ev, events[MAX_EVENTS];
    char buffer[BUFFER_SIZE];

    // 创建监听套接字
    listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd == -1) {
        perror("socket");
        exit(EXIT_FAILURE);
    }
    
    // 设置监听套接字为非阻塞模式
    if (set_nonblocking(listen_fd) == -1) {
        exit(EXIT_FAILURE);
    }
    
    // 设置地址重用
    int opt = 1;
    if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }
    
    // 绑定IP和端口
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY; // 绑定所有网卡IP
    server_addr.sin_port = htons(PORT);
    
    if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("bind");
        exit(EXIT_FAILURE);
    }
    
    // 监听连接
    if (listen(listen_fd, SOMAXCONN) == -1) {
        perror("listen");
        exit(EXIT_FAILURE);
    }
    
    // 创建 epoll 实例
    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }
    
    // 将监听套接字添加到 epoll 中进行监视
    ev.events = EPOLLIN;  // 监听可读事件(新连接到来)
    ev.data.fd = listen_fd;//当事件发生时,epoll_wait 返回的事件数组,通过 event.data 来知道哪个文件描述符触发了事件。
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
        perror("epoll_ctl: listen_fd");
        exit(EXIT_FAILURE);
    }
    
    // 进入事件循环
    while (1) {
        nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            perror("epoll_wait");
            exit(EXIT_FAILURE);
        }
        
        for (int i = 0; i < nfds; ++i) {
            if (events[i].data.fd == listen_fd) {
                // 有新连接到来,循环调用 accept(非阻塞模式下可能一次返回多个连接)
                while ((conn_fd = accept(listen_fd, NULL, NULL)) != -1) {
                    // 设置新连接为非阻塞
                    if (set_nonblocking(conn_fd) == -1) {
                        close(conn_fd);
                        continue;
                    }
                    // 将新连接添加到 epoll 中,并使用边缘触发模式
                    ev.events = EPOLLIN | EPOLLET;
                    ev.data.fd = conn_fd;
                    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev) == -1) {
                        perror("epoll_ctl: conn_fd");
                        close(conn_fd);
                    }
                }
                // 如果 accept 返回 -1,则可能是因为没有更多连接了(EAGAIN/EWOULDBLOCK)
                if (conn_fd == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
                    perror("accept");
                }
            } else {
                // 处理已连接套接字的数据读取事件
                int client_fd = events[i].data.fd;
                int n;
                while ((n = read(client_fd, buffer, BUFFER_SIZE)) > 0) {
                    //  echo 服务器
                    write(client_fd, buffer, n);
                }
                if (n == 0) {
                    // 客户端关闭连接
                    close(client_fd);
                } else if (n < 0 && errno != EAGAIN) {
                    perror("read");
                    close(client_fd);
                }
            }
        }
    }
    
    // 清理工作(一般不会执行到这里)
    close(listen_fd);
    close(epoll_fd);
    return 0;
}

四、IOCP

IOCP是什么?

● IOCP(I/O Completion Port,完成端口) 是 Windows 操作系统专为高并发 I/O 设计的高效异步模型,核心思想是通过 完成端口队列 和 线程池(IOCP一把和线程池配合使用) 管理大量套接字的异步操作。其特点包括:
○ 内核级异步 I/O:基于重叠 I/O(Overlapped I/O),允许同时投递多个 I/O 请求,由内核完成数据搬运后通知应用层。
○ 线程池优化:复用少量线程处理海量 I/O 事件,避免频繁线程创建销毁的开销。
○ 高扩展性:单 IOCP 实例可管理数万级并发连接,适用于高吞吐场景(如游戏服务器、Web 后端)

核心函数与流程?

//在 Windows 中创建一个 I/O 完成端口(IOCP) 处理多个并发的异步 I/O 操作
HANDLE CreateIoCompletionPort(
  HANDLE FileHandle,   // 句柄,关联 I/O 完成端口(或 INVALID_HANDLE_VALUE)
      ,在实际使用时,通常会传入一个有效的文件句柄(例如网络套接字、文件句柄等),以将 I/O 完成端口与该文件或设备的异步 I/O 操作关联。。
  HANDLE ExistingCompletionPort, // 指定一个已存在的 I/O 完成端口。如果是 NULL,表示我们创建
      一个全新的 I/O 完成端口,而不是将其附加到现有的完成端口上。
  ULONG_PTR CompletionKey, // 完成键用于区分不同的 I/O 任务来源。当 GetQueuedCompletionStatus() 返回时,可以获取这个 Completion Key。如果你绑定的是 socket 句柄,通常传递 socket 本身的值,方便区分不同的客户端连接。

  DWORD NumberOfConcurrentThreads // 处理 IOCP 请求的最大并发线程数。如果设为 0,系统会自动选择(通常是 CPU 核心数 × 2);
    //多个线程可以同时从 I/O 完成端口中 获取请求并处理,每个线程在处理完一个请求后,会继续等待下一个请求的到来。这使得系统能够 高效地并发处理大量异步 I/O 操作。
);

//从 I/O 完成端口(IOCP) 获取已完成的 I/O 操作的信息
BOOL GetQueuedCompletionStatus(
  HANDLE hCompletionPort,         // 建立的完成端口  
  LPDWORD lpNumberOfBytesTransferred, // 返回传输的字节数,例如,在文件或网络 I/O 中,它返回实际读写的字节数
  PULONG_PTR lpCompletionKey,     // 返回与完成的 I/O 操作相关的完成键
  LPOVERLAPPED *lpOverlapped,    // 返回与完成的 I/O 操作相关的重叠结构
  DWORD dwMilliseconds            // 等待的超时时间,0 表示不等待,INFINITE 表示无限等待
);

//向 I/O 完成端口(IOCP) 发送完成通知。它将一个通知放入 I/O 完成端口队列,让等待的线程知道某个 I/O 操作已经完成。
BOOL PostQueuedCompletionStatus(
  HANDLE hCompletionPort,         // 目标 I/O 完成端口
  DWORD dwNumberOfBytesTransferred, // 传输的字节数.指定已完成的 I/O 操作的字节数
  ULONG_PTR dwCompletionKey,      // 当将文件句柄、套接字等绑定到 I/O 完成端口时,会将一个标识符(完成键)与该文件句柄关联起来。此参数可以帮助线程识别哪个 I/O 操作已完成。
  LPOVERLAPPED lpOverlapped       // 与 I/O 操作关联的 OVERLAPPED 结构,OVERLAPPED 结构包含了与异步 I/O 操作相关的信息
);返回值TRUE:表示成功,函数已将完成通知成功放入 I/O 完成端口。


    AcceptEx使用流程
// 1. 调用 AcceptEx 接收连接和数据
AcceptEx(listenSocket, acceptSocket, buffer, ...);

// 2. 解析地址
SOCKADDR_IN *pLocalAddr = nullptr, *pRemoteAddr = nullptr;
int localLen = 0, remoteLen = 0;
GetAcceptExSockaddrs(buffer, 0, sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16,
                     (SOCKADDR**)&pLocalAddr, &localLen,
                     (SOCKADDR**)&pRemoteAddr, &remoteLen);

// 3. 更新套接字状态
setsockopt(acceptSocket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&listenSocket, sizeof(listenSocket));

// 异步发送数据
int WSASend(
  SOCKET                             s,//向指定套接字(Socket)异步发送数据,非阻塞模式下立即返回,实际发送操作由系统后台完成
  LPWSABUF                           lpBuffers,//指向 WSABUF 结构数组的指针,每个结构描述一个数据缓冲区。
  DWORD                              dwBufferCount,//WSABUF 数组的元素数量(最多 WSAMAXIOCP)。
  LPDWORD                            lpNumberOfBytesSent,//输出参数,接收实际发送的字节数(同步模式下有效)
  DWORD                              dwFlags,
  LPWSAOVERLAPPED                    lpOverlapped,//重叠结构指针,用于异步操作(IOCP 必填)
  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine//异步完成时的回调函数(IOCP 中通常设为 NULL)
);
成功:返回 0,且 lpNumberOfBytesSent 更新为实际发送的字节数(同步模式)。
异步操作:返回 SOCKET_ERROR,并调用 WSAGetLastError() 得到 WSA_IO_PENDING,表示操作已提交。
失败:返回 SOCKET_ERROR,错误码可通过 WSAGetLastError() 获取。```


● 步骤:
○ CreateIoCompletionPort创建IO完成端口,
○ GetQueuedCompletionStatus 关联套接字与 IOCP
○ 投递异步操作:使用 WSARecv、WSASend 等函数发起异步 I/O 请求,需传递 重叠结构(Overlapped) 和缓冲区
○ 工作线程通过 GetQueuedCompletionStatus 阻塞等待 I/O 事件完成

IO重叠结构

WSAOVERLAPPED 是Windows异步I/O的核心数据结构,用于标识和管理每个异步操作(如 WSASend/WSARecv)的上下文;确保每个异步操作都有独立的OVERLAPPED结构
OVERLAPPED 结构: 在 WinBase.h 头文件中,OVERLAPPED 结构定义如下 :

typedef struct _OVERLAPPED {
    ULONG_PTR Internal;       // 低级 I/O 状态
    ULONG_PTR InternalHigh;   // 传输的字节数
    union {
        struct {
            DWORD Offset;     // 低 32 位文件偏移(用于文件 I/O)
            DWORD OffsetHigh; // 高 32 位文件偏移(用于文件 I/O)
        };
        PVOID Pointer;        // 其他用途,例如网络 I/O
    };
    HANDLE  hEvent;          // I/O 完成后用于通知的事件句柄
} OVERLAPPED, *LPOVERLAPPED;```

IOCP + 重叠结构+线程池实现高并发异步IO

1、实现线程池
//线程
class CMyThread
{
public:
	CMyThread() {
		m_hThread = NULL;
		m_bStatus = false;
	}

	~CMyThread() {
		Stop();
	}

	//使用 _beginthread 开启线程,并将 this 作为参数传递给 ThreadEntry:true 表示成功 false表示失败
	bool Start() {
		m_bStatus = true;
		m_hThread = (HANDLE)_beginthread(&CMyThread::ThreadEntry, 0, this);//(普通的全局或 static 函数,0,传给线程函数的参数);传入this的目的是为了静态函数能成功调用非静态成员函数
		if (!IsValid()) {
			m_bStatus = false;
		}
		return m_bStatus;
	}
	//返回true表示有效 返回false表示线程异常或者已经终止
	bool IsValid() {
		if (m_hThread == NULL || (m_hThread == INVALID_HANDLE_VALUE))return false;
		return WaitForSingleObject(m_hThread, 0) == WAIT_TIMEOUT;//0-立即返回;WAIT_OBJECT_0(0):线程已结束。WAIT_TIMEOUT(258):线程仍在运行,超时未结束
	}
	//设置线程运行标志位false,阻塞等待线程结束,并把任务m_worker清空
	bool Stop() {
		if (m_bStatus == false)return true;
		m_bStatus = false;
		bool ret = WaitForSingleObject(m_hThread, INFINITE) == WAIT_OBJECT_0;//阻塞等待线程结束
		UpdateWorker();
		return ret;
	}
	//更新当前任务void UpdateWorker(const ::ThreadWorker& worker = ::ThreadWorker())
	void UpdateWorker(const ::ThreadWorker& worker = ::ThreadWorker()) {
		if (m_worker.load() != NULL && (m_worker.load() != &worker)) {
			::ThreadWorker* pWorker = m_worker.load();
			TRACE("delete pWorker = %08X m_worker = %08X\r\n", pWorker, m_worker.load());
			m_worker.store(NULL);
			delete pWorker;
		}
		if (m_worker.load() == &worker) return;
		if (!worker.IsValid()) {
			m_worker.store(NULL);
			return;
		}
		::ThreadWorker* pWorker = new ::ThreadWorker(worker);
		TRACE("new pWorker = %08X m_worker = %08X\r\n", pWorker, m_worker.load());
		m_worker.store(new ::ThreadWorker(worker));
	}

	//true表示空闲 false表示已经分配了工作
	bool IsIdle() {
		if (m_worker == NULL)return true;
		return !m_worker.load()->IsValid();//无效才是空闲
	}
private:
	//CMyThread 线程的 工作函数;inline:成员函数是在类内部定义,编译器通常会将其作为内联函数来处理。因为类定义和实现通常在同一头文件中,编译器可以很容易地将其内联,如果实现在cpp,一般不是inline
	void ThreadWorker() {
		while (m_bStatus) {
			if (m_worker.load() == NULL) {
				Sleep(1);
				continue;
			}
			::ThreadWorker worker = *m_worker.load();
			if (worker.IsValid()) {
				if (WaitForSingleObject(m_hThread, 0) == WAIT_TIMEOUT) {
					int ret = worker();
					if (ret != 0) {
						CString str;
						str.Format(_T("thread found warning code %d\r\n"), ret);
						OutputDebugString(str);
					}
					if (ret < 0) {
						::ThreadWorker* pWorker = m_worker.load();
						m_worker.store(NULL);
						delete pWorker;
					}
				}
			}
			else {
				Sleep(1);
			}
		}
	}
	//线程入口;静态成员函数,不能直接访问类的非静态成员,因此传入this实例,使得可以调用成员函数(非静态成员引用必须与特定对象引用)
	 static void ThreadEntry(void* arg) {
		CMyThread* thiz = (CMyThread*)arg;
		if (thiz) {
			thiz->ThreadWorker();
		}
		_endthread();
	}
private:
	HANDLE m_hThread;
	bool m_bStatus;//false 表示线程将要关闭  true 表示线程正在运行
	std::atomic<::ThreadWorker*> m_worker;//存储了线程要执行的任务
};
//线程池
class CMyThreadPool
{
public:
	CMyThreadPool(size_t size) {
		m_threads.resize(size);
		for (size_t i = 0; i < size; i++)
			m_threads[i] = new CMyThread();
	}
	CMyThreadPool() {}
	~CMyThreadPool() {
		Stop();
		for (size_t i = 0; i < m_threads.size(); i++)
		{
			CMyThread* pThread = m_threads[i];
			m_threads[i] = NULL;
			delete pThread;
		}

		m_threads.clear();
	}
	//启动线程池所有线程
	bool Invoke() {
		bool ret = true;
		for (size_t i = 0; i < m_threads.size(); i++) {
			if (m_threads[i]->Start() == false) {
				ret = false;
				break;
			}
		}
		if (ret == false) {
			for (size_t i = 0; i < m_threads.size(); i++) {
				m_threads[i]->Stop();
			}
		}
		return ret;
	}
	//停止线程池所有线程
	void Stop() {
		for (size_t i = 0; i < m_threads.size(); i++) {
			m_threads[i]->Stop();
		}
	}

	//返回-1 表示分配失败,所有线程都在忙 大于等于0,表示第n个线程分配来做这个worker任务
	int DispatchWorker(const ThreadWorker& worker) {
		int index = -1;
		m_lock.lock();//多个线程同时调用DispatchWorker时候,不会导致同时使用一个工作线程
		//这种方式是轮询线程池找空闲线程,可以搞个列表,只把空闲线程加进去
		for (size_t i = 0; i < m_threads.size(); i++) {
			if (m_threads[i] != NULL && m_threads[i]->IsIdle()) {
				m_threads[i]->UpdateWorker(worker);
				index = i;
				break;
			}
		}
		m_lock.unlock();
		return index;
	}
	//检查第index个线程的有效性
	bool CheckThreadValid(size_t index) {
		if (index < m_threads.size()) {
			return m_threads[index]->IsValid();
		}
		return false;
	}
private:
	std::mutex m_lock;
	std::vector<CMyThread*> m_threads;//线程数组
};```
2、自定义重叠结构

一般需要扩展 Overlapped结构,添加自定义字段以跟踪I/O操作的上下文;例如下面这个:

/基类,封装了 重叠IO结构,用于不同的IO操作(如接受、接收、发送和错误处理)
class COverlapped :public ThreadFuncBase {
public:
	OVERLAPPED m_overlapped;//m_overlapped 必须是第一个成员,保证通过 CONTAINING_RECORD 宏能正确反推父结构地址。
	DWORD m_operator;//I/O 操作类型(EAccept, ERecv, ESend, EError)
	std::vector<char> m_buffer;//缓冲区
	ThreadWorker m_worker;//处理函数 用于处理该操作的任务
	CMyServer* m_server;//服务器对象
	CMyClient* m_client;//对应的客户端
	WSABUF m_wsabuffer;//Windows网络I/O缓冲区,用于发送和接收数据
	virtual ~COverlapped() {
		m_client = NULL;
	}
};

根据自己需求去更改自定义参数,但是OVERLAPPED 结构一定是成员变量第一个,并且在异步操作之前将预先定义好的重叠结构传给异步IO操作(例如AcceptEx,WRecv等);

3、服务端实现(具体步骤)
//1、服务端创建支持重叠结构的socket以及环境
  WSADATA WSAData;
  WSAStartup(MAKEWORD(2, 2), &WSAData);
  m_sock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  int opt = 1;
  setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt));
//2、服务端socket 进行bind。listen
bind(m_sock, (sockaddr*)&m_addr, sizeof(m_addr)listen(m_sock, 3)//3。创建IOCP
HANDLE hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
//4、IOCP绑定socket
    CreateIoCompletionPort((HANDLE)m_sock, m_hIOCP, (ULONG_PTR)this, 0);
//5、唤醒线程池,另工作线程循环调用GetQueuedCompletionStatus
m_pool.Invoke();
m_pool.DispatchWorker(ThreadWorker(this, (FUNCTYPE)&EdoyunServer::threadIocp));
//6、服务端套接字调用异步IO操作
AcceptEx();
// AcceptEx 被调用后,将 OVERLAPPED 结构提交给 Windows 内核,内核会:分配资源 以等待新的客户端连接,存储 OVERLAPPED* 指针 以便在连接完成时使用
//当有客户端连接:内核完成 TCP 握手,建立连接;将预先传入的套接字与新连接绑定; 客户端地址(IP + 端口)和 服务端地址 复制到输出缓冲区(pClient->m_wsabuffer.buf)
     若 dwReceiveDataLength > 0,还会将客户端发送的第一波数据存入缓冲区。
     并投递完成事件到 IOCP队列
 //7、此时工作线程会通过GetQueuedCompletionStatus从IOCP队列获取到完成事件,取出参数里的key和重叠结构后,得到任务类型进行处理;
 //8、处理完成后,线程 再次调用 GetQueuedCompletionStatus 进入阻塞状态,等待下一个事件 
特点

线程池线程数量:通常设置为 CPU 核心数 × 2(如 8 核用 16 线程),兼顾并发与缓存亲和性。
线程唤醒机制
■ 当IOCP队列中有新事件到达时,系统会按 后进先出(LIFO)顺序 唤醒最近空闲的线程处理事件 ;内核级调度:操作系统内核直接管理完成队列,优先选择最近活跃的线程,减少线程切换和CPU缓存失效的开销
线程池优化:复用少量线程处理海量 I/O 事件,避免频繁线程创建销毁的开销。

真正的异步IO
调用I/O函数时,函数立即返回,内核在后台完成I/O操作,线程池的线程通过get完成端口(IOCP)队列的 结果来获取任务。主线程无需阻塞,可继续处理其他任务;
高并发:
当 I/O 操作完成时,系统会将该操作的结果放入 I/O 完成端口队列。工作线程从该队列中获取结果并处理任务。通过这种机制,I/O 操作与任务处理是分离的,避免主线程的阻塞;线程池中的工作线程从 I/O 完成端口队列中异步地获取并处理完成的 I/O 操作,从而能够高效处理大量并发的 I/O 请求。

比较粗糙,有待完善

相关文章:

  • 力扣 跳跃游戏 II
  • Pytorch实现之粒子群优化算法在GAN中的应用
  • 【工具类】 Hutool 中用于生成随机数的工具类
  • Python基于自然语言处理技术的新闻文本分类系统【附源码、文档说明】
  • R语言用逻辑回归贝叶斯层次对本垒打数据与心脏移植数据后验预测检验模拟推断及先验影响分析|附数据代码...
  • 轻松搭建本地大语言模型(二)Open-WebUI安装与使用
  • C++基础知识学习记录—友元
  • 麒麟系统下载软件及依赖包文件方法
  • MSI微星电脑冲锋坦克Pro Vector GP76 12UGS(MS-17K4)原厂Win11系统恢复镜像,含还原功能,预装OEM系统下载
  • Maven如何配置阿里云仓库/国内镜像
  • AutoGen:玩转多智能体团队协作 (Teams)
  • 数据中心精密列头柜 多回路数据采集器 功能参数介绍
  • 【嵌入式Linux应用开发基础】exec()函数族
  • 人工智能(AI)在癌症休眠研究及精准肿瘤学中的应用|顶刊速递·25-02-18
  • 【ELK】【Elasticsearch 】DSL 和 DQL
  • 【算法】双指针(上)
  • sql注入之python脚本进行时间盲注和布尔盲注
  • MySQL 日志
  • bash脚本----传参的处理
  • 五十天精通硬件设计第34天-CMOS 和 JFET 放大器中电流噪声的影响
  • 世界银行最新营商环境体检单:59个测评点,上海22项达全球最优水平
  • 上海飞银川客机触地复飞后备降西安,亲历者:不少乘客都吐了
  • 美CIA发布视频“招募中国官员窃取机密”,外交部:赤裸裸的政治挑衅
  • 罗马尼亚临时总统博洛让任命普雷多尤为看守政府总理
  • 特朗普:不谋求第三个总统任期,中意万斯鲁比奥“接棒”
  • 刘翔的赛会纪录被改写,田径钻石赛在赛场内外交出精彩答卷