Spark SQL is a Spark module for structured data processing. By attaching extra structure information to the data, it can apply SQL-style optimizations, and it can process SQL statements directly. It is simple and efficient.

Chapter 1: Getting started

The entry point is a SparkSession:

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

A DataFrame can be created from a JSON file and its contents displayed with the show method:

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Untyped DataFrame operations:

// Print the schema in a tree format
df.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

SQL queries can be run over a temporary view:

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Global temporary views:

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()

Creating Datasets:

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()

Converting an RDD to a DataFrame by reflection:

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()

// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
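Setting Spark aside, the reflection-based conversion above boils down to plain case-class parsing plus a filter. A minimal Spark-free sketch of that logic (the helper names parsePeople and teenagerNames are my own; the line format matches the people.txt example):

```scala
// Spark-free sketch of the reflection example's parsing and filtering steps.
// Assumes people.txt-style lines of the form "name, age".
case class Person(name: String, age: Long)

def parsePeople(lines: Seq[String]): Seq[Person] =
  lines
    .map(_.split(","))
    .map(attributes => Person(attributes(0), attributes(1).trim.toLong))

// Equivalent of: SELECT name FROM people WHERE age BETWEEN 13 AND 19
def teenagerNames(people: Seq[Person]): Seq[String] =
  people.filter(p => p.age >= 13 && p.age <= 19).map(p => "Name: " + p.name)

val people = parsePeople(Seq("Michael, 29", "Andy, 30", "Justin, 19"))
val names = teenagerNames(people) // List("Name: Justin")
```

This mirrors what the encoder and the SQL engine do for you: split each record into attributes, bind them to the case-class fields by position, then evaluate the predicate per row.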
If no pre-defined encoder exists, one can be supplied explicitly:

// No pre-defined encoders for Dataset[Map[K, V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can also be defined as implicit vals

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

Chapter 6: Programmatically specifying the schema

When case classes cannot be defined ahead of time, a table can be created from an RDD of Rows with an explicitly constructed schema:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

Spark SQL provides built-in SQL aggregate functions such as count(), countDistinct(), avg(), max(), and min().
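The schema-string step in the programmatic example above is ordinary string processing; a Spark-free sketch may make it clearer (FieldSpec is a hypothetical stand-in for Spark's StructField, not a Spark type):

```scala
// Spark-free sketch of building a schema from a schema string like "name age".
// FieldSpec stands in for Spark's StructField: (name, type name, nullable).
case class FieldSpec(name: String, dataType: String, nullable: Boolean)

def schemaFromString(schemaString: String): Seq[FieldSpec] =
  schemaString.split(" ").toSeq
    .map(fieldName => FieldSpec(fieldName, "string", nullable = true))

val fields = schemaFromString("name age")
// Seq(FieldSpec("name", "string", true), FieldSpec("age", "string", true))
```

As in the Spark example, every field defaults to a nullable string; real code would map each field name to its proper data type.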
Users can also define their own aggregate functions. An untyped user-defined aggregate function (UDAF) extends UserDefinedAggregateFunction:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that provides
  // standard methods like retrieving a value at an index (e.g., get(), getLong()) and
  // the opportunity to update its values. Note that arrays and maps inside the buffer are
  // still immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) AS average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

The buffer lifecycle is driven by the methods initialize,
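The UDAF's buffer lifecycle can be sketched without Spark: the buffer is just (sum, count), updated per input row, merged across partitions, and divided at the end. A minimal sketch using a two-slot array in place of Spark's MutableAggregationBuffer (the standalone function names are my own):

```scala
// Spark-free sketch of MyAverage's lifecycle: initialize, update, merge, evaluate.
// The buffer is a two-slot array: sum at index 0, count at index 1.
def initialize(): Array[Long] = Array(0L, 0L)

// None models a null input, which is skipped, as in MyAverage.update
def update(buffer: Array[Long], input: Option[Long]): Unit =
  input.foreach { v =>
    buffer(0) += v   // running sum
    buffer(1) += 1L  // running count
  }

// Combine two partial buffers, e.g. from two partitions
def merge(b1: Array[Long], b2: Array[Long]): Unit = {
  b1(0) += b2(0)
  b1(1) += b2(1)
}

def evaluate(buffer: Array[Long]): Double = buffer(0).toDouble / buffer(1)

// Simulate two partitions of the employees example
val part1 = initialize(); Seq(3000L, 4500L).foreach(v => update(part1, Some(v)))
val part2 = initialize(); Seq(3500L, 4000L).foreach(v => update(part2, Some(v)))
merge(part1, part2)
val averageSalary = evaluate(part1) // 3750.0
```

This is why merge must combine raw sums and counts rather than partial averages: averaging averages would weight partitions incorrectly.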
update, and merge. For strongly typed Datasets, a type-safe aggregation extends the Aggregator abstract class instead:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Data sources

In the simplest form, the default data source (Parquet) is used for all operations:

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")

Data can also be imported in CSV format:

val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")

Save modes specify what happens when the target already exists. "error" or "errorifexists" (the default) means that when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. "append" means the contents of the DataFrame are appended to the existing data. "overwrite" means that when saving a DataFrame, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. "ignore" means that when saving a DataFrame, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data — similar to CREATE TABLE IF NOT EXISTS in SQL.

DataFrames can also be saved as persistent bucketed and sorted tables:

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

Parquet files

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
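The four save-mode semantics described above can be sketched without Spark as a decision over an in-memory "store" (a mutable map from path to data). This mirrors the documented behavior only; it is not Spark's implementation:

```scala
// Spark-free sketch of the four save modes over an in-memory store.
import scala.collection.mutable

sealed trait SaveMode
case object ErrorIfExists extends SaveMode // the default, "error"/"errorifexists"
case object Append extends SaveMode
case object Overwrite extends SaveMode
case object Ignore extends SaveMode

def save(store: mutable.Map[String, Vector[Int]],
         path: String, data: Vector[Int], mode: SaveMode): Unit =
  (store.get(path), mode) match {
    case (Some(_), ErrorIfExists) =>
      throw new IllegalStateException(s"path already exists: $path")
    case (Some(existing), Append) => store(path) = existing ++ data
    case (Some(_), Ignore)        => () // leave existing data untouched
    case _                        => store(path) = data // Overwrite, or nothing there yet
  }

val store = mutable.Map("t" -> Vector(1))
save(store, "t", Vector(2), Append)    // store("t") is now Vector(1, 2)
save(store, "t", Vector(9), Overwrite) // store("t") is now Vector(9)
save(store, "t", Vector(7), Ignore)    // store("t") stays Vector(9)
```

Note that when nothing exists at the path yet, every mode simply writes the data; the modes only differ once existing data is found.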
Schema merging

// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appearing in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)

Hive tables

Spark SQL also supports reading and writing data stored in Hive:

import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |     500|
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+-----+---+-----+
// |key|value|key|value|
// +---+-----+---+-----+
// |  2|val_2|  2|val_2|
// |  4|val_4|  4|val_4|
// |  5|val_5|  5|val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")

// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")

// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
External Hive tables backed by Parquet data are also supported:

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)

// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")

// The Hive external table should already have data
sql("SELECT * FROM hive_ints").show()
// +---+
// |key|
// +---+
// |  0|
// |  1|
// |  2|
// ...

// Turn on flag for Hive dynamic partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
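The schema-merging behavior shown earlier (mergeSchema over the key=1 and key=2 partition directories) can be sketched without Spark as a set union over per-file column lists. The helper name mergeSchemas is my own, and plain column names stand in for full typed fields:

```scala
// Spark-free sketch of Parquet schema merging: the merged schema is the union
// of the column lists of all part-files, in first-seen order, with the
// partitioning column appended at the end.
def mergeSchemas(fileSchemas: Seq[Seq[String]], partitionColumn: String): Seq[String] = {
  val merged = fileSchemas.flatten.distinct // union, keeping first-seen order
  merged :+ partitionColumn
}

// The squares/cubes example: key=1 has (value, square), key=2 has (value, cube)
val schema = mergeSchemas(Seq(Seq("value", "square"), Seq("value", "cube")), "key")
// Seq("value", "square", "cube", "key")
```

Rows from a file that lacks a merged column would surface that column as null, which is why all merged fields are nullable in the printed schema above.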