版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
电影数据分析实现
学完本课程后,您将可以:掌握Spark编程模型掌握Spark常用的开发工具掌握IntelliJIDEA工具能完成IntelliJIDEAWindows版环境搭建能使用IntelliJIDEA工具开发第一个程序WordCount能对用IntelliJIDEA打包程序提交到集群运行能对搭建环境过程中遇到的问题进行分析01搭建Spark开发环境02编写第一个Spark程序03打包并运行Spark程序04编程实现电影数据分析301搭建Spark开发环境IntelliJIDEA介绍IntelliJIDEA(简称IDEA)是JetBrains公司推出的Java集成开发环境(IDE),凭借其智能化设计、强大的功能生态和开发者友好的体验,已成为全球Java开发者的主流选择,并在Kotlin、Scala、Python、JavaScript等多语言开发中表现突出IntelliJIDEAIntelliJIDEA凭借其对Java和Scala语言的深度支持、智能化工具链以及开发者体验的全面优化,被广泛认为是Java和Scala开发的首选集成开发环境(IDE)IntelliJIDEA版本IntelliJIDEA版本社区版(免费)专业版(收费)IntelliJIDEA下载官网下载以上是当前最新IntelliJIDEACommunityEditon(社区版),点击“下载”按钮IntelliJIDEA安装IntelliJIDEA安装IntelliJIDEA安装IntelliJIDEA支持Scala开发:安装Scala插件IntelliJIDEA全局配置
Maven配置Maven国内镜像中央仓库(国外)国内镜像同步数据Maven默认从中央仓库下载Jar包配置Maven国内镜像阿里云镜像配置<mirror>
<id>alimaven</id>
<name>aliyunmaven</name>
<url>/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>Spark运行流程Linux服务器(虚拟机)(运行Spark程序)WindowsPC机(编写Spark程序)
打成Jar包Windows配置支持Spark运行环境-winutilswinutils是一个用于在Windows上运行Hadoop的实用程序,它提供了一些Hadoop需要的系统工具和Shell命令下载的winutils压缩包解压后,里面会包含多个hadoop版本,选择兼容版本Zeppelin介绍ApacheZeppelin是一个提供交互数据分析的网页端notebook(用户可以在其中编写和运行Spark代码,并直接在浏览器中查看结果)对于公司的数据分析人员来说,虽然SparkShell提供了交互式数据查询的功能,但是更喜欢使用的是基于Web的Notebook工具Spark和Zeppelin可以很好地结合工作,用于数据处理和可视化Zeppelin下载官方下载地址:/download.html/Zeppelin安装-解压安装包并重命名#tar-xzvfzeppelin-0.12.0-bin-all.tgz-C/data/apps/#mvzeppelin-0.12.0-bin-all/zeppelin-0.12.0解压安装包并修改名称tar解压命令-xzvf参数表示解压并提取由tar和gzip/xz压缩过的归档文件mv命令主要用于移动文件或目录,同时它也常被用来重命名文件或目录Zeppelin安装-环境变量#vim/etc/profileexportZEPPELIN_HOME=/data/apps/zeppelin-0.12.0exportPATH=$PATH:$ZEPPELIN_HOME/binvim编辑环境变量执行source环境变量生效命令#source/etc/profileZeppelin安装-修改配置文件zeppelin-site.xml<property>
<name>zeppelin.server.addr</name>
<value></value>
<description>Serverbindingaddress</description></property><property><name>zeppelin.server.port</name><value>9090</value><description>Serverport.</description></property><property><name>zeppelin.server.ssl.port</name><value>9443</value></property>启动
Zeppelin执行
zeppelin-daemon.shstart
命令jps命令:查看Java进程(JVM)Zeppelin浏览器界面浏览器输入:31:9090,打开访问界面Zeppelin创建新Note点击“Notebook/CreateNewNote”输入“/spark_demo/test”Zeppelin测试输入spark测试代码Zeppelin与SparkShell相比有何优势?
为何要配置Maven国内镜像?
讲解了IntelliJIDEA介绍和安装讲解了Zeppelin安装和基本使用2802编写第一个Spark程序Spark编程模型介绍RDD被表示为对象,通过调用其对象上的方法来对RDD进行转换RDD经过一系列的transformations转换定义之后,就可以通过调用Action算子来触发RDD的计算(Action可以是向应用程序返回结果,也可以是向存储系统保存数据)在Spark中,只有在Action算子被调用时,才会执行RDD的计算(即延迟计算)SparkWordCount案例分析WordCount是大数据处理中一个非常经典且基础的案例,用于统计文本中每个单词出现的次数(通过Spark实现WordCount可以展示Spark处理大规模数据的能力和高效性)代码步骤分析valconf=newSparkConf().setAppName("WordCount"):创建Spark配置对象,并设置应用程序名称valsc=newSparkContext(conf):基于配置创建Spark上下文valinput=sc.textFile("your_file.txt"):从指定文件读取文本数据valwords=input.flatMap(line=>line.split("")):将每行文本按空格分割为单词valwordCounts=words.map(word=>(word,1)):将每个单词映射为键值对,值为1,表示出现一次reduceByKey((a,b)=>a+b):按照单词进行分组,并对每个组内的值进行累加,从而得到每个单词的出现次数SparkWordCount代码实现-创建Maven工程SparkWordCount代码实现-导入Spark依赖<dependencies><!--SparkCore--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.4</version></dependency></dependencies>Spark依赖SparkWordCount代码实现-添加Scala支持Maven工程默认不支持Scala(需要手动添加Scala支持)SparkWordCount代码实现-编写Scala代码importorg.apache.spark.rdd.RDDimportorg.apache.spark.{SparkConf,SparkContext}objectWordCount{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("WC").setMaster("local[*]")//1.创建SparkContext,该对象是提交SparkApp的入口valsc=newSparkContext(conf)//2.读取指定位置文件:helloxfxfvallineRdd:RDD[String]=sc.textFile("input")lineRdd.collect()//3.读取的一行一行的数据分解成一个一个的单词(扁平化)(hello)(xf)(xf)valwordRdd:RDD[String]=lineRdd.flatMap(line=>line.split(""))wordRdd.collect()35//4.将数据转换结构:(hello,1)(xf,1)(xf,1)valwordToOneRdd:RDD[(String,Int)]=wordRdd.map(word=>(word,1))//5.将转换结构后的数据进行聚合处理xf:1、1=》1+1(xf,2)valwordToSumRdd:RDD[(String,Int)]=wordToOneRdd.reduceByKey((v1,v2)=>v1+v2)//6.将统计结果采集到控制台打印valwordToCountArray:Array[(String,Int)]=wordToSumRdd.collect()wordToCountArray.foreach(println)
//7.关闭连接sc.stop()}}SparkWordCount代码实现-编译运行控制台输出正确结果,说明本地Spark程序运行成功介绍Spark的编程模型(数据处理流程)?说一说
SparkWordCount程序中使用了哪些转换操作和行动操作?有何作用?讲解了
Spark编程模型介绍讲解了SparkWordCount案例分析讲解了SparkWordCount代码实现
4003打包并运行Spark程序Maven打包插件介绍字段说明Maven提供各种打包插件(可以将代码编译的class文件打成JAR包)插件名称典型用途maven-jar-plugin生成普通JAR包(编译类文件和资源打包)maven-assembly-plugin自定义打包(可执行/可分发包)maven-shade-plugin生成Uber/FatJAR(合一依赖到单一JAR包)maven-war-plugin生成WAR(Web项目打包)maven-source-plugin生成源码包使用maven-jar-plugin插件打包分析用户的活跃时间使用maven-jar-plugin插件打包分析用户的活跃时间打包成功后,将在工程target文件下生成WordCount-1.0-SNAPSHOT.jar包提交Spark程序到集群运行#spark-submit\--classcom.xf.spark.WordCount\--masterspark://master:7077\WordCount.jar\/input\/output运行Spark程序命令Spark程序到集群运行结果
46打包程序是打包源代码文件吗?47思考提交Spark程序到集群的背后运行机制?讲解了Maven打包插件介绍讲解了Maven打包插件实操讲解了提交Spark程序到集群运行4904编程实现电影数据分析项目背景
电影推荐(MovieLens)是美国明尼苏达大学(Minnesota)计算机科学与工程学院的GroupLens项目组创办的,是一个非商业性质的、以研究为目的的实验性站点。电影推荐系统主要使用协同过滤和关联规则相结合的技术对电影数据进行分析,通过分析用户的观影历史、评价、喜好等数据,能够为用户提供个性化的电影推荐。这种个性化服务有助于用户更方便地发现符合其口味和兴趣的电影,提升了观影体验。此外,推荐系统也能够为用户推荐他们可能未曾了解但却符合其兴趣的新作品,从而促使用户拓展观影领域,提升了用户对电影的多样性认知电影推荐电影数据集
电影数据集评级文件ratings.dat用户文件users.data电影文件movies.data职业文件occupations.data电影数据集-评级文件(ratings.data)
UserID::MovieID::Rating::Timestamp1::1193::5::9783007601::661::3::9783021091::914::3::9783019681::314::2::9783019611::314::1::978301965字段说明UserIDMovieIDRatingTimestamp用户ID电影ID电影评级(1-5星级)时间戳以秒为单位表示电影数据集-用户文件(users.data)
UserID::Gender::Age::Occupation::Zip-code1::F::1::10::480672::M::56::16::700723::M::25::15::55117字段说明UserIDGenderAgeOccupation用户ID用户的性别(F:女性M:男性)用户年龄用户职业Zip-code邮政编码电影数据集-电影文件(movies.data)
1::ToyStory(1995)::Animation|Children's|Comedy2::Jumanji(1995)::Adventure|Children's|Fantasy3::GrumpierOldMen(1995)::Comedy|Romance字段说明MovieIDTitleGenres电影ID电影名称电影类型电影数据集-职业文件(occupations.data)
OccupationID::Occupation0::otherornotspecified1::academic/educator2::artist字段说明OccupationIDOccupation职业ID职业名称功能需求计算电影中平均得分最高(口碑最好)的电影及观看人数最高的电影(流行度最高)TopN通过找出电影中平均得分最高的电影,可以了解哪些电影最受用户欢迎,从而可以针对这些电影进行更精准的推荐统计最受男性喜爱的电影TopN和最受女性喜爱的电影TopN目的在于分析不同性别对电影的偏好,从而为电影推荐系统提供性别差异化的推荐策略通过分析男性和女性分别对哪些电影表现出更高的兴趣,可以更好地理解不同性别观众的电影消费习惯和喜好,进而优化推荐算法,确保推荐的电影更符合各自性别观众的口味和需求这种分析有助于提高电影推荐系统的准确性和用户满意度,同时也为电影制作方提供了有价值的市场分析数据,帮助他们更好地了解目标观众群体,从而制作出更受欢迎的电影内容12需求实现-创建SparkContext对象实例objectMovieUsersAnalyzer{defmain(args:Array[String]):Unit={//创建SparkConfvalconf:SparkConf=newSparkConf().setMaster("local[*]")//设置Spark主节点为本地.setAppName("MovieUsersAnalyzer")//设置应用名称//创建SparkContext
valsc:SparkContext=newSparkContext(conf)}需求实现-读取HDFS文件//读取HDFS数据返回RDDvalusersRDD:RDD[String]=sc.textFile(dataPath+"users.dat")valmoviesRDD:RDD[String]=sc.textFile(dataPath+"movies.dat")valoccupationsRDD:RDD[String]=sc.textFile(dataPath+"occupations.dat")valratingsRDD:RDD[String]=sc.textFile(dataPath+"ratings.dat")需求实现-所有电影中平均得分最高的电影(需求一)println("所有电影中平均得分最高(口碑最好)的电影:")valratings:RDD[(String,String,String)]=ratingsRDD.map(_.split("::"))//分割数据.map(x=>(x(0),x(1),x(2)))//(UserID,MovieID,Rating).cache()//缓存数据//(MovieID,平均评分)ratings.map(x=>(x._2,(x._3.toDouble,1)))//(MovieID,(Rating,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))//(MovieID,(总评分,总评分次数)).map(x=>(x._1,x._2._1/x._2._2))//(MovieID,平均评分).sortBy(_._2,ascending=false)//对value降序排列.take(10)//取前10个.foreach(println)需求实现-所有电影中粉丝或者观看人数最多的电影(需求一)println("所有电影中粉丝或者观看人数最多的电影:")ratings.map(x=>(x._2,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(ascending=false).map(x=>(x._2,x._1)).take(10).foreach(println)//(MovieID,总次数)ratings.map(x=>(x._2,1)).reduceByKey(_+_).sortBy(_._2,ascending=false).take(10).foreach(println)需求实现-读取用户数据,并与评分数据进行关联(需求二)valratings2:RDD[(String,(String,String,String))]=ratings.map(x=>(x._1,(x._1,x._2,x._3)))valusersRDD2:RDD[(String,String)]=usersRDD.map(_.split("::")).map(x=>(x(0),x(1)))valgenderRatings:RDD[(String,((String,String,String),String))]=ratings2.join(usersRDD2).cache()需求实现-统计男性和女性评分数据(需求二)//统计男性的评分数据valmaleRatings:RDD[(String,String,String)]=genderRatings.filter(x=>x._2._2.equals("M")).map(x=>x._2._1)//统计女性的评分数据valfemaleRatings:RDD[(String,String,String)]=genderRatings.filter(x=>x._2._2.endsWith("F")).map(x=>x._2._1)需求实现-所有电影中最受男/女性喜爱的电影Top10(需求二)println("所有电影中最受男性喜爱的电影Top10:")maleRatings.map(x=>(x._2,(x._3.toDouble,1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).map(x=>(x._1,x._2._1/x._2._2)).sortBy(_._2,ascending=false).take(10).foreach(println)println("所有电影中最受女性喜爱的电影Top10:")femaleRatings.map(x=>(x._2,(x._3.toDouble,1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).map(x=>(x._1,x._2._1/x._2._2)).sortBy(_._2,ascending=false).take(10).foreach(println)64为什么WordCount案例中需要使用flatMap而非map进行单词分割?65电影数据分析项目中如果两个需求都需要输出电影名称应该如何实现?讲解了项目背景讲解了数据描述讲解了功能需求讲解了需求实现许斌/0181932025.11.19新开发银行客户数据分析
学完本课程后,您将可以:掌握SparkSQL概述掌握性能优化与调优了解银行客户数据简介能够进行DataFrameAPI基础操作实现客户行为分析01认识SparkSQL02SparkSQL基础03SparkSQL进阶操作04分析与统计银行客户数据7201认识SparkSQLSparkSQL概述SparkSQL是Spark上用来处理结构化数据的一个模块,SparkSQL是一个SQL解析引擎,将SQL语句解析成特殊的RDD(DataFrame
或DataSet
),然后在Spark集群中运行SparkSQL(操作结构化数据)SparkStreaming(实时计算)MLlib(机器学习)GraphX(分布式图计算)SparkCore(核心组件)StandaloneYARNMesosSparkSQL优势数据和计算的结构化视角SparkSQL引入了数据和计算的结构化视角。通过提供丰富的数据结构信息,它使得Spark能够更好地理解数据的组织方式和执行中的计算过程。这种结构化视角不仅仅局限于数据的模式,还包括计算过程中的各个环节内部优化引擎的加持SparkSQL内部的优化引擎基于结构化视角对数据和计算进行优化操作,使查询执行速度显著提升,同时减少资源消耗。无论使用SQL还是DatasetAPI,该优化引擎都能保持一致的高效性能多种交互方式的灵活性SparkSQL提供了多种交互方式,包括SQL查询和DatasetAPI。不论更习惯于使用SQL还是更倾向于编程的方式,都可以轻松地与SparkSQL进行交互。而且,不管使用哪种方式,最终都会使用相同的执行引擎,保证了一致的处理效果和性能API的无缝切换这种多交互方式的存在,使得开发人员可以在不同的API之间无缝切换。这种灵活性让开发人员能够更加自由地选择适合自己的方式处理数据SparkSQL引入DataFrame类DataFrame是SparkSQL的核心数据抽象(类似于关系型数据库中的表)SparkSQL引入Catalyst查询优化器Catalyst查询优化器是SparkSQL的核心引擎之一,负责将用户的查询和转换操作进行优化Catalyst通过逻辑和物理优化来重写查询计划,以提高执行效率Catalyst可以根据查询的逻辑结构应用一系列优化规则,然后生成更高效地物理执行计划(这种优化过程在后台自动进行,让用户无需手动干预)SQL查询与优化SQL字符串解析阶段未解析的逻辑计划分析阶段逻辑计划逻辑优化优化后的逻辑计划物理计划物理优化可执行代码Catalyst优化器SQL查询与优化查询解析逻辑优化物理优化查询执行SparkSQL引擎会解析查询语句,识别出SELECT、FROM、GROUPBY和ORDERBY等关键字,并确定需要查询的数据表和字段一旦查询被解析,Catalyst查询优化器就会介入。它会将查询转化为逻辑执行计划,该计划描述了查询的逻辑操作顺序,而不涉及具体的物理执行细节。在这个例子中,优化器可能会注意到GROUPBY和聚合操作,尝试将它们重排以减少数据移动优化器将逻辑执行计划转化为物理执行计划。物理计划决定了实际的执行方式,包括数据如何分区、哪些操作可以并行执行等。优化器可能会选择合适的分区进行聚合操作,以减少数据的传输和处理SparkSQL引擎根据物理执行计划执行查询。数据将被读取、聚合和排序,最终生成结果集。这一阶段的执行是基于优化后的物理计划进行的,以确保高效的查询执行1234(单选)以下哪个不是SparkSQL的特点?(
)支持多种数据源使用Catalyst进行查询优化支持DataFrame和DatasetAPI仅支持Scala语言
为何SparkSQL比RDD性能有显著提高?
讲解了
SparkSQL概述讲解了SparkSQL引入DataFrame讲解了SparkSQL引入Catalyst讲解了
Catalyst工作原理8202SparkSQL基础DataFrameAPI基础操作-创建Maven工程DataFrameAPI基础操作-添加Spark依赖<dependencies><!--SparkCore--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.4</version></dependency>
<!--SparkSQL--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.3.4</version></dependency></dependencies>DataFrameAPI基础操作-创建SparkSession会话objectDataFrameBasicOperations{defmain(args:Array[String]):Unit={
valspark=SparkSession.builder().appName("DataFrameBasicOperations").getOrCreate()}DataFrameAPI基础操作-创建数据集//创建数据集valdata:List[(Int,String,Double)]=List((1,"ProductA",100.0),(2,"ProductB",150.0),(3,"ProductC",200.0))//定义DataFrame列名valcolumns=Seq("id","product_name","price")
//将数据集转换为DataFrame
valdf:DataFrame=spark.createDataFrame(data).toDF(columns:_*)DataFrameAPI基础操作(1)//1.显示DataFrame数据
df.show()//2.获取DataFrame列名
valcolumnNames:Array[String]=df.columnscolumnNames.foreach(println)//3.获取DataFrame的
前n行数据
valfirstNRows:Array[Row]=df.head(2)firstNRows.foreach(println)DataFrameAPI基础操作(2)//4.统计DataFrame行数valrowCount:Long=df.count()println("rowCount:"+rowCount)//5.筛选出价格大于150的数据valfilteredDF:DataFrame=df.filter(col("price")>150)filteredDF.show()//6.对价格列求平均值valavgPrice:Double=df.agg(avg("price")).collect()(0)(0).asInstanceOf[Double]println("avgPrice:"+avgPrice)DataFrameAPI基础操作(3)//7.添加新列valdfWithNewColumn:DataFrame=df.withColumn("discounted_price",col("price")*0.9)dfWithNewColumn.show()//8.重命名列valrenamedDF:DataFrame=df.withColumnRenamed("product_name","item_name")renamedDF.show()//9.按价格升序排序valsortedDF:DataFrame=df.sort(col("price"))sortedDF.show()DataFrameAPI基础操作(4)//10.分组并聚合求平均值valavgPriceByGroup:DataFrame=df.groupBy("product_name").agg(avg("price"))avgPriceByGroup.show()//11.按照多列分组并聚合valavgPriceByMultiGroup:DataFrame=df.groupBy("product_name","id").agg(avg("price"))avgPriceByMultiGroup.show()//12.使用SQL表达式进行筛选valsqlFilteredDF:DataFrame=df.filter("price>150")sqlFilteredDF.show()DataFrameAPI基础操作(5)//13.使用selectExpr选择并计算新列valselectedDF:DataFrame=df.selectExpr("product_name","price*1.1asincreased_price")selectedDF.show()//14.删除列valdfWithoutColumn:DataFrame=df.drop("id")dfWithoutColumn.show()//15.使用distinct去重valdistinctDF:DataFrame=df.distinct()distinctDF.show()DataFrameAPI基础操作(6)//16.将DataFrame转换为临时表
df.createOrReplaceTempView("products")//17.使用SQL查询临时表valsqlResult:DataFrame=spark.sql("SELECT*FROMproductsWHEREprice>150")sqlResult.show()//18.使用collect将DataFrame转为本地数组vallocalArray:Array[Row]=df.collect()localArray.foreach(println)DataFrameAPI基础操作(7)//19.使用join连接两个DataFramevalotherData:List[(Int,String)]=List((1,"SupplierA"),(2,"SupplierB"))valotherColumns=Seq("id","supplier_name")valotherDF:DataFrame=spark.createDataFrame(otherData).toDF(otherColumns:_*)valjoinedDF:DataFrame=df.join(otherDF,Seq("id"))joinedDF.show()//20.使用union合并两个DataFramevaladditionalData:List[(Int,String,Double)]=List((4,"ProductD",300.0))valadditionalDF:DataFrame=spark.createDataFrame(additionalData).toDF(columns:_*)valcombinedDF:DataFrame=df.union(additionalDF)combinedDF.show()SparkSQL支持多种数据源(1)Parquet是一种列式存储格式,适用于大规模数据仓库和数据湖的数据存储。它提供了高效的压缩和列式存储功能,从而减少了存储和读取开销。ParquetORC(OptimizedRowColumnar,优化行列存储)是另一种列式存储格式,特别优化了Hive和Spark中的查询性能,同时也支持压缩和谓词下推等优化ORCJSON(JavaScript对象表示法)是一种常见的半结构化数据格式,用于存储和交换数据。SparkSQL可以轻松读取和解析JSON格式的数据JSONCSV(Comma-SeparatedValues)是一种常用的文本文件格式,用于存储表格数据。SparkSQL可以从CSV文件中读取数据,并将其解析为DataFrameCSVAvro是一种数据序列化格式,具有架构定义和自我描述能力。它适用于大规模数据的存储和处理AvroSparkSQL支持通过JDBC连接到关系型数据库,从而可以在分布式环境中对数据库中的数据进行查询和分析JDBC数据源SparkSQL支持多种数据源(2)SparkSQL兼容Hive的数据存储和元数据,可以直接读取Hive表和数据Hive数据源SparkSQL支持与ApacheCassandra集成,用于读取和写入Cassandra数据库中的数据CassandraSparkSQL可以与Elasticsearch集成,从中读取和写入数据,实现搜索和分析操作ElasticsearchSparkSQL可以消费Kafka主题中的数据,并将其转换为DataFrame进行分析KafkaSparkSQL支持多种数据源原因不同的业务场景和数据来源可能需要不同的数据存储格式和数据源支持数据来源的多样性数据存储和性能优化遗留系统和数据迁移生态系统集成组织可能从多个数据源获取数据,这些数据源可能使用不同的存储格式。支持多种数据源和格式有助于处理来自不同渠道的数据某些格式适用于不同类型的查询和分析操作。例如,列式存储格式(如Parquet和ORC)适用于分析型查询,而JSON和Avro可以适用于半结构化数据组织可能在不同的时间点使用不同的数据存储格式。支持多种格式使得数据从遗留系统迁移到新系统变得更加容易不同的存储格式和数据源可能在不同的生态系统中得到广泛支持。通过支持多种格式,可以更好地集成SparkSQL到不同的生态系统中原因SparkSQL支持多种数据源-代码演示//将数据保存为Parquet格式productsDF.write.parquet("path/to/save/parquet/data")//从Parquet格式读取数据valparquetDF=spark.read.parquet("path/to/save/parquet/data")//将数据保存为JSON格式productsDF.write.json("path/to/save/json/data")//从JSON格式读取数据valjsonDF=spark.read.json("path/to/save/json/data")//将数据保存为CSV格式productsDF.write.csv("path/to/save/csv/data")//从CSV格式读取数据valcsvDF=spark.read.csv("path/to/save/csv/data")(多选)在SparkSQL中,DataFrameAPI支持以下哪些操作?(
)数据筛选(filter)分组聚合(groupBy)连接操作(join)排序操作(sort)
如何创建DataFrame对象实例?讲解了DataFrameAPI基础操作讲解了SparkSQL支持的数据源和格式10103SparkSQL进阶操作高级操作与功能-窗口函数(WindowFunctions)说明窗口函数是一种强大的功能,允许在特定窗口内的数据集上进行聚合操作(如排名、移动平均、累积求和等)通过定义窗口规范,可以在数据的特定分区中执行计算,使分析更灵活和高效。窗口函数案例按部门对员工薪水进行排名高级操作与功能-窗口函数(WindowFunctions)SparkSQL提供了多种窗口函数,用于在窗口内进行聚合和分析操作窗口函数名称说明rank()计算排名(相同值会得到相同的排名,下一个排名会跳过相同的排名数量)dense_rank()计算密集排名(相同值会得到相同的密集排名,下一个排名不会跳过相同的排名数量)row_number()计算行号(按照窗口规范的顺序分配唯一的行号)lead()获取窗口内下一行的值(可以指定偏移量来获取更远的行)高级操作与功能-窗口函数(WindowFunctions)窗口函数名称说明lag()获取窗口内上一行的值(可以指定偏移量来获取更远的行)first_value()获取窗口内第一行的值last_value()获取窗口内最后一行的值sum()计算窗口内数值列的总和avg()计算窗口内数值列的平均值高级操作与功能-窗口函数(WindowFunctions)窗口函数名称说明min()计算窗口内数值列的最小值max()计算窗口内数值列的最大值count()计算窗口内行数percent_rank()计算百分比排名,以0到1之间的分数表示cume_dist()计算累积分布,以0到1之间的分数表示高级操作与功能-透视表和交叉表透视表和交叉表是在数据分析中常用的工具,用于将数据按照不同的维度进行汇总和分析,从而更好地理解数据的分布和关系透视表(PivotTable)透视表是一种在数据表中重新排列和汇总数据的方法,它将一列数据作为行标签,另一列数据作为列标签,然后在交叉点处显示汇总数据透视表的目的是帮助分析人员从不同的角度查看数据,以便更好地观察数据的趋势和关系交叉表(CrossTab)交叉表是一种汇总数据的方式,它将两列数据的交叉点进行统计计算,并显示在一个表格中交叉表通常用于显示两个不同维度的数据之间的关联和分布情况,从而帮助分析人员更好地理解数据的交叉关系高级操作与功能-用户自定义函数(UDFs)用户自定义函数(UserDefinedFunctions,UDFs)是SparkSQL中的一项强大功能,允许定义自己的函数,以在SQL查询中使用自定义逻辑和计算常见的用户自定义函数类型标量函数(ScalarFunctions)这是最常见的UDF类型,它接受一行输入并返回一个值。标量函数可用于执行单个值的计算,如对单个列进行转换或应用特定逻辑聚合函数(AggregateFunctions)聚合函数是用于计算一组值的单一结果的函数。可以自定义聚合函数,以在SQL查询中执行自定义的聚合操作,如计算中位数、加权平均等窗口函数(WindowFunctions)用户自定义窗口函数可以在窗口内进行聚合和分析操作。这允许在窗口内执行自定义的聚合计算,如在特定范围内计算累积和、移动平均等高级操作与功能-临时视图和全局视图临时视图和全局视图作为SparkSQL中用于管理数据和查询的工具(它们允许将DataFrame注册为临时表或全局视图,并在查询中使用)临时视图(TemporaryViews)临时视图是与SparkSession绑定的,它只在当前SparkSession的上下文中可用临时视图对于在查询中使用某个DataFrame或SQL查询的结果非常有用全局视图(GlobalViews)全局视图是在整个Spark应用程序范围内可见的视图。全局视图对于在不同的SparkSession和查询中共享数据非常有用通过createOrReplaceGlobalTempView方法,可以将DataFrame注册为一个全局视图,然后可以在任何SparkSession中的查询中使用该视图进行操作,只要这些SparkSession都使用相同的全局视图名通过createOrReplaceTempView方法,可以将DataFrame注册为一个临时视图,然后可以在同一SparkSession中的任何查询中使用该视图进行操作高级操作与功能-复杂数据类型在SparkSQL中,可以使用复杂数据类型来处理结构化数据中的嵌套和复杂结构常见的复杂数据类型结构体(Structs)一种复杂的数据类型,可以将多个字段组合在一起,类似于一个结构体或对象。每个结构体字段都有一个名称和数据类型映射(Maps)映射是一种键值对的集合,其中每个键都关联一个值。映射的键和值可以是不同的数据类型数组(Arrays)数组是一种有序集合,其中每个元素都有一个索引,可以按索引访问元素。数组可以包含同一数据类型的多个元素高级操作与功能-性能优化与调优(1)SparkSQL性能优化与调优是在大规模数据处理中至关重要的任务,可以显著提高查询速度和资源利用率常见的SparkSQL性能优化与调优策略策略名称说明数据分区和分桶在数据加载时,将数据分成更小的分区和分桶可以提高查询性能。分区可以使数据更有效地进行过滤,分桶可以减少数据的倾斜数据压缩合适的数据压缩格式可降低存储成本并加快数据读取速度数据持久化将频繁查询的数据持久化至内存或磁盘,以避免重复计算并提高查询性能谓词下推通过将过滤操作下推到数据源引擎,可减少数据传输和处理的量高级操作与功能-性能优化与调优(2)策略名称说明合理选择数据格式Parquet和ORC格式在存储和查询性能方面表现良好,优化数据存储和访问合理使用缓存利用Spark的缓存机制(如cache和persist方法)来缓存中间结果及频繁查询的数据,避免重复计算广播变量动态分区裁剪通过将过滤操作下推到数据源引擎,可减少数据传输和处理的量通过将小型数据集广播到每个节点,可避免数据倾斜和网络传输开销跳过无关数据通过使用谓词下推和过滤操作,可跳过不必要的数据加载和处理适当调整并行度根据集群资源和任务特点,适当调整并行度和分区数,以充分利用资源高级操作与功能-性能优化与调优(3)策略名称说明使用合适的硬件选择适应数据规模和计算需求的硬件配置,以实现更优的性能使用内存和磁盘合理配置Spark内存和磁盘存储,避免内存溢出和性能瓶颈监控和调优工具使用Spark监控工具,如SparkUI和历史服务器,进行性能分析和调优避免数据倾斜当数据分布不均匀时,通过合理的数据分区、分桶、广播等手段来避免数据倾斜数据过滤顺序通过将过滤性能更高的条件放在前面,以减少需要处理的数据量使用并行操作在查询中使用并行操作,如多个过滤条件或转换操作,以利用集群中的多核处理器高级操作与功能-性能优化与调优(4)策略名称说明索引和分析器使用合适的索引、分区和分析器,加速元数据查询和数据解析JVM和垃圾回收配置合适的JVM参数,避免频繁的垃圾回收,提高任务的执行效率批处理与流处理根据实际场景,选择适合的批处理或流处理模式,以获得更好的性能综合运用这些性能优化和调优策略,可以显著提高SparkSQL查询的性能和效率(在实际应用中,需要根据数据规模、集群配置和查询模式来灵活调整这些策略,以达到最佳性能)高级操作与功能-扩展与整合(1)SparkSQL提供了许多扩展和整合选项,能够更好地与外部数据源、工具和服务进行集成常见的SparkSQL扩展和整合方案策略名称说明外部数据源支持SparkSQL可以通过DataSourceAPI支持许多外部数据源,如ApacheHive、ApacheHBase、Cassandra、JDBC数据库、Elasticsearch等。这使得用户能够在Spark中轻松读取和写入不同类型的数据文件格式支持SparkSQL支持多种文件格式,如Parquet、ORC、Avro、JSON、CSV等,可以根据数据需求选择合适的文件格式集成ApacheHiveSparkSQL能够与ApacheHive兼容,可以通过HiveContext提供的接口查询Hive表,同时还支持使用Hive的UDFs、UDAFs和UDTFs高级操作与功能-扩展与整合(2)策略名称说明集成数据湖层SparkSQL可以集成数据湖层工具,如DeltaLake,能够进行事务性数据管理、版本控制和数据一致性维护机器学习整合SparkSQL可以与SparkMLlib集成,将SQL查询与机器学习模型训练和评估相结合可视化工具集成可以将SparkSQL与可视化工具(如Tableau、PowerBI)集成,方便生成报表和可视化分析流处理整合SparkSQL可以与SparkStreaming或结构化流(StructuredStreaming)整合,能够在实时数据处理中使用SQL查询数据质量工具可以将SparkSQL与数据质量工具集成,以进行数据清洗、验证和质量分析高级操作与功能-扩展与整合(3)策略名称说明分布式数据库支持SparkSQL可以与分布式数据库集成,如ApacheCassandra或ApacheHBase,从而在Spark中进行分布式数据查询和操作高级优化器和计划器可以使用SparkSQL的高级优化器和计划器扩展,实现更复杂的查询优化和执行自定义数据源可以开发自定义的数据源插件,使SparkSQL支持自定义数据源格式或数据接入方式扩展函数可以编写自定义的用户定义函数(UDFs)或聚合函数(UDAFs),以扩展SparkSQL的功能和操作与其他大数据工具集成SparkSQL可以与其他大数据工具和框架集成,如Kafka、Presto、Flink等,以实现更广泛的数据处理和分析高级操作与功能-扩展与整合(4)在大数据项目中SparkSQL与ApacheHive的集成比较常规,其能够在Spark中使用Hive的元数据、表定义、UDFs等,并能够在Spark中运行HiveQL查询(这种集成可以在不迁移现有Hive代码的情况下,利用Spark的计算能力来进行数据分析和处理)118你知道SparkSQL有哪些高级操作与功能?119你能说出哪些SparkSQL性能优化和调优策略?讲解了SparkSQL高级操作与功能讲解了SparkSQL性能优化与调优讲解了SparkSQL扩展与整合12104分析与统计银行客户数据银行客户数据简介
某银行已积累了海量的客户数据,现在希望其大数据分析团队能够利用SparkSQL技术对这些数据进行深入分析,以解决以下关键问题:1.查看不同年龄段的客户人数2.根据客户婚姻状况的不同显示对应的客户年龄分布3.年龄与平均余额的关系分析银行客户数据集字段定义字
段定
义age客户年龄job职业marital婚姻状况education受教育程度balance银行账户余额数据预处理与准备在SparkSQL中,数据预处理和准备是非常重要的步骤,这些步骤对于数据分析和挖掘的成功非常关键,以下是对银行数据进行的预处理操作//去除重复行valdeduplicatedDF=df.dropDuplicates()//处理缺失值valfilledDF=deduplicatedDF.na.fill("unknown",Seq("job","marital","education"))//选择所需的字段并创建新的DataFramevalselectedDF=filledDF.select("age","job","marital","education","balance")数据探索与分析SparkSQL提供了多种数据探索功能,可以深入了解数据、分析数据分布、关系和统计特性//注册DataFrame为临时视图df.createOrReplaceTempView("bank_data")//数据探索示例//1.分析不同职业的人数valjobDistribution=spark.sql("SELECTjob,COUNT(*)ascountFROMbank_dataGROUPBYjob")jobDistribution.show()//2.分析婚姻状况的比例valmaritalRatio=spark.sql("SELECTmarital,COUNT(*)ascount,(COUNT(*)*100.0/SUM(COUNT(*))OVER())asratioFROMbank_dataGROUPBYmarital")maritalRatio.show()125//3.统计平均余额和持有房贷的关系valbalanceHousingRelation=spark.sql("SELECThousing,AVG(balance)asavg_balanceFROMbank_dataGROUPBYhousing")balanceHousingRelation.show()//4.分析年龄段和平均余额的关系valageBalanceRelation=spark.sql("SELECTCASEWHENage<30THEN'Under30'WHENage>=30ANDage<40THEN'30-39'ELSE'40+'ENDasage_group,AVG(balance)asavg_balanceFROMbank_dataGROUPBYage_group")ageBalanceRelation.show()//5.统计持有贷款的人数和比例valloanStats=spark.sql("SELECTloan,COUNT(*)ascount,(COUNT(*)*100.0/SUM(COUNT(*))OVER())asratioFROMbank_dataGROUPBYloan")loanStats.show()客户行为分析-查看不同年龄段的客户人数在客户行为分析中,了解不同年龄段客户的分布情况是非常重要的一步。这可以帮助企业更好地了解客户群体的构成,从而制定针对性的市场策略和服务方案selectedDF.createOrReplaceTempView("selected_data")//创建临时视图//查询不同年龄段的客户人数valageGroupCountDF=spark.sql("""SELECTCASEWHENage<30THEN'Under30'WHENage>=30ANDage<40THEN'30-39'ELSE'40+'ENDASage_group,COUNT(*)AScustomer_countFROMselected_dataGROUPBYage_groupORDERBYage_group""")客户行为分析-根据客户婚姻状况的不同显示对应的客户年龄分布selectedDF.createOrReplaceTempView("selected_data")//创建临时视图//查询不同婚姻状况的客户年龄分布valmaritalAgeDistributionDF=spark.sql("""SELECTmarital,CASEWHENage<30THEN'Under30'WHENage>=30ANDage<40THEN'30-39'ELSE'40+'ENDASage_group,COUNT(*)AScustomer_countFROMselected_dataGROUPBYmarital,age_groupORDERBYmarital,age_group""")客户行为分析-年龄与平均余额的关系分析selectedDF.createOrReplaceTempView("selected_data")//创建临时视图//分析不同年龄段客户的平均余额valageBalanceAnalysisDF=spark.sql("""SELECTCASEWHENage<30THEN'Under30'WHENage>=30ANDage<40THEN'30-39'ELSE'40+'ENDASage_group,AVG(balance)ASavg_balanceFROMselected_dataGROUPBYage_groupORDER
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2025赤峰市林西县招聘14名专职消防员模拟笔试试题及答案解析
- 深度解析(2026)《GBT 26831.5-2017社区能源计量抄收系统规范 第5部分:无线中继》(2026年)深度解析
- 深度解析(2026)《GBT 26020-2010金废料分类和技术条件》(2026年)深度解析
- 2025云南昆明市第三人民医院“凤凰引进计划”高层次人才招引考试笔试备考题库及答案解析
- 2025年12月江苏南京江北新区教育局所属部分事业单位招聘教师20人参考考试试题及答案解析
- 2025甘肃中兰能投有限公司贵州分公司招聘备考考试试题及答案解析
- 2025天津市西青经开区投资促进有限公司第二批次招聘工作人员3人考试笔试备考题库及答案解析
- 锦江区新兴领域党建工作专员招募(20人)参考考试题库及答案解析
- 2025安徽淮北濉溪县龙华高级中学教师招聘20人备考笔试题库及答案解析
- 2025重庆大学高端装备机械传动全国重点实验室科研团队劳务派遣技术人员招聘考试参考试题及答案解析
- 2025中原农业保险股份有限公司招聘67人笔试备考重点试题及答案解析
- 2025中原农业保险股份有限公司招聘67人备考考试试题及答案解析
- 2025年度河北省机关事业单位技术工人晋升高级工考试练习题附正确答案
- 交通运输布局及其对区域发展的影响课时教案
- 2025年中医院护理核心制度理论知识考核试题及答案
- GB/T 17981-2025空气调节系统经济运行
- 比亚迪储能项目介绍
- 2025年9月广东深圳市福田区事业单位选聘博士11人备考题库附答案
- 糖尿病足溃疡VSD治疗创面氧自由基清除方案
- 《公司治理》期末考试复习题库(含答案)
- 自由职业者项目合作合同协议2025年
评论
0/150
提交评论