Spark大数据技术与应用(第3版)(微课版)课件 项目5-10 分析水稻品种审定数据 Spark SQL结构化数据文件处理-广告检测的流量作弊识别 Spark综合实战_第1页
Spark大数据技术与应用(第3版)(微课版)课件 项目5-10 分析水稻品种审定数据 Spark SQL结构化数据文件处理-广告检测的流量作弊识别 Spark综合实战_第2页
Spark大数据技术与应用(第3版)(微课版)课件 项目5-10 分析水稻品种审定数据 Spark SQL结构化数据文件处理-广告检测的流量作弊识别 Spark综合实战_第3页
Spark大数据技术与应用(第3版)(微课版)课件 项目5-10 分析水稻品种审定数据 Spark SQL结构化数据文件处理-广告检测的流量作弊识别 Spark综合实战_第4页
Spark大数据技术与应用(第3版)(微课版)课件 项目5-10 分析水稻品种审定数据 Spark SQL结构化数据文件处理-广告检测的流量作弊识别 Spark综合实战_第5页
已阅读5页,还剩278页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

分析水稻品种审定数据——SparkSQL结构化数据文件处理项目背景水稻在我国的种植历史悠久,是我国最重要的粮食作物之一。水稻品种的审定与推广关系到国家粮食安全与农业的高质量发展。随着生物技术与大数据技术的飞速发展,水稻育种已进入大数据时代,产生了海量品种审定信息数据。传统的分析方法难以高效地从这些数据中挖掘出有价值的信息,如品种的系谱关系、优势区域分布以及品种更迭趋势等。运用大数据技术对水稻品种审定数据进行深度分析和挖掘,对于指导未来育种方向、优化水稻品种布局具有重要意义。项目背景现有一份水稻品种审定数据文件ricedata.csv,记录了国内多个地区近些年来的水稻审定数据,包含了品种名称、亲本来源、类型等共7个数据字段,字段说明如下。字段名称说明序号数据记录的唯一编号品种名称通过审定的水稻品种名称亲本来源品种的杂交亲本组合,以“×”符号分隔,“×”前为母本,后为父本类型品种的生物学类型,如籼型常规稻、籼型两系杂交稻、粳型常规稻等原产地/选育单位培育该品种的机构或单位名称审定编号中华人民共和国农业农村部颁发的品种审定证书编号省份审定申请或主要审定单位所在的省份(“农业农村部”代表国家级审定)SparkSQL简介DataFrame基础操作项目实施SparkSQL简介SparkSQL基本概念配置SparkSQLSparkSQL与Shell的交互SparkSQL基本概念SparkSQL是一个用于处理结构化数据的框架,可被视为一个分布式的SQL查询引擎,提供了一个抽象的可编程数据模型DataFrame。SparkSQL框架的前身是Shark框架,由于Shark需要依赖于Hive而制约了Spark各个组件的相互集成,所以Spark团队提出了SparkSQL项目。SparkSQL借鉴了Shark的优点同时摆脱了对Hive的依赖性。相对于Shark,SparkSQL在数据兼容、性能优化、组件扩展等方面更有优势。SparkSQL在数据兼容方面的发展,使得开发人员不仅可以直接处理RDD,而且可以处理Parquet文件或JSON文件,甚至可以处理外部数据库中的数据、Hive中存在的表数据。SparkSQL的一个重要特点是能够统一处理关系表数据和RDD数据,开发人员可以轻松地使用SQL或HiveQL语句进行外部查询,也可以进行更复杂的数据分析。SparkSQL基本概念SparkSQL运行过程:配置SparkSQLSparkSQL可以兼容Hive以便在SparkSQL中访问Hive表、使用UDF(用户自定义函数)和使用Hive查询语言。从Spark2.0开始,SparkSQL将模块内原有的SQLContext和HiveContext两个入口统一为SparkSession入口,使得Hive的用户和更熟悉SQL语句的数据库管理员都能够直接使用SparkSession创建SparkSQL程序。即使没有部署好Hive,SparkSQL也可以运行。若想使用SparkSQL的方式访问并操作Hive表数据,则需要对SparkSQL进行额外配置,将SparkSQL连接至一个部署成功的Hive上。配置SparkSQL将hive-site.xml复制至所有节点的Spark安装目录的conf目录。将MySQL驱动包mysql-connector-java-8.0.30.jar复制至所有节点的Spark安装目录的lib目录。在主节点的spark-env.sh文件中配置MySQL驱动。将主节点Spark安装目录的conf目录下的perties.template文件复制并重命名为perties,之后打开perties文件修改SparkSQL运行时的日志级别,将日志等级“rootLogger.level”修改为“warn”。将主节点修改好的spark-env.sh和perties文件分发至子节点。依次启动Hadoop和Spark集群。依次启动MySQL服务和Hive的元数据服务。配置SparkSQL切换至Spark安装目录的bin目录,执行命令“./spark-sql”开启SparkSQL命令行界面,在SparkSQL命令行界面中可以直接执行HiveQL语句。例如,通过SparkSQL在Hive中创建一个students表。showdatabases;createtablestudents(idint,namestring,scoredouble,classesstring)rowformatdelimitedfieldsterminatedby'\t';SparkSQL与Shell的交互SparkSQL框架其实已经集成在spark-shell中,因此,启动spark-shell即可使用SparkSQL的Shell交互接口。SparkSQL查询数据时可以使用两个对象,即SQLContext和HiveContext,Spark2.x版本开始Spark将SQLContext和HiveContext进行整合提供一种全新的创建方式SparkSession。如果在spark-shell中执行SQL语句,那么需要使用SparkSession对象调用sql()方法。在spark-shell启动的过程中会初始化SparkSession对象为spark,此时初始化的spark对象既支持SQL语法解析器,也支持HiveQL语法解析器,即使用这个spark对象便可直接执行SQL语句和HiveQL语句。SparkSQL与Shell的交互如果是使用IntelliJIDEA软件开发SparkSQL程序,则需要在程序开头创建SparkSession对象。mportorg.apache.spark.sql.SparkSession

objectTest{defmain(args:Array[String]):Unit={valspark=SparkSession.builder()//创建SparkSession对象.appName("Test")//设置应用程序名称.master("local[*]")//设置Spark应用程序运行模式为本地模式.getOrCreate()//获取或创建SparkSession对象}}SparkSQL与Shell的交互如果需要支持Hive,还需要启用enableHiveSupport()方法,并且需要确保Hive的配置文件hive-site.xml已经存在于工程中。importorg.apache.spark.sql.SparkSession

objectTest{defmain(args:Array[String]):Unit={valspark=SparkSession.builder()//创建SparkSession对象.appName("Test")//设置应用程序名称.master("local[*]")//设置Spark应用程序运行模式为本地模式.enableHiveSupport()//启用Hive支持.getOrCreate()//获取或创建SparkSession对象}}SparkSQL简介DataFrame基础操作项目实施DataFrame基础操作SparkSQL提供了一个抽象的编程数据模型DataFrame,DataFrame是由SchemaRDD发展而来的,从Spark1.3.0开始,SchemaRDD更名为DataFrame。SchemaRDD直接继承自RDD,而DataFrame则是自身实现RDD的绝大多数功能。可以将SparkSQL的DataFrame理解为一个分布式的Row对象的数据集合,该数据集合提供了由列组成的详细模式信息。本节的任务是学习DataFrame对象的创建方法及基础的操作。通过结构化数据文件创建DataFrame一般情况下,结构化数据文件存储在HDFS中,较为常见的结构化数据文件格式是Parquet文件或JSON文件。SparkSQL可以通过load()方法将HDFS上的结构化文件转换为DataFrame,load()方法默认导入的文件格式是Parquet。读取Parquet文件创建DataFrame读取JSON文件创建DataFrame使用json()方法将JSON文件转换为DataFrame使用csv()方法将CSV文件转换为DataFramevaldfPeople=spark.read.format("json").load("/tipdm/data/SparkSQL/people.json")valdfPeople=spark.read.json("/tipdm/data/SparkSQL/people.json")valdfPeople2=spark.read.option("header","true").option("sep",";").csv("/tipdm/data/SparkSQL/people.csv")valdfUsers=spark.read.load("/tipdm/data/SparkSQL/users.parquet")通过外部数据库创建DataFrameSparkSQL还可以从外部数据库(如MySQL、Oracle数据库)中创建DataFrame,使用该方式创建DataFrame需要通过JDBC连接或ODBC连接的方式访问数据库。以MySQL数据库的表数据为例,将MySQL数据库test中的people表的数据转换成DataFrame,用户需将“user”、“password”的值修改为实际进入MySQL数据库时的账户名称和密码。valurl="jdbc:mysql://master/test"valjdbcDF=spark.read.format("jdbc").options(Map("url"->url,"user"->"root","password"->"123456","dbtable"->"people")).load()通过RDD创建DataFrame1.利用反射机制推断RDD模式

首先需要定义一个caseclass样例类,因为只有caseclass才能被Spark隐式地转换为DataFrame。例如,读取HDFS上的people.txt文件,创建RDD数据集,再将该RDD数据集转换为DataFrame。若是在IDEA软件中使用这种方式转换,需要将caseclass样例类定义的语句部分放在主函数外面,另外在使用toDF()方法前需先通过“importspark.implicit._”语句导入Spark隐式类。caseclassPerson(name:String,age:Int)valdata=sc.textFile("/tipdm/data/SparkSQL/people.txt").map(_.split(","))valpeople=data.map(p=>Person(p(0),p(1).trim.toInt)).toDF()通过RDD创建DataFrame2.采用编程指定Schema的方式将RDD转换成DataFrame加载数据创建RDD。使用StructType创建一个和第一步创建的RDD中的数据结构相匹配的Schema。通过createDataFrame()方法将Schema应用到RDD上,将RDD数据转换成DataFrame。valpeople=sc.textFile("/tipdm/data/SparkSQL/people.txt")valschemaString="nameage"importorg.apache.spark.sql.types.{StructType,StructField,StringType}valschema=StructType(schemaString.split("").map(fieldName=>StructField(fieldName,StringType,true)))importorg.apache.spark.sql.RowvalrowRDD=people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))valpeopleDataFrame=spark.createDataFrame(rowRDD,schema)通过Hive中的表创建DataFrame从Hive表中的表创建DataFrame,可以使用SparkSession对象。使用SparkSession对象并调用sql()方法查询Hive中的表数据,将其转换成DataFrame,如查询test数据库中的people表数据并转换成DataFrame。spark.sql("usetest")valpeople=spark.sql("select*frompeople")查看DataFrame数据SparkDataFrame派生于RDD,因此类似于RDD,DataFrame只有在提交Action操作时才进行计算。DataFrame查看及获取数据常用的函数或方法如下:函数或方法描述printSchema()打印数据模式show()查看数据first()/head()/take()/takeAsList()获取若干行数据collect()/collectAsList()获取所有数据查看DataFrame数据以movies.dat中的电影数据为例,展示DataFrame查看数据的操作,数据包含3个字段:movieId(电影ID)、title(电影名称)和Genres(电影类型)。#定义一个样例类MoviecaseclassMovie(movieId:Int,title:String,Genres:String)#创建RDDvaldata=sc.textFile("/tipdm/data/SparkSQL/movies.dat").map(_.split("::"))#RDD转成DataFramevalmovies=data.map(m=>Movie(m(0).trim.toInt,m(1),m(2))).toDF()printSchema:打印数据模式创建DataFrame对象后,一般会查看DataFrame数据的模式。使用printSchema函数可以查看DataFrame数据模式,打印出列的名称和类型。查看DataFrame对象movies的数据模式。show():查看数据使用show()方法可以查看DataFrame数据,可输入的参数及结果说明如下:方法说明show()显示前20条记录show(numRows:Int)显示numRows条记录show(truncate:Boolean)是否最多只显示20个字符,默认为trueshow(numRows:Int,truncate:Boolean)显示numRows条记录并设置过长字符串的显示格式show():查看数据使用show()方法默认只显示前20条记录,且最多显示20个字符。使用show(false)方法可以显示所有字符。show():查看数据若需要查看前n行记录则可以使用show(numRows:Int)方法。通过“movies.show(5)”命令查看movies前5行记录。first()/head()/take()/takeAsList():获取若干行记录获取DataFrame若干行记录除了使用show()方法之外,还可以使用first()、head()、take()、takeAsList()方法,具体说明如下:方法解释first()获取第一行记录head(n:Int)获取前n行记录take(n:Int)获取前n行记录takeAsList(n:Int)获取前n行数据,并以List的形式展现first()/head()/take()/takeAsList():获取若干行记录分别使用first()、head()、take()、takeAsList()方法查看movies前几行数据记录。first()和head()方法的功能相同,以Row或Array[Row]的形式返回一行或多行数据。take()和takeAsList()方法则会将获得的数据返回Driver端。collect()/collectAsList():获取所有数据collect()方法可以查询DataFrame中所有的数据,并返回一个Array对象。collectAsList()方法和collect()方法类似,可以查询DataFrame中所有数据,但返回的是List对象。分别使用collect()和collectAsList()方法查看movies所有数据。为避免Driver发生OutOfMemoryError,数据量比较大时不建议使用。DataFrame查询操作DataFrame查询数据有两种方法:先将DataFrame注册成为临时表(视图),再通过SQL语句查询数据。直接在DataFrame对象上进行查询。#将peopleDataFrame注册成为临时表peopleDataFrame.createOrReplaceTempView("peopleTempTab")#查询年龄大于20的数据valpersonsRDD=spark.sql("selectname,agefrompeopleTempTabwhereage>20")personsRDD.collect()DataFrame查询操作DataFrame提供了很多查询数据的方法,类似于SparkRDD的Transformation操作,DataFrame的查询操作也是一个懒操作,仅仅生成一个查询计划,只有触发Action操作才会进行计算并返回结果。方法描述where()/filter()条件查询select()/selectExpr()/col()/apply()查询指定字段的数据信息limit()查询前n行记录orderBy()/sort()排序查询groupBy()分组查询join()连接查询DataFrame查询操作将用户对电影评分的数据(ratings.dat)和用户的基本信息数据(users.dat)这两份数据,上传至HDFS,创建DataFrame对象rating和user。#定义样例类RatingcaseclassRating(userId:Int,movieId:Int,rating:Int,timestamp:Long)#读取ratings.dat数据创建RDDvalratingData=sc.textFile("/tipdm/data/SparkSQL/ratings.dat").map(_.split("::"))#将ratingData转换成DataFramevalrating=ratingData.map(r=>Rating(r(0).trim.toInt,r(1).trim.toInt,r(2).trim.toInt,r(3).trim.toLong)).toDF()#定义样例类UsercaseclassUser(userId:Int,gender:String,age:Int,occupation:Int,zip:String)#读取users.dat数据创建RDDvaluserData=sc.textFile("/tipdm/data/SparkSQL/users.dat").map(_.split("::"))#将userData转换成DataFramevaluser=userData.map(u=>User(u(0).trim.toInt,u(1),u(2).trim.toInt,u(3).trim.toInt,u(4))).toDF()条件查询1.where()方法DataFrame可以使用where(conditionExpr:String)方法查询符合指定条件的数据,参数中可以使用and或or连接多个筛选条件。where()方法的返回结果仍然为DataFrame类型。查询user对象中性别为女且年龄为18的用户信息。#使用where查询user对象中性别为女且年龄为18的用户信息valuserWhere=user.where("gender='F'andage=18")#查看查询结果的前3条信息userWhere.show(3)条件查询2.filter()方法DataFrame可以使用filter()方法筛选出符合条件的数据。使用filter()方法查询user对象中性别为女且年龄为18的用户信息。#使用filter()方法查询user对象中性别为女并且年龄为18~24岁之间的用户信息valuserFilter=user.filter("gender='F'andage=18")#查看查询结果的前3条信息userFilter.show(3)查询指定字段的数据信息1.select()方法:获取指定字段值select()方法根据传入的String类型字段名获取指定字段的值,并返回一个DataFrame对象。查询user对象中userId和gender字段的数据。#使用select()方法查询user对象中userId及gender字段的数据valuserSelect=user.select("userId","gender")#查看查询结果的前3条信息userSelect.show(3)查询指定字段的数据信息2.selectExpr()方法:对指定字段进行特殊处理DataFrame提供了selectExpr()方法,可以对某个字段指定别名或调用UDF函数进行其他处理。selectExpr()方法传入string类型的参数,返回一个DataFrame对象。例如,定义一个函数replaced,若user对象中gender字段的值为“M”则替换为“0”,若gender字段的值为“F”则替换为“1”。spark.udf.register("replaced",(x:String)=>{xmatch{case"M"=>0case"F"=>1

}})查询指定字段的数据信息2.selectExpr()方法:对指定字段进行特殊处理使用selectExpr()方法查询user对象中userId、gender和age字段的数据,对gender字段使用replaced函数并取别名为sex。#使用自定义函数替换gender字段值valuserSelectExpr=user.selectExpr("userId","replaced(gender)assex","age")#查看查询结果的前3条信息userSelectExpr.show(3)查询指定字段的数据信息3.col()/apply()方法col()和apply()方法也可以获取DataFrame指定字段,但只能获取一个字段,并且返回的是一个Column类型的对象。分别使用col()和apply()方法查询user对象中zip字段的数据。limit()方法limit()方法可以获取指定DataFrame数据的前n行记录,不同于take()与head()方法,limit()方法不是Action操作,因此并不会直接返回查询结果,需要结合show()方法或其他Action操作才可以显示结果。使用limit()方法查询user对象的前3行记录,并使用show()方法显示查询结果。#查询user对象前3行记录valuserLimit=user.limit(3)#查看查询结果userLimit.show()orderBy()方法orderBy()方法是根据指定字段进行排序,默认为升序排序。若要求降序排序,可以在orderBy()方法中使用“desc("col_name")”或“$"col_name".desc”参数,其中“col_name”表示字段名称,也可以在指定字段前面加“-”表示降序排序。使用orderBy()方法根据userId字段对user对象进行降序排序并查看前3条记录。#使用orderBy()方法根据userId字段对user对象进行降序排序valuserOrderBy=user.orderBy(desc("userId"))valuserOrderBy=user.orderBy($"userId".desc)valuserOrderBy=user.orderBy(-user("userId"))#查看结果的前3条信息userOrderBy.show(3)sort()方法sort()方法可以根据指定字段对数据进行排序,用法与orderBy()方法一样。使用sort()方法根据userId字段对user对象进行升序排序。#使用sort方法根据userId字段对user对象进行升序排序valuserSort=user.sort(asc("userId"))valuserSort=user.sort($"userId".asc)valuserSort=user.sort(user("userId"))#查看查询结果的前3条信息userSort.show(3)groupBy()方法使用groupBy()方法可以根据指定字段进行分组操作。groupBy()方法的输入参数既可以传入String类型的字段名,也可以传入Column类型的对象。根据gender字段对user对象进行分组。#根据gender字段对user对象进行分组valuserGroupBy=user.groupBy("gender")valuserGroupBy=user.groupBy(user("gender"))groupBy()方法groupBy()方法返回的是一个GroupedData对象,GroupedData对象可调用的方法及解释说明如下:方法描述max(colNames:String)获取分组中指定字段或所有的数值类型字段的最大值min(colNames:String)获取分组中指定字段或所有的数值类型字段的最小值mean(colNames:String)获取分组中指定字段或所有的数值类型字段的平均值sum(colNames:String)获取分组中指定字段或所有的数值类型字段的值的和count()获取分组中的元素个数groupBy()方法根据gender字段对user对象进行分组,并计算分组中的元素个数。#根据gender字段对user对象进行分组,并计算分组中的元素个数valuserGroupByCount=user.groupBy("gender").count()userGroupByCount.show()join()方法有时候数据并不一定都是存放在同一个表中,有可能是两个或两个以上的表。DataFrame提供了join()方法用于连接两个表,使用方式如下:方法描述join(right:DataFrame)两个表做笛卡儿积join(right:DataFrame,joinExprs:Column)根据两表中相同的某个字段进行连接join(right:DataFrame,joinExprs:Column,joinType:String)根据两表中相同的某个字段进行连接并指定连接类型join()方法使用join(right:DataFrame)方法连接rating和user两个DataFrame数据。#使用join(right:DataFrame)方法连接rating和user两个DataFrame数据valdfjoin=user.join(rating)#查看前3条记录dfjoin.show(3)join()方法使用join(right:DataFrame,joinExprs:Column)方法根据userId字段连接rating和user两个DataFrame数据。#使用join(right:DataFrame,joinExprs:Column)方法根据userId字段连接rating和uservaldfJoin=user.join(rating,"userId")#查看前3条记录dfJoin.show(3)join()方法join(right:DataFrame,joinExprs:Column,joinType:String)方法可以根据多个字段连接两个表,同时指定连接类型joinType。连接类型只能是inner、outer、left_outer、right_outer、semi_join中的一种。根据userId和gender字段连接dfJoin和user的数据,并指定连接类型为left_outer。#join(right:DataFrame,joinExprs:Column,joinType:String)方法valdfJoin2=dfJoin.join(user,Seq("userId","gender"),"left_outer")#查看前3条记录dfJoin2.show(3)agg()方法agg()方法是一种聚合操作方法,该方法接受对于聚合操作的表达式参数,例如max("col_name")表示求字段col_name的最大值。agg()方法可同时对多个列进行聚合操作,且无需进行分组,常见的聚合操作包括求平均值(mean),最大值(max),最小值(min),求和(sum)和计数(count)等。使用agg()方法统计user对象DataFrame数据。drop()方法drop()方法用于去除DataFrame的某一列,该方法接受参数可以是单个或多个字符串(列名),也可以是单个或多个字段,返回一个新的DataFrame对象。drop()方法可以一次去除一列或多列,如果要去除多列,需在参数中传入多个列名。使用drop()方法去除user对象“zip”列数据。withColumn()方法withColumn()方法用于为DataFrame增加新的一列,该方法接受两个参数,第一个为DataFrame新的列名,第二个为一个表达式,该表达式定义了新列的值,最终返回一个新的DataFrame对象。使用withColumn()方法为user对象增加“newAge”列,新列的值为原有“age”列的数值加5。na()方法na()方法用于对DataFrame的具有空值的行数据进行处理,该方法不需要参数,返回对象类型为DataFrameNAFunction对象,可以选择调用drop()或者fill()方法来进一步处理空值。如果空值列占比较小,可以调用drop()方法删除有空值的行;如果空值列占比较大或超过一定比例,则可以调用fill()方法通过指定的值替换空值。使用na()和drop()方法删除dfPeople对象的空值,或使用na()和fill()方法为空值行插入新值。#na()方法查找空值,并调用drop()方法删除空值行或者用fill()方法插入新值dfPeople.show()dfPeople.na.drop().show()dfPeople.na.fill(20).show()describe()方法describe()方法用于对DataFrame的一个或多个列给出一个包含统计摘要的信息,该方法接收一个或多个列名或列对象作为参数,返回一个新的DataFrame对象。使用describe()方法操作user对象,给出“age”和“gender”列的统计摘要信息。sample()方法sample()方法用于对DataFrame类型的数据集进行采样,该方法可接收三个参数。第一个参数withReplacement表示是否重复抽样,true表示重复抽样,false表示不重复抽样。第二个参数fraction表示生成行的抽样比例,用0到1之间的double类型数值表示,最终返回一个DataFrame对象。第三个参数seed为可选参数,表示随机种子,若不指定则系统将会使用1个随机种子。sample()方法采样常用于海量数据集初期的观察和程序开发测试等用途。使用sample()方法操作user对象,形成一个采样对象。DataFrame输出操作DataFrame提供了很多输出操作的方法,其中save()方法可以将DataFrame数据保存成文件,也可以使用saveAsTable()方法将DataFrame数据保存成持久化的表。saveAsTable()方法会将DataFrame数据保存为表,并在Hive的元数据库中创建一个指针指向该表的位置。持久化的表会一直保留,即使Spark程序重启也没有影响,只要连接至同一个元数据服务即可读取表数据。读取持久化表时,只需要用表名作为参数,调用spark.table()方法即可加载表数据并创建DataFrame。默认情况下,saveAsTable()方法会创建一个内部表,表数据的位置是由元数据服务控制的。如果删除表,那么表数据也会同步删除。DataFrame输出操作将DataFrame数据保存为文件,具体步骤如下:首先创建一个Map对象,用于存储save()方法需要用到的一些数据:从user数据中选择出userId、gender和age这3列字段的数据:调用save()方法将DataFrame数据保存至copyOfUser.json文件夹中:在HDFS的/tipdm/data/SparkSQL/目录下查看保存结果。valsaveOptions=Map("header"->"true","path"->"/tipdm/data/SparkSQL/copyOfUser.json")valcopyOfUser=user.select("userId","gender","age")copyOfUser.write.format("json").mode("overwrite").options(saveOptions).save()DataFrame输出操作除了将DataFrame数据保存成文件外,也可以保存成一张表,使用saveAsTable()方法将DataFrame对象copyOfUser保存为名为copyUser的表。#获取user表的部分字段valcopyOfUser=user.select("userId","gender","age")#保存成一张表copyUsercopyOfUser.write.saveAsTable("copyUser")#查询copyUser表前5条记录spark.sql("select*fromcopyUser").show(5)SparkSQL简介DataFrame基础操作项目实施获取数据首先需要将数据ricedata.csv上传至HDFS的/tipdm/data/SparkSQL目录下。接着在IntelliJIDEA中创建一个Spark工程,导入Spark相关的开发依赖包,创建一个名为rice.scala的类文件。在rice.scala类的主函数中创建SparkSession对象,并将其命名为spark。最后读取HDFS数据创建DataFrame。探索与预处理数据获取数据之后,需要先对数据集进行初步探索,以掌握数据的基本情况。首先探索数据是否存在重复记录数。探索数据各字段中是否存在空值。//探索去重前后的数据总行数println("去重前的数据总行数:"+riceData.count())println("去重后的数据总行数:"+riceData.distinct().count())//探索数据中各字段的空值数量riceData.select(riceData.columns.map(colName=>sum(when(col(colName).isNull,1).otherwise(0)).alias(colName)):_*).show()探索与预处理数据观察原始数据可发现,“亲本来源”字段中存在一些含有“?”的数据,“审定编号”字段中也存在一些含有“/”的数据。分别探索“亲本来源”和“审定编号”字段中含有“?”和“/”的记录并统计记录数。探索与预处理数据由于该部分数据无法表示明确信息,可视为异常数据,且占比较低,因此采用直接删除异常行的方法处理。基于前面的探索结果对数据进行预处理,包括去除重复记录、删除包含空值的行和包含异常值“?”、“/”。//数据预处理,去除重复记录数、删除包含空值的行和包含异常值“?”、“/”的行valriceDataCleaned=riceData.distinct().na.drop().where(!col("亲本来源").contains("?")).where(!col("审定编号").contains("/"))println("数据预处理后的数据总行数:"+riceDataCleaned.count())统计分析数据内容统计省级以上部门审定的水稻数量。统计不同水稻类型的占比情况。统计农业部审定的水稻类型情况。riceDataCleaned.groupBy("省份").count().orderBy(-col("count")).show(10)println("水稻类型的总数量:"+riceDataCleaned.select("类型").distinct().count())riceDataCleaned.groupBy("类型").count().orderBy(-col("count")).show()riceDataCleaned.where(col("省份")==="农业部").groupBy("类型").count().orderBy(-col("count")).show()项目小结本项目介绍了SparkSQL框架,首先简述其基本概念,并讲解配置方法及与Shell的交互。接着详细介绍核心抽象编程模型DataFrame及其基础操作,包括创建对象、查询和输出操作。最后,通过使用SparkSQL对水稻信息数据进行获取、探索、预处理及针对性统计分析,加深学生对SparkSQL的理解。实时计算书籍热度——

SparkStreaming实时计算框架项目背景书籍是人类进步的阶梯,数字时代的来临,也催生出“书”的新形式,即电子书。同时,众多售书的电商平台也应运而生。电商平台想要在激烈的竞争中脱颖而出,需要更着重于改善用户体验,并增加用户的黏性,把更多更好的书推荐给读者,扩大他们的知识视野,通过优质书籍推送实现多读书、全民阅读,增强文化自信,围绕举旗帜、聚民心、育新人、兴文化、展形象建设社会主义文化强国。用户无法找到适宜的书籍时往往会相信大众的选择,购买热度较高的书籍。基于这种情况,电商平台可以根据现有书籍的评分、销量、用户的评分次数等信息构建书籍热度,将一些热度较高的书推荐给用户,进而改善用户体验,增加用户黏性,激发用户的购买欲。项目背景书籍热度的计算公式如下,其中,u表示用户的平均评分,x表示用户的评分次数,y表示书籍的平均评分,z表示书籍被评分的次数。目前已采集了某电商网站上用户对书籍的评分数据文件BookRating.txt,数据字段说明如下,其中Rating字段中评分范围为1~5分。字段名称说明UserID用户IDBookID书籍IDRating用户对书籍的评分SparkStreaming简介DStream基础操作项目实施SparkStreaming简介SparkStreaming框架SparkStreaming运行原理初步使用SparkStreamingSparkStreaming框架SparkStreaming是Spark的子框架,用于处理流式数据的分布式框架,具有可伸缩、高吞吐量、容错能力强等特点。SparkStreaming能够和SparkSQL、SparkMLlib、SparkGraphX进行无缝集成,可以从Kafka、Flume、HDFS、Kinesis等数据源中获取数据,而且不仅可以通过调用map()、reduce()、join()等方法处理数据,也可以使用机器学习算法、图算法处理数据。经SparkStreaming处理后的最终结果可以保存在文件系统(如HDFS)、数据库(如MySQL)中或使用仪表面板进行实时展示。SparkStreaming框架SparkStreaming的运行原理如右图。SparkStreaming接收实时数据流并根据一定的时间间隔将其拆分成多个小的批处理作业

t。通过SparkEngine批处理引擎处理批数据,并批量生成最终的结果

r。SparkStreaming运行原理SparkStreaming运行原理SparkStreaming的输入数据按照时间片分成一段一段的数据,时间片可称为批处理时间间隔。时间片是人为设定的数据定量标准,作为数据拆分的依据。一个时间片的数据对应一个RDD实例。按照时间片划分得到批数据后,每一段数据都转换成Spark中的RDD,再将SparkStreaming中对DStream的转换操作变为对DStream中每个RDD的转换操作,并将中间结果保存在内存中。整个流式计算根据业务需求,可对中间结果进行累加计算或存储至外部设备。DStream即离散流,是SparkStreaming对内部持续的实时数据流的抽象描述。初步使用SparkStreaming使用SparkStreaming一般需进行如下操作:创建StreamingContext对象。创建DStream输入源:SparkStreaming需指明数据源。DStream输入源包括基础来源和高级来源。操作Dstream:对于从数据源得到的DStream,用户可以在DStream的基础上进行各种操作。启动SparkStreaming:之前的步骤仅创建了执行流程,程序未真正连接数据源,也未进行数据操作,仅设定了执行计划。执行“ssc.start()”命令后,程序才进行预期操作。初步使用SparkStreaming以单词实时计数为例,从slave1节点的8888端口上接收一行或多行文本内容,并对接收到的内容根据空格进行分割,实时计算每个单词出现的次数,具体实现过程如下。在slave1节点中通过“dnflist--installed|grepnc”命令查看是否安装nc工具,若查看结果中出现“nmap-ncat.x86_64”,说明已安装nc工具。若未安装nc工具,可通过“dnfinstall-ync”命令进行安装。在master节点上依次启动Hadoop集群和Spark集群,并在spark-shell交互式窗口中编写流处理程序执行流程。importorg.apache.spark.streaming.{Seconds,StreamingContext}valssc=newStreamingContext(sc,Seconds(10))vallines=ssc.socketTextStream("slave1",8888)valwords=lines.flatMap(_.split(""))valwordCounts=words.map(x=>(x,1)).reduceByKey(_+_)wordCounts.print()在创建StreamingContext对象时将会抛出警告信息,因为SparkStreaming自Spark3.4.0起已被官方标记为“不推荐使用”。尽管Spark3.5.1目前仍支持使用SparkStreaming,但在后续版本中可能会完全弃用。初步使用SparkStreaming在slave1节点上通过“nc-l8888”命令启动nc进程进入8888端口,之后在master节点的spark-shell中执行“ssc.start()”命令启动流处理程序。在slave1节点上的8888端口中输入文本内容“IamlearningSparkStreamingnow”并回车,之后将在master节点的spark-shell中输出对文本内容进行单词计数之后的结果。程序运行完毕后,若想关闭slave1节点监听的8888端口,或想停止master节点的spark-shell中的流处理程序,均可通过【Ctrl+C】组合键关闭当前进程。初步使用SparkStreaming除了以Socket连接作为数据源读取数据之外,StreamingContext的API还提供了获取其他数据源的方法。例如,可以从HDFS中获取数据创建DStream作为输入源,使用ssc.textFileStream()方法监听HDFS的目录,一旦有新文件加入该目录,SparkStreaming将实时计算目录下文件中的单词词频。importorg.apache.spark.streaming.{Seconds,StreamingContext}valssc=newStreamingContext(sc,Seconds(10))vallines=ssc.textFileStream("/tipdm/data/sparkStreaming/temp")valwords=lines.flatMap(_.split(""))valwordCounts=words.map(x=>(x,1)).reduceByKey(_+_)wordCounts.print()ssc.start()程序运行后,打开另外一个会话端口,在HDFS中创建/tipdm/data/sparkStreaming/temp目录,分别上传文件a.txt、b.txt到该目录下(两份文件的上传时间间隔10秒以上)。SparkStreaming一旦监控到该目录下有新文件加入,便会在10秒内对文件的单词进行词频统计并输出结果。初步使用SparkStreamingSparkStreaming简介DStream基础操作项目实施DStream基础操作DStream编程模型使用DStream转换操作使用DStream窗口操作使用DStream输出操作DStream编程模型DStream为持续性的数据流,可以通过外部数据源获取DStream,也可以通过DStream的高级操作生成新的DStream。DStream代表着一系列持续的RDD,每个RDD都是按一小段时间分割开的。对DStream的任何操作都会转化成对底层RDD的操作。以单词计数为例,获取文本数据形成文本的输入数据流linesDStream。使用flatMap()方法进行扁平化操作并进行分割,得到每一个单词,形成单词的文本数据流wordsDstream。使用DStream转换操作DStream转换操作常用的方法:方法描述map(func)对源DStream的每个元素应用func函数并返回一个新的DStreamflatMap(func)类似map操作,不同的是每个元素可以被映射成0个、1个或者多个输出元素filter(func)对源DStream中的每一个元素应用func函数进行计算,如果func函数返回结果为true,则保留该元素,否则丢弃该元素,返回一个新的DStreamrepartition(numPartitions)更改DStream的分区,主要用于更改并行度,repartition()方法会作用于DStream中每个RDD,对它们重新分区并返回一个新的DStream使用DStream转换操作方法描述union(otherStream)合并两个DStream,生成一个包含两个DStream中所有元素的新的DStreamcount()统计DStream中每个RDD包含的元素的个数,得到一个只有一个元素的RDD构成的DStreamreduce(func)对源DStream中的每个元素应用func函数进行聚合操作,返回一个内部所包含的RDD只有一个元素的新DStreamcountByKey()计算DStream中每个RDD内的元素出现的频次,并返回新的DStream[(K,Long)],其中K是DStream中键的类型,Long是元素出现的频次使用DStream转换操作方法描述reduceByKey(func,[numTasks])以一个键值RDD为目标,K为键,V为值。当一个(K,V)键值对的DStream被调用时,返回(K,V)键值对的新DStream,其中每个键的值都使用聚合函数func汇总。配置numTasks可以设置不同的并行任务数join(otherStream,[numTasks])当调用的是(K,V1)和(K,V2)键值对的两个DStream时,返回元素为(K,(V1,V2))键值对的一个新DStreamcoGroup(otherStream,[numTasks])当被调用的两个DStream分别含有(K,V1)和(K,V2)键值对时,返回一个元素为(K,Seq[V1],Seq[V2])的新的DStreamtransform(func)通过对源DStream的每个RDD应用func函数返回一个新的DStream,用于在DStream上进行RDD的任意操作使用DStream转换操作将语句分割为单词的操作换用transform()方法实现,并直接输出分割后的单词。运行代码后,在slave1节点中通过“nc-l8888”命令启动nc进程进入8888端口,输入“IamlearningSparkStreamingnow”并回车,再回到spark-shell中查看运行结果。importorg.apache.spark.streaming.{Seconds,StreamingContext}valssc=newStreamingContext(sc,Seconds(10))vallines=ssc.socketTextStream("slave1",8888)valwords=lines.transform(rdd=>rdd.flatMap(_.split("")))words.print()ssc.start()使用DStream窗口操作窗口操作指的是在DStream上,将一个可配置长度的窗口,以一个可配置的速率向前移动,根据窗口操作的具体内容,对窗口内的数据执行计算操作,每次落入窗口内的RDD数据会进行合并并执行相应操作,最后生成的新RDD会作为窗口DStream的一个RDD。使用DStream窗口操作

DStream窗口操作常用的方法:方法描述window(windowLength,slideInterval)返回一个基于源DStream的窗口批次计算后得到的新DStreamcountByWindow(windowLength,slideInterval)返回基于滑动窗口的DStream中的元素的数量reduceByWindow(func,windowLength,slideInterval)基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStreamreduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks])基于滑动窗口对元素为(K,V)键值对的DStream中的值,按K使用func函数进行聚合操作,得到一个新的DStream使用DStream窗口操作方法描述reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numTasks])一个更高效的reduceByKeyAndWindow()的实现版本,其中每个窗口的统计量是通过使用前一个窗口的新数据并减去离开窗口的旧数据来实现的。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,可以将t+3秒时刻过去5秒的统计量加上[t+3秒,t+4秒]的统计量,再减去[t-2秒,t-1秒]的统计量。这种方法可以复用中间3秒的统计量,从而提高统计效率。countByValueAndWindow(windowLength,slideInterval,[numTasks])通过滑动窗口计算源DStream中每个RDD内每个元素出现的频次,并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue一样,reduce任务的数量可以通过一个可选参数进行配置使用DStream窗口操作以window,介绍DStream窗口操作的方法。运行完代码后,在slave1节点中通过“nc-l8888”命令启动nc进程进入8888端口,在监听端口每秒输入1个数字并回车,如依次输入1、2、3、4、5,再回到spark-shell中查看运行结果。importorg.apache.spark.streaming.{Seconds,StreamingContext}valssc=newStreamingContext(sc,Seconds(1))vallines=ssc.socketTextStream("slave1",8888)valwords=lines.flatMap(_.split(""))valwindowWords=words.window(Seconds(3),Seconds(2))windowWords.print()ssc.start()使用DStream窗口操作reduceByKeyAndWindow()方法类似于reduceByKey()方法,但两者的数据源不同,reduceByKeyAndWindow()方法的数据源是基于DStream窗口的。例如,将当前长度为3秒的时间窗口中的所有数据元素根据键进行合并,统计当前3秒内不同单词出现的次数。importorg.apache.spark.streaming.{Seconds,StreamingContext}valssc=newStreamingContext(sc,Seconds(1))vallines=ssc.socketTextStream("slave1",8888)valwords=lines.flatMap(_.split(""))valpairs=words.map(word=>(word,1))valwindowWords=pairs.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(3),Seconds(2))windowWords.print()ssc.start()使用DStream输出操作DStream输出操作常用的方法:方法描述print()在Driver中输出DStream中数据的前10个元素saveAsTextFiles(prefix,[suffix])将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件再单独保存为文件夹,文件夹以prefix_TIME_IN_MS[.suffix]的方式命名saveAsObjectFiles(prefix,[suffix])将DStream中的内容按对象序列化,并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix_TIME_IN_MS[.suffix]的方式命名使用DStream输出操作方法描述saveAsHadoopFiles(prefix,[suffix])将DStream中的内容以Hadoop支持的输出格式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix_TIME_IN_MS[.suffix]的方式命名foreachRDD(func)基本的输出操作,将func函数应用于DStream中的RDD上,输出数据至外部系统,如保存RDD到文件或网络数据库等使用DStream输出操作例如,将监听端口中输入的内容保存至HDFS的/tipdm/data/saveAsTextFiles目录下,设置每秒生成一个文件夹。运行完代码后,在slave1节点中通过“nc-l8888”命令启动nc进程进入8888端口,再在HDFS的Web端中查看运行结果,即可看到/tipdm/data/saveAsTextFiles目录下将生成一系列以“sahf”为前缀,“txt”为后缀的文件夹。importorg.apache.spark.streaming.{Seconds,StreamingContext}valssc=newStreamingContext(sc,Seconds(1))vallines=ssc.socketTextStream("slave1",8888)lines.saveAsTextFiles("hdfs://master:8020/tipdm/data/saveAsTextFiles/sahf","txt")ssc.start()使用DStream输出操作使用DStream输出操作在使用foreachRDD()方法的过程中需避免以下错误:在创建连接对象时,避免在SparkDriver端创建连接对象。在Worker上创建连接对象会为每一个记录创建一个连接对象,会产生非常大的开销,且可能会显著降低系统的整体吞吐量。dstream.foreachRDD{rdd=>valconnection=create

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论