MapReduce源码流程分析.docx_第1页
MapReduce源码流程分析.docx_第2页
MapReduce源码流程分析.docx_第3页
MapReduce源码流程分析.docx_第4页
MapReduce源码流程分析.docx_第5页
已阅读5页,还剩44页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

MapReduce源码流程分析MapReduce源码流程分析11.角色21.1JobClient21.2JobTracker21.3TaskTracker22.数据结构22.1JobInProgress22.2TaskInProgress22.3TaskLauncher32.4TaskTracker$TaskInProgress32.5Task32.6TaskRunner32.7JvmManager33.流程33.1 Client端33.1.1初始化33.1.2与JobTracker端的交互43.2 JobTracker端73.2.1启动初始化73.2.2与Client端交互103.2.3与TaskTracker端交互163.3 TaskTracker端173.3.1启动初始化173.3.2与JobTracker端交互273.3.3运行任务271.角色1.1JobClient每一个job都会在用户端通过JobClient类将应用程序以及配置参数打包成jar文件存储在HDFS,并把路径提交到JobTracker,然后由JobTracker创建每一个Task(即MapTask和ReduceTask)并将它们分发到各个TaskTracker服务中去执行。1.2JobTrackerJobTracker是一个master(中心节点)服务, JobTracker负责生成和调度job的每一个子任务task运行于TaskTracker上,并监控它们,如果发现有失败的task就重新运行它。一般情况应该把JobTracker部署在单独的机器上。1.3TaskTrackerTaskTracker是运行于多个节点上的slaver()服务。TaskTracker则负责直接执行每一个task。TaskTracker都需要运行在HDFS的DataNode上。2.数据结构2.1JobInProgressJobClient提交job后,JobTracker会创建一个JobInProgress来跟踪和调度这个job,并把它添加到job队列里。JobInProgress会根据提交的job jar中定义的输入数据集(已分解成FileSplit)创建对应的一批TaskInProgress用于监控和调度MapTask,同时在创建指定数目的TaskInProgress用于监控和调度ReduceTask,缺省为1个ReduceTask。2.2TaskInProgressJobTracker启动任务时通过每一个TaskInProgress来launchTask,这时会把Task对象(即MapTask和ReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker收到后会创建对应的TaskInProgress(此TaskInProgress实现非JobTracker中使用的TaskInProgress,作用类似)用于监控和调度该Task。启动具体的Task进程是通过TaskInProgress管理的TaskRunner对象来运行的。TaskRunner会自动装载job jar,并设置好环境变量后启动一个独立的java child进程来执行Task,即MapTask或者ReduceTask,但它们不一定运行在同一个TaskTracker中。2.3TaskLauncher2.4TaskTracker$TaskInProgress2.5Task2.6TaskRunner2.7JvmManager3.流程启动。3.1 Client端3.1.1初始化客户端初始化主要是创建RPC远程服务器,在创建JobClient的时候进行创建的。是在客户端提交作业代码执行时进行的,提交之后立刻客户端做必要的初始化之后再提交作业。JobClient jc = new JobClient(job);RunningJob rj = jc.submitJob(job);3.1.2与JobTracker端的交互客户端主要负责像JobTracker提交作业,在旧版的api中通过使用JonConf类对作业进行配置,JobConf的功能已被新的类Configuration和Job替换。Configuration类描述了资源,这些资源大多都是从XML配置文件中读取的属性和值组成。比如来自core-default.xml和core-site.xml。Job描述了用户角度的视图,它允许用户配置、提交、控制它的执行和查询状态。如图所示:是一个MapReduce的程序客户端提交作业的代码,通过生成一个JobConf对作业进行配置,也可以用新的api生成一个Job(注释中),最后通过JobClient.runJob(conf)对作业进行提交(也可用新的api job.waitForCompletion(true)来对作业进行提交)。图3.1客户端提交作业代码此方法会返回一个RunningJob对象,它用来跟踪作业的状态。作业提交完毕后,JobClient会根据此对象开始轮询作业的进度,直到作业完成。在JobClient.runJob方法中会创建一个JobClient对象,之后调用JobClient的submitJob方法,此方法返回RunningJob对象,在submitJob方法中调用submitJobInternal方法。submitJobInternal此方法是提交作业的主要方法。它主要实现了Hadoop权威指南P167所描述的作业提交的过程。主要有JobID jobId = jobSubmitClient.getNewJobId();向JobTracker请求一个新的作业ID(通过调用JobTracker的getNewJobId()来实现);Path submitJobDir = new Path(getSystemDir(), jobId.toString();Path submitJarFile = new Path(submitJobDir, job.jar);Path submitSplitFile = new Path(submitJobDir, job.split);configureCommandLineOptions(job, submitJobDir, submitJarFile);Path submitJobFile = new Path(submitJobDir, job.xml);将运行所需要的资源包括作业job.jar文件,配置文件job.xml,即所得的输入划分文件job.split复制到一个以作业ID号命名的目录中。configureCommandLineOptions函数的作用是在fs中创建虚拟路径其中job.xml,job.jar,job.split作用如下job.xml: 作业配置,例如Mapper, Combiner, Reducer的类型,输入输出格式的类型等。job.jar: jar包,里面包含了执行此任务需要的各种类,比如 Mapper,Reducer等实现。job.split: 文件分块的相关信息,比如有数据分多少个块,块的大小(默认64m)等。/ Check the output specification if (reduces = 0 ? job.getUseNewMapper() : job.getUseNewReducer() org.apache.hadoop.mapreduce.OutputFormat output = ReflectionUtils.newInstance(context.getOutputFormatClass(), job); output.checkOutputSpecs(context); else job.getOutputFormat().checkOutputSpecs(fs, job); 检查输出经是否已经存在。/计算作业的输入划分/ Create the splits for the job LOG.debug(Creating splits at + fs.makeQualified(submitSplitFile); int maps; if (job.getUseNewMapper() maps = writeNewSplits(context, submitSplitFile); else maps = writeOldSplits(job, submitSplitFile); job.set(mapred.job.split.file, submitSplitFile.toString();job.setNumMapTasks(maps);/ Write job file to JobTrackers fs FSDataOutputStream out = FileSystem.create(fs, submitJobFile,new FsPermission(JOB_FILE_PERMISSION);try job.writeXml(out); finally out.close();将job.xml写入到JobTracker的文件系统里,具体就是对job.xml把Configuration的配置信息写在job.xml里。 / Now, actually submit the job (using the submit name) JobStatus status = jobSubmitClient.submitJob(jobId); if (status != null) return new NetworkedJob(status); else throw new IOException(Could not launch job); 提交作业。这里通知JobTracker,发送jobId过去,由JobTracker根据jobId去执行。JobClient里面使用RPC机制来构造一个实现 JobSubmissionProtocol接口的JobTracker的代理,然后利用远程发放直接执行JobTracker里的submitJob,与我们的利用Socket通信略有不同。NetworkedJob通过实现JobSubmissionProtocol接口的代理,来与JobTracker进行通信。3.2 JobTracker端JobTracker的地位相当于我们的Master,它负责调度job的每一个子任务task运行于slave上,并监控它们,如果发现有失败的task就重新运行它。JobTracker一直在等待JobClient通过RPC提交作业,而TaskTracker一直通过RPC向 JobTracker发送心跳heartbeat询问有没有任务可做,如果有,让其派发任务给它执行。如果JobTracker的作业队列不为空, 则TaskTracker通过发送心跳来会获得JobTracker给它派发的任务。这是一道pull过程: slave主动向master拉生意。slave节点的TaskTracker接到任务后在其本地发起Task,然后执行任务。3.2.1启动初始化JobTracker启动时首先运行main方法:public static void main(String argv) throws IOException, InterruptedException StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG); if (argv.length != 0) System.out.println(usage: JobTracker); System.exit(-1); try JobTracker tracker = startTracker(new JobConf(); tracker.offerService(); catch (Throwable e) LOG.fatal(StringUtils.stringifyException(e); System.exit(-1); 在main方法中调用startTracker方法和tracker.offerService方法。startTracker方法:主要调用两个方法,如下图所示代码。图3.1 startTracker方法如图所示,一个方法是JobTracker的构造方法,一个是jobTracker.taskSchedule.setTaskTrackerManager方法。在JobTracker的构造方法中首先会初始化常量数据,之后创建队列管理器(queueManager)和作业调度器。queueManager = new QueueManager(this.conf);/ Create the scheduler Class schedulerClass = conf.getClass(mapred.jobtracker.taskScheduler, JobQueueTaskScheduler.class, TaskScheduler.class); taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);这里调度器的创建使用了反射机制,实例化了一个JobQueueTaskScheduler对象。调度策略默认为FIFO。在JobQueueTaskScheduler构造方法中会生成一个JobQueueJobInProgressListener,即JobQueueJobInProgress监听器。JobQueueJobInProgressListener构造方法会创建一个Map,通过传递一个比较器FIFO_JOB_QUEUE_COMPARATOR,它是JobQueueJobInProgressListener类的静态函数。同时通过反射机制调用了JobQueueTaskScheduler的setConf方法,具体参见相关源代码,这里就不列举了。在setConf方法中创建了EagerTaskInitializationListener监听器。/ Set ports, start RPC servers, setup security policy etc. InetSocketAddress addr = getAddress(conf); this.localMachine = addr.getHostName(); this.port = addr.getPort(); / Set service-level authorization security policy if (conf.getBoolean( ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false) PolicyProvider policyProvider = (PolicyProvider)(ReflectionUtils.newInstance( conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, MapReducePolicyProvider.class, PolicyProvider.class), conf); SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider); int handlerCount = conf.getInt(mapred.job.tracker.handler.count, 10); erTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf);启动RPC服务器。String infoAddr = NetUtils.getServerAddress(conf, .bindAddress, .port, mapred.job.tracker.http.address); InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr); String infoBindAddress = infoSocAddr.getHostName(); int tmpInfoPort = infoSocAddr.getPort(); this.startTime = System.currentTimeMillis(); infoServer = new HttpServer(job, infoBindAddress, tmpInfoPort, tmpInfoPort = 0, conf); infoServer.setAttribute(job.tracker, this); / initialize history parameters. boolean historyInitialized = JobHistory.init(conf, this.localMachine, this.startTime); String historyLogDir = null; FileSystem historyFS = null; if (historyInitialized) historyLogDir = conf.get(hadoop.job.history.location); infoServer.setAttribute(historyLogDir, historyLogDir); historyFS = new Path(historyLogDir).getFileSystem(conf); infoServer.setAttribute(fileSys, historyFS); infoServer.addServlet(reducegraph, /taskgraph, TaskGraphServlet.class);infoServer.start();启动TrackInfoServer。构造函数还有其他的内容这里就不细说了(其实是其他的也没仔细看)。现在回到方法startTracker方法,在创建JobTracker下面还有一句代码,如下:result = new JobTracker(conf, identifier);result.taskScheduler.setTaskTrackerManager(result);JobTracker将自己的调度器的TaskTrackerManager设置成自己。这么做使得后来再初始化Task时,调用的是自己的InitJob方法。现在回到main方法,在startTracker方法后,还有一个tracker.offerService()方法要执行。在tracker.offerService()中会启动几个服务,其中关键的有调度器的启用和interTrackerServer的启用。taskScheduler.start();erTrackerServer.start();taskScheduler实质上是JobQueueTaskScheduler,当它启动时,会将事先生成的监听器加入到taskTrackerManager的监听器队列中,taskTrackerManager是一个抽象类,被JobTracker实现。所以调用的是JobTracker的addJobInProgressListener方法。Override public synchronized void start() throws IOException super.start(); taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener); eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager); eagerTaskInitializationListener.start(); taskTrackerManager.addJobInProgressListener( eagerTaskInitializationListener); 3.2.2与Client端交互客户端使用RPC机制来构造一个实现 JobSubmissionProtocol接口的JobTracker的代理,然后利用远程发放直接执行JobTracker里的submitJob方法进而提交作业。下面来看一下JobTracker.submitJob方法。图3-2 JobTracker提交作业在这个函数中会创建一个JobInProgress对象,并把它加入到jobs队列中,jobs是一个TreeMap:Map jobs = new TreeMap()。是在addJob方法中加入到jobs,并且将它加入到监听队列中。代码如下:private synchronized JobStatus addJob(JobID jobId, JobInProgress job) totalSubmissions+; synchronized (jobs) synchronized (taskScheduler) jobs.put(job.getProfile().getJobID(), job); for (JobInProgressListener listener : jobInProgressListeners) try listener.jobAdded(job); catch (IOException ioe) LOG.warn(Failed to add and so skipping the job : + job.getJobID() + . Exception : + ioe); myInstrumentation.submitJob(job.getJobConf(), jobId); return job.getStatus();这些监听器是在JobTracker启动初始化的时候构造的,有JobQueueJobInProgressListener和EagerTaskInitializationListener。其中eagerTaskInitializationListener负责任务Task的初始化。其具体实现是这样的: 这个listener在初始化时会开启一个JobInitThread线程,当作业通过jobAdded(job)加入到初始化队列 jobInitQueue中,根据作业的优先级排序(resortInitQueue方法)后, 这个线程就会调用JobInProgress.initTasks()立即初始化作业的所有任务。调用关系如下:在JobTracker启动初始化的时候,会启动taskSchedule,即调用taskSchedule.start(),即调用JobQueueTaskScheduler.start(),在JobQueueTaskScheduler.start方法中会调用: super.start(); taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener); eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager); eagerTaskInitializationListener.start(); taskTrackerManager.addJobInProgressListener(eagerTaskInitializationListener);其中eagerTaskInitializationListener.start方法会启用新的线程JobInitManager:this.jobInitManagerThread = new Thread(jobInitManager, jobInitManager);JobInitManager是eagerTaskInitializationListener的内部类,这是它的run函数,如下public void run() JobInProgress job = null; while (true) try synchronized (jobInitQueue) while (jobInitQueue.isEmpty() jobInitQueue.wait(); job = jobInitQueue.remove(0); threadPool.execute(new InitJob(job); catch (InterruptedException t) LOG.info(JobInitManagerThread interrupted.); break; LOG.info(Shutting down thread pool); threadPool.shutdownNow(); 它还会创建一个新的InitJob对象,这个对象也新启动一个线程,如下所示:class InitJob implements Runnable private JobInProgress job; public InitJob(JobInProgress job) this.job = job; public void run() ttm.initJob(job); 它是JobInitManager的内部类,ttm其实是JobTracker,这个可以从开始的代码得知。接下来就是调用JobTracker.initJob这个方法。在JobTracker.initJob,会调用JobInProgress.initTask,并判断JobInProgress初始化前后状态会不会改变。JobInProgress.initTask方法是初始化Task的主要方法,JobInProgress.initTasks方法首先从JobClient上传的job.split文件中读取所有数据块的列表,然后根据这个列表创建对应数目的Map执行管理对象TaskInProgress。创建这些TaskInProgress对象完毕后,initTasks()方法会通过createCache()方法为这些对象产生一个未执行任务的 Map缓存nonRunningMapCache。slave端的TaskTracker向master发送心跳时,就可以直接从这个cache中取任务去执行。createCache()方法的作用是为以上TaskInProgress对象在网络拓扑结构上分配拥有此任务数据块的节点。从近到远一层一层 地寻找,首先是同一节点,然后在寻找同一机柜上的节点,接着寻找相同关换机下的节点,直到找了maxLevel层结束。这样的话,在JobTracker 给TaskTracker派发任务的时候,可以迅速找到最近的TaskTracker,让它执行任务。代码如下图所示。图3-3 JobInProgress创建并初始化MapTask其次JobInProgress会创建Reduce的监控对象,这个比较简单,根据JobConf里指定的Reduce数目创建,缺省只创建1个 Reduce任务。监控和调度Reduce任务的也是TaskInProgress类,不过构造方法有所不同,TaskInProgress会根据不同参 数分别创建具体的MapTask或者ReduceTask。同样地,initTasks()也会通过createCache()方法对这些 TaskInProgress对象寻找maxLevel层的可行TaskTracker,进而产生nonRunningReduceCache成员。代码如下图示。图3-4 JobInProgress创建并初始化ReduceTask之后创建两个cleanup TaskInProgress, 一个用来清理map,一个用来清理reduce;再创建两个初始化 task,一个初始化map,一个初始化reduce。最后构造JobStatus并记录job正在执行中,然后再调用JobHistory.JobInfo.logStarted()记录job的执行日志。到这里JobTracker里初始化job的过程全部结束,执行则是通过另一异步的方式处理的,下面接着介绍它。图3-5清理初始化tip的创建3.2.3与TaskTracker端交互JobTracker.heartbeat这个方法主要是TaskTracker端远程调用时用到的方法,其主要作用就是分派具体任务,并将该任务分发到TaskTracker端:其关键代码为:1 HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);2 List actions = new ArrayList();3 if (tasks = null) 4 tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName);5 6 if (tasks != null) 7 for (Task task : tasks) 8expireLaunchingTasks.addNewTask(task.getTaskID(); 9actions.add(new LaunchTaskAction(task);10 11这里的调度中主要会根据我们的任务的输入文件dfs中的文件存放节点来分配,数据在哪个节点上任务就分配到哪个节点上的TaskTracker中。简单来说这里会根据我们的上面提到的JobInProgress和TaskInPorgress调度生成具体的MapTask和ReuceTask实例,他们均继承自抽象类Task该实例会放入到LaunchTaskAction中,最后获得的任务列表会被返回到TaskTracker端。3.3 TaskTracker端3.3.1启动初始化TaskTracker启动时运行,main函数,在这个函数中主要代码是TaskTracker(conf).run()函数。图3-6 TaskTracker的Main函数图3-7 TaskTracker的构造函数TaskTracker构造时会初始化一些参数如目前最大可执行的MapTask数量、ReduceTask数量、工作线程数、工作目录、创建info_httpserver服务器。在构造函数中还执行initialize方法,完成初始化,它实质做真正的构造工作。将它独立成方法,以便可以循环利用。Initialize主要工作创建taskReportServer、runningTasks、runningJobs、mapLauncher、reduceLauncher、mapEventsFetcher、jobClient等/* * Do the real constructor work here. Its in a separate method * so we can call it again and recycle the object after calling * close(). */ synchronized void initialize() throws IOException / use configured nameserver & interface to get local hostname this.fConf = new JobConf(originalConf); if (fConf.get() != null) this.localHostname = fConf.get(); if (localHostname = null) this.localHostname = DNS.getDefaultHost (fConf.get(erface,default), fConf.get(server,default); /check local disk checkLocalDirs(this.fConf.getLocalDirs(); fConf.deleteLocalFiles(SUBDIR); / Clear out state tables this.tasks.clear(); this.runningTasks = new LinkedHashMap(); this.runningJobs = new TreeMap(); this.mapTotal = 0; this.reduceTotal = 0; this.acceptNewTasks = true; this.status = null; this.minSpaceStart = this.fConf.getLong(mapred.local.dir.minspacestart, 0L); this.minSpaceKill = this.fConf.getLong(mapred.local.dir.minspacekill, 0L); /tweak the probe sample size (make it a function of numCopiers) probe_sample_size = this.fConf.getInt(mapred.tasktracker.events.batchsize, 500); Class metricsInst = getInstrumentationClass(fConf); try java.lang.reflect.Constructor c = metricsInst.getConstructor(new Class TaskTracker.class ); this.myInstrumentation = c.newInstance(this); catch(Exception e) /Reflection can throw lots of exceptions - handle them all by /falling back on the default. LOG.error(failed to initialize taskTracker metrics, e); this.myInstrumentation = new TaskTrackerMetricsInst(this); / bind address String address = NetUtils.getServerAddress(fConf, mapred.task.tracker.report.bindAddress, mapred.task.tracker.report.port, mapred.task.tracker.report.address); InetSocketAddress socAddr = NetUtils.createSocketAddr(address); String bindAddress = socAddr.getHostName(); int tmpPort = socAddr.getPort(); this.jvmManager = new JvmManager(this); / Set service-level authorization security policy if (this.fConf.getBoolean( ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false) PolicyProvider policyProvider = (PolicyProvider)(ReflectionUtils.newInstance( this.fConf.getClass(Polic

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论