版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
大数据导论第十章大数据导论第十章1CONTENTS目录PART01SparkStreaming简介PART02SparkStreaming的执行模型PART03编程模型PART04DStream的操作PART05持久化和性能调优PART06编程实战PART07作业CONTENTS目录PART01SparkStrea2PART01SparkStreaming简介SparkStreaming是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。PART01SparkStreaming简介Spa3概述SparkStreaming是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,再使用高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库和现场仪表盘。SparkStreaming数据流:概述SparkStreaming是Spark核心API的一4概述SparkStreaming工作原理:接收实时的输入数据流根据一定的时间间隔(比如1秒钟)拆分成一批批的数据然后通过Spark引擎处理这些批数据最终得到处理后的一批批结果数据概述SparkStreaming工作原理:5概述SparkStreaming支持一个高层的抽象,叫做离散流(DiscretizedStream)或者DStream,它代表连续的数据流。在内部,DStream是由一系列RDD组成。对应的批数据,在Spark内核对应一个RDD实例。因此,对应流数据的DStream可以看成是一组RDD,即RDD的一个序列。概述SparkStreaming支持一个高层的抽象,叫做离6术语定义离散流(DiscretizedStream)或DStream这是SparkStreaming对内部持续的实时数据流的抽象描述,即处理的一个实时数据流,在SparkStreaming中对应于一个DStream实例。批数据(BatchData)
是化整为零的第一步。将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着持续时间的推移,这些处理结果就形成了对应的结果数据流了。术语定义离散流(DiscretizedStream)或D7术语定义时间片或批处理时间间隔(BatchInterval)这是人为地对流数据进行定量的标准,以时间片作为拆分流数据的依据。一个时间片的数据对应一个RDD实例。窗口长度(WindowLength)一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数。术语定义时间片或批处理时间间隔(BatchInterval8术语定义滑动时间间隔前一个窗口到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数。InputDStream一个InputDStream是一个特殊的DStream,将SparkStreaming连接到一个外部数据源来读取数据。术语定义滑动时间间隔InputDStream9PART02SparkStreaming的执行模型SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源进行类似Map、Reduce等复杂操作,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。PART02SparkStreaming的执行模型10传统流处理系统架构传统流处理系统架构传统流处理系统架构传统流处理系统架构11传统流处理系统架构大部分传统的流处理系统被设计为连续算子模型。系统包含一系列的工作节点,每组节点运行一至多个连续算子;对于流数据,每个连续算子一次处理一条记录,并且将记录传输给管道中别的算子;源算子(SourceOperator)从采集系统接收数据,接着沉算子(SinkOperator)输出到下游系统。连续算子是一种较为简单、自然的模型。然而,在大数据时代,这个传统的架构也面临着严峻的挑战。故障恢复问题2.负载均衡问题支持统一的流处理与批处理以及交互工作高级分析能力传统流处理系统架构大部分传统的流处理系统被设计为连续算子模型12SparkStreaming系统架构SparkStreaming系统架构SparkStreaming系统架构SparkStrea13SparkStreaming系统架构SparkStreaming引入了一个称之为DiscretizedStreams(离散化的流数据处理)的新结构,它可以直接使用Spark引擎中丰富的库并且拥有优秀的故障容错机制。对于传统流处理中一次处理一条记录的方式而言,SparkStreaming取而代之的是将流数据离散化处理,使之能够进行秒级以下的微型批处理。与传统连续算子模型不同,传统模型是静态分配给一个节点进行计算,而SparkTask可基于数据的来源以及可用资源情况动态分配给工作节点。这能够更好的实现流处理所需要的两个特性:负载均衡2.快速故障恢复此外,Executor除了可以处理Task,还可以将数据存在Cache或者HDFS上面。SparkStreaming系统架构SparkStrea14计算流程SparkStreaming计算流程计算流程SparkStreaming计算流程15计算流程SparkStreaming首先把输入数据按照批段大小(BatchSize),如1秒,分成一段一段的数据(DiscretizedStream)每一段数据都转换成Spark中的RDD(ResilientDistributedDataset)然后将SparkStreaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作将RDD经过操作变成中间结果保存在内存中。计算流程SparkStreaming首先把输入数据按照批段16动态负载均衡动态负载均衡动态负载均衡动态负载均衡17动态负载均衡Spark系统将数据划分为小批量,允许对资源进行细粒度分配。例如:当输入数据流需要由一个键值来分区处理。传统流处理系统采用传统静态分配任务给节点,如果其中一个分区计算比别的更密集,那么该节点处理将会遇到性能瓶颈,同时将会减缓管道处理。在SparkStreaming中,作业任务将会动态地平衡分配给各个节点,一些节点会处理数量较少且耗时较长任务,别的节点将会处理数量更多且耗时更短的任务。动态负载均衡Spark系统将数据划分为小批量,允许对资源进行18容错性对于流式计算来说,容错性至关重要。RDD的容错机制:每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系(Lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新计算来得到。容错性对于流式计算来说,容错性至关重要。19容错性RDD的传承关系:容错性RDD的传承关系:20容错性故障恢复方式比较容错性故障恢复方式比较21实时性SparkStreaming将流式计算分解成多个SparkJob,对于每一段数据的处理都会经过SparkDAG图分解以及Spark的任务集的调度过程。对于目前版本的SparkStreaming而言,其最小的批次大小的选取在0.5~2秒钟之间,所以SparkStreaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。实时性SparkStreaming将流式计算分解成多个Sp22扩展性和吞吐量Spark目前在EC2上已能够线性扩展到100个节点(每个节点4Core),可以以数秒的延迟处理6GB/秒的数据量,60M条记录/秒。扩展性和吞吐量Spark目前在EC2上已能够线性扩展到10023PART03编程模型DStream作为SparkStreaming的基础抽象,它代表持续性的数据流。这些数据流既可以通过外部输入源来获取,也可以通过现有的Dstream的Transformation操作来获得。PART03编程模型DStream作为SparkSt24编程模型在内部实现上,DStream由一组时间序列上连续的RDD来表示。每个RDD都包含了自己特定时间间隔内的数据流:对DStream中数据的各种操作也是映射到内部的RDD上来进行的:编程模型在内部实现上,DStream由一组时间序列上连续的R25如何使用SparkStreamingimportorg.apache.spark._importorg.apache.spark.streaming._importorg.apache.spark.streaming.StreamingContext._//创建一个拥有2个工作线程,时间片长度为1秒的StreamContext//主节点需要2核以免产生饥饿状态发生valconf=newSparkConf().setMaster("local[2]").setAppName("NetworkWordCount")valssc=newStreamingContext(conf,Seconds(1))//创建连接到hostname:port的DStreamCreate,比如localhost:9999vallines=ssc.socketTextStream("localhost",9999)如何使用SparkStreamingimportorg26如何使用SparkStreaming//把每一行分解为单词valwords=lines.flatMap(_.split(""))importorg.apache.spark.streaming.StreamingContext._//计数每一个时间片内的单词量valpairs=words.map(word=>(word,1))valwordCounts=pairs.reduceByKey(_+_)
//打印该DStream生成的每个RDD中的前10个单词wordCounts.print()ssc.start()//启动计算ssc.awaitTermination()//等待计算完成如何使用SparkStreaming//把每一行分解为27DStream的输入源在SparkStreaming中所有的操作都是基于流的,而输入源是这一系列操作的起点。输入DStream和DStream接收的流都代表输入数据流的来源,在SparkStreaming提供两种内置数据流来源:基础来源在StreamingContextAPI中直接可用的来源。例如:文件系统、Socket(套接字)连接和Akkaactors;高级来源如Kafka、Flume、Kinesis、Twitter等,可以通过额外的实用工具类创建。DStream的输入源在SparkStreaming中所28与RDD类似,DStream也提供了自己的一系列操作方法,这些操作可以分成三类:普通的转换操作、窗口转换操作和输出操作。PART04
DStream的操作与RDD类似,DStream也提供了自己的一系列操作方法,这29普通的转换操作转换描述map(func)源DStream的每个元素通过函数func返回一个新的DStream。flatMap(func)类似与map操作,不同的是每个输入元素可以被映射出0或者更多的输出元素。filter(func)在源DSTREAM上选择Func函数返回仅为true的元素,最终返回一个新的DSTREAM。repartition(numPartitions)通过输入的参数numPartitions的值来改变DStream的分区大小。union(otherStream)返回一个包含源DStream与其他DStream的元素合并后的新DSTREAMcount()对源DStream内部的所含有的RDD的元素数量进行计数,返回一个内部的RDD只包含一个元素的DStreaam。普通的转换操作转换描述map(func)源DStream的30普通的转换操作转换描述reduce(func)使用函数func(有两个参数并返回一个结果)将源DStream中每个RDD的元素进行聚合操作,返回一个内部所包含的RDD只有一个元素的新DStream。countByValue()计算DStream中每个RDD内的元素出现的频次并返回新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次。reduceByKey(func,[numTasks])当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新DStream,其中每个键的值V都是使用聚合函数func汇总。可以通过配置numTasks设置不同的并行任务数。join(otherStream,[numTasks])当被调用类型分别为(K,V)和(K,W)键值对的2个DStream时,返回类型为(K,(V,W))键值对的一个新DStream。普通的转换操作转换描述reduce(func)使用函数f31普通的转换操作转换描述cogroup(otherStream,[numTasks])当被调用的两个DStream分别含有(K,V)和(K,W)键值对时,返回一个(K,Seq[V],Seq[W])类型的新的DStream。transform(func)通过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream,这可以用来在DStream做任意RDD操作。updateStateByKey(func)返回一个新状态的DStream,其中每个键的状态是根据键的前一个状态和键的新值应用给定函数func后的更新。这个方法可以被用来维持每个键的任何状态数据。普通的转换操作转换描述cogroup(otherStrea32窗口转换操作转换描述window(windowLength,slideInterval)返回一个基于源DStream的窗口批次计算后得到新的DStream。countByWindow(windowLength,slideInterval)返回基于滑动窗口的DStream中的元素的数量。reduceByWindow(func,windowLength,slideInterval)基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。reduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks])基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操作,得到一个新的DStream。窗口转换操作转换描述window(windowLength33窗口转换操作转换描述reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numTasks])一个更高效的reduceByKkeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最早的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么我们可以将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量,这种方法可以复用中间三秒的统计量,提高统计的效率countByValueAndWindow(windowLength,slideInterval,[numTasks])基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。reduce任务的数量可以通过一个可选参数进行配置。窗口转换操作转换描述reduceByKeyAndWindow34输出操作转换描述print()在Driver中打印出DStream中数据的前10个元素。saveAsTextFiles(prefix,[suffix])将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。saveAsObjectFiles(prefix,[suffix])将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。saveAsHadoopFiles(prefix,[suffix])将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。foreachRDD(func)最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统,比如保存RDD到文件或者网络数据库等。需要注意的是func函数是在运行该Streaming应用的Driver进程里执行的。输出操作转换描述print()在Driver中打印出DS35持久化和性能调优PART05持久化和性能调优持久化和性能调优PART05持久化和性能调优36持久化与RDD一样,DStream同样也能通过persist()方法将数据流存放在内存中,默认的持久化方式是MEMORY_ONLY_SER,也就是在内存中存放数据同时序列化的方式。这样做的好处是遇到需要多次迭代计算的程序时,速度优势十分的明显。而对于一些基于窗口的操作,如reduceByWindow、reduceByKeyAndWindow,以及基于状态的操作,如updateStateBykey,其默认的持久化策略就是保存在内存中。对于来自网络的数据源(Kafka、Flume、sockets等),默认的持久化策略是将数据保存在两台机器上,这也是为了容错性而设计的。持久化与RDD一样,DStream同样也能通过persist37性能调优性能优化从大方面来分可以分为两种:优化运行时间增加并行度减少数据序列化,反序列化的负担设置合理的批处理时间减少因任务提交和分发所带来的负担优化内存使用控制批处理间隔内的数据量及时清理不再使用的数据观察及适当调整GC策略性能调优性能优化从大方面来分可以分为两种:38性能调优优化运行时间——增加并行度确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操作,增加其并行度以确保更为充分地使用集群资源。优化运行时间——减少数据序列化,反序列化的负担序列化和反序列化需要更多的CPU时间,因此更加高效的序列化方式(Kryo)和自定义的系列化接口可以更高效地使用CPU。性能调优优化运行时间——增加并行度优化运行时间——减少数据序39性能调优优化运行时间——设置合理的批处理时间在SparkStreaming中,Job之间有可能存在依赖关系,后面的Job必须确保前面的作业执行结束后才能提交。若前面的Job执行的时间超出了批处理时间间隔,那么后面的Job就无法按时提交,这样就会进一步拖延接下来的Job,造成后续Job的阻塞。因此设置一个合理的批处理间隔以确保作业能够在这个批处理间隔内结束时必须的。优化运行时间——减少因任务提交和分发所带来的负担通常情况下,Akka框架能够高效地确保任务及时分发,但是当批处理间隔非常小(500ms)时,提交和分发任务的延迟就变得不可接受了。性能调优优化运行时间——设置合理的批处理时间优化运行时间——40性能调优优化内存使用——控制批处理间隔内的数据量SparkStreaming会把批处理间隔内接收到的所有数据存放在Spark内部的可用内存区域中,因此必须确保当前节点Spark的可用内存中能容纳这个批处理时间间隔内的所有数据,否则必须增加新的资源以提高集群的处理能力。优化内存使用——及时清理不再使用的数据对于处理过的不再需要的数据应及时清理,以确保SparkStreaming有富余的可用内存空间。性能调优优化内存使用——控制批处理间隔内的数据量优化内存使用41性能调优优化内存使用——观察及适当调整GC策略GC会影响Job的正常运行,可能延长Job的执行时间,引起一系列不可预料的问题。观察GC的运行情况,采用不同的GC策略以进一步减小内存回收对Job运行的影响。性能调优优化内存使用——观察及适当调整GC策略42编程实战PART06编程实战编程实战PART06编程实战43流数据模拟器在实例演示中模拟实际情况,需要源源不断地接入流数据,为了在演示过程中更接近真实环境,首先需要定义流数据模拟器。该模拟器主要功能是通过Socket方式监听指定的端口号,当外部程序通过该端口连接并请求数据时,模拟器将定时将指定的文件数据随机获取发送给外部程序。importjava.io.{PrintWriter}.ServerSocketimportscala.io.Source
objectStreamingSimulation{
//定义随机获取整数的方法
defindex(length:Int)={importjava.util.Randomvalrdm=newRandomrdm.nextInt(length)}
defmain(args:Array[String]){
//调用该模拟器需要三个参数,分别为文件路径、端口号和间隔时间(单位:毫秒)
if(args.length!=3){System.err.println("Usage:<filename><port><millisecond>")System.exit(1)}
//获取指定文件总的行数
valfilename=args(0)vallines=Source.fromFile(filename).getLines.toListvalfilerow=lines.length
//指定监听某端口,当外部程序请求时建立连接
vallistener=newServerSocket(args(1).toInt)while(true){valsocket=listener.accept()newThread(){overridedefrun={println("Gotclientconnectedfrom:"+socket.getInetAddress)valout=newPrintWriter(socket.getOutputStream(),true)while(true){Thread.sleep(args(2).toLong)
//当该端口接受请求时,随机获取某行数据发送给对方
valcontent=lines(index(filerow))println(content)out.write(content+'\n')out.flush()}socket.close()}}.start()}}}流数据模拟器在实例演示中模拟实际情况,需要源源不断地接入流数44实例1:读取文件演示在该实例中SparkStreaming将监控某目录中的文件,获取在间隔时间段内变化的数据,然后通过SparkStreaming计算出该时间段内单词统计数。importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}importorg.apache.spark.streaming.StreamingContext._
objectFileWordCount{defmain(args:Array[String]){valsparkConf=newSparkConf().setAppName("FileWordCount").setMaster("local[2]")
//创建Streaming的上下文,包括Spark的配置和时间间隔,这里时间为间隔20秒
valssc=newStreamingContext(sparkConf,Seconds(20))
//指定监控的目录,在这里为/home/hadoop/temp/vallines=ssc.textFileStream("/home/hadoop/temp/")
//对指定文件夹变化的数据进行单词统计并且打印
valwords=lines.flatMap(_.split(""))valwordCounts=words.map(x=>(x,1)).reduceByKey(_+_)wordCounts.print()
//启动Streamingssc.start()ssc.awaitTermination()}}实例1:读取文件演示在该实例中SparkStreaming45实例2:网络数据演示在该实例中将由流数据模拟以1秒的频度发送模拟数据,SparkStreaming通过Socket接收流数据并每20秒运行一次用来处理接收到数据,处理完毕后打印该时间段内数据出现的频度,即在各处理段时间之间状态并无关系。importorg.apache.spark.{SparkContext,SparkConf}importorg.apache.spark.streaming.{Milliseconds,Seconds,StreamingContext}importorg.apache.spark.streaming.StreamingContext._importorg.apache.spark.storage.StorageLevel
objectNetworkWordCount{defmain(args:Array[String]){valconf=newSparkConf().setAppName("NetworkWordCount").setMaster("local[2]")valsc=newSparkContext(conf)valssc=newStreamingContext(sc,Seconds(20))
//通过Socket获取数据,需要提供Socket的主机名和端口号,数据保存在内存和硬盘中
vallines=ssc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_AND_DISK_SER)
//对读入的数据进行分割、计数
valwords=lines.flatMap(_.split(","))valwordCounts=words.map(x=>(x,1)).reduceByKey(_+_)wordCounts.print()ssc.start()ssc.awaitTermination()}}实例2:网络数据演示在该实例中将由流数据模拟以1秒的频度发送46实例3:Stateful演示该实例为SparkStreaming状态操作,模拟数据由流数据模拟以1秒的频度发送,SparkStreaming通过Socket接收流数据并每5秒运行一次用来处理接收到数据,处理完毕后打印程序启动后单词出现的频度,相比较实例2,在该实例中各时间段之间状态是相关的。importorg.apache.log4j.{Level,Logger}importorg.apache.spark.{SparkContext,SparkConf}importorg.apache.spark.streaming.{Seconds,StreamingContext}importorg.apache.spark.streaming.StreamingContext._
objectStatefulWordCount{defmain(args:Array[String]){if(args.length!=2){System.err.println("Usage:StatefulWordCount<filename><port>")System.exit(1)}Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//定义更新状态方法,参数values为当前批次单词频度,state为以往批次单词频度
valupdateFunc=(values:Seq[Int],state:Option[Int])=>{valcurrentCount=values.foldLeft(0)(_+_)valpreviousCount=state.getOrElse(0)Some(currentCount+previousCount)}valconf=newSparkConf().setAppName("StatefulWordCount").setMaster("local[2]")valsc=newSparkContext(conf)
//创建StreamingContext,SparkSteaming运行时间间隔为5秒
valssc=newStreamingContext(sc,Seconds(5))
//定义checkpoint目录为当前目录
ssc.checkpoint(".")
//获取从Socket发送过来数据
vallines=ssc.socketTextStream(args(0),args(1).toInt)valwords=lines.flatMap(_.split(","))valwordCounts=words.map(x=>(x,1))//使用updateStateByKey来更新状态,统计从运行开始以来单词总的次数
valstateDstream=wordCounts.updateStateByKey[Int](updateFunc)stateDstream.print()ssc.start()ssc.awaitTermination()}}实例3:Stateful演示该实例为SparkStream47实例4:窗口演示该实例为SparkStreaming窗口操作,模拟数据由流数据模拟以1秒的频度发送,SparkStreaming通过Socket接收流数据并每10秒运行一次用来处理接收到数据,处理完毕后打印程序启动后单词出现的频度。相比前面的实例,SparkStreaming窗口统计是通过reduceByKeyAndWindow()方法实现的,在该方法中需要指定窗口时间长度和滑动时间间隔。实例4:窗口演示该实例为SparkStreaming窗口操48实例4:窗口演示importorg.apache.log4j.{Level,Logger}importorg.apache.spark.{SparkContext,SparkConf}importorg.apache.spark.storage.StorageLevelimportorg.apache.spark.streaming._importorg.apache.spark.streaming.StreamingContext._
objectWindowWordCount{defmain(args:Array[String]){if(args.length!=4){System.err.println("Usage:WindowWorldCount<filename><port><windowDuration><slideDuration>")System.exit(1)}Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)valconf=newSparkConf().setAppName("WindowWordCount").setMaster("local[2]")valsc=newSparkContext(conf)
//创建StreamingContextvalssc=newStreamingContext(sc,Seconds(5))
//定义checkpoint目录为当前目录
ssc.checkpoint(".")
//通过Socket获取数据,需提供Socket的主机名和端口号,数据保存在内存和硬盘中
vallines=ssc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_ONLY_SER)valwords=lines.flatMap(_.split(","))
//windows操作,第一种方式为叠加处理,第二种方式为增量处理
valwordCounts=words.map(x=>(x,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(args(2).toInt),Seconds(args(3).toInt))//valwordCounts=words.map(x=>(x,1)).reduceByKeyAndWindow(_+_,_-_,Seconds(args(2).toInt),Seconds(args(3).toInt))wordCounts.print()ssc.start()ssc.awaitTermination()}}实例4:窗口演示importorg.apache.log449PART07作业
PART07作业50作业作业:什么是SparkStreaming?请画图描述并解释SparkStreaming的内部处理机制和工作原理。请解释下列与SparkStreaming相关的术语:离散流(DStream);批数据(BatchData);时间片(BatchInterval);窗口长度(WindowLength);滑动时间间隔(SlideInterval)传统流处理系统架构的主要缺陷是什么?请描述SparkStreaming的系统架构,并进行解释。请画图描述并解释SparkStreaming的计算流程。请阐述SparkStreaming的容错能力。作业作业:什么是SparkStreaming?51作业作业:请描述DStream中的数据操作流程。DStream的输入源有哪些,请举例说明。请描述下述DStream普通转换操作的作用:window(windowLength,slideInterval);transform(func);updateByKey(func)请描述下述DStream窗口转换操作的作用:map(func);flatMap(func);union(otherStream);countByWindow(windowLength,slideInterval);reduceByWindow(func,windowLength,slideInterval);reduceByKeyAndWindow(func,windowLength,slideInterval);reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval);countByValueAndWindow(windowLength,slideInterval,[numTasks]);作业作业:请描述DStream中的数据操作流程。52作业作业:假定图中批处理的时间间隔是1秒钟,请回答窗口间隔是多长,滑动间隔是多长。作业作业:假定图中批处理的时间间隔是1秒钟,请回答窗口间隔是53作业作业:请编写完成求单词计数的任务。SparkStreaming将监控某目录中的文件,获取在间隔时间段内变化的数据,然后通过SparkStreaming计算出改时间段内单词统计数。作业作业:请编写完成求单词计数的任务。SparkStrea54谢谢FORYOURLISTENING谢谢FORYOURLISTENING55
大数据导论第十章大数据导论第十章56CONTENTS目录PART01SparkStreaming简介PART02SparkStreaming的执行模型PART03编程模型PART04DStream的操作PART05持久化和性能调优PART06编程实战PART07作业CONTENTS目录PART01SparkStrea57PART01SparkStreaming简介SparkStreaming是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。PART01SparkStreaming简介Spa58概述SparkStreaming是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,再使用高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库和现场仪表盘。SparkStreaming数据流:概述SparkStreaming是Spark核心API的一59概述SparkStreaming工作原理:接收实时的输入数据流根据一定的时间间隔(比如1秒钟)拆分成一批批的数据然后通过Spark引擎处理这些批数据最终得到处理后的一批批结果数据概述SparkStreaming工作原理:60概述SparkStreaming支持一个高层的抽象,叫做离散流(DiscretizedStream)或者DStream,它代表连续的数据流。在内部,DStream是由一系列RDD组成。对应的批数据,在Spark内核对应一个RDD实例。因此,对应流数据的DStream可以看成是一组RDD,即RDD的一个序列。概述SparkStreaming支持一个高层的抽象,叫做离61术语定义离散流(DiscretizedStream)或DStream这是SparkStreaming对内部持续的实时数据流的抽象描述,即处理的一个实时数据流,在SparkStreaming中对应于一个DStream实例。批数据(BatchData)
是化整为零的第一步。将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着持续时间的推移,这些处理结果就形成了对应的结果数据流了。术语定义离散流(DiscretizedStream)或D62术语定义时间片或批处理时间间隔(BatchInterval)这是人为地对流数据进行定量的标准,以时间片作为拆分流数据的依据。一个时间片的数据对应一个RDD实例。窗口长度(WindowLength)一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数。术语定义时间片或批处理时间间隔(BatchInterval63术语定义滑动时间间隔前一个窗口到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数。InputDStream一个InputDStream是一个特殊的DStream,将SparkStreaming连接到一个外部数据源来读取数据。术语定义滑动时间间隔InputDStream64PART02SparkStreaming的执行模型SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源进行类似Map、Reduce等复杂操作,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。PART02SparkStreaming的执行模型65传统流处理系统架构传统流处理系统架构传统流处理系统架构传统流处理系统架构66传统流处理系统架构大部分传统的流处理系统被设计为连续算子模型。系统包含一系列的工作节点,每组节点运行一至多个连续算子;对于流数据,每个连续算子一次处理一条记录,并且将记录传输给管道中别的算子;源算子(SourceOperator)从采集系统接收数据,接着沉算子(SinkOperator)输出到下游系统。连续算子是一种较为简单、自然的模型。然而,在大数据时代,这个传统的架构也面临着严峻的挑战。故障恢复问题2.负载均衡问题支持统一的流处理与批处理以及交互工作高级分析能力传统流处理系统架构大部分传统的流处理系统被设计为连续算子模型67SparkStreaming系统架构SparkStreaming系统架构SparkStreaming系统架构SparkStrea68SparkStreaming系统架构SparkStreaming引入了一个称之为DiscretizedStreams(离散化的流数据处理)的新结构,它可以直接使用Spark引擎中丰富的库并且拥有优秀的故障容错机制。对于传统流处理中一次处理一条记录的方式而言,SparkStreaming取而代之的是将流数据离散化处理,使之能够进行秒级以下的微型批处理。与传统连续算子模型不同,传统模型是静态分配给一个节点进行计算,而SparkTask可基于数据的来源以及可用资源情况动态分配给工作节点。这能够更好的实现流处理所需要的两个特性:负载均衡2.快速故障恢复此外,Executor除了可以处理Task,还可以将数据存在Cache或者HDFS上面。SparkStreaming系统架构SparkStrea69计算流程SparkStreaming计算流程计算流程SparkStreaming计算流程70计算流程SparkStreaming首先把输入数据按照批段大小(BatchSize),如1秒,分成一段一段的数据(DiscretizedStream)每一段数据都转换成Spark中的RDD(ResilientDistributedDataset)然后将SparkStreaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作将RDD经过操作变成中间结果保存在内存中。计算流程SparkStreaming首先把输入数据按照批段71动态负载均衡动态负载均衡动态负载均衡动态负载均衡72动态负载均衡Spark系统将数据划分为小批量,允许对资源进行细粒度分配。例如:当输入数据流需要由一个键值来分区处理。传统流处理系统采用传统静态分配任务给节点,如果其中一个分区计算比别的更密集,那么该节点处理将会遇到性能瓶颈,同时将会减缓管道处理。在SparkStreaming中,作业任务将会动态地平衡分配给各个节点,一些节点会处理数量较少且耗时较长任务,别的节点将会处理数量更多且耗时更短的任务。动态负载均衡Spark系统将数据划分为小批量,允许对资源进行73容错性对于流式计算来说,容错性至关重要。RDD的容错机制:每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系(Lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新计算来得到。容错性对于流式计算来说,容错性至关重要。74容错性RDD的传承关系:容错性RDD的传承关系:75容错性故障恢复方式比较容错性故障恢复方式比较76实时性SparkStreaming将流式计算分解成多个SparkJob,对于每一段数据的处理都会经过SparkDAG图分解以及Spark的任务集的调度过程。对于目前版本的SparkStreaming而言,其最小的批次大小的选取在0.5~2秒钟之间,所以SparkStreaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。实时性SparkStreaming将流式计算分解成多个Sp77扩展性和吞吐量Spark目前在EC2上已能够线性扩展到100个节点(每个节点4Core),可以以数秒的延迟处理6GB/秒的数据量,60M条记录/秒。扩展性和吞吐量Spark目前在EC2上已能够线性扩展到10078PART03编程模型DStream作为SparkStreaming的基础抽象,它代表持续性的数据流。这些数据流既可以通过外部输入源来获取,也可以通过现有的Dstream的Transformation操作来获得。PART03编程模型DStream作为SparkSt79编程模型在内部实现上,DStream由一组时间序列上连续的RDD来表示。每个RDD都包含了自己特定时间间隔内的数据流:对DStream中数据的各种操作也是映射到内部的RDD上来进行的:编程模型在内部实现上,DStream由一组时间序列上连续的R80如何使用SparkStreamingimportorg.apache.spark._importorg.apache.spark.streaming._importorg.apache.spark.streaming.StreamingContext._//创建一个拥有2个工作线程,时间片长度为1秒的StreamContext//主节点需要2核以免产生饥饿状态发生valconf=newSparkConf().setMaster("local[2]").setAppName("NetworkWordCount")valssc=newStreamingContext(conf,Seconds(1))//创建连接到hostname:port的DStreamCreate,比如localhost:9999vallines=ssc.socketTextStream("localhost",9999)如何使用SparkStreamingimportorg81如何使用SparkStreaming//把每一行分解为单词valwords=lines.flatMap(_.split(""))importorg.apache.spark.streaming.StreamingContext._//计数每一个时间片内的单词量valpairs=words.map(word=>(word,1))valwordCounts=pairs.reduceByKey(_+_)
//打印该DStream生成的每个RDD中的前10个单词wordCounts.print()ssc.start()//启动计算ssc.awaitTermination()//等待计算完成如何使用SparkStreaming//把每一行分解为82DStream的输入源在SparkStreaming中所有的操作都是基于流的,而输入源是这一系列操作的起点。输入DStream和DStream接收的流都代表输入数据流的来源,在SparkStreaming提供两种内置数据流来源:基础来源在StreamingContextAPI中直接可用的来源。例如:文件系统、Socket(套接字)连接和Akkaactors;高级来源如Kafka、Flume、Kinesis、Twitter等,可以通过额外的实用工具类创建。DStream的输入源在SparkStreaming中所83与RDD类似,DStream也提供了自己的一系列操作方法,这些操作可以分成三类:普通的转换操作、窗口转换操作和输出操作。PART04
DStream的操作与RDD类似,DStream也提供了自己的一系列操作方法,这84普通的转换操作转换描述map(func)源DStream的每个元素通过函数func返回一个新的DStream。flatMap(func)类似与map操作,不同的是每个输入元素可以被映射出0或者更多的输出元素。filter(func)在源DSTREAM上选择Func函数返回仅为true的元素,最终返回一个新的DSTREAM。repartition(numPartitions)通过输入的参数numPartitions的值来改变DStream的分区大小。union(otherStream)返回一个包含源DStream与其他DStream的元素合并后的新DSTREAMcount()对源DStream内部的所含有的RDD的元素数量进行计数,返回一个内部的RDD只包含一个元素的DStreaam。普通的转换操作转换描述map(func)源DStream的85普通的转换操作转换描述reduce(func)使用函数func(有两个参数并返回一个结果)将源DStream中每个RDD的元素进行聚合操作,返回一个内部所包含的RDD只有一个元素的新DStream。countByValue()计算DStream中每个RDD内的元素出现的频次并返回新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次。reduceByKey(func,[numTasks])当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新DStream,其中每个键的值V都是使用聚合函数func汇总。可以通过配置numTasks设置不同的并行任务数。join(otherStream,[numTasks])当被调用类型分别为(K,V)和(K,W)键值对的2个DStream时,返回类型为(K,(V,W))键值对的一个新DStream。普通的转换操作转换描述reduce(func)使用函数f86普通的转换操作转换描述cogroup(otherStream,[numTasks])当被调用的两个DStream分别含有(K,V)和(K,W)键值对时,返回一个(K,Seq[V],Seq[W])类型的新的DStream。transform(func)通过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream,这可以用来在DStream做任意RDD操作。updateStateByKey(func)返回一个新状态的DStream,其中每个键的状态是根据键的前一个状态和键的新值应用给定函数func后的更新。这个方法可以被用来维持每个键的任何状态数据。普通的转换操作转换描述cogroup(otherStrea87窗口转换操作转换描述window(windowLength,slideInterval)返回一个基于源DStream的窗口批次计算后得到新的DStream。countByWindow(windowLength,slideInterval)返回基于滑动窗口的DStream中的元素的数量。reduceByWindow(func,windowLength,slideInterval)基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。reduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks])基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操作,得到一个新的DStream。窗口转换操作转换描述window(windowLength88窗口转换操作转换描述reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numTasks])一个更高效的reduceByKkeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最早的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么我们可以将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量,这种方法可以复用中间三秒的统计量,提高统计的效率countByValueAndWindow(windowLength,slideInterval,[numTasks])基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。reduce任务的数量可以通过一个可选参数进行配置。窗口转换操作转换描述reduceByKeyAndWindow89输出操作转换描述print()在Driver中打印出DStream中数据的前10个元素。saveAsTextFiles(prefix,[suffix])将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。saveAsObjectFiles(prefix,[suffix])将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。saveAsHadoopFiles(prefix,[suffix])将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。foreachRDD(func)最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统,比如保存RDD到文件或者网络数据库等。需要注意的是func函数是在运行该Streaming应用的Driver进程里执行的。输出操作转换描述print()在Driver中打印出DS90持久化和性能调优PART05持久化和性能调优持久化和性能调优PART05持久化和性能调优91持久化与RDD一样,DStream同样也能通过persist()方法将数据流存放在内存中,默认的持久化方式是MEMORY_ONLY_SER,也就是在内存中存放数据同时序列化的方式。这样做的好处是遇到需要多次迭代计算的程序时,速度优势十分的明显。而对于一些基于窗口的操作,如reduceByWindow、reduceByKeyAndWindow,以及基于状态的操作,如updateStateBykey,其默认的持久化策略就是保存在内存中。对于来自网络的数据源(Kafka、Flume、sockets等),默认的持久化策略是将数据保存在两台机器上,这也是为了容错性而设计的。持久化与RDD一样,DStream同样也能通过persist92性能调优性能优化从大方面来分可以分为两种:优化运行时间增加并行度减少数据序列化,反序列化的负担设置合理的批处理时间减少因任务提交和分发所带来的负担优化内存使用控制批处理间隔内的数据量及时清理不再使用的数据观察及适当调整GC策略性能调优性能优化从大方面来分可以分为两种:93性能调优优化运行时间——增加并行度确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操作,增加其并行度以确保更为充分地使用集群资源。优化运行时间——减少数据序列化,反序列化的负担序列化和反序列化需要更多的CPU时间,因此更加高效的序列化方式(Kryo)和自定义的系列化接口可以更高效地使用CPU。性能调优优化运行时间——增加并行度优化运行时间——减少数据序94性能调优优化运行时间——设置合理的批处理时间在SparkStreaming中,Job之间有可能存在依赖关系,后面的Job必须确保前面的作业执行结束后才能提交。若前面的Job执行的时间超出了批处理时间间隔,那么后面的Job就无法按时提交,这样就会进一步拖延接下来的Job,造成后续Job的阻塞。因此设置一个合理的批处理间隔以确保作业能够在这个批处理间隔内结束时必须的。
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2026年天津职业大学单招职业适应性测试题库有答案解析
- 2025年新疆中国铁路乌鲁木齐局集团有限公司招聘普通高校毕业生11人三笔试参考题库附带答案详解
- 2025年度陕西陕煤韩城矿业有限公司高校毕业生招聘88人笔试参考题库附带答案详解
- 2025年度北京建工路桥集团有限公司校园招聘笔试参考题库附带答案详解
- 2025年国网河北省电力有限公司高校毕业生招聘225人(第二批)笔试参考题库附带答案详解
- 未来五年雕塑保护市场需求变化趋势与商业创新机遇分析研究报告
- 未来五年柳编橱柜市场需求变化趋势与商业创新机遇分析研究报告
- 未来五年门球项目组织与服务行业市场营销创新战略制定与实施分析研究报告
- 未来五年泥瓦工维修服务市场需求变化趋势与商业创新机遇分析研究报告
- 2025年国际残疾人日宣传方案
- 2026年山东商务职业学院综合评价招生《素质测试》模拟试题及答案(一)
- 幼儿园安全管理考核细则及执行方案
- 《烧伤外科诊疗指南及操作规范(2025版)》
- 法学基础理论考试试题及答案
- 2026春季学期教务处工作计划(小学学校)
- 2025eber原位杂交检测技术专家共识解读 (1)课件
- 西点实训室安全教育培训课件
- 威尔第课件教学课件
- 【北师大版】六年级下册数学教案-总复习《图形与位置》
- 2026年抖音小店开店运营实操指南
- 小学象棋校本课程
评论
0/150
提交评论