《Spark编程基础及项目实践》课件05Spark-SQL实践_第1页
《Spark编程基础及项目实践》课件05Spark-SQL实践_第2页
《Spark编程基础及项目实践》课件05Spark-SQL实践_第3页
《Spark编程基础及项目实践》课件05Spark-SQL实践_第4页
《Spark编程基础及项目实践》课件05Spark-SQL实践_第5页
已阅读5页,还剩48页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

项目5SparkSQL实践学习目标(1)(2)(5)了解SparkSQL的基本概念掌握SparkSQL与Shell交互的方法掌握DataFrame的查询及输出操作学习目标(3)掌握创建DataFrame对象的方法(4)掌握DataFrame查看数据的方法任务1初识SparkSQL5.1.1SparkSQL的前世ApacheHive是一个构建在Hadoop上的数据仓库框架,它提供数据的概要信息、查询和分析功能。Hive提供了一个类SQL的语言——HiveQL,它将对关系数据库的模式操作转换为Hadoop的map/reduce、ApacheTez和Spark执行引擎所支持的操作。当用户向Hive输入一段命令或查询时,Hive需要与Hadoop交互工作来完成该操作。该命令或查询首先进入驱动模块,由驱动模块中的编译器进行解析编译,并由优化器对该操作进行优化计算,然后交给执行器去执行,执行器通常的任务是启动一个或多个MapReduce任务。图5-1描述了用户提交一段SQL查询后,Hive把SQL语句转化成MapReduce任务执行的详细过程。前世?5.1.1SparkSQL的前世图5-1Hive中SQL查询转化成MapReduce任务的过程5.1.1SparkSQL的前世Shark即HiveonSpark,为了与Hive兼容(见图5-2),Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作。图5-2Shark直接继承了Hive的各个组件5.1.1SparkSQL的前世图5-3Shark与Hive的性能比较Shark的最大特性就是运行速度快和与Hive完全兼容,且可以在Shell模式下使用rdd2sql()这样的API,把HQL得到的结果集继续在scala环境下运算,支持自己编写简单的机器学习或分析处理函数,对HQL结果进行进一步分析计算。Shark的出现使得SQL-on-Hadoop的性能比Hive有了10~100倍的提高,如图5-3所示。5.1.1SparkSQL的前世Shark的设计导致了两个问题:(1)执行计划优化完全依赖于Hive,不方便添加新的优化策略;(2)因为Spark是线程级并行,而MapReduce是进程级并行,因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支。5.1.2SparkSQL架构SparkSQL架构在Shark原有架构上重写了逻辑执行计划的优化部分,解决了Shark存在的问题,如图5-4所示。SparkSQL在Hive兼容层面仅依赖HiveQL解析和Hive元数据,也就是说,从HQL被解析成抽象语法树(AST)起,就全部由SparkSQL接管了。SparkSQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责。5.1.2SparkSQL架构图5-4SparkSQL架构5.1.2SparkSQL架构SparkSQL增加了DataFrame(带有Schema信息的RDD),使用户可以在SparkSQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据。SparkSQL目前支持Scala、Java、Python等编程语言,支持SQL-92规范,如图5-5所示。图5-5SparkSQL支持的数据格式和编程语言5.1.3SparkSQL的优势关系数据库是建立在关系模型基础上的数据库,借助于集合代数等数学概念和方法来处理数据库中的数据,已经流行多年。由于具有规范的行和列结构,存储在关系数据库中的数据通常被称为结构化数据,用来查询和操作关系数据库的语言被称为结构化查询语言(structuredquerylanguage,SQL)。目前主流的关系数据库有Oracle、DB2、SQLServer、Sybase、MySQL等。5.1.3SparkSQL的优势关系SparkSQL的出现填补了这个空白。首先,SparkSQL可以提供DataFrameAPI,可以对内部和外部各种数据源执行各种关系操作;其次,SparkSQL可以支持大量的数据源和数据分析算法,组合使用SparkSQL和SparkMLlib,可以融合传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力,有效满足各种复杂的应用需求。任务2DataFrame基础操作5.2.1创建DataFrame对象1.从结构化数据文件创建DataFramescala>valdfUsers=sqlContext.read.load("/data/users.parquet")dfUsers:org.apache.spark.sql.DataFrame=[name:string,favorite_color:string,favorite_numbers:array<int>]通常,把结构化数据文件存储在HDFS上,SparkSQL最常见的结构化数据文件格式是Parquet文件或JSON文件。SparkSQL可以通过load()方法将HDFS上的格式化文件转换为DataFrame,load默认导入的文件格式是Parquet。将HDFS上的Parquet文件users.parquet转换为DataFrame的命令如下。5.2.1创建DataFrame对象1.从结构化数据文件创建DataFrame将JSON文件转换为DataFrame,可以使用json()方法,命令如下。scala>valdfPeople=sqlContext.read.json("/data/people.json")dfPeople:org.apache.spark.sql.DataFrame=[age:bigint,name:string]将JSON文件转换为DataFrame,还可以使用format()方法,命令如下。scala>valdfPeople=sqlContext.read.format("json").load("/data/people.json")dfPeople:org.apache.spark.sql.DataFrame=[age:bigint,name:string]5.2.1创建DataFrame对象2.从外部数据库创建DataFrameSparkSQL还可以从外部数据库中创建DataFrame,使用这种方式创建DataFrame需要通过JDBC或ODBC连接的方式访问数据库。例如,将MySQL数据库test中的people表的数据转换成DataFrame,代码如下。scala>valurl="jdbc:mysql://32/test"url:String=jdbc:mysql://32/testscala>valjdbcDF=sqlContext.read.format("jdbc").options(|Map("url"->url,|"user"->"root",|"passwd"->"",|"dbtable"->"people")).load()jdbcDF:org.apache.spark.sql.DataFrame=[name:string,age:int]5.2.1创建DataFrame对象3.从RDD创建DataFrame将RDD数据转化为DataFrame有两种方式。第一种方式是利用反射机制推断RDD模式,使用这种方式首先需要定义一个caseclass,只有caseclass才能被Spark隐式地转换为DataFrame。使用SparkContext读取HDFS上的people.txt文件,得到一个RDD数据集,将该RDD数据集转换成DataFrame的命令如下。scala>caseclassPerson(name:String,age:Int)definedclassPersonscala>valdata=sc.textFile("/data/people.txt").map(_.split(","))data:org.apache.spark.rdd.RDD[Array[String]]=MapPartitionsRDD[2]atmapat<console>:24scala>valpeople=data.map(p=>Person(p(0),p(1).trim.toInt)).toDF()people:org.apache.spark.sql.DataFrame=[name:string,age:int]5.2.1创建DataFrame对象3.从RDD创建DataFrame通过编程指定Schema需要以下3步。从原来的RDD创建一个元组或列表的RDD。(1)用StructType创建一个和步骤(1)中创建的RDD元组或列表的结构相匹配的Schema。通过sqlContext提供的createDataFrame方法将Schema应用到RDD上。(2)(3)5.2.1创建DataFrame对象4.从Hive中的表创建DataFrame从Hive中的表创建DataFrame,可以先声明一个HiveContext对象,然后使用该对象查询Hive中的表并转换成DataFrame,命令如下。scala>hiveContext.sql("usetest")res15:org.apache.spark.sql.DataFrame=[result:string]scala>valpeople=hiveContext.sql("select*frompeople")peopleDataFrame:org.apache.spark.sql.DataFrame=[name:string,age:string]5.2.2DataFrame查看数据SparkDataFrame派生于RDD类,因此类似于RDD。DataFrame只有在提交Action操作时才会进行计算。DataFrame提供了很多查看及获取数据的操作函数,常用的函数或方法见表5-1。表5-1SparkDataFrame常用操作函数或方法5.2.2DataFrame查看数据01printSchema:打印数据模式在创建完DataFrame之后,可以查看DataFrame中数据的模式。查看数据模式可以通过printSchema()函数来查看,它会打印出列的名称和类型。查看DataFrame对象stocks的数据模式命令及结果如下。scala>stocks.printSchemaroot|--Ordernumber:string(nullable=true)|--Itemid:string(nullable=true)|--Price:double(nullable=true)|--Amount:double(nullable=true)5.2.2DataFrame查看数据02show:查看数据打印完模式之后,还需要查看加载进DataFrame里面的数据是否正确,从创建的DataFrame里面采样数据的方法有很多种,其中最简单的是使用show()方法。show()方法有4个版本,如表5-2所示。表5-2show()方法的4个版本5.2.2DataFrame查看数据02show:查看数据下面使用show()方法查看DataFrame对象stocks中的数据。show()方法与show(true)方法相同,只显示前20条记录,并且最多只显示20个字符,如图5-6所示。需要注意的是,图中只截取结果前5条记录(结果有20条记录)。图5-6使用show()方法查看stocks中的数据5.2.2DataFrame查看数据02show:查看数据show()方法默认只显示前20行记录。若想查看前n行记录,则可以使用show(numRows:Int)方法。例如,查看movies前5行记录,查看结果如图5-7所示。图5-7查看stocks前5行记录5.2.2DataFrame查看数据03first/head/take/takeAsList:获取若干行记录获取DataFrame若干行记录除了使用show()方法之外,还可以使用first()、head()、take()、takeAsList()等方法,如表5-3所示。表5-3DataFrame获取若干行记录的方法5.2.2DataFrame查看数据03first/head/take/takeAsList:获取若干行记录first()和head()方法功能相同,以Row或Array[Row]的形式返回一行或多行数据。take()和takeAsList()方法将获得的数据返回到Driver端,为避免Driver发生OutofMemoryError,使用这两个方法时需要注意数据量。这4个方法的使用如图5-8所示。图5-8first、head、take、takeAsList的使用方法5.2.2DataFrame查看数据04collect/collectAsList:获取所有数据不同于show()方法,collect()方法可以将DataFrame中的所有数据都获取到并返回一个Array对象,而collectAsList()方法可以获取所有数据到List,其功能和collect()方法类似,只不过返回的结构变成List对象。collect()和collectAsList()方法的用法如图5-9和图5-10所示。图5-9collect()方法的使用5.2.2DataFrame查看数据04collect/collectAsList:获取所有数据图5-10collectAsList()方法的使用5.2.3DataFrame查询操作表5-4DataFrame常用的查询方法DataFrame提供了很多查询的方法,类似于SparkRDD的Transformation操作,DataFrame的查询操作也是一个懒操作,仅仅生成一个查询计划,只有触发Action操作才会进行计算并返回查询结果。表5-4所示是DataFrame常用的查询方法。5.2.3DataFrame查询操作图5-11where()查询1)where可以使用where(conditionExpr.String)根据指定条件进行查询,参数中可以使用and或or,该方法的返回结果仍然为DataFrame类型。如图5-11所示,查询stocks对象中订单号为BYSL00000897且价格为198的商品信息。1.条件查询5.2.3DataFrame查询操作图5-12filter()查询2)filterDataFrame还可以使用filter筛选符合条件的数据,filter和where的使用方法一样。如图5-12所示,查询stocks对象中订单号为BYSL00000897且价格大于198的商品信息。1.条件查询5.2.3DataFrame查询操作图5-13select()查询2.查询指定字段的数据信息1)select():获取指定字段值select()方法根据传入的String类型字段名获取指定字段的值,以DataFrame类型返回。图5-13所示为查询stocks对象中Itemid及Price字段的信息。5.2.3DataFrame查询操作图5-14selectExpr()查询2.查询指定字段的数据信息selectExpr():对指定字段进行特殊处理例如,查询stocks对象中的Itemid、Price及Amount字段,其中要求对Amount字段取别名为Total。具体操作命令如图5-14所示。2)5.2.3DataFrame查询操作图5-15col()/apply()获取指定字段2.查询指定字段的数据信息3)col()/apply()col()/apply()也可以获取DataFrame的指定字段,但是只能获取一个字段,并且返回对象为Column类型。col()/apply()的用法如图5-15所示。5.2.3DataFrame查询操作图5-16limit()查询limit()方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。不同于take()与head()方法,limit()方法不是Action操作。图5-16所示为查询stocks对象的前5行记录。3.limit()5.2.3DataFrame查询操作4.orderBy()/sort()sort()方法与orderBy()方法一样,也是根据指定字段排序,用法也与orderBy一样。下面使用sort()方法根据Price字段对stocks对象进行升序排序,如图5-18所示。orderBy()方法是根据指定字段排序,默认为升序排列。若要求降序排列,可以使用desc("字段名称")或$"字段名".desc,也可以在指定字段前面加“-”。图5-17所示是使用orderBy()方法根据Ordernumber字段对stocks对象进行降序排序。5.2.3DataFrame查询操作4.orderBy()/sort()图5-17orderBy()查询图5-18sort()排序查询5.2.3DataFrame查询操作5.groupBy()图5-19groupBy()分组查询groupBy()方法是根据字段进行分组操作,groupBy()方法有两种调试方式,可以传入String类型的字段名,也可以传入Column类型的对象。图5-19所示为根据Itemid字段对stocks对象进行分组。5.2.3DataFrame查询操作5.groupBy()图5-19groupBy()分组查询5.2.3DataFrame查询操作5.groupBy()表5-5GroupedData的常用方法groupBy()方法返回的是GroupedData对象,GroupedData的常用方法见表5-5。5.2.3DataFrame查询操作5.groupBy()图5-20GroupedData使用方法示例表5-5中的方法都可以用在groupBy()方法之后,如图5-20所示,按照Itemid字段对stocks对象进行分组,然后计算分组中的元素个数。5.2.3DataFrame查询操作6.join()表5-6常用join()方法有时根据业务需求,需要连接两个表才可以查出业务所需的结果。DataFrame提供了3种join()方法用于连接两个表,如表5-6所示。5.2.3DataFrame查询操作6.join()图5-21join(right:DataFramejoinExprs:Column)操作使用join(right:DataFramejoinExprs:Column)方法,根据Ordernumber字段连接stocks和orders对象,如图5-21所示。5.2.4DataFrame输出操作查看DataFrameAPI(/docs/1.3.1/api/scala/index.html#org.apache.spark.sql.DataFrame),发现DataFrame中提供了很多种输出操作方法。其中,save方法可以将DataFrame保存成文件,save操作有一个可选参数SaveMode,用这个参数可以指定如何处理数据已经存在的情况。持久化的表会一直保留,即使Spark程序重启也没有影响,只要连接到同一个Metastore就可以读取其数据。读取持久化表时,只需要用表名作为参数,调用SQLContext.table方法即可得到对应的DataFrame。5.2.4DataFrame输出操作将DataFrame保存到一个文件里面的具体操作包括以下几步。(1)首先创建一个Map对象,用于存储save()函数需要用到的一些数据,这里将指定保存文件路径及JSON文件的头信息,如图5-22所示。图5-22创建Map对象(2)从DataFrame对象中选择Itemid、Price和Amount这3列,如图5-23所示。图5-23创建copyOFUser对象5.2.4DataFrame输出操作将DataFrame保存到一个文件里面的具体操作包括以下几步。(3)用save()函数保存(2)中的DataFrame数据到copyOfStocks.json文件夹中,如图5-24所示。图5-24调用save()函数mode()函数可以接收的参数有Overwrite、Append、Ignore和ErrorIfExists。从名字就可以很好地理解,Overwrite代表覆盖目录中之前存在的数据,Append代表给指定目录下追加数据,Ignore代表目录下已经有文件,那就什么都不执行,ErrorIfExists代表如果保存目录下已经存在文件,那么抛出相应的异常。小结本项目首先介绍了SparkSQL的基本概念,接着详细介绍了SparkSQL的核心抽象编程模型DataFrame,包括创建DataFrame对象、Da

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论