当前位置: 首页 > news >正文

HADOOP——序列化

1.在主目录下创建一个data目录,并在data目录下新建log.txt文件

2.在example软件包下新建flow软件包

FlowBean

package com.example.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Hadoop-serializable bean holding one traffic record: phone number,
 * upstream flow and downstream flow.
 *
 * <p>Implements {@link Writable} so instances can be shuffled between the
 * map and reduce phases. {@link #write} and {@link #readFields} must
 * serialize the fields in exactly the same order.
 */
public class FlowBean implements Writable {

    private String phone;   // phone number
    private long upFlow;    // upstream traffic
    private long downFlow;  // downstream traffic

    /** No-arg constructor required by Hadoop's Writable deserialization. */
    public FlowBean() {
    }

    public FlowBean(String phone, long upFlow, long downFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /** @return combined upstream + downstream traffic */
    public long getSumFlow() {
        return upFlow + downFlow;
    }

    /** Serializes phone, upFlow, downFlow — order must match {@link #readFields}. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(phone);
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
    }

    /** Deserializes the fields in the exact order written by {@link #write}. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        phone = dataInput.readUTF();
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
    }
}

FlowDriver

package com.example.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver that configures and submits the flow-summing MapReduce job.
 * Reads input from the "data" directory and writes results to "output".
 */
public class FlowDriver {

    // Fix: the original throws clause listed IOException twice
    // (redundant duplicate exception type).
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // Map phase emits <Text phone, FlowBean record>.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // Reduce phase emits <Text phone, Text summary line>.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // NOTE(review): Hadoop refuses to overwrite an existing output
        // directory — "output" must be deleted between runs.
        FileInputFormat.setInputPaths(job, new Path("data"));
        FileOutputFormat.setOutputPath(job, new Path("output"));

        // Submit and block until the job finishes; exit 0 on success.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

FlowMapper

package com.example.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper: parses one whitespace-separated input line of the form
 * "phone upFlow downFlow" and emits {@code <phone, FlowBean>}.
 *
 * <p>Fix: removed the leftover debug {@code System.out.println(value)}
 * from the per-record hot path.
 */
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Reused across map() calls to avoid allocating a new Text per record
    // (the framework serializes the key on write, so reuse is safe).
    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line on runs of whitespace:
        // field 0 = phone number, field 1 = up flow, field 2 = down flow.
        String[] fields = value.toString().split("\\s+");
        String phone = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);

        // Wrap the parsed record and emit it keyed by phone number.
        FlowBean flowBean = new FlowBean(phone, upFlow, downFlow);
        outKey.set(phone);
        context.write(outKey, flowBean);
    }
}

FlowReducer

package com.example.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer: sums the upstream/downstream flow of every record sharing one
 * phone number and emits a human-readable summary line per phone.
 */
public class FlowReducer extends Reducer<Text, FlowBean, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // Accumulate upstream and downstream totals over all records
        // belonging to this phone number.
        long totalUp = 0L;
        long totalDown = 0L;
        for (FlowBean bean : values) {
            totalUp += bean.getUpFlow();
            totalDown += bean.getDownFlow();
        }

        // Grand total = upstream + downstream.
        long total = totalUp + totalDown;
        String summary = String.format("总的上行流量是: %d,总的下行流量是:%d,总的流量是:%d",
                totalUp, totalDown, total);
        context.write(key, new Text(summary));
    }
}

相关文章:

  • 高并发多级缓存架构实现思路
  • 施磊老师基于muduo网络库的集群聊天服务器(一)
  • 微软承认Win11出现极端错误,只能强制关机或重装系统
  • typescript html input无法输入解决办法
  • 《Not All Tokens Are What You Need for Pretraining》全文翻译
  • 二进制和docker两种方式部署Apache pulsar(standalone)
  • Flink运行时架构
  • Vue3 nextTick
  • 基于sherpa-onnx 安卓语音识别尝鲜
  • 与AI深度融合的Go开发框架sponge,解决使用cursor、trae等AI辅助编程工具开发项目时的部分痛点
  • n8n 为技术团队打造的安全工作流自动化平台
  • 优化 Dockerfile 性能之实践(Practice of Optimizing Dockerfile Performance)
  • 【场景应用9】多语言预训练语音模型进行自动语音识别
  • 基于骨骼识别的危险动作报警分析系统
  • 基于uniapp的鸿蒙APP大数据量性能优化
  • 招贤纳士|Walrus 亚太地区招聘高级开发者关系工程师
  • 量化视角:比特币美债黄金三角博弈的DeepSeek推演
  • 1.2 使用RawInputSharp来取得键盘硬件信息以及虚拟码
  • Being-0:具有视觉-语言模型和模块化技能的人形机器人智体
  • QT —— 信号和槽(槽函数)
  • 德清县建设银行官方网站/网络营销平台名词解释
  • DW怎么做招聘网站/东莞网站建设最牛
  • 河南省住房城乡和建设厅网站首页/百度地图网页版进入
  • 做站群网站好优化吗/新手怎么做seo优化
  • php网站建设安装环境/求职seo推荐
  • 商城网站的搜索记录代码怎么做/公关