io_uring系统调用及示例
io_uring 库接口详解
基础接口
io_uring_queue_init(3)
NAME
    io_uring_queue_init - 初始化 io_uring 实例
SYNOPSIS
    #include <liburing.h>
    int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags);
DESCRIPTION
    初始化一个 io_uring 实例。entries 指定提交队列的条目数;如果不是2的幂,内核会将其向上取整为2的幂。flags 可以是 IORING_SETUP_* 常量的组合。
RETURN VALUE
    成功返回0,失败返回负的错误码(-errno)。注意该函数不会设置 errno,因此应使用 strerror(-ret) 而不是 perror() 来输出错误信息。
SEE ALSO
    io_uring_queue_exit(3)
io_uring_queue_exit(3)
NAME
    io_uring_queue_exit - 释放 io_uring 实例
SYNOPSIS
    #include <liburing.h>
    void io_uring_queue_exit(struct io_uring *ring);
DESCRIPTION
    释放之前初始化的 io_uring 实例,关闭相关文件描述符。
SEE ALSO
    io_uring_queue_init(3)
io_uring_get_sqe(3)
NAME
    io_uring_get_sqe - 获取提交队列项
SYNOPSIS
    #include <liburing.h>
    struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);
DESCRIPTION
    从提交队列中获取一个可用的 SQE(提交队列项),用于构建 I/O 操作。
RETURN VALUE
    成功返回 SQE 指针,如果队列满则返回 NULL。调用者必须检查返回值;队列满时可以先调用 io_uring_submit() 提交已准备好的 SQE,再重新获取。
SEE ALSO
    io_uring_submit(3)
io_uring_submit(3)
NAME
    io_uring_submit - 提交 I/O 操作到内核
SYNOPSIS
    #include <liburing.h>
    int io_uring_submit(struct io_uring *ring);
DESCRIPTION
    将准备好的 SQE 提交到内核执行。返回提交的操作数量。
RETURN VALUE
    成功返回提交的操作数,失败返回负的错误码。
SEE ALSO
    io_uring_wait_cqe(3)
io_uring_wait_cqe(3)
NAME
    io_uring_wait_cqe - 等待完成队列项
SYNOPSIS
    #include <liburing.h>
    int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);
DESCRIPTION
    等待至少一个 I/O 操作完成,并返回对应的 CQE(完成队列项)。操作本身的结果保存在 cqe->res 中:非负值表示成功(例如读写的字节数),负值为负的错误码。
RETURN VALUE
    成功返回0,失败返回负的错误码。
SEE ALSO
    io_uring_peek_cqe(3), io_uring_cqe_seen(3)
io_uring_peek_cqe(3)
NAME
    io_uring_peek_cqe - 非阻塞检查完成队列项
SYNOPSIS
    #include <liburing.h>
    int io_uring_peek_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);
DESCRIPTION
    非阻塞地检查是否有完成的 I/O 操作,如果有则返回对应的 CQE。
RETURN VALUE
    成功返回0,没有完成项返回 -EAGAIN,其他错误返回负的错误码。
SEE ALSO
    io_uring_wait_cqe(3)
io_uring_cqe_seen(3)
NAME
    io_uring_cqe_seen - 标记完成队列项已处理
SYNOPSIS
    #include <liburing.h>
    void io_uring_cqe_seen(struct io_uring *ring, struct io_uring_cqe *cqe);
DESCRIPTION
    标记指定的 CQE 已被处理,从完成队列中移除。调用之后不应再访问该 CQE,需要的字段(res、user_data)应事先保存。
SEE ALSO
    io_uring_wait_cqe(3)
I/O 操作接口
io_uring_prep_read(3)
NAME
    io_uring_prep_read - 准备读操作
SYNOPSIS
    #include <liburing.h>
    void io_uring_prep_read(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, __u64 offset);
DESCRIPTION
    准备一个异步读操作。从 fd 的 offset 位置开始读取最多 nbytes 字节到 buf。实际读取的字节数通过 CQE 的 res 字段返回,可能小于 nbytes。
SEE ALSO
    io_uring_prep_write(3)
io_uring_prep_write(3)
NAME
    io_uring_prep_write - 准备写操作
SYNOPSIS
    #include <liburing.h>
    void io_uring_prep_write(struct io_uring_sqe *sqe, int fd, const void *buf, unsigned nbytes, __u64 offset);
DESCRIPTION
    准备一个异步写操作。将 buf 中的 nbytes 字节写入 fd,从 offset 开始。实际写入的字节数通过 CQE 的 res 字段返回,可能小于 nbytes(短写),此时需要对剩余数据重新提交写操作。
SEE ALSO
    io_uring_prep_read(3)
io_uring_prep_openat(3)
NAME
    io_uring_prep_openat - 准备文件打开操作
SYNOPSIS
    #include <liburing.h>
    void io_uring_prep_openat(struct io_uring_sqe *sqe, int dfd, const char *path, int flags, mode_t mode);
DESCRIPTION
    准备一个异步文件打开操作。dfd 是目录文件描述符;path 为相对路径时相对于 dfd 解析(dfd 为 AT_FDCWD 时相对于当前工作目录),path 为绝对路径时忽略 dfd。成功时新的文件描述符通过 CQE 的 res 字段返回。
SEE ALSO
    io_uring_prep_close(3)
io_uring_prep_close(3)
NAME
    io_uring_prep_close - 准备文件关闭操作
SYNOPSIS
    #include <liburing.h>
    void io_uring_prep_close(struct io_uring_sqe *sqe, int fd);
DESCRIPTION
    准备一个异步文件关闭操作。
SEE ALSO
    io_uring_prep_openat(3)
io_uring 使用示例
示例1:异步文件读写
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <liburing.h>
#include <errno.h>

#define QUEUE_DEPTH 32
#define BLOCK_SIZE 4096

/*
 * Submit the SQE that was just prepared on `ring` and block until its
 * completion arrives.
 *
 * `what` names the operation ("read" / "write") in diagnostics.
 *
 * Returns the operation result taken from cqe->res (a byte count, or a
 * negative errno reported by the operation), or a negative errno when
 * submitting or waiting itself failed. A diagnostic has already been printed
 * whenever the return value is negative.
 */
static int submit_and_wait_one(struct io_uring *ring, const char *what)
{
    struct io_uring_cqe *cqe;
    int ret;
    int res;

    ret = io_uring_submit(ring);
    if (ret < 0) {
        fprintf(stderr, "submit %s failed: %s\n", what, strerror(-ret));
        return ret;
    }
    if (ret == 0) {
        /* Not an errno-style failure, so strerror() would print "Success". */
        fprintf(stderr, "submit %s failed: nothing was submitted\n", what);
        return -EIO;
    }

    /* A signal may interrupt the wait; that is not a failure of the I/O. */
    do {
        ret = io_uring_wait_cqe(ring, &cqe);
    } while (ret == -EINTR);
    if (ret < 0) {
        fprintf(stderr, "wait_cqe failed: %s\n", strerror(-ret));
        return ret;
    }

    /* Copy the result out before handing the CQE back to the ring. */
    res = cqe->res;
    io_uring_cqe_seen(ring, cqe);

    if (res < 0) {
        fprintf(stderr, "%s failed: %s\n", what, strerror(-res));
    }
    return res;
}

/*
 * Copy the file `src` to `dst` one BLOCK_SIZE block at a time, issuing every
 * read and write through io_uring and waiting for each to complete.
 *
 * `dst` is created if needed and truncated. Short writes are retried until
 * the whole block has been written.
 *
 * Returns 0 when the whole file was copied, 1 on any failure.
 */
int async_file_copy(const char *src, const char *dst)
{
    struct io_uring ring;
    struct io_uring_sqe *sqe;
    char *buffer;
    off_t offset = 0;
    int src_fd;
    int dst_fd;
    int status = 1;
    int ret;

    ret = io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
    if (ret < 0) {
        fprintf(stderr, "queue_init: %s\n", strerror(-ret));
        return 1;
    }

    src_fd = open(src, O_RDONLY);
    if (src_fd < 0) {
        perror("open src");
        goto cleanup_ring;
    }

    dst_fd = open(dst, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (dst_fd < 0) {
        perror("open dst");
        goto cleanup_src;
    }

    buffer = malloc(BLOCK_SIZE);
    if (!buffer) {
        perror("malloc");
        goto cleanup_dst;
    }

    for (;;) {
        int nread;
        int written = 0;

        /* Read the next block. */
        sqe = io_uring_get_sqe(&ring);
        if (!sqe) {
            fprintf(stderr, "get_sqe failed\n");
            goto cleanup_buffer;
        }
        io_uring_prep_read(sqe, src_fd, buffer, BLOCK_SIZE, offset);
        sqe->user_data = 1; /* tags the read operation */

        nread = submit_and_wait_one(&ring, "read");
        if (nread < 0) {
            goto cleanup_buffer;
        }
        if (nread == 0) {
            /* End of file: everything has been copied. */
            status = 0;
            break;
        }

        /* Write the block, resubmitting the tail after a short write. */
        while (written < nread) {
            int n;

            sqe = io_uring_get_sqe(&ring);
            if (!sqe) {
                fprintf(stderr, "get_sqe failed\n");
                goto cleanup_buffer;
            }
            io_uring_prep_write(sqe, dst_fd, buffer + written,
                                (unsigned)(nread - written),
                                offset + written);
            sqe->user_data = 2; /* tags the write operation */

            n = submit_and_wait_one(&ring, "write");
            if (n < 0) {
                goto cleanup_buffer;
            }
            if (n == 0) {
                fprintf(stderr, "write failed: no progress\n");
                goto cleanup_buffer;
            }
            written += n;
        }

        offset += nread;
    }

cleanup_buffer:
    free(buffer);
cleanup_dst:
    /* A failing close() on a written file can mean data was lost. */
    if (close(dst_fd) < 0) {
        perror("close dst");
        status = 1;
    }
cleanup_src:
    close(src_fd);
cleanup_ring:
    io_uring_queue_exit(&ring);
    return status;
}

/* Usage: <program> <src> <dst>. Exit status is that of async_file_copy(). */
int main(int argc, char *argv[])
{
    if (argc != 3) {
        fprintf(stderr, "Usage: %s <src> <dst>\n", argv[0]);
        return 1;
    }
    return async_file_copy(argv[1], argv[2]);
}
示例2:并发网络服务器
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <liburing.h>
#include <errno.h>
#include <stdint.h>

#define MAX_CONNECTIONS 1024
#define BUFFER_SIZE 4096
#define QUEUE_DEPTH 256

/*
 * The operation type travels in the two top bits of user_data, next to the
 * conn_info pointer: no tag = accept, TAG_READ = read, TAG_WRITE = write.
 * This assumes user-space pointers never have bits 62/63 set.
 * UINT64_C keeps the shift well-defined where unsigned long is 32 bits.
 */
#define TAG_READ  (UINT64_C(1) << 63)
#define TAG_WRITE (UINT64_C(1) << 62)
#define TAG_MASK  (TAG_READ | TAG_WRITE)

/*
 * One connection slot. At any time a slot has exactly one request in flight:
 * an accept (fd == -1), or a read or write on the accepted socket `fd`.
 */
struct conn_info {
    int fd;
    char buffer[BUFFER_SIZE];
};

/*
 * Create a TCP socket bound to INADDR_ANY:`port` with SO_REUSEADDR set and
 * put it into listening state (backlog 128).
 *
 * Returns the listening descriptor, or -1 after printing a diagnostic.
 */
int setup_listening_socket(int port)
{
    struct sockaddr_in addr;
    int opt = 1;
    int sock;

    sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        perror("socket");
        return -1;
    }

    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
        perror("setsockopt");
        goto fail;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons((uint16_t)port);

    if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        goto fail;
    }

    if (listen(sock, 128) < 0) {
        perror("listen");
        goto fail;
    }

    return sock;

fail:
    close(sock);
    return -1;
}

/*
 * Get a free SQE; when the submission queue is full, submit what is already
 * queued and try once more. Returns NULL if no SQE could be obtained.
 */
static struct io_uring_sqe *get_sqe_flushing(struct io_uring *ring)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

    if (!sqe) {
        int ret = io_uring_submit(ring);

        if (ret < 0) {
            fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
            return NULL;
        }
        sqe = io_uring_get_sqe(ring);
    }
    return sqe;
}

/*
 * Queue an accept on `listen_fd` whose completion is attributed to the slot
 * `conn_i`. Returns 0 on success, -1 if no SQE was available.
 */
int add_accept_request(struct io_uring *ring, int listen_fd, struct conn_info *conn_i)
{
    struct io_uring_sqe *sqe = get_sqe_flushing(ring);

    if (!sqe) {
        return -1;
    }
    io_uring_prep_accept(sqe, listen_fd, NULL, NULL, 0);
    sqe->user_data = (uint64_t)(uintptr_t)conn_i;
    return 0;
}

/*
 * Queue a read of up to BUFFER_SIZE - 1 bytes from `fd` into the slot's
 * buffer (one byte is kept free for a terminating NUL).
 * Returns 0 on success, -1 if no SQE was available.
 */
int add_read_request(struct io_uring *ring, int fd, struct conn_info *conn_i)
{
    struct io_uring_sqe *sqe = get_sqe_flushing(ring);

    if (!sqe) {
        return -1;
    }
    io_uring_prep_read(sqe, fd, conn_i->buffer, BUFFER_SIZE - 1, 0);
    sqe->user_data = (uint64_t)(uintptr_t)conn_i | TAG_READ;
    return 0;
}

/*
 * Queue a write of the first `len` bytes of the slot's buffer to `fd`.
 * Returns 0 on success, -1 if no SQE was available.
 */
int add_write_request(struct io_uring *ring, int fd, struct conn_info *conn_i, size_t len)
{
    struct io_uring_sqe *sqe = get_sqe_flushing(ring);

    if (!sqe) {
        return -1;
    }
    io_uring_prep_write(sqe, fd, conn_i->buffer, (unsigned)len, 0);
    sqe->user_data = (uint64_t)(uintptr_t)conn_i | TAG_WRITE;
    return 0;
}

/*
 * Close the slot's socket (if any) and make the slot available again by
 * queueing a fresh accept for it.
 */
static void release_connection(struct io_uring *ring, int listen_fd,
                               struct conn_info *conn_i)
{
    if (conn_i->fd >= 0) {
        close(conn_i->fd);
        conn_i->fd = -1;
    }
    if (add_accept_request(ring, listen_fd, conn_i) < 0) {
        fprintf(stderr, "Could not re-arm accept; connection slot retired\n");
    }
}

/*
 * Echo server on port 8080 driven by a single io_uring.
 *
 * Every slot starts with a pending accept. When it completes, the slot serves
 * that client (read, echo, read, ...) and only queues a new accept once the
 * client is gone, so a slot is never shared by two connections.
 * A short write is not retried: the unwritten tail of an echo is dropped.
 *
 * The event loop only ends on a fatal ring error, so the exit status is 1.
 */
int main(void)
{
    struct io_uring ring;
    struct conn_info *conn_infos;
    int listen_fd;
    int ret;
    int i;

    /* liburing returns -errno and does not set errno, so perror() is wrong here. */
    ret = io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init: %s\n", strerror(-ret));
        return 1;
    }

    listen_fd = setup_listening_socket(8080);
    if (listen_fd < 0) {
        goto out_ring;
    }
    printf("Server listening on port 8080\n");

    conn_infos = malloc(MAX_CONNECTIONS * sizeof(*conn_infos));
    if (!conn_infos) {
        perror("malloc");
        goto out_listen;
    }
    for (i = 0; i < MAX_CONNECTIONS; i++) {
        conn_infos[i].fd = -1;
    }

    /*
     * Arm one accept per slot. There are more slots than SQEs; the helper
     * behind add_accept_request() submits whenever the queue fills up.
     */
    for (i = 0; i < MAX_CONNECTIONS; i++) {
        if (add_accept_request(&ring, listen_fd, &conn_infos[i]) < 0) {
            fprintf(stderr, "get_sqe failed\n");
            goto out_conns;
        }
    }
    ret = io_uring_submit(&ring);
    if (ret < 0) {
        fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
        goto out_conns;
    }

    for (;;) {
        struct io_uring_cqe *cqe;
        struct conn_info *conn_i;
        uint64_t data;
        int res;

        ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret == -EINTR) {
            continue;
        }
        if (ret < 0) {
            fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
            break;
        }

        /* Copy what we need, then give the CQE back to the ring. */
        data = cqe->user_data;
        res = cqe->res;
        io_uring_cqe_seen(&ring, cqe);

        conn_i = (struct conn_info *)(uintptr_t)(data & ~TAG_MASK);

        if (data & TAG_READ) {
            if (res > 0) {
                /* res <= BUFFER_SIZE - 1, so the terminator fits. */
                conn_i->buffer[res] = '\0';
                printf("Read %d bytes: %s\n", res, conn_i->buffer);
                /* Echo back. */
                if (add_write_request(&ring, conn_i->fd, conn_i, (size_t)res) < 0) {
                    release_connection(&ring, listen_fd, conn_i);
                }
            } else {
                /* Peer closed the connection (0) or the read failed (< 0). */
                if (res < 0) {
                    printf("Read failed: %s\n", strerror(-res));
                }
                release_connection(&ring, listen_fd, conn_i);
            }
        } else if (data & TAG_WRITE) {
            if (res < 0) {
                printf("Write failed: %s\n", strerror(-res));
                release_connection(&ring, listen_fd, conn_i);
            } else if (add_read_request(&ring, conn_i->fd, conn_i) < 0) {
                release_connection(&ring, listen_fd, conn_i);
            }
        } else {
            /* Accept completion; descriptor 0 is a valid result. */
            if (res >= 0) {
                conn_i->fd = res;
                printf("New connection: %d\n", res);
                if (add_read_request(&ring, conn_i->fd, conn_i) < 0) {
                    release_connection(&ring, listen_fd, conn_i);
                }
            } else {
                printf("Accept failed: %s\n", strerror(-res));
                release_connection(&ring, listen_fd, conn_i);
            }
        }

        ret = io_uring_submit(&ring);
        if (ret < 0) {
            fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
            break;
        }
    }

out_conns:
    /*
     * Tear the ring down before freeing the slots: requests still queued on
     * it point into conn_infos.
     */
    io_uring_queue_exit(&ring);
    for (i = 0; i < MAX_CONNECTIONS; i++) {
        if (conn_infos[i].fd >= 0) {
            close(conn_infos[i].fd);
        }
    }
    free(conn_infos);
    close(listen_fd);
    return 1;

out_listen:
    close(listen_fd);
out_ring:
    io_uring_queue_exit(&ring);
    return 1;
}
示例3:批量文件操作
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <liburing.h>
#include <sys/stat.h>#define QUEUE_DEPTH 64
#define MAX_FILES 100struct file_info {int fd;char filename[256];char *buffer;size_t size;
};int async_file_operations() {struct io_uring ring;struct file_info files[MAX_FILES];struct io_uring_sqe *sqe;struct io_uring_cqe *cqe;int i, ret;int submitted = 0;// 初始化 io_uringret = io_uring_queue_init(QUEUE_DEPTH, &ring, 0);if (ret < 0) {fprintf(stderr, "queue_init: %s\n", strerror(-ret));return 1;}// 准备文件信息for (i = 0; i < MAX_FILES; i++) {snprintf(files[i].filename, sizeof(files[i].filename), "test_file_%d.txt", i);files[i].buffer = malloc(1024);sprintf(files[i].buffer, "Content of file %d\n", i);files[i].size = strlen(files[i].buffer);}// 批量提交文件创建和写入操作for (i = 0; i < MAX_FILES; i++) {// 打开文件sqe = io_uring_get_sqe(&ring);if (!sqe) {fprintf(stderr, "get_sqe failed\n");break;}io_uring_prep_openat(sqe, AT_FDCWD, files[i].filename, O_WRONLY | O_CREAT | O_TRUNC, 0644);sqe->user_data = (uint64_t)(i * 2); // 偶数标识符表示打开操作// 写入文件sqe = io_uring_get_sqe(&ring);if (!sqe) {fprintf(stderr, "get_sqe failed\n");break;}// 注意:这里需要先提交打开操作,获取fd后再写入// 为了简化示例,我们假设文件已存在submitted += 2;}// 提交所有操作ret = io_uring_submit(&ring);if (ret < 0) {fprintf(stderr, "submit failed: %s\n", strerror(-ret));goto cleanup;}// 收集完成结果for (i = 0; i < submitted; i++) {ret = io_uring_wait_cqe(&ring, &cqe);if (ret < 0) {fprintf(stderr, "wait_cqe failed: %s\n", strerror(-ret));continue;}if (cqe->res < 0) {fprintf(stderr, "Operation failed: %s (user_data: %lu)\n", strerror(-cqe->res), cqe->user_data);} else {printf("Operation completed successfully (user_data: %lu, result: %d)\n", cqe->user_data, cqe->res);}io_uring_cqe_seen(&ring, cqe);}cleanup:// 清理资源for (i = 0; i < MAX_FILES; i++) {if (files[i].buffer) {free(files[i].buffer);}// 删除测试文件unlink(files[i].filename);}io_uring_queue_exit(&ring);return 0;
}int main() {return async_file_operations();
}
io_uring 使用限制条件
内核版本要求
- Linux 5.1+: 基础功能
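- Linux 5.5+: 更多操作类型支持(如 accept、connect)
- Linux 5.6+: 新增 read、write、openat、close 等操作,并包含性能优化;本文示例使用的 io_uring_prep_read/io_uring_prep_write/io_uring_prep_openat 需要此版本或更高版本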
- Linux 5.7+: 更完整的功能集
硬件和系统要求
- CPU架构: 支持 x86_64, ARM64, RISC-V 等现代架构
- 内存: 需要足够的连续内存用于环形缓冲区
- 文件系统: 支持大多数现代文件系统(ext4, xfs, btrfs等)
功能限制
- 队列大小: 内核会将请求的条目数向上取整为2的幂;较新的内核中提交队列上限为32768(早期内核为4096),超出上限时初始化失败(除非指定 IORING_SETUP_CLAMP)
- 并发限制: 受系统资源和内核参数限制
- 操作类型: 并非所有系统调用都支持异步化
内存管理限制
- 缓冲区生命周期: 应用程序必须确保缓冲区在I/O完成前有效
- 内存对齐: 某些操作可能需要特定的内存对齐
- 大内存页: 建议使用大页以提高性能
并发和同步限制
- 线程安全: liburing 不做内部加锁;多个线程共享同一 ring 时,必须由应用程序自行同步对提交队列和完成队列的访问,更常见的做法是每个线程使用独立的 ring
- 信号处理: 在信号处理程序中使用需要特别注意
- 资源竞争: 需要正确处理多个 ring 之间的资源共享
错误处理限制
- 错误报告: 某些错误可能不会立即报告
- 恢复机制: 部分错误后可能需要重新初始化
- 资源泄漏: 错误处理不当可能导致资源泄漏
性能考虑
- 批处理: 小批量操作可能不如直接系统调用快
- 延迟: 第一次使用可能有初始化开销
- CPU亲和性: 可能受益于CPU绑定优化
这些限制条件在实际使用中需要仔细考虑,以确保 io_uring 能够正确和高效地工作。