JAVA学习-练习试用Java实现“实现一个Hadoop MapReduce任务,对大数据集中的数值进行排序和筛选”
问题:
使用java语言编辑,实现一个Hadoop MapReduce任务,对大数据集中的数值进行排序和筛选。
解答思路:
下面是一个使用Java编写的Hadoop MapReduce任务,该任务将读取一个数值数据集,然后对数值进行排序和筛选。我们将筛选出大于某个特定阈值的数值,并对这些数值进行排序。
首先,我们需要定义Map类、Reduce类和驱动类。
'IntSumMapper.java':
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class IntSumMapper extends Mapper<Object, Text, IntWritable, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private IntWritable outputKey = new IntWritable();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 将输入的文本转换成整数
int number = Integer.parseInt(value.toString());
// 输出该整数及其计数1
outputKey.set(number);
context.write(outputKey, one);
}
}
'IntSumReducer.java':
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class IntSumReducer extends Reducer<IntWritable, IntWritable, IntWritable, Text> {
private IntWritable result = new IntWritable();
private Text outputValue = new Text();
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
// 仅当数值大于阈值时输出
if (sum > 1000) { // 假设阈值为1000
outputValue.set(Integer.toString(sum));
context.write(key, outputValue);
}
}
}
'IntSumDriver.java':
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class IntSumDriver {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: IntSumDriver <input path> <output path>");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "int sum");
job.setJarByClass(IntSumDriver.class);
job.setMapperClass(IntSumMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setSortComparatorClass(LongWritable.class); // 使用LongWritable作为排序的key
job.setGroupingComparatorClass(LongWritable.class); // 确保所有数值都在一起进行reduce
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
编译这些类:
javac -d classes IntSumMapper.java IntSumReducer.java IntSumDriver.java
打包成jar文件:
jar -cvf IntSum.jar -C classes .
在Hadoop集群上运行MapReduce作业:
hadoop jar IntSum.jar IntSumDriver /input /output
这里'/input'是输入数据所在的HDFS路径,'/output'是输出结果将写入的HDFS路径。
该程序将输出所有大于1000的数值,并且这些数值是排序后的。需要注意,这里我们使用了'LongWritable'作为排序的key,因为'IntWritable'不能直接用于排序,而且'IntWritable'的排序是字典序,而不是数值序。使用'LongWritable'可以确保数值按照数值大小进行排序。如果你的数值范围在'IntWritable'的有效范围内,你可以直接使用'IntWritable'。
(文章为作者在学习java过程中的一些个人体会总结和借鉴,如有不当、错误的地方,请各位大佬批评指正,定当努力改正,如有侵权请联系作者删帖。)