当前位置: 首页 > news >正文

hadoop实现一个序列化案例

1)需求

统计每一个手机号耗费的总上行流量、总下行流量、总流量

1)输入数据1)需求
统计每一个手机号耗费的总上行流量、总下行流量、总流量
(1)输入数据

        创建一个名为:phone_data.txt

        1    13736230513    192.196.100.1    www.atguigu.com    2481    24681    200
2    13846544121    192.196.100.2            264    0    200
3     13956435636    192.196.100.3            132    1512    200
4     13966251146    192.168.100.1            240    0    404
5     18271575951    192.168.100.2    www.atguigu.com    1527    2106    200
6     84188413    192.168.100.3    www.atguigu.com    4116    1432    200
7     13590439668    192.168.100.4            1116    954    200
8     15910133277    192.168.100.5    www.hao123.com    3156    2936    200
9     13729199489    192.168.100.6            240    0    200
10     13630577991    192.168.100.7    www.shouhu.com    6960    690    200
11     15043685818    192.168.100.8    www.baidu.com    3659    3538    200
12     15959002129    192.168.100.9    www.atguigu.com    1938    180    500
13     13560439638    192.168.100.10            918    4938    200
14     13470253144    192.168.100.11            180    180    200
15     13682846555    192.168.100.12    www.qq.com    1938    2910    200
16     13992314666    192.168.100.13    www.gaga.com    3008    3720    200
17     13509468723    192.168.100.14    www.qinghua.com    7335    110349    404
18     18390173782    192.168.100.15    www.sogou.com    9531    2412    200
19     13975057813    192.168.100.16    www.baidu.com    11058    48243    200
20     13768778790    192.168.100.17            120    120    200
21     13568436656    192.168.100.18    www.alibaba.com    2481    24681    200
22     13568436656    192.168.100.19            1116    954    200

输入数据格式:

7      13560436666     120.196.100.99           1116          954                  200

id      手机号码           网络ip                       上行流量  下行流量     网络状态码

3)期望输出数据格式

13560436666             1116                954                         2070

手机号码               上行流量        下行流量                 总流量

2)需求分析

3)编写MapReduce程序

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>MRDemo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies>
<!--        Hadoop所需要的依赖--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency>
<!--        单元测试--><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency>
<!--        log4j:日志管理工具--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.1.3</version></dependency></dependencies></project>

在resources下面创建一个文件将下面这个复制进去

log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
package com.ygre.mr.witable2;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;/*JavaBean:1.自定义类,并实现Writable接口2.重写write和readFields方法*/
public class FlowBean implements Writable {private long upFlow;private long downFlow;private long sumFlow;public FlowBean(){}/*序列化时调用该方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}/*反序列化时调用该方法注意:反序列化时的顺序和序列化时的顺序要保持一致*/@Overridepublic void readFields(DataInput in) throws IOException {upFlow =  in.readLong();downFlow = in.readLong();sumFlow = in.readLong();}@Overridepublic String toString() {return upFlow + " "+downFlow + " "+sumFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}
}
package com.ygre.mr.witable2;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** Mapper阶段会运行MapTask,MapTask会调用Mapper类* 作用:在该类中实现需要在MapTask实现的业务逻辑代码**** <KEYIN, VALUEIN, KEYOUT, VALUEOUT>*     第一组泛型:*          KEYIN:读取数据时的偏移量的类型*          VALUEIN:读取的一行一行的数据的类型*     第二组泛型:*          KEYOUT:写出的key的类型(在这是手机号的类型)*          VALUEOUT:写出的value的类型(在这是FlowBean)**/public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {private Text outKey = new Text(); //创建的key的对象private FlowBean outValue = new FlowBean(); //创建的value的对象/*** 1.在map方法中实现需要在MapTask中实现的业务逻辑代码* 1.该方法在被循环调用,每调用一次传入一行数据* @param key 读取数据时的偏移量* @param value 读取的数据(一行一行的数据)* @param context 上下文(在这里用来将key,value写出去)* @throws IOException* @throws InterruptedException** 1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200**/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {//1.将数据切割String[] phoneInfo = value.toString().split("\t");//2.封装key,value// 给key赋值outKey.set(phoneInfo[1]);//给value赋值outValue.setUpFlow(Long.parseLong(phoneInfo[phoneInfo.length-3]));outValue.setDownFlow(Long.parseLong(phoneInfo[phoneInfo.length-2]));outValue.setSumFlow(outValue.getUpFlow() + outValue.getDownFlow());//3.将key,value写出去context.write(outKey,outValue);}
}
package com.ygre.mr.witable2;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.io.Text;
import java.io.IOException;/*** Reducer阶段会运行ReducerTask,ReducerTask会调用Reducer类* 作用:在该类中实现需要在ReducerTask实现的业务逻辑代码***** Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>*     第一组泛型:*          KEYIN:读取的key的类型(Mapper写出的key的类型)*          VALUEIN:读取的value的类型(Mapper写的的value的类型)*     第二组泛型:*          KEYOUT:写出的key的类型(在这是手机号的类型)*          VALUEOUT:写出的value的类型(在这是FlowBean)*/
public class FlowReducer extends Reducer<Text,FlowBean, Text,FlowBean> {private FlowBean outValue = new FlowBean(); //创建value对象/*** 1.在reduce方法中实现需要在ReduceTask中实现的逻辑代码* 2.reduce方法在被循环调用,每调用一次传入一组数据(在这key值相同为一组)* @param key 读取的key* @param values 读取的所有的value* @param context 上下文(在这用来将key,value写出去)* @throws IOException* @throws InterruptedException*** 15322222222 10 10 20* 15322222222 20 6 26*/@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {long sumUpFlow = 0; // 总上行long sumDownFlow = 0; // 总下行//1.遍历所有Valuefor (FlowBean value:values){// 将上行流量累加sumUpFlow += value.getUpFlow();// 将下行流量累加sumDownFlow += value.getDownFlow();}//2. 封装key,value// 给value赋值outValue.setUpFlow(sumUpFlow);outValue.setDownFlow(sumDownFlow);outValue.setSumFlow(outValue.getUpFlow() + outValue.getDownFlow());// 3. 将key,value写出去context.write(key,outValue);}
}
package com.ygre.mr.witable2;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FlowDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//1.创建Job实例Job job = Job.getInstance(new Configuration());// 2.给Job赋值// 2.1 关联本程序的Jar-->如果是本地可以不写在集群上必须写job.setJarByClass(FlowDriver.class);//2.2 设置Mapper和Reducerjob.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);// 2.3 设置Mapper输出的key,value的类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);// 2.4 设置最终输出的key,value的类型(在这是reducer输出的key,value的类型)job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//2.5 设置输入输出路径FileInputFormat.setInputPaths(job,new Path("D:\\WelcomeCode\\Hadoop_Code\\MRDemo\\input2"));FileOutputFormat.setOutputPath(job,new Path("D:\\WelcomeCode\\Hadoop_Code\\MRDemo\\output2"));// 输出路径不能存在,否则报错// 3.运行Job/*boolean waitForCompletion(boolean verbose)verbose:是否打印信息返回值:如果job执行true*/job.waitForCompletion(true);}
}

http://www.dtcms.com/a/389223.html

相关文章:

  • DBG数据库加密网关实现mySQL敏感数据动态脱敏与加密全攻略
  • 解决 Vue SPA 刷新导致 404 的问题
  • 大型语言模型 (LLMs) 的演进历程:从架构革命到智能涌现
  • 大语言模型为什么要叫【模型】
  • 教程上新丨ACL机器翻译大赛30个语种摘冠,腾讯Hunyuan-MT-7B支持33种语言翻译
  • 《C++程序设计》笔记
  • NVR接入录像回放平台EasyCVR海康设备视频平台视频监控系统常见故障与排查全解析
  • 半导体制造常提到的Fan-Out晶圆级封装是什么?
  • Qt 系统相关 - 文件
  • P2242 公路维修问题
  • 安装wsl
  • 牛客多校04C :Computational Geometry Problem(p-Dyck路计数)
  • CMake+visual studio 2022 +qt6 , 从Linux平台移植到windows下平台开发
  • 大模型系列——Playwright MCP 可以复用 Chrome 登录态了
  • 三星S25 Edge 与iPhone 17 Air:最新对比
  • 电脑怎么连接wifi?【图文详解】笔记本电脑怎么连接无线wifi?笔记本电脑连不上wifi怎么办?
  • 设计模式-代理模式详解
  • 怎样让AI图生3D更加高质高效
  • Java 集合框架 Set 接口:实现类的底层数据结构与核心特点
  • 【大模型】使用Qwen-VL大模型进行验证码识别的完整指南
  • 深度学习体系化入门:从理论到实践的完整框架
  • 餐饮行业系统集成分享:OMS 订单数据推送ERP 核算
  • 深入剖析OpenHarmony ClearPlay DRM驱动:从HDI接口到动态加载的完整实现路径
  • [WesternCTF2018]shrine
  • 硬件 - RK3588部分(2) - 原理图 - 最小系统
  • Android进阶之路 - 从 URL Scheme 到 Deep Link 与 App Link
  • MySQL监控Shell脚本实战指南
  • 【Android】Jetpack Media3 如何播放音频文件 实现视频播放器
  • Android 开发 集成 uni 小程序,并实现相互通信
  • 【office】怎么设置第一章二级标题为1.1 1.2 1.3然后第二章为2.1 2.2 2.3这样子