Windows的多路复用IOCP
IOCP
完成端口概述
IOCP(Input/Output Completion Port)提供了一个高效的线程模型,用于处理多处理器系统上的多个异步IO请求。当进程创建IO完成端口时,系统会为线程创建关联的队列对象,该对象的唯一目的是为这些请求提供服务。与在收到IO请求时创建线程相比,处理多个并发异步IO请求的进程可以通过将IO完成端口与预分配的线程池结合使用用来更快、更高效地执行此操作。
完成端口不是指物理上的端口,也不是指网络中的端口,而是指操作系统所提供的一种机制。这种机制是windows提供的一种高效处理io操作结果的通知机制。
IOCP工作原理
CreateIoCompletionPort函数创建IO完成端口,并将一个或多个文件句柄与该端口相关联。当其中一个文件句柄上的异步IO操作完成时,IO完成数据包将排入先进先出(FIFO)相关IO完成端口的队列。
线程如何与IOCP相关联?绑定关系如何?
当线程调用GetQueuedCompletionStatus时,线程将完成与IOCP完成关联,当线程退出、指定其他lOCP、或关闭lOCP时,解除关联。一个线程最多与一个IOCP关联,但是可多个线程关联同一个IOCP。
完成通知如何被应用程序捕获?
当io事件完成,将在IOCP中的完成队列排队,通过正在调用GetQueuedCompletionStatus的线程取出完成通知,或者系统通过唤醒与该IOCP关联并在阻塞等待的线程取出完成通知。
如何正确关闭与IOCP关联的文件句柄?
IO完成端口句柄以及与该特定IO完成端口关联的每个文件句柄称为对IO完成端口的引用。当不再引用IO完成端口时,将释放该端口。因此,必须正确关闭所有这些句柄才能释放IO完成端口及其关联的系统资源。满足这些条件后,应用程序应通过调用CloseHandle函数关闭IO完成端口句柄。
IOCP原理图
什么是重叠IO
无需等待上一个IO操作完成就可以提交下一个IO操作请求。也就是这些IO操作可以堆叠在一起.
注意:尽管IO操作是按顺序投递的,但是IO操作完成通知可以是随机无序的(在多线程等待IO完成通知时)。
typedef struct _OVERLAPPED {ULONG_PTR Internal;ULONG_PTR InternalHigh;union {struct {DWORD Offset;DWORD OffsetHigh;} DUMMYSTRUCTNAME;PVOID Pointer;} DUMMYUNIONNAME;hEvent;HANDLE
} OVERLAPPED, *LPOVERLAPPED;
/*注意:在函数调用中使用结构之前,应始终将此结构的任何河未使用成员初始化为零。*/
同步IO与异步IO的区别
windows异步IO接口
IOCP网络编程案例
#include <iostream>
#include <winsock2.h>
#include <mswsock.h>
#include <vector>
#include <process.h>#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "mswsock.lib")// 定义缓冲区大小
#define BUFFER_SIZE 1024
// 工作线程数量,通常设为CPU核心数的2倍
#define WORKER_THREAD_COUNT 4// 客户端上下文结构,保存每个客户端的信息
struct ClientContext {SOCKET socket; // 客户端SocketWSAOVERLAPPED overlapped; // 重叠I/O结构char buffer[BUFFER_SIZE]; // 数据缓冲区WSABUF wsaBuf; // WSABUF结构,用于WSARecv/WSASendint operationType; // 操作类型:1=接收,2=发送DWORD bytesTransferred; // 传输的字节数
};// IOCP句柄
HANDLE g_hCompletionPort;
// 监听Socket
SOCKET g_listenSocket;
// 退出标志
bool g_quit = false;// 函数声明
bool InitWinsock();
bool CreateListenSocket(int port);
unsigned int __stdcall WorkerThread(LPVOID lpParam);
void AcceptClient();
void HandleIOCompletion(ClientContext* pContext, DWORD bytesTransferred, DWORD errorCode);int main(int argc, char* argv[]) {int port = 8888; // 默认端口// 初始化Winsockif (!InitWinsock()) {std::cerr << "初始化Winsock失败" << std::endl;return 1;}// 创建监听Socketif (!CreateListenSocket(port)) {std::cerr << "创建监听Socket失败" << std::endl;WSACleanup();return 1;}std::cout << "服务器启动,监听端口: " << port << std::endl;std::cout << "按Ctrl+C退出..." << std::endl;// 创建IOCP对象g_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);if (g_hCompletionPort == NULL) {std::cerr << "CreateIoCompletionPort失败,错误码: " << GetLastError() << std::endl;closesocket(g_listenSocket);WSACleanup();return 1;}// 创建工作线程std::vector<HANDLE> workerThreads;for (int i = 0; i < WORKER_THREAD_COUNT; ++i) {unsigned int threadId;HANDLE hThread = (HANDLE)_beginthreadex(NULL, // 默认安全属性0, // 默认栈大小WorkerThread, // 线程函数NULL, // 传递给线程的参数0, // 立即执行线程&threadId);if (hThread == NULL) {std::cerr << "创建工作线程失败,错误码: " << GetLastError() << std::endl;// 清理已创建的资源for (auto h : workerThreads) CloseHandle(h);CloseHandle(g_hCompletionPort);closesocket(g_listenSocket);WSACleanup();return 1;}workerThreads.push_back(hThread);}// 将监听Socket与IOCP关联(虽然监听Socket不直接用于IO,但可以用于通知新连接)CreateIoCompletionPort((HANDLE)g_listenSocket, g_hCompletionPort, (ULONG_PTR)NULL, 0);// 开始接受客户端连接while (!g_quit) {AcceptClient();Sleep(100); // 简单的延迟,避免CPU占用过高}// 发送退出通知给所有工作线程for (int i = 0; i < WORKER_THREAD_COUNT; ++i) {PostQueuedCompletionStatus(g_hCompletionPort, 0, 0, NULL);}// 等待所有工作线程退出WaitForMultipleObjects(workerThreads.size(), workerThreads.data(), TRUE, INFINITE);// 清理资源for (auto h : workerThreads) CloseHandle(h);CloseHandle(g_hCompletionPort);closesocket(g_listenSocket);WSACleanup();std::cout << "服务器已关闭" << std::endl;return 0;
}// 初始化Winsock
bool InitWinsock() {WSADATA wsaData;int result = WSAStartup(MAKEWORD(2, 2), &wsaData);if (result != 0) {std::cerr << "WSAStartup失败,错误码: " << result << std::endl;return false;}return true;
}// 创建监听Socket
bool CreateListenSocket(int port) {// 创建TCP Socketg_listenSocket = WSASocket(AF_INET, // IPv4SOCK_STREAM, // 流式Socket(TCP)IPPROTO_TCP, // TCP协议NULL, // 无特定协议0, // 无标志WSA_FLAG_OVERLAPPED // 重叠I/O标志);if (g_listenSocket == INVALID_SOCKET) {std::cerr << "WSASocket失败,错误码: " << WSAGetLastError() << std::endl;return false;}// 设置Socket选项,允许地址重用int optVal = 1;if (setsockopt(g_listenSocket, SOL_SOCKET, SO_REUSEADDR, (char*)&optVal, sizeof(optVal)) == SOCKET_ERROR) {std::cerr << "setsockopt失败,错误码: " << WSAGetLastError() << std::endl;closesocket(g_listenSocket);return false;}// 绑定Socket到指定端口sockaddr_in serverAddr;serverAddr.sin_family = AF_INET;serverAddr.sin_addr.s_addr = INADDR_ANY; // 监听所有网络接口serverAddr.sin_port = htons(port); // 端口号if (bind(g_listenSocket, (SOCKADDR*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {std::cerr << "bind失败,错误码: " << WSAGetLastError() << std::endl;closesocket(g_listenSocket);return false;}// 开始监听,最大等待队列长度为5if (listen(g_listenSocket, 5) == SOCKET_ERROR) {std::cerr << "listen失败,错误码: " << WSAGetLastError() << std::endl;closesocket(g_listenSocket);return false;}return true;
}// 工作线程函数,处理IO完成通知
unsigned int __stdcall WorkerThread(LPVOID lpParam) {UNREFERENCED_PARAMETER(lpParam);DWORD bytesTransferred;ULONG_PTR completionKey;LPOVERLAPPED pOverlapped;while (true) {// 等待IO完成通知BOOL result = GetQueuedCompletionStatus(g_hCompletionPort, // IOCP句柄&bytesTransferred, // 传输的字节数&completionKey, // 完成键&pOverlapped, // 重叠结构INFINITE // 无限等待);// 检查是否是退出通知if (pOverlapped == NULL) {std::cout << "工作线程退出" << std::endl;break;}if (!result) {// 获取完成状态失败DWORD errorCode = GetLastError();std::cerr << "GetQueuedCompletionStatus失败,错误码: " << errorCode << std::endl;// 获取客户端上下文ClientContext* pContext = CONTAINING_RECORD(pOverlapped, ClientContext, overlapped);HandleIOCompletion(pContext, bytesTransferred, errorCode);} else {// 成功获取完成状态ClientContext* pContext = CONTAINING_RECORD(pOverlapped, ClientContext, overlapped);HandleIOCompletion(pContext, bytesTransferred, 0);}}return 0;
}// 接受客户端连接
void AcceptClient() {SOCKET clientSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);if (clientSocket == INVALID_SOCKET) {std::cerr << "创建客户端Socket失败,错误码: " << WSAGetLastError() << std::endl;return;}// 异步接受连接sockaddr_in clientAddr;int addrLen = sizeof(clientAddr);DWORD bytesReceived;// 创建AcceptEx所需的缓冲区char acceptBuffer[sizeof(sockaddr_in) + 16 * 2]; // 两个地址的空间// 调用AcceptEx异步接受连接if (AcceptEx(g_listenSocket,clientSocket,acceptBuffer,0, // 不接收数据sizeof(sockaddr_in) + 16,sizeof(sockaddr_in) + 16,&bytesReceived,(LPOVERLAPPED)new ClientContext{clientSocket} // 使用ClientContext存储客户端Socket) == FALSE) {int errorCode = WSAGetLastError();if (errorCode != WSA_IO_PENDING) {// 发生错误,不是正常的IO挂起状态std::cerr << "AcceptEx失败,错误码: " << errorCode << std::endl;closesocket(clientSocket);return;}}
}// 处理IO完成事件
void HandleIOCompletion(ClientContext* pContext, DWORD bytesTransferred, DWORD errorCode) {// 检查是否有错误发生if (errorCode != 0 || bytesTransferred == 0) {std::cout << "客户端断开连接,Socket: " << pContext->socket << std::endl;closesocket(pContext->socket);delete pContext;return;}// 判断操作类型if (pContext->operationType == 0) {// 新连接完成std::cout << "新客户端连接,Socket: " << pContext->socket << std::endl;// 将客户端Socket与IOCP关联CreateIoCompletionPort((HANDLE)pContext->socket, g_hCompletionPort, (ULONG_PTR)pContext, 0);// 初始化客户端上下文,准备接收数据pContext->operationType = 1; // 接收操作pContext->wsaBuf.buf = pContext->buffer;pContext->wsaBuf.len = BUFFER_SIZE;// 发起异步接收DWORD flags = 0;if (WSARecv(pContext->socket,&pContext->wsaBuf,1,&pContext->bytesTransferred,&flags,&pContext->overlapped,NULL) == SOCKET_ERROR) {int error = WSAGetLastError();if (error != WSA_IO_PENDING) {std::cerr << "WSARecv失败,错误码: " << error << std::endl;closesocket(pContext->socket);delete pContext;return;}}// 继续接受新的连接AcceptClient();}else if (pContext->operationType == 1) {// 接收操作完成std::cout << "从Socket " << pContext->socket << " 接收数据: " << std::string(pContext->buffer, bytesTransferred) << std::endl;// 将接收到的数据原样返回(回声服务器)pContext->operationType = 2; // 发送操作pContext->wsaBuf.len = bytesTransferred;if (WSASend(pContext->socket,&pContext->wsaBuf,1,&pContext->bytesTransferred,0,&pContext->overlapped,NULL) == SOCKET_ERROR) {int error = WSAGetLastError();if (error != WSA_IO_PENDING) {std::cerr << "WSASend失败,错误码: " << error << std::endl;closesocket(pContext->socket);delete pContext;return;}}}else if (pContext->operationType == 2) {// 发送操作完成std::cout << "向Socket " << pContext->socket << " 发送数据完成,字节数: " << bytesTransferred << std::endl;// 继续接收数据pContext->operationType = 1; // 接收操作pContext->wsaBuf.len = BUFFER_SIZE;DWORD flags = 0;if (WSARecv(pContext->socket,&pContext->wsaBuf,1,&pContext->bytesTransferred,&flags,&pContext->overlapped,NULL) == SOCKET_ERROR) {int error = WSAGetLastError();if (error != WSA_IO_PENDING) {std::cerr << "WSARecv失败,错误码: " << error << std::endl;closesocket(pContext->socket);delete pContext;return;}}}
}// 处理Ctrl+C信号,实现优雅退出
BOOL WINAPI ConsoleHandler(DWORD dwCtrlType) {if (dwCtrlType == CTRL_C_EVENT) {std::cout << "接收到退出信号,正在关闭服务器..." << std::endl;g_quit = true;return TRUE;}return FALSE;
}
代码解析
这个IOCP服务器示例实现了一个简单的回声服务器,核心功能包括:
-
初始化部分:
- 初始化Winsock库
- 创建监听Socket并绑定到指定端口
- 创建IOCP对象
- 启动工作线程池(数量通常为CPU核心数的2倍)
-
IOCP核心机制:
- 使用
CreateIoCompletionPort
创建IOCP对象并关联Socket - 工作线程通过
GetQueuedCompletionStatus
等待IO完成通知 - 使用
PostQueuedCompletionStatus
发送退出通知
- 使用
-
异步操作流程:
- 使用
AcceptEx
异步接受客户端连接 - 使用
WSARecv
异步接收数据 - 使用
WSASend
异步发送数据 - 通过
ClientContext
结构体维护每个客户端的状态
- 使用
-
关键数据结构:
WSAOVERLAPPED
:存储重叠I/O操作的状态WSABUF
:用于异步数据传输的缓冲区ClientContext
:自定义结构体,整合每个客户端的所有信息
使用说明
- 编译环境:需要Windows平台的C++编译器(如MSVC)
- 功能:启动后监听8888端口,接收客户端连接并将收到的数据原样返回
- 测试:可使用telnet或其他TCP客户端工具连接测试
- 退出:按Ctrl+C发送退出信号,服务器会优雅关闭
这个示例展示了IOCP的基本工作原理,在实际应用中还可以根据需要进行扩展,如添加更完善的错误处理、连接管理、消息解析等功能。