Implementing a serialization example with Hadoop
1) Requirement
Count the total upstream traffic, total downstream traffic, and total traffic consumed by each phone number.
(1) Input data
Create a file named phone_data.txt with the following content:
1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.atguigu.com 1527 2106 200
6 84188413 192.168.100.3 www.atguigu.com 4116 1432 200
7 13590439668 192.168.100.4 1116 954 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
9 13729199489 192.168.100.6 240 0 200
10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
12 15959002129 192.168.100.9 www.atguigu.com 1938 180 500
13 13560439638 192.168.100.10 918 4938 200
14 13470253144 192.168.100.11 180 180 200
15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
20 13768778790 192.168.100.17 120 120 200
21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
22 13568436656 192.168.100.19 1116 954 200
(2) Input data format:
7 13560436666 120.196.100.99 1116 954 200
id  phone number  network IP  upstream traffic  downstream traffic  network status code
(3) Expected output data format
13560436666 1116 954 2070
phone number  upstream traffic  downstream traffic  total traffic
2) Requirements analysis
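In outline: the Map phase parses each input line, uses the phone number as the output key, and wraps the upstream and downstream traffic in a custom FlowBean as the output value. Because FlowBean has to travel between the Map and Reduce phases, it must implement Hadoop's Writable interface (write/readFields). The Reduce phase receives all FlowBeans for the same phone number, accumulates the upstream and downstream traffic, and writes out the totals.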
3) Write the MapReduce program
Maven project file (pom.xml):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>MRDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Hadoop dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <!-- Unit testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <!-- log4j: logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.3</version>
        </dependency>
    </dependencies>
</project>
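Note: the explicit hadoop-mapreduce-client-core entry should be redundant, since hadoop-client normally pulls the MapReduce client libraries in transitively; keeping it is harmless as long as the two versions match.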
Create a file named log4j.properties under src/main/resources (log4j looks for this file name on the classpath) and copy the following into it:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
package com.ygre.mr.witable2;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/*
 * JavaBean:
 * 1. Define a custom class and implement the Writable interface.
 * 2. Override the write and readFields methods.
 */
public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    /* Called when the object is serialized */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /*
     * Called when the object is deserialized.
     * Note: fields must be read in the same order they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + " " + downFlow + " " + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}
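Because write and readFields must stay symmetric, a small round-trip test can catch ordering mistakes early. The following is a minimal sketch using the junit dependency already declared in the pom; the class name FlowBeanTest is hypothetical and not part of the original project (put it under src/test/java in the same package):

package com.ygre.mr.witable2;

import static org.junit.Assert.assertEquals;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.junit.Test;

public class FlowBeanTest {

    @Test
    public void writeThenReadFieldsRestoresValues() throws Exception {
        // Serialize: write writes the three long fields in order (upFlow, downFlow, sumFlow)
        FlowBean original = new FlowBean();
        original.setUpFlow(1116);
        original.setDownFlow(954);
        original.setSumFlow(2070);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize: readFields must consume the fields in the same order
        FlowBean restored = new FlowBean();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        assertEquals(1116, restored.getUpFlow());
        assertEquals(954, restored.getDownFlow());
        assertEquals(2070, restored.getSumFlow());
    }
}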
package com.ygre.mr.witable2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * The Map phase runs MapTasks, and each MapTask calls the Mapper class.
 * Purpose: implement here the business logic that should run inside the MapTask.
 *
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * First pair of type parameters (input):
 *   KEYIN:   type of the offset at which the data was read
 *   VALUEIN: type of each line of input data
 * Second pair of type parameters (output):
 *   KEYOUT:   type of the output key (here, the phone number)
 *   VALUEOUT: type of the output value (here, FlowBean)
 */
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text outKey = new Text();           // reused output key object
    private FlowBean outValue = new FlowBean(); // reused output value object

    /**
     * Implement the MapTask business logic in the map method.
     * The method is called in a loop; each call is passed one line of input.
     *
     * @param key     offset at which the data was read
     * @param value   one line of input data
     * @param context context (used here to write out key/value pairs)
     *
     * Sample input line:
     * 1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        // 1. Split the line
        String[] phoneInfo = value.toString().split("\t");

        // 2. Populate key and value
        // key: the phone number
        outKey.set(phoneInfo[1]);
        // value: index from the end of the array because the URL column may be missing
        outValue.setUpFlow(Long.parseLong(phoneInfo[phoneInfo.length - 3]));
        outValue.setDownFlow(Long.parseLong(phoneInfo[phoneInfo.length - 2]));
        outValue.setSumFlow(outValue.getUpFlow() + outValue.getDownFlow());

        // 3. Write out the key/value pair
        context.write(outKey, outValue);
    }
}
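Why index from the end of the split array rather than the front: some input records have no URL column (for example records 2, 3 and 4 above), so only the last three columns sit in fixed positions. A tiny standalone sketch illustrates this; the class name SplitDemo is hypothetical and not part of the job:

public class SplitDemo {
    public static void main(String[] args) {
        // A record without the URL column: 6 fields instead of 7,
        // yet indexing from the end still lands on the flow columns.
        String line = "2\t13846544121\t192.196.100.2\t264\t0\t200";
        String[] f = line.split("\t");
        long up = Long.parseLong(f[f.length - 3]);   // 264 (upstream)
        long down = Long.parseLong(f[f.length - 2]); // 0   (downstream)
        System.out.println(up + " " + down);
    }
}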
package com.ygre.mr.witable2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * The Reduce phase runs ReduceTasks, and each ReduceTask calls the Reducer class.
 * Purpose: implement here the business logic that should run inside the ReduceTask.
 *
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * First pair of type parameters (input):
 *   KEYIN:   type of the input key (the Mapper's output key type)
 *   VALUEIN: type of the input value (the Mapper's output value type)
 * Second pair of type parameters (output):
 *   KEYOUT:   type of the output key (here, the phone number)
 *   VALUEOUT: type of the output value (here, FlowBean)
 */
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean outValue = new FlowBean(); // reused output value object

    /**
     * 1. Implement the ReduceTask business logic in the reduce method.
     * 2. The method is called in a loop; each call is passed one group of data
     *    (here, all values sharing the same key form one group).
     *
     * @param key     the input key
     * @param values  all values belonging to that key
     * @param context context (used here to write out key/value pairs)
     *
     * Example of one group:
     * 15322222222 10 10 20
     * 15322222222 20 6 26
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        long sumUpFlow = 0;   // total upstream traffic
        long sumDownFlow = 0; // total downstream traffic

        // 1. Iterate over all values and accumulate the traffic
        for (FlowBean value : values) {
            sumUpFlow += value.getUpFlow();
            sumDownFlow += value.getDownFlow();
        }

        // 2. Populate the output value
        outValue.setUpFlow(sumUpFlow);
        outValue.setDownFlow(sumDownFlow);
        outValue.setSumFlow(outValue.getUpFlow() + outValue.getDownFlow());

        // 3. Write out the key/value pair
        context.write(key, outValue);
    }
}
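As a worked example from the sample data: 13568436656 appears twice (records 21 and 22), so the reducer receives two FlowBeans for that key and emits 13568436656 3597 25635 29232, where 3597 = 2481 + 1116, 25635 = 24681 + 954, and 29232 = 3597 + 25635.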
package com.ygre.mr.witable2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Create the Job instance
        Job job = Job.getInstance(new Configuration());

        // 2. Configure the Job
        // 2.1 Associate this program's jar: optional when running locally, required on a cluster
        job.setJarByClass(FlowDriver.class);

        // 2.2 Set the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 2.3 Set the Mapper's output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 2.4 Set the final output key/value types (here, the Reducer's output types)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 2.5 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\WelcomeCode\\Hadoop_Code\\MRDemo\\input2"));
        // The output path must not already exist, otherwise the job fails
        FileOutputFormat.setOutputPath(job, new Path("D:\\WelcomeCode\\Hadoop_Code\\MRDemo\\output2"));

        // 3. Run the Job
        /*
         * boolean waitForCompletion(boolean verbose)
         *   verbose: whether to print progress information
         *   returns true if the job completed successfully
         */
        job.waitForCompletion(true);
    }
}
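To run the example, put phone_data.txt into the input2 directory configured above and run FlowDriver (locally from the IDE, or packaged and submitted on a cluster). Delete the output2 directory before rerunning, since an existing output path makes the job fail. When the job finishes, the results are in part-r-00000 under output2, one line per phone number with the key and the FlowBean value separated by a tab; for instance, based on the sample data, the line for 13560439638 should read 13560439638 918 4938 5856.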