版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
实时计算书籍热度——
SparkStreaming实时计算框架项目背景书籍是人类进步的阶梯,数字时代的来临,也催生出“书”的新形式,即电子书。同时,众多售书的电商平台也应运而生。电商平台想要在激烈的竞争中脱颖而出,需要更着重于改善用户体验,并增加用户的黏性,把更多更好的书推荐给读者,扩大他们的知识视野,通过优质书籍推送实现多读书、全民阅读,增强文化自信,围绕举旗帜、聚民心、育新人、兴文化、展形象建设社会主义文化强国。用户无法找到适宜的书籍时往往会相信大众的选择,购买热度较高的书籍。基于这种情况,电商平台可以根据现有书籍的评分、销量、用户的评分次数等信息构建书籍热度,将一些热度较高的书推荐给用户,进而改善用户体验,增加用户黏性,激发用户的购买欲。项目背景书籍热度的计算公式如下,其中,u表示用户的平均评分,x表示用户的评分次数,y表示书籍的平均评分,z表示书籍被评分的次数。目前已采集了某电商网站上用户对书籍的评分数据文件BookRating.txt,数据字段说明如下,其中Rating字段中评分范围为1~5分。字段名称说明UserID用户IDBookID书籍IDRating用户对书籍的评分SparkStreaming简介DStream基础操作项目实施SparkStreaming简介SparkStreaming框架SparkStreaming运行原理初步使用SparkStreamingSparkStreaming框架SparkStreaming是Spark的子框架,用于处理流式数据的分布式框架,具有可伸缩、高吞吐量、容错能力强等特点。SparkStreaming能够和SparkSQL、SparkMLlib、SparkGraphX进行无缝集成,可以从Kafka、Flume、HDFS、Kinesis等数据源中获取数据,而且不仅可以通过调用map()、reduce()、join()等方法处理数据,也可以使用机器学习算法、图算法处理数据。经SparkStreaming处理后的最终结果可以保存在文件系统(如HDFS)、数据库(如MySQL)中或使用仪表面板进行实时展示。SparkStreaming框架SparkStreaming的运行原理如右图。SparkStreaming接收实时数据流并根据一定的时间间隔将其拆分成多个小的批处理作业
t。通过SparkEngine批处理引擎处理批数据,并批量生成最终的结果
r。SparkStreaming运行原理SparkStreaming运行原理SparkStreaming的输入数据按照时间片分成一段一段的数据,时间片可称为批处理时间间隔。时间片是人为设定的数据定量标准,作为数据拆分的依据。一个时间片的数据对应一个RDD实例。按照时间片划分得到批数据后,每一段数据都转换成Spark中的RDD,再将SparkStreaming中对DStream的转换操作变为对DStream中每个RDD的转换操作,并将中间结果保存在内存中。整个流式计算根据业务需求,可对中间结果进行累加计算或存储至外部设备。DStream即离散流,是SparkStreaming对内部持续的实时数据流的抽象描述。初步使用SparkStreaming使用SparkStreaming一般需进行如下操作:创建StreamingContext对象。创建DStream输入源:SparkStreaming需指明数据源。DStream输入源包括基础来源和高级来源。操作Dstream:对于从数据源得到的DStream,用户可以在DStream的基础上进行各种操作。启动SparkStreaming:之前的步骤仅创建了执行流程,程序未真正连接数据源,也未进行数据操作,仅设定了执行计划。执行“ssc.start()”命令后,程序才进行预期操作。初步使用SparkStreaming以单词实时计数为例,从slave1节点的8888端口上接收一行或多行文本内容,并对接收到的内容根据空格进行分割,实时计算每个单词出现的次数,具体实现过程如下。在slave1节点中通过“dnflist--installed|grepnc”命令查看是否安装nc工具,若查看结果中出现“nmap-ncat.x86_64”,说明已安装nc工具。若未安装nc工具,可通过“dnfinstall-ync”命令进行安装。在master节点上依次启动Hadoop集群和Spark集群,并在spark-shell交互式窗口中编写流处理程序执行流程。importorg.apache.spark.streaming.{Seconds,StreamingContext}valssc=newStreamingContext(sc,Seconds(10))vallines=ssc.socketTextStream("slave1",8888)valwords=lines.flatMap(_.split(""))valwordCounts=words.map(x=>(x,1)).reduceByKey(_+_)wordCounts.print()在创建StreamingContext对象时将会抛出警告信息,因为SparkStreaming自Spark3.4.0起已被官方标记为“不推荐使用”。尽管Spark3.5.1目前仍支持使用SparkStreaming,但在后续版本中可能会完全弃用。初步使用SparkStreaming在slave1节点上通过“nc-l8888”命令启动nc进程进入8888端口,之后在master节点的spark-shell中执行“ssc.start()”命令启动流处理程序。在slave1节点上的8888端口中输入文本内容“IamlearningSparkStreamingnow”并回车,之后将在master节点的spark-shell中输出对文本内容进行单词计数之后的结果。程序运行完毕后,若想关闭slave1节点监听的8888端口,或想停止master节点的spark-shell中的流处理程序,均可通过【Ctrl+C】组合键关闭当前进程。初步使用SparkStreaming除了以Socket连接作为数据源读取数据之外,StreamingContext的API还提供了获取其他数据源的方法。例如,可以从HDFS中获取数据创建DStream作为输入源,使用ssc.textFileStream()方法监听HDFS的目录,一旦有新文件加入该目录,SparkStreaming将实时计算目录下文件中的单词词频。importorg.apache.spark.streaming.{Seconds,StreamingContext}valssc=newStreamingContext(sc,Seconds(10))vallines=ssc.textFileStream("/tipdm/data/sparkStreaming/temp")valwords=lines.flatMap(_.split(""))valwordCounts=words.map(x=>(x,1)).reduceByKey(_+_)wordCounts.print()ssc.start()程序运行后,打开另外一个会话端口,在HDFS中创建/tipdm/data/sparkStreaming/temp目录,分别上传文件a.txt、b.txt到该目录下(两份文件的上传时间间隔10秒以上)。SparkStreaming一旦监控到该目录下有新文件加入,便会在10秒内对文件的单词进行词频统计并输出结果。初步使用SparkStreamingSparkStreaming简介DStream基础操作项目实施DStream基础操作DStream编程模型使用DStream转换操作使用DStream窗口操作使用DStream输出操作DStream编程模型DStream为持续性的数据流,可以通过外部数据源获取DStream,也可以通过DStream的高级操作生成新的DStream。DStream代表着一系列持续的RDD,每个RDD都是按一小段时间分割开的。对DStream的任何操作都会转化成对底层RDD的操作。以单词计数为例,获取文本数据形成文本的输入数据流linesDStream。使用flatMap()方法进行扁平化操作并进行分割,得到每一个单词,形成单词的文本数据流wordsDstream。使用DStream转换操作DStream转换操作常用的方法:方法描述map(func)对源DStream的每个元素应用func函数并返回一个新的DStreamflatMap(func)类似map操作,不同的是每个元素可以被映射成0个、1个或者多个输出元素filter(func)对源DStream中的每一个元素应用func函数进行计算,如果func函数返回结果为true,则保留该元素,否则丢弃该元素,返回一个新的DStreamrepartition(numPartitions)更改DStream的分区,主要用于更改并行度,repartition()方法会作用于DStream中每个RDD,对它们重新分区并返回一个新的DStream使用DStream转换操作方法描述union(otherStream)合并两个DStream,生成一个包含两个DStream中所有元素的新的DStreamcount()统计DStream中每个RDD包含的元素的个数,得到一个只有一个元素的RDD构成的DStreamreduce(func)对源DStream中的每个元素应用func函数进行聚合操作,返回一个内部所包含的RDD只有一个元素的新DStreamcountByKey()计算DStream中每个RDD内的元素出现的频次,并返回新的DStream[(K,Long)],其中K是DStream中键的类型,Long是元素出现的频次使用DStream转换操作方法描述reduceByKey(func,[numTasks])以一个键值RDD为目标,K为键,V为值。当一个(K,V)键值对的DStream被调用时,返回(K,V)键值对的新DStream,其中每个键的值都使用聚合函数func汇总。配置numTasks可以设置不同的并行任务数join(otherStream,[numTasks])当调用的是(K,V1)和(K,V2)键值对的两个DStream时,返回元素为(K,(V1,V2))键值对的一个新DStreamcoGroup(otherStream,[numTasks])当被调用的两个DStream分别含有(K,V1)和(K,V2)键值对时,返回一个元素为(K,Seq[V1],Seq[V2])的新的DStreamtransform(func)通过对源DStream的每个RDD应用func函数返回一个新的DStream,用于在DStream上进行RDD的任意操作使用DStream转换操作将语句分割为单词的操作换用transform()方法实现,并直接输出分割后的单词。运行代码后,在slave1节点中通过“nc-l8888”命令启动nc进程进入8888端口,输入“IamlearningSparkStreamingnow”并回车,再回到spark-shell中查看运行结果。importorg.apache.spark.streaming.{Seconds,StreamingContext}valssc=newStreamingContext(sc,Seconds(10))vallines=ssc.socketTextStream("slave1",8888)valwords=lines.transform(rdd=>rdd.flatMap(_.split("")))words.print()ssc.start()使用DStream窗口操作窗口操作指的是在DStream上,将一个可配置长度的窗口,以一个可配置的速率向前移动,根据窗口操作的具体内容,对窗口内的数据执行计算操作,每次落入窗口内的RDD数据会进行合并并执行相应操作,最后生成的新RDD会作为窗口DStream的一个RDD。使用DStream窗口操作
DStream窗口操作常用的方法:方法描述window(windowLength,slideInterval)返回一个基于源DStream的窗口批次计算后得到的新DStreamcountByWindow(windowLength,slideInterval)返回基于滑动窗口的DStream中的元素的数量reduceByWindow(func,windowLength,slideInterval)基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStreamreduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks])基于滑动窗口对元素为(K,V)键值对的DStream中的值,按K使用func函数进行聚合操作,得到一个新的DStream使用DStream窗口操作方法描述reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numTasks])一个更高效的reduceByKeyAndWindow()的实现版本,其中每个窗口的统计量是通过使用前一个窗口的新数据并减去离开窗口的旧数据来实现的。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,可以将t+3秒时刻过去5秒的统计量加上[t+3秒,t+4秒]的统计量,再减去[t-2秒,t-1秒]的统计量。这种方法可以复用中间3秒的统计量,从而提高统计效率。countByValueAndWindow(windowLength,slideInterval,[numTasks])通过滑动窗口计算源DStream中每个RDD内每个元素出现的频次,并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue一样,reduce任务的数量可以通过一个可选参数进行配置使用DStream窗口操作以window,介绍DStream窗口操作的方法。运行完代码后,在slave1节点中通过“nc-l8888”命令启动nc进程进入8888端口,在监听端口每秒输入1个数字并回车,如依次输入1、2、3、4、5,再回到spark-shell中查看运行结果。importorg.apache.spark.streaming.{Seconds,StreamingContext}valssc=newStreamingContext(sc,Seconds(1))vallines=ssc.socketTextStream("slave1",8888)valwords=lines.flatMap(_.split(""))valwindowWords=words.window(Seconds(3),Seconds(2))windowWords.print()ssc.start()使用DStream窗口操作reduceByKeyAndWindow()方法类似于reduceByKey()方法,但两者的数据源不同,reduceByKeyAndWindow()方法的数据源是基于DStream窗口的。例如,将当前长度为3秒的时间窗口中的所有数据元素根据键进行合并,统计当前3秒内不同单词出现的次数。importorg.apache.spark.streaming.{Seconds,StreamingContext}valssc=newStreamingContext(sc,Seconds(1))vallines=ssc.socketTextStream("slave1",8888)valwords=lines.flatMap(_.split(""))valpairs=words.map(word=>(word,1))valwindowWords=pairs.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(3),Seconds(2))windowWords.print()ssc.start()使用DStream输出操作DStream输出操作常用的方法:方法描述print()在Driver中输出DStream中数据的前10个元素saveAsTextFiles(prefix,[suffix])将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件再单独保存为文件夹,文件夹以prefix_TIME_IN_MS[.suffix]的方式命名saveAsObjectFiles(prefix,[suffix])将DStream中的内容按对象序列化,并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix_TIME_IN_MS[.suffix]的方式命名使用DStream输出操作方法描述saveAsHadoopFiles(prefix,[suffix])将DStream中的内容以Hadoop支持的输出格式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix_TIME_IN_MS[.suffix]的方式命名foreachRDD(func)基本的输出操作,将func函数应用于DStream中的RDD上,输出数据至外部系统,如保存RDD到文件或网络数据库等使用DStream输出操作例如,将监听端口中输入的内容保存至HDFS的/tipdm/data/saveAsTextFiles目录下,设置每秒生成一个文件夹。运行完代码后,在slave1节点中通过“nc-l8888”命令启动nc进程进入8888端口,再在HDFS的Web端中查看运行结果,即可看到/tipdm/data/saveAsTextFiles目录下将生成一系列以“sahf”为前缀,“txt”为后缀的文件夹。importorg.apache.spark.streaming.{Seconds,StreamingContext}valssc=newStreamingContext(sc,Seconds(1))vallines=ssc.socketTextStream("slave1",8888)lines.saveAsTextFiles("hdfs://master:8020/tipdm/data/saveAsTextFiles/sahf","txt")ssc.start()使用DStream输出操作使用DStream输出操作在使用foreachRDD()方法的过程中需避免以下错误:在创建连接对象时,避免在SparkDriver端创建连接对象。在Worker上创建连接对象会为每一个记录创建一个连接对象,会产生非常大的开销,且可能会显著降低系统的整体吞吐量。dstream.foreachRDD{rdd=>valconnection=createNewConnection()rdd.foreach{record=>connection.send(record)}}dstream.foreachRDD{rdd=>rdd.foreach{record=>valconnection=createNewConnection()connection.send(record)connection.close()}}使用DStream输出操作foreachRDD()方法的正确使用和优化:dstream.foreachRDD{rdd=>rdd.foreachPartition{partitionOfRecords=>valconnection=createNewConnection()partitionOfRecords.foreach(record=>connection.send(record))connection.close()
ConnectionPool.returnConnection(connection)}}使用DStream输出操作以网站热词排名为例,介绍如何正确使用foreachPartition()方法,并将处理结果写入MySQL数据库中。首先在MySQL数据库中创建数据库和表用于接收处理后的数据。在IntelliJIDEA中编写Spark代码(项目需要加入MySQL对应驱动包),将窗口长度和滑动步长均设置为一分钟,即每隔一分钟计算一次这一分钟内每个单词出现的次数,根据出现的次数对单词进行排序。虽然DStream没有提供sort()方法,但可以使用transform()方法调用RDD的sortByKey()方法实现排序。再使用foreachPartition()方法创建MySQL数据库连接对象,使用该连接对象将数据输出到searchKeyWord表中。使用DStream输出操作先在slave1节点中通过“nc-l8888”命令启动nc进程监听8888端口,之后在IntelliJIDEA中运行编写好的代码,再回到slave1节点的监听端口输入的数据。在IntelliJIDEA的控制台中查看程序运行结果。进入MySQL数据库中查看写入searchKeyWord表的数据。SparkStreaming简介DStream基础操作项目实施获取输入数据源使用SparkStreaming实时处理数据时首先需要有实时产生的数据,为此,本任务将自定义一个日志生成模拟器来模拟数据的实时产生。将用户对书籍的评分数据文件BookRating.txt保存至本地“E:\\data”目录下,并创建“E:\\data\\StreamingData”目录。设置每隔60秒随机从BookRating.txt文件中抽取100条记录并添加至新日志文件中,新生成的日志文件存放在“E:\\data\\StreamingData”路径下。在IntelliJIDEA中创建项目工程并编写日志生成模拟器代码后,直接运行程序即可在“E:\\data\\StreamingData”路径下产生一系列以“StreamingData-”为前缀,“txt”为后缀的文件,产生的每个文件中都有100条记录。获取输入数据源编写程序实例化StreamingContext对象并监控“E:\\data\\StreamingData”路径,实时抽取产生的新文件的数据并转化为数据流,设置批处理时间间隔为60秒。在获取到数据流后,通过split()方法按制表符进行切分,并输出数据流进行测试。在IntelliJIDEA中编写rating类后并执行,同时执
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 我国文化产业发展现状与政策解读试卷
- 2026全科高效学习标准化指令模板大全
- 小学三年级上册《美丽的小兴安岭》中春天“树木抽出新的枝条”的“抽”字妙用知识点试卷
- 小学三年级上册《汉字基本笔画巩固(横、竖、撇、捺、点、提、折、钩)》知识点试卷
- 小学科学《血液循环系统》单元知识点试卷
- 湖北省云学联盟2025-2026学年高一上学期12月学科素养测评英语试题
- 小学二年级下册整百整千加减法知识点复习试卷
- 云南省文山壮族苗族自治州2025-2026学年高一上学期1月期末物理试题
- 2026年防火培训测试题及答案
- 2026年异性人气测试题及答案
- 制糖业的环保措施
- 韶音供应商QSA+QPA审核-checklist-V1
- 开胸心肺复苏术技术操作规范
- JGT483-2015 岩棉薄抹灰外墙外保温系统材料
- 减压赋能-轻松前行心理课件
- 建筑节能技术及应用课件
- 墩柱模板计算书1
- 中职数学基础模块下册第八章《直线和圆的方程》单元检测试题及参考答案
- 控规项目投标技术标文件2019.1.18
- 幸存者偏差理论
- 初中英语语法中考复习词性转换精讲 课件 (共14张PPT)
评论
0/150
提交评论