io_uring的异步IO机制
io_uring的异步IO机制
- io_uring 原理
- io_uring接口应用
- 性能测试
io_uring 原理
io_uring 是 Linux 内核 5.1 版本引入的全新异步 I/O 接口,由 Jens Axboe 开发。它通过两个共享环形缓冲区(ring buffers)在内核和用户空间之间高效传递 I/O 请求和完成事件,避免了传统 AIO 的各种限制。
我们之前了解过,在网络高并发的场景下,epoll 在这方面的性能是独树一帜,通过 epoll 的工作模式,实现了 reactor 这种事件触发的机制,在 reactor 中异步的实现我们可以通过多线程,也可以通过协程去进行实现。但是本质上最后还是调用到 read/write/recv/send
这样的接口,来完成数据的收发工作。
理解read/write/recv/send本质
他们作为系统调用函数,其实本质上还是一种拷贝函数,如果底层缓冲区中有数据,就会调用这些接口,如果没有数据,就会等待数据就绪,或者设置为非阻塞的状态,不等就直接返回一个错误码,表示此时数据还未就绪。
IO 操作本质上就是 “等 + 拷贝” 的操作,单纯的去看待 read/write/recv/send
这些函数,其实他返回请求,接收数据和返回数据这些操作都是一起的。 在 epoll 的处理中我们可以看见,他可以一次性处理多个文件描述符,将这种等的操作重叠了起来,透过他的底层实现我们可以发现,他其实是将 “等 + 拷贝” 的操作实现了一种分离,被监听的 fd 与就绪的 fd 实则是两种不同的数据结构进行处理,构成了一种生产者消费者模型,也就是说 “等” 其实在 epoll 的接口中就已经完成了,而我们调用read/write/recv/send
函数的时候就不用再等了,数据已经就绪,由于这种可以对多个 fd 进行处理的机制,加上使用回调函数进行处理,使得epoll 的功能就很强大。
如何理解io_uring
对于 io_uring 来说,其实更倾向于将这种 IO 操作实现成异步的,在 io_uring 的实现当中,提供了两个队列结构,一个是提交队列(SQ)
,另一个是完成队列(CQ)
,注意,这两个队列都是环形队列结构。
提交队列的作用就是提交 IO 请求,完成队列的作用就是内核通知完成的 I/O 操作,假设当前有 100 个客户端发起读的请求,在 io_uring 的工作方式中,会将这 100 个 IO 请求先 push 到提交队列当中,然后进行处理,然后处理完成的在 push 到完成队列当中,返回结果,他也是由不同线程去完成的。这两个队列干的事两件不同的事情,从而产生了异步的效果。
由于 io_uring 的内部使用 mmap 去进行实现,这种方式是他在整个过程中也只会进行一次的数据拷贝,无异于也是对效率的一个提升,而且通过这种无锁的环形队列接口,减少了频繁进行加锁解锁的消耗,这对于高并发的场景无异于是一个巨大的提升,其实这两个流程也是一个典型的生产者消费者模型。
io_uring接口应用
io_uring 在这儿主要提供了 3 个系统调用接口:
int io_uring_setup(unsigned entries, struct io_uring_params *params);
io_uring_setup 是 Linux 内核提供的系统调用,用于初始化一个异步 I/O 上下文(io_uring 实例),参数如下:
entries
:指定提交队列(SQ)和完成队列(CQ)的初始大小(条目数)。通常为 2 的幂次方,内核可能会调整实际大小。params
:指向 io_uring_params 结构的指针,用于传递配置参数并返回队列信息。结构定义如下:
struct io_uring_params {__u32 sq_entries; // 内核实际分配的 SQ 大小__u32 cq_entries; // 内核实际分配的 CQ 大小__u32 flags; // 配置标志(如 IORING_SETUP_IOPOLL)__u32 sq_thread_cpu; // 绑定 SQ 线程的 CPU__u32 sq_thread_idle; // SQ 线程空闲超时(毫秒)__u32 features; // 内核返回的支持特性__u32 resv[4];struct io_sqring_offsets sq_off; // SQ 环的偏移信息struct io_cqring_offsets cq_off; // CQ 环的偏移信息
};
返回值:
- 成功时返回一个文件描述符(fd),代表创建的 io_uring 实例;失败时返回 -1 并设置 errno。
int io_uring_enter(unsigned int fd, unsigned int to_submit,unsigned int min_complete, unsigned int flags, sigset_t *sig);
io_uring_enter 用于提交 I/O 操作请求或等待已完成事件,参数如下:
fd
: 关联的 io_uring 实例的文件描述符。to_submit
: 准备提交的 I/O 操作数量。min_complete
: 要求内核等待至少完成的事件数(若 flags 包含 IORING_ENTER_GETEVENTS)。flags
: 控制行为的标志位(如 IORING_ENTER_GETEVENTS)。sig
: 等待时临时屏蔽的信号集(可为 NULL)。
int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);
io_uring_register 是 Linux 内核提供的系统调用(syscall),用于为 io_uring 实例注册资源(如文件描述符、缓冲区等),以优化异步 I/O 操作的性能,参数如下:
fd
: io_uring 实例的文件描述符,由 io_uring_setup 创建。opcode
: 注册操作的类型,如 IORING_REGISTER_BUFFERS(注册缓冲区)或 IORING_REGISTER_FILES(注册文件描述符)。arg
: 指向用户空间数据的指针,具体内容取决于 opcode。nr_args
: arg 指向的数组中的条目数。
其中,opcode 的类型有如下几种:
IORING_REGISTER_BUFFERS
:注册固定缓冲区,用于减少 read/write 操作中的内核-用户空间数据拷贝。IORING_REGISTER_FILES
:注册文件描述符,避免每次 I/O 操作重复传递文件描述符。IORING_REGISTER_EVENTFD
:注册事件文件描述符(eventfd),用于异步通知 I/O 完成事件。IORING_REGISTER_PROBE
:检查内核支持的 io_uring 功能(需配合 struct io_uring_probe)。
io_uring 的库其实对这三个函数进行了封装,然后提供给我们一个使用的库。
接下来我们实际写一段代码来看一下:
#include <stdio.h>
#include <sys/socket.h>
#include <liburing.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2struct conn_info
{int fd;int event;
};int init_server(int port)
{// 创建socket连接int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serveraddr;serveraddr.sin_family = AF_INET;serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0serveraddr.sin_port = htons(port);// 绑定套接字int ret = bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));if (ret == -1){perror("bind");return -1;}// 监听套接字listen(sockfd, 10);return sockfd;
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{// 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = ACCEPT_EVENT};// 用于准备一个异步接受连接(accept)的请求io_uring_prep_accept(sqe, sockfd, addr, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));return 1;
}int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = READ_EVENT};// 准备一个接收数据的操作请求io_uring_prep_recv(sqe, sockfd, buf, len, flags);return 1;
}int main()
{unsigned short port = 9999;int sockfd = init_server(port);struct io_uring_params params;memset(¶ms, 0, sizeof(params));struct io_uring ring;// 先构建两个队列出来io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);
#if 0// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);#endifchar buffer[BUFFER_LENGTH] = {0};while (1){// 内部实现就是 io_uring_enter,用于提交 IO 请求io_uring_submit(&ring);// 创建一个完成队列事件结构,通过 io_uring_wait_cqe// 获取完成 IO 操作的事件struct io_uring_cqe *cqe;io_uring_wait_cqe(&ring, &cqe);// 批量获取完成 IO 操作的事件struct io_uring_cqe *cqes[128];int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);int i = 0;for (i = 0; i < nready; i++){// 获取到 accept 事件的入口struct io_uring_cqe *entries = cqes[i];struct conn_info result;memcpy(&result, &entries->user_data, sizeof(struct conn_info));if (result.event == ACCEPT_EVENT){printf("set_event_accept\n");}}}return 0;
}
接口介绍
其中,io_uring_prep_accept 接口作用就是准备一个异步接受连接(accept)的请求。
void io_uring_prep_accept(struct io_uring_sqe *sqe, int sockfd, struct sockaddr *addr,socklen_t *addrlen, int flags);
参数介绍:
sqe
: 指向 io_uring 提交队列条目(Submission Queue Entry)的指针。sockfd
: 监听套接字的文件描述符。addr
: 用于存储客户端地址信息的结构体指针(可选,可为 NULL)。addrlen
: 输入时为 addr 的缓冲区大小,输出时为实际地址长度。flags
: 额外的标志位(如 SOCK_NONBLOCK 或 SOCK_CLOEXEC)。
返回值处理:
- 通过 io_uring_wait_cqe 等待完成事件后,cqe->res 为返回的客户端文件描述符。
- 若返回值 < 0,表示错误(如 -EINVAL 或 -EBADF)
io_uring_sqe 结构体,用于描述一个待提交的 I/O 操作。每个 io_uring_sqe 对应一个异步 I/O 请求(如读写、网络操作等),通过填充该结构并提交到提交队列(Submission Queue, SQ)。
struct io_uring_sqe {__u8 opcode; // 操作类型(如 IORING_OP_READ, IORING_OP_WRITE)__u8 flags; // 请求标志(如 IOSQE_FIXED_FILE, IOSQE_IO_LINK)__u16 ioprio; // I/O 优先级__s32 fd; // 文件描述符__u64 off; // 文件偏移量__u64 addr; // 用户态缓冲区地址(读写操作)__u32 len; // 操作长度union {__kernel_rwf_t rw_flags; // 读写标志(如 RWF_NOWAIT)__u32 fsync_flags;__u16 poll_events;};__u64 user_data; // 用户自定义数据,用于回调识别union {__u16 buf_index; // 固定缓冲区的索引(IORING_OP_READ_FIXED)__u64 __pad2[3];};
};
关键字段说明:
opcode
:指定操作类型,常见值包括:
IORING_OP_READ/IORING_OP_WRITE:文件读写。
IORING_OP_SEND/IORING_OP_RECV:网络通信。
IORING_OP_POLL_ADD:事件监听。flags
:控制请求行为,例如:
IOSQE_FIXED_FILE:使用固定文件描述符(预先注册的文件表)。
IOSQE_IO_LINK:链接多个请求,形成依赖链。user_data
:用于在完成事件(CQE)中标识请求的唯一值。
使用方法:
- 获取空闲 SQE:通过 io_uring_get_sqe 从提交队列中获取一个空闲条目。
- 设置操作参数:填充 opcode、fd、addr、len 等字段。
- 提交请求:调用 io_uring_submit 将请求提交到内核。
运行程序,我们就会发现一个有意思的现象,此时一直在打印 set_event_accept 这条信息。
原因就在于在当前这个循环中,我们并没有将已完成的队列中特定的条目给回收掉,当循环回去以后,此时又继续通知处理该条目,就会一直打印,此时就需要用到 io_uring_cq_advance 接口:
void io_uring_cq_advance(struct io_uring *ring, unsigned nr);
io_uring_cq_advance 接口用于通知内核用户空间已处理完成队列(Completion Queue, CQ)中的特定条目,允许内核回收相关资源。
参数解析:
ring
: 指向 io_uring 实例的指针。nr
: 需要推进的完成队列条目数量,通常为已处理的条目数。
它的作用就在于:
- 推进完成队列:每次从 CQ 中取出并处理一个事件后,需调用此函数更新队列头指针,避免重复处理同一事件。
- 资源管理:内核会回收已标记为 “完成” 的条目,释放相关资源(如内存)。
添加这个接口以后就正常了,但是此时就存在一个问题,我们将这个事件标记为完成以后,后续就不会再发送 accept 请求了,所以在这儿就需要我们每一次都发送一个 accept 请求,所以这儿也是需要进行修改的,修改后代码如下:
#include <stdio.h>
#include <sys/socket.h>
#include <liburing.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2struct conn_info
{int fd;int event;
};int init_server(int port)
{// 创建socket连接int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serveraddr;serveraddr.sin_family = AF_INET;serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0serveraddr.sin_port = htons(port);// 绑定套接字int ret = bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));if (ret == -1){perror("bind");return -1;}// 监听套接字listen(sockfd, 10);return sockfd;
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{// 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = ACCEPT_EVENT};// 用于准备一个异步接受连接(accept)的请求io_uring_prep_accept(sqe, sockfd, addr, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));return 1;
}int main()
{unsigned short port = 9999;int sockfd = init_server(port);struct io_uring_params params;memset(¶ms, 0, sizeof(params));struct io_uring ring;// 先构建两个队列出来io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);
#if 0// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);#endifchar buffer[BUFFER_LENGTH] = {0};while (1){// 内部实现就是 io_uring_enter,用于提交 IO 请求io_uring_submit(&ring);// 创建一个完成队列事件结构,通过 io_uring_wait_cqe// 获取完成 IO 操作的事件struct io_uring_cqe *cqe;io_uring_wait_cqe(&ring, &cqe);// 批量获取完成 IO 操作的事件struct io_uring_cqe *cqes[128];int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);int i = 0;for (i = 0; i < nready; i++){// 获取到 accept 事件的入口struct io_uring_cqe *entries = cqes[i];struct conn_info result;memcpy(&result, &entries->user_data, sizeof(struct conn_info));if (result.event == ACCEPT_EVENT){// 保证每一次都会有 accept 请求set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);printf("set_event_accept\n");}}// 避免重复处理同一事件io_uring_cq_advance(&ring, nready);}return 0;
}
运行代码,正常连接断开,再次进行连接也不会存在问题:
接下来就需要接收到这个信息,我们在这儿使用的也是 io_uring 库里面提供的函数 io_uring_prep_recv ,io_uring_prep_recv 用于准备一个接收数据的操作请求:
void io_uring_prep_recv(struct io_uring_sqe *sqe, int sockfd, void *buf, size_t len, int flags);
参数如下:
sqe
: 指向 io_uring_sqe 结构的指针,表示提交队列条目(Submission Queue Entry)。sockfd
: 文件描述符,通常是套接字。buf
: 缓冲区指针,用于存储接收到的数据。len
: 缓冲区的长度。flags
: 接收操作的标志,与 recv(2) 系统调用中的 flags 参数相同。
代码改写如下:
#include <stdio.h>
#include <sys/socket.h>
#include <liburing.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2struct conn_info
{int fd;int event;
};int init_server(int port)
{// 创建socket连接int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serveraddr;serveraddr.sin_family = AF_INET;serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0serveraddr.sin_port = htons(port);// 绑定套接字int ret = bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));if (ret == -1){perror("bind");return -1;}// 监听套接字listen(sockfd, 10);return sockfd;
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{// 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = ACCEPT_EVENT};// 用于准备一个异步接受连接(accept)的请求io_uring_prep_accept(sqe, sockfd, addr, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));return 1;
}int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = READ_EVENT};// 准备一个接收数据的操作请求io_uring_prep_recv(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));return 1;
}int main()
{unsigned short port = 9999;int sockfd = init_server(port);struct io_uring_params params;memset(¶ms, 0, sizeof(params));struct io_uring ring;// 先构建两个队列出来io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);
#if 0// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);#endifchar buffer[BUFFER_LENGTH] = {0};while (1){// 内部实现就是 io_uring_enter,用于提交 IO 请求io_uring_submit(&ring);// 创建一个完成队列事件结构,通过 io_uring_wait_cqe// 获取完成 IO 操作的事件struct io_uring_cqe *cqe;io_uring_wait_cqe(&ring, &cqe);// 批量获取完成 IO 操作的事件struct io_uring_cqe *cqes[128];int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);int i = 0;for (i = 0; i < nready; i++){// 获取到 accept 事件的入口struct io_uring_cqe *entries = cqes[i];struct conn_info result;memcpy(&result, &entries->user_data, sizeof(struct conn_info));if (result.event == ACCEPT_EVENT){// 保证每一次都会有 accept 请求set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);// printf("set_event_accept\n");int connfd = entries->res;set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);}else if (result.event == READ_EVENT){int ret = entries->res;printf("set_event_recv ret: %d, %s\n", ret, buffer);}}// 避免重复处理同一事件io_uring_cq_advance(&ring, nready);}return 0;
}
运行程序,此时就可以看见我们的服务端是可以正常的接收到消息的,但是此时只能接受一次,后续客户端继续发送我们又接受不到了,而且我们也不支持回发消息:
我们需要解决上面的问题就需要调用回发数据的接口,回发数据以后,更新状态,当前又需要接收到客户端所发的数据,就像前面 accept 的时候一样,这儿我们每一次都是需要发起一个 recv 的请求的,否则就会被标记为已完成的事件。
完整代码如下:
#include <stdio.h>
#include <sys/socket.h>
#include <liburing.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2struct conn_info
{int fd;int event;
};int init_server(unsigned short port)
{// 创建socket连接int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serveraddr;serveraddr.sin_family = AF_INET;serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0serveraddr.sin_port = htons(port);// 绑定套接字int ret = bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));if (ret == -1){perror("bind");return -1;}// 监听套接字listen(sockfd, 10);return sockfd;
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{// 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = ACCEPT_EVENT,};// 用于准备一个异步接受连接(accept)的请求io_uring_prep_accept(sqe, sockfd, addr, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));return 1;
}int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = READ_EVENT,};// 准备一个接收数据的操作请求io_uring_prep_recv(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));return 1;
}int set_event_send(struct io_uring *ring, int sockfd, const void *buf, size_t len, int flags)
{struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = WRITE_EVENT,};io_uring_prep_send(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}int main()
{unsigned short port = 9999;int sockfd = init_server(port);struct io_uring_params params;memset(¶ms, 0, sizeof(params));struct io_uring ring;// 先构建两个队列出来io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);
#if 0// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);#endifchar buffer[BUFFER_LENGTH] = {0};while (1){// 内部实现就是 io_uring_enter,用于提交 IO 请求io_uring_submit(&ring);// 创建一个完成队列事件结构,通过 io_uring_wait_cqe// 获取完成 IO 操作的事件// struct io_uring_cqe *cqe;// io_uring_wait_cqe(&ring, &cqe);// 批量获取完成 IO 操作的事件struct io_uring_cqe *cqes[128];int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);int i = 0;for (i = 0; i < nready; i++){// 获取到已完成 IO 事件的入口struct io_uring_cqe *entries = cqes[i];struct conn_info result;memcpy(&result, &entries->user_data, sizeof(struct conn_info));if (result.event == ACCEPT_EVENT){// 保证每一次都会有 accept 请求set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);printf("set_event_accept\n");int connfd = entries->res;printf("connfd: %d\n", connfd);set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);}else if (result.event == READ_EVENT){int ret = entries->res;printf("set_event_recv ret: %d, %s\n", ret, buffer);if (ret == 0){close(result.fd);}else if (ret > 0){set_event_send(&ring, result.fd, buffer, BUFFER_LENGTH, 0);}}else if (result.event == WRITE_EVENT){int ret = entries->res;printf("set_event_send ret: %d, %s\n", ret, buffer);set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);}}// 避免重复处理同一事件io_uring_cq_advance(&ring, nready);}return 0;
}
运行代码,创建 3 个客户端,此时每个客户端都可以连接上,对于每次发送的消息,客户端也可以接收到:
性能测试
接下来我们编写一段客户端代码,对 io_uring 的性能进行一下测试
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <unistd.h>
#include <getopt.h>#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define TEST_MESSAGE "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n"
#define RBUFFER_LENGTH 2048
#define WBUFFER_LENGTH 2048typedef struct test_context_s {char serverip[16];int port;int threadnum;int connection;int requestion;#if 1int failed;
#endif} test_context_t;int connect_tcpserver(const char *ip, unsigned short port) {int connfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in tcpserver_addr;memset(&tcpserver_addr, 0, sizeof(struct sockaddr_in));tcpserver_addr.sin_family = AF_INET;tcpserver_addr.sin_addr.s_addr = inet_addr(ip);tcpserver_addr.sin_port = htons(port);int ret = connect(connfd, (struct sockaddr*)&tcpserver_addr, sizeof(struct sockaddr_in));if (ret) {perror("connect");return -1;}return connfd;
}int send_recv_tcppkt(int fd) {char wbuffer[WBUFFER_LENGTH] = {0};int i = 0;for (i = 0;i < 16;i ++) {strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);}int res = send(fd, wbuffer, strlen(wbuffer), 0);if (res < 0) {exit(1);}char rbuffer[RBUFFER_LENGTH] = {0};res = recv(fd, rbuffer, RBUFFER_LENGTH, 0);if (res <= 0) {exit(1);}if (strcmp(rbuffer, wbuffer) != 0) {printf("failed: '%s' != '%s'\n", rbuffer, wbuffer);return -1;}return 0;
}static void *test_qps_entry(void *arg) {test_context_t *pctx = (test_context_t*)arg;int connfd = connect_tcpserver(pctx->serverip, pctx->port);if (connfd < 0) {printf("connect_tcpserver failed\n");return NULL;}int count = pctx->requestion / pctx->threadnum;int i = 0;int res;while (i++ < count) {res = send_recv_tcppkt(connfd);if (res != 0) {printf("send_recv_tcppkt failed\n");pctx->failed ++; // continue;}}return NULL;
}// ./test_qps_tcpclient -s 127.0.0.1 -p 2048 -t 50 -c 100 -n 10000
int main(int argc, char *argv[]) {int ret = 0;test_context_t ctx = {0};int opt;while ((opt = getopt(argc, argv, "s:p:t:c:n:?")) != -1) {switch (opt) {case 's':printf("-s: %s\n", optarg);strcpy(ctx.serverip, optarg);break;case 'p':printf("-p: %s\n", optarg);ctx.port = atoi(optarg);break;case 't':printf("-t: %s\n", optarg);ctx.threadnum = atoi(optarg);break;case 'c':printf("-c: %s\n", optarg);ctx.connection = atoi(optarg);break;case 'n':printf("-n: %s\n", optarg);ctx.requestion = atoi(optarg);break;default:return -1;}}pthread_t *ptid = malloc(ctx.threadnum * sizeof(pthread_t));int i = 0;struct timeval tv_begin;gettimeofday(&tv_begin, NULL);for (i = 0;i < ctx.threadnum;i ++) {pthread_create(&ptid[i], NULL, test_qps_entry, &ctx);}for (i = 0;i < ctx.threadnum;i ++) {pthread_join(ptid[i], NULL);}struct timeval tv_end;gettimeofday(&tv_end, NULL);int time_used = TIME_SUB_MS(tv_end, tv_begin);printf("success: %d, failed: %d, time_used: %d, qps: %d\n", ctx.requestion-ctx.failed, ctx.failed, time_used, ctx.requestion * 1000 / time_used);free(ptid);return ret;
}
简单介绍一下客户端的代码,就是将一段数据写入到缓冲区当中,然后发送给服务端,我们在这儿的逻辑就是通过启动多个客户端发送请求,然后对应的服务端进行处理,看其处理时间,在这儿跟之前的 epoll 进行对比,查看两者之间的性能差距。
之前 epoll 的代码如下:
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>#define BUFFER_LENGTH 1024
#define CONNECTION_SIZE 1024 * 1024
#define MAX_PORTS 1
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)typedef int (*CALLBACK)(int fd);int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
struct timeval begin;int epfd = 3;struct conn
{// 负责IO的文件描述符int fd;// 接收缓冲区的bufferchar rbuffer[BUFFER_LENGTH];int rlength;// 发送缓冲区的bufferchar wbuffer[BUFFER_LENGTH];int wlength;// 三个对应的回调函数CALLBACK send_callback;union{CALLBACK accept_callback;CALLBACK recv_callback;} r_action;
};struct conn con_list[CONNECTION_SIZE] = {0};void send_event(int fd, int event, int flag)
{if (flag){struct epoll_event ev;ev.data.fd = fd;ev.events = event;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);}else{struct epoll_event ev;ev.data.fd = fd;ev.events = event;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}
}int event_register(int fd, int event)
{if (fd < 0){return -1;}con_list[fd].fd = fd;con_list[fd].r_action.recv_callback = recv_cb;con_list[fd].send_callback = send_cb;memset(con_list[fd].rbuffer, 0, BUFFER_LENGTH);con_list[fd].rlength = 0;memset(con_list[fd].wbuffer, 0, BUFFER_LENGTH);con_list[fd].wlength = 0;send_event(fd, event, 1);
}int accept_cb(int fd)
{struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr *)&clientaddr, &len);if (clientfd < 0){printf("accept failed !!!\n");}else{// printf("accept finished: %d\n", clientfd);}event_register(clientfd, EPOLLIN);if ((clientfd % 1000) == 0){struct timeval current;gettimeofday(¤t, NULL);int time_used = TIME_SUB_MS(current, begin);memcpy(&begin, ¤t, sizeof(struct timeval));printf("accept finshed: %d, time_used: %d\n", clientfd, time_used);}return 0;
}int recv_cb(int fd)
{// memset(con_list[fd].rbuffer, 0, BUFFER_LENGTH);int count = recv(fd, con_list[fd].rbuffer, BUFFER_LENGTH, 0);if (count == 0){printf("client disconnect: %d\n", fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);close(fd);return 0;}else if (count < 0){printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);close(fd);return 0;}con_list[fd].rlength = count;// printf("recv succ: %s\n", con_list[fd].rbuffer);#if 1con_list[fd].wlength = con_list[fd].rlength;memcpy(con_list[fd].wbuffer, con_list[fd].rbuffer, con_list[fd].rlength);
#endifsend_event(fd, EPOLLOUT, 0);return count;
}int send_cb(int fd)
{int count = send(fd, con_list[fd].wbuffer, BUFFER_LENGTH, 0);send_event(fd, EPOLLIN, 0);return count;
}int init_server(unsigned short port)
{// 创建套接字int socketfd = socket(AF_INET, SOCK_STREAM, 0);printf("socketfd: %d\n", socketfd);struct sockaddr_in serveraddr;serveraddr.sin_family = AF_INET;serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0serveraddr.sin_port = htons(port); // 0 ~ 1023// 绑定套接字int ret = bind(socketfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));if (ret == -1){printf("bind failed: %s\n", strerror(errno));}// 监听套接字listen(socketfd, 10);return socketfd;
}int main()
{unsigned short port = 2000;int epfd = epoll_create(1);printf("epfd: %d\n", epfd);int i = 0;for (i = 0; i < MAX_PORTS; i++){int sockfd = init_server(port + i);// printf("socket fd: %d\n", sockfd);con_list[sockfd].fd = sockfd;con_list[sockfd].r_action.recv_callback = accept_cb;send_event(sockfd, EPOLLIN, 1);}gettimeofday(&begin, NULL);while (1){struct epoll_event events[1024] = {0};// 将就绪事件放入到就绪队里当中int nready = epoll_wait(epfd, events, 1024, -1);for (int i = 0; i < nready; i++){int connfd = events[i].data.fd;#if 0if((events[i].events & EPOLLIN)){con_list[i].r_action.recv_callback(connfd);}else if ((events[i].events & EPOLLOUT)){con_list[i].send_callback(connfd);}
#elseif ((events[i].events & EPOLLIN)){con_list[connfd].r_action.recv_callback(connfd);}if ((events[i].events & EPOLLOUT)){con_list[connfd].send_callback(connfd);}
#endif}}return 0;
}
测试的两者均创建 100 个线程,发起一百万个请求,查看其处理时间,首先来看 io_uring 的测试结果,花费时间为 16 ms左右:
接下来再来看 epoll 的处理性能:
两者的差距在 2ms 左右,其实对比下来 io_uring 还是有一个提升的,对于更多的连接的时候,io_uring 的效率还是会优于 epoll 的。