【大数据】MapReduce 编程--索引倒排--根据“内容 ➜ 出现在哪些文件里(某个单词出现在了哪些文件中,以及在每个文件中出现了多少次)
将 Hadoop 所需的 JAR 文件添加到项目中,确保可以使用 Hadoop 的 API
JAR (Java Archive) 文件是一种用于打包多个 Java 类文件、资源文件(如图片、配置文件等)以及元数据的压缩文件格式。它类似于 ZIP 文件,但 JAR 文件通常用于 Java 应用程序或库的分发。
在 Hadoop 或其他 Java 项目中,JAR 文件是执行程序的主要载体。通过将所有相关的类文件打包到一个 JAR 文件中,可以方便地管理和分发
API (Application Programming Interface) 是一组定义了如何通过不同软件组件之间的交互来实现某个功能的接口
目标:将多个文档(比如 1.txt、2.txt、3.txt)中出现的每一个“单词”以及该单词出现在“哪些文档”中、“每个文档中出现几次”列出。
正排索引(正常顺序)
一个“文件”里有哪些“单词”?
文件名 | 包含的单词 |
---|---|
1.txt | MapReduce, simple, powerful |
倒排索引 ---把每个单词,反过来,查它在哪些文件里出现,并统计每个文件出现几次。
某个“单词”在哪些“文件”里出现过?
搜索引擎用这个来找“某个关键词在哪些网页里
Map 过程
从文本中提取每个单词的出现信息,并记录它出现在了哪个文件中
Mapper<Object, Text, Text, Text>
是泛型类型,表示输入的 key 是 Object
类型(一般用于偏移量,表示文件的每行),输入的 value 是 Text
类型(即一行文本)。Mapper 的输出 key 和 value 都是 Text
类型
private Text keyInfo = new Text();
private Text valueInfo = new Text();
private FileSplit split;
keyInfo
和 valueInfo
是用于输出的 key 和 value,类型是 Text---
字符串类型
map
方法
key 在 MapReduce 中通常是行的偏移量
context
: 用来将 Mapper 的输出写入到下游的 Reducer
split = (FileSplit) context.getInputSplit();
context.getInputSplit()
用于获取当前任务分割的信息。FileSplit
是用于表示输入数据切割的信息,这里通过它来获取当前文本数据所属的文件路径。
StringTokenizer
来分割输入行的单词。StringTokenizer
会根据空格、制表符等字符来切分字符串
MapReduce is powerful and simple变成
["MapReduce", "is", "powerful", "and", "simple"]
while (itr.hasMoreTokens()) {keyInfo.set(itr.nextToken() + " " + split.getPath().getName().toString());valueInfo.set("1");context.write(keyInfo, valueInfo);
}
循环处理每个单词。每个单词都会作为倒排索引的 key,文件名作为值的一部分
keyInfo.set(itr.nextToken() + " " + split.getPath().getName().toString())
: 每个单词和它所在的文件名组成一个新的 key
public class MyMapper extends Mapper<Object, Text, Text, Text> {private Text keyInfo = new Text(); // map输出的key变量(词+文件名)private Text valueInfo = new Text(); // map输出的value变量("1")private FileSplit split; // 用于获取当前行的文件名public void map(Object key, Text value, Context context)throws IOException, InterruptedException {split = (FileSplit) context.getInputSplit(); // 获取当前数据来自哪个文件// 分词,把当前行的字符串按空格分开StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {// 每个词 + 文件名作为keykeyInfo.set(itr.nextToken() + " " + split.getPath().getName());valueInfo.set("1"); // 出现了一次,记为“1”(注意是字符串"1")context.write(keyInfo, valueInfo); // 输出结果}}
}
MyMapper
是我们自定义的“映射器类”,继承自 Mapper<输入类型, 输出类型>
MapReduce is powerful and simple
程序会依次输出的是 (单词+文件名, 1)MapReduce 2.txt -> 1
is 2.txt -> 1
powerful 2.txt -> 1
and 2.txt -> 1
simple 2.txt -> 1
Combine 阶段(MyCombiner)
局部词频统计,即对 mapper 输出的内容,在每个节点内部做一次小聚合
("MapReduce 2.txt", "1")
("MapReduce 2.txt", "1")
变成("MapReduce", "2.txt:2")
public class MyCombiner extends Reducer<Text, Text, Text, Text> {private Text info = new Text();public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {int sum = 0;for (Text value : values) {sum += Integer.parseInt(value.toString());}String[] str = key.toString().split(" ");key.set(str[0]); // 把key改成单词info.set(str[1] + ":" + sum); // value为 文件名:词频context.write(key, info); // 输出如:MapReduce -> 2.txt:1}
}
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString()); // 把"1"变成数字加起来
}
然后再把 key 拆开(因为 key 是 "MapReduce 2.txt")
String[] str = key.toString().split(" "); // 分成:str[0]="MapReduce", str[1]="2.txt"
key.set(str[0]); // 只留下单词
info.set(str[1] + ":" + sum); // 文件名:次数
context.write(key, info); // ("MapReduce", "2.txt:2")
Reduce 阶段(MyReducer)
将来自多个mapper/combiner的同一个单词的 value 们拼起来
// Reducer类:处理 Map 阶段聚合后的数据(每个“单词”,对应一组“文件名:次数”)
// 泛型说明:输入是<Text, Text>,输出也是<Text, Text>
public class MyReducer extends Reducer<Text, Text, Text, Text> {// 用于存储最终拼接好的 value(也就是:某个词出现在哪些文件中)private Text result = new Text();// reduce 函数是核心,每个单词(key)对应一个文件频率列表(values)// Text key:是某个单词,例如 "MapReduce"// Iterable<Text> values:是这个单词在不同文件中的频次,如 ["1.txt:2", "2.txt:1"]// Context context:是 Hadoop 框架提供的上下文,用于输出结果public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {// 创建一个字符串用于拼接所有文件名:词频,格式如 "1.txt:2 ; 2.txt:1 ;"String value = "";// 遍历 values 中的每一项,把它们拼接到字符串里for (Text value1 : values) {// 把每个文件名:词频 加到 value 字符串中,并在末尾加上分号分隔value += value1.toString() + " ; ";}// 把拼接好的字符串设置为最终输出的 valueresult.set(value);// 把结果写到上下文中,表示输出一行结果// 比如: ("MapReduce", "1.txt:2 ; 2.txt:1 ;")context.write(key, result);}
}
步骤 | 输入 | 输出 |
---|---|---|
Mapper | 行(每次一行) | (单词 文件名, "1") |
Combiner | (单词 文件名, ["1", "1", ...]) | (单词, 文件名:词频) |
Reducer | (单词, [文件名1:词频, 文件名2:词频...]) | 输出倒排索引 |
执行MapReduce
-
提示用户输入输入路径和输出路径(在 HDFS 上)。
-
设置 Map、Reduce 和 Combiner 类来完成倒排索引操作。
-
运行 MapReduce 程序。
-
程序运行完后,会读取输出文件并把结果打印在控制台上
// 引入必要的包(这些包是 Hadoop 提供的类)
import java.io.IOException;
import org.apache.hadoop.conf.Configuration; // Hadoop配置类
import org.apache.hadoop.fs.Path; // HDFS路径类
import org.apache.hadoop.io.Text; // MapReduce中用到的字符串类型
import org.apache.hadoop.mapreduce.Job; // MapReduce任务对象
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; // 输入路径设置
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // 输出路径设置// 用于读取控制台输入
import java.util.Scanner;// HDFS 相关类(用于读取输出文件内容)
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;public class MyRunner {// 主函数,程序的入口public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 创建 Hadoop 配置对象(可以理解为 MapReduce 程序的设置说明书)Configuration conf = new Configuration();// 获取一个 MapReduce 作业对象(Job 就是我们要交给 Hadoop 运行的任务)Job job = Job.getInstance(conf);// 指定当前这个 job 的主类(让 Hadoop 知道要打包哪个类)job.setJarByClass(MyRunner.class);// 设置使用的 Mapper、Reducer 和 Combiner(都在你自己写的类中)job.setMapperClass(MyMapper.class); // 指定 Map 阶段使用的类job.setReducerClass(MyReducer.class); // 指定 Reduce 阶段使用的类job.setCombinerClass(MyCombiner.class); // 指定 Combine 阶段使用的类(可选)// 设置 Reducer 输出的 key 和 value 的类型(结果的 key 和 value)job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 设置 Mapper 输出的 key 和 value 类型(中间过程的 key 和 value)job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);// 用来获取用户输入的输入路径和输出路径(通过命令行输入)Scanner sc = new Scanner(System.in);System.out.print("inputPath:"); // 提示输入路径String inputPath = sc.next(); // 读取路径(如 /input/)System.out.print("outputPath:"); // 提示输出路径String outputPath = sc.next(); // 读取路径(如 /output/)// 设置输入路径(注意:加上 HDFS 前缀)FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000" + inputPath));// 设置输出路径(注意:不能和以前的输出目录重复)FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000" + outputPath));// 提交 job 并等待完成(true 表示打印运行日志)job.waitForCompletion(true);// --- 以下代码是:任务完成后,读取输出文件内容并打印到控制台 ---try {// 获取 Hadoop 文件系统对象FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());// 指定要读取的结果文件路径(MapReduce 输出文件名固定为 part-r-00000)Path srcPath = new Path(outputPath + "/part-r-00000");// 打开输出文件FSDataInputStream is = fs.open(srcPath);System.out.println("Results:");// 一行一行读取文件内容并打印while (true) {String line = is.readLine(); // 读取一行if (line == null) {break; // 读完就退出循环}System.out.println(line); // 打印这一行}// 关闭文件输入流is.close();} catch (Exception e) {e.printStackTrace(); // 如果读取失败,输出错误信息}}
}