自定义InputFormat 案例
无论是HDFS还是MapReduce在处理小文件时,都要消耗大量内存,效率低
一、回顾
1、HDFS
har,对外对应一个NameNode,对内对应多个文件
2、MapReduce
CombineTextInputFormat,分为虚拟存储过程和切片过程
虚拟存储过程和切片过程都要和最大值做比较
改变切片
二、需求分析
1、需求
将多个小文件合并成一个SequencFile文件。
补充:SequenceFile文件是Hadoop用来存储二进制形式的key-value的文件格式,SequenceFile文件存储多个文件,存储格式为:
key:文件路径 + 文件名(Text)
value: 文件的具体内容(BytesWritable)
注意:多个小文件变成一个SequenceFile,切片过程没有改变,把多个文件存储在一起
2、分析过程
a、自定义InputFormat类,继承FileInputFormat
b、重写createRecordReader() ,返回RecordReader数据类型
c、创建类继承RecordReader类
三、代码
1、自定义的InputFormat
package com.whole;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
/**
* 1. 返回的数据类型是 RecordReader, 因此需要返回 recordReader
* 2. 定义一个 wholeRecordReader 继承 RecordReader
* 3. 初始化(后面)
*/
public class wholeInputFormat extends FileInputFormat {
public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
wholeRecordReader recordReader = new wholeRecordReader();
recordReader.initialize(split, context);
return recordReader;
}
}
2、定义继承RecordReader的类
package com.whole;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
* 0. key value 数据类型 对应 Text BytesWritable (自定义的 InputFormat 的 key value)
* 1. 初始化: 定义 FileSplit split
* 2. getCurrentKey() 返回 key
* 3. getCurrentValue() 返回 value
* 4. 编写核心业务, 具体见 下面的 注释
*/
public class wholeRecordReader extends RecordReader {
FileSplit split;
boolean isProcess = true;
Text k = new Text();
Configuration conf;
BytesWritable v = new BytesWritable();
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// 初始化
this.split = (FileSplit)split;
conf = context.getConfiguration();
}
public boolean nextKeyValue() throws IOException, InterruptedException {
// 核心业务
if (isProcess){
// 0. 定义 buff 区
byte[] buff = new byte[(int) split.getLength()];
// 1. 通过split -> path,通过 path -> fs
Path path = split.getPath();
FileSystem fs = path.getFileSystem(conf);
// 2. 读取数据
FSDataInputStream fIS = fs.open(path);
// 3. 读取文件内容
IOUtils.readFully(fIS, buff, 0, buff.length);
// 4. 设置 v
v.set(buff, 0, buff.length);
// 5. 设置 k
k.set(split.getPath().toString()); IOUtils.closeStream(fIS);
isProcess = false;
return true;
}
return false;
}
public Text getCurrentKey() throws IOException, InterruptedException {
// 获取当前的 key
return k;
}
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
// 获取当前的 value
return v;
}
public float getProgress() throws IOException, InterruptedException {
return 0;
}
public void close() throws IOException {
}
}
3、Mapper
package com.whole;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 1.输入 key value Text BytesWritable (自定义的 InputFormat 的 key value) ; 输出 key value
*/
public class wholeMapper extends Mapper {
@Override
protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}
4、Reducer
package com.whole;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class wholeReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
// 循环输出
for (BytesWritable value : values) {
context.write(key, value);
}
}
}
5、Driver
package com.whole;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import java.io.IOException;
public class wholeDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"E:\\a\\input", "E:\\a\\output"};
// 1. 获取 job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. 设置 jar
job.setJarByClass(wholeDriver.class);
// 3. 关联 mapper reducer
job.setMapperClass(wholeMapper.class);
job.setReducerClass(wholeReducer.class);
// 4. 设置 mapper 输出的 key value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
// 5. 设置 输出的 key value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
// 重要 设置 输入 输出 格式
job.setInputFormatClass(wholeInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// 6. 设置 输入 输出 路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7. 提交 job
boolean wait = job.waitForCompletion(true);
System.exit(wait? 0: 1);
}
}
注意:
自定义的InputForamt类,只是把小文件合成SequenceFile格式的文件,并没有改变切片次数
核心代码:
1、继承RecordReade类中的 nextKeyValue() 方法
2、Driver类中 设置 InputFormat 和OutPutFormat
job.setInputFormatClass(wholeInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
暂时没有评论,来抢沙发吧~