SpringBoot 如何实现零拷贝:深度解析零拷贝技术
一、前言
1. 为什么引入零拷贝?
如果服务端要提供文件传输的功能,我们能想到的最简单的方式是:将磁盘上的文件读取出来,然后通过网络协议发送给客户端。
传统 I/O 的工作方式是,数据读取和写入是从用户空间到内核空间来回复制,而内核空间的数据是通过操作系统层面的 I/O 接口从磁盘读取或写入。
C:
read(file, tmp_buf, len);
write(socket, tmp_buf, len);
Java:
// 传统文件读取和发送的伪代码表示
public class TraditionalFileTransfer {public void transferFile(File file, Socket socket) throws IOException {byte[] buffer = new byte[8192]; // 用户空间缓冲区// 1. 从磁盘读取到内核缓冲区(DMA拷贝)// 2. 从内核缓冲区拷贝到用户缓冲区(CPU拷贝)FileInputStream fis = new FileInputStream(file);int bytesRead;while ((bytesRead = fis.read(buffer)) != -1) {// 3. 从用户缓冲区拷贝到Socket缓冲区(CPU拷贝)// 4. 从Socket缓冲区拷贝到网卡缓冲区(DMA拷贝)socket.getOutputStream().write(buffer, 0, bytesRead);}}
}
代码很简单,虽然就两行代码,但是这里面发生了不少的事情。
首先,期间共发生了 4 次用户态与内核态的上下文切换,因为发生了两次系统调用,一次是 read() ,一次是 write(),每次系统调用都得先从用户态切换到内核态,等内核完成任务后,再从内核态切换回用户态。
上下文切换到成本并不小,一次切换需要耗时几十纳秒到几微秒,虽然时间看上去很短,但是在高并发的场景下,这类时间容易被累积和放大,从而影响系统的性能。
其次,还发生了 4 次数据拷贝,其中两次是 DMA 的拷贝,另外两次则是通过 CPU 拷贝的,下面说一下这个过程:
- 第一次拷贝,把磁盘上的数据拷贝到操作系统内核的缓冲区里,这个拷贝的过程是通过 DMA 搬运的。
- 第二次拷贝,把内核缓冲区的数据拷贝到用户的缓冲区里,于是我们应用程序就可以使用这部分数据了,这个拷贝到过程是由 CPU 完成的。
- 第三次拷贝,把刚才拷贝到用户的缓冲区里的数据,再拷贝到内核的 socket 的缓冲区里,这个过程依然还是由 CPU 搬运的。
- 第四次拷贝,把内核的 socket 缓冲区里的数据,拷贝到网卡的缓冲区里,这个过程又是由 DMA 搬运的。
我们回过头看这个文件传输的过程,我们只是搬运一份数据,结果却搬运了 4 次,过多的数据拷贝无疑会消耗 CPU 资源,大大降低了系统性能。
这种简单又传统的文件传输方式,存在冗余的上文切换和数据拷贝,在高并发系统里是非常糟糕的,多了很多不必要的开销,会严重影响系统性能。
所以,要想提高文件传输的性能,就需要减少「用户态与内核态的上下文切换」和「内存拷贝」的次数。
二、零拷贝原理
1. mmap + write 实现零拷贝
在前面我们知道,read() 系统调用的过程中会把内核缓冲区的数据拷贝到用户的缓冲区里,于是为了减少这一步开销,我们可以用 mmap() 替换 read() 系统调用函数。
buf = mmap(file, len);
write(sockfd, buf, len);
mmap() 系统调用函数会直接把内核缓冲区里的数据「映射」到用户空间,这样,操作系统内核与用户空间就不需要再进行任何的数据拷贝操作。
具体过程如下:
- 应用进程调用了 mmap() 后,DMA 会把磁盘的数据拷贝到内核的缓冲区里。接着,应用进程跟操作系统内核「共享」这个缓冲区;
- 应用进程再调用 write(),操作系统直接将内核缓冲区的数据拷贝到 socket 缓冲区中,这一切都发生在内核态,由 CPU 来搬运数据;
- 最后,把内核的 socket 缓冲区里的数据,拷贝到网卡的缓冲区里,这个过程是由 DMA 搬运的。
我们可以得知,通过使用 mmap() 来代替 read(), 可以减少一次数据拷贝的过程。
但这还不是最理想的零拷贝,因为仍然需要通过 CPU 把内核缓冲区的数据拷贝到 socket 缓冲区里,而且仍然需要 4 次上下文切换,因为系统调用还是 2 次。
2. sendfile 实现零拷贝
在 Linux 内核版本 2.1 中,提供了一个专门发送文件的系统调用函数 sendfile(),函数形式如下:
#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
它的前两个参数分别是目的端和源端的文件描述符,后面两个参数是源端的偏移量和复制数据的长度,返回值是实际复制数据的长度。
首先,它可以替代前面的 read() 和 write() 这两个系统调用,这样就可以减少一次系统调用,也就减少了 2 次上下文切换的开销。
其次,该系统调用,可以直接把内核缓冲区里的数据拷贝到 socket 缓冲区里,不再拷贝到用户态,这样就只有 2 次上下文切换,和 3 次数据拷贝。如下图:
但是这还不是真正的零拷贝技术,如果网卡支持 SG-DMA(The Scatter-Gather Direct Memory Access)技术(和普通的 DMA 有所不同),我们可以进一步减少通过 CPU 把内核缓冲区里的数据拷贝到 socket 缓冲区的过程。
你可以在你的 Linux 系统通过下面这个命令,查看网卡是否支持 scatter-gather 特性:
$ ethtool -k eth0 | grep scatter-gather
scatter-gather: on
于是,从 Linux 内核 2.4 版本开始起,对于支持网卡支持 SG-DMA 技术的情况下, sendfile() 系统调用的过程发生了点变化,具体过程如下:
- 第一步,通过 DMA 将磁盘上的数据拷贝到内核缓冲区里;
- 第二步,缓冲区描述符和数据长度传到 socket 缓冲区,这样网卡的 SG-DMA 控制- 器就可以直接将内核缓存中的数据拷贝到网卡的缓冲区里,此过程不需要将数据从操作系统内核缓冲区拷贝到 socket 缓冲区中,这样就减少了一次数据拷贝;
所以,这个过程之中,只进行了 2 次数据拷贝,如下图:
这就是所谓的零拷贝(Zero-copy)技术,因为我们没有在内存层面去拷贝数据,也就是说全程没有通过 CPU 来搬运数据,所有的数据都是通过 DMA 来进行传输的。
零拷贝技术的文件传输方式相比传统文件传输的方式,减少了 2 次上下文切换和数据拷贝次数,只需要 2 次上下文切换和数据拷贝次数,就可以完成文件的传输,而且 2 次的数据拷贝过程,都不需要通过 CPU,2 次都是由 DMA 来搬运。
三、SpringBoot 零拷贝实现
1. 基于NIO的FileChannel实现
核心文件传输服务
@Service
@Slf4j
public class ZeroCopyFileService {private static final int BUFFER_SIZE = 8192;/*** 使用FileChannel.transferTo实现零拷贝文件传输* 这是最高效的零拷贝实现方式*/public void transferFileWithZeroCopy(File file, ServletResponse response) throws IOException {try (FileChannel fileChannel = new FileInputStream(file).getChannel();WritableByteChannel outputChannel = Channels.newChannel(response.getOutputStream())) {long position = 0;long fileSize = fileChannel.size();// 使用transferTo进行零拷贝传输while (position < fileSize) {long transferred = fileChannel.transferTo(position, fileSize - position, outputChannel);if (transferred <= 0) {break;}position += transferred;}log.debug("零拷贝文件传输完成: {}, 文件大小: {} bytes", file.getName(), fileSize);} catch (IOException e) {log.error("零拷贝文件传输失败: {}", file.getName(), e);throw e;}}/*** 使用MappedByteBuffer实现内存映射文件传输* 适合大文件的分块处理*/public void transferFileWithMmap(File file, ServletResponse response, long chunkSize) throws IOException {if (chunkSize <= 0) {chunkSize = 1024 * 1024; // 默认1MB分块}try (FileChannel fileChannel = new RandomAccessFile(file, "r").getChannel()) {long fileSize = fileChannel.size();long position = 0;while (position < fileSize) {long size = Math.min(chunkSize, fileSize - position);// 创建内存映射MappedByteBuffer mappedBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, size);// 通过Channel发送映射的缓冲区WritableByteChannel outputChannel = Channels.newChannel(response.getOutputStream());while (mappedBuffer.hasRemaining()) {outputChannel.write(mappedBuffer);}// 清理映射clean(mappedBuffer);position += size;}log.debug("内存映射文件传输完成: {}, 文件大小: {} bytes", file.getName(), fileSize);}}/*** 清理MappedByteBuffer*/private void clean(MappedByteBuffer buffer) {if (buffer == null || !buffer.isDirect()) return;try {Method cleanerMethod = buffer.getClass().getMethod("cleaner");cleanerMethod.setAccessible(true);Object cleaner = cleanerMethod.invoke(buffer);if (cleaner != null) {Method cleanMethod = cleaner.getClass().getMethod("clean");cleanMethod.setAccessible(true);cleanMethod.invoke(cleaner);}} catch (Exception e) {log.warn("清理MappedByteBuffer失败", e);}}
}
2. Spring Web 零拷贝控制器实现
RESTful文件传输接口
@RestController
@RequestMapping("/api/file")
@Slf4j
public class ZeroCopyFileController {@Autowiredprivate ZeroCopyFileService zeroCopyFileService;@Value("${file.upload.dir:/tmp/uploads}")private String uploadDir;/*** 零拷贝文件下载*/@GetMapping("/download/{filename}")public void downloadFile(@PathVariable String filename, HttpServletRequest request,HttpServletResponse response) throws IOException {File file = new File(uploadDir, filename);if (!file.exists() || !file.isFile()) {response.sendError(HttpStatus.NOT_FOUND.value(), "文件不存在");return;}// 设置响应头setupFileDownloadHeaders(response, file, filename);try {// 使用零拷贝传输文件zeroCopyFileService.transferFileWithZeroCopy(file, response);log.info("文件下载完成: {}, 大小: {} bytes, 客户端: {}", filename, file.length(), getClientIp(request));} catch (IOException e) {log.error("文件下载失败: {}", filename, e);if (!response.isCommitted()) {response.sendError(HttpStatus.INTERNAL_SERVER_ERROR.value(), "文件下载失败");}}}/*** 大文件分块下载(支持断点续传)*/@GetMapping("/download/{filename}/chunked")public void downloadFileChunked(@PathVariable String filename,@RequestHeader(value = "Range", required = false) String rangeHeader,HttpServletRequest request,HttpServletResponse response) throws IOException {File file = new File(uploadDir, filename);if (!file.exists() || !file.isFile()) {response.sendError(HttpStatus.NOT_FOUND.value(), "文件不存在");return;}long fileLength = file.length();long start = 0;long end = fileLength - 1;// 处理范围请求(断点续传)if (rangeHeader != null && rangeHeader.startsWith("bytes=")) {String[] ranges = rangeHeader.substring(6).split("-");start = Long.parseLong(ranges[0]);if (ranges.length > 1) {end = Long.parseLong(ranges[1]);}end = Math.min(end, fileLength - 1);response.setStatus(HttpStatus.PARTIAL_CONTENT.value());response.setHeader("Content-Range", String.format("bytes %d-%d/%d", start, end, fileLength));}long contentLength = end - start + 1;setupFileDownloadHeaders(response, file, filename);response.setContentLengthLong(contentLength);// 使用内存映射进行分块传输try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");FileChannel fileChannel = randomAccessFile.getChannel()) {// 定位到指定位置WritableByteChannel outputChannel = Channels.newChannel(response.getOutputStream());fileChannel.transferTo(start, contentLength, outputChannel);log.info("文件分块下载完成: {} [{} - {}], 客户端: {}", filename, start, end, getClientIp(request));} catch (IOException e) {log.error("文件分块下载失败: {}", filename, e);if (!response.isCommitted()) {response.sendError(HttpStatus.INTERNAL_SERVER_ERROR.value(), "文件下载失败");}}}/*** 设置文件下载响应头*/private void setupFileDownloadHeaders(HttpServletResponse response, File file, String filename) {// 内容类型String contentType = determineContentType(filename);response.setContentType(contentType);// 内容处置String encodedFilename = URLEncoder.encode(filename, StandardCharsets.UTF_8).replaceAll("\\+", "%20");response.setHeader("Content-Disposition", "attachment; filename*=UTF-8''" + encodedFilename);// 缓存控制response.setHeader("Cache-Control", "private, max-age=300");response.setHeader("Pragma", "private");response.setDateHeader("Expires", System.currentTimeMillis() + 300 * 1000);// 文件大小response.setContentLengthLong(file.length());// 支持断点续传response.setHeader("Accept-Ranges", "bytes");}/*** 根据文件名确定内容类型*/private String determineContentType(String filename) {String extension = FilenameUtils.getExtension(filename).toLowerCase();switch (extension) {case "pdf": return "application/pdf";case "jpg": case "jpeg": return "image/jpeg";case "png": return "image/png";case "txt": return "text/plain";case "zip": return "application/zip";default: return "application/octet-stream";}}/*** 获取客户端IP*/private String getClientIp(HttpServletRequest request) {String xForwardedFor = request.getHeader("X-Forwarded-For");if (xForwardedFor != null && !xForwardedFor.isEmpty()) {return xForwardedFor.split(",")[0];}return request.getRemoteAddr();}
}
3. 高级特性:异步零拷贝处理
异步文件处理器
@Component
@Slf4j
public class AsyncZeroCopyProcessor {@Autowiredprivate ZeroCopyFileService zeroCopyFileService;private final ExecutorService asyncExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),new ThreadFactoryBuilder().setNameFormat("zero-copy-async-%d").setDaemon(true).build());/*** 异步零拷贝文件处理*/public CompletableFuture<Void> processFileAsync(File file, ServletResponse response) {return CompletableFuture.runAsync(() -> {try {zeroCopyFileService.transferFileWithZeroCopy(file, response);} catch (IOException e) {log.error("异步文件处理失败: {}", file.getName(), e);throw new CompletionException(e);}}, asyncExecutor);}/*** 批量文件异步传输*/public CompletableFuture<Void> processBatchFilesAsync(List<File> files,ServletResponse response) {List<CompletableFuture<Void>> futures = files.stream().map(file -> processFileAsync(file, response)).collect(Collectors.toList());return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));}@PreDestroypublic void shutdown() {asyncExecutor.shutdown();try {if (!asyncExecutor.awaitTermination(5, TimeUnit.SECONDS)) {asyncExecutor.shutdownNow();}} catch (InterruptedException e) {asyncExecutor.shutdownNow();Thread.currentThread().interrupt();}}
}
4. Netty 零拷贝集成
Netty文件服务器实现
@Component
@Slf4j
public class NettyZeroCopyServer {@Value("${netty.server.port:8080}")private int port;private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;private ChannelFuture channelFuture;/*** 启动Netty零拷贝文件服务器*/@PostConstructpublic void start() throws InterruptedException {bossGroup = new NioEventLoopGroup(1);workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(65536));pipeline.addLast(new ZeroCopyFileHandler());}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);channelFuture = bootstrap.bind(port).sync();log.info("Netty零拷贝文件服务器启动成功,端口: {}", port);} catch (Exception e) {log.error("Netty服务器启动失败", e);stop();throw e;}}/*** Netty零拷贝文件处理器*/private static class ZeroCopyFileHandler extends SimpleChannelInboundHandler<FullHttpRequest> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {if (!request.decoderResult().isSuccess()) {sendError(ctx, HttpResponseStatus.BAD_REQUEST);return;}if (request.method() != HttpMethod.GET) {sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);return;}String uri = request.uri();if (!uri.startsWith("/file/")) {sendError(ctx, HttpResponseStatus.NOT_FOUND);return;}String filename = uri.substring(6); // 去掉"/file/"File file = new File("/data/files", filename);if (!file.exists() || file.isDirectory()) {sendError(ctx, HttpResponseStatus.NOT_FOUND);return;}// 使用零拷贝发送文件try {RandomAccessFile raf = new RandomAccessFile(file, "r");long fileLength = raf.length();HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);HttpUtil.setContentLength(response, fileLength);response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");// 设置文件下载头String encodedFilename = URLEncoder.encode(filename, StandardCharsets.UTF_8).replaceAll("\\+", "%20");response.headers().set(HttpHeaderNames.CONTENT_DISPOSITION,"attachment; filename*=UTF-8''" + encodedFilename);// 写入HTTP响应头ctx.write(response);// 使用零拷贝发送文件内容FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, fileLength);ctx.write(region, ctx.newProgressivePromise()).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) {if (!future.isSuccess()) {log.error("文件传输失败: {}", filename, future.cause());}try {raf.close();} catch (IOException e) {log.warn("关闭文件失败: {}", filename, e);}}});// 写入结束标记ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);lastContentFuture.addListener(ChannelFutureListener.CLOSE);log.debug("Netty零拷贝文件发送完成: {}, 大小: {} bytes", filename, fileLength);} catch (Exception e) {log.error("文件处理失败: {}", filename, e);sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);}}private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("通道处理异常", cause);ctx.close();}}/*** 停止服务器*/@PreDestroypublic void stop() {if (channelFuture != null) {channelFuture.channel().close();}if (bossGroup != null) {bossGroup.shutdownGracefully();}if (workerGroup != null) {workerGroup.shutdownGracefully();}log.info("Netty零拷贝文件服务器已停止");}
}
四、最佳实践与注意事项
1. 配置优化
零拷贝相关配置
# application.yml
server:servlet:multipart:max-file-size: 10MBmax-request-size: 10MBtomcat:max-swallow-size: 10MBmax-http-form-post-size: 10MB# 自定义零拷贝配置
zerocopy:enabled: truebuffer-size: 8192chunk-size: 1MBmax-file-size: 1GBasync-enabled: trueasync-threads: 4
2. 内存管理优化
直接内存监控与管理
@Component
@Slf4j
public class DirectMemoryMonitor {private final BufferPoolMXBean directBufferPool;public DirectMemoryMonitor() {this.directBufferPool = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class).stream().filter(pool -> "direct".equals(pool.getName())).findFirst().orElse(null);}/*** 监控直接内存使用情况*/@Scheduled(fixedRate = 30000)public void monitorDirectMemory() {if (directBufferPool != null) {long used = directBufferPool.getMemoryUsed();long total = directBufferPool.getTotalCapacity();long count = directBufferPool.getCount();if (used > total * 0.8) {log.warn("直接内存使用率过高: {}/{} bytes ({} buffers)", used, total, count);}log.debug("直接内存使用: {}/{} bytes, 缓冲区数量: {}", used, total, count);}}/*** 获取直接内存统计*/public DirectMemoryStats getDirectMemoryStats() {if (directBufferPool == null) {return null;}DirectMemoryStats stats = new DirectMemoryStats();stats.setUsedMemory(directBufferPool.getMemoryUsed());stats.setTotalCapacity(directBufferPool.getTotalCapacity());stats.setBufferCount(directBufferPool.getCount());stats.setUsagePercentage((double) stats.getUsedMemory() / stats.getTotalCapacity() * 100);return stats;}@Datapublic static class DirectMemoryStats {private long usedMemory;private long totalCapacity;private long bufferCount;private double usagePercentage;}
}
架构设计之道在于在不同的场景采用合适的架构设计,架构设计没有完美,只有合适。
在代码的路上,我们一起砥砺前行。用代码改变世界!
- 工作 3 年还在写 CRUD,无法突破技术瓶颈?
- 想转技术管理但不会带团队?
- 想跳槽没有面试的机会?
- 不懂如何面试,迟迟拿不到 offer?
- 面试屡屡碰壁,失败原因无人指导?
- 在竞争激烈的大环境下,只有不断提升核心竞争力才能立于不败之地。
欢迎从事编程开发、技术招聘 HR 进群,欢迎大家分享自己公司的内推信息,相互帮助,一起进步!
—— 斩获心仪Offer,破解面试密码 ——