(13) Hadoop Java 实现MapReduce HelloWord 单词统计 更新版 2

网友投稿 275 2022-11-25

(13) Hadoop Java 实现MapReduce HelloWord 单词统计 更新版 2

添加了:setup方法  和 cleanup 方法   setup是在reduce之前做一些动作  cleanup 是在reduce之后做一些动作

添加了shuffle内容介绍

package com.my.hadoop.hadoophdfs.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /**  *    单词统计类  * @author liming  *  */ public class ModuleMapReduce extends Configured implements Tool { /**  * TODO Map  开发时修改四个参数  * @author liming  *  */ public static class ModuleMapper extends Mapper{ //1 public void setup(Context context) throws IOException, InterruptedException { //Nothing } //2 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //TODO 实现业务逻辑 } //3 public void cleanup(Context context) throws IOException, InterruptedException { //Nothing } } /**  * TODO Reduce  开发时修改四个参数  * @author liming  *  */ public static class ModuleReducer extends Reducer{ //1 protected void setup(Context context) throws IOException, InterruptedException { //Nothing } //2 protected void reduce(Text key, Iterable values,    Context content) throws IOException, InterruptedException { //TODO 业务逻辑 } //3 protected void cleanup(Context context) throws IOException, InterruptedException { //Nothing } } /**  * Driver  */ // run 是 Tool中的方法 public int run(String[] args) throws Exception { // 获取configuration 从继承的Configured类中获取 Configuration cf = getConf(); // 创建job try { // 配置文件 job名称 Job job = Job.getInstance(cf, this.getClass().getSimpleName()); // 设置运行类的类型 job.setJarByClass(this.getClass()); /**** input ******/ // input map reduce output 串起来 Path inPath = new Path(args[0]); FileInputFormat.addInputPath(job, inPath); // TextInputFormat /**** map ******/ // map 方法类型嗯 job.setMapperClass(ModuleMapper.class); // map 输出key value 类型 job.setMapOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); /**************************shuffle*******************************************************/ //总: //1.分区  partitioner //2.排序 sort 根据key //3.复制 copy 用户无法干涉 //4.分组 group 也是根据key   //5.压缩 compress  -- 可设置 //6.合并 combiner     --->    不是所有程序都可做   -- 可设置 //shuffle 是贯穿 map阶段  和  reduce阶段  它是在map结尾+reduce头 //partition  //job.setPartitionerClass(cls); //sort  排序 // job.setCombinerClass(cls); //optional 可选 combiner // job.setCombinerClass(cls); //group 分组 // job.setGroupingComparatorClass(cls); //压缩   可以通过mapreduce配置文件进行配置 也可以通过configuration设置    看main方法 /**************************shuffle*******************************************************/ /**** reduce ******/ // reduce类型 job.setReducerClass(ModuleReducer.class); // TODO reduce 输出 也就是job输出的类型 开发时需要修改 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //调优 //设置 reduce个数  默认是1  可以在配置文件中设置mapreduce.job.reduces // job.setNumReduceTasks(2); /**** output ******/ Path outPath = new Path(args[1]); FileOutputFormat.setOutputPath(job, outPath); // TextOutputFormat   每个 对,输出一行  key与value中间分隔符为\t 默认调用key和value的toString() 方法 /**** 提交job ******/ // 返回布尔类型 这里设置true是打印日志信息 设置false是不打印日志 boolean isSucc = job.waitForCompletion(true); return isSucc ? 0 : 1; } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return 1; } public static void main(String[] args) throws Exception { //运行 Configuration  conf = new Configuration(); //压缩  这里可以设置多个属性  覆盖默认属性 conf.set("mapreduce.map.output.compress", "true"); //压缩格式 conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec"); //在这个里边 设置了传递的参数conf  然后在run方法中获取 都是父类的方法 int status= ToolRunner.run(conf, new WordCountMapReduce(), args); System.exit(status); // } }

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:如何正确将传感器连接到物联网上
下一篇:(6)Hadoop 组件启动的三种方式 + 配置SSH无密码登录
相关文章

 发表评论

暂时没有评论,来抢沙发吧~