




全文预览已结束
下载本文档
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1. HadoopMap/Reduce执行流程关键代码2. 3. JobClient.runJob(conf)|运行job4. |-JobClientjc=newJobClient(job);5. |-RunningJobrj=jc.submitJob(job);6. |-submitJobInternal(job);7. |-intreduces=job.getNumReduceTasks();8. |-JobContextcontext=newJobContext(job,jobId);9. |-maps=writeOldSplits(job,submitSplitFile);10. |-job.setNumMapTasks(maps);11. |-job.writeXml(out);12. |-JobStatusstatus=jobSubmitClient.submitJob(jobId);13. -1. 2. JobTracker.submitJob(JobId)|提交job3. |-JobInProgressjob=newJobInProgress(jobId,this,this.conf);4. |-checkAccess(job,QueueManager.QueueOperation.SUBMIT_JOB);|检查权限5. |-checkMemoryRequirements(job);|检查内存需求6. |-addJob(jobId,job);|添加至job队列7. |-jobs.put(job.getProfile().getJobID(),job);8. |-for(JobInProgressListenerlistener:jobInProgressListeners)添加至监听器,供调度使用9. |-listener.jobAdded(job);10. 11. -12. JobTracker.heartbeat()|JobTracker启动后供TaskTracker以RPC方式来调用,返回Response集合13. |-Listactions=newArrayList();14. |-tasks=taskScheduler.assignTasks(taskTrackerStatus);|通过调度器选择合适的tasks15. |-for(Tasktask:tasks)16. |-expireLaunchingTasks.addNewTask(task.getTaskID();17. |-actions.add(newLaunchTaskAction(task);|实际actions还会添加commmitTask等18. |-response.setHeartbeatInterval(nextInterval);19. |-response.setActions(actions.toArray(newTaskTrackerActionactions.size();20. |-returnresponse;21. 22. 23. -24. 25. TaskTracker.offerService|TaskTracker启动后通过offerservice()不断发心跳至JobTracker中26. |-transmitHeartBeat()27. |-HeartbeatResponseheartbeatResponse=jobClient.heartbeat(status,justStarted,justInited,askForNewTask,heartbeatResponseId);28. |-TaskTrackerActionactions=heartbeatResponse.getActions();29. |-for(TaskTrackerActionaction:actions)30. |-if(actioninstanceofLaunchTaskAction)31. |-addToTaskQueue(LaunchTaskAction)action);|添加至执行Queue,根据map/reducetask分别添加32. |-if(action.getTask().isMapTask()33. |-mapLauncher.addToTaskQueue(action);34. |-TaskInProgresstip=registerTask(action,this);35. |-tasksToLaunch.add(tip);36. |-tasksToLaunch.notifyAll();|唤醒阻塞进程37. |-else38. |-reduceLauncher.addToTaskQueue(action);39. 40. -41. 42. TaskLauncher.run()43. |-while(tasksToLaunch.isEmpty()44. |-tasksToLaunch.wait();45. |-tip=tasksToLaunch.remove(0);46. |-startNewTask(tip);47. |-localizeJob(tip);48. |-launchTaskForJob(tip,newJobConf(rjob.jobConf);49. |-tip.setJobConf(jobConf);50. |-tip.launchTask();|TaskInProgress.launchTask()51. |-this.runner=task.createRunner(TaskTracker.this,this);|区分map/reduce52. |-this.runner.start();53. -54. MapTaskRunner.run()|执行MapTask55. |-FileworkDir=newFile(lDirAlloc.getLocalPathToRead()|准备执行路径56. |-Stringjar=conf.getJar();|准备jar包57. |-Filejvm=newFile(newFile(System.getProperty(java.home),bin),java);|获取jvm58. |-vargs.add(Child.class.getName();添加参数,Child类作为main主函数启动59. |-tracker.addToMemoryManager(t.getTaskID(),t.isMapTask(),conf,pidFile);|添加至内存管理60. |-jvmManager.launchJvm(this,jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,|统一纳入jvm管理器当中并启动61. workDir,env,pidFile,conf);62. |-mapJvmManager.reapJvm(t,env);|区分map/reduce操作63. 64. -65. JvmManager.reapJvm()|66. |-while(jvmIter.hasNext()67. |-JvmRunnerjvmRunner=jvmIter.next().getValue();68. |-JobIDjId=jvmRunner.jvmId.getJobId();69. |-setRunningTaskForJvm(jvmRunner.jvmId,t);70. |-spawnNewJvm(jobId,env,t);71. |-JvmRunnerjvmRunner=newJvmRunner(env,jobId);72. |-jvmIdToRunner.put(jvmRunner.jvmId,jvmRunner);73. |-jvmRunner.start();|执行JvmRunner的run()方法74. |-jvmRunner.run()75. |-runChild(env);76. |-ListwrappedCommand=TaskLog.captureOutAndError(env.setup,env.vargs,env.stdout,env.stderr,77. env.logSize,env.pidFile);|选取main函数78. |-shexec.execute();|执行79. |-intexitCode=shexec.getExitCode();获取执行状态值80. |-updateOnJvmExit(jvmId,exitCode,killed);更新Jvm状态81. 82. -83. Child.main()执行Task(map/reduce)84. |-JVMIdjvmId=newJVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);85. |-TaskUmbilicalProtocolumbilical=(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,86. TaskUmbilicalProtocol.versionID,address,defaultConf);87. |-while(true)88. |-JvmTaskmyTask=umbilical.getTask(jvmId);89. |-task=myTask.getTask();90. |-taskid=task.getTaskID();91. |-TaskRunner.setupWorkDir(job);92. |-task.run(job,umbilical);|以maptask为例93. |-TaskReporterreporter=newTaskReporter(getProgress(),umbilical);94. |-if(useNewApi)95. |-runNewMapper(job,split,umbilical,reporter);96. |-else97. |-runOldMapper(job,split,umbilical,reporter);98. |-inputSplit=(InputSplit)ReflectionUtils.newInstance(job.getClassByName(splitClass),job);99. |-MapRunnablerunner=ReflectionUtils.newInstance(job.getMapRunnerClass(),job);100. |-runner.run(in,newOldOutputCollector(collector,conf),reporter);101. -102. MapRunner.run()103. |-K1key=input.createKey();104. |-V1value=input.createValue();10
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 散文阅读方法:语言与情感的体悟教案
- 纪检业务知识培训课件计划表
- 胜似亲人作文200字12篇
- 高中作文中秋来历7篇范文
- 加油我一定行450字(14篇)
- 合作完成科技成果转化合同
- 童话寓言作文森林王国的预报员500字(13篇)
- 写人作文我和我的同桌是一对冤家600字(13篇)
- 2025年社会工作师职业水平考试社会工作服务对象干预效果推广报告案例分析试卷
- 客户关系管理策略与实施方案模板
- 2025至2030中国太阳能发电中的水泵行业发展趋势分析与未来投资战略咨询研究报告
- 厂内专用垃圾转运方案(3篇)
- 2025年地质勘探与资源矿产管理技术考试试题及答案
- 中小学教师中高级职称答辩备考试题及答案(50题)
- 2025年药品监管与安全知识考试卷及答案
- 高中班级常规管理课件
- 超声波龈下刮治术专题讲解
- 2025年电信传输工程师职称考试试题
- 2024-2025学年人教版八年级数学上册《全等三角形》综合训练练习题(含答案解析)
- 肾内科常见病诊疗与管理
- 口腔医生岗前培训课件
评论
0/150
提交评论