Spark Source Code Reading Guide, Week 2
Lecturer: 冰风影, DATAGURU professional data analysis community

Legal notice: this video and these slides are teaching materials for the 炼数成金 (Dataguru) online course. All materials may be used only within the course and must not be distributed beyond it; violators may be held legally and financially liable. For course details, visit the 炼数成金 training site.

This week's topics:
- WordCount
- SparkContext
- Spark RDD

SimpleApp.scala

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf

    object SimpleApp {
      def main(args: Array[String]) {
        val logFile = "file:///home/jifeng/code/spark-1.4.0/README.md"
        val conf = new SparkConf().setAppName("Scala Application")
        val sc = new SparkContext(conf)
        val file = sc.textFile(logFile)
        val count = file.flatMap(line => line.split(" "))
                        .map(word => (word, 1))
                        .reduceByKey(_ + _)
        count.collect()
        count.saveAsTextFile("file:///home/jifeng/code/out/bb")
      }
    }

Packaging with SBT

Directory layout:

    simple.sbt
    src/
    src/main/
    src/main/scala/
    src/main/scala/SimpleApp.scala

    $ cat simple.sbt
    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.10.4"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"

Build command: sbt package

Ways to run

1. spark-shell:

    bin/spark-shell --executor-memory 1g --driver-memory 1g --master spark://feng03:7077

2. spark-submit:

    bin/spark-submit --class "SimpleApp" --master spark://feng03:7077 \
      /home/jifeng/code/simple/target/scala-2.10/simple-project_2.10-1.0.jar

Running from IDEA, or again via spark-submit:

    bin/spark-submit --class "SimpleApp" --master spark://feng05:7077 \
      /home/jifeng/code/simple/target/scala-2.10/simple-project_2.10-1.0.jar

SparkContext

A new SparkConf is passed as the argument to SparkContext's constructor, and SparkContext builds a SparkEnv from that SparkConf. In SparkContext.scala:

    private var _env: SparkEnv = _                        // line 207

    def isLocal: Boolean =                                // line 257: detect a local run
      (master == "local" || master.startsWith("local["))

    private[spark] val listenerBus = new LiveListenerBus  // line 260

    // line 263: create the SparkEnv from the SparkConf
    private[spark] def createSparkEnv(
        conf: SparkConf,
        isLocal: Boolean,
        listenerBus: LiveListenerBus): SparkEnv = {
      SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
    }

    private[spark] def env: SparkEnv = _env               // line 270

    _env = createSparkEnv(_conf, isLocal, listenerBus)    // line 424
    SparkEnv.set(_env)                                    // line 425

listenerBus: LiveListenerBus

LiveListenerBus processes events asynchronously. It holds the message queue and is responsible for buffering messages; it holds the registered listeners and is responsible for dispatching messages to them.

    private[spark] class LiveListenerBus
      extends AsynchronousListenerBus[SparkListener, SparkListenerEvent]("SparkListenerBus")
      with SparkListenerBus {

      private val logDroppedEvent = new AtomicBoolean(false)

      override def onDropEvent(event: SparkListenerEvent): Unit = {
        if (logDroppedEvent.compareAndSet(false, true)) {
          // Only log the following message once to avoid duplicated annoying logs.
          logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
            "This likely means one of the SparkListeners is too slow and cannot keep up with " +
            "the rate at which tasks are being started by the scheduler.")
        }
      }
    }

Its internal state lives in org.apache.spark.util.AsynchronousListenerBus.scala:

    private val EVENT_QUEUE_CAPACITY = 10000          // length of the queue
    private val eventQueue =                          // linked-list-based blocking queue
      new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)
    private val started = new AtomicBoolean(false)    // started flag
    private val stopped = new AtomicBoolean(false)    // stopped flag
    private var processingEvent = false               // currently handling an event
    private val eventLock = new Semaphore(0)          // semaphore
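Read together, these fields form a classic bounded producer/consumer pattern. Before walking through the real implementation, here is a minimal self-contained sketch of that pattern; MiniEventBus and all its members are invented names for illustration, not Spark's actual AsynchronousListenerBus:

    import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
    import java.util.concurrent.atomic.AtomicBoolean

    // Illustrative only: shows how a blocking queue, a semaphore, and a stop
    // flag combine into an asynchronous event bus.
    class MiniEventBus[E <: AnyRef](capacity: Int = 10000) {
      private val eventQueue = new LinkedBlockingQueue[E](capacity)
      private val eventLock  = new Semaphore(0)           // counts events available to consume
      private val stopped    = new AtomicBoolean(false)

      private val listenerThread = new Thread("mini-bus") {
        setDaemon(true)
        override def run(): Unit = {
          while (true) {
            eventLock.acquire()                           // block until an event (or stop) is signalled
            val event = eventQueue.poll()
            if (event == null) return                     // stop() released a permit with no event queued
            println(s"dispatching $event")                // stand-in for postToAll(event)
          }
        }
      }

      def start(): Unit = listenerThread.start()

      def post(event: E): Unit = {
        if (stopped.get) return                           // drop events once stopped
        if (eventQueue.offer(event)) eventLock.release()  // one permit per queued event
        else println(s"dropping $event")                  // stand-in for onDropEvent(event)
      }

      def stop(): Unit = {
        stopped.set(true)
        eventLock.release()                               // extra permit: consumer polls null and exits
        listenerThread.join()
      }
    }

Usage:

    val bus = new MiniEventBus[String]()
    bus.start()
    (1 to 3).foreach(i => bus.post(s"event-$i"))
    bus.stop()  // pending permits are consumed first, then the null poll ends the thread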
The core of the message mechanism: given a message queue, there are producers and consumers of messages.

Consumption: a consumer thread, listenerThread, is created to take messages off the queue and dispatch them. start() launches it:

    def start(sc: SparkContext) {
      if (started.compareAndSet(false, true)) {
        sparkContext = sc
        listenerThread.start()
      } else {
        throw new IllegalStateException(s"$name already started!")
      }
    }

    private val listenerThread = new Thread(name) {
      setDaemon(true)                            // run as a daemon thread
      override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
        while (true) {
          eventLock.acquire()                    // semaphore: acquire one permit
          self.synchronized { processingEvent = true }
          try {
            val event = eventQueue.poll
            if (event == null) {
              if (!stopped.get) {
                // something has gone wrong
                throw new IllegalStateException("Polling `null` from eventQueue means" +
                  " the listener bus has been stopped. So `stopped` must be true")
              }
              // normal termination: `stopped` has already been set to true
              return
            }
            postToAll(event)                     // follow-up handling: dispatch the message
          } finally {
            self.synchronized { processingEvent = false }
          }
        }
      }
    }

Production goes through post():

    def post(event: E) {
      if (stopped.get) {
        // Drop further events to make `listenerThread` exit ASAP
        logError(s"$name has already stopped! Dropping event $event")
        return
      }
      val eventAdded = eventQueue.offer(event)
      if (eventAdded) {
        // Successfully enqueued: increment the semaphore so the
        // consumer thread can consume this event
        eventLock.release()
      } else {
        // the queue is full
        onDropEvent(event)
      }
    }

When the program finishes, the stop() function is called:

    def stop() {
      if (!started.get()) {
        throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
      }
      if (stopped.compareAndSet(false, true)) {
        // Call eventLock.release() so that listenerThread will poll `null`
        // from `eventQueue` and know `stop` is called
        eventLock.release()
        listenerThread.join()
      } else {
        // Keep quiet
      }
    }

eventLock.release() increments the semaphore without putting a new message on the queue, so the next read of the queue in the consumer thread listenerThread returns null. This is how the listenerThread thread is shut down.
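As a usage note: application code can hook into this bus by registering its own listener through SparkContext.addSparkListener. A brief sketch, assuming a live SparkContext `sc`; the listener bodies are illustrative:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerTaskEnd}

    // Callbacks run on listenerThread, so they should stay fast.
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
        println(s"task finished in stage ${taskEnd.stageId}")
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
        println(s"job ${jobEnd.jobId} ended: ${jobEnd.jobResult}")
    })

A slow listener blocks the single consumer thread; once the 10000-slot queue fills, further events are dropped via onDropEvent, which is exactly the situation the error message above warns about.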
SparkEnv

SparkEnv bundles the runtime services of a Spark instance. In SparkEnv.scala:

    class SparkEnv (
        val executorId: String,
        private[spark] val rpcEnv: RpcEnv,
        val serializer: Serializer,
        val closureSerializer: Serializer,
        val cacheManager: CacheManager,                   // cache management (intermediate results)
        val mapOutputTracker: MapOutputTracker,           // caches MapStatus information
        val shuffleManager: ShuffleManager,               // routing table
        val broadcastManager: BroadcastManager,           // broadcast management
        val blockTransferService: BlockTransferService,   // block transfer
        val blockManager: BlockManager,                   // block management
        val securityManager: SecurityManager,             // security management
        val httpFileServer: HttpFileServer,               // HTTP file server
        val sparkFilesDir: String,                        // file storage directory
        val metricsSystem: MetricsSystem,                 // metrics
        val shuffleMemoryManager: ShuffleMemoryManager,   // shuffle memory management
        val executorMemoryManager: ExecutorMemoryManager, // executor memory management
        val outputCommitCoordinator: OutputCommitCoordinator,
        val conf: SparkConf) extends Logging

The SparkUI is created next, back in SparkContext.scala:

    private var _ui: Option[SparkUI] = None               // line 212

    private[spark] def ui: Option[SparkUI] = _ui          // line 285

    _ui =                                                 // line 438
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
          _env.securityManager, appName, startTime = startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    _ui.foreach(_.bind())                                 // bind the port

Creating and starting the Scheduler (SparkContext.scala):

    private var _taskScheduler: TaskScheduler = _         // line 216

    private[spark] def taskScheduler: TaskScheduler = _taskScheduler  // line 308
    private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
      _taskScheduler = ts
    }

    // line 488: Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.send(TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference
    // in DAGScheduler's constructor
    _taskScheduler.start()

createTaskScheduler (line 2403) picks a scheduler and backend by pattern-matching the master URL against a set of regular expressions:

    private def createTaskScheduler(
        sc: SparkContext,
        master: String): (SchedulerBackend, TaskScheduler) = {
      // Regular expression used for local[N] and local[*] master formats
      val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
      // Regular expression for local[N, maxRetries], used in tests with failing tasks
      val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
      // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
      val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
      // Regular expression for connecting to Spark deploy clusters
      val SPARK_REGEX = """spark://(.*)""".r
      // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
      val MESOS_REGEX = """(mesos|zk)://.*""".r
      // Regular expression for connection to Simr cluster
      val SIMR_REGEX = """simr://(.*)""".r

      master match {
        case "local" =>
          val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
          val backend = new LocalBackend(sc.getConf, scheduler, 1)
          scheduler.initialize(backend)
          (backend, scheduler)

        case LOCAL_N_REGEX(threads) =>
          def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
          // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads
          val threadCount = if (threads == "*") localCpuCount else threads.toInt
          val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
          val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
          scheduler.initialize(backend)
          (backend, scheduler)

        case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
          def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
          // local[*, M] means the number of cores on the computer with M failures
          // local[N, M] means exactly N threads with M failures
          val threadCount = if (threads == "*") localCpuCount else threads.toInt
          val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
          val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
          scheduler.initialize(backend)
          (backend, scheduler)

        case SPARK_REGEX(sparkUrl) =>
          val scheduler = new TaskSchedulerImpl(sc)
          val masterUrls = sparkUrl.split(",").map("spark://" + _)
          val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
          scheduler.initialize(backend)
          (backend, scheduler)

        // ... the remaining cases (local-cluster, mesos, simr, ...) follow
      }
    }
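The `case LOCAL_N_REGEX(threads)` patterns work because a Scala Regex with capture groups acts as an extractor in pattern matching: when the whole string matches, each group is bound to a variable. A standalone sketch of this mechanism; the object name and the sample master URLs are made up:

    object MasterUrlDemo extends App {
      // Simplified copies of two of the patterns above
      val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
      val SPARK_REGEX = """spark://(.*)""".r

      for (master <- Seq("local", "local[4]", "local[*]", "spark://feng03:7077,feng04:7077")) {
        master match {
          case "local"                => println(s"$master -> 1 thread")
          case LOCAL_N_REGEX(threads) => println(s"$master -> threads = $threads")
          case SPARK_REGEX(sparkUrl)  =>
            // The same split-and-prefix step SparkDeploySchedulerBackend receives
            val masterUrls = sparkUrl.split(",").map("spark://" + _)
            println(s"$master -> masters = ${masterUrls.mkString(", ")}")
          case other                  => println(s"$other -> unrecognized")
        }
      }
    }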
The TaskScheduler interface (org.apache.spark.scheduler.TaskScheduler):

    private[spark] trait TaskScheduler {

      private val appId = "spark-application-" + System.currentTimeMillis

      def rootPool: Pool

      def schedulingMode: SchedulingMode

      def start(): Unit

      // Wait for slave registrations, etc.
      def postStartHook() { }

      // Disconnect from the cluster.
      def stop(): Unit

      // Submit a sequence of tasks to run: receives the tasks submitted by the DAGScheduler
      def submitTasks(taskSet: TaskSet): Unit

      // Cancel a stage: cancels that stage's tasks
      def cancelTasks(stageId: Int, interruptThread: Boolean)

      // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
      def setDAGScheduler(dagScheduler: DAGScheduler): Unit

      // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
      def defaultParallelism(): Int
    }

The concrete implementation is org.apache.spark.scheduler.TaskSchedulerImpl (TaskSchedulerImpl.scala):

    private[spark] class TaskSchedulerImpl(
        val sc: SparkContext,
        val maxTaskFailures: Int,
        isLocal: Boolean = false)
      extends TaskScheduler with Logging

    // line 123
    def initialize(backend: SchedulerBackend) {
      this.backend = backend
      // temporarily set rootPool name to empty
      rootPool = new Pool("", schedulingMode, 0, 0)
      schedulableBuilder = {
        schedulingMode match {    // scheduling mode: FIFO or FAIR scheduling
          case SchedulingMode.FIFO =>
            new FIFOSchedulableBuilder(rootPool)
          case SchedulingMode.FAIR =>
            new FairSchedulableBuilder(rootPool, conf)
        }
      }
      schedulableBuilder.buildPools()
    }

    // line 140
    override def start() {
      backend.start()
      if (!isLocal && ...) { ... }
    }

For a spark:// master, starting the backend launches a client actor (org.apache.spark.deploy.client.AppClient):

    // Just launch an actor; it will call back into the listener.
    actor = actorSystem.actorOf(Props(new ClientActor))

    // line 61
    private class ClientActor extends Actor with ActorLogReceive with Logging {
      var master: ActorSelection = null
      var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
      var alreadyDead = false          // To avoid calling listener.dead() multiple times
      var registrationRetryTimer: Option[Cancellable] = None
      // ...

    override def preStart() {
      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      try {
        registerWithMaster()
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          context.stop(self)
      }
    }

    def tryRegisterAllMasters() {
      for (masterAkkaUrl <- masterAkkaUrls) {
        ...
      }
    }

registerWithMaster() retries the registration on a timer: once the retry count reaches REGISTRATION_RETRIES it calls markDead("All masters are unresponsive! Giving up."), otherwise it calls tryRegisterAllMasters() again.

Spark RDD

val file = sc.textFile(logFile) leads into SparkContext.scala:

    // line 795
    def textFile(
        path: String,
        minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
      assertNotStopped()
      hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
        minPartitions).map(pair => pair._2.toString)
    }

    // line 1925 (defaultMinPartitions is computed the same way)
    def defaultMinSplits: Int = math.min(defaultParallelism, 2)

Parameters: the TextInputFormat, the Mapper's parameter types LongWritable and Text, and the minimum number of partitions; the result is converted into a MapPartitionsRDD.

    // line 969
    def hadoopFile[K, V](
        path: String,
        inputFormatClass: Class[_ <: InputFormat[K, V]],
        keyClass: Class[K],
        valueClass: Class[V],
        minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
      ...
      val setInputPathsFunc =                         // set the input path
        (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
      new HadoopRDD(                                  // returns a HadoopRDD
        this,
        confBroadcast,
        Some(setInputPathsFunc),
        inputFormatClass,
        keyClass,
        valueClass,
        minPartitions).setName(path)
    }

HadoopRDD computes its partitions with the InputFormat's getSplits method (HadoopRDD.scala, line 199):

    override def getPartitions: Array[Partition] = {
      val jobConf = getJobConf()
      // add the credentials here as this can be called before SparkContext initialized
      SparkHadoopUtil.get.addCredentials(jobConf)
      val inputFormat = getInputFormat(jobConf)
      if (inputFormat.isInstanceOf[Configurable]) {
        inputFormat.asInstanceOf[Configurable].setConf(jobConf)
      }
      val inputSplits = inputFormat.getSplits(jobConf, minPartitions)  // getSplits computes the splits
      val array = new Array[Partition](inputSplits.size)
      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    }

RDD.map wraps the previous RDD in a MapPartitionsRDD:

    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
      val cleanF = sc.clean(f)                        // closure cleaning
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    }

    // MapPartitionsRDD.scala
    private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
        prev: RDD[T],
        f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // anonymous function f:
                                                            // (TaskContext, partition index, iterator)
        preservesPartitioning: Boolean = false)
      extends RDD[U](prev) {

      override val partitioner =
        if (preservesPartitioning) firstParent[T].partitioner else None
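The lineage built here, a HadoopRDD wrapped by a MapPartitionsRDD, with further MapPartitionsRDDs added by flatMap and map, can be inspected directly from the shell via RDD.toDebugString. A sketch assuming the session from the SimpleApp slides; the exact output format varies by version:

    val file = sc.textFile("file:///home/jifeng/code/spark-1.4.0/README.md")
    val counts = file.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

    // textFile alone already shows two RDDs: the HadoopRDD produced by
    // hadoopFile and the MapPartitionsRDD added by .map(pair => pair._2.toString)
    println(file.toDebugString)
    println(counts.toDebugString)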
