




已阅读5页,还剩53页未读, 继续免费阅读
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
,厦门大学计算机科学系2017年版,林子雨厦门大学计算机科学系E-mail:ziyulin主页:,第5章Spark编程基础(PPT版本号:2017年春季学期),厦门大学研究生课程大数据处理技术Spark,温馨提示:编辑幻灯片母版,可以修改每页PPT的厦大校徽和底部文字,扫一扫访问班级主页,5.1.4分区,RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上,为什么要分区?(1)增加并行度(2)减少通信开销,5.1.4分区,在分布式程序中,通信的代价是很大的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性能。所以对RDD进行分区的目的就是减少网络传输的代价以提高系统的性能只有当数据集多次在诸如连接这种基于键的操作中使用时,分区才会有帮助。若RDD只需要扫描一次,就没有必要进行分区处理能从spark分区中的操作有:cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()以及lookup(),5.1.4分区,RDD分区的一个分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:*本地模式:默认为本地机器的CPU数目,若设置了localN,则默认为N*ApacheMesos:默认的分区数为8*Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值,如何手动设置分区:(1)创建RDD时:在调用textFile和parallelize方法时候手动指定分区个数即可,语法格式:sc.textFile(path,partitionNum)(2)通过转换操作得到新RDD时:直接调用repartition方法即可,5.1.4分区,scalavarrdd2=data.repartition(1)rdd2:org.apache.spark.rdd.RDDString=MapPartitionsRDD11atrepartitionat:23scalardd2.partitions.sizeres4:Int=1scalavarrdd2=data.repartition(4)rdd2:org.apache.spark.rdd.RDDString=MapPartitionsRDD15atrepartitionat:23scalardd2.partitions.sizeres5:Int=4,repartition的用法,5.1.4分区,对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism,scalavalarray=Array(1,2,3,4,5)array:ArrayInt=Array(1,2,3,4,5)scalavalrdd=sc.parallelize(array,2)#设置两个分区rdd:org.apache.spark.rdd.RDDInt=ParallelCollectionRDD13atparallelizeat:29,对于textFile而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism,如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片),5.1.4分区,实例:根据key值的最后一位数字,写到不同的文件例如:10写入到part-0000011写入到part-00001.19写入到part-00009,5.1.4分区,importorg.apache.spark.Partitioner,SparkContext,SparkConf/自定义分区类,需继承Partitioner类classUsridPartitioner(numParts:Int)extendsPartitioner/覆盖分区数overridedefnumPartitions:Int=numParts/覆盖分区号获取函数overridedefgetPartition(key:Any):Int=key.toString.toInt%10objectTestdefmain(args:ArrayString)valconf=newSparkConf()valsc=newSparkContext(conf)/模拟5个分区的数据valdata=sc.parallelize(1to10,5)/根据尾号转变为10个分区,分写到10个文件data.map(_,1).partitionBy(newUsridPartitioner(10).saveAsTextFile(/chenm/partition),5.2PairRDD,5.2.1PairRDD的创建5.2.2常用的PairRDD转换操作5.2.3一个综合实例,5.2.1PairRDD的创建,(1)第一种创建方式:从文件中加载,可以采用多种方式创建PairRDD,其中一种主要方式是使用map()函数来实现,scalavallines=sc.textFile(file:/usr/local/spark/mycode/pairrdd/word.txt)lines:org.apache.spark.rdd.RDDString=file:/usr/local/spark/mycode/pairrdd/word.txtMapPartitionsRDD1attextFileat:27scalavalpairRDD=lines.flatMap(line=line.split().map(word=(word,1)pairRDD:org.apache.spark.rdd.RDD(String,Int)=MapPartitionsRDD3atmapat:29scalapairRDD.foreach(println)(i,1)(love,1)(hadoop,1),5.2.1PairRDD的创建,(2)第二种创建方式:通过并行集合(数组)创建RDD,scalavallist=List(Hadoop,Spark,Hive,Spark)list:ListString=List(Hadoop,Spark,Hive,Spark)scalavalrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDDString=ParallelCollectionRDD11atparallelizeat:29scalavalpairRDD=rdd.map(word=(word,1)pairRDD:org.apache.spark.rdd.RDD(String,Int)=MapPartitionsRDD12atmapat:31scalapairRDD.foreach(println)(Hadoop,1)(Spark,1)(Hive,1)(Spark,1),5.2.2常用的PairRDD转换操作,reduceByKey(func)groupByKey()keysvaluessortByKey()mapValues(func)joincombineByKey,5.2.2常用的PairRDD转换操作,reduceByKey(func)reduceByKey(func)的功能是,使用func函数合并具有相同键的值,(Hadoop,1)(Spark,1)(Hive,1)(Spark,1),scalapairRDD.reduceByKey(a,b)=a+b).foreach(println)(Spark,2)(Hive,1)(Hadoop,1),5.2.2常用的PairRDD转换操作,groupByKey()的功能是,对具有相同键的值进行分组,groupByKey(),比如,对四个键值对(spark,1)、(spark,2)、(hadoop,3)和(hadoop,5),采用groupByKey()后得到的结果是:(spark,(1,2)和(hadoop,(3,5),(Hadoop,1)(Spark,1)(Hive,1)(Spark,1),scalapairRDD.groupByKey()res15:org.apache.spark.rdd.RDD(String,IterableInt)=ShuffledRDD15atgroupByKeyat:34,5.2.2常用的PairRDD转换操作,reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义,reduceByKey和groupByKey的区别,groupByKey也是对每个key进行操作,但只生成一个sequence,groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作,5.2.2常用的PairRDD转换操作,scalavalwords=Array(one,two,two,three,three,three)scalavalwordPairsRDD=sc.parallelize(words).map(word=(word,1)scalavalwordCountsWithReduce=wordPairsRDD.reduceByKey(_+_)scalavalwordCountsWithGroup=wordPairsRDD.groupByKey().map(t=(t._1,t._2.sum),reduceByKey和groupByKey的区别,上面得到的wordCountsWithReduce和wordCountsWithGroup是完全一样的,但是,它们的内部运算过程是不同的,5.2.2常用的PairRDD转换操作,(1)当采用reduceByKey时,Spark可以在每个分区移动数据之前将待输出数据与一个共用的key结合,5.2.2常用的PairRDD转换操作,(2)当采用groupByKey时,由于它不接收函数,Spark只能先将所有的键值对(key-valuepair)都移动,这样的后果是集群节点之间的开销很大,导致传输延时,5.2.2常用的PairRDD转换操作,keys,keys只会把PairRDD中的key返回形成一个新的RDD,scalapairRDD.keysres17:org.apache.spark.rdd.RDDString=MapPartitionsRDD17atkeysat:34scalapairRDD.keys.foreach(println)HadoopSparkHiveSpark,(Hadoop,1)(Spark,1)(Hive,1)(Spark,1),5.2.2常用的PairRDD转换操作,values,values只会把PairRDD中的value返回形成一个新的RDD。,(Hadoop,1)(Spark,1)(Hive,1)(Spark,1),scalapairRDD.valuesres0:org.apache.spark.rdd.RDDInt=MapPartitionsRDD2atvaluesat:34scalapairRDD.values.foreach(println)1111,5.2.2常用的PairRDD转换操作,sortByKey(),sortByKey()的功能是返回一个根据键排序的RDD,(Hadoop,1)(Spark,1)(Hive,1)(Spark,1),scalapairRDD.sortByKey()res0:org.apache.spark.rdd.RDD(String,Int)=ShuffledRDD2atsortByKeyat:34scalapairRDD.sortByKey().foreach(println)(Hadoop,1)(Hive,1)(Spark,1)(Spark,1),5.2.2常用的PairRDD转换操作,sortByKey()和sortBy(),scalavald1=sc.parallelize(Array(“c,8),(“b“,25),(“c“,17),(“a“,42),(“b“,4),(“d“,9),(“e“,17),(“c“,2),(“f“,29),(“g“,21),(“b“,9)scalad1.reduceByKey(_+_).sortByKey(false).collectres2:Array(String,Int)=Array(g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42),scalavald2=sc.parallelize(Array(“c,8),(“b“,25),(“c“,17),(“a“,42),(“b“,4),(“d“,9),(“e“,17),(“c“,2),(“f“,29),(“g“,21),(“b“,9)scalad2.reduceByKey(_+_).sortBy(_._2,false).collectres4:Array(String,Int)=Array(a,42),(b,38),(f,29),(c,27),(g,21),(e,17),(d,9),5.2.2常用的PairRDD转换操作,mapValues(func),对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化,scalapairRDD.mapValues(x=x+1)res2:org.apache.spark.rdd.RDD(String,Int)=MapPartitionsRDD4atmapValuesat:34scalapairRDD.mapValues(x=x+1).foreach(println)(Hadoop,2)(Spark,2)(Hive,2)(Spark,2),(Hadoop,1)(Spark,1)(Hive,1)(Spark,1),5.2.2常用的PairRDD转换操作,join,join就表示内连接。对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2)类型的数据集。,scalavalpairRDD1=sc.parallelize(Array(spark,1),(spark,2),(hadoop,3),(hadoop,5)pairRDD1:org.apache.spark.rdd.RDD(String,Int)=ParallelCollectionRDD24atparallelizeat:27scalavalpairRDD2=sc.parallelize(Array(spark,fast)pairRDD2:org.apache.spark.rdd.RDD(String,String)=ParallelCollectionRDD25atparallelizeat:27scalapairRDD1.join(pairRDD2)res9:org.apache.spark.rdd.RDD(String,(Int,String)=MapPartitionsRDD28atjoinat:32scalapairRDD1.join(pairRDD2).foreach(println)(spark,(1,fast)(spark,(2,fast),5.2.2常用的PairRDD转换操作,combineByKeycombineByKey(createCombiner,mergeValue,mergeCombiners),createCombiner:在分区中第一次遇到某个Key时触发的函数,将RDD数据集中的V类型值转换C类型值(V=C),mergeValue:合并值函数,再次遇到相同的Key时,将createCombiner的C类型值与这次传入的V类型值合并成一个C类型值(C,V)=C,mergeCombiners:合并组合器函数,将C类型值两两合并成一个C类型值,5.2.2常用的PairRDD转换操作,例:编程实现自定义Spark合并方案。给定一些销售数据,数据采用键值对的形式,求出每个公司的总收入和平均收入,保存在本地文件,5.2.2常用的PairRDD转换操作,importorg.apache.spark.SparkContextimportorg.apache.spark.SparkConfobjectCombinedefmain(args:ArrayString)valconf=newSparkConf().setAppName(Combine).setMaster(“local”)valsc=newSparkContext(conf)valdata=sc.parallelize(Array(company-1,92),(company-1,85),(company-1,82),(company-2,78),(company-2,96),(company-2,85),(company-3,88),(company-3,94),(company-3,80),3)valres=bineByKey(income)=(income,1),(acc:(Int,Int),income)=(acc._1+income,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int)=(acc1._1+acc2._1,acc1._2+acc2._2).mapcase(key,value)=(key,value._1,value._1/value._2.toFloat)res.repartition(1).saveAsTextFile(./result),5.2.2常用的PairRDD转换操作,一个综合实例,题目:给定一组键值对(spark,2),(hadoop,6),(hadoop,4),(spark,6),键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。,scalavalrdd=sc.parallelize(Array(spark,2),(hadoop,6),(hadoop,4),(spark,6)rdd:org.apache.spark.rdd.RDD(String,Int)=ParallelCollectionRDD38atparallelizeat:27scalardd.mapValues(x=(x,1).reduceByKey(x,y)=(x._1+y._1,x._2+y._2).mapValues(x=(x._1/x._2).collect()res22:Array(String,Int)=Array(spark,4),(hadoop,5),5.5WordCount程序解析,5.5.1WordCount程序运行原理5.5.2通过WordCount理解Spark与HDFS组合使用原理5.5.3解析分片、分区、CPU核数之间的关系,5.5.1WordCount程序运行原理,scalavaltextFile=sc.textFile(file:/usr/local/spark/mycode/wordcount/word.txt)scalavalwordCount=textFile.flatMap(line=line.split().map(word=(word,1).reduceByKey(a,b)=a+b)scalawordCount.collect()scalawordCount.foreach(println),5.5.1WordCount程序运行原理,5.5.1WordCount程序运行原理,5.5.1WordCount程序运行原理,5.5.2通过WordCount理解Spark与HDFS组合使用原理,分布式处理的核心观念在于“计算向数据靠拢”,优点如下:节省网络带宽计算逻辑在数据侧执行,消除了集中式处理中计算逻辑侧的性能瓶颈,Spark+HDFS运行架构,5.5.2通过WordCount理解Spark与HDFS组合使用原理,对于WordCount而言,分布式程序运行在每个Slave的每个分区上,统计本分区内的单词计数,生成一个Map,然后将它传回给Driver,再由Driver两两合并来自各个分区的所有Map,形成最终的单词计数。,5.5.3解析分片、分区、CPU核数之间的关系,梳理一下Spark中关于并发度涉及的几个概念File、Block、Split、Task、Partition、RDD以及节点数、Executor数、core数目的关系,5.5.3解析分片、分区、CPU核数之间的关系,输入可能以多个文件的形式存储在HDFS上,每个File都包含了很多块,称为Block,当Spark读取这些文件作为输入时,会根据具体数据格式对应的InputFormat进行解析,一般是将若干个Block合并成一个输入分片,称为InputSplit,注意InputSplit不能跨越文件,随后将为这些输入分片生成具体的Task。InputSplit与Task是一一对应的关系,随后这些具体的Task每个都会被分配到集群上的某个节点的某个Executor去执行。每个节点可以起一个或多个Executor,每个Executor由若干core组成,每个Executor的每个core一次只能执行一个Task,每个Task执行的结果就是生成了目标RDD的一个partiton,Task被执行的并发度=Executor数目*每个Executor核数,5.5.3解析分片、分区、CPU核数之间的关系,至于partition的数目:,对于数据读入阶段,例如sc.textFile,输入文件被划分为多少InputSplit就会需要多少初始Task,在Map阶段partition数目保持不变,在Reduce阶段,RDD的聚合会触发shuffle操作,聚合后的RDD的partition数目跟具体操作有关,例如repartition操作会聚合成指定分区数,还有一些算子是可配置的,RDD分区数决定了Task数量,为并行提供了可能。至于能否并行执行由CPU的核数来决定。比如8核,那么那么一台机器可以同时跑8个任务,如果是3核的话,8个任务轮流执行,5.6综合案例,5.6.1案例1:求TOP值5.6.2案例2:求最大最小值5.6.4案例4:二次排序,5.6.1案例1:求TOP值,任务描述:,orderid,userid,payment,productid,1,1768,50,1552,1218,600,2113,2239,788,2424,3101,28,5995,4899,290,1296,3110,54,12017,4436,259,8778,2369,7890,27,100,4287,226,233101,6562,489,124102,1124,33,17103,3267,159,179104,4569,57,125105,1438,37,116,file1.txt,file2.txt,求TopN个payment值,5.6.1案例1:求TOP值,importorg.apache.spark.SparkConf,SparkContextobjectTopValuedefmain(args:ArrayString):Unit=valconf=newSparkConf().setAppName(“TopValue“).setMaster(local)valsc=newSparkContext(conf)sc.setLogLevel(ERROR)vallines=sc.textFile(hdfs:/localhost:9000/user/hadoop/spark/chapter5/*”,2)varnum=0;valresult=lines.filter(line=(line.trim().length0)&(line.split(,).length=4).map(_.split(,)(2).map(x=(x.toInt,).sortByKey(false).map(x=x._1).take(5).foreach(x=num=num+1println(num+t+x),5.6.2案例2:求最大最小值,12954167324111542669748563418,5329144567218645735267,file1.txt,file2.txt,任务描述:求出多个文件中数值的最大、最小值,5.6.2案例2:求最大最小值,importorg.apache.spark.SparkConf,SparkContextobjectMaxAndMindefmain(args:ArrayString):Unit=valconf=newSparkConf().setAppName(“MaxAndMin“).setMaster(local)valsc=newSparkContext(conf)sc.setLogLevel(ERROR)vallines=sc.textFile(hdfs:/localhost:9000/user/hadoop/spark/chapter5/*,2),剩余代码见下一页,5.6.2案例2:求最大最小值,valresult=lines.filter(_.trim().length0).map(line=(key,line.trim.toInt).groupByKey().map(x=varmin=Integer.MAX_VALUEvarmax=Integer.MIN_VALUEfor(nummax)max=numif(numprintln(maxt+x._1)println(mint+x._2),5.6.4案例4:二次排序,5369375632,输入文件file1.txt,任务要求:对于一个给定的文件(数据如file1.txt所示),请对数据进行排序,首
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 集成项目审计与合规性检查考核试卷
- 食品加工企业2025年节能减排技术改造项目风险评估与控制报告
- 新能源汽车二手车市场2025年评估与流通环节风险防范与控制策略分析报告
- 杭州市八上数学试卷
- 品牌战略规划发布行业论坛演讲考核试卷
- 绿化工程合同管理与纠纷解决机制考核试卷
- 金融量化投资策略在2025年金融风险管理咨询中的实践与风险控制报告
- 消防联动控制器理论课件
- 2025年工业互联网NFV技术推动工业设备智能化制造实践报告
- 文旅融合视角下乡村文化旅游产业人才队伍建设与培养报告
- 2025年消毒供应室专科理论考试题库及答案
- 贵州省2025年中考物理试题(含答案)
- 专利转让许可培训课件
- 机床备件采购管理办法
- 张力性气胸个案护理
- 铁路客运安全与应急处理
- 煲仔饭外卖活动方案
- 工厂三级安全教育培训考核试卷(含答案)
- 建筑大厦工程技术难题与解决方案
- 汽车车身涂胶设计规范
- 2025至2030再加工一次性设备行业产业运行态势及投资规划深度研究报告
评论
0/150
提交评论