mapreduce源码解读
标题
- Hadoop MapReduce 从任务提交到运行的完整流程详解
- 1. 初始化与配置阶段
- 2. 输入分析与切分阶段
- 3. 作业提交与调度阶段
- 4. Map阶段详细执行
- 5. Shuffle阶段核心机制
- 6. Reduce阶段处理
- 实际案例:SQL查询在MapReduce中的数据流转
- 原始数据假设
- 阶段1: 初始化与配置阶段
- 阶段2: 输入分析与切分阶段
- 阶段3: 作业提交与调度阶段
- 阶段4: Map阶段详细执行
- 阶段5: Shuffle阶段核心机制
- 阶段6: Reduce阶段处理
- 最终结果输出
- 不同SQL函数在MapReduce中的实现策略
- 窗口函数类
- 1. ROW_NUMBER() 行号函数
- 2. RANK() 和 DENSE_RANK() 排名函数
- 3. LAG() 和 LEAD() 偏移函数
- 聚合函数类
- 4. SUM, AVG, MAX, MIN 基础聚合
- 过程记录
- deprecationContext
- writeSplits
- blocklocation
- splits的详细切块
- split的排序sort
- runNewMapper(一个split对应一个runnewmapper)
- 文件压缩reader
- linerecordreader读nextkeyvalue
- maptask.init
- collect方法
- 环形缓冲区
- 分配reduce
- shuffle过程-依赖reduce(一个reduce对应一个shuffle)
- runNewReducer
Hadoop MapReduce 从任务提交到运行的完整流程详解
1. 初始化与配置阶段
配置兼容性处理
- 系统启动时,DeprecationContext维护新老配置的映射关系,确保向后兼容
- 正向映射(老配置→废弃信息)和反向映射(新配置→老配置)双重保障
- 当用户使用老配置时自动映射到新配置,并给出警告提示
作业配置初始化
- 加载job.xml配置文件,设置输入输出路径、Mapper/Reducer类等
- 初始化各种计数器和状态监控器
- 配置压缩、序列化等相关参数
2. 输入分析与切分阶段
文件扫描与元数据收集
- 使用FileInputFormat扫描输入目录,获取所有输入文件
- 为每个文件创建DeprecatedRawLocalFileStatus,包含:
- 文件路径、大小(字节数)、修改时间
- 副本数、默认块大小(本地32MB)
- 权限信息、是否为目录等元数据
BlockLocation信息生成
- 本地文件系统:返回localhost信息,整个文件在本地
- HDFS分布式存储:记录每个块的副本位置(如host1:9866, host2:9866等)
- 纠删码文件:根据RS策略(如RS_3_2)分布到多个主机
Split切分策略
- 每个文件独立进行切分,本地默认按blockSize(32MB)切分
- 关键算法:
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) - SPLIT_SLOP=1.1,最后一个split范围在[blockSize, blockSize*1.1]
- 避免产生过小的split,提高处理效率
Split排序优化
- 使用TimSort算法对split按大小降序排列
- TimSort结合归并排序和插入排序,适应性强
- 大split优先处理,充分利用计算资源
3. 作业提交与调度阶段
调度框架选择
- 根据
mapreduce.framework.name配置选择执行模式 - Local模式:使用LocalJobRunner,单机执行,适合开发测试
- YARN模式:使用YARNRunner,分布式执行,适合生产环境
YARN模式提交流程
- 创建ApplicationSubmissionContext,包含:
- LocalResource:job.xml、job.jar、job.split等文件
- ApplicationMaster启动命令和环境变量
- 容器资源要求和安全令牌
- ResourceManager接收请求并分配AM容器
- 启动MRAppMaster协调整个作业执行
LocalJobRunner模式处理
- 读取SplitMetaInfo,确定Map任务数量
- 为每个Reduce任务创建线程(ReduceTaskRunnable)
- 创建mapOutputFiles映射表管理Map输出
4. Map阶段详细执行
任务初始化
- 每个Split对应一个Map任务(runNewMapper)
- 获取分区数量(等于Reduce数量),默认为1
- 初始化Partitioner(默认HashPartitioner):
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
LineRecordReader数据读取
- 初始化Split的起始和结束位置:start、end
- 压缩文件检测和处理:
- 可分割压缩(如bzip2):支持从中间位置读取
- 不可分割压缩(如gzip):必须从文件头开始读取
- Split边界处理:除第一个split外,其他split跳过第一行,避免重复读取
- 智能行读取:即使行结尾超出split边界,也会读取完整行
环形缓冲区机制
- 核心创新:kvbuffer字节数组同时存储数据和元数据
- 数据从equator向右增长,元数据从equator向左增长(反向)
- 默认缓冲区大小:100MB,软限制80%(80MB)
- 每条记录元数据16字节:分区号、key起始位置、value起始位置、value长度
数据序列化与存储
- Text序列化:长度(VInt编码)+ 内容字节
- IntWritable序列化:4字节大端序表示
- 序列化数据直接写入kvbuffer,元数据通过kvmeta(IntBuffer视图)管理
- 处理key跨边界情况:shiftBufferedKey重新排列或写出到磁盘
溢写触发与处理
- 触发条件:bufferRemaining <= 0 且 kvindex != kvend(有未溢写数据)
- 溢写过程:
- 使用QuickSort对元数据排序(先按分区号,再按key值)
- 为每个分区创建Writer,按顺序写入spill文件
- 如配置Combiner,先进行预聚合处理
- 记录每个分区在文件中的位置和大小(IndexRecord)
- 支持并发:主线程继续接收数据,后台SpillThread处理溢写
Map输出合并
- flush()处理剩余缓冲区数据
- mergeParts()合并所有spill文件详细流程:
1. 索引文件加载:indexCacheList.add(new SpillRecord(indexFileName, job))将所有spill文件的索引信息读入内存2. 按分区逐个合并:for (int parts = 0; parts < partitions; parts++) {// 为当前分区创建Segment列表List<Segment<K,V>> segmentList = new ArrayList<>();// 从每个spill文件提取当前分区的数据段for(int i = 0; i < numSpills; i++) {IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);Segment<K,V> s = new Segment<K,V>(job, rfs, filename[i],indexRecord.startOffset, // 分区在文件中的起始位置indexRecord.partLength, // 分区数据长度codec, true);segmentList.add(i, s);}3. k路归并排序算法:RawKeyValueIterator kvIter = Merger.merge(segmentList, comparator)使用优先队列(PriorityQueue):- 初始化:每个Segment的第一个元素入队- 循环:取出最小元素,从对应Segment读取下一个元素入队- 直到所有Segment数据处理完毕时间复杂度:O(N log K),N是总记录数,K是spill文件数4. 写入最终输出文件:Writer<K, V> writer = new Writer<>(job, finalOutputFile, keyClass, valClass, codec);while (kvIter.next()) {writer.append(kvIter.getKey(), kvIter.getValue());}最终生成:part-m-00000(Map任务最终输出文件)
5. Shuffle阶段核心机制
Reduce任务初始化
- 创建ReduceTask,指定partition和Map任务数量
- 启动ShuffleConsumerPlugin开始数据拉取
并行数据拉取
- 启动多个Fetcher线程,并行从Map节点拉取对应分区数据
- Local模式:直接访问本地Map输出文件
- 分布式模式:通过HTTP协议从远程节点拉取
内存管理策略
- merger.reserve()决定使用内存还是磁盘存储:
- requestedSize > maxSingleShuffleLimit:使用磁盘
- usedMemory > memoryLimit:等待内存释放
- 默认优先使用内存存储
容错与重试机制
- 失败处理流程:
- hostFailed()标记主机失败
- copyFailed()增加失败计数
- putBackKnownMapOutput()任务重新排队
- penalize()主机进入惩罚期(指数退避:1.3^failures)
- Referee线程等待后解除惩罚
- 主机重新进入调度池
最终合并排序
- merger.close()触发finalMerge
- 首先合并内存中的output文件,写入磁盘
- 按文件大小对磁盘segments排序
- 使用k路归并算法生成最终有序数据流
6. Reduce阶段处理
阶段转换
- sortPhase.complete():排序阶段结束
- setPhase(TaskStatus.Phase.REDUCE):进入Reduce阶段
数据分组与处理
- RawKeyValueIterator提供按key分组的数据
- context.nextKey()逐个获取key组
- 调用用户自定义Reducer的reduce方法
- context.getCurrentKey()和context.getValues()提供当前key及其所有value
结果输出
- 通过OutputFormat写入最终结果
- 默认TextOutputFormat以文本格式输出
- 支持多种输出格式和压缩方式
实际案例:SQL查询在MapReduce中的数据流转
以SQL查询 select area, count(distinct mid) from tb group by area 为例,详细说明数据在6个阶段的流转过程:
原始数据假设
输入数据格式(tb表):
area1,mid001
area1,mid002
area1,mid001 # 重复
area2,mid003
area2,mid004
area1,mid005
area2,mid003 # 重复
area3,mid006
area3,mid007
area3,mid006 # 重复
阶段1: 初始化与配置阶段
SQL转换为MapReduce作业
- 查询解析器将SQL转换为MapReduce作业配置
- 设置Mapper输出:Key=area, Value=mid
- 设置Reducer逻辑:对每个area的mid去重并计数
- 配置分区策略:按area进行Hash分区
作业参数设置
- Input: /data/tb/
- Output: /result/area_distinct_count/
- Mapper Class: DistinctCountMapper
- Reducer Class: DistinctCountReducer
阶段2: 输入分析与切分阶段
文件扫描与Split切分
假设输入文件(YARN模式,HDFS默认块大小128MB):
/data/tb/part-00000 (200MB)
/data/tb/part-00001 (150MB)
/data/tb/part-00002 (135MB) # 在128MB*1.1=140.8MB范围内
/data/tb/part-00003 (100MB)Split切分结果(splitSize=128MB,SPLIT_SLOP=1.1):
Split-0: /data/tb/part-00000 [0, 134217728] # 128MB
Split-1: /data/tb/part-00000 [134217728, 209715200] # 72MB(剩余72MB < 128MB*1.1,合并到一个split)
Split-2: /data/tb/part-00001 [0, 134217728] # 128MB
Split-3: /data/tb/part-00001 [134217728, 157286400] # 22MB(剩余22MB < 128MB*1.1,合并到一个split)
Split-4: /data/tb/part-00002 [0, 141557760] # 135MB(整个文件在[128MB, 140.8MB]范围内,不切分)
Split-5: /data/tb/part-00003 [0, 104857600] # 100MB(整个文件小于128MB,作为一个split)
阶段3: 作业提交与调度阶段
YARN资源分配
- ApplicationMaster协调6个Map任务(对应6个Split)
- 设置Reduce任务数:2个Reducer(处理3个area)
- 分区策略:Hash(area) % 2
- Hash(“area1”) % 2 = 1 → Reducer-1
- Hash(“area2”) % 2 = 0 → Reducer-0
- Hash(“area3”) % 2 = 1 → Reducer-1
阶段4: Map阶段详细执行
Map任务1处理Split-0数据
输入数据:
area1,mid001
area1,mid002
area2,mid003
area3,mid006Mapper处理逻辑:
map("area1,mid001") -> emit(area1, mid001)
map("area1,mid002") -> emit(area1, mid002)
map("area2,mid003") -> emit(area2, mid003)
map("area3,mid006") -> emit(area3, mid006)环形缓冲区存储:
kvbuffer实际字节布局:
位置0-4: [5][a][r][e][a][1] // Text("area1")序列化
位置5-9: [7][m][i][d][0][0][1] // Text("mid001")序列化
位置10-14: [5][a][r][e][a][1] // Text("area1")序列化
位置15-19: [7][m][i][d][0][0][2] // Text("mid002")序列化
位置20-24: [5][a][r][e][a][2] // Text("area2")序列化
位置25-29: [7][m][i][d][0][0][3] // Text("mid003")序列化
位置30-34: [5][a][r][e][a][3] // Text("area3")序列化
位置35-39: [7][m][i][d][0][0][6] // Text("mid006")序列化kvmeta元数据(从高位向低位增长):
记录1: partition=1, keyStart=0, valStart=5, valLen=7 // area1,mid001
记录2: partition=1, keyStart=10, valStart=15, valLen=7 // area1,mid002
记录3: partition=0, keyStart=20, valStart=25, valLen=7 // area2,mid003
记录4: partition=1, keyStart=30, valStart=35, valLen=7 // area3,mid006
分区与排序
1. 溢写触发(bufferRemaining <= 0):spillLock.lock() - 上锁进入溢写流程2. QuickSort排序元数据(sorter.sort()):对kvmeta进行排序,排序规则:- 第一关键字:分区号(Partition)- 第二关键字:key值(按字典序)排序前kvmeta索引:[area2,mid003,partition:0] [area1,mid001,partition:1] [area1,mid002,partition:1] [area3,mid006,partition:1]排序后kvmeta索引:[area2,mid003,partition:0] [area1,mid001,partition:1] [area1,mid002,partition:1] [area3,mid006,partition:1]↑ area1 < area3 按字典序排序3. 按分区顺序写入spill文件:for (int i = 0; i < partitions; ++i) {// 为每个分区创建Writerwriter = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec);// 根据排序后的kvmeta索引,从kvbuffer读取实际kv数据Partition 0实际写入过程 (area2记录):- keyStart=20, valStart=25, valLen=7- 读取kvbuffer[20:25] = [5][a][r][e][a][2] → 反序列化 → "area2"- 读取kvbuffer[25:32] = [7][m][i][d][0][0][3] → 反序列化 → "mid003"- writer.append("area2", "mid003")Partition 1实际写入过程 (按排序后索引顺序):第1条: area1,mid001- keyStart=0, valStart=5, valLen=7- kvbuffer[0:5] → "area1", kvbuffer[5:12] → "mid001"第2条: area1,mid002- keyStart=10, valStart=15, valLen=7- kvbuffer[10:15] → "area1", kvbuffer[15:22] → "mid002"第3条: area3,mid006- keyStart=30, valStart=35, valLen=7- kvbuffer[30:35] → "area3", kvbuffer[35:42] → "mid006"4. 记录索引信息:spillRec.putIndex(rec, i) - 记录每个分区在文件中的位置和大小IndexRecord{partition:0, startOffset:0, length:20}IndexRecord{partition:1, startOffset:20, length:60}溢写文件最终布局:
spill-0.out: [area2数据][area1数据][area3数据]
spill-0.out.index: [分区0索引][分区1索引]
阶段5: Shuffle阶段核心机制
Reduce任务拉取数据
按照partition数量/reduce数量生成对应reducer,在shuffle阶段已经生成了对应的RawKeyValueIterator(k+list[v]),在reduce上只要nextkey的方式去读即可
Reducer-0拉取所有area2相关数据:
从各Map任务拉取area2数据:
area2 -> [mid003, mid003, mid004, mid007] # mid003重复
Reducer-1拉取area1和area3相关数据(多key处理):
从Map-1拉取: (area1,mid001), (area1,mid002), (area3,mid006)
从Map-2拉取: (area1,mid001), (area1,mid005), (area3,mid007)
从Map-3拉取: (area1,mid008), (area3,mid006) # area3 mid006重复
从Map-4拉取: (area1,mid002), (area3,mid009)
从Map-5拉取: (area1,mid010)
从Map-6拉取: (area1,mid011), (area3,mid012)k路归并排序详细流程(Merger.merge(),只对k进行排序):
1. 创建Segment列表:从6个Map任务的spill文件中提取Partition 1的数据段Segment1: area1,mid001 | area1,mid002 | area3,mid006Segment2: area1,mid001 | area1,mid005 | area3,mid007Segment3: area1,mid008 | area3,mid006...2. 优先队列(PriorityQueue)k路归并:初始状态:队列头部放入每个Segment的第一个元素[area1,mid001] [area1,mid001] [area1,mid008] ...归并过程(按key字典序):Step1: 取出area1,mid001 → 从对应Segment取下一个元素area1,mid002Step2: 取出area1,mid001 → 从对应Segment取下一个元素area1,mid005Step3: 取出area1,mid002 → 继续......3. 有序输出流:area1,mid001 | area1,mid001 | area1,mid002 | area1,mid002 | area1,mid005 |area1,mid008 | area1,mid010 | area1,mid011 | area3,mid006 | area3,mid006 |area3,mid007 | area3,mid009 | area3,mid0124. ReduceContext自动按key分组(comparator.compare()):检测key变化点,自动将相同key的value聚合:area1 -> [mid001, mid001, mid002, mid002, mid005, mid008, mid010, mid011]area3 -> [mid006, mid006, mid007, mid009, mid012]
内存管理
- 优先使用内存存储拉取的数据
- 数据量大时溢写到磁盘
- 最终k路归并生成有序数据流
阶段6: Reduce阶段处理
Reducer-0处理area2数据
输入数据流:
key: area2
values: [mid003, mid003, mid004, mid007]Reducer处理逻辑:
reduce(area2, [mid003, mid003, mid004, mid007]) {Set<String> distinctMids = new HashSet<>();for (String mid : values) {distinctMids.add(mid); // 自动去重}int count = distinctMids.size(); // count = 3emit(area2, count);
}输出结果:area2, 3
Reducer-1处理多个key(area1和area3)
关键:框架会按key顺序依次调用reduce方法第1次调用 - 处理area1:
context.nextKey() = true // 返回area1
key: area1
values: [mid001, mid001, mid002, mid002, mid005, mid008, mid010, mid011]reduce(area1, [mid001, mid001, mid002, mid002, mid005, mid008, mid010, mid011]) {Set<String> distinctMids = new HashSet<>();for (String mid : values) {distinctMids.add(mid); // 自动去重}int count = distinctMids.size(); // count = 6emit(area1, count);
}第2次调用 - 处理area3:
context.nextKey() = true // 返回area3
key: area3
values: [mid006, mid006, mid007, mid009, mid012]reduce(area3, [mid006, mid006, mid007, mid009, mid012]) {Set<String> distinctMids = new HashSet<>();for (String mid : values) {distinctMids.add(mid); // 自动去重}int count = distinctMids.size(); // count = 4emit(area3, count);
}第3次调用:
context.nextKey() = false // 没有更多key,结束输出结果:
area1, 6
area3, 4
最终结果输出
TextOutputFormat写入HDFS
输出文件:/result/area_distinct_count/part-r-00000
area2 3输出文件:/result/area_distinct_count/part-r-00001
area1 6
area3 4
这个流程完美展现了MapReduce如何处理GROUP BY和COUNT(DISTINCT)这类聚合查询,通过分区、排序、去重的组合实现了分布式的精确去重计数。
不同SQL函数在MapReduce中的实现策略
窗口函数类
核心思想:框架只对key排序,窗口函数需要按特定字段排序,必须使用二次排序技术
// 1. 复合Key类
public class CompositeKey implements WritableComparable<CompositeKey> {private Text partition; // 分区字段 (如area)private Text sortField; // 排序字段 (如mid, score, timestamp)// compareTo: 先按partition,再按sortField排序public int compareTo(CompositeKey other) {int result = this.partition.compareTo(other.partition);if (result == 0) {return this.sortField.compareTo(other.sortField);}return result;}
}// 2. 自定义Partitioner:只按partition分区
public class CustomPartitioner extends Partitioner<CompositeKey, Text> {public int getPartition(CompositeKey key, Text value, int numPartitions) {return (key.getPartition().hashCode() & Integer.MAX_VALUE) % numPartitions;}
}// 3. 分组Comparator:只按partition分组
public class GroupingComparator extends WritableComparator {public int compare(WritableComparable a, WritableComparable b) {CompositeKey k1 = (CompositeKey) a;CompositeKey k2 = (CompositeKey) b;return k1.getPartition().compareTo(k2.getPartition());}
}
1. ROW_NUMBER() 行号函数
SELECT area, mid, ROW_NUMBER() OVER(PARTITION BY area ORDER BY mid) as rn
FROM tb
实现:
// Map阶段
map(area1, mid001) -> emit(CompositeKey("area1", "mid001"), Text("mid001"))// Reduce阶段:框架保证同一area内按mid有序
public void reduce(CompositeKey key, Iterable<Text> values, Context context) {int rowNumber = 1;for (Text value : values) {String area = key.getPartition().toString();String mid = value.toString();context.write(new Text(area + "," + mid), new IntWritable(rowNumber++));}
}
2. RANK() 和 DENSE_RANK() 排名函数
SELECT area, mid, RANK() OVER(PARTITION BY area ORDER BY score DESC) as rank
FROM tb
实现:
// Map阶段
map(area1, mid001, score=95) -> emit(CompositeKey("area1", "95"), Text("mid001"))// Reduce阶段:框架保证同一area内按score有序
public void reduce(CompositeKey key, Iterable<Text> values, Context context) {int rank = 1;int denseRank = 1;String lastScore = null;int sameScoreCount = 0;for (Text value : values) {String score = key.getSortField().toString();String mid = value.toString();if (lastScore != null && !score.equals(lastScore)) {rank += sameScoreCount; // RANK()跳跃denseRank++; // DENSE_RANK()连续sameScoreCount = 1;} else {sameScoreCount++;}context.write(new Text(key.getPartition() + "," + mid), new IntWritable(rank));lastScore = score;}
}
3. LAG() 和 LEAD() 偏移函数
SELECT area, mid, LAG(mid, 1) OVER(PARTITION BY area ORDER BY timestamp) as prev_mid
FROM tb
实现:
// Map阶段
map(area1, mid001, timestamp) -> emit(CompositeKey("area1", timestamp), Text("mid001"))// Reduce阶段:框架保证同一area内按timestamp有序
public void reduce(CompositeKey key, Iterable<Text> values, Context context) {List<String> midList = new ArrayList<>();for (Text value : values) {midList.add(value.toString());}for (int i = 0; i < midList.size(); i++) {String current = midList.get(i);String prev = (i > 0) ? midList.get(i-1) : "NULL"; // LAGString next = (i < midList.size()-1) ? midList.get(i+1) : "NULL"; // LEADcontext.write(new Text(key.getPartition() + "," + current), new Text("prev:" + prev));}
}
聚合函数类
4. SUM, AVG, MAX, MIN 基础聚合
SELECT area, SUM(amount), AVG(amount), MAX(amount), MIN(amount)
FROM tb GROUP BY area
MapReduce实现策略:
// 使用Combiner预聚合优化
public static class AggregateCombiner extends Reducer<Text, DoubleWritable, Text, Text> {public void reduce(Text area, Iterable<DoubleWritable> amounts, Context context) {double sum = 0, max = Double.MIN_VALUE, min = Double.MAX_VALUE;int count = 0;for (DoubleWritable amount : amounts) {double val = amount.get();sum += val;max = Math.max(max, val);min = Math.min(min, val);count++;}// 输出预聚合结果:"sum:100,count:5,max:50,min:10"String result = "sum:" + sum + ",count:" + count + ",max:" + max + ",min:" + min;context.write(area, new Text(result));}
}// Reducer最终聚合
public void reduce(Text area, Iterable<Text> preAggResults, Context context) {double totalSum = 0, globalMax = Double.MIN_VALUE, globalMin = Double.MAX_VALUE;int totalCount = 0;for (Text result : preAggResults) {// 解析预聚合结果并合并String[] parts = result.toString().split(",");totalSum += Double.parseDouble(parts[0].split(":")[1]);totalCount += Integer.parseInt(parts[1].split(":")[1]);// ... 处理max, min}double avg = totalSum / totalCount;context.write(area, new Text("sum:" + totalSum + ",avg:" + avg + "..."));
}
过程记录
deprecationContext
新老配置兼容,一旦旧的配置变动,映射的新的配置也要同步变动
核心作用这个类维护两个映射表,用来处理配置项的废弃关系:1. 两个映射表// 正向映射:老配置 → 废弃信息Map<String, DeprecatedKeyInfo> deprecatedKeyMap;// 例: "fs.default.name" → DeprecatedKeyInfo{newKeys: ["fs.defaultFS"], message: "..."}// 反向映射:新配置 → 老配置 Map<String, String> reverseDeprecatedKeyMap;// 例: "fs.defaultFS" → "fs.default.name"2. 构造过程详解步骤1:复制旧数据if (other != null) {// 把旧的DeprecationContext的所有映射关系复制过来for (Entry<String, DeprecatedKeyInfo> entry :other.deprecatedKeyMap.entrySet()) {newDeprecatedKeyMap.put(entry.getKey(), entry.getValue());}// 反向映射也复制for (Entry<String, String> entry :other.reverseDeprecatedKeyMap.entrySet()) {newReverseDeprecatedKeyMap.put(entry.getKey(), entry.getValue());}}步骤2:添加新的废弃关系for (DeprecationDelta delta : deltas) {// delta例子: "fs.default.name" → "fs.defaultFS"if (!newDeprecatedKeyMap.containsKey(delta.getKey())) {// 创建废弃信息DeprecatedKeyInfo newKeyInfo = new DeprecatedKeyInfo(delta.getNewKeys(), // ["fs.defaultFS"] delta.getCustomMessage() // 自定义消息);// 正向映射: "fs.default.name" → DeprecatedKeyInfonewDeprecatedKeyMap.put(delta.key, newKeyInfo);// 反向映射: "fs.defaultFS" → "fs.default.name"for (String newKey : delta.getNewKeys()) {newReverseDeprecatedKeyMap.put(newKey, delta.key);}}}步骤3:创建不可变映射this.deprecatedKeyMap = UnmodifiableMap.decorate(newDeprecatedKeyMap);this.reverseDeprecatedKeyMap =UnmodifiableMap.decorate(newReverseDeprecatedKeyMap);3. 实际使用例子假设有这样的废弃配置:new DeprecationDelta("fs.default.name", "fs.defaultFS");new DeprecationDelta("dfs.umaskmode", "fs.permissions.umask-mode");构造完成后的映射表:正向映射表 (deprecatedKeyMap):"fs.default.name" → DeprecatedKeyInfo{newKeys: ["fs.defaultFS"],customMessage: null}"dfs.umaskmode" → DeprecatedKeyInfo{newKeys: ["fs.permissions.umask-mode"],customMessage: null}反向映射表 (reverseDeprecatedKeyMap):"fs.defaultFS" → "fs.default.name""fs.permissions.umask-mode" → "dfs.umaskmode"4. 如何使用这两个表// 场景1: 用户使用老配置 "fs.default.name"String oldKey = "fs.default.name";DeprecatedKeyInfo info = deprecatedKeyMap.get(oldKey);if (info != null) {// 显示警告System.out.println("Warning: " + oldKey + " is deprecated, use " +info.newKeys[0]);// 获取新配置的值String value = getProperty(info.newKeys[0]);}// 场景2: 程序内部使用新配置 "fs.defaultFS",但要兼容老配置String newKey = "fs.defaultFS";String oldKey = reverseDeprecatedKeyMap.get(newKey);if (oldKey != null) {// 如果老配置有值,就用老配置的值String oldValue = getProperty(oldKey);if (oldValue != null) {return oldValue;}}
listStatus - fileinputformatFileStatusDeprecatedRawLocalFileStatus
super(f.length(), f.isDirectory(), 1, defaultBlockSize,
ra wlocalfilesystem: 字节数、block副本数、默认blocksize 本地32MDeprecatedRawLocalFileStatus{path=file:/Users/xjh/hadoop-wordcount/test_data/input/sample1.txt; isDirectory=false; length=128; replication=1; blocksize=33554432; modification_time=1761809801595; access_time=1761809803000; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}BlockLocation[] locs = result.isFile() ?getFileBlockLocations(result, 0, result.getLen()) :null;BlockLocation(String[] names, String[] hosts, long offset,long length)
提交任务splitint maps = writeSplits(job, submitJobDir); maps相当于splits数量(对输入文件夹下每个文件单独做split切分,每个为blocksize,每个文件最后一个split为【blocksize,blocksize*1.1】)AccessControlList acl = submitClient.getQueueAdmins(queue); 设置队列管理员TokenCache.cleanUpTokenReferral(conf); 清理token引用(防止作业配置复制后token错误)ReservationId reservationId = job.getReservationId(); 设置资源预留信息writeConf(conf, submitJobFile); 写入作业配置文件submitClient.submitJob()state = JobState.RUNNING
writeSplits
List<InputSplit> splits = input.getSplits(job) 对文件夹下每个文件单独做split切分List<FileStatus> files = listStatus(job); 为输入文件添加元数据LocatedFileStatus:文件路径、长度(字节)、副本数、blocksize、是否目录、blocklocations(name + host + offset + filelen)Path[] dirs = getInputPaths(job); 找到job的输入目录FileStatus[] matches = fs.globStatus(p, inputFilter); 获取输入目录的信息for (FileStatus globStat: matches) { 遍历所有的输入文件夹RemoteIterator<LocatedFileStatus> iter =fs.listLocatedStatus(globStat.getPath()); 添加每个输入文件信息string[] names = FileUtil.list(localf); 获取文件夹内所有的文件名results[j] = getFileStatus(new Path(f, new Path(null, null,names[i]))); 添加文件具体信息new DeprecatedRawLocalFileStatus(pathToFile(f), defaultBlockSize, this)DeprecatedRawLocalFileStatus{path=file:/Users/xjh/hadoop-wordcount/test_data/input/sample1.txt; isDirectory=false; length=128; replication=1; blocksize=33554432; modification_time=1761809801595; access_time=1761878245000; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} 文件路径、长度(字节)、副本数、blocksize、是否目录LocatedFileStatus stat = iter.next(); deprecatedrawlocalfilestatus+添加block信息->LocatedFileStatusBlockLocation[] locs = result.isFile() ? getFileBlockLocations(result, 0, result.getLen()) :null;分场景本地文件系统、分布式、纠删码 name + host + offset + filelen new LocatedFileStatus(result, locs);for (FileStatus file: files) { 对文件夹下每个文件单独做split切分long blockSize = file.getBlockSize(); 本地默认32Mlong splitSize = computeSplitSize(blockSize, minSize, maxSize); 限制在1字节到Long.MAX_VALUE内while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 只有超过1.1倍的blocksize才切块(略微超过blocksize的部分将会一起加入到最后一块中,存在最后一块split的blocksize大于splitSize(本地默认32MB)的情况/ 如果最后一个split超过blocksize 那么范围也会是在 【blocksize,blocksize*1.1】内)int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 找到对应的blockFileSplit(file, start, length, hosts, inMemoryHosts) 进行一个切块
Arrays.sort(array, new SplitComparator()); 按照split的size进行从高到底排序 timsort或者mergesort
JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); 提交SplitMetaInfo
blocklocation
1. 本地文件系统(默认实现)// 返回本地主机信息BlockLocation("localhost:9866", "localhost", 0, file.getLen())// 含义:整个文件都在本地主机上2. HDFS(分布式存储)// 三副本情况BlockLocation(offset: 0, length: BLOCK_SIZE,hosts: {"host1:9866", "host2:9866", "host3:9866"})BlockLocation(offset: BLOCK_SIZE, length: BLOCK_SIZE,hosts: {"host2:9866", "host3:9866", "host4:9866"})// 含义:// - 第一个块存储在host1、host2、host3上// - 第二个块存储在host2、host3、host4上3. 纠删码文件(Erasure Coding)小文件(< 1个条带大小):// RS_3_2: 3个数据块 + 2个校验块BlockLocation(offset: 0, length: 2 * CELL_SIZE,hosts: {"host1:9866", "host2:9866", "host3:9866", "host4:9866"})// 4台主机:2个数据块 + 2个校验块中等文件(> 1个条带,< 1个组):BlockLocation(offset: 0, length: actual_file_size,hosts: {"host1:9866", "host2:9866", "host3:9866", "host4:9866","host5:9866"})// 5台主机:3个数据块 + 2个校验块大文件(> 1个组大小):// 文件大小 = 3 * BLOCK_SIZE + 123BlockLocation(offset: 0, length: 3 * BLOCK_SIZE,hosts: {"host1:9866", "host2:9866", "host3:9866", "host4:9866","host5:9866"})BlockLocation(offset: 3 * BLOCK_SIZE, length: 123,hosts: {"host1:9866", "host2:9866", ...})
splits的详细切块
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;
}if (bytesRemaining != 0) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));
}
split的排序sort
Arrays.sort(array, new SplitComparator());
public static <T> void sort(T[] a, Comparator<? super T> c) {if (c == null) {sort(a);} else {if (LegacyMergeSort.userRequested)legacyMergeSort(a, c);elseTimSort.sort(a, 0, a.length, c, null, 0, 0);}
}TimSort 是Java 7引入的排序算法,结合了归并排序和插入排序:- 来源: 由Tim Peters为Python设计,后来移植到Java- 本质: 自适应的稳定归并排序- 时间复杂度:- 最好: O(n) - 数据已经有序- 平均: O(n log n)- 最坏: O(n log n)- 空间复杂度: O(n)- 稳定性: 稳定排序TimSort的优化策略// TimSort会识别数据中的有序片段(runs)数组: [1, 2, 3, 8, 7, 6, 5, 4, 10, 11, 12]识别: [1,2,3] [8,7,6,5,4反转] [10,11,12]然后高效地合并这些有序片段适应性特点// 对于不同数据模式的优化:完全有序: O(n) 时间部分有序: 接近 O(n) 时间完全随机: O(n log n) 时间逆序: O(n) 时间(识别后反转)2. LegacyMergeSort (传统归并排序)基本特性if (LegacyMergeSort.userRequested)legacyMergeSort(a, c);LegacyMergeSort 是Java 6及之前版本使用的传统归并排序:- 算法: 经典的分治归并排序- 时间复杂度:- 最好/平均/最坏都是 O(n log n)- 空间复杂度: O(n)- 稳定性: 稳定排序
runNewMapper(一个split对应一个runnewmapper)
runNewMapper(job, splitMetaInfo, umbilical, reporter);partitions = jobContext.getNumReduceTasks(); 分区数量等于reduce数量(分区数=reduce数=partition数),用户自定义否则是1(mapreduce.job.reduces)partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); 大于1则获取job配置的自定义分区类,否则HashPartitioner(return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;)input.initialize(split, mapperContext); 初始化每个split的起始位置real.initialize(split, context); linerecordreader.initialize 设置 start pos in filepositionFileSplit split = (FileSplit) genericSplit; 获取splitstart = split.getStart(); 获取文件的起点终点end = start + split.getLength();CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); 检查是否文件是压缩的fileIn.seek(start);in = new UncompressedSplitLineReader(fileIn, job, this.recordDelimiterBytes, split.getLength());filePosition = fileIn;if (start != 0) {start += in.readLine(new Text(), 0, maxBytesToConsume(start));} 除了第一个split其他都跳过第一行mapper.run(mapperContext); TaskStatus进入map状态,数据写入mapper的kvbuffer内,写meta达到软限制或者写kv时有溢写数据时溢写spill文件boolean result = real.nextKeyValue();if (key == null) {key = new LongWritable();} 设置初始化0while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { 多读一行newSize = skipUtfByteOrderMark(); 首行跳过utf-8bom标记 ,读取一整行newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); 直接读取一整行,读到的放到value到时候传递给自定义mapper内(无论split边界在哪里,都会读完当前整行,即使行的结尾超出了split的end位置,这样配合initialize就可以避免被错误切分了,只需要保证在读之前pos的位置在切块中)map(context.getCurrentKey(), context.getCurrentValue(), context); 将读到的数据都提供给写好的mapper自定义内 wrappedmapper自定义mapper.writeMapOutputCollector<K,V> collector.collect(key, value,partitioner.getPartition(key, value, partitions)); 调用自定义partitioner的getpartition或者默认为0keySerializer.open(bb) - 将序列化器的输出流设置为缓冲区bbbyte[] kvbuffer blockingbuffer序列化key value会正向写入, kvmeta会metasize四个int反向写入。排序时只需要操作轻量级的kvmeta索引,而不用移动kvbuffer中的大量实际数据kvmeta = ByteBuffer.wrap(kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer() intbuffer类型,依然是kvbuffer溢写需要同时满足两个条件:1. 空间条件:bufferRemaining <= 0 或达到软限制2. 数据条件:kvindex != kvend(有未溢写的数据bufferRemaining -= METASIZE;if (bufferRemaining <= 0) { spillLock.lock(); 上锁1.溢写完成,重置bufferremaining((kvbend + METASIZE) % kvbuffer.length != equator - (equator % METASIZE) kv+meta)2.达到bufsoftlimit(软限制(kvbuffer.length * spillper):开启后台溢写sortAndSpill;重置equator位置;重置bufferremainingout = rfs.create(filename); 创建溢写文件 spill file[分区0数据][分区1数据][分区2数据][分区3数据]...^offset:0 ^offset:150 ^offset:300 ^offset:500length:150 length:150 length:200 length:180sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); 对元数据进行排序QuickSort;先按分区号,再按key值排序for (int i = 0; i < partitions; ++i) { 对每个分区进行操作,写入同一个spill filewriter = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,spilledRecordsCounter); 创建writerif (combinerRunner == null) { 无Combiner - 直接溢写;有Combiner - 先合并再溢写;写的是kv对(这个过程会调用对应的job.setCombinerClass(WordCountReducer.class);进行reducespillRec.putIndex(rec, i); // 记录每个分区在文件中的位置和大小if (totalIndexCacheMemory >= indexCacheMemoryLimit) { 写入索引文件到内存或磁盘(以便后续int keystart = bufindex; 记录key的位置 keySerializer.serialize(key) - 序列化key并直接写入到 bb 缓冲区中System.arraycopy(b, off, kvbuffer, bufindex, len); bb缓冲区最终写到kvbuffer中如果remainbuffer不足,有未溢写的数据时进行溢写if (bufindex < keystart) { 如果写入后的位置小于开始位置,key数据跨越了环形缓冲区的边界, RawComparator需要连续的key数据进行比较。如果key被分割存储,比较器就无法正确工作,因此调用 shiftBufferedKey 将分割的key数据重新组合成连续的数据要么在缓冲区内重新排列,要么直接写出到磁盘。- 缓冲区开头是否有足够空间容纳整个key? 是的话走重新排列- 重新排列:两次System.arraycopy将分割的key数据在缓冲区内合并- 写出到磁盘:通过两次out.write()将分割的key数据按顺序写出final int valstart = bufindex; 记录value的位置kvmeta.put(kvindex + PARTITION, partition); kvmeta记录元数据,kvindex从后往前移动kvmeta.put(kvindex + KEYSTART, keystart);kvmeta.put(kvindex + VALSTART, valstart);kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();mapPhase.complete();
setPhase(TaskStatus.Phase.SORT); TaskStatus进入sort状态
output.close(mapperContext);collector.flush(); 刷写Map任务执行过程:1. 写入数据 -> 缓冲区满 -> spill_0.out (第1次溢写)2. 继续写入 -> 缓冲区满 -> spill_1.out (第2次溢写)3. 继续写入 -> 缓冲区满 -> spill_2.out (第3次溢写)4. Map任务结束 -> flush():- 处理剩余数据 -> spill_3.out (flush中的溢写)- mergeParts() -> part-r-00000 (合并生成最终文件)if (kvindex != kvend) 将剩余数据进行溢写;只处理还留在缓冲区中、尚未被之前的spill操作处理的剩余数据kvbuffer = null;mergeParts();if numSpills == 1 只有一个spill文件直接重命名returnindexCacheList.add(new SpillRecord(indexFileName, job)); 将磁盘上的索引文件读入内存for (int parts = 0; parts < partitions; parts++) { 按分区进行合并写到segmentListList<Segment<K,V>> segmentList = new ArrayList<Segment<K, V>>(numSpills); // 为每个分区创建segment列表for(int i = 0; i < numSpills; i++) { // 从每个spill文件中提取当前分区的数据段IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);Segment<K,V> s = new Segment<K,V>(job, rfs, filename[i],indexRecord.startOffset, // 分区在文件中的起始位置indexRecord.partLength, // 分区数据长度codec, true);segmentList.add(i, s);int mergeFactor = job.getInt(MRJobConfig.IO_SORT_FACTOR,MRJobConfig.DEFAULT_IO_SORT_FACTOR); 配置一次性读取文件进行merge的最大文件数RawKeyValueIterator kvIter = Merger.merge MergeQueue进行k路归并排序算法,时间复杂度: O(N log K),其中N是总记录数,K是段数
文件压缩reader
1. 检测文件是否压缩CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);- 根据文件扩展名或文件头检测压缩格式- 比如:.gz, .bz2, .snappy, .lz4 等2. 如果文件未压缩if (null == codec) {// 走正常的文本文件读取逻辑// 直接创建普通的LineReader}3. 如果文件已压缩if (null != codec) {isCompressedInput = true; // 标记为压缩输入// 从池中获取解压器(性能优化)decompressor = CodecPool.getDecompressor(codec);// 然后分两种情况处理...}两种压缩处理方式情况1:可分割的压缩格式 (SplittableCompressionCodec)if (codec instanceof SplittableCompressionCodec) {// 比如:bzip2, 某些自定义格式// 创建可分割的压缩输入流final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, start, end,SplittableCompressionCodec.READ_MODE.BYBLOCK);// 使用专门的压缩分片行读取器in = new CompressedSplitLineReader(cIn, job,this.recordDelimiterBytes);// 调整分片边界(压缩文件的边界可能需要调整)start = cIn.getAdjustedStart();end = cIn.getAdjustedEnd();filePosition = cIn;}特点:- 可以从压缩文件的中间位置开始读取- 支持大文件分片并行处理- 比如bzip2支持块级压缩,可以找到块边界情况2:不可分割的压缩格式 (普通CompressionCodec)else {// 比如:gzip, snappy, lz4if (start != 0) {// 如果尝试从非起始位置读取,抛出异常throw new IOException("Cannot seek in " +codec.getClass().getSimpleName() + " compressed stream");}// 只能从文件开头读取整个文件in = new SplitLineReader(codec.createInputStream(fileIn,decompressor),job, this.recordDelimiterBytes);filePosition = fileIn;}特点:- 必须从文件开头读取整个文件- 不支持分片,整个文件只能作为一个split- gzip就是这种情况
linerecordreader读nextkeyvalue
public boolean nextKeyValue() throws IOException {if (key == null) {key = new LongWritable();}key.set(pos);if (value == null) {value = new Text();}int newSize = 0;// We always read one extra line, which lies outside the upper// split limit i.e. (end - 1)while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {if (pos == 0) {newSize = skipUtfByteOrderMark();} else {newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));pos += newSize;}if ((newSize == 0) || (newSize < maxLineLength)) {break;}// line too long. try againLOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));}if (newSize == 0) {key = null;value = null;return false;} else {return true;}}
maptask.init
METASIZE = 16字节,表示每条记录的元数据占用16字节。每条Map输出记录的元数据包含4个int:// 每条记录的元数据(16字节):[kvindex+0]: value起始位置[kvindex+1]: key起始位置[kvindex+2]: 分区号[kvindex+3]: value长度
final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT,(float)0.8); 缓冲区80%时溢写到磁盘
final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,MRJobConfig.DEFAULT_IO_SORT_MB); 排序缓冲区大小:默认100MB
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,INDEX_CACHE_MEMORY_LIMIT_DEFAULT); 索引缓存限制:默认1MB
int maxMemUsage = sortmb << 20; 调整到字节(原来是MB)
maxMemUsage -= maxMemUsage % METASIZE; 对齐到metasize的长度(向下减少防止溢出最大排序缓冲区)
kvbuffer = new byte[maxMemUsage]; kvbuffer的大小是调整后的排序缓冲区(字节)
maxRec = kvmeta.capacity() / NMETA; 最多记录条数
softLimit = (int)(kvbuffer.length * spillper); 软缓冲区
bufferRemaining = softLimit;
collect方法
byte[] kvbuffer
kvmeta = ByteBuffer.wrap(kvbuffer) // 包装同一个字节数组.order(ByteOrder.nativeOrder()).asIntBuffer(); // 转换为IntBuffer视图
"data"的序列化分析Text("data")的序列化:String: "data"
长度: 4个字符
字节: [d][a][t][a] = 4字节内容VInt编码的长度: 4 < 128,所以用1字节存储长度
序列化结果: [4][d][a][t][a] = 总共5字节内存布局:序列化前: bufindex = 0
kvbuffer: [未使用...........................]0序列化"data"后: bufindex = 5
kvbuffer: [4][d][a][t][a][未使用................]0 1 2 3 4 5↑ ↑keystart=0 bufindex=5IntWritable(1)的序列化:int值: 1二进制: 00000000 00000000 00000000 00000001字节表示: [0][0][0][1] // 大端序,4字节内存布局变化序列化value前:kvbuffer: [4][d][a][t][a][未使用................]0 1 2 3 4 5↑ ↑keystart=0 valstart=5序列化value后:kvbuffer: [4][d][a][t][a][0][0][0][1][未使用.......]0 1 2 3 4 5 6 7 8 9↑ ↑ ↑keystart=0 valstart=5 bufindex=9字节详细分析完整的key-value序列化:// Text("data") + IntWritable(1)kvbuffer[0] = 4 // Text长度kvbuffer[1] = 'd' // kvbuffer[2] = 'a' // Text内容 "data"kvbuffer[3] = 't' //kvbuffer[4] = 'a' //kvbuffer[5] = 0 // int值1的高位字节kvbuffer[6] = 0 // kvbuffer[7] = 0 // kvbuffer[8] = 1 // int值1的低位字节
环形缓冲区
MapOutputBuffer环形缓冲区设计理念MapOutputBuffer使用一个**单一的字节数组kvbuffer**同时存储两种数据:1. 实际的key-value数据(序列化后的字节)2. 元数据信息(每条记录的索引信息)关键创新:两种数据从数组的两端向中间增长,用equator(赤道)作为分界点。⏺ 关键变量详解1. equator(赤道)作用:数据和元数据的分界点int equator; // 第958行定义- 序列化数据从equator向右增长- 元数据从equator向左增长(对齐到METASIZE边界)2. 序列化数据相关变量int bufstart; // spill开始位置int bufend; // spill结束位置 int bufmark; // 当前记录结束位置int bufindex; // 当前写入位置int bufvoid; // 缓冲区"虚空"位置3. 元数据相关变量int kvstart; // spill的元数据开始位置(以int为单位)int kvend; // spill的元数据结束位置(以int为单位)int kvindex; // 当前元数据写入位置(以int为单位)4. 初始化过程从代码第1037-1043行:setEquator(0); // 设置equator为0bufstart = bufend = bufindex = equator; // 所有序列化位置都从equator开始kvstart = kvend = kvindex; // 所有元数据位置相同⏺ 完整生命周期示例假设kvbuffer长度为1000字节,METASIZE=16:阶段1:初始状态equator=0, bufindex=0, kvindex=62(最后一个元数据位置)[元数据区域... |0 ...数据区域]阶段2:写入第一条记录collect(key1, value1, partition0);- bufindex从0开始写入key1和value1的序列化数据- kvindex向前移动,写入元数据:partition、keystart、valstart、vallen- 结果:bufindex=50, kvindex=58阶段3:继续写入记录写入多条记录后:bufindex=300, kvindex=45, 缓冲区使用率达到softLimit阶段4:触发溢写- startSpill()被调用(第1906行)- kvend = kvindex, bufend = bufmark(记录溢写边界)- spillInProgress = true- 后台SpillThread开始排序和写磁盘阶段5:溢写进行中- 主线程继续写入新记录(调整equator位置)- 后台线程处理kvstart到kvend之间的数据阶段6:溢写完成- SpillThread调用相当于resetSpill()的逻辑- kvstart = kvend, bufstart = bufend- spillInProgress = false阶段7:最终flush- flush()方法处理剩余数据- mergeParts()合并所有spill文件关键时机1. kvend设置时机:每次startSpill()时2. kvend重置时机:每次溢写完成后的清理3. equator调整时机:缓冲区空间紧张时重新平衡4. 检查溢写完成:通过比较kvend位置是否已重置⏺ 这样设计的核心优势是:在有限的内存中高效管理大量数据,支持并发的写入和溢写操作,同时保持数据的有序性。
分配reduce
final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());for (ClientProtocolProvider provider : providerList) 判断 mapreduce.framework.name 是local还是yarnlocalclientprotocolprovider:LocalJobRunner yarnclientprotocolprovider:YARNRunner
submitter.submitJobInternal(Job.this, cluster);Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); 准备配置文件job.xmlint maps = writeSplits(job, submitJobDir); 进行splitsstatus = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()); 提交(MapReduce作业转换为YARN应用的过程,创建了AM启动所需的所有信息和资源)ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); 2. 创建应用提交上下文1. LocalResource: Map<String, LocalResource> localResources = setupLocalResources(jobConf, jobSubmitDir);告诉YARN需要分发哪些文件到AM容器- job.xml: 作业配置文件- job.jar: 用户JAR文件(如果存在)- job.split: 输入切片信息- job.splitmetainfo: 切片元数据2. ApplicationMaster命令行 List<String> vargs = setupAMCommand(jobConf);$JAVA_HOME/bin/java-Djava.io.tmpdir=$PWD/tmp-Dlog4j.configuration=container-log4j.properties-Xmx1024morg.apache.hadoop.mapreduce.v2.app.MRAppMaster$JAVA_HOME/bin/java -Djava.io.tmpdir=$PWD/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=<LOG_DIR> -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=DEBUG,CLA -Dhadoop.root.logfile=syslog -Xmx1024m -Dmapreduce.job.debug=true <ADD_OPENS> org.apache.hadoop.mapreduce.v2.app.MRAppMaster 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr 3. 容器启动上下文 ContainerLaunchContext amContainer = setupContainerLaunchContextForAM(jobConf, localResources, securityTokens, vargs);- 环境变量: CLASSPATH、LD_LIBRARY_PATH等- 本地资源映射: JAR文件、配置文件等- 安全令牌: 认证信息- 访问控制: 应用的查看和修改权限4. 资源请求 List<ResourceRequest> amResourceRequests = generateResourceRequests();5. ApplicationSubmissionContext- ApplicationId: 唯一应用ID- 队列名称: 提交到的YARN队列- 应用名称: 作业名称- AM容器规格: 包含资源要求、启动命令、环境等- 最大重试次数: AM失败后的重试次数- 标签和优先级: 调度相关属性关键构造对象的作用:1. LocalResource: 告诉YARN需要分发哪些文件到AM容器2. ContainerLaunchContext: 定义如何启动AM容器3. ResourceRequest: 指定AM需要的硬件资源4. ApplicationSubmissionContext: 包装所有信息提交给ResourceManagerApplicationId applicationId = resMgrDelegate.submitApplication(appContext); 提交到ResourceManagerlocaljobrunner TaskSplitMetaInfo[] taskSplitMetaInfos = SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir); 读splits数(根据文件大小切分)int numReduceTasks = job.getNumReduceTasks();Map<TaskAttemptID, MapOutputFile> mapOutputFiles = Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>()); 获取mapoutput文件List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(jobId, mapOutputFiles);for (int i = 0; i < this.numReduceTasks; i++) { 为每一个reduce任务创建一个reduce线程list.add(new ReduceTaskRunnable(taskId++, jobId, mapOutputFiles));
shuffle过程-依赖reduce(一个reduce对应一个shuffle)
init 创建reducetask(里面需要给定partition, 是默认的taskId)ReduceTask(String jobFile, TaskAttemptID taskId,int partition, int numMaps, int numSlotsRequired)
reducetask.runrIter = shuffleConsumerPlugin.run();fetchers[i].start(); 并行从多个Map节点拉取数据(对应partition下) myParent:copy task - 本地local模式doCopy(maps); MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,id); requestedSize > maxSingleShuffleLimit 用磁盘output;usedMemory > memoryLimit 直接返回null等待; 默认用内存outputmapOutput.shuffle(LOCALHOST, inStream, compressedLength,decompressedLength, metrics, reporter);- 非local模式copyFromHost(host)List<TaskAttemptID> maps = scheduler.getMapsForHost(host); 获取该主机上需要拉取的Map任务列表URL url = getMapOutputURL(host, maps);构造HTTP URL并建立连接DataInputStream input = openShuffleUrl(host, remaining, url);TaskAttemptID[] failedTasks = null;while (!remaining.isEmpty() && failedTasks == null) { 循环拉取每个MapOutputfailedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled); mapOutput = merger.reserve(mapId, decompressedLength, id); 失败了重试或抛出异常 new TaskAttemptID[] {mapId}scheduler.hostFailed(host.getHostName()); // 标记主机失败scheduler.copyFailed(left, host, true, false); // 标记任务拷贝失败,scheduler.putBackKnownMapOutput(host, left);host.addKnownMap(mapId);2. copyFailed(mapId, host) → 增加失败计数 + 惩罚主机 3. putBackKnownMapOutput(host, mapId) → 任务重新排队 4. penalize(host, delay) → 主机进入惩罚期,暂时不可用5. Referee线程等待 → delay时间后解除惩罚 失败次数越多,pow 惩罚时间越长(1.3^failures)6. host.markAvailable() → 主机重新可用7. pendingHosts.add(host) → 重新进入调度池8. 其他Fetcher获取该主机 → 重新尝试拉取失败的任务RawKeyValueIterator kvIter = merger.close(); myParent:sort(把磁盘和内存的output文件进行k路归并排序)finalMerge(jobConf, rfs, memory, disk)final RawKeyValueIterator rIter = Merger.merge(job, fs,keyClass, valueClass, memDiskSegments, numMemDiskSegments,tmpDir, comparator, reporter, spilledRecordsCounter, null,mergePhase) k路归并排序内存output文件,最终写到磁盘output(XXXmerged)Collections.sort(diskSegments, new Comparator<Segment<K,V>>() 按照文件大小从大到小排序(到这一步只有磁盘output)RawKeyValueIterator diskMerge = Merger.merge(job, fs, keyClass, valueClass, codec, diskSegments,ioSortFactor, numInMemSegments, tmpDir, comparator,reporter, false, spilledRecordsCounter, null, thisPhase); 磁盘output进行快排,检查有没有剩下的内存output,有的话进行k路归并排序
runNewReducer
sortPhase.complete(); sort结束setPhase(TaskStatus.Phase.REDUCE); 进入reduce阶段runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);reducer.run(reducerContext);while (context.nextKey()) {reduce(context.getCurrentKey(), context.getValues(), context); 进入到自定义reducer的reduce方法context.write(key, v);TextOutputFormat real.write(key,value);
