当前位置: 首页 > news >正文

io_uring系统调用及示例

io_uring 库接口详解

基础接口

io_uring_queue_init(3)

NAMEio_uring_queue_init - 初始化 io_uring 实例SYNOPSIS#include <liburing.h>int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags);DESCRIPTION初始化一个 io_uring 实例。entries 指定环形缓冲区的大小(必须是2的幂)。flags 可以是 IORING_SETUP_* 常量的组合。RETURN VALUE成功返回0,失败返回负的错误码。SEE ALSOio_uring_queue_exit(3)

io_uring_queue_exit(3)

NAMEio_uring_queue_exit - 释放 io_uring 实例SYNOPSIS#include <liburing.h>void io_uring_queue_exit(struct io_uring *ring);DESCRIPTION释放之前初始化的 io_uring 实例,关闭相关文件描述符。SEE ALSOio_uring_queue_init(3)

io_uring_get_sqe(3)

NAMEio_uring_get_sqe - 获取提交队列项SYNOPSIS#include <liburing.h>struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);DESCRIPTION从提交队列中获取一个可用的 SQE(提交队列项),用于构建 I/O 操作。RETURN VALUE成功返回 SQE 指针,如果队列满则返回 NULL。SEE ALSOio_uring_submit(3)

io_uring_submit(3)

NAMEio_uring_submit - 提交 I/O 操作到内核SYNOPSIS#include <liburing.h>int io_uring_submit(struct io_uring *ring);DESCRIPTION将准备好的 SQE 提交到内核执行。返回提交的操作数量。RETURN VALUE成功返回提交的操作数,失败返回负的错误码。SEE ALSOio_uring_wait_cqe(3)

io_uring_wait_cqe(3)

NAMEio_uring_wait_cqe - 等待完成队列项SYNOPSIS#include <liburing.h>int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);DESCRIPTION等待至少一个 I/O 操作完成,并返回对应的 CQE(完成队列项)。RETURN VALUE成功返回0,失败返回负的错误码。SEE ALSOio_uring_peek_cqe(3), io_uring_cqe_seen(3)

io_uring_peek_cqe(3)

NAMEio_uring_peek_cqe - 非阻塞检查完成队列项SYNOPSIS#include <liburing.h>int io_uring_peek_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);DESCRIPTION非阻塞地检查是否有完成的 I/O 操作,如果有则返回对应的 CQE。RETURN VALUE成功返回0,没有完成项返回 -EAGAIN,其他错误返回负的错误码。SEE ALSOio_uring_wait_cqe(3)

io_uring_cqe_seen(3)

NAMEio_uring_cqe_seen - 标记完成队列项已处理SYNOPSIS#include <liburing.h>void io_uring_cqe_seen(struct io_uring *ring, struct io_uring_cqe *cqe);DESCRIPTION标记指定的 CQE 已被处理,从完成队列中移除。SEE ALSOio_uring_wait_cqe(3)

I/O 操作接口

io_uring_prep_read(3)

NAMEio_uring_prep_read - 准备读操作SYNOPSIS#include <liburing.h>void io_uring_prep_read(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, __u64 offset);DESCRIPTION准备一个异步读操作。从 fd 读取 nbytes 字节到 buf,从 offset 开始。SEE ALSOio_uring_prep_write(3)

io_uring_prep_write(3)

NAMEio_uring_prep_write - 准备写操作SYNOPSIS#include <liburing.h>void io_uring_prep_write(struct io_uring_sqe *sqe, int fd, const void *buf,unsigned nbytes, __u64 offset);DESCRIPTION准备一个异步写操作。将 buf 中的 nbytes 字节写入 fd,从 offset 开始。SEE ALSOio_uring_prep_read(3)

io_uring_prep_openat(3)

NAMEio_uring_prep_openat - 准备文件打开操作SYNOPSIS#include <liburing.h>void io_uring_prep_openat(struct io_uring_sqe *sqe, int dfd, const char *path,int flags, mode_t mode);DESCRIPTION准备一个异步文件打开操作。dfd 是目录文件描述符,path 是相对路径。SEE ALSOio_uring_prep_close(3)

io_uring_prep_close(3)

NAMEio_uring_prep_close - 准备文件关闭操作SYNOPSIS#include <liburing.h>void io_uring_prep_close(struct io_uring_sqe *sqe, int fd);DESCRIPTION准备一个异步文件关闭操作。SEE ALSOio_uring_prep_openat(3)

io_uring 使用示例

示例1:异步文件读写

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <liburing.h>#define QUEUE_DEPTH 32
#define BLOCK_SIZE 4096int async_file_copy(const char *src, const char *dst) {struct io_uring ring;int src_fd, dst_fd;char *buffer;struct io_uring_sqe *sqe;struct io_uring_cqe *cqe;int ret;// 初始化 io_uringret = io_uring_queue_init(QUEUE_DEPTH, &ring, 0);if (ret < 0) {fprintf(stderr, "queue_init: %s\n", strerror(-ret));return 1;}// 打开源文件和目标文件src_fd = open(src, O_RDONLY);if (src_fd < 0) {perror("open src");goto cleanup_ring;}dst_fd = open(dst, O_WRONLY | O_CREAT | O_TRUNC, 0644);if (dst_fd < 0) {perror("open dst");goto cleanup_src;}// 分配缓冲区buffer = malloc(BLOCK_SIZE);if (!buffer) {perror("malloc");goto cleanup_dst;}// 异步读取和写入off_t offset = 0;ssize_t bytes_read;while (1) {// 提交读操作sqe = io_uring_get_sqe(&ring);if (!sqe) {fprintf(stderr, "get_sqe failed\n");break;}io_uring_prep_read(sqe, src_fd, buffer, BLOCK_SIZE, offset);sqe->user_data = 1; // 标识读操作ret = io_uring_submit(&ring);if (ret <= 0) {fprintf(stderr, "submit read failed: %s\n", strerror(-ret));break;}// 等待读操作完成ret = io_uring_wait_cqe(&ring, &cqe);if (ret < 0) {fprintf(stderr, "wait_cqe failed: %s\n", strerror(-ret));break;}bytes_read = cqe->res;io_uring_cqe_seen(&ring, cqe);if (bytes_read <= 0) {// 文件读取完成break;}// 提交写操作sqe = io_uring_get_sqe(&ring);if (!sqe) {fprintf(stderr, "get_sqe failed\n");break;}io_uring_prep_write(sqe, dst_fd, buffer, bytes_read, offset);sqe->user_data = 2; // 标识写操作ret = io_uring_submit(&ring);if (ret <= 0) {fprintf(stderr, "submit write failed: %s\n", strerror(-ret));break;}// 等待写操作完成ret = io_uring_wait_cqe(&ring, &cqe);if (ret < 0) {fprintf(stderr, "wait_cqe failed: %s\n", strerror(-ret));break;}if (cqe->res < 0) {fprintf(stderr, "write failed: %s\n", strerror(-cqe->res));io_uring_cqe_seen(&ring, cqe);break;}io_uring_cqe_seen(&ring, cqe);offset += bytes_read;}free(buffer);close(dst_fd);close(src_fd);io_uring_queue_exit(&ring);return 0;cleanup_dst:close(dst_fd);
cleanup_src:close(src_fd);
cleanup_ring:io_uring_queue_exit(&ring);return 1;
}int main(int argc, char *argv[]) {if (argc != 3) {fprintf(stderr, "Usage: %s <src> <dst>\n", argv[0]);return 1;}return async_file_copy(argv[1], argv[2]);
}

示例2:并发网络服务器

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <liburing.h>#define MAX_CONNECTIONS 1024
#define BUFFER_SIZE 4096
#define QUEUE_DEPTH 256struct conn_info {int fd;char buffer[BUFFER_SIZE];
};int setup_listening_socket(int port) {int sock;struct sockaddr_in addr;int opt = 1;sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0) {perror("socket");return -1;}if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {perror("setsockopt");close(sock);return -1;}memset(&addr, 0, sizeof(addr));addr.sin_family = AF_INET;addr.sin_addr.s_addr = INADDR_ANY;addr.sin_port = htons(port);if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {perror("bind");close(sock);return -1;}if (listen(sock, 128) < 0) {perror("listen");close(sock);return -1;}return sock;
}void add_accept_request(struct io_uring *ring, int listen_fd, struct conn_info *conn_i) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);io_uring_prep_accept(sqe, listen_fd, NULL, NULL, 0);sqe->user_data = (uint64_t)(uintptr_t)conn_i;
}void add_read_request(struct io_uring *ring, int fd, struct conn_info *conn_i) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);io_uring_prep_read(sqe, fd, conn_i->buffer, BUFFER_SIZE - 1, 0);sqe->user_data = (uint64_t)(uintptr_t)conn_i | 1UL << 63;
}void add_write_request(struct io_uring *ring, int fd, struct conn_info *conn_i, size_t len) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);io_uring_prep_write(sqe, fd, conn_i->buffer, len, 0);sqe->user_data = (uint64_t)(uintptr_t)conn_i | 1UL << 62;
}int main() {struct io_uring ring;int listen_fd;struct conn_info *conn_infos;int i;// 初始化 io_uringif (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {perror("io_uring_queue_init");return 1;}// 创建监听套接字listen_fd = setup_listening_socket(8080);if (listen_fd < 0) {return 1;}printf("Server listening on port 8080\n");// 分配连接信息结构conn_infos = malloc(MAX_CONNECTIONS * sizeof(struct conn_info));if (!conn_infos) {perror("malloc");close(listen_fd);io_uring_queue_exit(&ring);return 1;}// 提交初始的 accept 请求for (i = 0; i < MAX_CONNECTIONS; i++) {add_accept_request(&ring, listen_fd, &conn_infos[i]);}io_uring_submit(&ring);// 事件循环while (1) {struct io_uring_cqe *cqe;struct conn_info *conn_i;int type;if (io_uring_wait_cqe(&ring, &cqe) < 0) {perror("io_uring_wait_cqe");continue;}conn_i = (struct conn_info*)(uintptr_t)(cqe->user_data & ~(1UL << 63) & ~(1UL << 62));type = cqe->user_data & (1UL << 63) ? 1 : (cqe->user_data & (1UL << 62) ? 2 : 0);switch (type) {case 0: // acceptif (cqe->res > 0) {int client_fd = cqe->res;printf("New connection: %d\n", client_fd);add_read_request(&ring, client_fd, conn_i);} else {printf("Accept failed: %s\n", strerror(-cqe->res));}// 重新提交 accept 请求add_accept_request(&ring, listen_fd, conn_i);break;case 1: // readif (cqe->res > 0) {ssize_t bytes_read = cqe->res;conn_i->buffer[bytes_read] = '\0';printf("Read %zd bytes: %s\n", bytes_read, conn_i->buffer);// Echo backadd_write_request(&ring, conn_i->fd, conn_i, bytes_read);} else {// Connection closed or errorif (cqe->res < 0) {printf("Read failed: %s\n", strerror(-cqe->res));}close(conn_i->fd);}break;case 2: // writeif (cqe->res < 0) {printf("Write failed: %s\n", strerror(-cqe->res));}// 继续读取add_read_request(&ring, conn_i->fd, conn_i);break;}io_uring_cqe_seen(&ring, cqe);io_uring_submit(&ring);}free(conn_infos);close(listen_fd);io_uring_queue_exit(&ring);return 0;
}

示例3:批量文件操作

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <liburing.h>
#include <sys/stat.h>#define QUEUE_DEPTH 64
#define MAX_FILES 100struct file_info {int fd;char filename[256];char *buffer;size_t size;
};int async_file_operations() {struct io_uring ring;struct file_info files[MAX_FILES];struct io_uring_sqe *sqe;struct io_uring_cqe *cqe;int i, ret;int submitted = 0;// 初始化 io_uringret = io_uring_queue_init(QUEUE_DEPTH, &ring, 0);if (ret < 0) {fprintf(stderr, "queue_init: %s\n", strerror(-ret));return 1;}// 准备文件信息for (i = 0; i < MAX_FILES; i++) {snprintf(files[i].filename, sizeof(files[i].filename), "test_file_%d.txt", i);files[i].buffer = malloc(1024);sprintf(files[i].buffer, "Content of file %d\n", i);files[i].size = strlen(files[i].buffer);}// 批量提交文件创建和写入操作for (i = 0; i < MAX_FILES; i++) {// 打开文件sqe = io_uring_get_sqe(&ring);if (!sqe) {fprintf(stderr, "get_sqe failed\n");break;}io_uring_prep_openat(sqe, AT_FDCWD, files[i].filename, O_WRONLY | O_CREAT | O_TRUNC, 0644);sqe->user_data = (uint64_t)(i * 2); // 偶数标识符表示打开操作// 写入文件sqe = io_uring_get_sqe(&ring);if (!sqe) {fprintf(stderr, "get_sqe failed\n");break;}// 注意:这里需要先提交打开操作,获取fd后再写入// 为了简化示例,我们假设文件已存在submitted += 2;}// 提交所有操作ret = io_uring_submit(&ring);if (ret < 0) {fprintf(stderr, "submit failed: %s\n", strerror(-ret));goto cleanup;}// 收集完成结果for (i = 0; i < submitted; i++) {ret = io_uring_wait_cqe(&ring, &cqe);if (ret < 0) {fprintf(stderr, "wait_cqe failed: %s\n", strerror(-ret));continue;}if (cqe->res < 0) {fprintf(stderr, "Operation failed: %s (user_data: %lu)\n", strerror(-cqe->res), cqe->user_data);} else {printf("Operation completed successfully (user_data: %lu, result: %d)\n", cqe->user_data, cqe->res);}io_uring_cqe_seen(&ring, cqe);}cleanup:// 清理资源for (i = 0; i < MAX_FILES; i++) {if (files[i].buffer) {free(files[i].buffer);}// 删除测试文件unlink(files[i].filename);}io_uring_queue_exit(&ring);return 0;
}int main() {return async_file_operations();
}

io_uring 使用限制条件

内核版本要求

  • Linux 5.1+: 基础功能
  • Linux 5.5+: 更多操作类型支持
  • Linux 5.6+: 性能优化
  • Linux 5.7+: 更完整的功能集

硬件和系统要求

  1. CPU架构: 支持 x86_64, ARM64, RISC-V 等现代架构
  2. 内存: 需要足够的连续内存用于环形缓冲区
  3. 文件系统: 支持大多数现代文件系统(ext4, xfs, btrfs等)

功能限制

  1. 队列大小: 必须是2的幂,最大通常为4096
  2. 并发限制: 受系统资源和内核参数限制
  3. 操作类型: 并非所有系统调用都支持异步化

内存管理限制

  1. 缓冲区生命周期: 应用程序必须确保缓冲区在I/O完成前有效
  2. 内存对齐: 某些操作可能需要特定的内存对齐
  3. 大内存页: 建议使用大页以提高性能

并发和同步限制

  1. 线程安全: 同一 ring 不能被多个线程同时访问
  2. 信号处理: 在信号处理程序中使用需要特别注意
  3. 资源竞争: 需要正确处理多个 ring 之间的资源共享

错误处理限制

  1. 错误报告: 某些错误可能不会立即报告
  2. 恢复机制: 部分错误后可能需要重新初始化
  3. 资源泄漏: 错误处理不当可能导致资源泄漏

性能考虑

  1. 批处理: 小批量操作可能不如直接系统调用快
  2. 延迟: 第一次使用可能有初始化开销
  3. CPU亲和性: 可能受益于CPU绑定优化

这些限制条件在实际使用中需要仔细考虑,以确保 io_uring 能够正确和高效地工作。

http://www.dtcms.com/a/316930.html

相关文章:

  • Houdini Pyro学习笔记
  • [数组]977.有序数组的平方;209.长度最小的子数组
  • VUE+SPRINGBOOT从0-1打造前后端-前后台系统-邮箱重置密码
  • 数据结构——双向链表
  • 【学习嵌入式day-17-数据结构-单向链表/双向链表】
  • C语言:预处理、 库文件、 文件IO
  • Python深度学习:从入门到进阶
  • GPT-1、GPT-2、GPT-3 的区别和联系
  • C语言基础_IDE、进制转换、基本数据类型、输入输出函数、运算符
  • 一文搞定JavaServerPages基础,从0开始写一个登录与人数统计页面
  • 模拟面试总结
  • JSP相关Bug解决
  • Vue.js 教程
  • 市场与销售协同:CRM如何打破部门数据孤岛?
  • 思途Mybatis学习 0805
  • 一个小巧神奇的 USB数据线检测仪
  • LabVIEW 2025 安装攻略(附图文教程)适用于测试与自动控制领域
  • 亚马逊广告进阶指南:大词 VS 长尾词
  • 数据结构2.(双向链表,循环链表及内核链表)
  • 怎么在公司存活下去
  • SAP FI模块凭证增强逻辑的策略
  • 飞算 JavaAI:开启 Java 开发智能自动化新时代
  • open3d python 鞋底点云点胶路径识别
  • windows 系统装机入职版
  • Java 模版进阶
  • C#案例实战
  • 18day-人工智能-机器学习-分类算法-朴素贝叶斯分类
  • 8.5学习总结
  • Linux下部署Minecraft服务器
  • sqli-labs靶场less36-less40