MapReduce(期末速成版)
起初在B站看3分钟的速成视频,感觉很多细节没听懂。
具体例子解析(文件内容去重)
对于两个输入文件,即文件A 和文件B,请编写MapReduce 程序,对两个文件进行合并,并剔除
其中重复的内容,得到一个新的输出文件C。
📂 一、输入数据文件
文件 A:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
文件 B:
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y
🧠 二、MapReduce 执行流程和中间结果
MapReduce 分为三个主要阶段:
-
Map 阶段
-
Shuffle(分组 & 排序)阶段
-
Reduce 阶段
🔹2.1 Map 阶段(映射阶段)
我们先来看下 Mapper 的代码逻辑:
public static class Map extends Mapper<Object, Text, Text, Text> {private static Text text = new Text();public void map(Object key, Text value, Context context) {text = value;context.write(text, new Text(""));}
}
🔍 Mapper 做了什么?
-
每行文本被视为一个输入记录(
value
),key
是字节偏移量(无关紧要)。 -
该
Mapper
不对数据做任何处理,直接原样输出value
作为key
,并给定空字符串作为value
。 -
这样,相同行的数据(A、B 中相同的行)会生成相同的 key,从而可以在 Shuffle 阶段合并。
🔢 Map 输出结果(中间键值对)
我们对 A、B 两个文件的所有行执行一次 map()
操作,得到如下中间结果(<key, value>
形式):
来源 | key(Text) | value(Text) |
---|---|---|
A | 20150101 x | "" |
A | 20150102 y | "" |
A | 20150103 x | "" |
A | 20150104 y | "" |
A | 20150105 z | "" |
A | 20150106 x | "" |
B | 20150101 y | "" |
B | 20150102 y | "" |
B | 20150103 x | "" |
B | 20150104 z | "" |
B | 20150105 y | "" |
🔹2.2 Shuffle 阶段(分组 & 排序)
MapReduce 框架自动完成以下操作:
-
将所有 Mapper 输出结果根据 key 进行哈希分区、排序、去重分组。
-
每一个唯一的 key 会被送入一次 Reducer。
🎯 分组结果(Reducer 接收到的 key 和 values):
key(唯一行) | values("" 的列表) |
---|---|
20150101 x | ["",] |
20150101 y | ["",] |
20150102 y | ["", ""] |
20150103 x | ["", ""] |
20150104 y | ["",] |
20150104 z | ["",] |
20150105 y | ["",] |
20150105 z | ["",] |
20150106 x | ["",] |
⚠️ 注意:
-
20150102 y
和20150103 x
都在两个文件中出现了,所以它们的values
有两个空字符串。 -
但 Reducer 并不关心这些
values
,它只输出唯一的key
。
🔹2.3 Reduce 阶段(归约阶段)
看一下 Reducer 的代码:
public static class Reduce extends Reducer<Text, Text, Text, Text> {public void reduce(Text key, Iterable<Text> values, Context context) {context.write(key, new Text(""));}
}
🔍 Reducer 做了什么?
-
对于每一个唯一的
key
,Reducer 被调用一次。 -
它忽略 values,直接输出
key
和空的Text("")
。 -
实际效果是:只输出不重复的唯一行内容。
✅ 最终输出文件 C 的内容:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
✅ 总结
步骤 | 说明 |
---|---|
Map | 对输入的每一行输出 <Text(该行), Text("")> |
Shuffle | 根据行内容去重、分组、排序 |
Reduce | 忽略 values,只输出唯一的 key(行内容) |
输出文件 | 文件 A 和 B 合并去重后的内容 |
问题一:Reduce端是如何输出文件的?
✅ Reduce中context.write(key, value)
的行为
在 Hadoop MapReduce 中:
context.write(new Text("s"), new Text("a"));
的输出行为是:
-
每一行输出格式为:
key \t value
即,key 和 value 之间用一个制表符(Tab 字符
\t
)分隔。
🔍 所以你举的例子
context.write(new Text("s"), new Text("a"));
最终输出文件中的一行会是:
s a
不是 sa
,而是 s
和 a
之间有一个 Tab 分隔符。
🔧 那么在你的代码中:
context.write(key, new Text(""));
由于 value
是空字符串,所以每一行就是:
key
即没有显示的 value,只输出 key 的内容,所以:
20150101 x
这行实际上是 key 的原样内容,不是 key + "" 的拼接结果,而是 key 后面虽然有个空字符串作为 value,但由于 value 是空的,输出就是 key 自身。
✍️ 补充:可以自定义 key 和 value 的分隔符
在 Hadoop 中默认的 key/value 分隔符是 Tab(\t
),但你可以通过设置:
-D mapreduce.output.textoutputformat.separator=","
来更改分隔符,比如改成逗号,则输出会变成:
s,a
✅ 是不是有几个 Reducer 就会产生几个输出文件?
是的,完全正确。
在 Hadoop MapReduce 中:
-
如果你设置了 N 个 Reducer 任务(比如
job.setNumReduceTasks(N)
), -
那么就会产生 N 个输出文件。
这些输出文件的名称通常是:
part-r-00000
part-r-00001
...
part-r-00(N-1)
每个文件由一个 Reducer 任务写出。
❓ 那这些输出文件的内容一致吗?
不一致!每个文件的内容不同!
➤ 原因:
-
MapReduce 框架会按照 key 的 hash 值把数据**分区(Partition)**给不同的 Reducer。
-
每个 Reducer 只处理自己分到的 key 分区。
-
所以:
-
每个输出文件包含的是不同部分的 key-value 对。
-
输出文件之间是不重合的,也就是说每个 key 只会出现在一个 Reducer 的输出文件中。
-
🧠 举个例子(比如有 2 个 Reducer):
假设你有以下中间 key:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
Hadoop 会通过 partitioner(比如默认的 HashPartitioner)决定:
-
Reducer 0 处理:
20150101 x 20150103 x 20150105 z 20150106 x
-
Reducer 1 处理:
20150101 y 20150102 y 20150104 y 20150104 z 20150105 y
然后产生:
-
part-r-00000
← Reducer 0 写入 -
part-r-00001
← Reducer 1 写入
两个文件中的内容互不重复,但合起来是完整的结果。
✅ 1. 默认是不是只有一个 Reducer?
是的,默认情况下 Hadoop MapReduce 只使用 1 个 Reducer。
也就是说,如果你没有显式调用:
job.setNumReduceTasks(N);
则默认 N = 1
,最终只会生成一个输出文件:part-r-00000
。
✅ 2. 可以设置多个 Reducer 吗?
当然可以,而且非常常见。
你可以在驱动代码中显式设置 Reducer 个数,例如设置为 3:
job.setNumReduceTasks(3);
这样 Hadoop 会启用 3 个 Reducer 并行处理数据,输出三个文件:
part-r-00000
part-r-00001
part-r-00002
问题二:Shuffle过程的输出结果与Combiner函数本质是?
✅ 一、Shuffle 输出是啥?
默认情况下:
Map 阶段的输出会经过 Shuffle(排序 + 分区 + 组装) 后变成:
key1 → [v1, v2, v3, ...]
key2 → [v1, v2, ...]
...
这些最终被送入 Reducer。
❓ 问题:为什么会有重复的 value?
因为 同一个 key 可能在同一个 Mapper 中出现多次,比如:
hello → 1
hello → 1
hello → 1
这些数据在传输前就可以局部聚合,先加一加再传过去,不用浪费网络带宽。
✅ 二、Combiner 是什么?
Combiner 就是一个 “局部 Reduce”,在 Mapper 端执行,用来提前聚合。
它的作用是:
-
在 Mapper 本地 就先对 key 进行累加(或合并),
-
减少大量重复的
<key, 1>
传给 Reducer, -
降低网络传输压力,提升性能。
✅ 三、怎么写一个 Combiner?
👉 很简单,其实你可以直接 复用 Reducer 逻辑,只要满足:聚合操作是可交换和结合的(比如加法)。
✅ 1. 定义 Combiner 类(和 Reducer 一样):
public static class IntSumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result); // 输出 key sum}
}
✅ 2. 在 Driver 中设置:
job.setCombinerClass(IntSumCombiner.class);
⚠️ 注意:你也可以直接写成:
job.setCombinerClass(IntSumReducer.class);
因为本质一样(统计加法是符合条件的操作)。
✅ 四、添加了 Combiner 后的数据流是什么样?
假设有两个 Map 输出如下:
Mapper1 输出:
hello → 1
hello → 1
world → 1
经过 Combiner:
hello → 2
world → 1
Mapper2 输出:
hello → 1
world → 1
经过 Combiner:
hello → 1
world → 1
最终 Shuffle 输出给 Reducer:
hello → [2, 1]
world → [1, 1]
Reducer 再聚合:
hello → 3
world → 2
✅ 五、什么时候不要用 Combiner?
虽然 Combiner 很有用,但它不是 always-safe 的,只有在满足可交换、可结合的前提下才可用。
操作类型 | 适合使用 Combiner? | 示例 |
---|---|---|
加法、计数、最大最小值 | ✅ 可以用 | WordCount、MaxTemperature |
求平均、TopN、排序 | ❌ 不建议 | 平均值不能分区计算后再平均 |
✅ 所以完整流程是:
Map → Combiner → Shuffle(聚合 + 分区)→ Reduce