当前位置: 首页 > news >正文

2.5.3 windows编程iocp

文章目录

  • 2.5.3 windows编程iocp
    • 1. iocp
      • 1. reactor与iocp
      • 2. 阻塞与非阻塞都是同步io:在io检测过程中,如果 io 未就绪,阻塞是阻塞线程等待就绪非阻塞不阻塞线程等待,立刻返回
    • 2. 几个问题
      • 1. 网络编程要解决哪几个问题?分别怎么解决?
      • 2. reactor 与 select\poll\epoll 关系?
      • 3. iocp 与上面哪个概念接近?
      • 4. iocp 原理是什么?
      • 5. iocp 编程细节
      • 6. 重叠 IO

2.5.3 windows编程iocp

1. iocp

1. reactor与iocp

reactor 同步 IO,异步事件
函数返回,说明io已经操作了

iocp 异步 IO,异步事件,proactor
只是投递一个事件,AcceptEx、WSARecv、WSASend、ConnectEx, 在这个接口中 io 未操作

2. 阻塞与非阻塞都是同步io:在io检测过程中,如果 io 未就绪,阻塞是阻塞线程等待就绪非阻塞不阻塞线程等待,立刻返回

阻塞与非阻塞是描述I/O操作在数据准备阶段的行为,而同步与异步是描述I/O操作在数据读写阶段的行为。
阻塞与非阻塞都属于同步I/O,原因如下:

  1. ​阻塞与非阻塞的区别:

​阻塞I/O:在数据准备阶段,如果数据未准备好,调用线程会被挂起,直到数据准备好并完成读写操作后才返回。例如,默认的recv函数会阻塞线程,直到数据到达内核缓冲区并拷贝到用户空间。
​非阻塞I/O:在数据准备阶段,如果数据未准备好,调用线程会立即返回一个错误(如EAGAIN),而不会阻塞。线程需要不断轮询检查数据是否准备好。

  1. ​同步I/O的定义:

同步I/O是指I/O操作的两个阶段(数据准备和数据读写)都需要调用线程主动参与。无论是阻塞还是非阻塞,数据从内核缓冲区拷贝到用户空间的过程都是由调用线程发起的,因此它们都属于同步I/O

  1. ​为什么阻塞与非阻塞都是同步I/O:

在数据读写阶段,无论是阻塞还是非阻塞,调用线程都需要主动发起数据拷贝操作。例如,recv函数在数据准备好后,由调用线程将数据从内核缓冲区拷贝到用户空间。这种主动参与的行为符合同步I/O的定义。

  1. ​异步I/O的区别:

异步I/O在数据读写阶段由内核完成数据拷贝,并通知调用线程。调用线程不需要主动参与数据拷贝过程,因此异步I/O不属于同步I/O。

在这里插入图片描述

2. 几个问题

1. 网络编程要解决哪几个问题?分别怎么解决?

  1. write 将要发送出去的数据写到发送缓冲区,内核解决到底怎么发过去

  2. 连接的建立,连接的断开,数据的接收,数据的发送都不知道到底怎么来到
    阻塞IO网络模型,阻塞线程等待事件发生

  3. 操作 io (accept、read、write、connect)

    1. IO 检测
    2. IO 操作
  4. reactor ->IO多路复用全部包了IO检测功能,有事件之后再回调,通过回调函数(accept,onnect,read,write)再去操作io

  5. iocp-> socket先绑定到iocp中,投递一个AcceptEx请求(或者WSARecv,WSASend,DisconnectEx),会直接完成通知,直接拿到clientFd,addr

2. reactor 与 select\poll\epoll 关系?

reacter包含检测和操作io,select\poll\epoll只有检测io

3. iocp 与上面哪个概念接近?

和reactor更接近,

4. iocp 原理是什么?

在这里插入图片描述

  1. 创建一些底层数据结构
  2. 绑定接口
  3. 发起异步请求
  4. iocp取出来任务,然后帮助进行io检测和操作,
  5. 最后放到完成队列
  6. 用户端调函数去从用户端去取

5. iocp 编程细节

#include <WinSock2.h>       // Windows Socket API
#include <MSWSock.h>        // Microsoft扩展的Socket API
#include <iostream>         // 输入输出流

#define BUFFER_SIZE 1024    // 缓冲区大小
#define THREAD_COUNT 2      // 线程数量
#define PORT 8989           // 服务器端口号

// 定义IO操作类型
enum class IO_OP_TYPE {
    IO_ACCEPT,  // 接受连接
    IO_RECV,    // 接收数据
    IO_SEND     // 发送数据
};

// 每个IO操作的重叠结构体
typedef struct OverlappedPerIO {
    OVERLAPPED overlapped;  // 重叠结构
    SOCKET socket;          // 套接字
    WSABUF wsaBuf;          // 数据缓冲区
    IO_OP_TYPE type;        // IO操作类型
    char buffer[BUFFER_SIZE]; // 数据缓冲区
} *LPOverlappedPerIO;

// 发起AcceptEx请求
void PostAcceptEx(SOCKET listenSocket, HANDLE completionPort) {
    SOCKET sock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (sock == INVALID_SOCKET) {
        return;
    }
    OverlappedPerIO* overlp = new OverlappedPerIO;
    ZeroMemory(overlp, sizeof(OverlappedPerIO)); // 初始化内存
    overlp->socket = sock;
    overlp->wsaBuf.buf = overlp->buffer;
    overlp->wsaBuf.len = BUFFER_SIZE;
    overlp->type = IO_OP_TYPE::IO_ACCEPT;

    DWORD dwByteRecv = 0;
    if (!AcceptEx(listenSocket, sock, overlp->wsaBuf.buf, 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &dwByteRecv, (LPOVERLAPPED)overlp)) {
        if (WSAGetLastError() != WSA_IO_PENDING) { // 如果操作挂起
            std::cout << "AcceptEx failed:" << WSAGetLastError() << std::endl;
            closesocket(sock);
            delete overlp;
        }
    }
}

// 工作线程函数
DWORD WINAPI workerThread(LPVOID lpParam) {
    HANDLE completionPort = (HANDLE)lpParam;
    DWORD bytesTrans;
    ULONG_PTR comletionKey;
    LPOverlappedPerIO overlp;

    while (true) {
        // 获取完成端口状态
        BOOL result = GetQueuedCompletionStatus(completionPort, &bytesTrans, &comletionKey, (LPOVERLAPPED*)&overlp, INFINITE);
        if (!result) {
            std::cout << "GetQueuedCompletionStatus failed:" << GetLastError() << std::endl;
            closesocket(overlp->socket);
            delete overlp;
            continue;
        }

        switch (overlp->type) {
        case IO_OP_TYPE::IO_ACCEPT: // 处理接受连接
        {
            std::cout << "New connection accepted." << std::endl;
            setsockopt(overlp->socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&comletionKey, sizeof(SOCKET));

            ZeroMemory(overlp->buffer, BUFFER_SIZE);
            overlp->type = IO_OP_TYPE::IO_RECV;
            overlp->wsaBuf.buf = overlp->buffer;
            overlp->wsaBuf.len = BUFFER_SIZE;

            DWORD dwRecv = 0, dwFlag = 0;
            if (WSARecv(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, &(overlp->overlapped), 0) == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
                std::cout << "WSARecv failed:" << WSAGetLastError() << std::endl;
            }
        }
        break;
        case IO_OP_TYPE::IO_RECV: // 处理接收数据
        {
            std::cout << "Data received: " << overlp->buffer << std::endl;

            ZeroMemory(&overlp->overlapped, sizeof(OVERLAPPED));
            overlp->type = IO_OP_TYPE::IO_SEND;
            overlp->wsaBuf.buf = "response from server\n";
            overlp->wsaBuf.len = strlen("response from server\n");

            DWORD dwSend = 0;
            if (WSASend(overlp->socket, &overlp->wsaBuf, 1, &dwSend, 0, &(overlp->overlapped), 0) == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
                std::cout << "WSASend failed:" << WSAGetLastError() << std::endl;
            }
        }
        break;
        case IO_OP_TYPE::IO_SEND: // 处理发送数据
        {
            std::cout << "Data sent." << std::endl;
            closesocket(overlp->socket);
            delete overlp;
        }
        break;
        }
    }
    return 0;
}

// 初始化服务器
int InitServer(HANDLE& completionPort, SOCKET& listenSocket) {
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
        return -1;
    }

    listenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (listenSocket == INVALID_SOCKET) {
        WSACleanup();
        return -1;
    }

    sockaddr_in address;
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    if (bind(listenSocket, (const sockaddr*)&address, sizeof(address)) == SOCKET_ERROR || listen(listenSocket, SOMAXCONN) == SOCKET_ERROR) {
        closesocket(listenSocket);
        WSACleanup();
        return -1;
    }

    completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (completionPort == NULL) {
        closesocket(listenSocket);
        WSACleanup();
        return -1;
    }

    return 0;
}

// 主函数
int main() {
    HANDLE completionPort;
    SOCKET listenSocket;

    if (InitServer(completionPort, listenSocket) != 0) {
        std::cout << "InitServer Error" << std::endl;
        return 1;
    }

    for (int i = 0; i < THREAD_COUNT; i++) { // 创建工作线程
        CreateThread(NULL, 0, workerThread, completionPort, 0, NULL);
    }

    PostAcceptEx(listenSocket, completionPort); // 发起初始AcceptEx请求

    std::cin.get(); // 等待用户输入

    closesocket(listenSocket); // 关闭监听套接字
    CloseHandle(completionPort); // 关闭完成端口
    WSACleanup(); // 清理Winsock

    return 0;
}

6. 重叠 IO

  1. 异步的:投递io请求,在另一个线程等待io操作完成。尽管 IO 操作是按顺序投递的,但是 IO 操作完成通知可以是随机无序的(在多线程等待 IO 完成通知时)
  2. 无需等待上一个io操作完成就可以调用下一个io操作,这些io操作可以堆叠在一起
  3. 服务器启动时候,通常投递多个AcceptEx,就可以并发接收多个连接
  4. 使用重叠I/O异步读取文件例子
#include <windows.h>
#include <iostream>
#include <memory>

void CALLBACK FileIOCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) {
    if (dwErrorCode == 0) {
        std::cout << "Read completed successfully. Bytes read: " << dwNumberOfBytesTransfered << std::endl;
    } else {
        std::cerr << "Read failed with error code: " << dwErrorCode << std::endl;
    }
}

int main() {
    // 打开文件,指定重叠I/O标志
    HANDLE hFile = CreateFile(
        L"example.txt",               // 文件名
        GENERIC_READ,                 // 访问模式
        FILE_SHARE_READ,              // 共享模式
        NULL,                         // 安全属性
        OPEN_EXISTING,                // 打开方式
        FILE_FLAG_OVERLAPPED,         // 重叠I/O标志
        NULL                          // 模板文件句柄
    );

    if (hFile == INVALID_HANDLE_VALUE) {
        std::cerr << "Failed to open file. Error: " << GetLastError() << std::endl;
        return 1;
    }

    // 分配缓冲区
    const DWORD bufferSize = 1024;
    std::shared_ptr<BYTE> buffer(new BYTE[bufferSize], std::default_delete<BYTE[]>());

    // 初始化OVERLAPPED结构
    OVERLAPPED overlapped = {0};
    overlapped.Offset = 0;  // 从文件开头读取
    overlapped.OffsetHigh = 0;

    // 异步读取文件
    BOOL result = ReadFileEx(
        hFile,                      // 文件句柄
        buffer.get(),               // 缓冲区
        bufferSize,                 // 读取字节数
        &overlapped,                // OVERLAPPED结构
        FileIOCompletionRoutine     // 完成例程
    );

    if (!result) {
        std::cerr << "ReadFileEx failed. Error: " << GetLastError() << std::endl;
        CloseHandle(hFile);
        return 1;
    }

    // 等待异步操作完成
    SleepEx(INFINITE, TRUE);  // 无限等待,允许完成例程执行

    // 关闭文件句柄
    CloseHandle(hFile);
    return 0;
}
  1. 关联作用:传递一个值给内核,内核不修改它,就绪通知就会和就绪事件去对比,找到自己的

相关文章:

  • 【MATLAB例程】交互式多模型(IMM),模型使用:CV,CT左转、CT右转,二维平面,三个模型的IMM,滤波使用EKF。订阅专栏后可查看代码
  • MD2Card(markdown)
  • 打字时候选如何向后翻页?
  • 前端使用vue,一个项目从零开始开发大概构思
  • 【新能源汽车温度采集与控制系统设计深度解析】
  • 2.3 高阶导数
  • 玩转python: 掌握Python常用库之数据分析pandas
  • 【第二月_day7】Pandas 简介与数据结构_Pandas_ day1
  • kafka删除/创建 topic报错,如何解决
  • Go常见问题与答案笔记(上)
  • 如何给poco f4(redmi k40s)刷入twrp或orange fox
  • 解锁DeepSeek潜能:Docker+Ollama打造本地大模型部署新范式
  • 在 Ubuntu 中配置开机自启动脚本并激活 Anaconda 环境
  • 【GoLang】调用llm时提示词prompt的介绍以及使用方式
  • 玄机-第五章 linux实战-CMS01的测试报告
  • 【拒绝算法PUA】LeetCode 2255. 统计是给定字符串前缀的字符串数目
  • 在Luckfox上使用EdgeVoice语音助手
  • 欢迎来到未来:探索 Dify 开源大语言模型应用开发平台
  • Flutter项目升级到指定版本的详细步骤指南
  • 解决IDEA中maven找不到依赖项的问题
  • 五一期间全国高速日均流量6200万辆,同比增长8.1%
  • “五一”逃离城市计划:带上帐篷去大自然里充电
  • 美参议院通过新任美国驻华大使任命,外交部回应
  • 光明日报:回应辅警“转正”呼声,是一门政民互动公开课
  • 浙商银行外部监事高强无法履职:已被查,曾任建行浙江省分行行长
  • 俄宣布停火三天,外交部:希望各方继续通过对话谈判解决危机