hadoop提交作业分析_第1页
hadoop提交作业分析_第2页
hadoop提交作业分析_第3页
hadoop提交作业分析_第4页
hadoop提交作业分析_第5页
已阅读5页,还剩10页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

Hadoop提交作业流程分析bin/hadoop jar xxx.jar mainclass args 这样的命令,各位玩Hadoop的估计已经调用过NN次了,每次写好一个Project或对Project做修改后,都必须打个Jar包,然后再用上面的命令提交到Hadoop Cluster上去运行,在开发阶段那是极其繁琐的。程序员是“最懒”的,既然麻烦肯定是要想些法子减少无谓的键盘敲击,顺带延长键盘寿命。比如有的人就写了些Shell脚本来自动编译、打包,然后提交到Hadoop。但还是稍显麻烦,目前比较方便的方法就是用Hadoop eclipse plugin,可以浏览管理HDFS,自动创建MR程序的模板文件,最爽的就是直接Run on hadoop了,但版本有点跟不上Hadoop的主版本了,目前的MR模板还是0.19的。还有一款叫Hadoop Studio的软件,看上去貌似是蛮强大,但是没试过,这里不做评论。那么它们是怎么做到不用上面那个命令来提交作业的呢?不知道?没关系,开源的嘛,不懂得就直接看源码分析,这就是开源软件的最大利处。我们首先从bin/hadoop这个Shell脚本开始分析,看这个脚本内部到底做了什么,如何来提交Hadoop作业的。因为是Java程序,这个脚本最终都是要调用Java来运行的,所以这个脚本最重要的就是添加一些前置参数,如CLASSPATH等。所以,我们直接跳到这个脚本的最后一行,看它到底添加了那些参数,然后再逐个分析(本文忽略了脚本中配置环境参数载入、Java查找、cygwin处理等的分析)。#run itexec $JAVA$JAVA_HEAP_MAX $HADOOP_OPTS -classpath $CLASSPATH$CLASS $ 从上面这行命令我们可以看到这个脚本最终添加了如下几个重要参数:JAVA_HEAP_MAX、HADOOP_OPTS、CLASSPATH、CLASS。下面我们来一个个的分析(本文基于Cloudera Hadoop 0.20.1+152分析)。首先是JAVA_HEAP_MAX,这个就比较简单了,主要涉及代码如下:JAVA_HEAP_MAX=-Xmx1000m # check envvars which might override default argsif $HADOOP_HEAPSIZE != ;then#echorun with heapsize $HADOOP_HEAPSIZEJAVA_HEAP_MAX=-Xmx$HADOOP_HEAPSIZEm#echo$JAVA_HEAP_MAXfi 首先赋予默认值-Xmx1000m,然后检查hadoop-env.sh中是否设置并导出了HADOOP_HEAPSIZE,如果有的话,就使用该值覆盖,得到最后的JAVA_HEAP_MAX。接着是分析CLASSPATH,这是这个脚本的重点之一。这部分主要就是添加了相应依赖库和配置文件到CLASSPATH。# 首先用Hadoop的配置文件目录初始化CLASSPATHCLASSPATH=$HADOOP_CONF_DIR# 下面是针对于Hadoop发行版,添加Hadoop核心Jar包和webapps到CLASSPATHif -d $HADOOP_HOME/webapps ;thenCLASSPATH=$CLASSPATH:$HADOOP_HOMEfifor f in $HADOOP_HOME/hadoop-*-core.jar;doCLASSPATH=$CLASSPATH:$f;done# 添加libs里的Jar包for f in $HADOOP_HOME/lib/*.jar;doCLASSPATH=$CLASSPATH:$f;Donefor f in $HADOOP_HOME/lib/jsp-2.1/*.jar;doCLASSPATH=$CLASSPATH:$f;done# 下面的TOOL_PATH只在命令为“archive”时才添加到CLASSPATHfor f in $HADOOP_HOME/hadoop-*-tools.jar;doTOOL_PATH=$TOOL_PATH:$f;donefor f in $HADOOP_HOME/build/hadoop-*-tools.jar;doTOOL_PATH=$TOOL_PATH:$f;done# 最后添加用户的自定义Hadoop Classpathif $HADOOP_CLASSPATH != ;thenCLASSPATH=$CLASSPATH:$HADOOP_CLASSPATHfi 上面只分析一部分,由于代码比较长,针对开发者部分的CLASSPATH添加没有列出来。下面是这个脚本的重点、实体之处:CLASS分析。Shell脚本会根据你输入的命令参数来设置CLASS和HADOOP_OPTS,其中CLASS所指向的类才是最终真正执行你的命令的实体。# figure out which class to runif $COMMAND=namenode ;thenCLASS=node.NameNodeHADOOP_OPTS=$HADOOP_OPTS $HADOOP_NAMENODE_OPTSelif $COMMAND=fs ;thenCLASS=org.apache.hadoop.fs.FsShellHADOOP_OPTS=$HADOOP_OPTS $HADOOP_CLIENT_OPTSelif $COMMAND=jar ;thenCLASS=org.apache.hadoop.util.RunJarelif $COMMAND=archive ;thenCLASS=org.apache.hadoop.tools.HadoopArchivesCLASSPATH=$CLASSPATH:$TOOL_PATHHADOOP_OPTS=$HADOOP_OPTS $HADOOP_CLIENT_OPTSelseCLASS=$COMMANDfi 这里我们要关心的就是$COMMAND = jar时对应的类org.apache.hadoop.util.RunJar,这个类等下我们继续分析,这是我们通向最终目标的下一个路口。脚本在最后还设置了hadoop.log.dir、hadoop.log.file等HADOOP_OPTS。接着,就利用exec命令带上刚才的参数提交任务了。通过对上面的分析,我们知道了,如果想取代这个脚本,那就必须至少把Hadoop依赖的库和配置文件目录给加到CLASSPATH中(JAVA_HEAP_MAX和HADOOP_OPTS不是必须的),然后调用org.apache.hadoop.util.RunJar类来提交Jar到Hadoop。PS:对Bash Shell不熟的可以先看看这/media/ch31s05.html我们分析了bin/hadoop脚本,知道了提交一个Hadoop作业所需要的基本设置以及真正执行任务提交的类。这一篇我们就来分析这个提交任务的类org.apache.hadoop.util.RunJar,看它内部具体又做了些什么。 RunJar是Hadoop中的一个工具类,结构很简单,只有两个方法:main和unJar。我们从main开始一步步分析。main首先检查传递参数是否符合要求,然后从第一个传递参数中获取jar包的名字,并试图从jar中包中获取manifest信息,以查找mainclass name。如果查找不到mainclass name,则把传递参数中的第二个设为mainclass name。接下去,就是在hadoop.tmp.dir下创建一个临时文件夹,并挂载上关闭删除线程。这个临时文件夹用来放置解压后的jar包内容。jar包的解压工作由unJar方法完成,通过JarEntry逐个获取jar包内的内容,包括文件夹和文件,然后释放到临时文件夹中。解压完毕后,开始做classpath的添加,依次把解压临时文件夹、传递进来的jar包、临时文件夹内的classes文件夹和lib里的所有jar包加入到classpath中。接着以这个classpath为搜索URL新建了一个URLClassLoader(要注意这个类加载器的parent包括了刚才bin/hadoop脚本提交时给的classpath),并设置为当前线程的上下文类加载器。最后,利用Class.forName方法,以刚才的那个URLClassLoader为类加载器,动态生成一个mainclass的Class对象,并获取它的main方法,然后以传递参数中剩下的参数作为调用参数来调用这个main方法。好了,从上分析看来,这个RunJar类是一个很简单的类,就是解压传递进来的jar包,再添加一些classpath,然后动态调用jar包里的mainclass的main方法。看到这里,我想你应该知道如何利用java代码来编写一个替代bin/hadoop的程序了,主要就是两步: 添加Hadoop的依赖库和配置文件; 解压jar包,再添加一些classpath,并动态调用相应方法。最偷懒的方法,直接用RunJar类就行了。 通过前面两篇文章的分析,对Hadoop的作业提交流程基本明了了,下面我们就可以开始编写代码模拟这个流程。第一步要做的是添加Hadoop的依赖库和配置文件到classpath。最常用的方法就是用一个容器先把各个要添加到classpath的文件或文件夹存储起来,后面再作为类加载器的URL搜索路径。/* Add a directory or file to classpath.* * param component*/publicstaticvoid addClasspath(String component) if (component !=null) & (component.length() 0) try File f =new File(component);if (f.exists() URL key = f.getCanonicalFile().toURL();if (!classPath.contains(key) classPath.add(key); catch (IOException e) 上面的classPath变量就是我们声明用来装载classpath组件的容器。privatestatic ArrayList classPath =new ArrayList(); 由于需要添加一些文件夹下的所有Jar包,所以我们还要实现一个遍历添加某文件夹下文件的方法。/* Add all jars in directory to classpath, sub-directory is excluded.* * param dirPath*/public static void addJarsInDir(String dirPath) File dir =new File(dirPath);if (!dir.exists() return;File files = dir.listFiles();if (files =null) return;for (int i =0; i 点我下载-到此,以Java方式提交Hadoop作业介绍完毕。但,是否还可以再进一步呢?现在还只能提交打包好的MR程序,尚不能像Hadoop Eclipse Plugin那样能直接对包含Mapper和Reducer的类Run on Hadoop。为什么直接对这些类Run as Java Application提交的作业是在Local运行的呢?这其中又包含有什么秘密呢?我们将在下面的文章中更深入的剖析Hadoop的作业提交代码,去到最底层,慢慢揭开它的黑面纱。前面我们所分析的部分其实只是Hadoop作业提交的前奏曲,真正的作业提交代码是在MR程序的main里,RunJar在最后会动态调用这个main,在(二)里有说明。我们下面要做的就是要比RunJar更进一步,让作业提交能在编码时就可实现,就像Hadoop Eclipse Plugin那样可以对包含Mapper和Reducer的MR类直接Run on Hadoop。一般来说,每个MR程序都会有这么一段类似的作业提交代码,这里拿WordCount的举例:Configuration conf =new Configuration();String otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length !=2) System.err.println(Usage: wordcount );System.exit(2);Job job =new Job(conf, word count);job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs0);FileOutputFormat.setOutputPath(job, new Path(otherArgs1);System.exit(job.waitForCompletion(true) ?0 : 1);首先要做的是构建一个Configuration对象,并进行参数解析。接着构建提交作业用的Job对象,并设置作业Jar包、对应Mapper和Reducer类、输入输出的Key和Value的类及作业的输入和输出路径,最后就是提交作业并等待作业结束。这些只是比较基本的设置参数,实际还支持更多的设置参数,这里就不一一介绍,详细的可参考API文档。一般分析代码都从开始一步步分析,但我们的重点是分析提交过程中发生的事,这里我们先不理前面的设置对后面作业的影响,我们直接跳到作业提交那一步进行分析,当碰到问题需要分析前面的代码时我会再分析。当调用job.waitForCompletion时,其内部调用的是submit方法来提交,如果传入参数为ture则及时打印作业运作信息,否则只是等待作业结束。submit方法进去后,还有一层,里面用到了job对象内部的jobClient对象的submitJobInternal来提交作业,从这个方法才开始做正事。进去第一件事就是获取jobId,用到了jobSubmitClient对象,jobSubmitClient对应的类是JobSubmissionProtocol的实现之一(目前有两个实现,JobTracker和LocalJobRunner),由此可判断出jobSubmitClient对应的类要么是JobTracker,要么是LocalJobRunner。呃,这下有点想法了,作业提交是上到JobTracker去,还是在本地执行?可能就是看这个jobSunmitClient初始化时得到的是哪个类的实例了,我们可以稍稍的先往后看看,你会发现submitJobInternal最后用了jobSubmitClient.submitJob(jobId)来提交作业,再稍稍看看JobTracker和LocalJobRunner的submitJob实现,看来确实是这么回事。好,那我们就先跳回去看看这个jobSubmitClient是如何初始化的。在JobClient的init中我们可以发现jobSubmitClient的初始化语句:String tracker = conf.get(mapred.job.tracker, local);if (local.equals(tracker) this.jobSubmitClient =new LocalJobRunner(conf); else this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);哈,跟conf中的mapred.job.tracker属性有关,如果你没设置,那默认得到的值就是local,jobSubmitClient也就会被赋予LocalJobRunner的实例。平时,我们开发时一般都只是引用lib里面的库,不引用conf文件夹里的配置文件,这里就能解释为什么我们直接Run as Java Application时,作业被提交到Local去运行了,而不是Hadoop Cluster中。那我们把conf文件夹添加到classpath,就能Run on Hadoop了么?目前下结论尚早,我们继续分析(你添加了conf文件夹后,可以提交试一试,会爆出一个很明显的让你知道还差什么的错误,这里我就卖卖官子,先不说)。jobId获取到后,在SystemDir基础上加jobId构建了提交作业的目录submitJobDir,SystemDir由JobClient的getSystemDir方法得出,这个SystemDir在构建fs对象时很重要,确定了返回的fs的类型。下去的configureCommandLineOptions方法主要是把作业依赖的第三方库或文件上传到fs中,并做classpath映射或Symlink,以及一些参数设置,都是些细微活,这里不仔细分析。我们主要关心里面的两个地方,一个是:FileSystem fs = getFs();看上去很简单,一句话,就是获取FileSystem的实例,但其实里面绕来绕去,有点头晕。因为Hadoop对文件系统进行了抽象,所以这里获得fs实例的类型决定了你是在hdfs上操作还是在local fs上操作。好了,我们冲进去看看。publicsynchronized FileSystem getFs() throws IOException if (this.fs =null) Path sysDir = getSystemDir();this.fs = sysDir.getFileSystem(getConf();return fs;看见了吧,fs是由sysDir的getFileSystem返回的。我们再冲,由于篇幅,下面就只列出主要涉及的语句。FileSystem.get(this.toUri(), conf);CACHE.get(uri, conf);fs = createFileSystem(uri, conf);Class clazz = conf.getClass(fs.+ uri.getScheme() +.impl, null);if (clazz =null) thrownew IOException(No FileSystem for scheme: + uri.getScheme();FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);fs.initialize(uri, conf);return fs;又是跟conf有关,看来conf是得实时跟住的。这里用到了Java的反射技术,用来动态生成相应的类实例。其中的class获取与uri.getScheme有密切关系,而uri就是在刚才的sysDir基础上构成,sysDir的值又最终是由jobSubmitClient的实例决定的。如果jobSubmitClient是JobTracker的实例,那Scheme就是hdfs。如果是LocalJobRunner的实例,那就是file。从core-default.xml你可以找到如下的信息:fs.file.implorg.apache.hadoop.fs.LocalFileSystemThe FileSystem for file: uris.fs.hdfs.implorg.apache.hadoop.hdfs.DistributedFileSystemThe FileSystem for hdfs: uris.所以在前面的作业提交代码中,在初始化Job实例时,很多事已经决定了,由conf文件夹中的配置文件决定。Configuration是通过当前线程上下文的类加载器来加载类和资源文件的,所以要想Run on Hadoop,第一步必须要让Conf文件夹进入Configuration的类加载器的搜索路径中,也就是当前线程上下文的类加载器。第二个要注意的地方是:String originalJarPath =job.getJar();if (originalJarPath !=null) / copyjarto JobTrackers fs/ usejarname if job is not named.if (.equals(job.getJobName()job.setJobName(new Path(originalJarPath).getName();job.setJar(submitJarFile.toString();fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);fs.setReplication(submitJarFile, replication);fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION); else LOG.warn(No jobjarfileset. User classes may not be found. +See JobConf(Class) or JobConf#setJar(String).);因为client在提交作业到Hadoop时需要把作业打包成jar,然后copy到fs的submitJarFile路径中。如果我们想Run on Hadoop,那就必须自己把作业的class文件打个jar包,然后再提交。在Eclipse中,这就比较容易了。这里假设你启用了自动编译功能。我们可以在代码的开始阶段加入一段代码用来打包bin文件夹里的class文件为一个jar包,然后再执行后面的常规操作。在configureCommandLineOptions方法之后,submitJobInternal会检查输出文件夹是否已存在,如果存在则抛出异常。之后,就开始划分作业数据,并根据split数得到map tasks的数量。最后,就是把作业配置文件写入submitJobFile,并调用jobSubmitClient.submitJob(jobId)最终提交作业。至此,对Hadoop的作业提交分析也差不多了,有些地方讲的比较啰嗦,有些又讲得点到而止,但大体的过程以及一些较重要的东西还是说清楚了,其实就是那么回事。下去的文章我们会在前面的jobUtil基础上增加一些功能来支持Run on Hadoop,其实主要就是增加一个打包Jar的方法。经过上一篇的分析,我们知道了Hadoop的作业提交目标是Cluster还是Local,与conf文件夹内的配置文件参数有着密切关系,不仅如此,其它的很多类都跟conf有关,所以提交作业时切记把conf放到你的classpath中。因为Configuration是利用当前线程上下文的类加载器来加载资源和文件的,所以这里我们采用动态载入的方式,先添加好对应的依赖库和资源,然后再构建一个URLClassLoader作为当前线程上下文的类加载器。publicstatic ClassLoader getClassLoader() ClassLoader parent = Thread.currentThread().getContextClassLoader();if (parent =null) parent = EJob.class.getClassLoader();if (parent =null) parent = ClassLoader.getSystemClassLoader();return new URLClassLoader(classPath.toArray(new URL0), parent);代码很简单,废话就不多说了。调用例子如下:EJob.addClasspath(/usr/lib/hadoop-0.20/conf);ClassLoader classLoader = EJob.getClassLoader();Thread.currentThread().setContextClassLoader(classLoader);设置好了类加载器,下面还有一步就是要打包Jar文件,就是让Project自打包自己的class为一个Jar包,我这里以标准Eclipse工程文件夹布局为例,打包的就是bin文件夹里的class。public static File createTempJar(String root) throws IOException if (!new File(root).exists() returnnull;Manifest manifest =new Manifest();manifest.getMainAttributes().putValue(Manifest-Version, 1.0);final File jarFile = File.createTempFile(EJob-, .jar, new File(System.getProperty(java.io.tmpdir);Runtime.getRuntime().addShutdownHook(new Thread() publicvoid run() jarFile.delete(););JarOutputStream out =new JarOutputStream(new FileOutputStream(jarFile),manifest);createTempJarInner(out, new File(root), );out.flush();out.close();return jarFile;private static void createTempJarInner(JarOutputStream out, File f,String base) throws IOException if (f.isDirectory() File fl = f.listFiles();if (base.length() 0) base = base +/;for (int i =0; i fl.length; i+) createTempJarInner(out, fli, base + fli.getName(); else out.putNextEntry(new JarEntry(base);FileInputStream in =new FileInputStream(f);byte buffer =newbyte1024;int n = in.read(buffer);while (n !=-1) out.write(buffer, 0, n);n = in.read(buffer);in.close();这里的对外接口是createTempJar,接收参数为需要打包的文件夹根路径,支持子文件夹打包。使用递归处理法,依次把文件夹里的结构和文件打包到Jar里。很简单,就是基本的文件流操作,陌生一点的就是Manifest和JarOutputStream,查查API就明了。好,万事具备,只欠东风了,我们来实践一下试试。还是拿WordCount来举例:/ Add these statements. XXXFile jarFile = EJob.createTempJar(bin);EJob.addClasspath(/usr/lib/hadoop-0.20/conf);ClassLoader classLoader = EJob.getClas

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论