Spark Streaming源码解读之流数据不断接收详解.doc_第1页
Spark Streaming源码解读之流数据不断接收详解.doc_第2页
Spark Streaming源码解读之流数据不断接收详解.doc_第3页
Spark Streaming源码解读之流数据不断接收详解.doc_第4页
Spark Streaming源码解读之流数据不断接收详解.doc_第5页
已阅读5页,还剩19页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

Spark Streaming源码解读之流数据不断接收详解特别说明: 在上一遍文章中有详细的叙述Receiver启动的过程,如果不清楚的朋友,请您查看上一篇博客,这里我们就基于上篇的结论,继续往下说。博文的目标是: Spark Streaming在接收数据的全生命周期贯通组织思路如下: a) 接收数据的架构模式的设计 b) 然后再具体源码分析接收数据的架构模式的设计 1. 当有Spark Streaming有application的时候Spark Streaming会持续不断的接收数据。 2. 一般Receiver和Driver不在一个进程中的,所以接收到数据之后要不断的汇报给Driver。 3. Spark Streaming要接收数据肯定要使用消息循环器,循环器不断的接收到数据之后,然后将数据存储起来,再将存储完的数据汇报给Driver。 4. Spark Streaming数据接收的过程也是MVC的架构,M是model也就是Receiver. C是Control也就是存储级别的ReceiverSupervisor。V是界面。 5. ReceiverSupervisor是控制器,Receiver的启动是靠ReceiverTracker启动的,Receiver接收到数据之后是靠ReceiverSupervisor存储数据的。然后Driver就获得元数据也就是界面,通过界面来操作底层的数据,这个元数据就相当于指针。 Spark Streaming接收数据流程如下:具体源码分析 1. ReceiverTracker通过发送Job的方式,并且每个Job只有一个Task,并且Task中只通过一个ReceiverSupervisor启动一个Receiver. 2. 下图就是Receiver启动的流程图,现在就从ReceiverTracker的start开始今天的旅程。3. Start方法中创建Endpoint实例/* Start the endpoint and receiver execution thread. */def start(): Unit = synchronized if (isTrackerStarted) throw new SparkException(ReceiverTracker already started) if (!receiverInputStreams.isEmpty) endpoint = ssc.env.rpcEnv.setupEndpoint( ReceiverTracker, new ReceiverTrackerEndpoint(ssc.env.rpcEnv) if (!skipReceiverLaunch) launchReceivers() logInfo(ReceiverTracker started) trackerState = Started 4. LaunchReceivers源码如下:/* * Get the receivers from the ReceiverInputDStreams, distributes them to the * worker nodes as a parallel collection, and runs them. */private def launchReceivers(): Unit = val receivers = receiverInputStreams.map(nis = val rcvr = nis.getReceiver() rcvr.setReceiverId(nis.id) rcvr ) runDummySparkJob() logInfo(Starting + receivers.length + receivers)/此时的endpoint就是前面实例化的ReceiverTrackerEndpoint endpoint.send(StartAllReceivers(receivers)5. 从图上可以知道,send发送消息之后,ReceiverTrackerEndpoint的receive就接收到了消息。override def receive: PartialFunctionAny, Unit = / Local messages case StartAllReceivers(receivers) = val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) for (receiver Unit = (iterator: IteratorReceiver_) = if (!iterator.hasNext) throw new SparkException( Could not start receiver as object not found.) if (TaskContext.get().attemptNumber() = 0) val receiver = iterator.next() assert(iterator.hasNext = false)/此时的receiver是根据数据输入来源创建的InputDStream/例如socketInputDStream他有自己的receiver也就是SocketReceiver/此时receiver就相当于一个引用句柄。他只是引用的描述 val supervisor = new ReceiverSupervisorImpl( receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)/当startReceiverFunc被调用的时候ReceiverSupervisorImpl的start方法就会运行。 supervisor.start() supervisor.awaitTermination() else / Its restarted by TaskScheduler, but we want to reschedule it again. So exit it. / Create the RDD using the scheduledLocations to run the receiver in a Spark job val receiverRDD: RDDReceiver_ = if (scheduledLocations.isEmpty) /此时Seq(receiver)中只有一个Receiver ssc.sc.makeRDD(Seq(receiver), 1) else val preferredLocations = scheduledLocations.map(_.toString).distinct ssc.sc.makeRDD(Seq(receiver - preferredLocations) /专门为了创建receiver而创建的RDD receiverRDD.setName(sReceiver $receiverId) ssc.sparkContext.setJobDescription(sStreaming job running receiver $receiverId) ssc.sparkContext.setCallSite(Option(ssc.getStartSite().getOrElse(Utils.getCallSite() val future = ssc.sparkContext.submitJobReceiver_, Unit, Unit( receiverRDD, startReceiverFunc, Seq(0), (_, _) = Unit, () / We will keep restarting the receiver job until ReceiverTracker is stopped future.onComplete case Success(_) = if (!shouldStartReceiver) onReceiverJobFinish(receiverId) else logInfo(sRestarting Receiver $receiverId) self.send(RestartReceiver(receiver) case Failure(e) = if (!shouldStartReceiver) onReceiverJobFinish(receiverId) else logError(Receiver has been stopped. Try to restart it., e) logInfo(sRestarting Receiver $receiverId) self.send(RestartReceiver(receiver) (submitJobThreadPool) logInfo(sReceiver $receiver.streamId started)6. startReceiver源码如下:/* * Start a receiver along with its scheduled executors */private def startReceiver( receiver: Receiver_, scheduledLocations: SeqTaskLocation): Unit = def shouldStartReceiver: Boolean = / Its okay to start when trackerState is Initialized or Started !(isTrackerStopping | isTrackerStopped) val receiverId = receiver.streamId if (!shouldStartReceiver) onReceiverJobFinish(receiverId) return val checkpointDirOption = Option(ssc.checkpointDir) val serializableHadoopConf = new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)/ startReceiverFunc就是我们通过RDD启动Job的那个Func / Function to start the receiver on the worker node/此时虽然是iterator但是就是一个Receiver,因为你如果追溯一下前面StartReceiver被调用的时候是for循环遍历Receivers. val startReceiverFunc: IteratorReceiver_ = Unit = (iterator: IteratorReceiver_) = if (!iterator.hasNext) throw new SparkException( Could not start receiver as object not found.) if (TaskContext.get().attemptNumber() = 0) val receiver = iterator.next() assert(iterator.hasNext = false)/此时的receiver是根据数据输入来源创建的InputDStream/例如socketInputDStream他有自己的receiver也就是SocketReceiver/此时receiver就相当于一个引用句柄。他只是引用的描述 val supervisor = new ReceiverSupervisorImpl( receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)/当startReceiverFunc被调用的时候ReceiverSupervisorImpl的start方法就会运行。 supervisor.start() supervisor.awaitTermination() else / Its restarted by TaskScheduler, but we want to reschedule it again. So exit it. / Create the RDD using the scheduledLocations to run the receiver in a Spark job val receiverRDD: RDDReceiver_ = if (scheduledLocations.isEmpty) /此时Seq(receiver)中只有一个Receiver ssc.sc.makeRDD(Seq(receiver), 1) else val preferredLocations = scheduledLocations.map(_.toString).distinct ssc.sc.makeRDD(Seq(receiver - preferredLocations) /专门为了创建receiver而创建的RDD receiverRDD.setName(sReceiver $receiverId) ssc.sparkContext.setJobDescription(sStreaming job running receiver $receiverId) ssc.sparkContext.setCallSite(Option(ssc.getStartSite().getOrElse(Utils.getCallSite() val future = ssc.sparkContext.submitJobReceiver_, Unit, Unit( receiverRDD, startReceiverFunc, Seq(0), (_, _) = Unit, () / We will keep restarting the receiver job until ReceiverTracker is stopped future.onComplete case Success(_) = if (!shouldStartReceiver) onReceiverJobFinish(receiverId) else logInfo(sRestarting Receiver $receiverId) self.send(RestartReceiver(receiver) case Failure(e) = if (!shouldStartReceiver) onReceiverJobFinish(receiverId) else logError(Receiver has been stopped. Try to restart it., e) logInfo(sRestarting Receiver $receiverId) self.send(RestartReceiver(receiver) (submitJobThreadPool) logInfo(sReceiver $receiver.streamId started)7. 现在就追踪一下receiver参数的传递过程。先找到startReceiver在哪里调用。override def receive: PartialFunctionAny, Unit = / Local messages case StartAllReceivers(receivers) = val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) for (receiver val rcvr = nis.getReceiver() rcvr.setReceiverId(nis.id) rcvr ) runDummySparkJob() logInfo(Starting + receivers.length + receivers) endpoint.send(StartAllReceivers(receivers)submitJob的时候就提交了作业,在具体的节点上运行Job,此时是通过ReceiverSupervisorImpl完成的。 此时在ReceiverTracker的startReceiver调用的时候完成了两件事:ReceiverTrackerImpl的初始化和start方法的调用。第一步:ReceiverTrackerImpl的初始化 1. ReceiverSupervisor负责接收receiver接收的数据,之后,ReceiverSupervisor会存储数据,然后汇报给Driver。Receiver是一条一条的接收数据(Kafka是 Key Value的形式)。/* * Concrete implementation of org.apache.spark.streaming.receiver.ReceiverSupervisor * which provides all the necessary functionality for handling the data received by * the receiver. Specifically, it creates a org.apache.spark.streaming.receiver.BlockGenerator * object that is used to divide the received data stream into blocks of data. */privatestreaming class ReceiverSupervisorImpl(2. ReceiverSupervisorImpl初始化源码如下:/* Remote RpcEndpointRef for the ReceiverTracker */负责链接ReceiverTracker的消息通信体private val trackerEndpoint = RpcUtils.makeDriverRef(ReceiverTracker, env.conf, env.rpcEnv)/* RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */endpoint负责在Driver端接收ReceiverTracker发送来的消息。private val endpoint = env.rpcEnv.setupEndpoint( Receiver- + streamId + - + System.currentTimeMillis(), new ThreadSafeRpcEndpoint override val rpcEnv: RpcEnv = env.rpcEnv override def receive: PartialFunctionAny, Unit = case StopReceiver = logInfo(Received stop signal) ReceiverSupervisorImpl.this.stop(Stopped by driver, None)/每个Batch处理完数据之后,Driver的ReceiverTracker会发消息给ReceiverTrackerImpl要求清理Block信息。 case CleanupOldBlocks(threshTime) = logDebug(Received delete old batch signal) cleanupOldBlocks(threshTime)/限制receiver接收数据的,也就是限流的。这样的话就可以动态的改变receiver/的数据接收速度。 case UpdateRateLimit(eps) = logInfo(sReceived a new rate limit: $eps.) registeredBlockGenerators.foreach bg = bg.updateRate(eps) )cleanupOldBlocks源码如下: private def cleanupOldBlocks(cleanupThreshTime: Time): Unit = logDebug(sCleaning up blocks older then $cleanupThreshTime) receivedBlockHandler.cleanupOldBlocks(cleanupThreshTliseconds) 3. 从对象中获得限流的速度,这对于实际生产环境非常重要,因为有时间数据请求量非常的多,整个集群又处理不完或者来不及处理,这个时候如果不限流的话,延迟就非常的高。/* * Set the rate limit to newRate. The new rate will not exceed the maximum rate configured by * spark.streaming.receiver.maxRate, even if newRate is higher than that. * * param newRate A new rate in events per second. It has no effect if its 0 or negative. */ privatereceiver def updateRate(newRate: Long): Unit = if (newRate 0) if (maxRateLimit 0) rateLimiter.setRate(newRate.min(maxRateLimit) else rateLimiter.setRate(newRate) 至此上面就完成了ReceiverSupervisorImpl的初始化。这里只是简单的提了一些,后面还会详解第二步:ReceiverTrackerImpl的start方法被调用。 在ReceiverTrackerImpl的函数中,并没有start方法,这个时候的实现是在其父类start方法中实现的。4. 在supervisor启动的时候会调用ReceiverSupervisor的start方法/* Start the supervisor */def start() onStart() startReceiver()5. onstart方法: 此方法必须在receiver.onStart()之前被调用,来确保BlockGenerator被实例化和启动。Receiver在接收数据的时候是通过BlockGenerator转换成Block形式,因为Receiver一条一条的接收数据,需要将此数据合并成Block,RDD的处理单位是Block。/* * Called when supervisor is started. * Note that this must be called before the receiver.onStart() is called to ensure * things like BlockGenerators are started before the receiver starts sending data. */protected def onStart() 6. onStart方法具体实现是在RceiverSupervisorImpl方法中实现的。override protected def onStart() registeredBlockGenerators.foreach _.start() 什么是BlockGenerator? 将接收到的数据以Batch的方式存在,并且以特定的频率存储。 BlockGenerator会启动两条线程: 1. 一条线程会周期性的把Receiver接收到的数据合并成Block。 2. 另一条线程是把接收到的数据使用BlockManager存储。 BlockGenerator继承自RateLimiter,由此可以看出无法限定流熟度,但是可以限定存储的速度,转过来限制流进来的速度。/* * Generates batches of objects received by a * org.apache.spark.streaming.receiver.Receiver and puts them into appropriately * named blocks at regular intervals. This class starts two threads, * one to periodically start a new batch and prepare the previous batch of as a block, * the other to push the blocks into the block manager. * * Note: Do not create BlockGenerator instances directly inside receivers. Use * ReceiverSupervisor.createBlockGenerator to create a BlockGenerator and use it. */privatestreaming class BlockGenerator( listener: BlockGeneratorListener, receiverId: Int, conf: SparkConf, clock: Clock = new SystemClock() ) extends RateLimiter(conf) with Logging BlockGenerator是怎么产生的? 7. 在ReceiverSupervisorImpl的createBlockGenerator方法中实现了BlockGenerator的创建。override def createBlockGenerator( blockGeneratorListener: BlockGeneratorListener): BlockGenerator = / Cleanup BlockGenerators that have already been stopped registeredBlockGenerators -= registeredBlockGenerators.filter _.isStopped() /一个streamId指服务于一个BlockGenerator val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf) registeredBlockGenerators += newBlockGenerator newBlockGenerator8. 回到上面ReceiverTrackerImpl的onStart方法override protected def onStart() /启动BlockGenerator的定时器不断的把数据放在内存中的Buffer中然后将多条Buffer合并成Block,此时只是准备去接收Receiver的数据 registeredBlockGenerators.foreach _.start() 9. BlockGenerator的start方法启动了BlockGenerator的两条线程。/* Start block generating and pushing threads. */def start(): Unit = synchronized if (state = Initialized) state = Active blockIntervalTimer.start() blockPushingThread.start() logInfo(Started BlockGenerator) else throw new SparkException( sCannot start BlockGenerator as its not in the Initialized state state = $state) blockIntervalTimer是RecurringTimer实例。private val blockIntervalTimer = new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, BlockGenerator)11. blockIntervalTimer的start方法。/* * Start at the earliest time it can start based on the period. */def start(): Long = start(getStartTime()12. 启动线程/* * Start at the given start time. */def start(startTime: Long): Long = synchronized nextTime = startTime thread.start() logInfo(Started timer for + name + at time + nextTime) nextTime13. Tread启动loop.private val thread = new Thread(RecurringTimer - + name) setDaemon(true) override def run() loop 14. Loop也就会调用triggerActionForNextInterval() /* * Repeatedly call the callback every interval. */ private def loop() try while (!stopped) triggerActionForNextInterval() triggerActionForNextInterval() catch case e: InterruptedException = 15. 此时callback函数就会回调updateCurrentBuffer方法。private def triggerActionForNextInterval(): Unit = clock.waitTillTime(nextTime) callback(nextTime) prevTime = nextTime nextTime += period logDebug(Callback for + name + called at time + prevTime)16. 在RecurringTimer实例创建的时候,第三个参数传入的就是updateCurrentBuffer方法。privatestreamingclass RecurringTimer(clock: Clock, period: Long, callback: (Long) = Unit, name: String) extends Logging 17. 把接收到的数据放入到Buffer缓存中,然后再把Buffer按照一定的大小合并成Block./* Change the buffer to which single records are added to. */private def updateCurrentBuffer(time: Long): Unit = try var newBlock: Block = null synchronized if (currentBuffer.nonEmpty) val newBlockBuffer = currentBuffer = new ArrayBufferAny val blockId = StreamBlockId(receiverId, time - blockIntervalMs) listener.onGenerateBlock(blockId) newBlock = new Block(blockId, newBlockBuffer) if (newBlock != null) /将生成成功的Block放入到队列中 blocksForPushing.put(newBlock) / put is blocking when queue is full catch case ie: InterruptedException = logInfo(Block updating timer thread was interrupted) case e: Exception = reportError(Error in block updating thread, e) BlockGenerator的start启动就分析完了,至此准备好接收Receiver数据了。 BlockGenerator的start启动

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论