MapReduce Job Submission Analysis


Background

This document focuses on how a MapReduce job jar gets from the client onto YARN.

Execution logic of the hadoop jar command

On a Hadoop client, a MapReduce jar is run with a command like the following:

hadoop jar /home/yuliang02/hadoop-2.6.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi 10 10

This command invokes the hadoop shell script. Here are its contents, with the non-essential parts removed:

# Default libexec path
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
# Fall back to DEFAULT_LIBEXEC_DIR if HADOOP_LIBEXEC_DIR is not set
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
# Source hadoop-config.sh from hadoop/libexec; among other things it defines the
# JAVA variable used below (JAVA=$JAVA_HOME/bin/java), i.e. the path to the JVM
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
...
COMMAND=$1
# For the "hadoop jar" command, set CLASS=org.apache.hadoop.util.RunJar
elif [ "$COMMAND" = "jar" ] ; then
  CLASS=org.apache.hadoop.util.RunJar
...
# shift drops the leftmost argument ("jar") from $@, so that only
# /home/yuliang02/hadoop-2.6.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi 10 10
# remains to be handed to the RunJar class, which unpacks the jar and runs its main class
shift
# Effectively executes:
#   java org.apache.hadoop.util.RunJar /home/yuliang02/hadoop-2.6.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi 10 10
# where $@ now holds all remaining arguments
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"

So the hadoop script ends up executing the org.apache.hadoop.util.RunJar class. Its logic:

public static void main(String[] args) throws Throwable {
  String usage = "RunJar jarFile [mainClass] args...";
  // If "hadoop jar" got no arguments, print the usage string and exit
  if (args.length < 1) {
    System.err.println(usage);
    System.exit(-1);
  }

  // The first argument is the jar file name
  int firstArg = 0;
  String fileName = args[firstArg++];
  File file = new File(fileName);
  // Verify the jar exists and is a regular file; otherwise print an error and exit
  if (!file.exists() || !file.isFile()) {
    System.err.println("Not a valid JAR: " + file.getCanonicalPath());
    System.exit(-1);
  }
  String mainClassName = null;

  // Read the Main-Class attribute from the jar's manifest, if present
  JarFile jarFile;
  try {
    jarFile = new JarFile(fileName);
  } catch (IOException io) {
    throw new IOException("Error opening job jar: " + fileName)
      .initCause(io);
  }

  Manifest manifest = jarFile.getManifest();
  if (manifest != null) {
    mainClassName = manifest.getMainAttributes().getValue("Main-Class");
  }
  jarFile.close();

  // If no Main-Class was set when the jar was built, take the main class from
  // the second command-line argument instead (e.g. com.mapred.WordCount)
  if (mainClassName == null) {
    if (args.length < 2) {
      System.err.println(usage);
      System.exit(-1);
    }
    mainClassName = args[firstArg++];
  }
  mainClassName = mainClassName.replaceAll("/", ".");

  // Get java.io.tmpdir, /tmp by default on Linux
  File tmpDir = new File(System.getProperty("java.io.tmpdir"));
  // Make sure the directory exists
  ensureDirectory(tmpDir);

  // Create a file with a random name like hadoop-unjar1457360152697481534 under /tmp
  final File workDir;
  try {
    workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
  } catch (IOException ioe) {
    // If user has insufficient perms to write to tmpDir, default
    // "Permission denied" message doesn't specify a filename.
    System.err.println("Error creating temp dir in java.io.tmpdir " + tmpDir
        + " due to " + ioe.getMessage());
    System.exit(-1);
    return;
  }

  // Delete that file...
  if (!workDir.delete()) {
    System.err.println("Delete failed for " + workDir);
    System.exit(-1);
  }
  // ...and recreate it as a directory: this is the temporary working directory.
  // (Java 6 only has createTempFile, which makes creating a temp directory awkward;
  // Java 7 added the more convenient createTempDirectory.)
  ensureDirectory(workDir);

  // On JVM shutdown, delete the hadoop-unjar... directory and everything in it
  ShutdownHookManager.get().addShutdownHook(
    new Runnable() {
      @Override
      public void run() {
        FileUtil.fullyDelete(workDir);
      }
    }, SHUTDOWN_HOOK_PRIORITY);

  // Unpack hadoop-mapreduce-examples-2.6.4.jar into the temp directory
  unJar(file, workDir);

  // Load the main class
  ClassLoader loader = createClassLoader(file, workDir);
  Thread.currentThread().setContextClassLoader(loader);
  Class<?> mainClass = Class.forName(mainClassName, true, loader);
  // Look up its main(String[]) method
  Method main = mainClass.getMethod("main", new Class[] {
    Array.newInstance(String.class, 0).getClass()
  });
  String[] newArgs = Arrays.asList(args)
      .subList(firstArg, args.length).toArray(new String[0]);
  try {
    // Invoke the main method
    main.invoke(null, new Object[] { newArgs });
  } catch (InvocationTargetException e) {
    throw e.getTargetException();
  }
}
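
As the comments note, the create-a-file, delete-it, recreate-as-a-directory dance exists because File.createTempFile was the only temp-creation primitive in Java 6. From Java 7 onward the same working directory can be created in one call. A minimal sketch of the modern equivalent (illustrative only, not Hadoop code):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class TempDirSketch {
  public static void main(String[] args) throws IOException {
    // Java 7+: create the unpack directory atomically, no create/delete/mkdir dance
    Path tmpRoot = Paths.get(System.getProperty("java.io.tmpdir"));
    Path workDir = Files.createTempDirectory(tmpRoot, "hadoop-unjar");
    System.out.println("Would unpack the job jar into: " + workDir);
    // RunJar additionally registers a shutdown hook to delete this directory on exit
  }
}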

After RunJar unpacks the jar, the temporary directory looks like this:

yuliang02@hadoop-master:/tmp/hadoop-unjar8401487899319139479$ tree
.
|-- META-INF
|   |-- MANIFEST.MF
|   `-- maven
|       `-- org.apache.hadoop
|           `-- hadoop-mapreduce-examples
|               |-- pom.properties
|               `-- pom.xml
`-- org
    `-- apache
        `-- hadoop
            `-- examples
                |-- AggregateWordCount.class
                |-- AggregateWordCount$WordCountPlugInClass.class
                |-- AggregateWordHistogram$AggregateWordHistogramPlugin.class
                |-- AggregateWordHistogram.class
                |-- BaileyBorweinPlouffe$1.class
                |-- BaileyBorweinPlouffe$BbpInputFormat$1.class
                |-- BaileyBorweinPlouffe$BbpInputFormat.class
                |-- BaileyBorweinPlouffe$BbpMapper.class
                |-- BaileyBorweinPlouffe$BbpReducer$1.class
                |-- BaileyBorweinPlouffe$BbpReducer.class
                |-- BaileyBorweinPlouffe$BbpSplit.class
                |-- BaileyBorweinPlouffe.class
                |-- BaileyBorweinPlouffe$Fraction.class
                |-- dancing
                |   |-- DancingLinks.class
                |   |-- DancingLinks$ColumnHeader.class
                |   |-- DancingLinks$Node.class
                |   |-- DancingLinks$SolutionAcceptor.class
                |   |-- DistributedPentomino.class
                |   |-- DistributedPentomino$PentMap.class
                |   |-- DistributedPentomino$PentMap$SolutionCatcher.class
                |   |-- OneSidedPentomino.class
                |   |-- Pentomino.class
                |   |-- Pentomino$ColumnName.class
                |   |-- Pentomino$Piece.class
                |   |-- Pentomino$Point.class
                |   |-- Pentomino$SolutionCategory.class
                |   |-- Pentomino$SolutionPrinter.class
                |   |-- Sudoku$CellConstraint.class
                |   |-- Sudoku.class
                |   |-- Sudoku$ColumnConstraint.class
                |   |-- Sudoku$ColumnName.class
                |   |-- Sudoku$RowConstraint.class
                |   |-- Sudoku$SolutionPrinter.class
                |   `-- Sudoku$SquareConstraint.class
                |-- DBCountPageView$AccessRecord.class
                |-- DBCountPageView.class
                |-- DBCountPageView$PageviewMapper.class
                |-- DBCountPageView$PageviewRecord.class
                |-- DBCountPageView$PageviewReducer.class
                |-- ExampleDriver.class
                |-- Grep.class
                |-- Join.class
                |-- MultiFileWordCount.class
                |-- MultiFileWordCount$CombineFileLineRecordReader.class
                |-- MultiFileWordCount$MapClass.class
                |-- MultiFileWordCount$MyInputFormat.class
                |-- MultiFileWordCount$WordOffset.class
                |-- pi
                |   |-- Combinable.class
                |   |-- Container.class
                |   |-- DistBbp.class
                |   |-- DistSum$1.class
                |   |-- DistSum.class
                |   |-- DistSum$Computation.class
                |   |-- DistSum$Machine$AbstractInputFormat$1.class
                |   |-- DistSum$Machine$AbstractInputFormat.class
                |   |-- DistSum$Machine.class
                |   |-- DistSum$Machine$SummationSplit.class
                |   |-- DistSum$MapSide.class
                |   |-- DistSum$MapSide$PartitionInputFormat.class
                |   |-- DistSum$MapSide$SummingMapper.class
                |   |-- DistSum$MixMachine.class
                |   |-- DistSum$Parameters.class
                |   |-- DistSum$ReduceSide.class
                |   |-- DistSum$ReduceSide$IndexPartitioner.class
                |   |-- DistSum$ReduceSide$PartitionMapper.class
                |   |-- DistSum$ReduceSide$SummationInputFormat.class
                |   |-- DistSum$ReduceSide$SummingReducer.class
                |   |-- math
                |   |   |-- ArithmeticProgression.class
                |   |   |-- Bellard$1.class
                |   |   |-- Bellard.class
                |   |   |-- Bellard$Parameter.class
                |   |   |-- Bellard$Sum$1.class
                |   |   |-- Bellard$Sum.class
                |   |   |-- Bellard$Sum$Tail.class
                |   |   |-- LongLong.class
                |   |   |-- Modular.class
                |   |   |-- Montgomery.class
                |   |   |-- Montgomery$Product.class
                |   |   `-- Summation.class
                |   |-- Parser.class
                |   |-- SummationWritable$ArithmeticProgressionWritable.class
                |   |-- SummationWritable.class
                |   |-- TaskResult.class
                |   |-- Util.class
                |   `-- Util$Timer.class
                |-- QuasiMonteCarlo.class
                |-- QuasiMonteCarlo$HaltonSequence.class
                |-- QuasiMonteCarlo$QmcMapper.class
                |-- QuasiMonteCarlo$QmcReducer.class
                |-- RandomTextWriter.class
                |-- RandomTextWriter$Counters.class
                |-- RandomTextWriter$RandomTextMapper.class
                |-- RandomWriter.class
                |-- RandomWriter$Counters.class
                |-- RandomWriter$RandomInputFormat.class
                |-- RandomWriter$RandomInputFormat$RandomRecordReader.class
                |-- RandomWriter$RandomMapper.class
                |-- SecondarySort.class
                |-- SecondarySort$FirstGroupingComparator.class
                |-- SecondarySort$FirstPartitioner.class
                |-- SecondarySort$IntPair.class
                |-- SecondarySort$IntPair$Comparator.class
                |-- SecondarySort$MapClass.class
                |-- SecondarySort$Reduce.class
                |-- Sort.class
                |-- terasort
                |   |-- GenSort.class
                |   |-- Random16.class
                |   |-- Random16$RandomConstant.class
                |   |-- TeraChecksum$ChecksumMapper.class
                |   |-- TeraChecksum$ChecksumReducer.class
                |   |-- TeraChecksum.class
                |   |-- TeraGen.class
                |   |-- TeraGen$Counters.class
                |   |-- TeraGen$RangeInputFormat.class
                |   |-- TeraGen$RangeInputFormat$RangeInputSplit.class
                |   |-- TeraGen$RangeInputFormat$RangeRecordReader.class
                |   |-- TeraGen$SortGenMapper.class
                |   |-- TeraInputFormat$1.class
                |   |-- TeraInputFormat.class
                |   |-- TeraInputFormat$SamplerThreadGroup.class
                |   |-- TeraInputFormat$TeraRecordReader.class
                |   |-- TeraInputFormat$TextSampler.class
                |   |-- TeraOutputFormat.class
                |   |-- TeraOutputFormat$TeraRecordWriter.class
                |   |-- TeraScheduler.class
                |   |-- TeraScheduler$Host.class
                |   |-- TeraScheduler$Split.class
                |   |-- TeraSort.class
                |   |-- TeraSort$SimplePartitioner.class
                |   |-- TeraSort$TotalOrderPartitioner.class
                |   |-- TeraSort$TotalOrderPartitioner$InnerTrieNode.class
                |   |-- TeraSort$TotalOrderPartitioner$LeafTrieNode.class
                |   |-- TeraSort$TotalOrderPartitioner$TrieNode.class
                |   |-- TeraValidate.class
                |   |-- TeraValidate$ValidateMapper.class
                |   |-- TeraValidate$ValidateReducer.class
                |   `-- Unsigned16.class
                |-- WordCount.class
                |-- WordCount$IntSumReducer.class
                |-- WordCount$TokenizerMapper.class
                |-- WordMean.class
                |-- WordMean$WordMeanMapper.class
                |-- WordMean$WordMeanReducer.class
                |-- WordMedian.class
                |-- WordMedian$WordMedianMapper.class
                |-- WordMedian$WordMedianReducer.class
                |-- WordStandardDeviation.class
                |-- WordStandardDeviation$WordStandardDeviationMapper.class
                `-- WordStandardDeviation$WordStandardDeviationReducer.class

12 directories, 152 files

The build generated a Main-Class attribute in the jar's manifest:
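
The original post showed this as a screenshot. Reconstructed from the jar layout above, META-INF/MANIFEST.MF contains an entry of this form (the full attribute list may differ):

Manifest-Version: 1.0
Main-Class: org.apache.hadoop.examples.ExampleDriver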

This Main-Class is specified in the hadoop-mapreduce-examples project's POM file:
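
The POM screenshot is likewise not reproduced here. In Hadoop 2.6.x, hadoop-mapreduce-examples/pom.xml sets the Main-Class through the maven-jar-plugin; the relevant fragment looks approximately like this:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-jar-plugin</artifactId>
  <configuration>
    <archive>
      <manifest>
        <mainClass>org.apache.hadoop.examples.ExampleDriver</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>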

Summary so far

The org.apache.hadoop.util.RunJar class creates the temporary directory hadoop-unjar8401487899319139479, unpacks hadoop-mapreduce-examples-2.6.4.jar into it, and invokes the main method of the org.apache.hadoop.examples.ExampleDriver class from that directory, passing it the arguments pi 10 10. The MapReduce business logic lives behind that main method.

MapReduce code analysis

The org.apache.hadoop.examples.ExampleDriver program is as follows:

public class ExampleDriver {

  public static void main(String argv[]) {
    int exitCode = -1;
    // ProgramDriver dispatches to and runs Java classes by name
    ProgramDriver pgd = new ProgramDriver();
    try {
      // pgd holds a Map of programs: the key is a short program name,
      // the value is the Class to execute
      pgd.addClass("wordcount", WordCount.class,
                   "A map/reduce program that counts the words in the input files.");
      pgd.addClass("wordmean", WordMean.class,
                   "A map/reduce program that counts the average length of the words in the input files.");
      pgd.addClass("wordmedian", WordMedian.class,
                   "A map/reduce program that counts the median length of the words in the input files.");
      pgd.addClass("wordstandarddeviation", WordStandardDeviation.class,
                   "A map/reduce program that counts the standard deviation of the length of the words in the input files.");
      pgd.addClass("aggregatewordcount", AggregateWordCount.class,
                   "An Aggregate based map/reduce program that counts the words in the input files.");
      pgd.addClass("aggregatewordhist", AggregateWordHistogram.class,
                   "An Aggregate based map/reduce program that computes the histogram of the words in the input files.");
      pgd.addClass("grep", Grep.class,
                   "A map/reduce program that counts the matches of a regex in the input.");
      pgd.addClass("randomwriter", RandomWriter.class,
                   "A map/reduce program that writes 10GB of random data per node.");
      pgd.addClass("randomtextwriter", RandomTextWriter.class,
                   "A map/reduce program that writes 10GB of random textual data per node.");
      pgd.addClass("sort", Sort.class,
                   "A map/reduce program that sorts the data written by the random writer.");
      pgd.addClass("pi", QuasiMonteCarlo.class, QuasiMonteCarlo.DESCRIPTION);
      pgd.addClass("bbp", BaileyBorweinPlouffe.class, BaileyBorweinPlouffe.DESCRIPTION);
      pgd.addClass("distbbp", DistBbp.class, DistBbp.DESCRIPTION);
      pgd.addClass("pentomino", DistributedPentomino.class,
                   "A map/reduce tile laying program to find solutions to pentomino problems.");
      pgd.addClass("secondarysort", SecondarySort.class,
                   "An example defining a secondary sort to the reduce.");
      pgd.addClass("sudoku", Sudoku.class, "A sudoku solver.");
      pgd.addClass("join", Join.class,
                   "A job that effects a join over sorted, equally partitioned datasets");
      pgd.addClass("multifilewc", MultiFileWordCount.class,
                   "A job that counts words from several files.");
      pgd.addClass("dbcount", DBCountPageView.class,
                   "An example job that count the pageview counts from a database.");
      pgd.addClass("teragen", TeraGen.class, "Generate data for the terasort");
      pgd.addClass("terasort", TeraSort.class, "Run the terasort");
      pgd.addClass("teravalidate", TeraValidate.class, "Checking results of terasort");
      // argv is "pi 10 10": run QuasiMonteCarlo.class and pass "10 10" to its main method
      exitCode = pgd.run(argv);
    } catch (Throwable e) {
      e.printStackTrace();
    }
    System.exit(exitCode);
  }
}
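
ProgramDriver itself is a thin dispatcher: addClass registers a name-to-class mapping, and run looks up the first argument in that map and reflectively invokes the target class's main method with the remaining arguments. A simplified sketch of that dispatch logic (paraphrased for illustration, not the actual Hadoop source):

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

public class MiniProgramDriver {
  // program name ("pi") -> class to run; the real ProgramDriver also stores a description
  private final Map<String, Class<?>> programs = new TreeMap<>();

  public void addClass(String name, Class<?> mainClass, String description) {
    programs.put(name, mainClass);
  }

  public int run(String[] args) throws Throwable {
    if (args.length == 0 || !programs.containsKey(args[0])) {
      // the real driver prints the full program list with descriptions here
      System.err.println("Unknown program name");
      return -1;
    }
    // Drop the program name and pass the rest ("10 10") to the target main method
    String[] rest = Arrays.copyOfRange(args, 1, args.length);
    Method main = programs.get(args[0]).getMethod("main", String[].class);
    main.invoke(null, (Object) rest);
    return 0;
  }
}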

Analyzing the class that actually runs the MapReduce job

The QuasiMonteCarlo code is as follows:

// Note: in the Hadoop source this logic is split between run() and estimatePi();
// it is shown merged here, with conf coming from getConf()
public int run(String[] args) throws Exception {
  if (args.length != 2) {
    System.err.println("Usage: " + getClass().getName() + " <nMaps> <nSamples>");
    ToolRunner.printGenericCommandUsage(System.err);
    return 2;
  }

  // The first argument is 10: the number of map tasks
  final int nMaps = Integer.parseInt(args[0]);
  // The second argument is the number of samples per map; the sampling details can be
  // skipped, since the focus here is the execution flow rather than the business logic
  final long nSamples = Long.parseLong(args[1]);
  long now = System.currentTimeMillis();
  int rand = new Random().nextInt(Integer.MAX_VALUE);
  final Path tmpDir = new Path(TMP_DIR_PREFIX + "_" + now + "_" + rand);

  System.out.println("Number of Maps  = " + nMaps);
  System.out.println("Samples per Map = " + nSamples);

  final Configuration conf = getConf();
  BigDecimal result;

  Job job = new Job(conf);
  // setup job conf
  job.setJobName(QuasiMonteCarlo.class.getSimpleName());
  job.setJarByClass(QuasiMonteCarlo.class);

  job.setInputFormatClass(SequenceFileInputFormat.class);

  job.setOutputKeyClass(BooleanWritable.class);
  job.setOutputValueClass(LongWritable.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);

  // set the Mapper class
  job.setMapperClass(QmcMapper.class);
  // set the Reducer class
  job.setReducerClass(QmcReducer.class);
  job.setNumReduceTasks(1);

  // turn off speculative execution, because DFS doesn't handle
  // multiple writers to the same file.
  job.setSpeculativeExecution(false);

  // setup input/output directories
  final Path inDir = new Path(tmpDir, "in");
  final Path outDir = new Path(tmpDir, "out");
  FileInputFormat.setInputPaths(job, inDir);
  FileOutputFormat.setOutputPath(job, outDir);

  final FileSystem fs = FileSystem.get(conf);
  if (fs.exists(tmpDir)) {
    throw new IOException("Tmp directory " + fs.makeQualified(tmpDir)
        + " already exists. Please remove it first.");
  }
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Cannot create input directory " + inDir);
  }

  try {
    // generate an input file for each map task
    for (int i = 0; i < nMaps; ++i) {
      final Path file = new Path(inDir, "part" + i);
      final LongWritable offset = new LongWritable(i * nSamples);
      final LongWritable size = new LongWritable(nSamples);
      final SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, conf, file,
          LongWritable.class, LongWritable.class, CompressionType.NONE);
      try {
        writer.append(offset, size);
      } finally {
        writer.close();
      }
      System.out.println("Wrote input for Map #" + i);
    }

    // start a map/reduce job
    System.out.println("Starting Job");
    final long startTime = System.currentTimeMillis();
    // submit the job and wait for it to complete
    job.waitForCompletion(true);
    final double duration = (System.currentTimeMillis() - startTime) / 1000.0;
    System.out.println("Job Finished in " + duration + " seconds");

    // read outputs
    Path inFile = new Path(outDir, "reduce-out");
    LongWritable numInside = new LongWritable();
    LongWritable numOutside = new LongWritable();
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, conf);
    try {
      reader.next(numInside, numOutside);
    } finally {
      reader.close();
    }

    // compute estimated value
    final BigDecimal numTotal =
        BigDecimal.valueOf(nMaps).multiply(BigDecimal.valueOf(nSamples));
    result = BigDecimal.valueOf(4).setScale(20)
        .multiply(BigDecimal.valueOf(numInside.get()))
        .divide(numTotal, RoundingMode.HALF_UP);
  } finally {
    fs.delete(tmpDir, true);
  }

  System.out.println("Estimated value of Pi is " + result);
  return 0;
}
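
For reference, the arithmetic at the end is the standard dartboard estimate: nMaps x nSamples points are generated in the unit square, numInside of them land inside the quarter circle, and the code computes pi as approximately 4 * numInside / (nMaps * nSamples), carried out with BigDecimal precision.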

The line in QuasiMonteCarlo that matters here is job.waitForCompletion(true): QuasiMonteCarlo relies on the MapReduce framework and submits its job through the waitForCompletion method of the framework's Job class. The next question is which MapReduce module the hadoop-mapreduce-examples project depends on, i.e. which module the Job class comes from:
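
For orientation, waitForCompletion is essentially a submit-once-then-poll wrapper around the job submission machinery. A self-contained sketch of its shape (all internals below are stand-ins, not the Hadoop API):

public class WaitForCompletionSketch {
  enum JobState { DEFINE, RUNNING }
  private JobState state = JobState.DEFINE;
  private int polls = 0;

  private void submit() { state = JobState.RUNNING; }  // stand-in for the JobSubmitter path
  private boolean isComplete() { return ++polls > 3; } // stand-in for the job-status RPC
  private boolean isSuccessful() { return true; }      // stand-in for the final job status

  public boolean waitForCompletion(boolean verbose) throws InterruptedException {
    if (state == JobState.DEFINE) {
      submit();               // hand the job to the cluster exactly once
    }
    while (!isComplete()) {   // then block, polling until the job finishes
      Thread.sleep(10);       // the real code uses a configurable poll interval
    }
    return isSuccessful();    // true iff the job succeeded
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(new WaitForCompletionSketch().waitForCompletion(true));
  }
}

(When verbose is true, the real method also drives console progress reporting while it waits.)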

It turns out to depend on the hadoop-mapreduce-client-jobclient module. The dependency relationships between hadoop-mapreduce-client-jobclient and the other MapReduce modules are diagrammed below:

[Figure: dependency graph of the hadoop-mapreduce-client modules]
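
Since the diagram is not reproduced here, the approximate gist from the 2.6.x POMs (the module names are real; the exact edge set should be verified against the POMs):

hadoop-mapreduce-examples
  `-- hadoop-mapreduce-client-jobclient
        |-- hadoop-mapreduce-client-common --> hadoop-mapreduce-client-core
        `-- hadoop-mapreduce-client-shuffle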

To be continued.

References

https://cloud.tencent.com/developer/article/1039939 (a fairly detailed walkthrough of the RunJar class)
