NLineInputFormat 案例

网友投稿 264 2022-11-24

NLineInputFormat 案例

一、需求分析 1、文件 hadoop is ok hadoop not ok java is fun php is ok php is pretty python is all go is new 2、需求 对上述文件中每个单词出现的数量进行统计,2行数据一个切片 3、分析 与传统的WordCount相似,但是按行切片,而不是BlockSize 二、代码 前提条件:创建Maven项目,导入依赖,配置log日志 1、Mapper package com.ln; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class LNMapper extends Mapper { Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1.获取行 String line = value.toString(); // 2.切割 String[] words = line.split("\\s+"); // 3.循环写入 for (String word : words) { k.set(word); context.write(k, v); } } } 2、Reducer package com.ln; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class LNReducer extends Reducer { IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 1.累加 int sum = 0; for (IntWritable value : values) { sum += value.get(); } // 2. 写入 v.set(sum); context.write(key, v); } } 3、Driver package com.ln; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class LNDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"E:\\a\\input", "E:\\a\\output"}; // 1. 获取job Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2. 设置Jar job.setJarByClass(LNDriver.class); // 3. 关联 mapper 和 reducer job.setMapperClass(LNMapper.class); job.setReducerClass(LNReducer.class); // 4. 设置 mapper的输出 kv job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5. 设置 最终 输出 kv job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // a. 设置 每个切片中 2 调记录 NLineInputFormat.setNumLinesPerSplit(job, 2); // b、设置 inputFormat 的 格式 job.setInputFormatClass(NLineInputFormat.class); // 6. 设置 输入 输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7. 提交 job boolean wait = job.waitForCompletion(true); System.exit(wait? 0: 1); } } 注意: 核心代码 1、设置一个切片有多少行数据 NLineInputFormat.setNumLinesPerSplit(job, 2); 2、设置InputFormat的格式 job.setInputFormatClass(NLineInputFormat.class); 结果: 运行完成后: number of splits:4

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:详解如何在springcloud分布式系统中实现分布式锁
下一篇:控制EMC的主要方法
相关文章

 发表评论

暂时没有评论,来抢沙发吧~