当前位置: 首页 > news >正文

Windows的多路复用IOCP

IOCP

完成端口概述

IOCP(Input/Output Completion Port)提供了一个高效的线程模型,用于处理多处理器系统上的多个异步IO请求。当进程创建IO完成端口时,系统会为线程创建关联的队列对象,该对象的唯一目的是为这些请求提供服务。与在收到IO请求时创建线程相比,处理多个并发异步IO请求的进程可以通过将IO完成端口与预分配的线程池结合使用用来更快、更高效地执行此操作。
完成端口不是指物理上的端口,也不是指网络中的端口,而是指操作系统所提供的一种机制。这种机制是windows提供的一种高效处理io操作结果的通知机制。

IOCP工作原理

CreateIoCompletionPort函数创建IO完成端口,并将一个或多个文件句柄与该端口相关联。当其中一个文件句柄上的异步IO操作完成时,IO完成数据包将排入先进先出(FIFO)相关IO完成端口的队列。
线程如何与IOCP相关联?绑定关系如何?
当线程调用GetQueuedCompletionStatus时,线程将完成与IOCP完成关联,当线程退出、指定其他lOCP、或关闭lOCP时,解除关联。一个线程最多与一个IOCP关联,但是可多个线程关联同一个IOCP。
完成通知如何被应用程序捕获?
当io事件完成,将在IOCP中的完成队列排队,通过正在调用GetQueuedCompletionStatus的线程取出完成通知,或者系统通过唤醒与该IOCP关联并在阻塞等待的线程取出完成通知。
如何正确关闭与IOCP关联的文件句柄?
IO完成端口句柄以及与该特定IO完成端口关联的每个文件句柄称为对IO完成端口的引用。当不再引用IO完成端口时,将释放该端口。因此,必须正确关闭所有这些句柄才能释放IO完成端口及其关联的系统资源。满足这些条件后,应用程序应通过调用CloseHandle函数关闭IO完成端口句柄。

IOCP原理图

在这里插入图片描述

什么是重叠IO

无需等待上一个IO操作完成就可以提交下一个IO操作请求。也就是这些IO操作可以堆叠在一起.
注意:尽管IO操作是按顺序投递的,但是IO操作完成通知可以是随机无序的(在多线程等待IO完成通知时)。

typedef struct _OVERLAPPED {ULONG_PTR Internal;ULONG_PTR InternalHigh;union {struct {DWORD Offset;DWORD OffsetHigh;} DUMMYSTRUCTNAME;PVOID Pointer;} DUMMYUNIONNAME;hEvent;HANDLE
} OVERLAPPED, *LPOVERLAPPED;
/*注意:在函数调用中使用结构之前,应始终将此结构的任何河未使用成员初始化为零。*/

同步IO与异步IO的区别

windows异步IO接口

IOCP网络编程案例


#include <iostream>
#include <winsock2.h>
#include <mswsock.h>
#include <vector>
#include <process.h>#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "mswsock.lib")// 定义缓冲区大小
#define BUFFER_SIZE 1024
// 工作线程数量,通常设为CPU核心数的2倍
#define WORKER_THREAD_COUNT 4// 客户端上下文结构,保存每个客户端的信息
struct ClientContext {SOCKET socket;               // 客户端SocketWSAOVERLAPPED overlapped;    // 重叠I/O结构char buffer[BUFFER_SIZE];    // 数据缓冲区WSABUF wsaBuf;               // WSABUF结构,用于WSARecv/WSASendint operationType;           // 操作类型:1=接收,2=发送DWORD bytesTransferred;      // 传输的字节数
};// IOCP句柄
HANDLE g_hCompletionPort;
// 监听Socket
SOCKET g_listenSocket;
// 退出标志
bool g_quit = false;// 函数声明
bool InitWinsock();
bool CreateListenSocket(int port);
unsigned int __stdcall WorkerThread(LPVOID lpParam);
void AcceptClient();
void HandleIOCompletion(ClientContext* pContext, DWORD bytesTransferred, DWORD errorCode);int main(int argc, char* argv[]) {int port = 8888; // 默认端口// 初始化Winsockif (!InitWinsock()) {std::cerr << "初始化Winsock失败" << std::endl;return 1;}// 创建监听Socketif (!CreateListenSocket(port)) {std::cerr << "创建监听Socket失败" << std::endl;WSACleanup();return 1;}std::cout << "服务器启动,监听端口: " << port << std::endl;std::cout << "按Ctrl+C退出..." << std::endl;// 创建IOCP对象g_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);if (g_hCompletionPort == NULL) {std::cerr << "CreateIoCompletionPort失败,错误码: " << GetLastError() << std::endl;closesocket(g_listenSocket);WSACleanup();return 1;}// 创建工作线程std::vector<HANDLE> workerThreads;for (int i = 0; i < WORKER_THREAD_COUNT; ++i) {unsigned int threadId;HANDLE hThread = (HANDLE)_beginthreadex(NULL,           // 默认安全属性0,              // 默认栈大小WorkerThread,   // 线程函数NULL,           // 传递给线程的参数0,              // 立即执行线程&threadId);if (hThread == NULL) {std::cerr << "创建工作线程失败,错误码: " << GetLastError() << std::endl;// 清理已创建的资源for (auto h : workerThreads) CloseHandle(h);CloseHandle(g_hCompletionPort);closesocket(g_listenSocket);WSACleanup();return 1;}workerThreads.push_back(hThread);}// 将监听Socket与IOCP关联(虽然监听Socket不直接用于IO,但可以用于通知新连接)CreateIoCompletionPort((HANDLE)g_listenSocket, g_hCompletionPort, (ULONG_PTR)NULL, 0);// 开始接受客户端连接while (!g_quit) {AcceptClient();Sleep(100); // 简单的延迟,避免CPU占用过高}// 发送退出通知给所有工作线程for (int i = 0; i < WORKER_THREAD_COUNT; ++i) {PostQueuedCompletionStatus(g_hCompletionPort, 0, 0, NULL);}// 等待所有工作线程退出WaitForMultipleObjects(workerThreads.size(), workerThreads.data(), TRUE, INFINITE);// 清理资源for (auto h : workerThreads) CloseHandle(h);CloseHandle(g_hCompletionPort);closesocket(g_listenSocket);WSACleanup();std::cout << "服务器已关闭" << std::endl;return 0;
}// 初始化Winsock
bool InitWinsock() {WSADATA wsaData;int result = WSAStartup(MAKEWORD(2, 2), &wsaData);if (result != 0) {std::cerr << "WSAStartup失败,错误码: " << result << std::endl;return false;}return true;
}// 创建监听Socket
bool CreateListenSocket(int port) {// 创建TCP Socketg_listenSocket = WSASocket(AF_INET,         // IPv4SOCK_STREAM,     // 流式Socket(TCP)IPPROTO_TCP,     // TCP协议NULL,            // 无特定协议0,               // 无标志WSA_FLAG_OVERLAPPED  // 重叠I/O标志);if (g_listenSocket == INVALID_SOCKET) {std::cerr << "WSASocket失败,错误码: " << WSAGetLastError() << std::endl;return false;}// 设置Socket选项,允许地址重用int optVal = 1;if (setsockopt(g_listenSocket, SOL_SOCKET, SO_REUSEADDR, (char*)&optVal, sizeof(optVal)) == SOCKET_ERROR) {std::cerr << "setsockopt失败,错误码: " << WSAGetLastError() << std::endl;closesocket(g_listenSocket);return false;}// 绑定Socket到指定端口sockaddr_in serverAddr;serverAddr.sin_family = AF_INET;serverAddr.sin_addr.s_addr = INADDR_ANY; // 监听所有网络接口serverAddr.sin_port = htons(port);       // 端口号if (bind(g_listenSocket, (SOCKADDR*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {std::cerr << "bind失败,错误码: " << WSAGetLastError() << std::endl;closesocket(g_listenSocket);return false;}// 开始监听,最大等待队列长度为5if (listen(g_listenSocket, 5) == SOCKET_ERROR) {std::cerr << "listen失败,错误码: " << WSAGetLastError() << std::endl;closesocket(g_listenSocket);return false;}return true;
}// 工作线程函数,处理IO完成通知
unsigned int __stdcall WorkerThread(LPVOID lpParam) {UNREFERENCED_PARAMETER(lpParam);DWORD bytesTransferred;ULONG_PTR completionKey;LPOVERLAPPED pOverlapped;while (true) {// 等待IO完成通知BOOL result = GetQueuedCompletionStatus(g_hCompletionPort,      // IOCP句柄&bytesTransferred,      // 传输的字节数&completionKey,         // 完成键&pOverlapped,           // 重叠结构INFINITE                // 无限等待);// 检查是否是退出通知if (pOverlapped == NULL) {std::cout << "工作线程退出" << std::endl;break;}if (!result) {// 获取完成状态失败DWORD errorCode = GetLastError();std::cerr << "GetQueuedCompletionStatus失败,错误码: " << errorCode << std::endl;// 获取客户端上下文ClientContext* pContext = CONTAINING_RECORD(pOverlapped, ClientContext, overlapped);HandleIOCompletion(pContext, bytesTransferred, errorCode);} else {// 成功获取完成状态ClientContext* pContext = CONTAINING_RECORD(pOverlapped, ClientContext, overlapped);HandleIOCompletion(pContext, bytesTransferred, 0);}}return 0;
}// 接受客户端连接
void AcceptClient() {SOCKET clientSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);if (clientSocket == INVALID_SOCKET) {std::cerr << "创建客户端Socket失败,错误码: " << WSAGetLastError() << std::endl;return;}// 异步接受连接sockaddr_in clientAddr;int addrLen = sizeof(clientAddr);DWORD bytesReceived;// 创建AcceptEx所需的缓冲区char acceptBuffer[sizeof(sockaddr_in) + 16 * 2]; // 两个地址的空间// 调用AcceptEx异步接受连接if (AcceptEx(g_listenSocket,clientSocket,acceptBuffer,0, // 不接收数据sizeof(sockaddr_in) + 16,sizeof(sockaddr_in) + 16,&bytesReceived,(LPOVERLAPPED)new ClientContext{clientSocket} // 使用ClientContext存储客户端Socket) == FALSE) {int errorCode = WSAGetLastError();if (errorCode != WSA_IO_PENDING) {// 发生错误,不是正常的IO挂起状态std::cerr << "AcceptEx失败,错误码: " << errorCode << std::endl;closesocket(clientSocket);return;}}
}// 处理IO完成事件
void HandleIOCompletion(ClientContext* pContext, DWORD bytesTransferred, DWORD errorCode) {// 检查是否有错误发生if (errorCode != 0 || bytesTransferred == 0) {std::cout << "客户端断开连接,Socket: " << pContext->socket << std::endl;closesocket(pContext->socket);delete pContext;return;}// 判断操作类型if (pContext->operationType == 0) {// 新连接完成std::cout << "新客户端连接,Socket: " << pContext->socket << std::endl;// 将客户端Socket与IOCP关联CreateIoCompletionPort((HANDLE)pContext->socket, g_hCompletionPort, (ULONG_PTR)pContext, 0);// 初始化客户端上下文,准备接收数据pContext->operationType = 1; // 接收操作pContext->wsaBuf.buf = pContext->buffer;pContext->wsaBuf.len = BUFFER_SIZE;// 发起异步接收DWORD flags = 0;if (WSARecv(pContext->socket,&pContext->wsaBuf,1,&pContext->bytesTransferred,&flags,&pContext->overlapped,NULL) == SOCKET_ERROR) {int error = WSAGetLastError();if (error != WSA_IO_PENDING) {std::cerr << "WSARecv失败,错误码: " << error << std::endl;closesocket(pContext->socket);delete pContext;return;}}// 继续接受新的连接AcceptClient();}else if (pContext->operationType == 1) {// 接收操作完成std::cout << "从Socket " << pContext->socket << " 接收数据: " << std::string(pContext->buffer, bytesTransferred) << std::endl;// 将接收到的数据原样返回(回声服务器)pContext->operationType = 2; // 发送操作pContext->wsaBuf.len = bytesTransferred;if (WSASend(pContext->socket,&pContext->wsaBuf,1,&pContext->bytesTransferred,0,&pContext->overlapped,NULL) == SOCKET_ERROR) {int error = WSAGetLastError();if (error != WSA_IO_PENDING) {std::cerr << "WSASend失败,错误码: " << error << std::endl;closesocket(pContext->socket);delete pContext;return;}}}else if (pContext->operationType == 2) {// 发送操作完成std::cout << "向Socket " << pContext->socket << " 发送数据完成,字节数: " << bytesTransferred << std::endl;// 继续接收数据pContext->operationType = 1; // 接收操作pContext->wsaBuf.len = BUFFER_SIZE;DWORD flags = 0;if (WSARecv(pContext->socket,&pContext->wsaBuf,1,&pContext->bytesTransferred,&flags,&pContext->overlapped,NULL) == SOCKET_ERROR) {int error = WSAGetLastError();if (error != WSA_IO_PENDING) {std::cerr << "WSARecv失败,错误码: " << error << std::endl;closesocket(pContext->socket);delete pContext;return;}}}
}// 处理Ctrl+C信号,实现优雅退出
BOOL WINAPI ConsoleHandler(DWORD dwCtrlType) {if (dwCtrlType == CTRL_C_EVENT) {std::cout << "接收到退出信号,正在关闭服务器..." << std::endl;g_quit = true;return TRUE;}return FALSE;
}

代码解析

这个IOCP服务器示例实现了一个简单的回声服务器,核心功能包括:

  1. 初始化部分

    • 初始化Winsock库
    • 创建监听Socket并绑定到指定端口
    • 创建IOCP对象
    • 启动工作线程池(数量通常为CPU核心数的2倍)
  2. IOCP核心机制

    • 使用CreateIoCompletionPort创建IOCP对象并关联Socket
    • 工作线程通过GetQueuedCompletionStatus等待IO完成通知
    • 使用PostQueuedCompletionStatus发送退出通知
  3. 异步操作流程

    • 使用AcceptEx异步接受客户端连接
    • 使用WSARecv异步接收数据
    • 使用WSASend异步发送数据
    • 通过ClientContext结构体维护每个客户端的状态
  4. 关键数据结构

    • WSAOVERLAPPED:存储重叠I/O操作的状态
    • WSABUF:用于异步数据传输的缓冲区
    • ClientContext:自定义结构体,整合每个客户端的所有信息

使用说明

  1. 编译环境:需要Windows平台的C++编译器(如MSVC)
  2. 功能:启动后监听8888端口,接收客户端连接并将收到的数据原样返回
  3. 测试:可使用telnet或其他TCP客户端工具连接测试
  4. 退出:按Ctrl+C发送退出信号,服务器会优雅关闭

这个示例展示了IOCP的基本工作原理,在实际应用中还可以根据需要进行扩展,如添加更完善的错误处理、连接管理、消息解析等功能。

http://www.dtcms.com/a/419862.html

相关文章:

  • 做网站的规范暴雪公司现状
  • 前端提效工具清单,常用前端效率工具推荐与开发提效实战经验
  • 做的网站怎么把技术支持去掉个人社保缴费年限怎么查询
  • 李宏毅机器学习笔记15
  • 数字化转型:开发者思维破局之道
  • 网站会员功能介绍营销背景包括哪些内容
  • 【NCS随笔】peripheral_hids_mouse例程修改为不使用PIN码绑定
  • 第三方软件验收测试:【AutoIt与Selenium结合测试文件上传/下载等Windows对话框】
  • 网站的二级目录是什么10个不愁销路的小型加工厂
  • K8S中关于容器对外提供服务网络类型
  • 建设网站需要虚拟空间嘛专业网站制作公司采用哪些技术制作网站?
  • 超声波水表:原理、实现与核心技术解析
  • 怎样 建设电子商务网站直播网站app开发
  • Nginx 核心功能配置:访问控制、用户认证、HTTPS 与 URL 重写等
  • 大模型显存占用完全指南:从训练到推理的计算公式与实战案例(建议收藏)
  • 惠州做网站采招网招标官网
  • 烟台做网站找哪家好哪个网站可以做海报
  • 【星海出品】计算机科学之磁盘数据读取时间逻辑
  • 模力通AI风格仿写 让公文写作告别“风格焦虑”
  • 构建AI智能体:四十七、Agent2Agent多智能体系统:基础通信与任务协作实现
  • 天猫建设网站的意义张家港网站建设做网站
  • python爬虫进阶版练习(只说重点,selenium)
  • 东莞网站设计教程为企业做好服务保障
  • 福州网站建设q.479185700強网页翻译网站
  • 134、【OS】【Nuttx】【周边】效果呈现方案解析:端口映射(三)
  • 网站开发 报价单网站源码asp
  • Java HHH000490: Using JtaPlatform implementation
  • 网站关键词检测郑州外贸网站推广
  • 苏州网站开发的企业wordpress 结合qq
  • 在Linux中安装应用