WritableComparable Case Study: Sorting Within Partitions

Reader contribution · 2022-11-24

1. Requirement Analysis

(1) Requirement: phone numbers starting with 136, 137, 138, and 139 should each go into their own output file (four separate files); numbers with any other prefix go into one additional file.

(2) Analysis:
a. Partitioning: extend Partitioner.
b. Sorting: implement WritableComparable.

2. Code (building on the previous post)

(1) Custom Hadoop serializable bean with sorting

```java
package com.sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    // Sort by total flow in descending order
    @Override
    public int compareTo(FlowBean bean) {
        int result;
        if (this.sumFlow > bean.getSumFlow()) {
            result = -1;
        } else if (this.sumFlow < bean.getSumFlow()) {
            result = 1;
        } else {
            result = 0;
        }
        return result;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: fields must be read in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    public long getUpFlow() { return upFlow; }

    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }

    public long getDownFlow() { return downFlow; }

    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }

    public long getSumFlow() { return sumFlow; }

    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
```

(2) Mapper

```java
package com.sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    FlowBean k = new FlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line format: 13509468723    7335    110349    117684
        // 1. Read one line
        String line = value.toString();
        String[] words = line.split("\t");

        // 2. Set the key: the bean goes in the key so the framework sorts on it
        k.setUpFlow(Long.parseLong(words[1]));
        k.setDownFlow(Long.parseLong(words[2]));
        k.setSumFlow(Long.parseLong(words[3]));

        // 3. Set the value: the phone number
        v.set(words[0]);

        // 4. Emit
        context.write(k, v);
    }
}
```

(3) Custom partitioner

```java
package com.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PhonePartitioner extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean bean, Text text, int numPartitions) {
        // Note: the key is the FlowBean here, so the phone number is the value
        int partition;
        String phonePre = text.toString().substring(0, 3);

        // 136/137/138/139 each get their own partition; everything else goes to the last one
        if ("136".equals(phonePre)) {
            partition = 0;
        } else if ("137".equals(phonePre)) {
            partition = 1;
        } else if ("138".equals(phonePre)) {
            partition = 2;
        } else if ("139".equals(phonePre)) {
            partition = 3;
        } else {
            partition = 4;
        }
        return partition;
    }
}
```

(4) Reducer

```java
package com.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Loop over the phone numbers and swap key and value on output,
        // so each line reads: phone  upFlow  downFlow  sumFlow
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
```
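Before wiring everything into a Driver, the two custom pieces can be checked locally with a plain main method. This is a minimal sketch, not part of the original post; the class name SortSanityCheck and the sample phone numbers are made up for illustration:

```java
package com.sort;

import org.apache.hadoop.io.Text;

// Hypothetical helper (not from the original post): exercises FlowBean.compareTo
// and PhonePartitioner.getPartition locally, without starting a MapReduce job.
public class SortSanityCheck {
    public static void main(String[] args) {
        FlowBean small = new FlowBean();
        small.setSumFlow(100);
        FlowBean big = new FlowBean();
        big.setSumFlow(200);

        // Descending order: the bean with the larger sumFlow sorts first
        System.out.println(big.compareTo(small));  // -1
        System.out.println(small.compareTo(big));  // 1

        PhonePartitioner partitioner = new PhonePartitioner();
        // 137-prefixed numbers go to partition 1, any other prefix to partition 4
        System.out.println(partitioner.getPartition(big, new Text("13736230513"), 5)); // 1
        System.out.println(partitioner.getPartition(big, new Text("15013685858"), 5)); // 4
    }
}
```

If the comparator or the prefix routing is wrong, it shows up here immediately, without waiting for a full job run.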
(5) Driver

```java
package com.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SortDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Hardcoded local paths for testing; the input is the previous job's output
        args = new String[]{"E:\\a\\output", "E:\\a\\output2"};

        // 1. Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar
        job.setJarByClass(SortDriver.class);

        // 3. Wire up the mapper and reducer
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the custom partitioner
        job.setPartitionerClass(PhonePartitioner.class);

        // 7. Use five reduce tasks, one per partition
        job.setNumReduceTasks(5);

        // 8. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 9. Submit the job
        boolean wait = job.waitForCompletion(true);
        System.exit(wait ? 0 : 1);
    }
}
```
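With job.setNumReduceTasks(5) matching the five partitions, the output directory should contain five result files: part-r-00000 (136 numbers), part-r-00001 (137), part-r-00002 (138), part-r-00003 (139), and part-r-00004 (all other prefixes, including the sample record 13509468723). Within each file, records come out sorted by total flow in descending order, because FlowBean is the map output key and its compareTo drives the shuffle sort.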
