Spark大数据分析与实战(Python+PySpark)课件 第3章 SparkSQL离线数据处理_第1页
Spark大数据分析与实战(Python+PySpark)课件 第3章 SparkSQL离线数据处理_第2页
Spark大数据分析与实战(Python+PySpark)课件 第3章 SparkSQL离线数据处理_第3页
Spark大数据分析与实战(Python+PySpark)课件 第3章 SparkSQL离线数据处理_第4页
Spark大数据分析与实战(Python+PySpark)课件 第3章 SparkSQL离线数据处理_第5页
已阅读5页,还剩78页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第3章SparkSQL离线数据处理Spark大数据分析与实战01DataFrame基本原理SparkSQL常用操作0203SparkSQL数据处理实例目录CONTENTS04SparkSQL访问数据库05DataFrame创建和保存06Spark的数据类型转换01DataFrame基本原理DataFrame基本原理DataFrame基本原理博世,10,2500西门子,10,3500普莱西,15,2500RDD[String]DataFrametextFileloadDataFrame(数据表)=Schema(表结构字段)+Data(数据行)DataFrame类似传统数据库的二维表,它包含数据所需的结构细节,支持从多种数据源中创建,包括结构化文件、外部数据库、Hive表等。因此,我们可以简单地将DataFrame理解为一张数据表DataFrame基本原理除了DataFrame,Spark还有一种名为DataSet的数据类型,它不仅与DataFrame一样包含数据和字段结构信息,还结合了RDD和DataFrame的优点,既支持很多类似RDD的功能方法,又支持DataFrame的所有操作,使用起来很像RDD,但执行效率和空间资源利用率都要比RDD高,可用来方便地处理结构化和非结构化数据当DataSet存储的元素是Row类型的对象时,它就等同于DataFrame。所以,DataFrame其实是一种特殊的、功能稍弱一点的DataSet的特例。DataSet需要通过构造JVM对象才能使用,目前仅支持Scala和Java两种原生的编程语言02SparkSQL常用操作DataFrame的基本创建DataFrame的数据操作(DSL)DataFrame的数据操作(SQL)DataFrame的基本创建1.从Python集合创建DataFrame使用Python集合创建DataFrame,只需调用createDataFrame()方法就可以得到一个DataFrame对象,这里用到了Spark的自动类型推断机制,以确定各字段的数据类型DataFrame的基本创建1.从Python集合创建DataFrame除了使用createDataFrame()方法的自动类型推断机制确定字段的数据类型,还可以通过一个类似SQL表结构定义的字符串来设定字段的数据类型DataFrame的基本创建1.从Python集合创建DataFrameDataFrame的schema字段结构信息还能够通过代码来实现,字段结构通过StructType设定,字段信息StructField的3个参数分别代表字段名、字段类型以及字段是否允许为空DataFrame的基本创建2.从csv文件创建DataFrameDataFrame的查看数据准备1.DataFrame的属性和查看操作2.查看DataFrame结构信息DataFrame的查看2.查看DataFrame结构信息DataFrame的查看这里的show()方法可以根据需要使用,包括设定显示的数据行数,以及当字段内容超长时是否截断显示(可避免输出的排版出现错乱)等3.查看DataFrame的数据DataFrame的查看上面获取到的DataFrame数据行返回的都是Row对象或Row对象数组,每个Row对象的属性值就是对应数据行的字段内容3.查看DataFrame的数据(foreach/foreachPartition)DataFrame的查看lambda表达式是对每一行数据进行处理,具体处理过程定义在myprocess()函数中。如果要将数据写入数据库,应考虑使用foreachPartition算子,因为它是针对一个分区的批量数据创建数据库连接的,可以避免造成处理效率低下的问题(foreach对每条数据都要创建一个数据库连接,使用完还要断开)DataFrame的数据操作(DSL)条件排序分组查询1.DataFrame的数据查询操作DataFrame的数据操作(DSL)2.where和filter数据过滤过滤条件where()和filter()方法需要提供一个过滤条件的字段参数,该参数有两种形式:一种是形如'age>18'的字符串;另一种是使用df.age>18或df['age']>18的Column对象的形式DataFrame的数据操作(DSL)3.sort和orderBy数据排序如果是默认的按升序排列,则相当于指定了参数ascending=True,Python的True和False值分别与1和0等价,所以在指定升序或降序时也可以写成ascending=1或ascending=0DataFrame的数据操作(DSL)4.groupBy数据分组groupBy就是对DataFrame的数据行依据某种准则进行分组归类,返回的是GroupedData类型的数据,groupBy()方法通常要与聚合函数一起使用,常用的聚合函数包括:count()统计数量、mean()/avg()统计字段的平均值、max()/min()统计字段的最大值和最小值、sum()统计字段的累加和,等等DataFrame的数据操作(DSL)5.select和selectExpr数据查询select和selectExpr算子用于从DataFrame数据集中查询指定的字段,返回一个新的DataFrame数据集selectExpr算子还支持对指定字段进行处理,如取绝对值、四舍五入等,也就是它可以与表达式一起使用DataFrame的数据操作(DSL)6.DataFrame的数据处理操作DataFrame的数据操作(DSL)7.distinct和dropDuplicates去重distinct()要求数据行的每个字段都相同才会删除,dropDuplicates是按指定的字段相同就删除DataFrame的数据操作(DSL)8.dropna和drop按列删除如果DataFrame数据集中存在不完整的数据行,比如某些行缺失部分字段内容,此时就可以通过dropna()方法来过滤这种数据行,或者根据需要“剪掉”某些字段,这也是ETL数据清洗工作中常见的一种做法。但值得一提的是,原始的DataFrame数据集并不会发生变化DataFrame的数据操作(DSL)9.limit限定数据limit()方法返回一个新的DataFrame,通过这种方式减少了后续处理的数据量,这一点与show()方法是不同的,show()方法只显示指定数量的记录,并不影响实际的数据行数应注意的是,limit()方法返回了一个新的DataFrame,原始DataFrame不受影响DataFrame的数据操作(DSL)10.withColumn和withColumnRenamed按列处理withColumn在当前DataFrame添加一列或替换同名列。如果在新增或替换时还要修改字段的值,则可以对它进行简单的运算或借助函数进行修改withColumnRenamed则用于修改现有列的字段名,列数据保持不变DataFrame的数据操作(DSL)11.intersectAll交集与unionByName并集intersectAll用来获取两个DataFrame的交集,得到的是两者按字段顺序的相同数据行(只看字段位置而不看字段名,且交集可能存在重复数据行)unionByName用来获取两个DataFrame的并集,得到的是将两者数据行按字段名合并到一起的记录DataFrame的数据操作(DSL)12.join连接处理join算子用于连接两个DataFrame的数据行,包括内连接、左外连接、右外连接、全外连接方式等,默认采用内连接(inner)的方式处理DataFrame的数据操作(SQL)1.DataFrame数据集的创建DataFrame临时视图表又分为“局部会话范围”和“全局会话范围”两种形式,前者仅限当前的SparkSession会话使用,后者可在当前Spark应用程序的所有SparkSession实例中访问到。在使用全局视图表时,视图名称需要附带一个“global_temp.”前缀,这是因为全局视图是绑定到global_temp系统保留数据库上的无论是哪一类视图,它们在使用完毕后就会被自动删除,这也是被称为“临时视图”的原因,其与在关系数据库中创建视图的做法不一样DataFrame的数据操作(SQL)websiteaccess_log2.DataFrame数据集的准备使用createTempView()和createGlobalTempView()方法创建的视图,其名称不能被重复利用,必须先删除才行,而使用createOrReplaceTempView()和createOrReplaceGlobalTempView()方法创建的视图,其名称可以被直接被替换使用,相对更加方便一点DataFrame的数据操作(SQL)website3.DataFrame视图的SQL查询(查询指定的字段值、无重复的字段值)SQL基本查询包括SELECT、SELECTDISTINCT、WHERE、ORDERBY等。如果熟悉SQL语法,这部分内容与数据库的SQL语句基本是一样的DataFrame的数据操作(SQL)4.DataFrame视图的SQL查询(按条件查询)website条件查询是通过WHERE子句实现的,条件表达式可以与AND、OR、NOT组合使用DataFrame的数据操作(SQL)4.DataFrame视图的SQL查询(按条件查询)websiteDataFrame的数据操作(SQL)website5.DataFrame视图的SQL查询(查询结果排序)ORDERBY默认按升序排列,如果要按降序排列,则可在排序字段后面增加DESC关键字(升序为ASC关键字,可省略)DataFrame的数据操作(SQL)5.DataFrame视图的SQL查询(查询结果排序)website这个例子是先按照country字段降序排列,再按照alexa字段升序排列DataFrame的数据操作(SQL)website6.DataFrame视图的SQL查询(查询指定数量的记录)SELECT-LIMIT可以设定查询返回的记录数,这对于拥有大量记录的数据集来说比较有用DataFrame的数据操作(SQL)website7.DataFrame视图的SQL查询(模糊查询)模糊查询是指字段值是否符合某种匹配模式,而不是精确匹配。SQL中的LIKE操作符可用于模糊匹配WHERE字段列的内容DataFrame的数据操作(SQL)7.DataFrame视图的SQL查询(模糊查询)websiteSQL查询在执行模糊匹配时,百分号(%)代表0或多个字符,下划线(_)代表一个字符DataFrame的数据操作(SQL)8.DataFrame视图的SQL查询(in/between…and范围)websiteSQL的IN操作符允许在WHERE子句中规定多个值,BETWEEN操作符用于选取介于两个值之间的数据范围内的值DataFrame的数据操作(SQL)8.DataFrame视图的SQL查询(in/between…and范围)websiteDataFrame的数据操作(SQL)access_log8.DataFrame视图的SQL查询(in/between…and范围)这里查询的date字段值介于'2016-05-10'和'2016-05-14'(不含'2016-05-14')之间DataFrame的数据操作(SQL)websiteaccess_log9.DataFrame视图的SQL查询(连接查询)SQL连接查询使用JOIN操作符,它按照某些字段将来自两张或两张以上数据表的数据行结合起来,包括INNERJOIN、LEFTJOIN、RIGHTJOIN等。其中,最常见的INNERJOIN用于从多张数据表中返回满足条件的所有行DataFrame的数据操作(SQL)10.DataFrame视图的SQL查询(嵌套查询/子查询)websiteaccess_log这里“SELECTsite_idFROMaccess_logWHEREcount>200”得到的是仅有一个字段的临时表,然后使用IN关键字将其转换为一个查询条件,从而得到二次查询的结果DataFrame的数据操作(SQL)access_log11.DataFrame视图的SQL查询(聚合查询)SparkSQL还实现了常用的聚合函数功能,包括COUNT()、AVG()、MAX()、MIN()等DataFrame的数据操作(SQL)access_log12.DataFrame视图的SQL查询(分组统计)在SparkSQL中,分组统计是通过关键字GROUPBY来实现的,分组统计通常要结合聚合函数一起使用DataFrame的数据操作(SQL)13.DataFrame视图的SQL查询(用户自定义函数UDF)SparkSQL内置的pyspark.sql.functions模块中包含大量可直接使用的数据处理函数,包括标量值函数、聚合函数等,以支持对DataFrame的行或列的数据进行相应处理用户自定义函数(UserDefinedFunction,UDF),是指通过Spark支持的编程语言定义一个函数传递给SparkSQL,使用起来就像内置的sum()、avg()等函数一样。在用户自定义函数中,可以灵活运用编程语言本身提供的各种函数、方法、库等以实现所需功能,从而不受SparkSQL本身的限制DataFrame的数据操作(SQL)website13.DataFrame视图的SQL查询(用户自定义函数UDF)DataFrame的数据操作(SQL)df1=>website用户自定义函数不仅可以应用在SQL中,而且对DSL形式的API操作也是支持的13.DataFrame视图的SQL查询(用户自定义函数UDF)03Spark数据处理实例词频统计案例人口信息统计电影评分数据分析词频统计案例1.需求分析这个文件的数据是非结构化的,每行的单词个数是不固定的,也没有具体的含义。为了使用SparkSQL来处理它,第1步工作就是要将这个文件的数据转换成结构化的形式,由于我们真正关注的是各个单词,因此可以像以往那样将文件数据转换为RDD,然后经过一定的处理后将其转变为DataFrame,这样就可以由SparkSQL来处理使用SparkSQL,统计LICENSE-py4j.txt文件中所包含单词的出现次数词频统计案例2.SparkSQL编程实现首先将文件数据转换为rdd1,由于它是非结构化的数据,因此同样需要把每行包含的单词切解出来。为了能使用SparkSQL进行处理,这里还把每个单词变成一个结构化的Row对象(代表一条表数据记录),并将RDD转换为DataFrame,后续就是常规的SparkSQL操作了使用SparkSQL进行词频统计的过程,比直接通过RDD解决显得更加麻烦,这里的目的主要是提供一种使用SparkSQL处理非结构化数据的思路。通常来说,在面对非结构化数据时,一般要先将其转换成结构化的形式才能由SparkSQL来处理人口信息统计1.需求分析有一个包含600万人口信息的数据存储在当前主目录的people_info.csv文本文件中,其中,每行数据代表一个人的基本信息,3个字段分别是编号、性别(F/M)、身高(单位是cm),现要求使用SparkSQL完成以下数据分析任务:(1)统计男性身高超过170cm以及女性身高超过165cm的总人数。(2)按照性别分组统计男性和女性人数。(3)统计身高大于210cm的前50名男性,并按身高从大到小排序。(4)统计男性的平均身高。(5)统计女性身高的最大值。编号ID、性别(F女/M男)、身高(cm)人口信息统计people_info(1)从csv数据文件构造出DataFrame数据表2.SparkSQL编程实现人口信息统计people_info(2)统计男性身高超过170cm以及女性身高超过165cm的总人数第1个问题,即按照要求的条件统计人数,这需要用到SQL的count()统计函数人口信息统计people_info(3)按照性别分组统计男女人数第2个问题实际是一个分组统计操作,可以先通过GROUPBY按指定字段进行分组,然后将分组后的字段应用count()、avg()、max()等聚合函数人口信息统计people_info(4)统计身高大于210cm的前50名男性,从大到小排序第3个问题是统计身高大于210cm的前50名男性,并按身高从大到小排序,因此需要使用ORDERBY和LIMIT人口信息统计people_info(5)统计男性的平均身高第4个问题是统计男性的平均身高,需要使用avg()聚合函数人口信息统计people_info第5个问题是统计女性身高的最大值,这个问实现起来题也比较简单(6)统计女性身高的最大值电影评分数据分析字段:movieIdtitlegenres

电影id名称类别字段:userIdmovieIdratingtimestamp

用户id电影id用户评分

时间戳1.需求分析MovieLens是一个关于电影评分的公开数据集,里面包含了大量的IMDB(InternetMovieDataBase)用户对电影的评分信息,所以也经常被用来做推荐系统、机器学习算法的测试数据集。MovieLens数据集的movies.csv和ratings.csv文件中分别存放了电影信息、电影的用户评分数据现使用SparkSQL完成以下数据分析任务:(1)查找电影评分次数排名前10的电影。(2)查找电影评分次数超过5000次,且平均评分排名前10的电影及对应的平均评分。电影评分数据分析ratingsmovies(1)从csv数据文件分别构造出两个DataFrame数据表2.SparkSQL编程实现电影评分数据分析(1)从csv数据文件分别构造出两个DataFrame数据表2.SparkSQL编程实现电影评分数据分析max10_ratingsratingsa)先按movieId字段对电影进行分组,将每个电影对应的评分用户个数统计出来b)按用户数进行降序排列,以获取前10部电影c)把查询出来的数据集注册为max10_ratings视图表(2)查找评分次数最多的前10部电影电影评分数据分析movies根据找到的movieId查询出电影的具体信息(2)查找评分次数最多的前10部电影max10_ratings电影评分数据分析ratings(2)查找评分次数最多的前10部电影movies第二种方法,就是将第一种方法的两步合并到一起处理电影评分数据分析moviesratings基本思路:a)求每部电影的评分用户数b)求每部电影的平均评分c)求评分次数(用户数)超过5000且平均评分前10的movieIde)根据movieId获取电影信息(3)查找电影评分次数超过5000,且平均评分最高的前10部电影名称,以及对应的平均评分电影评分数据分析moviesratings(3)查找电影评分次数超过5000,且平均评分最高的前10部电影名称,以及对应的平均评分04SparkSQL访问数据库在Linux操作系统上安装MySQLDataFrame写入MySQL从MySQL中创建DataFrameLinux上安装MySQL更新安装源安装mysql查看mysql服务状态在Ubuntu环境下安装MySQL,可以通过软件源仓库在线安装,也可以自行下载合适的离线版本安装Linux上安装MySQL安装完毕,在使用之前先初始化mysql登录,设置好登录密码(登录账户为mysql中的root用户)Linux上安装MySQLpeople数据库people_info表准备要用的数据库和表,先连接到数据库服务,然后创建一个people数据库以及people_info表,并在表中添加一条测试数据Linux上安装MySQL由于SparkSQL连接MySQL在底层仍是通过Java实现的,因此还需将连接JDBC的jar包文件复制到Spark安装目录的jars文件夹中注意:当在Spark的安装目录新增了一个连接MySQL的JDBC驱动文件,为了使其生效,还应将当前正在运行的PySparkShell交互式编程环境退出(按Ctrl+D快捷键),重新执行pyspark命令进入PySparkShell交互式编程环境DataFrame写入MySQLmy_people_info使用前面人口信息统计案例中的数据,准备将其中的10条记录保存到MySQL中。首先创建一个DataFrameDataFrame写入MySQLpeoplepeople_infomy_people_info前10条从创建的DataFrame中查询10条数据行保存到MySQL数据库的people_info表中DataFrame写入MySQL验证一下mysql表中是否有来自DataFrame的数据从MySQL创建DataFrame从数据库表中读取数据到DataFrame,可以用spark.read.jdbc()方法来实现,在读取时需要提供连接数据库所需的信息(如用户名、密码、数据库驱动类名等),返回的是一个DataFrame对象05DataFrame创建

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论