Spark Streaming为啥要设置两条线程?.doc_第1页
Spark Streaming为啥要设置两条线程?.doc_第2页
Spark Streaming为啥要设置两条线程?.doc_第3页
Spark Streaming为啥要设置两条线程?.doc_第4页
Spark Streaming为啥要设置两条线程?.doc_第5页
已阅读5页,还剩1页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

JobScheduler详解一:JobSheduler的源码解析 1. JobScheduler是Spark Streaming整个调度的核心,相当于Spark Core上的DAGScheduler. 2. Spark Streaming为啥要设置两条线程? setMaster指定的两条线程是指程序运行的时候至少需要两条线程。一条线程用于接收数据,需要不断的循环。而我们指定的线程数是用于作业处理的。 3. JobSheduler的启动是在StreamContext的start方法被调用的时候启动的。def start(): Unit = synchronized state match case INITIALIZED = startSite.set(DStream.getCreationSite() StreamingContext.ACTIVATION_LOCK.synchronized StreamingContext.assertNoOtherContextIsActive() try validate()/而这里面启动的新线程是调度方面的,因此和我们设置的线程数没有关系。 / Start the streaming scheduler in a new thread, so that thread local properties / like call sites and job groups can be reset without affecting those of the / current thread. ThreadUtils.runInNewThread(streaming-start) sparkContext.setCallSite(startSite.get) sparkContext.clearJobGroup() sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, false) scheduler.start() 4. jobScheduler会负责逻辑层面的Job,并将其物理级别的运行在Spark之上./* * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate * the jobs and runs them using a thread pool. */privatestreamingclass JobScheduler(val ssc: StreamingContext) extends Logging 5. jobScheduler的start方法源码如下:def start(): Unit = synchronized if (eventLoop != null) return / scheduler has already been started logDebug(Starting JobScheduler) eventLoop = new EventLoopJobSchedulerEvent(JobScheduler) override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event) override protected def onError(e: Throwable): Unit = reportError(Error in job scheduler, e) eventLoop.start() / attach rate controllers of input streams to receive batch completion updates for inputDStream - ssc.graph.getInputStreams rateController handleJobStart(job, startTime) case JobCompleted(job, completedTime) = handleJobCompletion(job, completedTime) case ErrorReported(m, e) = handleError(m, e) catch case e: Throwable = reportError(Error in job scheduler, e) 7. handleJobStart的源码如下:private def handleJobStart(job: Job, startTime: Long) val jobSet = jobSets.get(job.time) val isFirstJobOfJobSet = !jobSet.hasStarted jobSet.handleJobStart(job) if (isFirstJobOfJobSet) / StreamingListenerBatchStarted should be posted after calling handleJobStart to get the / correct jobScessingStartTime. listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo) job.setStartTime(startTime) listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo) logInfo(Starting job + job.id + from job set of time + jobSet.time)8. JobScheduler初始化的时候干了那些事?此时为啥要设置并行度呢? 1) 如果Batch Duractions中有多个Output操作的话,提高并行度可以极大的提高性能。 2) 不同的Batch,线程池中有很多的线程,也可以并发运行。 将逻辑级别的Job转化为物理级别的job就是通过newDaemonFixedThreadPool线程实现的。/ Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff/ /AlainODea/1375759b8720a3f9f094private val jobSets: java.util.MapTime, JobSet = new ConcurrentHashMapTime, JobSet/可以手动设置并行度private val numConcurrentJobs = ssc.conf.getInt(spark.streaming.concurrentJobs, 1)/ numConcurrentJobs 默认是1private val jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, streaming-job-executor)/初始化JoGeratorprivate val jobGenerator = new JobGenerator(this)val clock = jobGenerator.clock/val listenerBus = new StreamingListenerBus()/ These two are created only when scheduler starts./ eventLoop not being null means the scheduler has been started and not stoppedvar receiverTracker: ReceiverTracker = nullprint的函数源码如下: 1. DStream中的print源码如下:/* * Print the first ten elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */def print(): Unit = ssc.withScope print(10)2. 实际调用的时候还是对RDD进行操作。/* * Print the first num elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */def print(num: Int): Unit = ssc.withScope def foreachFunc: (RDDT, Time) = Unit = (rdd: RDDT, time: Time) = val firstNum = rdd.take(num + 1) / scalastyle:off println println(-) println(Time: + time) println(-) firstNum.take(num).foreach(println) if (firstNum.length num) println(.) println() / scalastyle:on println foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)3. foreachFunc封装了RDD的操作。/* * Apply a function to each RDD in this DStream. This is an output operator, so * this DStream will be registered as an output stream and therefore materialized. * param foreachFunc RDD function * param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated * in the foreachFunc to be displayed in the UI. If false, then * only the scopes and callsites of foreachRDD will override those * of the RDDs on the display. */private def foreachRDD( foreachFunc: (RDDT, Time) = Unit, displayInnerRDDOps: Boolean): Unit = new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()4. 每个BatchDuractions都会根据generateJob生成作业。/* * An internal DStream used to represent output operations like DStream.foreachRDD. * param parent Parent DStream * param foreachFunc Function to apply on each RDD generated by the parent DStream * param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated * by foreachFunc will be displayed in the UI; only the scope and * callsite of DStream.foreachRDD will be displayed. */privatestreamingclass ForEachDStreamT: ClassTag ( parent: DStreamT, foreachFunc: (RDDT, Time) = Unit, displayInnerRDDOps: Boolean ) extends DStreamUnit(parent.ssc) override def dependencies: ListDStream_ = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): OptionRDDUnit = None/每个Batch Duractions都根据generateJob生成Job override def generateJob(time: Time): OptionJob = parent.getOrCompute(time) match case Some(rdd) = val jobFunc = () = createRDDWithLocalProperties(time, displayInnerRDDOps) /foreachFunc基于rdd和time封装为func了,此时的foreachFunc就被job.run/的时候调用了。/此时的RDD就是基于时间生成的RDD,这个RDD就是DStreamGraph中的最后一个DStream决定的。然后 foreachFunc(rdd, time) Some(new Job(time, jobFunc) case None = None 5. 此时的foreachFunc是从哪里来的?privatestreaming/参数传递过来的,这个时候就要去找forEachDStream在哪里被调用。 class ForEachDStreamT: ClassTag ( parent: DStreamT, foreachFunc: (RDDT, Time) = Unit, displayInnerRDDOps: Boolean ) extends DStreamUnit(parent.ssc) 6. 由此可以知道真正Job的生成是通过ForeachDStream通generateJob来生成的,此时是逻辑级别的,但是真正被物理级别的调用是在JobGenerator中generateJobs被调用的。def generateJobs(time: Time): SeqJob = logDebug(Generating jobs for time + time) val

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论