WritableComparable Case Study: Total Sort


I. Requirements Analysis

1. Requirement

Sort the records by total flow, in descending order.

2. Analysis

a. In the original file the total flow sits in the value, but MapReduce sorts by key, so the value has to be moved into the key (value -> key).

b. A custom Hadoop serialization class with sorting support is needed: implement WritableComparable.

II. Code

1. Custom Hadoop serialization class, implementing WritableComparable

```java
package com.sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    // Sorting: descending by total flow
    @Override
    public int compareTo(FlowBean bean) {
        int result;
        if (this.sumFlow > bean.getSumFlow()) {
            result = -1;
        } else if (this.sumFlow < bean.getSumFlow()) {
            result = 1;
        } else {
            result = 0;
        }
        return result;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: fields must be read in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    public long getUpFlow() { return upFlow; }

    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }

    public long getDownFlow() { return downFlow; }

    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }

    public long getSumFlow() { return sumFlow; }

    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
```

Note that the class implements WritableComparable<FlowBean> (not the raw type); otherwise compareTo(FlowBean) would not satisfy the interface.
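Before wiring FlowBean into a job, it can be sanity-checked locally. The sketch below is a hypothetical test harness (FlowBeanCheck is not part of the original post): it verifies that compareTo orders beans descending by sumFlow and that write/readFields round-trip the three fields, using only java.io stream classes.

```java
package com.sort;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Minimal local sanity check for FlowBean (hypothetical helper, not part of the job)
public class FlowBeanCheck {
    public static void main(String[] args) throws IOException {
        FlowBean small = new FlowBean();
        small.setUpFlow(100);
        small.setDownFlow(200);
        small.setSumFlow(300);

        FlowBean big = new FlowBean();
        big.setUpFlow(7335);
        big.setDownFlow(110349);
        big.setSumFlow(117684);

        // Descending order: the bean with the larger sumFlow compares as "smaller"
        System.out.println(big.compareTo(small)); // -1
        System.out.println(small.compareTo(big)); // 1

        // Round-trip through write/readFields, as the shuffle would do
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        big.write(new DataOutputStream(bytes));

        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy); // 7335    110349    117684
    }
}
```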
2. Mapper

```java
package com.sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    FlowBean k = new FlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line (tab-separated): 13509468723  7335  110349  117684
        // 1. Read one line
        String line = value.toString();
        String[] words = line.split("\t");

        // 2. Set the key (the flow fields, so the framework sorts on them)
        k.setUpFlow(Long.parseLong(words[1]));
        k.setDownFlow(Long.parseLong(words[2]));
        k.setSumFlow(Long.parseLong(words[3]));

        // 3. Set the value (the phone number)
        v.set(words[0]);

        // 4. Write the pair
        context.write(k, v);
    }
}
```

Note: FlowBean must be the map output key and Text (the phone number) the map output value, so that the shuffle sorts on the flow totals.

3. Reducer

```java
package com.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // 1. Write each value out, swapping key and value back
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
```

Note: values usually holds a single phone number, but if several phone numbers happen to have the same total flow they are grouped under one FlowBean key, so a for loop is used to write them all.

4. Driver

```java
package com.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SortDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Hardcoded paths for local testing; the input is the output of the
        // earlier flow-count job
        args = new String[]{"E:\\a\\output", "E:\\a\\output1"};

        // 1. Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar
        job.setJarByClass(SortDriver.class);

        // 3. Wire up the mapper and reducer
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        // 4. Set the mapper output key/value types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job
        boolean wait = job.waitForCompletion(true);
        System.exit(wait ? 0 : 1);
    }
}
```

Note: the original listing called job.setOutputKeyClass twice; the second call must be job.setOutputValueClass(FlowBean.class), since the reducer emits (Text, FlowBean).
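To make the end-to-end behaviour concrete, here is a small walkthrough, assuming the tab-separated columns phone, upFlow, downFlow, sumFlow that the Mapper splits on. The first row is the sample line from the Mapper's comment; the other two rows are hypothetical illustrations.

```text
Input (output of the previous flow-count job):
13509468723    7335     110349    117684
13736230513    2481     24681     27162
13975057813    11058    48243     59301

Output (globally sorted, descending by total flow):
13509468723    7335     110349    117684
13975057813    11058    48243     59301
13736230513    2481     24681     27162
```

Because the job runs with a single reduce task by default, the one output file is fully sorted; this is what makes it a total sort rather than a per-partition sort.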
