版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
2016年11月岳凯Spark2学习(xuéxí)总结共一百零五页目录I
大数据(shùjù)基本概念II
Spark基本知识 Spark数据分析案例(ànlì)IVIII
Spark进阶知识共一百零五页大数据(shùjù)基本概念大数据的5V特点(IBM提出):Volume(大量(dàliàng))Velocity(高速)Variety(多样)Value(价值)Veracity(真实性)大数据不解释因果关系,只关心相关性共一百零五页大数据(shùjù)基本概念Google三篇论文,奠定了大数据算法的基础2003年,发布(fābù)GoogleFileSystem论文
这是一个可扩展的分布式文件系统,用于大型的、分布式的、对大量数据进行访问的应用,运行于廉价的普通硬件上,提供容错功能。从根本上说:文件被分割成很多块,使用冗余的方式储存于商用机器集群上。2004年,发布MapReduce论文
论文描述了大数据的分布式计算方式,主要思想是将任务分解后在多台处理能力较弱的计算节点中同时处理,然后将结果合并从而完成大数据处理。2006年,发布Bigtable论文,
启发了无数的NoSQL数据库,如:Cassandra、HBase、MongoDB等等。共一百零五页大数据(shùjù)基本概念互联网大数据的典型代表包括:用户行为(xíngwéi)数据:精准广告投放、内容推荐、行为习惯和喜好分析、产品优化等用户消费数据:精准营销、信用记录分析、活动促销、理财等用户地理位置数据:O2O推广,商家推荐,交友推荐等互联网金融数据:P2P,小额贷款,支付,信用,供应链金融等用户社交等UGC数据:趋势分析、流行元素分析、受欢迎程度分析、舆论监控分析、社会问题分析等共一百零五页大数据(shùjù)基本概念-Hadoop生态系统Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序,充分利用集群的威力进行高速(ɡāosù)运算和存储。Hadoop的核心的设计是:HDFS和MapReduce。HDFS为海量的数据提供了存储,MapReduce为海量的数据提供了计算。共一百零五页大数据(shùjù)基本概念-Hadoop-HDFSHDFS(Hadoop分布式文件系统)源自于Google的GFS论文,HDFS是GFS克隆版。HDFS是Hadoop体系中数据存储管理的基础。它是一个高度容错的系统,能检测和应对硬件故障,用于在低成本的通用(tōngyòng)硬件上运行。HDFS简化了文件的一致性模型,通过流式数据访问,提供高吞吐量应用程序数据访问功能,适合带有大型数据集的应用程序。它提供了一次写入多次读取的机制,数据以块的形式,同时分布在集群的不同物理机器上。共一百零五页大数据(shùjù)基本概念-Hadoop-MapReduceMapReduce(分布式计算框架)源自于google的MapReduce论文,HadoopMapReduce是googleMapReduce克隆版。MapReduce是一种分布式计算模型,用以进行大数据量的计算。它屏蔽(píngbì)了分布式计算框架细节,将计算抽象成map和reduce两部分,其中Map对数据集上的独立元素进行指定的操作,生成键值对形式的中间结果。Reduce则对中间结果中相同“键”的所有“值”进行规约,以得到最终结果。MapReduce非常适合在大量计算机组成的分布式并行环境里进行数据处理。共一百零五页大数据(shùjù)基本概念-Hadoop-HBASEHBASE(分布式列存数据库)源自Google的Bigtable论文,HBase是GoogleBigtable克隆版HBase是一个建立在HDFS之上,面向列的,针对结构化数据的可伸缩、高可靠、高性能、分布式的动态模式数据库。HBase采用了BigTable的数据模型:增强的稀疏排序(páixù)映射表(Key/Value),其中,键由行关键字、列关键字和时间戳构成。HBase提供了对大规模数据的随机、实时读写访问。HBase中保存的数据可以使用MapReduce来处理,它将数据存储和并行计算完美地结合在一起。共一百零五页大数据(shùjù)基本概念-Hadoop-Zookeeper&HIVEZookeeper(分布式协作服务)解决分布式环境下的数据管理问题:统一命名,状态同步,集群管理,配置同步等。Hadoop的许多组件依赖于Zookeeper,它运行在计算机集群上面,用于管理Hadoop操作。HIVE(数据仓库)Hive定义了一种类似SQL的查询语言(HQL),将SQL转化为MapReduce任务在Hadoop上执行。通常用于离线分析。HQL用于运行存储(cúnchǔ)在Hadoop上的查询语句,Hive让不熟悉MapReduce的开发人员也能编写数据查询语句,然后这些语句被翻译为Hadoop上面的MapReduce任务。共一百零五页大数据(shùjù)基本概念-Hadoop-YarnYarn(分布式资源管理器)Yarn是MapReducev2版本,是在第一代MapReduce基础上演变而来的。重构的根本思想是将JobTracker两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度/监控。Yarn是一种分层的集群框架,。分层结构的本质是ResourceManager,这个实体控制整个(zhěnggè)集群并管理应用程序向基础计算资源的分配。ResourceManager将各个资源部分(计算、内存、带宽等)精心安排给基础NodeManager(Yarn的每节点代理)。共一百零五页大数据(shùjù)基本概念-Hadoop-MesosMesos(分布式资源管理器)Mesos是诞生于UCBerkeley的一个研究(yánjiū)项目,现已成为Apache项目,当前有一些公司使用Mesos管理集群资源,比如Twitter。与Yarn类似,Mesos是一个资源统一管理和调度的平台,同样支持比如MR、streaming等多种运算框架。共一百零五页大数据(shùjù)基本概念-Hadoop适用的场景Hadoop主要应用于数据量大的离线场景。特征为:数据量大。一般真正线上用Hadoop的,集群规模都在上百台到几千台的机器。离线。MapReduce框架下,很难处理实时计算,作业(zuòyè)都以日志分析这样的线下作业(zuòyè)为主。数据块大。由于HDFS设计的特点,Hadoop适合处理文件块大的文件。如:百度每天都会有用户对侧边栏广告进行点击。这些点击都会被记入日志。然后在离线场景下,将大量的日志使用Hadoop进行处理,分析用户习惯等信息。共一百零五页大数据(shùjù)基本概念-后Hadoop时代大数据应用未来的价值在于预测,而预测的核心是分析。未来的几年,将进入“后Hadoop时代”:内存计算时代的来临随着高级分析和实时应用的增长,对处理能力提出了更高的要求,数据处理重点从IO重新回到CPU。以内存计算为核心的Spark将代替以IO吞吐为核心的MapReduce成为分布式大数据处理的缺省通用引擎。统一数据访问管理现在的数据访问由于数据存储的格式不同、位置不同,用户需要使用不同的接口、模型甚至语言。未来的趋势是将底层部署运维细节和上层业务开发(kāifā)进行隔离。简化实时应用现在用户不仅关心如何实时的收集数据,而且关心同时尽快的实现数据可见和分析结果上线,希望能够有一种解决快速数据的方案,使用HDFS和HBase的混合体,在快速更新数据的同时进行快速分析。共一百零五页目录I
大数据(shùjù)基本概念II
Spark基本知识 Spark数据分析案例(ànlì)IVIII
Spark进阶知识共一百零五页Spark是什么(shénme)?Lightning-fastclustercomputingApacheSpark™isafastandgeneralengineforlarge-scaledataprocessing.Spark是一个快速和通用(tōngyòng)的大数据处理引擎。闪电快速集群计算共一百零五页Spark是什么(shénme)?Spark最初是由加州大学伯克利分校的AMPLab于2009年提交的一个项目,现在是Apache软件基金会旗下最活跃的开源项目之一Spark是一个分布式的大数据处理框架,基于(jīyú)RDD(弹性分布式数据集),立足内存计算的一栈式计算平台2016年11月14日发布Spark2.0.2版本共一百零五页Spark四大(sìdà)优势优势一:快速(kuàisù)处理(Speed)
Spark内存运行速度是HadoopMapReduce100倍,硬盘运行速度为10倍。使用逻辑回归算法处理同样数据,Hadoop用时110秒,Spark用时0.9秒共一百零五页Spark四大(sìdà)优势优势二:易于(yìyú)使用(EaseofUse)
Spark代码量小;支持Scala、Java、Python,R语言的API。Spark代码示例,计算一个文件中有多少个单词:valfile=sc.textFile(“hdfs://…”)valcounts=file.flatMap(line=>line.split(““)).map(word=>(word,1)).reduceByKey(_+_)counts.saveAsTextFile(“hdfs://…”)共一百零五页Spark四大(sìdà)优势优势三:通用性强(Generality)
Spark提供了一个(yīɡè)强大的技术堆栈,是一个(yīɡè)可以进行即时查询、流处理、机器学习、图处理等多种大数据处理的计算平台。共一百零五页Spark四大(sìdà)优势优势(yōushì)四:可以与Hadoop数据集成(RunsEverywhere)
Spark可以独立运行,也可以运行在Mesos、Yarn等集群资源管理系统上;还可以运行在Hadoop数据源上,如Hive、HBase、HDFS等。共一百零五页Spark生态系统(shēnɡtàixìtǒnɡ)BDAS-旧版BDAS(theBerkeleyDataAnalyticsStack),全称伯克利数据分析栈,是AMP实验室打造的一个(yīɡè)开源的大数据处理一体化的技术生态系统,其核心框架是Spark共一百零五页Spark生态系统(shēnɡtàixìtǒnɡ)BDAS-新版BDAS(theBerkeleyDataAnalyticsStack),全称伯克利数据分析栈,是AMP实验室打造的一个开源(kāiyuán)的大数据处理一体化的技术生态系统,其核心框架是Spark共一百零五页Spark组件(zǔjiàn)-SparkCoreSpark的核心(héxīn)组件分布式大数据处理框架比MapReduce计算速度快核心技术为RDDSparkCoreBDAS伯克利数据分析栈SparkSparkCoreRDD弹性分布式数据集核心框架核心组件核心技术共一百零五页Spark组件(zǔjiàn)-SparkSQL集成性好:在Spark程序(chéngxù)中无缝混合SQL查询统一的数据访问:以同样的方式连接到任何数据源兼容Hive:已有Hive查询语句不用修改支持标准连接:JDBC、ODBCSparkSQLSparkSQLisApacheSpark'smoduleforworkingwithstructureddata.SparkSQL在Spark内核基础上提供了对结构化数据的处理共一百零五页Spark组件(zǔjiàn)-SparkStreamingSparkStreamingSparkStreamingmakesiteasytobuildscalablefault-tolerantstreamingapplications.SparkStreaming是一个对实时数据流进行高通量、容错处理(chǔlǐ)的流式处理(chǔlǐ)系统。易于使用:可通过高层操作构建应用,支持Java,Scala和Python容错:可以不用额外代码就恢复丢失的工作和操作集成:基于Spark运行,可以组合流的批处理和交互式查询共一百零五页Spark组件(zǔjiàn)-GraphX灵活:与图形和集合无缝工作速度(sùdù)快:性能堪比最快的专业图形处理系统算法:从一个不断增长的图形算法库选取GraphXGraphXisApacheSpark'sAPIforgraphsandgraph-parallelcomputation.GraphX是一个基于Spark的分布式图计算子框架,提供了图计算中用于图和图并行计算的接口。共一百零五页Spark组件(zǔjiàn)-MLlibMLlibMLlibisApacheSpark'sscalablemachinelearninglibrary.MLlib(MachineLearninglib)是Spark对常用的机器(jīqì)学习算法的实现库。易于使用:可用于Java,Scala,Python,andR高性能:高质量的算法,比MapReduce快100倍易于部署:可在现有的Hadoop集群和数据上运行共一百零五页Spark常用(chánɡyònɡ)概念-RDD是Spark的核心和架构基础是一个容错的、并行的数据结构是一个只读的分区记录(jìlù)集合是一个内存数据集可以简单看成是一个分区存储的数组RDD(弹性分布式数据集)英文全称:ResilientDistributedDatasetsRDDisafault-tolerantcollectionofelementsthatcanbeoperatedoninparallel.共一百零五页Spark学习的几个(jǐɡè)阶段第一阶段:熟练的掌握Scala语言Spark框架是采用Scala语言编写的,精致而优雅(yōuyǎ)。要阅读Spark的源代码,就必须掌握Scala。第二阶段:精通Spark平台本身提供给开发者的API掌握Spark中面向RDD的开发模式,掌握Spark中的宽依赖和窄依赖以及lineage机制,掌握RDD的计算流程。第三阶段:深入Spark内核通过源码掌握Spark的任务提交过程,掌握Spark集群的任务调度。第四阶级:掌握基于Spark上的核心框架的使用SparkSQL、SparkStreaming、MLlib、GraphX。第五阶级:做商业级别的Spark项目第六阶级:提供Spark解决方案共一百零五页目录I
大数据(shùjù)基本概念II
Spark基本知识 Spark数据分析案例(ànlì)IVIII
Spark进阶知识RDD操作基本工作流程环境及部署交互工具与提交工具Web监控共一百零五页RDD的依赖(yīlài)关系表示(biǎoshì)RDD表示分区RDD之间的依赖关系可以分为两类,即:(1)窄依赖:子RDD的每个分区依赖于常数个父分区(即与数据规模无关);(2)宽依赖:子RDD的每个分区依赖于所有父RDD分区。共一百零五页RDD的操作(cāozuò)方式RDD有两种操作(cāozuò)方式:转换(Transformations):返回RDD,如:map,filter,groupBy,join等动作(Actions):返回值不是一个RDD,如:count,collect,save等Transformations操作是惰性的,只记录,不马上执行;Actions才会触发计算共一百零五页Spark中的Stage划分(huàfēn)从HDFS中读入数据生成3个不同的RDD,通过一系列操作后,再将计算结果保存回HDFS。只有join操作是宽依赖,以此为边界将其前后划分(huàfēn)成不同的StageStage2中,从map到union都是窄依赖,可以形成流水线操作共一百零五页RDD操作(cāozuò)-Transformations转换操作说明map通过自定义函数进行映射filter对元素过滤,保留符合条件的元素flatMap先映射(map),再把元素合并为一个集合sample对元素采样,返回一个子集union将两个RDD集合合并,返回并集groupByKeyKey相同的值被分为一组reduceByKey返回KV对数据集,key相同的值被聚合sortByKey通过Key值对KV对数据集排序join对两个RDD进行cogroup、笛卡尔积、展平操作,(K,V)和(K,W)转换为(K,(V,W))cogroup组合两个key值相同的KV元素其他转换操作(cāozuò)还包括:mapPartitions、mapPartitionsWithIndex、intersection、distinct、aggregateByKey、cartesian、pipe、coalesce、repartition、repartitionAndSortWithinPartitions等共一百零五页RDD操作(cāozuò)-ActionAction操作说明reduce通过自定义函数聚合数据集中的元素collect相当于toArray,将分布式的RDD返回为一个单机的Array数组。count返回RDD中元素的个数first返回数据集中第一个元素,与take(1)类似take(n)返回数据集中前n个元素形成的数组saveAsTextFile保存数据到文本文件countByKey只用于KV类型RDD,返回每个Key的个数foreach(func)对数据集中每个元素使用func函数其他Action操作(cāozuò)还包括:takeSample、takeOrdered、saveAsSequenceFile、saveAsObjectFile等共一百零五页Spark的基本工作(gōngzuò)流程①②③③④⑤⑤⑥⑥⑦⑦⑧初始化SparkContext申请资源初始化Executor解析RDD,划分Stage,调度任务发送任务到Executor执行计算任务返回(fǎnhuí)计算结果关闭SparkContex,回收资源共一百零五页Spark环境(huánjìng)准备环境种类需要的软件Spark应用环境1、Linux(REHL或Ubuntu)
2、JDK1.83、Scala2.11.84、Hadoop2.7Spark源码调试环境(Linux或
Windows)1、JDK1.82、Scala2.11.83、Maven(或SBT)4、git5、IntelliJIDEA6、Cygwin
(用于windows系统)共一百零五页Spark部署(bùshǔ)模式部署模式说明Local模式本地模式,可以启动本地一个线程来运行job,可以启动N个线程或者使用系统所有核运行job,方便调试。Standalone模式简单模式或称独立模式,可以单独部署到一个集群中,不依赖任何其他资源管理系统。Yarn-Cluster模式运行在Yarn上的模式,Driver运行在Worker节点(工作节点)上。Yarn-Client模式运行在Yarn上的模式,Driver运行在Client(客户端)上。Mesos模式官方推荐模式,通用集群管理,有两种调度模式:粗粒度模式与细粒度模式。共一百零五页Spark交互(jiāohù)工具-SparkShellSparkShell是Spark的交互式命令(mìnglìng)终端,是一种学习API的简单途径,也是分析数据集交互的有力工具。共一百零五页Spark提交应用(yìngyòng)工具-SparkSubmitSparkSubmit是Spark向集群提交应用的工具,使用脚本部署,根据具体的集群模式,配置相应(xiāngyīng)的选项参数。例子如下:#Runapplicationlocallyon8cores./bin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masterlocal[8]\/path/to/examples.jar\100#RunonaSparkstandaloneclusterinclientdeploymode./bin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masterspark://38:7077\--executor-memory20G\--total-executor-cores100\/path/to/examples.jar\1000共一百零五页SparkWeb界面(jièmiàn)Spark可以通过WebUI控制台查看(chákàn)作业的运行情况。如:MasterIP:8080共一百零五页DriverProgram的Web界面(jièmiàn)SparkShell启动后,可以通过(tōngguò)MasterIP:4040查看Jobs信息、Stages信息、Storage信息、Environment信息、Executors信息和SQL信息。共一百零五页Hadoop的Web界面(jièmiàn)如果启动了Hadoop,可以通过MasterIP:50070查看(chákàn)Hadoop相关信息,如HDFS文件情况。共一百零五页目录I
大数据(shùjù)基本概念II
Spark基本知识 Spark数据分析案例(ànlì)IVIII
Spark进阶知识入门实例SparkSQL实例-人事信息处理SparkMLlib–常用算法实例SparkStreaming实例GraphX实例-社会关系图操作共一百零五页Spark入门(rùmén)实例-README.md统计实例主题(zhǔtí):对Spark的README.md文件进行统计,按单词出现次数降序排列。#ApacheSparkSparkisafastandgeneralclustercomputingsystemforBigData.Itprovideshigh-levelAPIsinScala,Java,Python,andR,andanoptimizedenginethatsupportsgeneralcomputationgraphsfordataanalysis.Italsosupportsarichsetofhigher-leveltoolsincludingSparkSQLforSQLandDataFrames,MLlibformachinelearning,GraphXforgraphprocessing,andSparkStreamingforstreamprocessing.</>……README.md文件内容如下:共一百零五页Spark入门实例(shílì)-README.md统计实例主题(zhǔtí):对Spark的README.md文件进行统计,按单词出现次数降序排列。//读出README.md文件到内存,生成一个RDDvalline=sc.textFile("/usr/local/spark-2.0.2/README.md")//对数据加工,统计,列出结果line.flatMap(_.split('')).map((_,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).foreach(println)Spark脚本如下:共一百零五页Spark入门(rùmén)实例-README.md统计scala>valline=sc.textFile("/usr/local/spark-2.0.1/README.md")line:org.apache.spark.rdd.RDD[String]=/usr/local/spark-2.0.1/README.mdMapPartitionsRDD[1]attextFileat<console>:24scala>line.countres0:Long=99scala>line.collectres1:Array[String]=Array(#ApacheSpark,"",SparkisafastandgeneralclustercomputingsystemforBigData....scala>line.flatMap(_.split('')).map((_,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).foreach(println)(,68)(the,22)(Spark,15)(to,14)…….Spark脚本执行(zhíxíng)结果如下:共一百零五页Spark入门实例(shílì)-README.md统计scala>valline01=sc.textFile("/usr/local/spark-2.0.1/README.md")scala>line01.countres0:Long=99scala>line01.collectres1:Array[String]=Array(#ApacheSpark,"",SparkisafastandgeneralclustercomputingsystemforBigData....scala>valflatmap02=line01.flatMap(_.split(''))scala>flatmap02.countres7:Long=540scala>flatmap02.collectres8:Array[String]=Array(#,Apache,Spark,"",Spark,is,a,fast,and,general,cluster,...scala>valmap03=flatmap02.map((_,1))scala>map03.countres9:Long=540scala>map03.collectres10:Array[(String,Int)]=Array((#,1),(Apache,1),(Spark,1),("",1),(Spark,1),(is,1),(a,1),(fast,1),(and,1),(general,1),(cluster,1),...Spark脚本分解执行(zhíxíng),结果如下:line.flatMap(_.split('')).map((_,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).foreach(println)共一百零五页Spark入门实例(shílì)-README.md统计scala>valreduce04=map03.reduceByKey(_+_)scala>reduce04.countres11:Long=275scala>reduce04.collectres12:Array[(String,Int)]=Array((package,1),(For,3),(Programs,1),(processing.,1),(Because,1),(The,1),(cluster.,1),(its,1),([run,1),...scala>valmap05=reduce04.map(x=>(x._2,x._1))scala>map05.collectres13:Array[(Int,String)]=Array((1,package),(3,For),(1,Programs),(1,processing.),(1,Because),(1,The),(1,cluster.),(1,its),(1,[run),...scala>valsort06=map05.sortByKey(false)scala>sort06.collectres14:Array[(Int,String)]=Array((68,""),(22,the),(15,Spark),(14,to),(11,for),(11,and),(8,##),(8,a),(7,run),(7,can),(6,is),(6,in),...scala>valmap07=sort06.map(x=>(x._2,x._1))scala>map07.collectres15:Array[(String,Int)]=Array(("",68),(the,22),(Spark,15),(to,14),(for,11),(and,11),(##,8),(a,8),(run,7),(can,7),(is,6),(in,6),...scala>map07.foreach(println)Spark脚本分解执行,结果(jiēguǒ)如下:line.flatMap(_.split('')).map((_,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).foreach(println)共一百零五页SparkSQL实例(shílì)-概念SparkSQLSparkSQL在Spark内核基础上提供(tígōng)了对结构化数据的处理DataFrameADataFrameisaDatasetorganizedintonamedcolumns.Itisconceptuallyequivalenttoatableinarelationaldatabase
DataFrame是一个以命名列方式组织的分布式数据集,在概念上类似于关系数据库中的一个表。共一百零五页SparkSQL实例(shílì)-概念SparkSQL所有功能(gōngnéng)的入口点是SparkSessionHive是Shark的前身,Shark是SparkSQL的前身HQL定义Hive是Hadoop上的数据仓库基础架构,可以进行ETL,Hive定义了简单的类SQL查询语言,称为HQL。DataFrame可以由结构化数据文件转换得到,或从Hive中的表得来,也可以转换自外部数据库或现有的RDD。共一百零五页SparkSQL实例(shílì)-人事信息处理实例(shílì)主题:对人事信息进行合并、查询、统计等操作。people.json{"name":"Michael","jobnumber":"001","age":33,"gender":"male","deptId":1,"salary":3000}{"name":"Andy","jobnumber":"002","age":30,"gender":"female","deptId":2,"salary":4000}{"name":"Justin","jobnumber":"003","age":19,"gender":"male","deptId":3,"salary":5000}{"name":"John","jobnumber":"004","age":32,"gender":"male","deptId":1,"salary":6000}{"name":"Herry","jobnumber":"005","age":20,"gender":"female","deptId":2,"salary":7000}{"name":"Jack","jobnumber":"006","age":26,"gender":"male","deptId":3,"salary":3000}newPeople.json{"name":"John","jobnumber":"007","age":32,"gender":"male","deptId":1,"salary":4000}{"name":"Herry","jobnumber":"008","age":20,"gender":"female","deptId":2,"salary":5000}{"name":"Jack","jobnumber":"009","age":26,"gender":"male","deptId":3,"salary":6000}department.json{"name":"DevelopmentDept","deptId":1}{"name":"personnelDept","deptId":2}{"name":"TestingDept","deptId":3}数据准备,创建如下3个数据文件:共一百零五页SparkSQL实例-人事(rénshì)信息处理-加载数据//导入隐式转换(zhuǎnhuàn)包,部分API调用需要scala>importspark.implicits._importspark.implicits._scala>valpeople=spark.read.json("/usr/local/test/people.json")people:org.apache.spark.sql.DataFrame=[age:bigint,deptId:bigint...4morefields]//通过show方法,以表格形式输出DataFrame的内容scala>people.show+++++++|age|deptId|gender|jobnumber|name|salary|+++++++|33|1|male|001|Michael|3000||30|2|female|002|Andy|4000||19|3|male|003|Justin|5000||32|1|male|004|John|6000||20|2|female|005|Herry|7000||26|3|male|006|Jack|3000|+++++++共一百零五页SparkSQL实例(shílì)-人事信息处理-加载数据scala>valdept=spark.read.json("/usr/local/test/department.json")scala>dept.show+++|deptId|name|+++|1|DevelopmentDept||2|personnelDept||3|TestingDept|+++scala>valnewPeople=spark.read.json("/usr/local/test/newPeople.json")scala>newPeople.show+++++++|age|deptId|gender|jobnumber|name|salary|+++++++|32|1|male|007|John|4000||20|2|female|008|Herry|5000||26|3|male|009|Jack|6000|+++++++共一百零五页SparkSQL实例(shílì)-人事信息处理-基本操作举例//显示(xiǎnshì)树形的表结构scala>people.printSchema()root|--age:long(nullable=true)|--deptId:long(nullable=true)|--gender:string(nullable=true)|--jobnumber:string(nullable=true)|--name:string(nullable=true)|--salary:long(nullable=true)//只显示name列,show用表格形式输出scala>people.select("name").show++|name|++|Michael||Andy||Justin||John||Herry||Jack|++共一百零五页SparkSQL实例-人事(rénshì)信息处理-基本操作举例//显示(xiǎnshì)所有人,但年龄加1scala>people.select($"name",$"age"+1).show()+++|name|(age+1)|+++|Michael|34||Andy|31||Justin|20||John|33||Herry|21||Jack|27|+++//显示年龄大于21的人scala>people.filter($"age">21).show()+++++++|age|deptId|gender|jobnumber|name|salary|+++++++|33|1|male|001|Michael|3000||30|2|female|002|Andy|4000||32|1|male|004|John|6000||26|3|male|006|Jack|3000|+++++++共一百零五页SparkSQL实例(shílì)-人事信息处理-基本操作举例//显示(xiǎnshì)每个年龄的人数scala>people.groupBy("age").count().show()+++|age|count|+++|26|1||19|1||32|1||33|1||30|1||20|1|+++//查询全部列信息,以数组形式返回列名组scala>people.columnsres0:Array[String]=Array(age,deptId,gender,jobnumber,name,salary)//统计记录条数scala>people.countres1:Long=6//取前三条记录,以数组形式呈现scala>people.take(3)res2:Array[org.apache.spark.sql.Row]=Array([33,1,male,001,Michael,3000],[30,2,female,002,Andy,4000],[19,3,male,003,Justin,5000])共一百零五页SparkSQL实例(shílì)-人事信息处理-基本操作举例//以"jobnumber"列升序,"deptId"列降序显示(xiǎnshì)内容scala>people.sort($"jobnumber".asc,col("deptId").desc).show+++++++|age|deptId|gender|jobnumber|name|salary|+++++++|33|1|male|001|Michael|3000||30|2|female|002|Andy|4000||19|3|male|003|Justin|5000||32|1|male|004|John|6000||20|2|female|005|Herry|7000||26|3|male|006|Jack|3000|+++++++共一百零五页SparkSQL实例(shílì)-人事信息处理-基本操作举例//合并(hébìng)新员工scala>people.unionAll(newPeople).show+++++++|age|deptId|gender|jobnumber|name|salary|+++++++|33|1|male|001|Michael|3000||30|2|female|002|Andy|4000||19|3|male|003|Justin|5000||32|1|male|004|John|6000||20|2|female|005|Herry|7000||26|3|male|006|Jack|3000||32|1|male|007|John|4000||20|2|female|008|Herry|5000||26|3|male|009|Jack|6000|+++++++共一百零五页SparkSQL实例-人事(rénshì)信息处理-基本操作举例//查找同名(tóngmíng)员工,先合并,然后用“name”列分组scala>valgroupName=people.unionAll(newPeople).groupBy(col("name")).countgroupName:org.apache.spark.sql.DataFrame=[name:string,count:bigint]scala>groupName.show+++|name|count|+++|Jack|2||Michael|1||John|2||Andy|1||Justin|1||Herry|2|+++//对数量大于1的过滤,得到同名员工scala>groupName.filter($"count">1).show+++|name|count|+++|Jack|2||John|2||Herry|2|+++共一百零五页SparkSQL实例(shílì)-人事信息处理-基本操作举例//注册SQL临时视图scala>people.createOrReplaceTempView("t_people")//使用(shǐyòng)SQL查询scala>valsqlDF=spark.sql("SELECT*FROMt_people")sqlDF:org.apache.spark.sql.DataFrame=[age:bigint,deptId:bigint...4morefields]scala>sqlDF.show()+++++++|age|deptId|gender|jobnumber|name|salary|+++++++|33|1|male|001|Michael|3000||30|2|female|002|Andy|4000||19|3|male|003|Justin|5000||32|1|male|004|John|6000||20|2|female|005|Herry|7000||26|3|male|006|Jack|3000|+++++++共一百零五页SparkStreaming实例(shílì)-概念SparkStreamingSparkStreaming是一个对实时数据流进行高通量、容错(rónɡcuò)处理的流式处理系统。共一百零五页SparkStreaming实例(shílì)-概念SparkStreaming内部(nèibù)处理机制是,接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,然后通过SparkEngine处理这些数据,最终得到处理后的一批批结果数据。DStream(discretizedstream)代表一个连续的数据流,一个DStream可以看成一组RDD,即RDD的一个序列。共一百零五页SparkStreaming实例(shílì)-操作类型
对DStream进行操作时,会被SparkStreaming引擎转化成对底层RDD的操作。对DStream的操作类型(lèixíng)有:Transformations:类似于对RDD的操作,SparkStreaming提供了一系列的转换操作去支持对DStream的修改。如map,union,filter,transform等WindowOperations:窗口操作,支持通过设置窗口长度和滑动间隔的方式操作数据。常用的操作有reduceByWindow,reduceByKeyAndWindow,window等OutputOperations:输出操作,允许将DStream数据推送到其他外部系统或存储平台,如HDFS,Database等,类似于RDD的Action操作,Output操作也会实际上触发对DStream的转换操作。常用的操作有print,saveAsTextFiles,saveAsHadoopFiles,foreachRDD等。共一百零五页SparkStreaming实例(shílì)-窗口操作SparkStreaming提供了窗口的计算,允许通过滑动窗口对数据进行(jìnxíng)转换。每次窗口在源DStream滑动,落入窗口内的源RDD被组合和操作以产生窗口化DStream的RDD,在这个特定情况下,操作被应用于过去的3个时间单元的数据,并且在每2个时间单元滑动。这里涉及两个参数:窗口长度(windowlength),指窗口的持续时间(图中为3)。滑动间隔(slidinginterval),指经过多长时间窗口滑动一次形成新的窗口(图中为2)。这两个参数必须是源DStream批处理间隔(图中为1)的倍数。共一百零五页SparkStreaming实例(shílì)-Kafka介绍Kafka是一个分布式的,高吞吐量,易于扩展地基于主题发布/订阅的消息系统(xìtǒng),最早是由Linkedin开发,并于2011年开源并贡献给Apache软件基金会。一般来说,Kafka有以下几个典型的应用场景:作为消息队列。由于Kafka拥有高吞吐量,并且内置消息主题分区,备份,容错等特性,使得它更适合使用在大规模,高强度的消息数据处理的系统中。流计算系统的数据源。流数据产生系统作为Kafka消息数据的生产者将数据流分发给Kafka消息主题,流数据计算系统(Storm,SparkStreaming等)实时消费并计算数据。这也是本文将要介绍的应用场景。系统用户行为数据源。这种场景下,系统将用户的行为数据,如访问页面,停留时间,搜索日志,感兴趣的话题等数据实时或者周期性的发布到Kafka消息主题,作为对接系统数据的来源。日志聚集。Kafka可以作为一个日志收集系统的替代解决方案,我们可以将系统日志数据按类别汇集到不同的Kafka消息主题中。事件源。在基于事件驱动的系统中,我们可以将事件设计成合理的格式,作为Kafka消息数据存储起来,以便相应系统模块做实时或者定期处理。由于Kafka支持大数据量存储,并且有备份和容错机制,所以可以让事件驱动型系统更加健壮和高效。共一百零五页SparkStreaming实例-读取Kafka数据(shùjù)代码示例*Usage:KafkaWordCount<zkQuorum><group><topics><numThreads>*4个参数:zookeeper地址、kafka消费者组名、kafkatopics、线程数objectKafkaWordCount{defmain(args:Array[String]){valArray(zkQuorum,group,topics,numThreads)=args//获取4个参数valsparkConf=newSparkConf().setAppName("KafkaWordCount")//创建sparkConfvalssc=newStreamingContext(sparkConf,Seconds(2))valtopicMap=topics.split(",").map((_,numThreads.toInt)).toMap//获取topicvallines=KafkaUtils.createStream(ssc,zkQuorum,group,topicMap).map(_._2)valwords=lines.flatMap(_.split(""))//用空格分隔获取的数据,生成数组valwordCounts=words.map(x=>(x,1L))//窗口操作(cāozuò)计算单词数量.reduceByKeyAndWindow(_+_,_-_,Minutes(10),Seconds(2),2)wordCounts.print()//窗口长度为10分钟,滑动间隔为2秒ssc.start()//启动StreamingContext,实际执行前面操作}}共一百零五页GraphX实例(shílì)-概念GraphXGraphX是一个(yīɡè)基于Spark的分布式图计算子框架,提供了图计算中用于图和图并行计算的接口。共一百零五页GraphX实例(shílì)-概念-属性图属性(shǔxìng)图(PropertyGraph)是一个有向多重图,用于表示具有点和边的用户定义对象。具备RDD的3个关键特性:Immutable(不变性)、Distributed(分布式)和Fault-Tolerant(容错)。包括带属性的顶点和边,这些属性是用来描述顶点和边的特征。示例图说明:顶点属性为名称和职业,边的属性是两个顶点之间的关系。如:顶点3名称为rxin,职业为学生;顶点5到顶点3表示5是3的指导教授。共一百零五页GraphX实例-概念(gàiniàn)-图的切分与存储一般(yībān)图的分布式存储有边分割和点分割两种方式。边分割(Edge-Cut):每个顶点都存储一次,但有的边会被打断分到两台机器上。这样做的好处是节省存储空间;坏处是对图进行基于边的计算时,对于一条两个顶点被分到不同机器上的边来说,要跨机器通信传输数据,内网通信流量大。点分割(Vertex-Cut):每条边只存储一次,都只会出现在一台机器上。邻居多的点会被复制到多台机器上,增加了存储开销,同时会引发数据同步问题。好处是可以大幅减少内网通信量。Graphx使用点分割方式共一百零五页GraphX实例(shílì)-概念-主要的图算法GraphX提供系列图算法来简化任务的分析,主要有:PageRank
又称网页排名、网页级别。PageRank是Google专有的算法,用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度。它由Google的创始人LarryPage和SergeyBrin在1998年发明(fāmíng)。PageRank实现了将链接价值概念作为排名因素。
PageRank将对页面的链接看成投票,指示了重要性。ConnectedComponents
连通分支图算法,连通分支图算法用ID标注图中每个连通分支,将连通分支中序号最小的顶点的ID作为连通分支的ID。该算法是图深度优先搜索算法的另一项重要应用。TriangleCounting
三角形计数算法,当一个顶点有两个相邻的顶点以及相邻顶点之间的边时,这个顶点是一个三角形的一部分共一百零五页GraphX实例(shílì)-社会关系图操作实例主题:对一个社会关系(shèhuìguānxi)图进行操作。数据准备,下图中有6个人,每个人有名字和年龄,这些人根据社会关系形成8条边,每条边有其属性。图中每个顶点的属性包括一个人的姓名和年龄,每条边有一个属性值,假设为源顶点向目的顶点追求的次数。共一百零五页GraphX实例-社会关系(shèhuìguānxi)图操作-数据准备importorg.apache.spark.graphx._importorg.apache.spark.rdd.RDD//准备顶点的数据(shùjù),类型为VD:(String,Int)valvertexArray=Array((1L,("Alice",28)),(2L,("Bob",27)),(3L,("Charlie",65)),(4L,("David",42)),(5L,("Ed",55)),(6L,("Fran",50)))//准备边的数据,类型为ED:IntvaledgeArray=Array(Edge(2L,1L,7),Edge(2L,4L,2),Edge(3L,2L,4),Edge(3L,6L,3),Edge(4L,1L,1),Edge(5L,2L,2),Edge(5L,3L,8),Edge(5L,6L,3))共一百零五页GraphX实例-社会关系(shèhuìguānxi)图操作-数据准备//构造(gòuzào)vertexRDD和edgeRDDvalvertexRDD:RDD[(Long,(String,Int))]=sc.parallelize(vertexArray)valedgeRDD:RDD[Edge[Int]]=sc.parallelize(edgeArray)//构造图Graph[VD,ED]valgraph:Graph[(String,Int),Int]=Graph(vertexRDD,edgeRDD)scala>graph.vertices.collectres3:Array[(org.apache.spark.graphx.VertexId,(String,Int))]=Array((1,(Alice,28)),(2,(Bob,27)),(3,(Charlie,65)),(4,(David,42)),(5,(Ed,55)),(6,(Fran,50)))scala>graph.edges.collectres4:Array[org.apache.spark.graphx.Edge[Int]]=Array(Edge(2,1,7),Edge(2,4,2),Edge(3,2,4),Edge(3,6,3),Edge(4,1,1),Edge(5,2,2),Edge(5,3,8),Edge(5,6,3))共一百零五页GraphX实例(shílì)-社会关系图操作-图的属性操作//找出图中年龄大于30的顶点的方法(fāngfǎ)scala>graph.vertices.filter(v=>v._2._2>30).collect.foreach(v=>println(s"${v._2._1}is${v._2._2}"))Charlieis65Davidis42Edis55Franis50//找出图中属性大于5的边scala>graph.edges.filter(e=>e.attr>5).collect.foreach(e=>println(s"${e.srcId}to${e.dstId}att${e.attr}"))2to1att75to3att8共一百零五页GraphX实例-社会关系(shèhuìguānxi)图操作-转换操作//顶点(dǐngdiǎn)的转换操作,顶点(dǐngdiǎn)
+10scala>graph.mapVertices{case(id,(name,age))=>(name,age+10)}.vertices.collect.foreach(v=>println(s"${v._2
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
评论
0/150
提交评论