《Spark编程基础(Scala版第2版)》课件 第6-9章 Spark SQL-Spark编程基础- Spark MLlib-Spark编程基础_第1页
《Spark编程基础(Scala版第2版)》课件 第6-9章 Spark SQL-Spark编程基础- Spark MLlib-Spark编程基础_第2页
《Spark编程基础(Scala版第2版)》课件 第6-9章 Spark SQL-Spark编程基础- Spark MLlib-Spark编程基础_第3页
《Spark编程基础(Scala版第2版)》课件 第6-9章 Spark SQL-Spark编程基础- Spark MLlib-Spark编程基础_第4页
《Spark编程基础(Scala版第2版)》课件 第6-9章 Spark SQL-Spark编程基础- Spark MLlib-Spark编程基础_第5页
已阅读5页,还剩723页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

SparkSQL第6章目

录01SparkSQL简介02结构化数据DataFrame03DataFrame的创建和保存04DataFrame的常用操作05从RDD转换得到DataFrame06使用SparkSQL读写数据库07DataSetSparkSQL简介6.1SparkSQL简介从Shark说起0102SparkSQL设计03为什么推出SparkSQLSparkSQL特点04编程实例056.1.1从Shark说起ClientMetastoreJDBCDriverSQLParserQueryOptimizerPhysicalplanExecutionMapReduceHDFSCLIHive:SQL-on-Hadoop6.1.1从Shark说起输入ParserSemanticAnalyzerLogicalPlanGeneratorLogicalOptimizerPhysicalPlanGeneratorPhysicalOptimizer将SQL转换成抽象语法树将抽象语法树转换成查询块将查询块转换成逻辑查询计划重写逻辑查询计划将逻辑查询计划转成物理计划选择最佳的优化查询策略输出6.1.1从Shark说起Shark即HiveonSpark,为了实现与Hive兼容,Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作6.1.1从Shark说起Shark使SQL-on-Hadoop性能比Hive提高了10-100倍6.1.1从Shark说起1Shark导致的两个问题???完全依赖执行计划优化不方便添加新的优化策略6.1.1从Shark说起Spark线程级并行MapReduce进程级并行Shark导致的两个问题2???6.1.1从Shark说起线程安全问题Spark兼容2???Shark导致的两个问题6.1.1从Shark说起停止对Shark的开发2014年6月1日6.1.1从Shark说起AnewSQLenginedesignedFromground-upforSparkSparkSQLSharkDevelopmentending;TransitioningtoSparkSQLHelpexisitingHiveuseMigratetoSparkHiveonSparkHive又能够支持MapReduce6.1.1从Shark说起SparkSQL作为Spark生态的一员不再受限于Hive,只是兼容Hive01HiveonSpark是一个Hive发展计划,该计划将Spark作为Hive的底层引擎之一,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎026.1.2SparkSQL架构ClientMetastoreJDBCDriverSQLParserQueryOptimizerPhysicalplanExecutionMapReduceHDFSCLIHive:SQL-on-HadoopSparkSQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据,从HQL被解析成抽象语法树(AST)起,就全部由SparkSQL接管了。SparkSQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责SparkSQL支持的数据格式和编程语言6.1.2SparkSQL架构6.1.3为什么推出SparkSQL6.1.3为什么推出SparkSQL6.1.3为什么推出SparkSQL关系数据库存储一部分结构化数据6.1.3为什么推出SparkSQL结构化数据非结构化数据大数据10%90%存储在关系数据库中与人类信息密切相关6.1.3为什么推出SparkSQL结构化数据非结构化数据半结构化数据关系数据库在大数据时代已经不能满足要求不能用SQL语句6.1.3为什么推出SparkSQL需要执行高级分析机器学习算法决策树数据分析6.1.3为什么推出SparkSQL传统关系数据库数据分析SQL机器学习算法分析6.1.3为什么推出SparkSQL图像处理机器学习关系数据库没办法处理6.1.3为什么推出SparkSQL关系查询复杂分析算法能够处理结构化、半结构化和非结构化数据6.1.3为什么推出SparkSQLSQL非结构化数据半结构化数据构建DataFrame6.1.3为什么推出SparkSQLSQL非结构化数据半结构化数据构建DataFrame文本类型进行解析相关查询DataFrame关系型表格6.1.3为什么推出SparkSQL文本类型进行解析相关查询关系型表格能够融合非结构化数据分析方便调用6.1.3为什么推出SparkSQL关系数据查询复杂分析算法SQL6.1.3为什么推出SparkSQL机器学习算法的数据处理能力关系数据库的结构化数据管理能力融合SparkSQL6.1.4

SparkSQL的特点SparkSQL可将SQL查询和Spark程序无缝集成允许使用SQL或熟悉的DataFrameAPI在Spark程序中查询结构化数据容易整合(集成)DataFrame和SQL提供了访问各种数据源的方法,包括Hive、Avro、Parquet、ORC、JSON和JDBC统一的数据访问方式parkSQL支持HiveQL语法以及HiveSerDes和UDF允许我们访问现有的Hive仓库兼容HiveSparkSQL支持JDBC或ODBC连接标准的数据库连接6.1.5SparkSQL简单编程实例SparkSession接口SparkSession实现了SQLContext及HiveContext所有功能。此外,SparkSession也封装了SparkConf、SparkContext和StreamingContextSparkSession支持从不同的数据源加载数据,以及把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身的表,然后使用SQL语句来操作数据可以通过如下语句创建一个SparkSession对象importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder().appName(“SparkSessionExample”).getOrCreate()6.1.5SparkSQL简单编程实例在Linux终端中,执行如下命令创建一个目录sparkapp作为应用程序根目录$cd~#进入用户主目录$mkdir./sparkapp#创建应用程序根目录,如果已经存在,则不用创建$mkdir-p./sparkapp/src/main/scala

#创建所需的目录结构6.1.5SparkSQL简单编程实例下面给出一个具体实例,介绍SparkSQL的编程方法6.1.5SparkSQL简单编程实例importorg.apache.spark.sql.SparkSessionobjectSimpleApp{defmain(args:Array[String]){vallogFile="file:///usr/local/spark/README.md"valspark=SparkSession.builder.appName("SimpleApplication").getOrCreate()vallogData=spark.read.textFile(logFile).cache()valnumAs=logData.filter(line=>line.contains("a")).count()valnumBs=logData.filter(line=>line.contains("b")).count()println(s"Lineswitha:$numAs,Lineswithb:$numBs")spark.stop()}}在“src/main/scala”目录下创建一个代码文件SimpleApp.scala,其内容如下

6.1.5SparkSQL简单编程实例name:="SimpleProject"version:="1.0"scalaVersion:="2.12.15"libraryDependencies+="org.apache.spark"%%"spark-sql"%"3.2.0"

在sparkapp目录下创建一个文件simple.sbt,并设置为如下内容6.1.5SparkSQL简单编程实例$/usr/local/sbt/sbtpackage然后,使用spark-submit命令运行程序,就可以得到执行结果,具体如下:$cd~/sparkapp$/usr/local/spark/bin/spark-submit\>--class"SimpleApp"\>./target/scala-2.12/simple-project_2.12-1.0.jar2>&1|grep"Lineswitha:"

执行如下代码使用sbt工具对代码进行编译打包在Linux终端中,执行如下命令创建一个目录sparkapp作为应用程序根目录Lineswitha:65,Lineswithb:33结构化数据DataFrame6.2结构化数据DataFrame6.2.1DataFrame概述6.2.2DataFrame的优点6.2.1DataFrame概述DataFrameRDD(Person)DataFrame与RDD的区别PersonPersonPersonPersonPersonPersonNameAgeHeightStringIntDoubleStringIntDoubleStringIntDoubleStringIntDoubleStringIntDoubleStringIntDouble6.2.1DataFrame概述处理大规模结构化数据SparkSQL

MySQL→DataFrame支持SQL查询6.2.1DataFrame概述仓库保存洗漱用具6.2.1DataFrame概述张三传统RDD方式6.2.1DataFrame概述张三张三打开传统RDD方式6.2.1DataFrame概述DataFrame方式张三DataFrame方式6.2.1DataFrame概述张三张三RDD是分布式的Java对象的集合,但是,对象内部结构对于RDD而言却是不可知的DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息6.2.1DataFrame概述6.2.2

DataFrame的优点风格一致更优的空间效率更好的性能易组合简洁表达能力强优

点6.2.2

DataFrame的优点("spark",2)("hadoop",6)("hadoop",4)("spark",6)scala>valbookRDD=sc.|parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))scala>valsaleRDD=bookRDD.map(x=>(x._1,(x._2,1))).|reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).|map(x=>(x._1,x._2._1/x._2._2)).collect()计算每个键对应的平均值,即每种图书的每天平均销量。使用RDD编程时,语句如下

6.2.2

DataFrame的优点scala>valbookDF=spark.|createDataFrame(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6))).|toDF("book","amount")scala>valavgDF=bookDF.groupBy("book").agg(avg("amount"))scala>avgDF.show()+------+-----------+|book|avg(amount)|+------+-----------+|spark|4.0||hadoop|5.0|+------+-----------+如果是使用DataFrameAPI来表达相同的查询,就会简单很多提高了代码的表达能力实现更高的执行效率DataFrame的创建和保存6.3DataFrame的创建和保存6.3.2JSON6.3.4文本文件6.3.1Parquet6.3.3CSV010203046.3.1Parquetscala>valfilePath=|"file:///usr/local/spark/examples/src/main/resources/users.parquet"scala>valdf=spark.read.format("parquet").load(filePath)scala>df.show()+------+--------------+----------------+|name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa|null|[3,9,15,20]||Ben|red|[]|+------+--------------+----------------+

1.

从Parquet文件创建DataFrame6.3.1Parquet或者也可以使用如下方式读取Parquet文件生成DataFramescala>valfilePath=|"file:///usr/local/spark/examples/src/main/resources/users.parquet"scala>valdf=spark.read.parquet(filePath)6.3.1Parquet

2.

将DataFrame保存为Parquet文件(在上面的代码基础上继续执行下面代码)scala>df.write.format("parquet").mode("overwrite").option("compression","snappy").|save("file:///home/hadoop/otherusers")上面代码执行以后,在本地文件系统中的“/home/hadoop/”目录下会生成一个名称为“otherusers”的子目录,该目录下包含两个文件,即_SUCCESS文件和像part-00000-XXXX.snappy.parquet这样的文件,后者是使用snappy压缩算法得到的压缩文件。如果要再次读取文件生成DataFrame,load()中可以直接使用目录“file:///home/hadoop/otherusers”也可以使用文件“file:///home/hadoop/otherusers/part-00000-XXXX.snappy.parquet”scala>df.write.parquet("file:///home/hadoop/otherusers")6.3.2JSONscala>valfilePath=|"file:///usr/local/spark/examples/src/main/resources/people.json"scala>valdf=spark.read.format("json").load(filePath)scala>df.show()+----+-------+|age|name|+----+-------+|null|Michael||30|Andy||19|Justin|+----+-------+

1.

从JSON文件创建DataFrame:从JSON文件创建DataFrame的具体方法如下或者也可以使用如下方式读取JSON文件生成DataFramescala>valfilePath=|"file:///usr/local/spark/examples/src/main/resources/people.json"scala>valdf=spark.read.json(filePath)6.3.2JSON

2.

将DataFrame保存为JSON文件(在上面的代码基础上继续执行下面代码)scala>df.write.format("json").mode("overwrite").|save("file:///home/hadoop/otherpeople")上面代码执行以后,在本地文件系统中的“/home/hadoop/”目录下会生成一个名称为“otherpeople”的子目录,该目录下包含两个文件,即_SUCCESS文件和像part-00000-XXXX.json这样的文件。如果要再次读取文件生成DataFrame,load()中可以直接使用目录“file:///home/hadoop/otherpeople”,也可以使用文件“file:///home/hadoop/otherpeople/part-00000-XXXX.json”scala>df.write.json("file:///home/hadoop/otherpeople")6.3.2JSON6.3.3CSVscala>valfilePath=|"file:///usr/local/spark/examples/src/main/resources/people.csv"scala>valschema="nameSTRING,ageINT,jobSTRING"scala>valdf=spark.read.format("csv").schema(schema).option("header","true").|option("sep",";").load(filePath)scala>df.show()+-----+---+---------+|name|age|job|+-----+---+---------+|Jorge|30|Developer||Bob|32|Developer|+-----+---+---------+

1.

从CSV文件创建DataFrame:从CSV文件创建DataFrame的具体方法如下或者也可以使用如下方式读取CSV文件生成DataFrame6.3.3CSVscala>valfilePath=|"file:///usr/local/spark/examples/src/main/resources/people.csv"scala>valschema="nameSTRING,ageINT,jobSTRING"scala>valdf=spark.read.schema(schema).option("header","true").|option("sep",";").csv(filePath)

2.

将DataFrame保存为CSV文件(在上面的代码基础上继续执行下面代码)scala>df.write.format("csv").mode("overwrite").|save("file:///home/hadoop/anotherpeople")上面代码执行以后,在本地文件系统中的“/home/hadoop/”目录下会生成一个名称为“anotherpeople”的子目录,该目录下包含两个文件,即_SUCCESS文件和像part-00000-XXXX.csv这样的文件。如果要再次读取文件生成DataFrame,load()中可以直接使用目录“file:///home/hadoop/anotherpeople”,也可以使用文件“file:///home/hadoop/anotherpeople/part-00000-XXXX.csv”scala>df.write.csv("file:///home/hadoop/anotherpeople")6.3.3CSV6.3.4文本文件scala>valfilePath="file:///home/hadoop/word.txt"scala>valdf=spark.read.format("text").load(filePath)scala>df.show()+---------------+|value|+---------------+|hadoopisgood||sparkisbetter||sparkisfast|+---------------+

从一个文本文件创建DataFrame的方法如下或者也可以使用如下方式读取文本文件生成DataFramescala>valfilePath=|"file:///usr/local/spark/examples/src/main/resources/people.txt"scala>valdf=spark.read.text(filePath)6.3.4文本文件

如果要把一个DataFrame保存成文本文件,则需要使用如下语句格式scala>valpeopleDF=spark.read.format("json").|load("file:///usr/local/spark/examples/src/main/resources/people.json")scala>peopleDF.rdd.saveAsTextFile("file:///home/hadoop/newpeople")上面代码执行以后,会在生成目录"file:///home/hadoop/newpeople",这个目录下会包含两个文件,part-00000和_SUCCESS,其中,part-00000文件中包含了具体数据scala>df.write.format("text").save("file:///home/hadoop/newpeople")6.3.4文本文件DataFrame的基本操作6.4DataFrame的基本操作DSL语法风格SQL语法风格6.4.1DSL语法风格DSL:“领域专用语言”允许开发者通过调用方法对DataFrame内部的数据进行分析6.4.1DSL语法风格show()withColumn()filter()groupBy()select()sort()printSchema()drop()6.4.1DSL语法风格scala>valfilePath=|"file:///usr/local/spark/examples/src/main/resources/people.json"scala>valdf=spark.read.json(filePath)

首先从Spark自带的样例文件people.json创建一个名称为df的DataFrame6.4.1DSL语法风格.printSchema().show()6.4.1DSL语法风格.select()6.4.1DSL语法风格.filter().groupBy()6.4.1DSL语法风格.sort()6.4.1DSL语法风格.withColumn()6.4.1DSL语法风格.drop()6.4.1DSL语法风格其他常用操作6.4.2

SQL语法风格SQL使用SQL语句操作DataFrameSQL函数6.4.2

SQL语法风格熟练使用SQL语法的开发者,可直接使用SQL语句进行数据操作。相比于DSL语法风格,在执行SQL语句之前,需通过DataFrame实例创建临时视图。创建临时视图的方法是调用DataFrame的createTempView或createOrReplaceTempView方法,二者的区别是,后者会进行判断01对于createOrReplaceTempView方法而言,如果在当前会话中存在相同名称的临时视图,则用新视图替换原来的临时视图,如果在当前会话中不存在相同名称的临时视图,则创建临时视图。对于createTempView方法而言,如果在当前会话中存在相同名称的临时视图,则会直接报错02scala>valfilePath=|"file:///usr/local/spark/examples/src/main/resources/people.json"scala>valdf=spark.read.json(filePath)scala>df.show()+----+-------+|age|name|+----+-------+|null|Michael||30|Andy||19|Justin|+----+-------+

1.

使用SQL语句操作DataFrame6.4.2

SQL语法风格scala>df.createTempView("people")scala>spark.sql("SELECT*FROMpeople").show()+----+-------+|age|name|+----+-------+|null|Michael||30|Andy||19|Justin|+----+-------+scala>spark.sql("SELECTnameFROMpeoplewhereage>20").show()+----+|name|+----+|Andy|+----+6.4.2

SQL语法风格6.4.2

SQL语法风格2.

SQL函数:提供了丰富的函数供用户选择一共200多个,基本涵盖了大部分的日常应用场景,包括转换函数、数学函数、字符串函数、二进制函数、日期时间函数、正则表达式函数、JSON函数、URL函数、聚合函数、窗口函数和集合函数等。当Spark自带的这些系统函数无法满足用户需求时,用户还可以创建“用户自定义函数”scala>importorg.apache.spark.sql.Rowscala>importorg.apache.spark.sql.types._scala>valschema=StructType(List(StructField("name",StringType,true),|StructField("age",IntegerType,true),|StructField("create_time",LongType,true)))scala>valjavaList=newjava.util.ArrayList[Row]()scala>javaList.add(Row("Xiaomei",21,System.currentTimeMillis()/1000))scala>javaList.add(Row("Xiaoming",22,System.currentTimeMillis()/1000))scala>javaList.add(Row("Xiaoxue",23,System.currentTimeMillis()/1000))scala>valdf=spark.createDataFrame(javaList,schema)

使用用户自定义函数将用户名转化为大写英文字母。具体实现代码如下6.4.2

SQL语法风格nameagecreate_timescala>df.show()+--------+---+-----------+|name|age|create_time|+--------+---+-----------+|Xiaomei|21|1644480595||Xiaoming|22|1644480607||Xiaoxue|23|1644480615|+--------+---+-----------+scala>df.createTempView("user_info")scala>spark.sql("SELECTname,age,from_unixtime(create_time,'yyyy-MM-ddHH:mm:ss')FROMuser_info").show()+--------+---+-----------------------------------------------+|name|age|from_unixtime(create_time,yyyy-MM-ddHH:mm:ss)|+--------+---+-----------------------------------------------+|Xiaomei|21|2022-02-1000:09:55||Xiaoming|22|2022-02-1000:10:07||Xiaoxue|23|2022-02-1000:10:15|+--------+---+-----------------------------------------------+6.4.2

SQL语法风格scala>spark.udf.register("toUpperCaseUDF",(column:String)=>column.toUpperCase)scala>spark.sql("SELECTtoUpperCaseUDF(name),age,from_unixtime(create_time,'yyyy-MM-ddHH:mm:ss')FROMuser_info").show()+--------------------+---+-----------------------------------------------+|toUpperCaseUDF(name)|age|from_unixtime(create_time,yyyy-MM-ddHH:mm:ss)|+--------------------+---+-----------------------------------------------+|XIAOMEI|21|2022-02-1000:09:55||XIAOMING|22|2022-02-1000:10:07||XIAOXUE|23|2022-02-1000:10:15|+--------------------+---+-----------------------------------------------+6.4.2

SQL语法风格从RDD转换得到DataFrame提纲12使用编程方式定义RDD模式利用反射机制推断RDD模式6.5.1利用反射机制推断RDD模式“/usr/local/spark/examples/src/main/resources/”目录下Michael,29Andy,30Justin,19people.txt6.5.1利用反射机制推断RDD模式Michael,29Andy,30Justin,19DataFrame内存加载生成people.txt6.5.1利用反射机制推断RDD模式RDD利用反射机制推断RDD的模式6.5.1利用反射机制推断RDD模式

首先需定义一个caseclass,只有caseclass才能被Spark隐式地转换为DataFramescala>importorg.apache.spark.sql.catalyst.encoders.ExpressionEncoderimportorg.apache.spark.sql.catalyst.encoders.ExpressionEncoderscala>importorg.apache.spark.sql.Encoderimportorg.apache.spark.sql.Encoderscala>importspark.implicits._//导入包,支持把一个RDD隐式转换为一个DataFrameimportspark.implicits._6.5.1利用反射机制推断RDD模式scala>caseclassPerson(name:String,age:Long)//定义一个caseclassdefinedclassPersonscala>valpeopleDF=spark.sparkContext.|textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").|map(_.split(",")).|map(attributes=>Person(attributes(0),attributes(1).trim.toInt)).toDF()peopleDF:org.apache.spark.sql.DataFrame=[name:string,age:bigint]

首先需定义一个caseclass,只有caseclass才能被Spark隐式地转换为DataFrame6.5.1利用反射机制推断RDD模式scala>peopleDF.createOrReplaceTempView("people")//必须注册为临时表才能供下面的查询使用scala>valpersonsRDD=spark.sql("selectname,agefrompeoplewhereage>20")//最终生成一个DataFrame,下面是系统执行返回的信息personsRDD:org.apache.spark.sql.DataFrame=[name:string,age:bigint]scala>personsRDD.map(t=>"Name:"+t(0)+","+"Age:"+t(1)).show()//DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值//下面是系统执行返回的信息+------------------+|value|+------------------+|Name:Michael,Age:29||Name:Andy,Age:30|+------------------+6.5.2使用编程方式定义RDD模式采用编程方式定义RDD模式?6.5.2使用编程方式定义RDD模式提前定义caseclassname字段age字段6.5.2使用编程方式定义RDD模式构建关系表事先不能够知道字段后面通过动态的方式得到信息6.5.2使用编程方式定义RDD模式采用编程方式定义RDD模式第二种方式6.5.2使用编程方式定义RDD模式磁盘people.txtMichael,29Andy,30Justin,19加载进来DataFrame进行SQL查询6.5.2使用编程方式定义RDD模式6.5.2使用编程方式定义RDD模式模式信息也叫表头什么类型?什么类型?是否为空?6.5.2使用编程方式定义RDD模式什么类型?是否为空?模式信息几个字段?字段类型?是否为空?6.5.2使用编程方式定义RDD模式三条记录从文本加载进来6.5.2使用编程方式定义RDD模式6.5.2使用编程方式定义RDD模式scala>importorg.apache.spark.sql.types._importorg.apache.spark.sql.types._scala>importorg.apache.spark.sql.Rowimportorg.apache.spark.sql.Row//生成字段scala>valfields=Array(StructField("name",StringType,true),StructField("age",IntegerType,true))fields:Array[org.apache.spark.sql.types.StructField]=Array(StructField(name,StringType,true),StructField(age,IntegerType,true))scala>valschema=StructType(fields)schema:org.apache.spark.sql.types.StructType=StructType(StructField(name,StringType,true),StructField(age,IntegerType,true))//从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段//shcema就是“表头”6.5.2使用编程方式定义RDD模式//下面加载文件生成RDDscala>valpeopleRDD=spark.sparkContext.|textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")peopleRDD:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/examples/src/main/resources/people.txtMapPartitionsRDD[1]attextFileat<console>:26//对peopleRDD这个RDD中的每一行元素都进行解析scala>valrowRDD=peopleRDD.map(_.split(",")).|map(attributes=>Row(attributes(0),attributes(1).trim.toInt))rowRDD:org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]=MapPartitionsRDD[3]atmapat<console>:29//上面得到的rowRDD就是“表中的记录”6.5.2使用编程方式定义RDD模式//下面把“表头”和“表中的记录”拼装起来scala>valpeopleDF=spark.createDataFrame(rowRDD,schema)peopleDF:org.apache.spark.sql.DataFrame=[name:string,age:int]//必须注册为临时表才能供下面查询使用scala>peopleDF.createOrReplaceTempView("people")scala>valresults=spark.sql("SELECTname,ageFROMpeople")results:org.apache.spark.sql.DataFrame=[name:string,age:int]

scala>results.|map(attributes=>"name:"+attributes(0)+","+"age:"+attributes(1)).|show()+--------------------+|value|+--------------------+|name:Michael,age:29||name:Andy,age:30||name:Justin,age:19|+--------------------+使用SparkSQL读写数据库6.6使用SparkSQL读写数据库SparkSQL

准备工作

01

编写独立应用

程序访问MySQL04读取MySQL数据库中的数据02

向MySQL数据库写入数据036.6.1准备工作请参考厦门大学数据库实验室博客教程《Ubuntu安装MySQL》教程地址:

/blog/install-mysql/,在Linux系统中安装好MySQL数据库6.6.1准备工作

输入下面SQL语句完成数据库和表的创建

$servicemysqlstart$mysql-uroot-p#屏幕会提示你输入密码

在Linux中启动MySQL数据库

mysql>createdatabasespark;mysql>usespark;mysql>createtablestudent(idint(4),namechar(20),genderchar(4),ageint(4));mysql>insertintostudentvalues(1,'Xueqian','F',23);mysql>insertintostudentvalues(2,'Weiliang','M',24);mysql>select*fromstudent;6.6.1准备工作010203下载驱动程序下载MySQL的JDBC驱动程序,比如mysql-connector-java-5.1.40.tar.gz拷贝到安装目录把该驱动程序拷贝到spark

的安装目录下”/usr/local/spark/jars”启动spark-shell启动spark-shell,启动SparkShell时,必须指定mysql连接驱动jar包6.6.1准备工作

准备工作:下载驱动程序,拷贝到安装目录,启动spark-shell

$cd/usr/local/spark$./bin/spark-shell\--jars/usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar\--driver-class-path/usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar6.6.1准备工作scala>valjdbcDF=spark.read.format("jdbc").|option("url","jdbc:mysql://localhost:3306/spark").|option("driver","com.mysql.jdbc.Driver").|option("dbtable","student").|option("user","root").|option("password","123456").|load()scala>jdbcDF.show()+---+--------+------+---+|id|name|gender|age|+---+--------+------+---+|1|Xueqian|F|23||2|Weiliang|M|24|+---+--------+------+---+

执行以下命令连接数据库,读取数据,并显示6.6.3

向MySQL数据库写入数据

在MySQL数据库中创建了一个名称为spark的数据库,查看一下数据库内容importjava.util.Propertiesimportorg.apache.spark.sql.types._importorg.apache.spark.sql.Row

//下面我们设置两条数据表示两个学生信息valstudentRDD=spark.sparkContext.parallelize(Array("3RongchengM26","4GuanhuaM27")).map(_.split(""))

//下面要设置模式信息valschema=StructType(List(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("gender",StringType,true),StructField("age",IntegerType,true)))

在spark-shell中编写程序,往spark.student表中插入两条记录6.6.3

向MySQL数据库写入数据6.6.3

向MySQL数据库写入数据//下面创建Row对象,每个Row对象都是rowRDD中的一行valrowRDD=studentRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))

//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来valstudentDF=spark.createDataFrame(rowRDD,schema)

//下面创建一个prop变量用来保存JDBC连接参数valprop=newProperties()prop.put("user","root")//表示用户名是rootprop.put("password","hadoop")//表示密码是hadoopprop.put("driver","com.mysql.jdbc.Driver")//表示驱动程序是com.mysql.jdbc.Driver

//下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)mysql>select*fromstudent;+------+-----------+--------+------+|id|name|gender|age|+------+-----------+--------+------+|1|Xueqian|F|23||2|Weiliang|M|24||3|Rongcheng|M|26||4|Guanhua|M|27|+------+-----------+--------+------+4rowsinset(0.00sec)

查看MySQL数据库中的spark.student表发生了什么变化6.6.3

向MySQL数据库写入数据6.6.4编写独立应用程序访问MySQL1.

读取MySQLimportorg.apache.log4j.{Level,Logger}importorg.apache.spark.sql.SparkSessionobjectSparkReadMySQL{defmain(args:Array[String]):Unit={Logger.getLogger("org").setLevel(Level.ERROR)valspark=SparkSession.builder().appName("SparkReadMySQL").getOrCreate()valdf=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable","student").option("user","root").option("password","123456").load()df.show()spark.stop()}}

$/usr/local/spark/bin/spark-submit\>--jars\>/usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar\>--class"SparkReadMySQL"\>/home/hadoop/sparkapp/target/scala-2.12/simple-project_2.12-1.0.jar

对代码进行编译打包,然后执行如下命令运行程序6.6.4编写独立应用程序访问MySQL6.6.4编写独立应用程序访问MySQL2.

写入MySQLimportjava.util.Propertiesimportorg.apache.spark.sql.types._importorg.apache.spark.sql.Rowimportorg.apache.log4j.{Level,Logger}importorg.apache.spark.sql.SparkSessionobjectSparkWriteMySQL{defmain(args:Array[String]):Unit={Logger.getLogger("org").setLevel(Level.ERROR)valspark=SparkSession.builder().appName("SparkWriteMySQL").getOrCreate()//下面我们设置两条数据表示两个学生信息valstudentRDD=spark.sparkContext.parallelize(Array("3RongchengM26","4GuanhuaM27")).map(_.split(""))6.6.4编写独立应用程序访问MySQL

//下面要设置模式信息valschema=StructType(List(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("gender",StringType,true),StructField("age",IntegerType,true)))

//下面创建Row对象,每个Row对象都是rowRDD中的一行valrowRDD=studentRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))

//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来valstudentDF=spark.createDataFrame(rowRDD,schema)

//下面创建一个prop变量用来保存JDBC连接参数valprop=newProperties()prop.put("user","root")//表示用户名是rootprop.put("password","123456")//表示密码是123456prop.put("driver","com.mysql.jdbc.Driver")//表示驱动程序是com.mysql.jdbc.Driver

//下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)spark.stop()}}$/usr/local/spark/bin/spark-submit\>--jars\>/usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar\>--class"SparkWriteMySQL"\>/home/hadoop/sparkapp/target/scala-2.12/simple-project_2.12-1.0.jar

对代码进行编译打包,然后执行如下命令运行程序6.6.4编写独立应用程序访问MySQLDataSet6.7DataSetDataFrame、DataSet和RDD的区别创建DataSetRDD、DataFrame和DataSet之间的相互转换词频统计实例目录CONTENT6.7.1DataFrame、DataSet和RDD的区别RDDDataSetDataFrame6.7.1DataFrame、DataSet和RDD的区别

RDD中的数据保存方式

DataFrame中的数据保存方式6.7.1DataFrame、DataSet和RDD的区别

DataSet中的数据保存方式之一

DataSet中的数据保存方式之二6.7.1DataFrame、DataSet和RDD的区别RDDDataFrameDataSet不可变性是是是分区是是是模式没有有有查询优化器没有有有API级别低高高是否类型安全是否是何时检测语法错误编译时编译时编译时何时检测分析错误编译时运行时编译时6.7.1DataFrame、DataSet和RDD的区别图SparkSQL中的查询优化6.7.1DataFrame、DataSet和RDD的区别(1)没有针对特殊场景进行优化,比如对于结构化数据处理相对于SQL来比显得非常麻烦(2)默认采用的是Java序列化方式,序列化结果比较大,而且数据存储在Java堆内存中,导致垃圾回收比较频繁(1)相比于传统的MapReduce框架,Spark在RDD中内置了很多函数操作,如map、filter、sort等,方便处理结构化或非结构化数据(2)面向对象编程,直接存储Java对象,类型转化比较安全RDD缺点RDD优点6.7.1DataFrame、DataSet和RDD的区别(1)结构化数据处理非常方便,支持Avro、CSV、Elasticsearch、Cassandra等类型数据,也支持Hive、MySQL等传统数据表;(2)可以进行有针对性的优化,比如采用Kryo序列化,由于Spark中已经保存了数据结构元信息,因此,序列化时就不需要带上元信息,这就大大减少了序列化开销,而且数据保存在堆外内存中,减少了垃圾回收次数,所以运行更快DataFrame优点(1)不支持编译时类型安全,运行时才能确定是否有问题;(2)对于对象支持不友好,RDD内部数据直接以Java对象存储,而DataFrame内存存储的是Row对象,而不是自定义对象DataFrame缺点6.7.1DataFrame、DataSet和RDD的区别04030102和DataFrame一样DataSet整合了RDD和DataFrame的优点和RDD一样Dataset支持结构化和非结构化数据DataSet支持结构化数据的SQL查询采用堆外内存存储垃圾回收比较高效DataSet支持自定义对象存储6.7.1DataFrame、DataSet和RDD的区别01则使用DataFrame或DataSet如需丰富的语义、高层次的抽象和特定API02或lambda函数,则用DataFrame或DataSet如果处理要求涉及到filter、map查询04则使用DataFrame或DataSet如果想统一和简化Spark的API03需要类型化的JVM对象,并希望利用Tungsten编码进行高效的序列化和反序列化,则使用DataSet如需在编译时获得更高的类型安全性05则使用DataFrame如果与R语言或Python语言结合使用06尽量使用RDD如果需要更多的控制功能6.7.1DataFrame、DataSet和RDD的区别基于RDD的结构化数据抽象提供的DataFrame

DataSetSQL6.7.2

创建DataSetscala>valds1=spark.createDataset(1to5)ds1:org.apache.spark.sql.Dataset[Int]=[value:int]scala>ds1.show()+--------+|value|+--------+|1||2||3||4||5|+--------+

1.

使用createDataset方法创建6.7.2

创建DataSetscala>valds2=spark.createDataset(sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt"))ds2:org.apache.spark.sql.Dataset[String]=[value:string]scala>ds2.show()+---------------+|value|+---------------+|Michael,29||Andy,30||Justin,19|+---------------+

1.

使用createDataset方法创建6.7.2

创建DataSetscala>caseclassPerson(name:String,age:Int)definedclassPersonscala>valdata=List(Person("ZhangSan",23),Person("LiSi",35))data:List[Person]=List(Person(ZhangSan,23),Person(LiSi,35))scala>valds3=data.toDSds3:org.apache.spark.sql.Dataset[Person]=[name:string,age:int]scala>ds3.show()+--------------+-----+|name|age|+--------------+-----+|ZhangSan|23||LiSi|35|+--------------+----+

2.

通过toDS方法生成DataSet6.7.2

创建DataSetscala>caseclassPerson(name:String,age:Long)definedclassPersonscala>val

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论