io_uring 异步 socket 编程
文章目录
- 简单理解
- 过程代码
- sqe
- cqe
- 一个基本用法示例:
简单理解
网络IO时,数据拷贝并不是CPU在工作,所以我们可以不阻塞recv
io_uring 内部主要依赖环形队列(ring buffer,也叫循环队列)来管理 I/O 请求和完成事件:
- 提交队列(Submission Queue,SQ)
应用程序向内核提交 I/O 请求时,这些请求会被放入提交队列中。 // io_uring_get_sqe() ,e: entry - 完成队列(Completion Queue,CQ)
核处理完 I/O 请求后,会将完成事件放入完成队列中。
编程中往往是去获取 entry ,即sqe和cqe
两个队列位于用户态和内核态之间的共享内存区域
过程代码
对于提交队列,我们想三次握手和接收数据,都要提交信息给 提交队列。
首先定义 io_uring,之后都通过其交互。
struct io_uring ring;
io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
最后清理释放资源:
io_uring_queue_exit(&ring);
sqe
通过 io_uring 获取 sqe:
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
accept 的设置 io_uring_prep_accept(sqe, server_fd, (struct sockaddr *)&client_addr, &client_len, 0);
recv 的设置 io_uring_prep_recv(sqe, client_fd, info->buffer, BUFFER_SIZE, 0);
// 每次读都要设置
send 的设置 io_uring_prep_send(sqe, info->client_fd, info->buffer, bytes_read, 0);
以上 prep方法 后一般伴随 io_uring_sqe_set_data(sqe, info);
,关联对象信息。
(仅通过完成队列项中的结果信息(如 cqe->res)可能无法明确这个请求对应的具体上下文;
set后就可以通过 cqe->user_data
获取到这个关联的信息,从而知道该请求信息(自己设计结构体)。)
提交:
设置好后提交 io_uring_submit(&ring);
,才会加入队列。
这是系统调用!因为得通知内核请求就绪。
cqe
通过 cqe 获取结果:
struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe)
cqe->res
cqe->user_data // 对应 io_uring_sqe_set_data()
处理完清理cq
io_uring_cqe_seen(&ring, cqe);
标记一个完成队列项已经被应用程序处理,允许内核回收该 CQE 所占用的资源
一个基本用法示例:
没有取 user_data 做分类,看看就好。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <liburing.h>
#define PORT 8888
#define QUEUE_DEPTH 256 // 在队列中可以同时存在的最大 I/O 请求数量
#define BUFFER_SIZE 2048
typedef struct {
int client_fd;
char buffer[BUFFER_SIZE];
} conn_info; // 谁,发送了什么
// 服务器通常使用一个 io_uring 实例,所以可以设置 type 字段,进而进行区分
// 通常在 prep 后通过 io_uring_sqe_set_data 关联
int main() {
int server_fd; // 文件描述符
// 创建 socket
server_fd = socket(AF_INET, SOCK_STREAM, 0); // Address Family => IPv4
if (server_fd < 0) {
perror("socket");
return 1;
}
// 设置地址复用
int opt = 1;// 启用
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
// 套接字级别 地址复用选项【允许在同一地址和端口上重新绑定套接字,即使之前的套接字还处于 TIME_WAIT 状态 (之前用该端口的服务器正在四次握手)】
struct sockaddr_in addr; // 结构体存储网络地址信息
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY; // any任何 => 绑定所有可用网络接口(IP地址)
addr.sin_port = htons(PORT);
if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
perror("bind");
return 1;
}
listen(server_fd, 10);
// backlog 10: 最多可以有 10 个客户端连接请求在队列中等待
// 服务器开始监听 #######################################################################
// 设置非阻塞:accept、recv等
fcntl(server_fd, F_SETFL, O_NONBLOCK);
// 初始化 io_uring
struct io_uring ring; // ↓
if(io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0){
perror("io_uring_queue_init: ");
return -1;
}
while (1) {
// 1.提交 接收accept的请求
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
if(!sqe){
perror("io_uring_get_sqe: ");
return 1;
}
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int *pclient_fd = malloc(sizeof(int)); // 用于保存 fd
if (!pclient_fd) {
perror("pclient_fd malloc: ");
return 1;
}
*pclient_fd = server_fd; // 服务器fd
io_uring_prep_accept(sqe, server_fd, (struct sockaddr *)&client_addr, &client_len, 0);
io_uring_sqe_set_data(sqe, pclient_fd);
io_uring_submit(&ring);
// 等待事件完成
struct io_uring_cqe *cqe;
if (io_uring_wait_cqe(&ring, &cqe) < 0) {
perror("io_uring_wait_cqe");
break;
}
int client_fd = cqe->res;
if (client_fd < 0) {
fprintf(stderr, "accept failed: %s\n", strerror(-client_fd));
free(pclient_fd);
io_uring_cqe_seen(&ring, cqe);
continue;
}
printf("Client connected: %d\n", client_fd);
// 2.准备读取客户端数据: 刚三次握手,接着接收数据。
conn_info *info = malloc(sizeof(conn_info));
info->client_fd = client_fd;
sqe = io_uring_get_sqe(&ring);
io_uring_prep_recv(sqe, client_fd, info->buffer, BUFFER_SIZE, 0);// 用于 recv
io_uring_sqe_set_data(sqe, info);
io_uring_submit(&ring);
// 处理读取结果
if (io_uring_wait_cqe(&ring, &cqe) < 0) {
perror("io_uring_wait_cqe");
break;
}
int bytes_read = cqe->res;
if (bytes_read <= 0) {
printf("Client disconnected: %d\n", info->client_fd);
close(info->client_fd);
free(info);
io_uring_cqe_seen(&ring, cqe);
continue;
}
printf("Received: %.*s", bytes_read, info->buffer);
// 3.发送回应数据(echo)
sqe = io_uring_get_sqe(&ring);
io_uring_prep_send(sqe, info->client_fd, info->buffer, bytes_read, 0);
io_uring_sqe_set_data(sqe, info);
io_uring_submit(&ring);
if (io_uring_wait_cqe(&ring, &cqe) < 0) {
perror("io_uring_wait_cqe");
break;
}
printf("Echo sent to client %d\n", info->client_fd);
io_uring_cqe_seen(&ring, cqe);
// 清理
close(info->client_fd);
free(info);
}
io_uring_queue_exit(&ring);
close(server_fd);
return 0;
}