paimon.disk包:磁盘处理
FileIOChannel接口
FileIOChannel 是 Paimon 内部用于磁盘 I/O 操作的一个核心抽象,尤其在需要将大量数据溢出(spill)到本地磁盘的场景(例如外部排序)中扮演着关键角色。它代表了对一个底层文件的 I/O 通道,并提供了一套管理其生命周期(创建、读写、关闭、删除)的标准化方法。
下面我们分部分来解析这个接口。
这个接口定义了一个 I/O 通道的基本行为。
// ... existing code ...
@Public
public interface FileIOChannel {/*** Gets the channel ID of this I/O channel.** @return The channel ID.*/ID getChannelID();/** Gets the size (in bytes) of the file underlying the channel. */long getSize() throws IOException;/*** Checks whether the channel has been closed.** @return True if the channel has been closed, false otherwise.*/boolean isClosed();/*** Closes the channel. For asynchronous implementations, this method waits until all pending* requests are handled. Even if an exception interrupts the closing, the underlying* <tt>FileChannel</tt> is closed.** @throws IOException Thrown, if an error occurred while waiting for pending requests.*/void close() throws IOException;/*** Deletes the file underlying this I/O channel.** @throws IllegalStateException Thrown, when the channel is still open.*/void deleteChannel();FileChannel getNioFileChannel();/*** Closes the channel and deletes the underlying file. For asynchronous implementations, this* method waits until all pending requests are handled.** @throws IOException Thrown, if an error occurred while waiting for pending requests.*/void closeAndDelete() throws IOException;// ... existing code ...
getChannelID(): 获取此通道的唯一标识符ID。ID对象封装了文件的路径等信息。getSize(): 获取底层文件的大小(字节)。isClosed(): 检查通道是否已经关闭。close(): 关闭通道。关闭后,不能再进行 I/O 操作。deleteChannel(): 删除底层文件。调用此方法前,通道必须是关闭的,否则会抛出IllegalStateException。getNioFileChannel(): 返回原生的 Java NIOFileChannel,允许进行更底层的、直接的文件操作。closeAndDelete(): 一个方便的组合方法,用于关闭通道并立即删除其对应的文件。这在处理临时文件时非常常用。
内部类 ID:通道的唯一标识
ID 类是 FileIOChannel 的一个静态内部类,它作为每个通道的唯一身份标识。它的核心是为临时文件生成一个唯一的、不会冲突的文件路径。
// ... existing code .../** An ID identifying an underlying file channel. */class ID {private static final int RANDOM_BYTES_LENGTH = 16;private final File path;private final int bucketNum;private ID(File path, int bucketNum) {
// ... existing code ...}public ID(File basePath, int bucketNum, Random random) {this.path = new File(basePath, randomString(random) + ".channel");this.bucketNum = bucketNum;}public ID(File basePath, int bucketNum, String prefix, Random random) {this.path = new File(basePath, prefix + "-" + randomString(random) + ".channel");this.bucketNum = bucketNum;}// ... existing code ...private static String randomString(Random random) {byte[] bytes = new byte[RANDOM_BYTES_LENGTH];random.nextBytes(bytes);return StringUtils.byteToHexString(bytes);}}
// ... existing code ...
- 核心字段:
path: 一个File对象,指向底层文件的实际路径。bucketNum: 一个整型的“桶号”。这表明 Paimon 支持将临时文件分散到多个不同的目录(桶)中,以分摊 I/O 负载,避免单个磁盘成为瓶颈。
- 构造逻辑:
- 它总是基于一个基础路径
basePath来创建。 - 文件名由一个随机字符串和固定的
.channel后缀组成。随机字符串通过randomString方法生成一个16字节的随机序列并转换为十六进制字符串,这能极大地保证文件名的唯一性。 - 还支持一个可选的
prefix,使得生成的临时文件在文件名上具有一定的可识别性。
- 它总是基于一个基础路径
内部类 Enumerator:通道ID的生成器
Enumerator(枚举器)是 FileIOChannel 的另一个静态内部类。它的作用是批量生成一系列逻辑上相关联的 FileIOChannel.ID。当一个任务(如外部排序)需要创建多个溢出文件时,使用 Enumerator 可以确保这些文件被合理地分发到不同的临时目录,并且文件名具有逻辑上的关联性。
// ... existing code .../** An enumerator for channels that logically belong together. */final class Enumerator {private static final AtomicInteger GLOBAL_NUMBER = new AtomicInteger();private final File[] paths;private final String namePrefix;private int localCounter;public Enumerator(File[] basePaths, Random random) {this.paths = basePaths;this.namePrefix = FileIOChannel.ID.randomString(random);this.localCounter = 0;}public FileIOChannel.ID next() {int bucketNum = GLOBAL_NUMBER.getAndIncrement() % paths.length;String filename = String.format("%s.%06d.channel", namePrefix, (localCounter++));return new FileIOChannel.ID(new File(paths[bucketNum], filename), bucketNum);}}
}
- 构造函数: 接收一个
File[] basePaths数组,这代表了所有可用的临时文件目录。它会生成一个唯一的namePrefix,这个前缀将用于该枚举器生成的所有ID。 next()方法: 这是该类的核心。- 目录选择: 通过一个静态的原子整型
GLOBAL_NUMBER,以轮询(round-robin)的方式从paths数组中选择一个基础目录。GLOBAL_NUMBER.getAndIncrement() % paths.length确保了文件创建请求被均匀地分发到配置的各个临时目录中。 - 文件名生成: 文件名由三部分构成:共享的
namePrefix、一个本地自增的计数器localCounter(格式化为6位数字)、以及.channel后缀。例如,可能会生成randomPrefix.000000.channel,randomPrefix.000001.channel等。 - 返回ID: 最后,用选定的目录和生成的文件名创建一个新的
FileIOChannel.ID对象并返回。
- 目录选择: 通过一个静态的原子整型
总结
FileIOChannel 及其内部类 ID 和 Enumerator 共同构成了一个强大而灵活的本地临时文件管理框架。
FileIOChannel定义了对单个临时文件的标准操作接口。ID通过随机化和路径封装,确保了每个临时文件的唯一性。Enumerator则提供了一种机制,用于批量、负载均衡地创建一系列逻辑相关的临时文件。
在 Paimon 中,FileChannelManager 接口及其实现(如 FileChannelManagerImpl)会使用 Enumerator 来创建 ID,然后基于这些 ID 创建具体的 FileIOChannel 实例(如 AbstractFileIOChannel 的子类),从而为上层的排序、聚合等需要磁盘溢出的算子提供可靠的临时存储支持。
AbstractFileIOChannel
AbstractFileIOChannel 是对我们之前讨论的 FileIOChannel 接口的一个骨架实现。在软件设计中,抽象类通常用于提供子类共享的通用功能,避免代码重复。AbstractFileIOChannel 正是扮演了这个角色,它处理了所有与文件通道(FileChannel)生命周期管理相关的通用逻辑,让具体的子类可以专注于实现特定的读或写操作。
下面我们来逐一解析这个类的各个部分。
// ... existing code ...
/** Abstract {@link FileIOChannel} to share some implementation. */
public abstract class AbstractFileIOChannel implements FileIOChannel {/** Logger object for channel and its subclasses. */protected static final Logger LOG = LoggerFactory.getLogger(FileIOChannel.class);/** The ID of the underlying channel. */protected final FileIOChannel.ID id;/** A file channel for NIO access to the file. */protected final FileChannel fileChannel;// ... existing code ...
public abstract class AbstractFileIOChannel implements FileIOChannel: 这行定义清晰地说明了它是一个抽象类,并且实现了FileIOChannel接口。这意味着它必须提供(或由其子类提供)接口中定义的所有方法的实现。id:FileIOChannel.ID类型的字段,用于存储通道的唯一标识。这个 ID 在构造时传入,并且是final的,保证了通道与其底层文件的一一对应关系在生命周期内不会改变。fileChannel: 这是该类的核心。它是一个 Java NIO 的FileChannel对象,是所有文件 I/O 操作的执行者。它也是final的,在构造时被初始化。
构造函数:文件的打开与模式设置
// ... existing code ...protected AbstractFileIOChannel(FileIOChannel.ID channelID, boolean writeEnabled)throws IOException {this.id = Preconditions.checkNotNull(channelID);try {@SuppressWarnings("resource")RandomAccessFile file = new RandomAccessFile(id.getPath(), writeEnabled ? "rw" : "r");this.fileChannel = file.getChannel();} catch (IOException e) {throw new IOException("Channel to path '" + channelID.getPath() + "' could not be opened.", e);}}
// ... existing code ...
构造函数是理解这个类行为的关键。
protected访问修饰符: 构造函数是受保护的,意味着只有AbstractFileIOChannel的子类才能调用它。外部代码不能直接创建AbstractFileIOChannel的实例。- 参数:
channelID: 用于确定要打开哪个文件。writeEnabled: 一个布尔标志,决定了文件的打开模式。
- 核心逻辑:
- 它使用
new RandomAccessFile(path, mode)来打开文件。RandomAccessFile允许在文件的任意位置进行读写。 mode参数根据writeEnabled标志来决定:- 如果
writeEnabled为true,模式为"rw"(读写)。如果文件不存在,会被创建。 - 如果
writeEnabled为false,模式为"r"(只读)。如果文件不存在,会抛出异常。
- 如果
- 通过
file.getChannel()方法获取底层的FileChannel并赋值给this.fileChannel。
- 它使用
- 异常处理: 如果文件打开失败,它会捕获原始的
IOException,并包装成一个新的IOException抛出,其中包含了更明确的错误信息(如文件路径),这有助于快速定位问题。
接口方法的通用实现
AbstractFileIOChannel 为 FileIOChannel 接口中的大部分方法提供了通用的、与具体读写逻辑无关的实现。
// ... existing code ...@Overridepublic final FileIOChannel.ID getChannelID() {return this.id;}@Overridepublic long getSize() throws IOException {FileChannel channel = fileChannel;return channel == null ? 0 : channel.size();}@Overridepublic boolean isClosed() {return !this.fileChannel.isOpen();}@Overridepublic void close() throws IOException {if (this.fileChannel.isOpen()) {this.fileChannel.close();}}@Overridepublic void deleteChannel() {if (!isClosed() || this.fileChannel.isOpen()) {throw new IllegalStateException("Cannot delete a channel that is open.");}// make a best effort to delete the file. Don't report exceptions.try {File f = new File(this.id.getPath());if (f.exists()) {f.delete();}} catch (Throwable ignored) {}}@Overridepublic void closeAndDelete() throws IOException {try {close();} finally {deleteChannel();}}@Overridepublic FileChannel getNioFileChannel() {return fileChannel;}
}
getChannelID(),getSize(),isClosed(),getNioFileChannel(): 这些都是对id或fileChannel字段的直接操作或状态查询,逻辑非常直观。close(): 实现了关闭通道的逻辑,即关闭底层的fileChannel。deleteChannel():- 前置检查: 首先检查通道是否已关闭,如果未关闭就尝试删除,会直接抛出
IllegalStateException,这是一个很好的保护机制。 - 尽力而为(Best Effort): 删除操作被包裹在
try-catch(Throwable ignored)中。这意味着即使文件删除失败(例如因为权限问题或文件被其他进程占用),它也不会抛出异常,只会默默地失败。这对于临时文件的清理场景是合理的,因为清理失败通常不应该中断整个程序的执行。
- 前置检查: 首先检查通道是否已关闭,如果未关闭就尝试删除,会直接抛出
closeAndDelete(): 这是一个非常实用的组合方法。它将close()和deleteChannel()结合起来,并使用try-finally结构确保即使close()失败,deleteChannel()仍然会被尝试执行。
总结
AbstractFileIOChannel 是 Paimon 磁盘 I/O 模块中一个优秀的抽象基类。它通过封装通用的文件操作逻辑,为具体的实现类(如 BufferFileWriterImpl 和 BufferFileReaderImpl)提供了极大的便利。
- 职责清晰: 它负责文件的打开、关闭、删除和状态查询等生命周期管理。
- 代码复用: 子类无需重复编写这些通用的文件操作代码,只需继承并实现自己特定的数据读写逻辑即可。
- 健壮性: 它包含了必要的检查(如删除前检查是否关闭)和合理的异常处理策略,提高了代码的稳定性和可靠性。
通过这个抽象类,Paimon 的开发者可以快速地创建新的文件通道类型,而无需关心底层文件管理的细节。
BufferFileChannelReader
BufferFileChannelReader 是 Paimon 磁盘 I/O 模块中的一个底层辅助类。从类名可以推断出,它的核心职责是从一个 FileChannel 中读取数据,并将其填充到 Paimon 的 Buffer 对象中。它本身不管理文件的生命周期(如打开、关闭),而是专注于“读取”这一具体操作。
这个类的设计与 BufferFileWriterImpl 紧密相关,它们共同定义了一种简单而高效的磁盘存储格式。BufferFileWriterImpl 在写入时,会对每一个 Buffer(数据块)执行以下操作:
- 写入一个4字节的整数,代表这个
Buffer中有效数据的长度。 - 写入
Buffer的实际数据。
因此,磁盘上存储的文件格式是一系列连续的 [长度][数据] 块。
BufferFileChannelReader 的任务就是反向操作:按照这个格式,从文件中顺序地读取每一个数据块。
核心字段和构造函数
// ... existing code ...
public class BufferFileChannelReader {private final ByteBuffer header = ByteBuffer.allocateDirect(4);private final FileChannel fileChannel;BufferFileChannelReader(FileChannel fileChannel) {this.fileChannel = fileChannel;}
// ... existing code ...
fileChannel: 一个标准的 Java NIOFileChannel。这是数据读取的来源。这个类假设fileChannel已经打开并定位到了正确的读取位置。header: 一个大小为4字节的ByteBuffer。它被专门用来读取每个数据块前面的那个4字节长度信息。使用allocateDirect创建直接内存,这在与 I/O 操作交互时通常能获得更好的性能,因为它避免了在 Java 堆和本地堆之间进行数据拷贝。BufferFileChannelReader(FileChannel fileChannel): 构造函数是包级私有的(default access),这意味着它只能被org.apache.paimon.disk包内的其他类创建。这是一种封装,表明它是一个内部组件,不希望被外部直接使用。它接收一个外部传入的FileChannel,体现了依赖注入的设计思想。
核心方法 readBufferFromFileChannel
这是该类唯一的方法,实现了完整的读取逻辑。
// ... existing code ...public boolean readBufferFromFileChannel(Buffer buffer) throws IOException {checkArgument(fileChannel.size() - fileChannel.position() > 0);// Read headerheader.clear();fileChannel.read(header);header.flip();int size = header.getInt();if (size > buffer.getMaxCapacity()) {throw new IllegalStateException("Buffer is too small for data: "+ buffer.getMaxCapacity()+ " bytes available, but "+ size+ " needed. This is most likely due to an serialized event, which is larger than the buffer size.");}checkArgument(buffer.getSize() == 0, "Buffer not empty");fileChannel.read(buffer.getNioBuffer(0, size));buffer.setSize(size);return fileChannel.size() - fileChannel.position() == 0;}
}
我们可以将它的执行过程分解为以下几个步骤:
-
前置检查:
checkArgument(fileChannel.size() - fileChannel.position() > 0)确保文件中还有剩余数据可读。如果已经读到文件末尾,再调用此方法会抛出异常。 -
读取长度头:
header.clear(): 重置header这个ByteBuffer,准备接收新的数据。fileChannel.read(header): 从文件通道中读取4个字节到header中。header.flip(): 将header从“写模式”切换到“读模式”,以便后续从中提取数据。int size = header.getInt(): 从header中读取一个整数,这个整数就是接下来要读取的数据块的大小。
-
缓冲区容量检查:
if (size > buffer.getMaxCapacity())这是一个重要的健壮性检查。它确保传入的buffer参数有足够的容量来容纳即将读取的数据。如果容量不足,会抛出带有详细信息的IllegalStateException,有助于快速定位问题。 -
目标缓冲区状态检查:
checkArgument(buffer.getSize() == 0, "Buffer not empty")确保用于接收数据的buffer当前是空的。这是一个约定,防止意外覆盖buffer中已有的数据。 -
读取数据体:
fileChannel.read(buffer.getNioBuffer(0, size))是实际的数据读取操作。它通过buffer.getNioBuffer(0, size)获取一个代表buffer底层内存的、配置好读写范围的ByteBuffer,然后让fileChannel将数据直接读入这块内存。 -
更新Buffer状态:
buffer.setSize(size)在数据成功读入后,更新 PaimonBuffer对象的内部状态,使其size属性正确反映当前持有的数据量。 -
返回文件末尾状态:
return fileChannel.size() - fileChannel.position() == 0。方法返回一个布尔值,告诉调用者在本次读取之后,是否已经到达了文件的末尾。这是一个非常方便的设计,调用者可以通过这个返回值来决定是否继续循环读取。
Buffer就是封装了MemorySegment,加了size。
slice() 会创建一个新的 ByteBuffer 对象,但 共享同一块底层内存数据,新的 ByteBuffer 拥有独立的 position, limit, 和 mark 属性。
public ByteBuffer getNioBuffer(int index, int length) {return segment.wrap(index, length).slice();}
BufferFileWriterImpl:将 Buffer 写入文件
这个类是一个同步的、将 Buffer 写入文件的具体实现。
/** A synchronous {@link BufferFileWriter} implementation. */
public class BufferFileWriterImpl extends AbstractFileIOChannel implements BufferFileWriter {protected BufferFileWriterImpl(ID channelID) throws IOException {super(channelID, true);}@Overridepublic void writeBlock(Buffer buffer) throws IOException {ByteBuffer nioBufferReadable = buffer.getMemorySegment().wrap(0, buffer.getSize()).slice();ByteBuffer header = ByteBuffer.allocateDirect(4);header.putInt(nioBufferReadable.remaining());header.flip();FileIOUtils.writeCompletely(fileChannel, header);FileIOUtils.writeCompletely(fileChannel, nioBufferReadable);}
}
nioBufferReadable.remaining(): 对于这个新的 slice 视图,remaining() 的计算公式依然是 limit - position。
所以,remaining() = buffer.getSize() - 0 = buffer.getSize()。
继承与构造
extends AbstractFileIOChannel implements BufferFileWriter: 它继承了AbstractFileIOChannel,因此自动获得了文件生命周期管理(打开、关闭、删除、获取大小等)的通用能力。同时,它实现了BufferFileWriter接口,承诺提供写Buffer的具体方法。super(channelID, true): 在构造函数中,它调用父类的构造方法,并将writeEnabled参数设置为true。这意味着它会以**读写模式("rw")**打开底层文件,为写入数据做好了准备。
writeBlock 是该类的核心,定义了将一个 Buffer 写入文件的具体格式和逻辑。
-
获取数据视图:
ByteBuffer nioBufferReadable = buffer.getMemorySegment().wrap(0, buffer.getSize()).slice();- 从传入的
Buffer对象中获取其底层的MemorySegment。 wrap(0, buffer.getSize())创建一个ByteBuffer,它仅仅包装了Buffer中有效数据部分(从0到size)的内存。.slice()创建一个独立的视图,拥有自己的 position 和 limit,确保后续操作的隔离性。
- 从传入的
-
准备长度头:
ByteBuffer header = ByteBuffer.allocateDirect(4);: 创建一个4字节的ByteBuffer用于存放数据块的长度。使用直接内存(Direct Buffer)可以提高I/O效率。header.putInt(nioBufferReadable.remaining());: 将nioBufferReadable中剩余的字节数(也就是buffer.getSize())作为一个整数写入header。header.flip();: 将header从写模式切换到读模式,准备将其内容写入文件通道。
-
写入文件:
FileIOUtils.writeCompletely(fileChannel, header);: 将4字节的长度头完全写入文件。FileIOUtils.writeCompletely(fileChannel, nioBufferReadable);: 接着将实际的数据块完全写入文件。
public static void writeCompletely(WritableByteChannel channel, ByteBuffer src)throws IOException {while (src.hasRemaining()) {channel.write(src);}}
经过 BufferFileWriterImpl 的处理,磁盘上存储的文件格式非常清晰,即一系列连续的 [长度][数据] 块:
[4-byte-length-1][data-1][4-byte-length-2][data-2]...
BufferFileReaderImpl:从文件读取 Buffer
这个类与 BufferFileWriterImpl 相对应,负责从文件中按照约定的格式读取数据并填充到 Buffer 对象中。
public class BufferFileReaderImpl extends AbstractFileIOChannel implements BufferFileReader {private final BufferFileChannelReader reader;private boolean hasReachedEndOfFile;public BufferFileReaderImpl(ID channelID) throws IOException {super(channelID, false);this.reader = new BufferFileChannelReader(fileChannel);}@Overridepublic void readInto(Buffer buffer) throws IOException {hasReachedEndOfFile = reader.readBufferFromFileChannel(buffer);}@Overridepublic boolean hasReachedEndOfFile() {return hasReachedEndOfFile;}
}
继承与构造
extends AbstractFileIOChannel implements BufferFileReader: 同样继承自AbstractFileIOChannel,并实现了BufferFileReader接口。super(channelID, false): 调用父类构造函数时,writeEnabled为false,因此文件以**只读模式("r")**打开。this.reader = new BufferFileChannelReader(fileChannel);: 这是设计的关键点。它没有自己实现复杂的读取逻辑,而是创建了一个BufferFileChannelReader的实例,并将自己的fileChannel传递给它。这是一种组合优于继承的设计模式,将具体的读取任务委托给了辅助类reader。
核心方法
readInto(Buffer buffer): 当需要读取数据时,它直接调用reader.readBufferFromFileChannel(buffer)。BufferFileChannelReader会负责处理[长度][数据]格式的解析,并将读取的数据填充到传入的buffer中。该方法返回一个布尔值,表示是否到达了文件末尾,BufferFileReaderImpl将这个结果保存在hasReachedEndOfFile字段中。hasReachedEndOfFile(): 这个方法返回上一次readInto操作后文件的状态。调用者通常在一个循环中调用readInto,然后通过hasReachedEndOfFile来判断是否应该终止循环。
ChannelReaderInputView
ChannelReaderInputView 是 Paimon 磁盘 I/O 模块中一个至关重要的组件。它的核心作用是提供一个从磁盘文件读取数据并进行解压的视图(View)。它专门用于读取由其配对类 ChannelWriterOutputView 写入的数据。在外部排序、数据溢出(Spilling)等场景下,当数据被压缩并分块写入临时文件后,就由 ChannelReaderInputView 负责高效地将这些数据读回内存。
ChannelReaderInputView 继承自 AbstractPagedInputView。这是一个关键的设计决策,意味着它是一个基于页(Page-Based)的输入视图。
- 视图(View): 它本身不存储所有数据,而是提供了一个访问底层数据流(这里是磁盘文件)的接口。
- 页(Paged): 它不是一次性将整个文件读入内存,而是按需、一页(在这里是一个解压后的数据块)一页地加载。当上层消费者(如序列化器)读完当前内存页的数据后,
ChannelReaderInputView会自动从磁盘加载并解压下一个数据块,对上层调用者透明。
这种设计极大地提高了内存使用效率,使得处理远大于内存的磁盘文件成为可能。
核心属性与构造函数
public class ChannelReaderInputView extends AbstractPagedInputView {private final BlockDecompressor decompressor;private final BufferFileReader reader;private final MemorySegment uncompressedBuffer;private final MemorySegment compressedBuffer;private int numBlocksRemaining;private int currentSegmentLimit;public ChannelReaderInputView(FileIOChannel.ID id,IOManager ioManager,BlockCompressionFactory compressionCodecFactory,int compressionBlockSize,int numBlocks)throws IOException {this.numBlocksRemaining = numBlocks;this.reader = ioManager.createBufferFileReader(id);uncompressedBuffer = MemorySegment.wrap(new byte[compressionBlockSize]);decompressor = compressionCodecFactory.getDecompressor();compressedBuffer =MemorySegment.wrap(new byte[compressionCodecFactory.getCompressor().getMaxCompressedSize(compressionBlockSize)]);}
//...
}
reader: 一个BufferFileReader实例,是真正执行文件读取操作的对象。decompressor: 块解压器,用于将从磁盘读出的压缩数据块解压。compressedBuffer: 一个MemorySegment,用作临时缓冲区,存放从磁盘直接读出的、未经解压的原始数据块。uncompressedBuffer: 另一个MemorySegment,用于存放解压后的数据。这个缓冲区是真正暴露给上层消费者的数据页。numBlocksRemaining: 记录文件中还剩下多少个数据块未读取,用于判断是否到达文件末尾。currentSegmentLimit: 记录当前uncompressedBuffer中有效数据的长度。因为解压后的大小不一定等于缓冲区大小。
构造函数负责初始化这些组件,包括通过 IOManager 创建文件读取器、根据压缩算法和块大小分配好压缩和解压所需的内存缓冲区。
nextSegment(MemorySegment current)
这是实现“页式读取”的核心方法,继承自 AbstractPagedInputView。当上层调用者(如 BinaryRowSerializer)消费完当前 uncompressedBuffer 里的数据后,AbstractPagedInputView 的内部逻辑会自动调用此方法来获取下一页数据。
// ... existing code ...@Overrideprotected MemorySegment nextSegment(MemorySegment current) throws IOException {// 1. 检查是否已读完所有块if (this.numBlocksRemaining <= 0) {this.reader.close();throw new EOFException();}// 2. 从文件读取一个压缩块到 compressedBufferBuffer buffer = Buffer.create(compressedBuffer);reader.readInto(buffer);// 3. 解压数据this.currentSegmentLimit =decompressor.decompress(buffer.getMemorySegment().getArray(),0,buffer.getSize(),uncompressedBuffer.getArray(),0);// 4. 更新计数并返回解压后的数据页this.numBlocksRemaining--;return uncompressedBuffer;}@Overrideprotected int getLimitForSegment(MemorySegment segment) {return currentSegmentLimit;}
// ... existing code ...
工作流程:
- 检查文件末尾: 首先检查
numBlocksRemaining,如果已为0,说明所有数据块都已读取,关闭文件并抛出EOFException(文件结束异常)。 - 读取压缩块: 调用
reader.readInto(buffer)从磁盘文件读取下一个数据块,存入compressedBuffer。 - 解压: 调用
decompressor.decompress(),将compressedBuffer中的数据解压到uncompressedBuffer中。该方法返回解压后数据的实际字节数,这个值被保存在currentSegmentLimit中。 - 返回新页: 将
uncompressedBuffer作为新的数据页返回给AbstractPagedInputView的基类逻辑,供上层继续消费。同时将剩余块数减一。
getLimitForSegment 方法则简单地返回 currentSegmentLimit,告诉消费者当前页的有效数据边界。
内部迭代器:BinaryRowChannelInputViewIterator
为了方便上层直接以对象为单位进行迭代,ChannelReaderInputView 提供了一个内部类迭代器。
// ... existing code ...private class BinaryRowChannelInputViewIterator implements MutableObjectIterator<BinaryRow> {protected final BinaryRowSerializer serializer;public BinaryRowChannelInputViewIterator(BinaryRowSerializer serializer) {this.serializer = serializer;}@Overridepublic BinaryRow next(BinaryRow reuse) throws IOException {try {// 关键调用:从页式视图中反序列化return this.serializer.deserializeFromPages(reuse, ChannelReaderInputView.this);} catch (EOFException e) {close();return null;}}
// ... existing code ...}
// ... existing code ...
这个迭代器的 next 方法是整个机制协同工作的体现:
- 它调用
serializer.deserializeFromPages(),并把ChannelReaderInputView自身(ChannelReaderInputView.this)作为数据源传入。 serializer会从这个view中读取字节来构建BinaryRow对象。- 当
serializer读取时跨越了当前数据页(uncompressedBuffer)的边界,view的底层逻辑会自动触发nextSegment()方法,无缝地从磁盘加载并解压下一个数据块。 - 这个过程对
serializer和迭代器的调用者来说是完全透明的,它们感觉就像在操作一个连续的内存流。 - 当
nextSegment()抛出EOFException时,迭代器捕获它,调用close()关闭资源,并返回null,表示迭代结束。
总结
ChannelReaderInputView 是一个设计精巧的磁盘数据读取器。它通过继承 AbstractPagedInputView 实现了页式按需加载,通过组合 BufferFileReader 和 BlockDecompressor 实现了带缓冲的块读取和解压,并通过内部的 BinaryRowChannelInputViewIterator 提供了对上层友好的对象迭代接口。它与其搭档 ChannelWriterOutputView 共同构成了 Paimon 高效、可靠的磁盘溢出(Spilling)机制的基石。
ChannelWriterOutputView
ChannelWriterOutputView 是 ChannelReaderInputView 的配对类,在 Paimon 的磁盘 I/O 体系中扮演着数据写入方的角色。它的核心职责是:接收上层传入的序列化数据,将其缓存、压缩,并以数据块(Block)的形式高效地写入磁盘文件。它是在外部排序、数据溢出(Spilling)等需要将大量数据暂存到磁盘的场景下的关键执行者。
ChannelWriterOutputView 继承自 AbstractPagedOutputView 并实现了 Closeable 接口。
AbstractPagedOutputView: 这个继承关系表明它是一个基于页(Page-Based)的输出视图。上层调用者(如BinaryRowSerializer)向它写入数据时,实际上是写入到一个内存页(MemorySegment)中。当这个内存页被写满时,AbstractPagedOutputView的内部机制会自动调用子类实现的nextSegment方法,将写满的页进行处理(在这里是压缩并写入磁盘),然后提供一个新的空页(或清空旧页)供上层继续写入。这个过程对上层是透明的。Closeable: 实现了这个接口,意味着它管理着需要被显式关闭的资源(主要是文件句柄),调用者必须在使用完毕后调用close()方法来确保数据被完全刷盘并且资源得到释放。
核心属性与构造函数
public final class ChannelWriterOutputView extends AbstractPagedOutputView implements Closeable {private final MemorySegment compressedBuffer;private final BlockCompressor compressor;private final BufferFileWriter writer;private int blockCount;// ... 其他统计属性 ...public ChannelWriterOutputView(BufferFileWriter writer,BlockCompressionFactory compressionCodecFactory,int compressionBlockSize) {// 1. 调用父类构造函数,初始化用于接收数据的内存页super(MemorySegment.wrap(new byte[compressionBlockSize]), compressionBlockSize);// 2. 初始化压缩器和压缩缓冲区compressor = compressionCodecFactory.getCompressor();compressedBuffer =MemorySegment.wrap(new byte[compressor.getMaxCompressedSize(compressionBlockSize)]);// 3. 保存文件写入器this.writer = writer;}// ...
}
writer: 一个BufferFileWriter实例,是真正执行文件块写入操作的对象。compressor: 块压缩器,用于在数据写入磁盘前进行压缩。compressedBuffer: 一个MemorySegment,用作临时缓冲区,存放压缩后的数据,然后再将这块数据写入文件。currentSegment(继承自父类): 一个MemorySegment,这是暴露给上层的数据写入缓冲区,存放未经压缩的原始序列化数据。blockCount,numBytes,numCompressedBytes: 用于统计写入的块数、原始字节数和压缩后字节数,便于监控和调试。
构造函数流程:
- 调用父类
AbstractPagedOutputView的构造函数,创建一个大小为compressionBlockSize的MemorySegment作为初始的写入缓冲区(currentSegment)。 - 根据传入的压缩工厂创建具体的
BlockCompressor。 - 创建一个
compressedBuffer,其大小要能容纳一个块在最坏情况下的压缩结果。 - 保存传入的
BufferFileWriter实例。
nextSegment
这是实现“页式写入”的核心方法,由父类 AbstractPagedOutputView 在当前页写满时自动调用。
// ... existing code ...@Overrideprotected MemorySegment nextSegment(MemorySegment current, int positionInCurrent)throws IOException {// 1. 将写满的当前页进行压缩并写入磁盘writeCompressed(current, positionInCurrent);// 2. 返回同一个页,父类逻辑会将其清空(重置position)return current;}
// ... existing code ...
工作流程:
- 当父类检测到
currentSegment已满时,会调用此方法,并传入当前页current和已写入的数据量positionInCurrent。 - 方法内部直接调用
writeCompressed方法,完成压缩和刷盘的动作。 - 它返回了同一个
MemorySegment实例。父类AbstractPagedOutputView接收到后,会重置它的写入位置指针(positionInSegment),使其可以被重新写入,从而实现了内存页的复用。
writeCompressed(MemorySegment current, int size)
这是一个私有辅助方法,封装了压缩和写入的核心逻辑。
// ... existing code ...private void writeCompressed(MemorySegment current, int size) throws IOException {// 1. 压缩数据int compressedLen =compressor.compress(current.getArray(), 0, size, compressedBuffer.getArray(), 0);// 2. 将压缩后的数据块写入文件writer.writeBlock(Buffer.create(compressedBuffer, compressedLen));// 3. 更新统计信息blockCount++;numBytes += size;numCompressedBytes += compressedLen;}
// ... existing code ...
工作流程:
- 调用
compressor.compress(),将current页中size大小的数据进行压缩,结果存入compressedBuffer。 - 调用
writer.writeBlock(),将compressedBuffer中有效长度为compressedLen的数据作为一个完整的块写入底层文件。 - 更新相关的统计计数器。
close() 方法确保所有缓冲的数据都被最终写入文件。
// ... existing code ...@Overridepublic void close() throws IOException {if (!writer.isClosed()) {// 1. 获取当前页中剩余未写满的数据量int currentPositionInSegment = getCurrentPositionInSegment();// 2. 将这最后的不完整的一页数据也压缩并写入writeCompressed(currentSegment, currentPositionInSegment);// 3. 清理状态并关闭文件写入器clear();this.writeBytes = writer.getSize();this.writer.close();}}
// ... existing code ...
工作流程:
- 检查写入器是否已关闭,防止重复关闭。
- 当
close()被调用时,当前写入页currentSegment中很可能还有一部分数据,但并未写满。 getCurrentPositionInSegment()获取这部分数据的实际大小。- 调用
writeCompressed()将这最后一个“不完整”的块进行压缩和刷盘。这是非常关键的一步,确保了数据不丢失。 - 调用父类的
clear()方法清理内部状态,并最终关闭底层的writer,释放文件句柄。
总结
ChannelWriterOutputView 通过与 AbstractPagedOutputView 的精妙配合,为上层提供了一个看似连续、简单的 DataOutputView 写入接口。其内部则高效地完成了缓冲、成块、压缩、刷盘这一系列复杂操作。它和 ChannelReaderInputView 一起,构成了 Paimon 系统中一个高性能、支持压缩、对内存友好的磁盘 I/O 子系统,是实现大规模数据处理(如外部排序)不可或缺的基础设施。
ChannelWithMeta 类:磁盘文件的元数据
当内存中的数据(通常存放在 Buffer 中)因为内存不足而被溢出(spill)到磁盘时,就形成了一个临时的物理文件。ChannelWithMeta 就是用来描述这个磁盘文件的元数据信息。
public class ChannelWithMeta {private final FileIOChannel.ID channel;private final int blockCount;private final long numBytes;public ChannelWithMeta(FileIOChannel.ID channel, int blockCount, long numEstimatedBytes) {this.channel = channel;this.blockCount = blockCount;this.numBytes = numEstimatedBytes;}public FileIOChannel.ID getChannel() {return channel;}public int getBlockCount() {return blockCount;}public long getNumBytes() {return numBytes;}
}
核心设计与属性
- 不可变性 (Immutability):
ChannelWithMeta是一个典型的不可变数据对象(DTO)。所有字段都是final的,只能在构造时赋值。这使得它在多线程环境中传递和共享是完全安全的。 private final FileIOChannel.ID channel;: 它不持有FileChannel或FileIOChannel等重量级的、包含操作系统资源的对象,而是持有一个轻量级的ID。这是一个非常重要的设计,它将元数据与实际的 I/O 资源解耦。系统可以仅凭这个ID,在需要时通过IOManager重新打开对应的文件通道。private final int blockCount;: 记录了文件中包含了多少个数据块。因为 Paimon 的溢出文件是按块(Block)写入的,这个信息对于后续的读取和归并操作很有用。private final long numBytes;: 记录了文件的总字节数。
应用场景
ChannelWithMeta 通常作为文件写入操作的返回值。例如,在外部排序中,当多个已排序的小文件被归并成一个更大的文件时,归并方法会返回一个 ChannelWithMeta 对象来描述这个新生成的大文件。
// ... existing code ...private ChannelWithMeta mergeChannels(List<ChannelWithMeta> channelIDs) throws IOException {
// ... existing code ...// ... a lot of logic to merge channels ...return new ChannelWithMeta(mergedChannelID, numBlocksWritten, output.getWriteBytes());}
// ... existing code ...
FileChannelManagerImpl
FileChannelManagerImpl 是 Paimon I/O 体系中负责管理临时文件(Spill Files)的后台服务。在数据密集型计算中,当内存不足以容纳所有待处理数据时(例如大规模排序、聚合或Join操作),系统需要将部分数据“溢出(spill)”到磁盘上的临时文件中。FileChannelManagerImpl 的核心职责就是创建、管理和清理这些临时文件所在的目录和文件句柄。
FileChannelManagerImpl 实现了 FileChannelManager 接口,其设计目标是:
- 管理临时目录: 接收一组基础临时目录路径,并在其下创建唯一的、隔离的子目录,用于存放本次任务生命周期内的所有临时文件。
- 分发文件句柄: 以循环(Round-Robin)的方式将新的临时文件请求分发到不同的临时目录中,以实现 I/O 负载均衡。
- 生成唯一ID: 为每个临时文件生成一个唯一的
FileIOChannel.ID。这个ID是一个轻量级的句柄,包含了文件的完整路径和一些元信息,而不是一个打开的文件描述符。 - 生命周期管理与清理: 在任务结束时,负责可靠地删除所有创建的临时子目录及其中的文件,回收磁盘空间。
构造与初始化
// ... existing code ...
public class FileChannelManagerImpl implements FileChannelManager {
// ... existing code .../** The temporary directories for files. */private final File[] paths;// ... existing code .../** The number of the next path to use. */private final AtomicLong nextPath = new AtomicLong(0);public FileChannelManagerImpl(String[] tempDirs, String prefix) {checkNotNull(tempDirs, "The temporary directories must not be null.");checkArgument(tempDirs.length > 0, "The temporary directories must not be empty.");this.random = new Random();// Creates directories after registering shutdown hook to ensure the directories can be// removed if required.this.paths = createFiles(tempDirs, prefix);}private static File[] createFiles(String[] tempDirs, String prefix) {List<File> filesList = new ArrayList<>();for (int i = 0; i < tempDirs.length; i++) {File baseDir = new File(tempDirs[i]);String subfolder = String.format("paimon-%s-%s", prefix, UUID.randomUUID());File storageDir = new File(baseDir, subfolder);if (!storageDir.exists() && !storageDir.mkdirs()) {LOG.warn("Failed to create directory {}, temp directory {} will not be used",storageDir.getAbsolutePath(),tempDirs[i]);continue;}filesList.add(storageDir);
// ... existing code ...}
// ... existing code ...return filesList.toArray(new File[0]);}
// ... existing code ...
- 输入: 构造函数接收两个参数:
String[] tempDirs: 一个字符串数组,包含了用户配置的一个或多个基础临时目录路径(例如"/tmp/paimon1", "/data/paimon_tmp")。String prefix: 一个前缀字符串,用于构建子目录名,通常与任务或作业相关,便于识别。
- 初始化过程 (
createFiles方法):- 遍历用户提供的每个基础临时目录。
- 在每个基础目录下,构建一个唯一的子目录名。格式为
paimon-<prefix>-<UUID>。使用UUID确保了即使在同一台机器上同时运行多个任务,它们的临时文件目录也不会冲突。 - 尝试创建这个子目录。如果创建失败(例如因为权限问题),则会打印警告并跳过该目录。
- 将所有成功创建的子目录(
File对象)存储在private final File[] paths;数组中。这个数组是后续所有操作的基础。 - 如果最终没有任何一个目录可用,会抛出
RuntimeException。
文件通道ID的创建
这是 FileChannelManagerImpl 最核心的运行时功能。当系统的某个部分(如 IOManager)需要一个新的临时文件时,它会调用 createChannel。
// ... existing code ...@Overridepublic ID createChannel() {int num = (int) (nextPath.getAndIncrement() % paths.length);return new ID(paths[num], num, random);}@Overridepublic ID createChannel(String prefix) {int num = (int) (nextPath.getAndIncrement() % paths.length);return new ID(paths[num], num, prefix, random);}
// ... existing code ...
- 负载均衡:
nextPath.getAndIncrement() % paths.length这一行代码是实现负载均衡的关键。nextPath是一个AtomicLong,保证了在多线程环境下的原子性自增。- 通过取模(
%)运算,可以确保每次调用都从paths数组中循环选择下一个目录。如果配置了多个临时目录(比如分别在不同的物理磁盘上),这种循环策略可以将 I/O 请求均匀地分散到这些磁盘上,避免单个磁盘成为瓶颈。
- 返回
FileIOChannel.ID: 它并不直接创建文件或返回一个打开的FileChannel。而是返回一个轻量级的ID对象。这个ID对象封装了文件的预期路径(paths[num])和用于生成唯一文件名的随机数生成器等信息。真正的文件创建和I/O操作会由后续的BufferFileWriter等组件在需要时执行。这种延迟创建(Lazy Creation)的设计避免了不必要的系统资源占用。
public ID(File basePath, int bucketNum, String prefix, Random random) {this.path = new File(basePath, prefix + "-" + randomString(random) + ".channel");this.bucketNum = bucketNum;}
清理与资源回收
FileChannelManagerImpl 实现了 AutoCloseable 接口,意味着它管理的资源需要在生命周期结束时被明确释放。
// ... existing code .../** Remove all the temp directories. */@Overridepublic void close() throws Exception {IOUtils.closeAll(Arrays.stream(paths).filter(File::exists).map(this::getFileCloser).collect(Collectors.toList()));}private AutoCloseable getFileCloser(File path) {return () -> {try {FileIOUtils.deleteDirectory(path);LOG.info("FileChannelManager removed spill file directory {}",path.getAbsolutePath());} catch (IOException e) {String errorMessage =String.format("FileChannelManager failed to properly clean up temp file directory: %s",path);throw new UncheckedIOException(errorMessage, e);}};}
}
close()方法: 这是资源清理的入口。- 实现方式:
- 它遍历在构造时创建的所有临时子目录(
paths数组)。 - 对于每个存在的目录,调用
getFileCloser方法创建一个AutoCloseable的 lambda 表达式。 - 这个 lambda 表达式的核心是调用
FileIOUtils.deleteDirectory(path),该方法会递归地删除整个子目录及其包含的所有临时文件。 - 最后,使用
IOUtils.closeAll来执行所有这些AutoCloseable对象,确保即使其中一个删除失败,也会尝试删除其他的。
- 它遍历在构造时创建的所有临时子目录(
这种设计确保了任务无论正常结束还是异常终止,只要 close() 方法被调用(通常在 finally 块中),所有产生的临时文件和目录都会被清理干净,防止磁盘空间泄漏。
总结
FileChannelManagerImpl 是 Paimon I/O 子系统中一个健壮、高效的后台管家。它通过管理临时目录、循环分发文件ID和可靠的生命周期清理三大核心功能,为上层的数据溢出和外部排序等操作提供了稳定可靠的磁盘存储基础。其设计体现了负载均衡、延迟创建和资源安全回收等重要的工程实践。
IOManagerImpl
IOManagerImpl 是 Paimon I/O 体系的核心门面(Facade),为上层应用提供统一的、简化的 I/O 服务接口。
IOManagerImpl 实现了 IOManager 接口,其在系统中的角色可以概括为:
- I/O 服务总入口: 它是系统中所有需要进行临时文件读写操作的组件的统一入口点。其他组件(如排序器
MergeSorter)不直接与FileChannelManager或具体的BufferFileReader/Writer实现打交道,而是只依赖IOManager接口。 - 封装与解耦: 它封装了底层
FileChannelManager的复杂性。IOManager的使用者无需关心临时目录的创建、负载均衡和清理等细节,只需调用简单的方法即可获得所需服务。这是一种典型的 门面模式(Facade Pattern) 应用,降低了系统各模块间的耦合度。 - 资源生命周期管理: 它持有
FileChannelManager的实例,并负责在自身生命周期结束时(调用close()方法)触发底层资源的清理。
构造与初始化
// ... existing code ...
public class IOManagerImpl implements IOManager {protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);private static final String DIR_NAME_PREFIX = "io";private final String[] tempDirs;private final FileChannelManager fileChannelManager;// ... existing code ...public IOManagerImpl(String... tempDirs) {this.tempDirs = tempDirs;this.fileChannelManager =new FileChannelManagerImpl(Preconditions.checkNotNull(tempDirs), DIR_NAME_PREFIX);if (LOG.isInfoEnabled()) {LOG.info("Created a new {} for spilling of task related data to disk (joins, sorting, ...). Used directories:\n\t{}",FileChannelManager.class.getSimpleName(),Arrays.stream(fileChannelManager.getPaths()).map(File::getAbsolutePath).collect(Collectors.joining("\n\t")));}}
// ... existing code ...
- 构造器: 接受一个字符串数组
tempDirs作为参数,这代表了用户配置的用于溢出(spill)数据的基础临时目录。 - 核心动作:
- 实例化
FileChannelManager: 在构造函数内部,它立即创建了一个FileChannelManagerImpl的实例。这是整个类实现功能的核心,IOManagerImpl的大部分方法实际上都是对fileChannelManager相应方法的直接委托(delegation)。 - 传递参数: 它将接收到的
tempDirs和一个内部定义的常量前缀DIR_NAME_PREFIX("io") 传递给FileChannelManagerImpl的构造函数,由后者完成实际的临时子目录创建工作。 - 日志记录: 创建成功后,它会打印一条详细的 INFO 级别日志,列出所有实际被创建并正在使用的临时目录的绝对路径。这对于调试和监控系统运行状态非常有用。
- 实例化
委托方法 (Delegation Methods)
这些方法直接将调用转发给内部的 fileChannelManager 实例,充当一个透明的代理。
// ... existing code .../** Removes all temporary files. */@Overridepublic void close() throws Exception {fileChannelManager.close();}@Overridepublic ID createChannel() {return fileChannelManager.createChannel();}@Overridepublic ID createChannel(String prefix) {return fileChannelManager.createChannel(prefix);}
// ... existing code ...
close(): 调用fileChannelManager.close()来触发临时目录的递归删除。createChannel(): 调用fileChannelManager.createChannel()来获取一个新的、唯一的、经过负载均衡的临时文件ID。
通过这种委托,IOManagerImpl 将底层实现的细节完全隐藏起来。
工厂方法 (Factory Methods)
这是 IOManagerImpl 作为 I/O 服务门面的关键体现。它提供了创建具体文件读写器的工厂方法。
// ... existing code ...@Overridepublic BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException {return new BufferFileWriterImpl(channelID);}@Overridepublic BufferFileReader createBufferFileReader(FileIOChannel.ID channelID) throws IOException {return new BufferFileReaderImpl(channelID);}
// ... existing code ...
createBufferFileWriter(ID channelID): 接收一个文件ID,然后返回一个BufferFileWriter的实例(具体为BufferFileWriterImpl)。调用者拿到这个 writer 后,就可以向这个ID对应的文件中写入数据块了。createBufferFileReader(ID channelID): 接收一个文件ID,然后返回一个BufferFileReader的实例(具体为BufferFileReaderImpl)。调用者拿到这个 reader 后,就可以从这个ID对应的文件中读取数据块。
这些工厂方法的作用是:
- 解耦: 上层应用代码只依赖
BufferFileWriter和BufferFileReader这两个接口,而不需要知道具体的实现是BufferFileWriterImpl还是BufferFileReaderImpl。 - 集中创建逻辑: 将对象的创建逻辑集中在
IOManagerImpl中,如果未来需要更换实现(比如增加一个异步的AsyncBufferFileWriterImpl),只需要修改这个工厂方法即可,对上层代码无影响。
静态工具方法
// ... existing code ...public static void deleteChannel(ID channel) {if (channel != null) {if (channel.getPathFile().exists() && !channel.getPathFile().delete()) {LOG.warn("IOManager failed to delete temporary file {}", channel.getPath());}}}
// ... existing code ...public static String[] splitPaths(@Nonnull String separatedPaths) {return separatedPaths.length() > 0? separatedPaths.split(",|" + File.pathSeparator): new String[0];}
// ... existing code ...
deleteChannel(ID channel): 提供了一个静态的辅助方法,用于立即删除一个指定的临时文件。这在某些需要提前清理单个文件的场景下很有用。splitPaths(...): 一个非常实用的工具方法,用于解析包含多个路径的配置字符串。它能同时处理逗号(,)和系统默认路径分隔符(在Windows是;,在Linux是:)作为分隔符,增强了配置的灵活性。
SpillChannelManager
SpillChannelManager(溢出通道管理器)是一个专门为数据溢出(Spilling)场景设计的资源管理工具。在 Paimon 的外部排序(External Sort)等操作中,当内存不足时,会创建大量的临时文件(Spill Files)来存放中间数据。SpillChannelManager 的核心职责就是追踪和管理这些临时文件的生命周期,确保它们在不再需要时能够被可靠地清理。
与我们之前分析的 FileChannelManager 不同,SpillChannelManager 的作用范围更小,更具针对性。
FileChannelManager: 是一个全局的、服务性质的管理器,负责创建临时文件所在的目录,并以负载均衡的方式分发文件 ID。它管理的是“地皮”。SpillChannelManager: 是一个局部的、实例级别的管理器,通常在某个具体的操作(如一个MergeSorter实例)内部创建和使用。它不创建目录,也不生成 ID,而是记录由IOManager(间接通过FileChannelManager) 创建的那些临时文件,并负责在操作结束或重置时将它们删除。它管理的是“地皮”上的“建筑”。
它的核心职责可以概括为:
- 注册: 记录所有为当前操作创建的溢出文件。
- 状态跟踪: 区分文件是处于“已创建但未打开”状态,还是“已打开”状态。
- 清理: 提供一个统一的清理方法(
reset),用于关闭所有打开的文件句柄并删除所有相关的物理文件。
核心属性与状态管理
public class SpillChannelManager {private final HashSet<FileIOChannel.ID> channels;private final HashSet<FileIOChannel> openChannels;public SpillChannelManager() {this.channels = new HashSet<>(64);this.openChannels = new HashSet<>(64);}
//...
}
SpillChannelManager 内部通过两个 HashSet 来追踪不同状态的文件:
private final HashSet<FileIOChannel.ID> channels;: 这个集合存储的是已经创建但尚未打开的溢出文件。它存放的是轻量级的ID对象。当一个溢出文件被创建时(例如,一个内存中的 sort buffer 被写到磁盘),它的ID会被添加到这个集合中。private final HashSet<FileIOChannel> openChannels;: 这个集合存储的是当前正处于打开状态的文件通道。它存放的是重量级的FileIOChannel对象,这些对象持有实际的操作系统文件句柄。当需要读取一个溢出文件进行归并时,会打开它,并将其FileIOChannel对象放入此集合。
这种区分非常重要,因为它反映了溢出文件的两种不同生命周期阶段,并且清理逻辑也不同。
关键方法分析
所有的方法都使用了 synchronized 关键字,这表明 SpillChannelManager 被设计为在多线程环境下是安全的。在一个复杂的排序操作中,可能存在一个线程负责写溢出文件,而多个线程负责读溢出文件进行归并。
-
addChannel(FileIOChannel.ID id): 当一个溢出文件被成功写入磁盘后,它的ID会被此方法注册到channels集合中。这相当于说:“我产生了一个新的临时文件,请帮我记下来,以后要清理。” -
addOpenChannels(List<FileIOChannel> toOpen): 当需要读取一批溢出文件进行归并时,这些文件会被打开。此方法会将打开的FileIOChannel对象添加到openChannels集合中,并同时从channels集合中移除对应的ID。这个状态转移清晰地表明文件已经从“待处理”状态变为了“正在处理”状态。 -
removeChannel(FileIOChannel.ID id): 提供了一个手动移除追踪的方式。这可能用于某些特殊场景,比如一个溢出文件在归并后被立即删除,不再需要管理器后续统一清理。 -
reset(): 这是最核心的清理方法,它确保了“寸草不生”。// ... existing code ... public synchronized void reset() {for (Iterator<FileIOChannel> channels = this.openChannels.iterator();channels.hasNext(); ) {final FileIOChannel channel = channels.next();channels.remove();try {channel.closeAndDelete();} catch (Throwable ignored) {}}for (Iterator<FileIOChannel.ID> channels = this.channels.iterator(); channels.hasNext(); ) {final FileIOChannel.ID channel = channels.next();channels.remove();try {final File f = new File(channel.getPath());if (f.exists()) {f.delete();}} catch (Throwable ignored) {}} }它的逻辑分为两步,非常严谨:
- 清理打开的通道: 遍历
openChannels集合。对于每一个打开的FileIOChannel,调用其closeAndDelete()方法。这个方法会先关闭文件句柄,然后删除物理文件。这是最直接和高效的清理方式。 - 清理未打开的通道: 遍历
channels集合。对于每一个ID,通过其getPath()方法获取文件路径,然后创建一个File对象并尝试删除它。 - 异常处理: 所有的清理操作都被包裹在
try-catch(Throwable ignored)中。这是一个健壮性设计,确保即使某个文件删除失败(例如因为权限问题或文件被其他进程占用),也不会中断整个清理过程,管理器会继续尝试清理其他文件。
- 清理打开的通道: 遍历
应用场景
在 MergeSorter 中,SpillChannelManager 的使用非常典型。
- 当
MergeSorter需要将内存中的数据溢出到磁盘时,它会通过ioManager.createChannel()获取一个ID,然后创建一个BufferFileWriter将数据写入文件。写入成功后,这个ID会被添加到spillManager.addChannel(channel)。 - 当需要进行多路归并时,
MergeSorter会打开一批溢出文件,并将这些打开的FileIOChannel传递给spillManager.addOpenChannels(...)。 - 在
MergeSorter的close()方法中,会调用spillManager.reset(),确保所有为本次排序操作产生的临时文件都被彻底清理。
总结
SpillChannelManager 是一个专用于管理临时溢出文件生命周期的工具类。它通过区分“已创建”和“已打开”两种状态,并提供一个原子性的、健壮的 reset 方法,极大地简化了上层复杂操作(如外部排序)中的资源管理逻辑。它与 IOManager 和 FileChannelManager 形成了良好的分层协作:FileChannelManager 负责“圈地”,IOManager 负责提供统一的“建筑服务”,而 SpillChannelManager 则像一个“现场监工”,负责记录所有“建筑”并在工程结束后执行“拆除和清场”,确保不留下任何垃圾。
