spark的优化-控制数据分区和分布_第1页
spark的优化-控制数据分区和分布_第2页
spark的优化-控制数据分区和分布_第3页
spark的优化-控制数据分区和分布_第4页
spark的优化-控制数据分区和分布_第5页
已阅读5页,还剩2页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

spark的优化控制数据分区和分布 数据分区:在分布式集群里,网络通信的代价很大,减少网络传输可以极大提升性能。mapreduce框架的性能开支主要在io和网络传输,io因为要大量读写文件,它是不可避免的,但是网络传输是可以避免的,把大文件压缩变小文件,从而减少网络传输,但是增加了cpu的计算负载。Spark里面io也是不可避免的,但是网络传输spark里面进行了优化:spark把rdd进行分区(分片),放在集群上并行计算。同一个rdd分片100个,10个节点,平均一个节点10个分区当进行sum型的计算的时候,先进行每个分区的sum,然后把sum值shuffle传输到主程序进行全局sum,所以进行sum型计算对网络传输非常小。但对于进行join型的计算的时候,需要把数据本身进行shuffle,网络开销很大。spark是如何优化这个问题的呢?spark把keyvalue rdd通过key的hashcode进行分区,而且保证相同的key存储在同一个节点上,这样对改rdd进行key聚合时,就不需要shuffle过程我们进行mapreduce计算的时候为什么要尽兴shuffle?,就是说mapreduce里面网络传输主要在shuffle阶段,shuffle的根本原因是相同的key存在不同的节点上,按key进行聚合的时候不得不进行shuffle。shuffle是非常影响网络的,它要把所有的数据混在一起走网络,然后它才能把相同的key走到一起。要尽兴shuffle是存储决定的。spark从这个教训中得到启发,spark会把key进行分区,也就是key的hashcode进行分区,相同的key,hashcode肯定是一样的,所以它进行分区的时候100t的数据分成10分,每部分10个t,它能确保相同的key肯定在一个分区里面,而且它能保证存储的时候相同的key能够存在同一个节点上。比如一个rdd分成了100份,集群有10个节点,所以每个节点存10份,每一分称为每个分区,spark能保证相同的key存在同一个节点上,实际上相同的key存在同一个分区。key的分布不均决定了有的分区大有的分区小。没法分区保证完全相等,但它会保证在一个接近的范围。所以mapreduce里面做的某些工作里边,spark就不需要shuffle了,spark解决网络传输这块的根本原理就是这个。进行join的时候是两个表,不可能把两个表都分区好,通常情况下是把用的频繁的大表事先进行分区,小表进行关联它的时候小表进行shuffle过程。大表不需要shuffle。模版是:val userData = sc.sequenceFileUserID,UserInfo(hdfs:/.).partitionBy(new HashPartition(100)/构造100个分区.persist()从分区中获益的操作:cogroup(), groupwith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),cobimeByKey(),lookup()所有基于key的操作都会获益对于诸如cogroup()和join()这样的二元操作,预先进行数据分区会让其中至少一个rdd(使用已知分区器的那个rdd)不发生数据shuffle,如果两个rdd使用同样的分区方式,并且它们还缓存在同样的机器上(比如一个rdd是通过mapvalues()从另一个rdd中创建出来的,这两个rdd就会拥有相同的key和分区方式),或者其中rdd还没有被计算出来,那么跨界点的shuffle(数据混洗)不会发生了。mapreduce一般要求本地网卡达到20兆!即便进行了压缩!代码:import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartitionimport org.apache.hadoop.mapred.libimport org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionimport org.apache.spark.storage.StorageLevelimport org.apache.spark.HashPartitioner/* * Created by zengxiaosen on 16/9/23. */object PartitionVisitCount /* 大表小表关联 */ def main(args: ArrayString): Unit = val sparkConf = new SparkConf().setAppName(useUDF).setMaster(local) val ss = SparkSession.builder().config(sparkConf).getOrCreate() val sc = ss.sparkContext val fileRDD = sc.textFile(/opt/tarballs/spark_kafka/beifengspark/src/main/scala/) .filter(line=line.length0) .map line = val arr = line.split(t) val date = arr(17).substring(0,10) val guid = arr(5) val url = arr(1) val uid = arr(18) (uid,(guid,url) /key-value:tuple2 .partitionBy(new HashPartitioner(10) /采用了hashcode分片方式,分成了10份,十个分区,每个分区10分 /* 相同的key在同一个分区,在进行任务调用时候,大表不需要任何shuffle 只需要shuffle小表 */ .persist(StorageLevel.DISK_ONLY) /* parallelize有两个参数,第一个是他的list,第二个是分区数 分区数可以不给,不给的情况下默认就是它的核数 */ /比如里面存的是我的用户id val rdd = sc.parallelize(List(1,2,3

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论