2.5.3 windows编程iocp
文章目录
- 2.5.3 windows编程iocp
- 1. iocp
- 1. reactor与iocp
- 2. 阻塞与非阻塞都是同步io:在io检测过程中,如果 io 未就绪,阻塞是阻塞线程等待就绪非阻塞不阻塞线程等待,立刻返回
- 2. 几个问题
- 1. 网络编程要解决哪几个问题?分别怎么解决?
- 2. reactor 与 select\poll\epoll 关系?
- 3. iocp 与上面哪个概念接近?
- 4. iocp 原理是什么?
- 5. iocp 编程细节
- 6. 重叠 IO
2.5.3 windows编程iocp
1. iocp
1. reactor与iocp
reactor 同步 IO,异步事件
函数返回,说明io已经操作了
iocp 异步 IO,异步事件,proactor
只是投递一个事件,AcceptEx、WSARecv、WSASend、ConnectEx, 在这个接口中 io 未操作
2. 阻塞与非阻塞都是同步io:在io检测过程中,如果 io 未就绪,阻塞是阻塞线程等待就绪非阻塞不阻塞线程等待,立刻返回
阻塞与非阻塞是描述I/O操作在数据准备阶段的行为,而同步与异步是描述I/O操作在数据读写阶段的行为。
阻塞与非阻塞都属于同步I/O,原因如下:
- 阻塞与非阻塞的区别:
阻塞I/O:在数据准备阶段,如果数据未准备好,调用线程会被挂起,直到数据准备好并完成读写操作后才返回。例如,默认的recv函数会阻塞线程,直到数据到达内核缓冲区并拷贝到用户空间。
非阻塞I/O:在数据准备阶段,如果数据未准备好,调用线程会立即返回一个错误(如EAGAIN),而不会阻塞。线程需要不断轮询检查数据是否准备好。
- 同步I/O的定义:
同步I/O是指I/O操作的两个阶段(数据准备和数据读写)都需要调用线程主动参与。无论是阻塞还是非阻塞,数据从内核缓冲区拷贝到用户空间的过程都是由调用线程发起的,因此它们都属于同步I/O
- 为什么阻塞与非阻塞都是同步I/O:
在数据读写阶段,无论是阻塞还是非阻塞,调用线程都需要主动发起数据拷贝操作。例如,recv函数在数据准备好后,由调用线程将数据从内核缓冲区拷贝到用户空间。这种主动参与的行为符合同步I/O的定义。
- 异步I/O的区别:
异步I/O在数据读写阶段由内核完成数据拷贝,并通知调用线程。调用线程不需要主动参与数据拷贝过程,因此异步I/O不属于同步I/O。
2. 几个问题
1. 网络编程要解决哪几个问题?分别怎么解决?
-
write 将要发送出去的数据写到发送缓冲区,内核解决到底怎么发过去
-
连接的建立,连接的断开,数据的接收,数据的发送都不知道到底怎么来到
阻塞IO网络模型,阻塞线程等待事件发生 -
操作 io (accept、read、write、connect)
- IO 检测
- IO 操作
-
reactor ->IO多路复用全部包了IO检测功能,有事件之后再回调,通过回调函数(accept,onnect,read,write)再去操作io
-
iocp-> socket先绑定到iocp中,投递一个AcceptEx请求(或者WSARecv,WSASend,DisconnectEx),会直接完成通知,直接拿到clientFd,addr
2. reactor 与 select\poll\epoll 关系?
reacter包含检测和操作io,select\poll\epoll只有检测io
3. iocp 与上面哪个概念接近?
和reactor更接近,
4. iocp 原理是什么?
- 创建一些底层数据结构
- 绑定接口
- 发起异步请求
- iocp取出来任务,然后帮助进行io检测和操作,
- 最后放到完成队列
- 用户端调函数去从用户端去取
5. iocp 编程细节
#include <WinSock2.h> // Windows Socket API
#include <MSWSock.h> // Microsoft扩展的Socket API
#include <iostream> // 输入输出流
#define BUFFER_SIZE 1024 // 缓冲区大小
#define THREAD_COUNT 2 // 线程数量
#define PORT 8989 // 服务器端口号
// 定义IO操作类型
enum class IO_OP_TYPE {
IO_ACCEPT, // 接受连接
IO_RECV, // 接收数据
IO_SEND // 发送数据
};
// 每个IO操作的重叠结构体
typedef struct OverlappedPerIO {
OVERLAPPED overlapped; // 重叠结构
SOCKET socket; // 套接字
WSABUF wsaBuf; // 数据缓冲区
IO_OP_TYPE type; // IO操作类型
char buffer[BUFFER_SIZE]; // 数据缓冲区
} *LPOverlappedPerIO;
// 发起AcceptEx请求
void PostAcceptEx(SOCKET listenSocket, HANDLE completionPort) {
SOCKET sock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if (sock == INVALID_SOCKET) {
return;
}
OverlappedPerIO* overlp = new OverlappedPerIO;
ZeroMemory(overlp, sizeof(OverlappedPerIO)); // 初始化内存
overlp->socket = sock;
overlp->wsaBuf.buf = overlp->buffer;
overlp->wsaBuf.len = BUFFER_SIZE;
overlp->type = IO_OP_TYPE::IO_ACCEPT;
DWORD dwByteRecv = 0;
if (!AcceptEx(listenSocket, sock, overlp->wsaBuf.buf, 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &dwByteRecv, (LPOVERLAPPED)overlp)) {
if (WSAGetLastError() != WSA_IO_PENDING) { // 如果操作挂起
std::cout << "AcceptEx failed:" << WSAGetLastError() << std::endl;
closesocket(sock);
delete overlp;
}
}
}
// 工作线程函数
DWORD WINAPI workerThread(LPVOID lpParam) {
HANDLE completionPort = (HANDLE)lpParam;
DWORD bytesTrans;
ULONG_PTR comletionKey;
LPOverlappedPerIO overlp;
while (true) {
// 获取完成端口状态
BOOL result = GetQueuedCompletionStatus(completionPort, &bytesTrans, &comletionKey, (LPOVERLAPPED*)&overlp, INFINITE);
if (!result) {
std::cout << "GetQueuedCompletionStatus failed:" << GetLastError() << std::endl;
closesocket(overlp->socket);
delete overlp;
continue;
}
switch (overlp->type) {
case IO_OP_TYPE::IO_ACCEPT: // 处理接受连接
{
std::cout << "New connection accepted." << std::endl;
setsockopt(overlp->socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&comletionKey, sizeof(SOCKET));
ZeroMemory(overlp->buffer, BUFFER_SIZE);
overlp->type = IO_OP_TYPE::IO_RECV;
overlp->wsaBuf.buf = overlp->buffer;
overlp->wsaBuf.len = BUFFER_SIZE;
DWORD dwRecv = 0, dwFlag = 0;
if (WSARecv(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, &(overlp->overlapped), 0) == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
std::cout << "WSARecv failed:" << WSAGetLastError() << std::endl;
}
}
break;
case IO_OP_TYPE::IO_RECV: // 处理接收数据
{
std::cout << "Data received: " << overlp->buffer << std::endl;
ZeroMemory(&overlp->overlapped, sizeof(OVERLAPPED));
overlp->type = IO_OP_TYPE::IO_SEND;
overlp->wsaBuf.buf = "response from server\n";
overlp->wsaBuf.len = strlen("response from server\n");
DWORD dwSend = 0;
if (WSASend(overlp->socket, &overlp->wsaBuf, 1, &dwSend, 0, &(overlp->overlapped), 0) == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
std::cout << "WSASend failed:" << WSAGetLastError() << std::endl;
}
}
break;
case IO_OP_TYPE::IO_SEND: // 处理发送数据
{
std::cout << "Data sent." << std::endl;
closesocket(overlp->socket);
delete overlp;
}
break;
}
}
return 0;
}
// 初始化服务器
int InitServer(HANDLE& completionPort, SOCKET& listenSocket) {
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
return -1;
}
listenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if (listenSocket == INVALID_SOCKET) {
WSACleanup();
return -1;
}
sockaddr_in address;
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
if (bind(listenSocket, (const sockaddr*)&address, sizeof(address)) == SOCKET_ERROR || listen(listenSocket, SOMAXCONN) == SOCKET_ERROR) {
closesocket(listenSocket);
WSACleanup();
return -1;
}
completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (completionPort == NULL) {
closesocket(listenSocket);
WSACleanup();
return -1;
}
return 0;
}
// 主函数
int main() {
HANDLE completionPort;
SOCKET listenSocket;
if (InitServer(completionPort, listenSocket) != 0) {
std::cout << "InitServer Error" << std::endl;
return 1;
}
for (int i = 0; i < THREAD_COUNT; i++) { // 创建工作线程
CreateThread(NULL, 0, workerThread, completionPort, 0, NULL);
}
PostAcceptEx(listenSocket, completionPort); // 发起初始AcceptEx请求
std::cin.get(); // 等待用户输入
closesocket(listenSocket); // 关闭监听套接字
CloseHandle(completionPort); // 关闭完成端口
WSACleanup(); // 清理Winsock
return 0;
}
6. 重叠 IO
- 异步的:投递io请求,在另一个线程等待io操作完成。尽管 IO 操作是按顺序投递的,但是 IO 操作完成通知可以是随机无序的(在多线程等待 IO 完成通知时)
- 无需等待上一个io操作完成就可以调用下一个io操作,这些io操作可以堆叠在一起
- 服务器启动时候,通常投递多个AcceptEx,就可以并发接收多个连接
- 使用重叠I/O异步读取文件例子
#include <windows.h>
#include <iostream>
#include <memory>
void CALLBACK FileIOCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) {
if (dwErrorCode == 0) {
std::cout << "Read completed successfully. Bytes read: " << dwNumberOfBytesTransfered << std::endl;
} else {
std::cerr << "Read failed with error code: " << dwErrorCode << std::endl;
}
}
int main() {
// 打开文件,指定重叠I/O标志
HANDLE hFile = CreateFile(
L"example.txt", // 文件名
GENERIC_READ, // 访问模式
FILE_SHARE_READ, // 共享模式
NULL, // 安全属性
OPEN_EXISTING, // 打开方式
FILE_FLAG_OVERLAPPED, // 重叠I/O标志
NULL // 模板文件句柄
);
if (hFile == INVALID_HANDLE_VALUE) {
std::cerr << "Failed to open file. Error: " << GetLastError() << std::endl;
return 1;
}
// 分配缓冲区
const DWORD bufferSize = 1024;
std::shared_ptr<BYTE> buffer(new BYTE[bufferSize], std::default_delete<BYTE[]>());
// 初始化OVERLAPPED结构
OVERLAPPED overlapped = {0};
overlapped.Offset = 0; // 从文件开头读取
overlapped.OffsetHigh = 0;
// 异步读取文件
BOOL result = ReadFileEx(
hFile, // 文件句柄
buffer.get(), // 缓冲区
bufferSize, // 读取字节数
&overlapped, // OVERLAPPED结构
FileIOCompletionRoutine // 完成例程
);
if (!result) {
std::cerr << "ReadFileEx failed. Error: " << GetLastError() << std::endl;
CloseHandle(hFile);
return 1;
}
// 等待异步操作完成
SleepEx(INFINITE, TRUE); // 无限等待,允许完成例程执行
// 关闭文件句柄
CloseHandle(hFile);
return 0;
}
- 关联作用:传递一个值给内核,内核不修改它,就绪通知就会和就绪事件去对比,找到自己的