Introduction to Spark: Spark SQL, the Structured Data Processing Module

Spark SQL is Spark's module for structured data processing. By attaching extra structure information to the data, it can perform SQL-style optimizations on certain workloads, and SQL statements can be run against the data directly. It is simple and efficient.

Chapter 1: Getting started

The entry point is a SparkSession:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

A DataFrame can be created from a JSON file; the show method displays its contents:

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Untyped DataFrame operations:

// Print the schema in a tree format
df.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

SQL queries can be run programmatically over a temporary view:

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

A global temporary view (createGlobalTempView) outlives the session that created it:

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// (same output as above)

Creating Datasets:

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Interoperating with RDDs, inferring the schema by reflection:

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()

// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
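The text-file parsing step above (split each "name, age" line on the comma, trim the age field, build one Person per record) can be checked without a Spark cluster. A minimal plain-Scala sketch; `ParseDemo` and the sample lines are illustrative, not part of the Spark API:

```scala
// Mirrors the Person case class used in the Spark example above
case class Person(name: String, age: Long)

object ParseDemo {
  // Same transformation as the RDD pipeline: split on ",", trim the
  // second field, and convert it to a number
  def parse(lines: Seq[String]): Seq[Person] =
    lines.map(_.split(","))
         .map(attributes => Person(attributes(0), attributes(1).trim.toLong))

  def main(args: Array[String]): Unit = {
    // Hypothetical sample records in the people.txt format
    println(parse(Seq("Michael, 29", "Andy, 30", "Justin, 19")))
  }
}
```

If a line is malformed (no comma, non-numeric age), this sketch throws, just as the Spark job would fail at execution time; real pipelines usually guard the split and the toLong conversion.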
// No pre-defined encoders for Dataset[Map[K,V]], define one explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can also be defined as implicit encoders

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager =>
  teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

When case classes cannot be defined ahead of time, Spark SQL can also build the DataFrame from an RDD programmatically.

Chapter 6: Creating tables with a programmatically specified schema

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

Spark SQL provides built-in aggregate functions such as SQL's count(), max(), and min(). Users can also define an untyped aggregate function by extending UserDefinedAggregateFunction, whose core callbacks are initialize, update, merge, and evaluate:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that, in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are
  // still immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back into `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
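Independent of Spark, the sum/count buffer algebra that MyAverage maintains can be sketched and checked in plain Scala. `AvgBuffer` is an illustrative name, not a Spark type; the salaries match the employees.json sample above:

```scala
// Plain-Scala model of the UDAF's aggregation buffer: (sum, count)
final case class AvgBuffer(sum: Long, count: Long) {
  // update: fold one input value into the buffer
  def update(value: Long): AvgBuffer = AvgBuffer(sum + value, count + 1)
  // merge: combine two partial buffers, as Spark does across partitions
  def merge(other: AvgBuffer): AvgBuffer = AvgBuffer(sum + other.sum, count + other.count)
  // evaluate: compute the final result from the buffer
  def evaluate: Double = sum.toDouble / count
}

object AvgBufferDemo {
  def main(args: Array[String]): Unit = {
    // Two "partitions" of salaries, aggregated independently then merged
    val p1 = Seq(3000L, 4500L).foldLeft(AvgBuffer(0L, 0L))(_ update _)
    val p2 = Seq(3500L, 4000L).foldLeft(AvgBuffer(0L, 0L))(_ update _)
    println(p1.merge(p2).evaluate) // prints 3750.0, matching the UDAF result above
  }
}
```

The point of splitting the work into update and merge is that the result must not depend on how rows are partitioned: any grouping of updates followed by merges yields the same (sum, count) pair.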

For strongly typed Datasets, the same average is written with the Aggregator abstract class, whose zero, reduce, merge, and finish methods mirror the initialize/update/merge steps above:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Data sources. In the simplest form, the default data source (Parquet) is used for all operations:

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

Importing data in CSV format:

val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")

Save modes. "error" or "errorifexists" (the default) means that when saving a DataFrame to a data source, an exception is expected to be thrown if data already exists. Overwrite mode means that when saving a DataFrame to a data source, if data or the table already exists, the existing data is expected to be overwritten by the contents of the DataFrame. Ignore mode means that if data already exists, the save operation is expected not to save the contents of the DataFrame and not to change the existing data, similar to CREATE TABLE IF NOT EXISTS in SQL.

DataFrames can also be saved as persistent, bucketed tables:

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

Parquet files:

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above.
// Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

Schema merging:

// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appearing in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)

Spark SQL with Hive tables:

import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |     500|
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+-----+---+-----+
// |key|value|key|value|
// +---+-----+---+-----+
// |  2|val_2|  2|val_2|
// |  4|val_4|  4|val_4|
// |  5|val_5|  5|val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_ints").show()
// +---+
// |key|
// +---+
// |  0|
// |  1|
// |  2|
// ...

// Turn on flag for Hive
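The text breaks off above at a comment about turning on a Hive flag. In the official Spark Hive example, that comment precedes enabling dynamic partitioning; a hedged sketch of those settings, assuming the same spark session as above (treat this as an assumption about where the truncated text was heading):

```scala
// Assumed continuation (from the official Spark Hive example):
// enable Hive dynamic partition inserts for this session
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
```

With these flags set, INSERT ... PARTITION statements may omit explicit partition values and let Hive derive them from the data.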
