版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
项目6SparkStreaming学习目标(1)(2)(3)学习SparkStreaming的概念。学习SparkStreaming的基础知识。应用SparkStreaming进行流处理操作。学习目标任务1初识SparkStreaming6.1.1SparkStreaming概述SparkStreaming是SparkAPI核心的扩展内容,除了SparkAPI内部的API和函数,Spark项目还包含另一个主要的子项目SparkStreaming。它是一套并发流处理库,可实现对实时数据流的可扩展、高吞吐量和容错流处理。数据流是一种连续的数据记录。与SparkStreaming函数或处理相关的输入数据流可以来自不同的消息队列系统,如Kafka、Flume、Kinesis、ZeroMQ等,并且输入数据也可以来自传统的TCP套接字。看这里!6.1.1SparkStreaming概述从消息队列中获取数据流以后,可以使用高级复杂的算法进行处理,这些算法函数包括map()、reduce()、join()、windows()等,这些算法通常应用于机器学习、图像处理算法等领域。最后,处理后的数据可以存储到文件系统(HDFS)、数据库(Databases)和实时仪表板(Dashboards)中,如图6-1所示。图6-1SparkStreaming的输入与输出6.1.2SparkStreaming的运行原理一般情况下,处理数据流有两种通用的方法:单独处理每条记录,并在记录出现时就立刻处理;第1种根据记录数量或时间长度将多个记录组合切分为小批量任务。SparkStreaming使用的是第2种方法。第2种6.1.2SparkStreaming的运行原理在SparkStreaming内部工作机制中,SparkStreaming接收实时输入数据流,并将其分成批量输入数据任务,然后由Spark引擎处理,输入数据批生成最终批量数据流,这些批量数据流是指一个小批的作业序列,每个小批量作业表示为SparkRDD,在内部DStream表示为一系列SparkRDD,如图6-2所示。图6-2SparkStreaming数据处理过程6.1.2SparkStreaming的运行原理图6-3SparkStreaming高层次架构SparkStreaming使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。SparkStreaming从各种输入源中读取数据,并把数据分组为小的批次。新的批次按均匀的时间间隔创建出来。在每个时间区间开始时,一个新的批次就被创建出来,在该区间内收到的数据都会被添加到这个批次中。在时间区间结束时,批次停止增长。处理的结果可以以批处理的方式传给外部系统,如图6-3所示。6.1.3SparkStreaming快速体验案例实训方法的步骤大概分为以下几步。(1)(2)(3)定义输入源。创建StreamingContext对象,这是流处理的关键点。指定流计算需要的指令(间隔时间,划分RDD规则,处理数据规则)。(4)调用StreamingContext的start()方法开始接收并处理数据。(5)处理过程会一直持续到StreamingContext调用stop()方法。6.1.3SparkStreaming快速体验案例(1)创建StreamingContext对象在以下代码中,对于输入数据流使用两个执行线程,并且输入数据流进行时间间隔为1秒的批处理,从而创建一个StreamingContext,将它作为流处理的关键入口点。创建StreamingContext时需要传入SparkConf(Spark配置对象)和批处理间隔时长,也可以使用已经创建好的SparkContext生成新StreamingContext对象。importorg.apache.spark._importorg.apache.spark.streaming._importorg.apache.spark.streaming.StreamingContext._//notnecessarysinceSpark1.3//配置Spark参数,设置使用Master主节点,配置两个核心valconf=newSparkConf().setMaster("local[2]").setAppName("NetworkWordCount")valssc=newStreamingContext(conf,Seconds(1))6.1.3SparkStreaming快速体验案例(2)创建用来表示TCP套接字输入的数据流DStream对象,数据流是连接localhost:9999获取的。上述代码已经创建好了StreamingContext,StreamingContext是用来提供从输入源创建DStream的方法的切入点,然后使用StreamingContext对象的socketTextStream()方法获得指定网络源的输入流。vallines=ssc.socketTextStream("localhost",9999)6.1.3SparkStreaming快速体验案例(2)创建用来表示TCP套接字输入的数据流DStream对象,数据流是连接localhost:9999获取的。提示!!socketTextStream()方法用来从指定的网络源上创建一个输入流,官方API给出的解释如图6-4所示。图6-4socketTextStream()方法文档6.1.3SparkStreaming快速体验案例(3)上述代码中的lines可以理解为从数据服务器接收到的数据流,但是这个数据流中有很多条数据,每一条数据都是一行文本,然后为了处理方便,使用flatmap操作将ReceiverInputDStream转换为可以处理单词的另一种数据形式DStream[U]。flatmap是一种DStream操作,很多DStream可以通过使用flatmap来生成多个新数据,以此来创建新的DStream。转换的方法是使用flatMap()方法,“_”表示所有内容,按照空格分割为单词,从而生成新的DStream[U],代码如下。valwords=lines.flatMap(_.split(""))6.1.3SparkStreaming快速体验案例(4)输入数据流已经被分离成以单词为单位的离散流,与Spark核心部分中RDD提取的操作接口类似,使用map和reduceByKey操作将word进一步映射为(word,1)形式的DStream,然后统计单词的个数并将其输出。//计算每一批数据的单词数量valpairs=words.map(word=>(word,1))valwordCounts=pairs.reduceByKey(_+_)wordCounts.print()6.1.3SparkStreaming快速体验案例(5)至此,代码编写成功。为了能够在Spark环境中运行代码,需要对该代码进行编译打包工作。常用的编译打包工具包括SBT、Maven等。这里使用SBT将上述编写好的Scala代码进行编译打包操作,详细操作如下:单击Terminal按钮,然后进入IDEA的Terminal界面,在该界面中使用SBT工具的基本命令“sbtpackage”,将代码编译成jar包。第一次编译的过程会比较慢,如图6-5所示。图6-5使用SBT编译jar包6.1.3SparkStreaming快速体验案例图6-6编译成功当出现success的字样后,说明编译成功,同时提示“packagingC:\\Users\\an\\......\\target\\scala-2.11\\wordcount_2.11-0.1.jar”,这说明已经在该目录下编译出了jar包,可以在该目录下查看是否有wordcount_2.11-0.1.jar包,如图6-6和图6-7所示。图6-7编译好的jar包6.1.3SparkStreaming快速体验案例(6)在Spark环境下提交运行编译好的jar包。首先,使用netcat作为数据服务器运行,运行的命令为“nc-lk9999”。然后,打开另一个终端窗口,将SBT打包好的jar包复制到指定的目录下,这里复制到/home/workspace下,然后,使用Spark环境提供的spark-submit脚本进行提交操作,Spark脚本在目录“/usr/spark/spark-2.4.0-bin-hadoop2.7/bin”下。6.1.3SparkStreaming快速体验案例使用以下命令将应用进行提交运行,同时可以看到终端界面在不断地更新RDD,如果这段时间没有输入内容,那么输出就为空。./spark-submit\--class"SparkTest"\--masterlocal[2]/home/workspace/com.spark-1.0.jar在netcat数据服务器界面输入“helloworldhellotom”,在运行代码的终端界面可以看到出现了单词的统计数量,如图6-8和图6-9所示。图6-8数据服务器终端输入图6-9Spark统计界面输出任务2理解SparkStreaming6.2.1DStream简介DiscretizedStream(离散流)或DStream是SparkStreaming对流式数据的基本抽象。它表示连续的数据流,这些连续的数据流可以是从数据源接收的输入数据流,也可以是通过对输入数据流执行转换操作而生成的经过处理的数据流。它是一串RDD序列,每个RDD代表数据流中的一个时间片内的数据。如图6-10所示,DStream中的每个RDD都包含一定时间间隔内的数据,时间间隔的大小是由批处理间隔这个参数决定的。批处理间隔一般为500ms到几秒,由应用开发者配置。每个输入批次都形成一个RDD,以Spark作业的方式处理并生成其他RDD。图6-10DStream(一个持续的RDD序列)6.2.1DStream简介可以从外部输入源创建DStream,也可以对其他DStream应用进行转化操作得到新的DStream。DStream支持许多Spark的RDD支持的转化操作。这些底层RDD转换由Spark引擎计算。DStream操作隐藏了大部分细节,并为开发人员提供了更高级别的API以方便使用,如图6-11所示。图6-11DSteam转换图6.2.2DStream接收输入源方法SparkStreaming提供了两类内置流媒体源。StreamingContext中提供了可对接的API,通过调用API可以直接读取数据。例如,文件系统和TCP套接字连接。(1)基本来源:在未提供API的情况下读取资源。例如,Kafka、Flume、Kinesis等资源可通过额外的实用程序获得。(2)高级资源:6.2.2DStream接收输入源方法对于已经提供了API的基本输入源,其常用接收函数如表6-1所示。表6-1输入源常用接收函数6.2.2DStream接收输入源方法SparkStreaming会监控dataDirectory目录,并处理该目录下生成的任何文件,但是需要注意以下几点。当文件进入目录中,该文件将不会发生改变,对于窗口文件中的更改将不会重新读取文件,会忽略更新。该目录下所有的文件必须使用相同的数据格式。(1)(3)(2)若使用通配符来标识目录(如hdfs://namenode:8040/logs/2016-*),重命名整个目录以匹配路径,则会将该目录添加到受监视目录列表中。6.2.3DStream转换操作DStream支持很多转换操作,它可以对RDD进行各种转换,因为DStream是由RDD组成的,SparkStreaming提供了可以在DSteam上使用的转换集合,这些转换操作应用于DStream中的每个RDD的每个元素上,这些常用的转换操作见表6-2。表6-2DStream常用转换函数6.2.3DStream转换操作表6-2DStream常用转换函数6.2.3DStream转换操作updateStateByKey操作允许在使用新信息不断更新时保持任意状态。要使用它,必须执行以下两个步骤。(1)定义状态,状态可以是任意的数据类型。(2)定义状态更新函数,用一个函数指定如何使用先前的状态和从输入流中的新值来更新状态。6.2.4DStream窗口操作滑动窗口转换操作是允许在滑动数据窗口范围内进行应用转换,而脱离了全部数据流的转换。滑动窗口转换的计算过程如图6-12所示,任何窗口操作都需要指定如下两个参数。图6-12滑动窗口转换的计算过程(1)windowLength:窗口时间间隔(窗口的持续时间),图6-12中的窗口长度为3。(2)slidInterval:滑动时间间隔(每隔多长时间执行一次计算),图6-12中的滑动时间间隔为2。6.2.4DStream窗口操作一些常见的窗口操作见表6-3,所有这些操作的参数都包括windowLength和slideInterval。官方API文档中提供了DStream转换的完整操作。表6-3一些常见的窗口操作6.2.5DStream输出操作DStream输出操作允许数据被输出到外部系统。例如,数据库或文件系统。由于输出操作实际上是转换后的数据,因此它们会触发所有DStream转换的实际执行(类似于RDD的操作)。目前,定义了表6-4所示的输出操作。表6-4输出操作表任务3SparkStreaming实战6.3.1统计本地文本单词个数本实训是实时统计指定目录下文本文件中的单词个数,我们指定在/tmp/test目录下每隔15秒钟统计一次新增加的文本中的单词数量。该实训内容主要应用于Internet中日志的统计、网站流量的统计等功能。实训的步骤大致如下。·使用IDEA工具创建名为WordCount的Scala文件,主要完成文本文件的单词个数统计功能。·定义main()方法。·创建StreamingContext对象,这是流处理关键点。·定义输入源。·指定流转换需要的指令,并做出相应处理(间隔时间,划分RDD规则,处理数据规则)。·调用StreamingContext的start()方法开始接收并处理数据。·处理过程会一直持续到StreamingContext调用stop()方法。6.3.1统计本地文本单词个数(1)使用IDEA工具创建名为WordCount的Scala文件,具体操作为:右击scala文件夹,在弹出的快捷菜单中执行New→ScalaClass命令,新建一个Scala类文件,名称为WordCount,如图6-13所示。图6-13新建一个Scala类文件6.3.1统计本地文本单词个数(2)编辑Scala代码,创建StreamingContext对象。StreamingContext对象是SparkStreaming应用程序与集群进行交互的唯一入口,它封装了Spark集群的环境信息和应用程序的一些属性信息。在创建StreamingContext对象之前,首先创建sparkConf,也就是Spark的配置文件,该对象通常需要指明应用程序的运行模式(本例中使用local[3])、设定应用程序名称(本例中为WordCount)、设定批处理时间间隔[本例中设定Seconds(15),即15秒],其中批处理时间间隔需要根据用户的需求和集群的处理能力进行适当的设置。6.3.1统计本地文本单词个数(3)定义输入源,SparkStream需要根据源类型选择相应的创建DStream的方法,处理输入源的方法有很多,这里使用textFileStream方法,表示从本地系统的/tmp/test目录下读取文件,一旦这个目录下有新的文件进来,StreamingContext就会读入这个文件并生成名为line_dstream的RDD。6.3.1统计本地文本单词个数(4)使用SBT对代码进行打包,单击Terminal按钮,然后进入IDEA的Terminal界面,在该界面中使用SBT工具的基本命令“sbtpackage”,将代码编译成jar包。第一次编译过程会比较慢,见图6-5。
当出现“success”的字样后,就说明编译成功,同时提示“packagingC:\\Users\\an\\......\\target\\scala-2.11\\wordcount_2.11-0.1.jar”,这说明已经在该目录下编译出了jar包,可以在该目录下查看是否有wordcount_2.11-0.1.jar包,见图6-6和图6-7。6.3.1统计本地文本单词个数(5)将编译好的jar包复制到Spark主节点下指定的目录下,然后执行命令“./spark-submit--class"WordCount"/home/workspace/wordcount_2.11-0.1.jar”,该命令可以填写除了class外的很多参数,具体的参数使用情况,可以参考官方文档中使用spark-submit启动应用程序部分的内容:/docs/latest/submitting-applications.html。运行结果如下。6.3.1统计本地文本单词个数(6)这里没有显示结果就是因为代码中监控的文件夹下没有文件,在/tmp/路径下新建一个名为words.txt的文件,该文件内容由多个单词组成,中间由空格分割,如图6-14所示。然后使用命令“mvwords.txt/tmp/test/”将words.txt文件移动到监控的文件目录下,然后发现监控的结果显示出了数据,运行结果如下。图6-14文本words.txt的内容
通过上述实例可以看出,该程序的确监控了/tmp/test目录下的文件情况,如果在之前的基础上再将一个文件复制到监控目录下,那么新的运行结果如下。-------------------------------------Time:1553072028000ms-------------------------------------(c,5)(d,5)(a,5)(b,5)6.3.2有状态操作累计统计单词个数本实训的目的是了解检查点的设置、初始化RDD的使用及自定义实现累计的函数function()。实训步骤大致如下。·使用IDEA工具创建名为CumulativeWord的Scala文件;定义main()方法;创建StreamingContext对象,这是流处理关键点。·设置检查点checkpoint。·初始化RDD、定义输入源、定义分割方式。·自定义累加函数mappingFunc()。·使用mapWithState()方法更新pairDStream。·调用StreamingContext的start()方法开始接收并处理数据。·处理过程会一直持续到StreamingContext调用stop()方法。6.3.2有状态操作累计统计单词个数(1)使用IDEA工具创建名为CumulativeWord的Scala文件,具体操作为:右击scala文件夹,在弹出的快捷菜单中执行New→ScalaClass命令,新建一个Scala类文件,名为CumulativeWord。编辑Scala代码,首先创建SparkConf(Spark的配置文件),该对象通常需要指明应用程序的运行模式(本例中使用local[2])、设定应用程序名称(本例中为NetworkWordCount)、设定批处理时间间隔[本例中设定Seconds(1),即1秒]。然后,创建StreamingContext对象。6.3.2有状态操作累计统计单词个数具体代码如下。importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,State,StateSpec,StreamingContext}objectCumulativeWord{defmain(args:Array[String]):Unit={valconf=newSparkConf().setMaster("local[2]").setAppName("NetworkWordCount")valssc=newStreamingContext(conf,Seconds(1))//setcheckpoint6.3.2有状态操作累计统计单词个数(2)因为StreamingContext需要保存之前的信息,也就是有状态的,所以需要设置检查点checkpoint,检查点是为了保证流应用程序全天运行,当设备出现故障或崩溃时防止数据丢失,为了提高其容错性,能够方便地将它从故障中恢复。检查点分为以下两种类型。①元数据检查点:将定义流式计算的信息保存到容错存储(如HDFS)中,元数据包括:配置SparkConf(用于创建流的配置)、DStream操作(定义DStream的操作集)和不完整的批次(未完成的批次)。②数据检查点:将生成的RDD保存到存储中。在RDD的转换过程中,如果生成的RDD依赖于之前批次的RDD,就会导致恢复故障的时间增加,为了解决这样的问题,将之前批次的RDD周期性地保存到存储中,从而提高故障恢复速度。6.3.2有状态操作累计统计单词个数通过设置目录来启动检查点,检查点信息保存到该文件系统中,使用streamingContext.checkpoint(checkpointDirectory)代码,其中参数checkpointDirectory为检查点的目录。设置检查点的具体代码如下。ssc.checkpoint(".")6.3.2有状态操作累计统计单词个数(3)初始化RDD为list类型,list类型中是键值对形式,其中包含一个元素为("hello",0),创建用来表示TCP套接字输入的数据流DStream对象,数据流是从localhost:9999获取的,定义DStream分割条件为按照空格进行分割,将分割的单词设置为(word,1)的形式。//InitialstateRDDformapWithStateoperationvalinitialRDD=ssc.sparkContext.parallelize(List(("hello",0)))//CreateaDStreamthatwillconnecttohostname:port,likelocalhost:9999vallines=ssc.socketTextStream("localhost",9999)//Spliteachlineintowordsvalwords=lines.flatMap(_.split(""))//Counteachwordineachbatchvalpairs=words.map(word=>(word,1))6.3.2有状态操作累计统计单词个数(4)自定义函数mappingFunc(),该函数将作为参数传入pairDStream的mapWithState()方法中。mappingFunc的主要作用是将每次输入保存到state中,将之前的输入值与目前的输入值相加,然后更新state。//UpdatethecumulativecountusingmapWithState//ThiswillgiveaDStreammadeofstate(whichisthecumulativecountofthewords)valmappingFunc=(word:String,one:Option[Int],state:State[Int])=>{valsum=one.getOrElse(0)+state.getOption.getOrElse(0)valoutput=(word,sum)state.update(sum)output}6.3.2有状态操作累计统计单词个数(5)使用pairDStream中的mapWithState方法更新pairDStream,该方法需要一个参数:StateSpec对象,为了创建该对象,传入之前定义好的mappingFunc和initialRDD。如果对该方法的使用仍然存在疑惑,请参考源码或官方文档。valwordCounts=pairs.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))//PrintthefirsttenelementsofeachRDDgeneratedinthisDStreamtotheconsolewordCounts.print()ssc.start()//Startthecomputationssc.awaitTermination()}}6.3.2有状态操作累计统计单词个数(6)使用SBT进行打包,然后执行命令“./spark-submit--class"CumulativeWord"/home/workspace/cumulativewords_2.11-0.1.jar”运行该jar包,操作流程请参考6.1.3,连续输入3个“helloworld”,如图6-15所示。图6-15输入内容6.3.3windows划窗统计热搜词我们使用windows划窗技术,模拟热点搜索词滑动统计,每隔10秒,统计最近60秒的搜索词的搜索频次,并打印出排名最靠前的3个搜索词及出现次数。实训方法的步骤大致如下。·使用IDEA工具创建名为WindowsWordNum的Scala文件;定义main()方法;创建StreamingContext对象,这是流处理关键点。·初始化RDD、定义输入源、定义分割方式。·定义划窗操作函数及其两个重要参数:窗口长度和滑动间隔。·对划窗函数操作后的结果进行组织排序,得到热度前3的单词。·打印结果且处理过程会一直持续到StreamingContext调用stop()方法。6.3.3windows划窗统计热搜词(1)使用IDEA工具创建名为WindowsWordNum的Scala文件,具体操作为:右击scala文件夹,在弹出的快捷菜单中执行New→ScalaClass命令,新建一个Scala类文件,名为WindowsWordNum。编辑Scala代码,首先创建SparkConf(Spark的配置文件),该对象通常需要指明应用程序的运行模式(本例中使用local[2])、设定应用程序名称(本例中为WindowsWordNum)、设定批处理时间间隔[本例中设定Seconds(5),即5秒]。然后,创建StreamingContext对象。具体代码如下。6.3.3windows划窗统计热搜词(2)创建用来表示TCP套接字输入的数据流DStream对象,设置数据流是通过本机的9999端口进行传输(localhost:9999),然后将接收到的数据集分割成DStream,首先按照空格将所有的搜索数据分割成单词,然后将单词以(word,1)的形式进行转换。valsearchLogsDStream=ssc.socketTextStream("locahost",9999)valsearchWordsDStream=searchLogsDStream.map{searchLog=>searchLog.split("")(1)}valsearchWordPairDStream=searchWordsDStream.map{searchWord=>(searchWord,1)}6.3.3windows划窗统计热搜词(3)使用DStream中的划窗操作,将DStream进行转换,这里对方法reduceByKeyAndWindow的参数进行详细的解析。reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numTasks])。①func:窗口内值变换函数,该实践中的方法是(v1:Int,v2:Int)=>v1+v2,意思是将两个值进行相加。②invFunc:反函数,这里省略。③windowLength:窗口长度,该实践中长度为60s。④slideInterval:滑动间隔,该实践中长度为10s。6.3.3windows划窗统计热搜词(4)使用DStream的transform转换操作,将之前统计的searchWordCountsDStream中的RDD进行转换,转换为一种单词的统计数量在前、单词在后的形式,然后取得统计数量在前3名的热搜单词数。valfinalDStream=searchWordCountsDStream.transform(searchWordCountsRDD=>{valcountSearchWordsRDD=sea
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 司法鉴定所财务制度
- 科创板对财务制度
- 食品会计财务制度
- 小微厂财务制度
- 农家书屋三个制度
- 公路工程施工监理招标投标制度
- 企业设备质量管理制度(3篇)
- 国贸理发活动策划方案(3篇)
- 2026江西九江市田家炳实验中学临聘教师招聘2人备考题库有完整答案详解
- 2026山东泰安市属事业单位初级综合类岗位招聘备考题库及答案详解(夺冠系列)
- GB/T 3487-2024乘用车轮辋规格系列
- CJT 313-2009 生活垃圾采样和分析方法
- 人工智能在塑料零件设计中的应用
- 《剧院魅影:25周年纪念演出》完整中英文对照剧本
- 蒋诗萌小品《谁杀死了周日》台词完整版
- tubeless胸科手术麻醉
- 物业保洁保安培训课件
- 人教版初中英语七至九年级单词汇总表(七年级至九年级全5册)
- 起重机械的安全围挡与隔离区域
- 水泥直塑施工方案
- 山东省安全员B证考试题库(推荐)
评论
0/150
提交评论