These past two days I have been reading through how a job actually gets launched after `spark-submit`, so this article records my understanding of the flow:

SparkSubmit -> SparkDeploySchedulerBackend -> AppClient -> tryRegisterAllMasters

1: The client starts and initializes the relevant environment, including submitting the Application code.

2: The Driver registers with the Master. Note that the Master and Workers are already running at this point; in other words, the Spark cluster is already up.

3: SparkDeploySchedulerBackend starts the Application and registers it with the Master:

```scala
def registerWithMaster() {
  tryRegisterAllMasters()
  import context.dispatcher
  var retries = 0
  registrationRetryTimer = Some {
    context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
      Utils.tryOrExit {
        retries += 1
        if (registered) {
          registrationRetryTimer.foreach(_.cancel())
        } else if (retries >= REGISTRATION_RETRIES) {
          markDead("All masters are unresponsive! Giving up.")
        } else {
          tryRegisterAllMasters()
        }
      }
    }
  }
}

def tryRegisterAllMasters() {
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    val actor = context.actorSelection(masterAkkaUrl)
    actor ! RegisterApplication(appDescription)
  }
}
```

On the Master side, the `RegisterApplication` message is handled like this:

```scala
case RegisterApplication(description) => {
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, sender)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    sender ! RegisteredApplication(app.id, masterUrl)
    schedule()
  }
}
```

The Master registers the Application, replies with a `RegisteredApplication` message, and then calls `schedule()`. The schedule function looks like this:

```scala
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
      }
    }
  }
  startExecutorsOnWorkers()
}
```

4: After receiving the registration, the Master looks at the available resources and tells Workers to launch Executors. That is the last line above, `startExecutorsOnWorkers()`:

```scala
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  if (spreadOutApps) {
    // Try to spread out each app among all the workers, until it has all its cores
    for (app <- waitingApps if app.coresLeft > 0) {
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
        .sortBy(_.coresFree).reverse
      val numUsable = usableWorkers.length
      val assigned = new Array[Int](numUsable) // Number of cores to give on each node
      var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
      var pos = 0
      while (toAssign > 0) {
        if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
          toAssign -= 1
          assigned(pos) += 1
        }
        pos = (pos + 1) % numUsable
      }
      // Now that we've decided how many cores to give on each node, let's actually give them
      for (pos <- 0 until numUsable if assigned(pos) > 0) {
        allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
      }
    }
  } else {
    // Pack each app into as few workers as possible until we've assigned all its cores
    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
      for (app <- waitingApps if app.coresLeft > 0) {
        allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
      }
    }
  }
}
```

This is the code that launches executors on the workers. The doc comment in the source sums it up: "Schedule executors to be launched on the workers. There are two modes of launching executors. The first attempts to spread out an application's executors on as many workers as possible, while the second does the opposite (i.e. launch them on as few workers as possible). The former is usually better for data locality purposes and is the default."
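To see what the spread-out branch actually computes, here is a minimal Python simulation of its round-robin core-assignment loop. The function name and the worker core counts are made-up illustrations, not Spark APIs; only the loop logic mirrors the Scala code:

```python
# Simulate the Master's spread-out assignment: hand out cores one at a
# time, round-robin across the usable workers, until the app's demand
# (capped by the total free cores) is satisfied.
def assign_cores_spread_out(cores_free, cores_wanted):
    # cores_free: free cores per usable worker (Spark sorts these descending)
    assigned = [0] * len(cores_free)
    to_assign = min(cores_wanted, sum(cores_free))
    pos = 0
    while to_assign > 0:
        if cores_free[pos] - assigned[pos] > 0:  # this worker still has spare cores
            to_assign -= 1
            assigned[pos] += 1
        pos = (pos + 1) % len(cores_free)        # move to the next worker
    return assigned

# Three workers with 4, 3 and 2 free cores; the app wants 6 cores in total.
print(assign_cores_spread_out([4, 3, 2], 6))  # -> [2, 2, 2]
```

The demand is spread evenly: each of the three workers contributes two cores, which is exactly the locality-friendly behavior the source comment describes.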
In other words, there are two ways to launch executors: the first (spread-out) mode distributes an application's executors across as many workers as possible, while the second packs them onto as few workers as possible. Spreading out is the default and is usually better for data locality, and it corresponds exactly to how the application's cores are distributed above. The key call inside `schedule`'s executor path is `allocateWorkerResourceToExecutors`:

```scala
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    coresToAllocate: Int,
    worker: WorkerInfo): Unit = {
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
  var coresLeft = coresToAllocate
  while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
    val exec = app.addExecutor(worker, coresPerExecutor)
    coresLeft -= coresPerExecutor
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
```

The `launchExecutor(worker, exec)` call should make it clear now: this is where the Executor is actually started. Once the executors are up, they wait for tasks to run. And for running tasks, you should be thinking of the DAGScheduler and the TaskScheduler.

5: The DAGScheduler splits the Job into Stages. (I wrote a separate article about this, so I will only summarize here instead of pasting the code again.) The main work is handling the `handleJobSubmitted` event: first a new Stage and an ActiveJob are created. When a Stage is built there are a few things to watch for, namely the partitions and the dependencies. Then the Stage is submitted; as you can see in the code, missing parent stages must be detected first, after which `submitMissingTasks(stage, jobId.get)` runs. Different stage types are handled differently; a stage is either a ShuffleMapStage or a ResultStage. Then comes a very important step: task serialization. Note that what the DAGScheduler hands to the TaskScheduler is a TaskSet, i.e. a group of identical tasks that differ only in the data they process. ShuffleMapStage and ResultStage are serialized differently. The serialized task is distributed via a broadcast variable; each executor first deserializes its own copy, so tasks running on different executors are isolated and do not affect each other. Then the tasks themselves are built (ShuffleMapTask or ResultTask), assembled into a TaskSet, and submitted to the TaskScheduler.

6: The TaskScheduler sends a `LaunchTask` command to the registered Executors to run the tasks. The source is a bit convoluted here, so let me walk through it. First, `TaskSchedulerImpl.submitTasks` ends by calling `backend.reviveOffers()`, where the backend is declared as `var backend: SchedulerBackend = null`. So the call goes to the backend's `reviveOffers`, whose implementation is:

```scala
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
```

This sends a `ReviveOffers` message to `driverEndpoint`, so we need to find where `driverEndpoint` handles `ReviveOffers`:

```scala
driverEndpoint = rpcEnv.setupEndpoint(
  CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))
```

Here you can see the `DriverEndpoint` is created inside `CoarseGrainedSchedulerBackend`, so that is where the `ReviveOffers` handler lives:

```scala
case ReviveOffers =>
  makeOffers()

def makeOffers() {
  launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq))
}
```

`launchTasks` then takes over. The document's preview cuts off inside the error message; the tail of the method below is restored from the corresponding Spark 1.x source:

```scala
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
      scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          // (restored from the corresponding Spark 1.x source; the preview ends above)
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSet.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    } else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
```

If the serialized task fits within the frame limit, the `else` branch sends a `LaunchTask` message to the executor's endpoint, and the task finally runs on the Executor.
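As a rough illustration of the size guard at the top of `launchTasks`, here is a small standalone Python check. The frame size and reserved byte counts are illustrative constants standing in for `spark.akka.frameSize` and `AkkaUtils.reservedSizeBytes`, not values read from a real SparkConf:

```python
# Mirror the launchTasks guard: a serialized task must fit within the RPC
# frame size minus a reserved overhead; otherwise the whole TaskSet is aborted.
FRAME_SIZE = 128 * 1024 * 1024   # illustrative stand-in for spark.akka.frameSize
RESERVED = 200 * 1024            # illustrative stand-in for reservedSizeBytes

def task_fits(serialized_len, frame_size=FRAME_SIZE, reserved=RESERVED):
    # Corresponds to: serializedTask.limit >= akkaFrameSize - reservedSizeBytes
    # (that condition means "too big"; we return True when the task still fits)
    return serialized_len < frame_size - reserved

print(task_fits(4 * 1024))    # a 4 KB task fits -> True
print(task_fits(FRAME_SIZE))  # as large as the frame itself -> False
```

This is also why the error message suggests broadcast variables: moving large closure data into a broadcast keeps the serialized TaskDescription itself small enough to pass this check.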