自定义 OutputFormat案例
一、需求分析
1、内容
http://baidu.com
http://google.com
http://cn.bing.com
http://atguigu.com
http://sohu.com
http://sina.com
http://sin2a.com
http://sin2desa.com
http://sindsafa.com
2、需求
过滤输入的log日志,包含atguigu的网站输出到e:\a.log,不包含atguigu的网站输出到e:\b.log
3、分析
a、输出结果分离
b、自定义OutputFormat类
二、具体代码
1、Mapper
package com.filter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * Emits each input line unchanged as the output key, so the custom
     * OutputFormat downstream can route it by content.
     *
     * @param key     byte offset of the line within the input split (unused)
     * @param value   one line of the log file (a URL)
     * @param context Hadoop context used to emit the record
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The whole line is the record of interest, so it becomes the key;
        // NullWritable is used because no value payload is needed.
        context.write(value, NullWritable.get());
    }
}
注意:Mapper的输出的value的类型为 NullWritable
2、Reducer
package com.filter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    // Reused output key to avoid allocating a new Text per record.
    private final Text outKey = new Text();

    /**
     * Re-emits every occurrence of a line. Iterating over {@code values}
     * (rather than writing the key once) preserves duplicate input lines —
     * each duplicate contributed one NullWritable to the group.
     *
     * @param key     a distinct input line (URL)
     * @param values  one NullWritable per occurrence of the line
     * @param context Hadoop context used to emit the records
     */
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Append the separator once — it is the same for every occurrence.
        // The trailing "\t\n" is required because the custom RecordWriter
        // writes raw bytes with no record separator of its own.
        outKey.set(key.toString() + "\t\n");
        for (NullWritable value : values) {
            context.write(outKey, NullWritable.get());
        }
    }
}
注意:换行输入
3、自定义OutputFormat类
package com.filter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FOutputFormat extends FileOutputFormat {
public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
FrecordWriter recordWriter = new FrecordWriter(job);
return recordWriter;
}
}
注意:继承FileOutputFormat类,其泛型(键/值)参数类型应与Reducer输出的键值类型一致
4、实现自定义OutputFormat类的 RecordWriter继承类
package com.filter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class FrecordWriter extends RecordWriter {
// 1.获取系统文件
FileSystem fs;
FSDataOutputStream aOS;
FSDataOutputStream bOS;
public FrecordWriter(TaskAttemptContext job) {
try {
Configuration conf = job.getConfiguration();
fs = FileSystem.get(conf);
// 2.创建输出文件路径
Path aPth = new Path("E:\\a.log");
Path bPth = new Path("E:\\b.log");
// 3.创建输出流
aOS = fs.create(aPth);
bOS = fs.create(bPth);
} catch (IOException e) {
e.printStackTrace();
}
}
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
if (key.toString().contains("atguigu")){
aOS.write(key.toString().getBytes());
}else {
bOS.write(key.toString().getBytes());
}
}
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStream(aOS);
IOUtils.closeStream(bOS);
}
}
注意:
a、在构造方法中,获取fs和输出流
b、在write()方法中,写主逻辑
c、在close方法中关闭
5、Driver
package com.filter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FilterDriver {

    /**
     * Configures and submits the filter job.
     * Usage: FilterDriver &lt;input path&gt; &lt;output path&gt;
     * Falls back to local demo paths when no arguments are supplied.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Use demo defaults only when the caller supplied no paths,
        // instead of unconditionally discarding command-line arguments.
        if (args.length < 2) {
            args = new String[]{"E:\\a\\input\\test.log", "E:\\a\\output"};
        }
        // 1. Create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar by the driver class
        job.setJarByClass(FilterDriver.class);
        // 3. Wire up Mapper and Reducer
        job.setMapperClass(FilterMapper.class);
        job.setReducerClass(FilterReducer.class);
        // 4. Mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5. Final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 6. Install the custom OutputFormat that splits a.log / b.log
        job.setOutputFormatClass(FOutputFormat.class);
        // 7. Input and output paths. The output path is still required:
        // FileOutputFormat writes the _SUCCESS marker there even though
        // the records themselves go to the custom RecordWriter's files.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 8. Submit and wait
        boolean wait = job.waitForCompletion(true);
        System.exit(wait ? 0 : 1);
    }
}
注意:设置OutputFormat的驱动
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
暂时没有评论,来抢沙发吧~