Hadoop MapReduce执行全流程关键代码.doc_第1页
Hadoop MapReduce执行全流程关键代码.doc_第2页
Hadoop MapReduce执行全流程关键代码.doc_第3页
Hadoop MapReduce执行全流程关键代码.doc_第4页
Hadoop MapReduce执行全流程关键代码.doc_第5页
全文预览已结束

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1. HadoopMap/Reduce执行流程关键代码2. 3. JobClient.runJob(conf)|运行job4. |-JobClientjc=newJobClient(job);5. |-RunningJobrj=jc.submitJob(job);6. |-submitJobInternal(job);7. |-intreduces=job.getNumReduceTasks();8. |-JobContextcontext=newJobContext(job,jobId);9. |-maps=writeOldSplits(job,submitSplitFile);10. |-job.setNumMapTasks(maps);11. |-job.writeXml(out);12. |-JobStatusstatus=jobSubmitClient.submitJob(jobId);13. -1. 2. JobTracker.submitJob(JobId)|提交job3. |-JobInProgressjob=newJobInProgress(jobId,this,this.conf);4. |-checkAccess(job,QueueManager.QueueOperation.SUBMIT_JOB);|检查权限5. |-checkMemoryRequirements(job);|检查内存需求6. |-addJob(jobId,job);|添加至job队列7. |-jobs.put(job.getProfile().getJobID(),job);8. |-for(JobInProgressListenerlistener:jobInProgressListeners)添加至监听器,供调度使用9. |-listener.jobAdded(job);10. 11. -12. JobTracker.heartbeat()|JobTracker启动后供TaskTracker以RPC方式来调用,返回Response集合13. |-Listactions=newArrayList();14. |-tasks=taskScheduler.assignTasks(taskTrackerStatus);|通过调度器选择合适的tasks15. |-for(Tasktask:tasks)16. |-expireLaunchingTasks.addNewTask(task.getTaskID();17. |-actions.add(newLaunchTaskAction(task);|实际actions还会添加commmitTask等18. |-response.setHeartbeatInterval(nextInterval);19. |-response.setActions(actions.toArray(newTaskTrackerActionactions.size();20. |-returnresponse;21. 22. 23. -24. 25. TaskTracker.offerService|TaskTracker启动后通过offerservice()不断发心跳至JobTracker中26. |-transmitHeartBeat()27. |-HeartbeatResponseheartbeatResponse=jobClient.heartbeat(status,justStarted,justInited,askForNewTask,heartbeatResponseId);28. |-TaskTrackerActionactions=heartbeatResponse.getActions();29. |-for(TaskTrackerActionaction:actions)30. |-if(actioninstanceofLaunchTaskAction)31. |-addToTaskQueue(LaunchTaskAction)action);|添加至执行Queue,根据map/reducetask分别添加32. |-if(action.getTask().isMapTask()33. |-mapLauncher.addToTaskQueue(action);34. |-TaskInProgresstip=registerTask(action,this);35. |-tasksToLaunch.add(tip);36. |-tasksToLaunch.notifyAll();|唤醒阻塞进程37. |-else38. |-reduceLauncher.addToTaskQueue(action);39. 40. -41. 42. TaskLauncher.run()43. |-while(tasksToLaunch.isEmpty()44. |-tasksToLaunch.wait();45. |-tip=tasksToLaunch.remove(0);46. |-startNewTask(tip);47. |-localizeJob(tip);48. |-launchTaskForJob(tip,newJobConf(rjob.jobConf);49. |-tip.setJobConf(jobConf);50. |-tip.launchTask();|TaskInProgress.launchTask()51. |-this.runner=task.createRunner(TaskTracker.this,this);|区分map/reduce52. |-this.runner.start();53. -54. MapTaskRunner.run()|执行MapTask55. |-FileworkDir=newFile(lDirAlloc.getLocalPathToRead()|准备执行路径56. |-Stringjar=conf.getJar();|准备jar包57. |-Filejvm=newFile(newFile(System.getProperty(java.home),bin),java);|获取jvm58. |-vargs.add(Child.class.getName();添加参数,Child类作为main主函数启动59. |-tracker.addToMemoryManager(t.getTaskID(),t.isMapTask(),conf,pidFile);|添加至内存管理60. |-jvmManager.launchJvm(this,jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,|统一纳入jvm管理器当中并启动61. workDir,env,pidFile,conf);62. |-mapJvmManager.reapJvm(t,env);|区分map/reduce操作63. 64. -65. JvmManager.reapJvm()|66. |-while(jvmIter.hasNext()67. |-JvmRunnerjvmRunner=jvmIter.next().getValue();68. |-JobIDjId=jvmRunner.jvmId.getJobId();69. |-setRunningTaskForJvm(jvmRunner.jvmId,t);70. |-spawnNewJvm(jobId,env,t);71. |-JvmRunnerjvmRunner=newJvmRunner(env,jobId);72. |-jvmIdToRunner.put(jvmRunner.jvmId,jvmRunner);73. |-jvmRunner.start();|执行JvmRunner的run()方法74. |-jvmRunner.run()75. |-runChild(env);76. |-ListwrappedCommand=TaskLog.captureOutAndError(env.setup,env.vargs,env.stdout,env.stderr,77. env.logSize,env.pidFile);|选取main函数78. |-shexec.execute();|执行79. |-intexitCode=shexec.getExitCode();获取执行状态值80. |-updateOnJvmExit(jvmId,exitCode,killed);更新Jvm状态81. 82. -83. Child.main()执行Task(map/reduce)84. |-JVMIdjvmId=newJVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);85. |-TaskUmbilicalProtocolumbilical=(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,86. TaskUmbilicalProtocol.versionID,address,defaultConf);87. |-while(true)88. |-JvmTaskmyTask=umbilical.getTask(jvmId);89. |-task=myTask.getTask();90. |-taskid=task.getTaskID();91. |-TaskRunner.setupWorkDir(job);92. |-task.run(job,umbilical);|以maptask为例93. |-TaskReporterreporter=newTaskReporter(getProgress(),umbilical);94. |-if(useNewApi)95. |-runNewMapper(job,split,umbilical,reporter);96. |-else97. |-runOldMapper(job,split,umbilical,reporter);98. |-inputSplit=(InputSplit)ReflectionUtils.newInstance(job.getClassByName(splitClass),job);99. |-MapRunnablerunner=ReflectionUtils.newInstance(job.getMapRunnerClass(),job);100. |-runner.run(in,newOldOutputCollector(collector,conf),reporter);101. -102. MapRunner.run()103. |-K1key=input.createKey();104. |-V1value=input.createValue();10

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论