当前位置: 首页 > news >正文

Hadoop MapOutputBuffer:Map高性能核心揭秘

MapOutputBuffer

MapOutputBuffer 是 MapOutputCollector 接口的一个实现。它的核心职责是:在内存中高效地收集、分区、排序 Mapper 的输出,并在内存不足时,将数据 溢写(Spill)到磁盘。当 Map 任务结束时,它还要负责将所有溢写文件合并(Merge) 成一个最终的、对 Reduce 任务友好的输出文件。

开发者通常不会直接与 MapOutputBuffer 交互。它的使用是框架自动完成的:

  1. 在 MapTask 的 runNewMapper 或 runOldMapper 方法中,框架会检查作业的 Reduce 任务数量 (job.getNumReduceTasks())。
  2. 如果存在 Reduce 任务(即 numReduceTasks > 0),框架就会创建一个 MapOutputBuffer 实例(通过 createSortingCollector 方法)。
  3. 这个 MapOutputBuffer 实例会被包装进一个 RecordWriter(例如 NewOutputCollector)。
  4. 这个 RecordWriter 会被传递给 Mapper 的 Context 对象。
  5. 当用户在 Mapper 代码中调用 context.write(key, value) 时,这个调用链最终会触发 MapOutputBuffer 的 collect 方法,从而将数据写入环形缓冲区。
// ... existing code ...// get an output objectif (job.getNumReduceTasks() == 0) {// 如果没有 Reducer,数据直接写入 HDFS,不使用 MapOutputBufferoutput = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);} else {// 如果有 Reducer,则使用 NewOutputCollector,其内部封装了 MapOutputBufferoutput = new NewOutputCollector(taskContext, job, umbilical, reporter);}
// ... existing code ...
// 在 NewOutputCollector 的构造函数中,创建了 MapOutputBufferNewOutputCollector(/*...*/) throws IOException, ClassNotFoundException {collector = createSortingCollector(job, reporter); // collector 就是 MapOutputBuffer// ...}
// ... existing code ...
// 在 NewOutputCollector 的 write 方法中,调用了 MapOutputBuffer 的 collect@Overridepublic void write(K key, V value) throws IOException, InterruptedException {collector.collect(key, value,partitioner.getPartition(key, value, partitions));}
// ... existing code ...

核心实现机制:环形缓冲区与元数据

MapOutputBuffer 的设计精髓在于它对内存的极致利用。它不存储 Java 对象,而是将键值对序列化后存入一个巨大的字节数组 kvbuffer。这个数组被当作一个环形缓冲区使用。

为了快速排序和查找,它并没有在 kvbuffer 中移动庞大的序列化数据,而是采用了一种更聪明的方法:元数据与数据分离

  • kvbuffer (byte[]): 实际存储序列化后的 key/value 数据的字节数组。
  • kvmeta (IntBuffer): 这是一个 IntBuffer,它覆盖在 kvbuffer 的同一块内存上。它不存实际数据,而是存储每条记录的元数据

每条记录的元数据占用 NMETA (值为4) 个整数,也就是 METASIZE (值为16) 个字节。

// ... existing code ...private static final int VALSTART = 0;         // value 在 kvbuffer 中的起始位置private static final int KEYSTART = 1;         // key 在 kvbuffer 中的起始位置private static final int PARTITION = 2;        // 这条记录所属的分区号private static final int VALLEN = 3;           // value 的长度private static final int NMETA = 4;            // 元数据的整数个数private static final int METASIZE = NMETA * 4; // 元数据占用的字节数
// ... existing code ...

这个环形缓冲区由一系列指针管理,元数据区域和序列化数据区域从缓冲区的两端相向增长,最大化地利用空间。

kvmeta 和 kvbuffer 的关系:同一块内存,不同的视图

从类的成员变量定义来看,它们确实是两个独立的变量:

// ... existing code ...// k/v accountingprivate IntBuffer kvmeta; // metadata overlay on backing store// ...byte[] kvbuffer;        // main output buffer
// ... existing code ...

但是,关键在于它们的初始化过程。在 MapOutputBuffer 的 init 方法中,你会看到这样一段代码:

// ... existing code ...// buffers and accountingint maxMemUsage = sortmb << 20;maxMemUsage -= maxMemUsage % METASIZE;kvbuffer = new byte[maxMemUsage]; // 1. 首先,分配一块大的字节数组bufvoid = kvbuffer.length;kvmeta = ByteBuffer.wrap(kvbuffer) // 2. 然后,将这个字节数组包装成一个 ByteBuffer.order(ByteOrder.nativeOrder()).asIntBuffer();                 // 3. 最后,从这个 ByteBuffer 派生出一个 IntBuffer 视图setEquator(0);
// ... existing code ...

让我们一步步解析这个过程:

  1. kvbuffer = new byte[maxMemUsage];: 这里,程序向 JVM 申请了一大块连续的内存,并用一个字节数组 kvbuffer 来引用它。这块内存是所有数据存储的物理基础。

  2. ByteBuffer.wrap(kvbuffer)java.nio.ByteBuffer 是 Java NIO 库中的一个核心类,它可以将一个已有的 byte[] 数组“包装”起来,提供更丰富的读写操作。重要的是,wrap 操作不会复制数组,ByteBuffer 实例和原始的 kvbuffer 共享同一块底层内存。对 ByteBuffer 的任何修改都会直接反映在 kvbuffer 上,反之亦然。

  3. .asIntBuffer(): 这是最关键的一步。ByteBuffer 对象有一个 asIntBuffer() 方法,它可以创建一个 IntBuffer 视图(View)。这个 IntBuffer 同样不会分配新的内存,它和 ByteBuffer(也就是和 kvbuffer)共享同一块底层内存。

这种设计的优点是:

  • 节省内存:不需要为元数据额外分配内存空间。
  • 缓存友好:数据和它的元数据在物理上是相邻的,这有利于 CPU 缓存的命中率,提高访问速度。
  • 高效:通过 IntBuffer 直接以整数为单位读写元数据,比从字节数组中手动转换要快得多。

byte[] b0 = new byte[0]; 的作用

这个定义看起来非常奇怪:一个长度为0的、不可变的空字节数组。它的作用体现在 collect 方法的这段代码中:

// ... existing code ...// It's possible for records to have zero length, i.e. the serializer// will perform no writes. To ensure that the boundary conditions are// checked and that the kvindex invariant is maintained, perform a// zero-length write into the buffer. The logic monitoring this could be// moved into collect, but this is cleaner and inexpensive. For now, it// is acceptable.bb.write(b0, 0, 0);// the record must be marked after the preceding write, as the metadata// for this record are not yet writtenint valend = bb.markRecord();
// ... existing code ...

这里的 bb 是 BlockingBuffer 的实例,它是一个内部类,充当了向 kvbuffer 写入数据的代理。

核心目的处理值(value)为空或长度为0的特殊情况,并统一处理逻辑。

在 MapReduce 中,一个 <key, value> 对的 value 完全有可能是空的(例如,NullWritable)或者是一个空的 BytesWritable。当 valSerializer.serialize(value) 执行时,如果 value 是空的,它可能不会向底层的 kvbuffer 写入任何字节。

这会带来一个问题:MapOutputBuffer 的很多内部逻辑,特别是环形缓冲区的指针管理,都依赖于每次 write 操作来更新状态。如果一次 serialize 操作没有发生任何实际的写入,那么一些边界检查和指针更新的逻辑可能就不会被触发,从而导致状态不一致。

bb.write(b0, 0, 0); 的作用就是:

无论 valSerializer 是否真的写入了数据,代码都强制执行一次“写入长度为0”的操作。这个操作本身不会向 kvbuffer 添加任何字节,但它会触发 BlockingBuffer 内部的 write 方法。在这个方法里,可以统一处理环形缓冲区的边界检查、指针回绕(wrap-around)等逻辑。

通过这种方式,代码避免了为“零长度写入”编写一套单独的、复杂的 if-else 判断逻辑,使得 collect 方法的主流程更加清晰和健壮。定义一个 static final 的 b0 数组可以避免在每次调用时都创建一个新的空数组对象,是一种微小的性能优化。

简单来说,b0 是一个“占位符”,用于确保即使在写入空值的情况下,缓冲区的管理逻辑也能被正确地执行一次。


collect 方法:数据收集与溢写触发

collect 方法是数据进入缓冲区的入口。这是一个 synchronized 方法,保证了线程安全。

// ... existing code ...public synchronized void collect(K key, V value, final int partition) throws IOException {reporter.progress();// ... 类型和分区检查 ...checkSpillException(); // 检查后台溢写线程是否有异常bufferRemaining -= METASIZE; // 预留元数据空间if (bufferRemaining <= 0) {// 缓冲区空间不足,触发溢写判断逻辑spillLock.lock();try {do {if (!spillInProgress) {// ... 计算缓冲区使用情况 ...final boolean bufsoftlimit = bUsed >= softLimit;// ...if (bufsoftlimit && kvindex != kvend) {// 如果已用空间超过软限制 (io.sort.spill.percent, 默认80%)// 并且有数据可写,则启动溢写startSpill();// ... 重新计算剩余空间 ...}}} while (false);} finally {spillLock.unlock();}}// ... 序列化 key 和 value 到 kvbuffer ...// ... 将元数据 (partition, key_start, val_start, val_len) 写入 kvmeta ...// ... 更新 kvindex 和 bufindex 指针 ...}
// ... existing code ...

核心流程

  1. 检查空间:每次收集数据前,先检查剩余空间 bufferRemaining
  2. 触发溢写:当 bufferRemaining 小于等于0时,进入一个复杂的判断逻辑。如果当前没有正在进行的溢写,并且已用空间超过了 softLimit(由 mapreduce.map.sort.spill.percent 配置,默认 80%),就会调用 startSpill() 启动后台溢写线程。
  3. 序列化:将 key 和 value 对象序列化成字节,写入 kvbuffer
  4. 记录元数据:将这条记录的分区号、key 的起始位置、value 的起始位置和 value 的长度这四个整数写入 kvmeta
  5. 更新指针:移动 kvindex 和 bufindex 指针,标记新的数据边界。

SpillThread:后台排序与溢写

MapOutputBuffer 最重要的优化之一就是将排序和磁盘I/O操作放在一个独立的后台线程 SpillThread 中执行,这样 Mapper 的 collect 方法就不会因为磁盘操作而长时间阻塞,可以继续接收新的数据。

  1. 启动collect 方法调用 startSpill(),它会获取 spillLock 锁,然后唤醒正在等待 spillReady 条件的 SpillThread

  2. 排序SpillThread 醒来后,执行 sortAndSpill() 方法。该方法的第一步就是对内存中的数据进行排序。

    • MapOutputBuffer 实现了 IndexedSortable 接口,提供了 compare 和 swap 方法。
    • sorter.sort(...) 方法(默认是 QuickSort)只对 kvmeta 中的元数据进行排序,而不是移动 kvbuffer 中庞大的实际数据。
    • compare 方法的逻辑是排序的核心:首先按分区号(PARTITION)排序,如果分区号相同,再按 key 排序。key的比较是直接在 kvbuffer 字节数组上进行的,效率很高。
    // ... existing code ...@Overridepublic int compare(final int mi, final int mj) {// ... 获取两条记录的元数据 ...final int kvip = kvmeta.get(kvi + PARTITION);final int kvjp = kvmeta.get(kvj + PARTITION);// sort by partitionif (kvip != kvjp) {return kvip - kvjp;}// sort by keyreturn comparator.compare(kvbuffer,kvmeta.get(kvi + KEYSTART),kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),kvbuffer,kvmeta.get(kvj + KEYSTART),kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));}
    // ... existing code ...
    
  3. Combiner (可选):如果用户配置了 Combiner,在排序之后、写入磁盘之前,sortAndSpill 会对每个分区内排序好的数据运行 Combiner。这可以看作是一次 "mini-reduce",能有效减少写入磁盘和后续网络传输的数据量。

  4. 溢写到文件sortAndSpill 遍历排序后的元数据,将每个分区的数据顺序写入一个临时的溢写文件(spill file)。同时,它会为这个溢写文件生成一个 SpillRecord,记录下每个分区数据在这个文件中的偏移量、原始长度和压缩后长度等索引信息。


flush 和 close:最终合并

当 MapTask 处理完所有输入数据后,会调用 collector.close(),这会触发 MapOutputBuffer 的 flush() 和 close() 方法。

  • flush():
    1. 强制执行最后一次溢写,将缓冲区中剩余的所有数据都写入一个新的溢写文件。
    2. 等待 SpillThread 完全结束。
    3. 调用 mergeParts() (该方法在 MapTask 中,但由 MapOutputBuffer 的生命周期触发)。mergeParts 会将磁盘上所有的溢写文件进行多路归并排序,生成一个最终的、全局有序(分区内key有序)的输出文件 file.out 和一个总索引文件 file.out.index。这个索引文件将被 Reduce 任务用来确定需要拉取的数据范围。

总结

MapOutputBuffer 是 MapReduce 框架的性能心脏。它通过以下一系列精巧的设计,实现了在有限内存下对海量中间数据的高效处理:

  • 环形内存缓冲区:最大化利用配置的内存(io.sort.mb)。
  • 元数据与数据分离:通过只排序轻量级的元数据指针,极大地提高了内存排序的速度。
  • 后台溢写线程:将耗时的排序和I/O操作与 Mapper 的数据收集过程解耦,实现了并发执行。
  • 分区内排序:在 Map 端就完成了大部分排序工作,为 Reduce 端的归并排序奠定了基础。
  • 可选的 Combiner:在数据落盘前进行聚合,显著减少了 I/O 负载。

文章转载自:

http://DP1gkB7g.qwqzk.cn
http://TBK7nwEA.qwqzk.cn
http://7tgqUFkB.qwqzk.cn
http://LpOJugbQ.qwqzk.cn
http://6pUwC9qN.qwqzk.cn
http://nG8IQLWa.qwqzk.cn
http://CzSslswM.qwqzk.cn
http://NYy8ykrv.qwqzk.cn
http://NK0Kg3e5.qwqzk.cn
http://AKQMT6Lb.qwqzk.cn
http://aZXLsFyU.qwqzk.cn
http://zxvS5Z9Q.qwqzk.cn
http://DXyCY394.qwqzk.cn
http://P32ZPa0B.qwqzk.cn
http://KW8DpPI5.qwqzk.cn
http://YYuk4TCn.qwqzk.cn
http://FJP5K799.qwqzk.cn
http://HW4nERN0.qwqzk.cn
http://9X47UDpb.qwqzk.cn
http://3vJVwOdx.qwqzk.cn
http://reaG75gE.qwqzk.cn
http://M181s3Vu.qwqzk.cn
http://Y7QONBr7.qwqzk.cn
http://P5nkIeKy.qwqzk.cn
http://d54e84cK.qwqzk.cn
http://tUqgMZJ3.qwqzk.cn
http://dwyEP2sV.qwqzk.cn
http://4cIj3FoE.qwqzk.cn
http://77JcNqf0.qwqzk.cn
http://28nWrdvX.qwqzk.cn
http://www.dtcms.com/a/379662.html

相关文章:

  • Kubernetes 弹性伸缩:深入讲解 HPA 和 VPA
  • 代理服务器是什么?怎么选择?
  • java Redisson 实现限流每秒/分钟/小时限制N个请求 -V2.0
  • 高并发、低延迟全球直播系统架构
  • zookeeper是啥
  • 短波红外相机在机器视觉检测方向的应用
  • 阿里云国际代理:如何利用RDS构建高可用、可扩展的数据库架构
  • 【Python】通俗理解反向传播
  • RFID技术在半导体电子货架上的应用方案
  • Windows 安装 Redis 教程
  • CMake 全流程开发实战:从零开始掌握C++项目构建、测试到一键分发的完整解决方案​
  • 如果数据量小但是点击后需要获取的是最新的定位信息,这种时候采取什么策略最优?
  • 使用 Pyinstaller 打包 PPOCRLabel
  • 科技信息差(9.12)
  • 是德科技 | 关于AI 数据中心时代的光通信的精选问答
  • 深入剖析 Elasticsearch (ES) 的近实时搜索原理
  • Qt5 | TCP服务器开源模板工程实战
  • 飞鹤财报“新解”:科技筑牢护城河,寒冬凸显龙头“硬核力”
  • 第6.2节 Android Agent开发<一>
  • 【 C/C++ 算法】入门动态规划-----一维动态规划基础(以练代学式)
  • YOLOv8 从yaml配置文件生成PyTorch模型
  • 重复文件清理的标准化操作流程
  • Amazon DocumentDB Serverless 技术深度解析:架构特性、弹性扩缩容机制与实操指南
  • 项目管理方法适合什么类型的企业
  • HTTPS(Hypertext Transfer Protocol Secure,超文本传输安全协议)
  • 【LLM越狱】AI大模型DRA攻击解读与复现
  • k8s下的发布策略详解
  • 第 9 篇:深入浅出学 Java 语言(JDK8 版)—— 吃透泛型机制,筑牢 Java 类型安全防线
  • 机器人防爆与隔爆的本质,两者的区别对比
  • 从蛮力清扫到 “会看路”:室外清洁机器人的文明进阶