IOCP + 重叠I/O 实例代码
#include <winsock2.h>
#include <ws2tcpip.h>
#include <mswsock.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>#pragma comment(lib, "ws2_32.lib") // 链接Winsock库
#pragma comment(lib, "mswsock.lib") // 链接微软Winsock扩展库#define BUFFER_SIZE 4096 // 数据缓冲区大小
#define MAX_THREADS 8 // 工作线程数量
#define SERVER_PORT "8888" // 服务器监听端口// 每个连接的数据结构 - 用于标识不同的客户端连接
typedef struct {SOCKET socket; // 客户端socket句柄char client_id[32]; // 客户端标识符
} PER_HANDLE_DATA;// 每个I/O操作的数据结构 - 包含重叠I/O所需的信息
typedef struct {OVERLAPPED overlapped; // 重叠I/O结构,必须放在第一个位置WSABUF wsaBuf; // 数据缓冲区描述char buffer[BUFFER_SIZE]; // 实际数据缓冲区int operationType; // 操作类型: 0=接收, 1=发送
} PER_IO_DATA;// 工作线程函数 - 处理I/O完成事件的核心线程
DWORD WINAPI WorkerThread(LPVOID lpParam) {HANDLE iocpHandle = (HANDLE)lpParam; // 获取完成端口句柄DWORD bytesTransferred; // 传输的字节数ULONG_PTR completionKey; // 完成键(指向PER_HANDLE_DATA)OVERLAPPED* overlapped; // 重叠结构指针PER_HANDLE_DATA* handleData; // 连接数据PER_IO_DATA* ioData; // I/O操作数据printf("工作线程 %d 启动\n", GetCurrentThreadId());// 主循环 - 持续等待和处理I/O完成事件while (TRUE) {// 等待I/O完成事件 - 核心函数,线程在此阻塞等待BOOL result = GetQueuedCompletionStatus(iocpHandle, // 完成端口句柄&bytesTransferred, // 输出的传输字节数&completionKey, // 输出的完成键(连接标识)&overlapped, // 输出的重叠结构指针INFINITE // 无限等待);// 从完成键和重叠结构指针获取对应的数据结构handleData = (PER_HANDLE_DATA*)completionKey;ioData = (PER_IO_DATA*)overlapped;// 检查连接是否关闭或出错 (bytesTransferred=0表示连接关闭)if (!result || bytesTransferred == 0) {// 连接关闭或出错的处理if (handleData) {printf("客户端 %s 断开连接\n", handleData->client_id);closesocket(handleData->socket); // 关闭socketfree(handleData); // 释放连接数据结构}if (ioData) {free(ioData); // 释放I/O操作数据结构}continue; // 继续等待下一个完成事件}// 处理成功的I/O完成事件if (ioData->operationType == 0) { // 接收操作完成printf("线程 %d 从 %s 接收: %.*s\n", GetCurrentThreadId(), handleData->client_id,bytesTransferred, ioData->buffer);// 准备回显数据: 将接收到的数据原样发送回去ioData->operationType = 1; // 标记为发送操作ioData->wsaBuf.len = bytesTransferred; // 设置发送长度// 异步发送数据回客户端DWORD sendBytes;int sendResult = WSASend(handleData->socket, // 目标socket&ioData->wsaBuf, // 数据缓冲区1, // 缓冲区数量&sendBytes, // 实际发送字节数(输出)0, // 标志位&ioData->overlapped, // 重叠结构NULL // 完成回调(不使用));// 检查发送是否成功 (WSA_IO_PENDING是正常情况)if (sendResult == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {printf("WSASend失败: %d\n", WSAGetLastError());closesocket(handleData->socket);free(handleData);free(ioData);}}else if (ioData->operationType == 1) { // 发送操作完成printf("线程 %d 向 %s 发送完成\n", GetCurrentThreadId(), handleData->client_id);// 发送完成后,重新投递接收操作,等待客户端下一次数据ioData->operationType = 0; // 标记为接收操作memset(ioData->buffer, 0, BUFFER_SIZE); // 清空缓冲区ioData->wsaBuf.len = BUFFER_SIZE; // 重置缓冲区长度// 投递异步接收操作DWORD recvBytes;DWORD flags = 0;int recvResult = WSARecv(handleData->socket, // 目标socket&ioData->wsaBuf, // 数据缓冲区1, // 缓冲区数量&recvBytes, // 实际接收字节数(输出)&flags, // 标志位&ioData->overlapped, // 重叠结构NULL // 完成回调(不使用));// 检查接收投递是否成功if (recvResult == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {printf("WSARecv失败: %d\n", WSAGetLastError());closesocket(handleData->socket);free(handleData);free(ioData);}}}return 0;
}// 接受连接的处理函数 - 专门处理客户端连接请求
DWORD WINAPI AcceptThread(LPVOID lpParam) {HANDLE iocpHandle = (HANDLE)lpParam; // 获取完成端口句柄// 创建监听socketSOCKET listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if (listenSocket == INVALID_SOCKET) {printf("创建监听socket失败: %d\n", WSAGetLastError());return 1;}// 设置socket选项: 允许地址重用int optval = 1;setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, (char*)&optval, sizeof(optval));// 绑定服务器地址struct sockaddr_in serverAddr;serverAddr.sin_family = AF_INET; // IPv4serverAddr.sin_addr.s_addr = INADDR_ANY; // 监听所有网卡serverAddr.sin_port = htons(atoi(SERVER_PORT)); // 端口号if (bind(listenSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {printf("绑定失败: %d\n", WSAGetLastError());closesocket(listenSocket);return 1;}// 开始监听客户端连接if (listen(listenSocket, SOMAXCONN) == SOCKET_ERROR) {printf("监听失败: %d\n", WSAGetLastError());closesocket(listenSocket);return 1;}printf("服务器开始在端口 %s 监听...\n", SERVER_PORT);int clientCount = 0; // 客户端计数器char clientId[32]; // 客户端ID缓冲区// 接受连接主循环while (TRUE) {// 接受客户端连接 (阻塞调用)SOCKET clientSocket = accept(listenSocket, NULL, NULL);if (clientSocket == INVALID_SOCKET) {printf("接受连接失败: %d\n", WSAGetLastError());continue; // 继续接受其他连接}// 为新连接生成客户端IDclientCount++;sprintf_s(clientId, sizeof(clientId), "Client%d", clientCount);printf("新连接: %s\n", clientId);// 为每个连接创建上下文数据PER_HANDLE_DATA* handleData = (PER_HANDLE_DATA*)malloc(sizeof(PER_HANDLE_DATA));if (!handleData) {printf("内存分配失败\n");closesocket(clientSocket);continue;}// 初始化连接数据handleData->socket = clientSocket;strcpy_s(handleData->client_id, sizeof(handleData->client_id), clientId);// 关键步骤: 将客户端socket与完成端口关联if (CreateIoCompletionPort((HANDLE)clientSocket, iocpHandle, (ULONG_PTR)handleData, 0) == NULL) {printf("关联完成端口失败: %d\n", GetLastError());closesocket(clientSocket);free(handleData);continue;}// 为I/O操作创建上下文数据PER_IO_DATA* ioData = (PER_IO_DATA*)malloc(sizeof(PER_IO_DATA));if (!ioData) {printf("内存分配失败\n");closesocket(clientSocket);free(handleData);continue;}// 初始化I/O操作数据ZeroMemory(&ioData->overlapped, sizeof(OVERLAPPED)); // 清空重叠结构ioData->wsaBuf.buf = ioData->buffer; // 设置缓冲区指针ioData->wsaBuf.len = BUFFER_SIZE; // 设置缓冲区大小ioData->operationType = 0; // 标记为接收操作memset(ioData->buffer, 0, BUFFER_SIZE); // 清空数据缓冲区// 关键步骤: 投递异步接收操作,开始等待客户端数据DWORD bytesReceived;DWORD flags = 0;int result = WSARecv(clientSocket, // 客户端socket&ioData->wsaBuf, // 数据缓冲区1, // 缓冲区数量&bytesReceived, // 接收字节数(输出)&flags, // 标志位&ioData->overlapped, // 重叠结构NULL // 完成回调);// 检查接收操作是否成功投递if (result == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {printf("WSARecv失败: %d\n", WSAGetLastError());closesocket(clientSocket);free(handleData);free(ioData);}// 如果返回WSA_IO_PENDING是正常情况,表示操作在进行中}closesocket(listenSocket);return 0;
}// 主函数 - 程序入口点
int main() {// 初始化Winsock库WSADATA wsaData;int result = WSAStartup(MAKEWORD(2, 2), &wsaData);if (result != 0) {printf("WSAStartup失败: %d\n", result);return 1;}printf("=== IOCP回声服务器启动 ===\n");printf("CPU核心数: 4\n");printf("工作线程数: 8\n");printf("监听端口: %s\n", SERVER_PORT);// 1. 创建完成端口 - IOCP核心组件HANDLE iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);if (iocpHandle == NULL) {printf("创建完成端口失败: %d\n", GetLastError());WSACleanup();return 1;}// 2. 创建工作线程池 - 处理I/O完成事件HANDLE workerThreads[MAX_THREADS];for (int i = 0; i < MAX_THREADS; i++) {workerThreads[i] = CreateThread(NULL, 0, WorkerThread, iocpHandle, 0, NULL);if (workerThreads[i] == NULL) {printf("创建工作线程失败: %d\n", GetLastError());WSACleanup();return 1;}}printf("创建了 %d 个工作线程\n", MAX_THREADS);// 3. 创建接受连接线程 - 专门处理新连接HANDLE acceptThread = CreateThread(NULL, 0, AcceptThread, iocpHandle, 0, NULL);if (acceptThread == NULL) {printf("创建接受线程失败: %d\n", GetLastError());WSACleanup();return 1;}printf("服务器初始化完成,等待客户端连接...\n");printf("按任意键退出...\n");getchar(); // 等待用户输入,保持服务器运行// 清理资源CloseHandle(iocpHandle); // 关闭完成端口for (int i = 0; i < MAX_THREADS; i++) {TerminateThread(workerThreads[i], 0); // 终止工作线程CloseHandle(workerThreads[i]); // 关闭线程句柄}TerminateThread(acceptThread, 0); // 终止接受线程CloseHandle(acceptThread); // 关闭线程句柄WSACleanup(); // 清理Winsockreturn 0;
}
// client_test.cpp - 简单的测试客户端
#include <winsock2.h>
#include <ws2tcpip.h>
#include <stdio.h>
#include <string.h>#pragma comment(lib, "ws2_32.lib")int main() {// 初始化WinsockWSADATA wsaData;WSAStartup(MAKEWORD(2, 2), &wsaData);// 创建客户端socketSOCKET clientSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);// 设置服务器地址struct sockaddr_in serverAddr;serverAddr.sin_family = AF_INET;serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); // 本地回环地址serverAddr.sin_port = htons(8888); // 服务器端口// 连接服务器if (connect(clientSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) == 0) {printf("连接到服务器成功\n");// 测试消息数组const char* testMessages[] = {"Hello", "World", "Test"};// 发送三条测试消息for (int i = 0; i < 3; i++) {// 发送消息到服务器send(clientSocket, testMessages[i], strlen(testMessages[i]), 0);printf("发送: %s\n", testMessages[i]);// 接收服务器的回显char buffer[1024];int received = recv(clientSocket, buffer, sizeof(buffer) - 1, 0);if (received > 0) {buffer[received] = '\0'; // 添加字符串结束符printf("接收: %s\n", buffer); // 打印回显数据}Sleep(1000); // 等待1秒,模拟间隔}printf("测试完成,断开连接\n");} else {printf("连接失败\n");}// 清理资源closesocket(clientSocket);WSACleanup();return 0;
}核心工作机制总结
IOCP + 重叠I/O 工作流程:
1. 初始化阶段
· 创建完成端口(IOCP)
· 创建工作线程池(阻塞在GetQueuedCompletionStatus)
· 创建监听线程接受连接
2. 连接处理阶段
· 接受新客户端连接
· 创建连接上下文(PER_HANDLE_DATA)
· 关联socket与完成端口
· 投递初始接收操作(WSARecv)
3. 数据处理循环
· 客户端发送数据 → 系统完成接收 → IOCP队列收到完成通知
· 工作线程被唤醒 → 处理数据 → 发送回显(WSASend)
· 发送完成 → 重新投递接收操作 → 等待下一次数据
4. 资源清理
· 连接关闭时系统发送bytesTransferred=0的完成通知
· 工作线程检测到后释放相关资源
关键优势:
· 高并发: 少量线程处理大量连接
· 零拷贝: 数据直接在系统缓冲区和应用缓冲区间传输
· 智能调度: IOCP自动负载均衡,优先唤醒空闲线程
· 完全异步: 所有I/O操作非阻塞,无线程等待浪费
