《大数据分析技术应用》课件-3.RDD_第1页
《大数据分析技术应用》课件-3.RDD_第2页
《大数据分析技术应用》课件-3.RDD_第3页
《大数据分析技术应用》课件-3.RDD_第4页
《大数据分析技术应用》课件-3.RDD_第5页
已阅读5页,还剩34页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

大数据分析技术BigData

analysis

technologyRDD概述RDD(Resilient

Distributed

Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。

RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。

RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

RDD中的弹性是指:

存储的弹性:内存与磁盘的自动切换。

容错的弹性:数据丢失可以自动恢复。 计算的弹性:计算出错重试机制。

分片的弹性:可根据需要重新分片。1.RDD概念

分布式:数据存储在大数据集群不同节点上

数据集:RDD封装了计算逻辑,并不保存数据

数据抽象:

RDD是一个抽象类,需要子类具体实现

不可变:RDD封装了计算逻辑,是不可以改变的,

想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑

可分区:是指能进行分区

并行计算:因为RDD的分区特性,所以支持并行处理的特性。即不同节点上的数据可以分别被处理,然后生成一个新的RDD1.RDD概念sc.textFile(“xx").flatMap(_.split("")).map((_,1)).reduceByKey(_

+_).saveAsTextFile(“xx")从计算的角度来讲,数据处理过程中需要计算资源(内存&CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。Spark框架在执行时,

先申请资源,

然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上,按照指定的计算模型进行数据计算。最后得到计算结果。RDD是Spark框架中用于数据处理的核心模型,例如在SparkShell,执行如下命令:1.RDD概念1.RDD概念转换RDD每个

RDD都被分为多个分区,这些分区运行在集群中的不同节点上。

RDD可以包含Python、Java、

Scala

中任意类型的对象,甚至可以包含用户自定义的对象。

RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。

RDD允许用户在执行多个查询时

显式地将工作集缓存在内存中,

后续的查询能够重用工作集,这极大地提升了查询速度。1.RDD概念RDD操作计算Spark对数据的操作流程创建RDD1)一组分区(Partition),即数据集的基本组成单位。对于RDD来说,每个分区都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分区个数,如果

没有指定,那么就会采用默认值。默认值就是程序所分配到的CPUCore的数目。#RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。2)一个计算每个分区的函数。

Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。

compute函数会对迭代器进行复合,不需要保存每次计算的结果。#Spark在计算时,是使用分区函数对每一个分区进行计算。3)

RDD之间的依赖关系。

RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,

Spark可以通过这个依赖关系重

新计算丢失的分区数据,

而不是对RDD的所有分区进行重新计算。#RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时,

就需要将多个RDD建立依赖关系。2.RDD的五大特性4)一个Partitioner,即

RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent

RDDShuffle输出时的分片数量。#当数据为KV类型数据时,可以通过设定分区器自定义数据的分区。5)一个列表,存储存取每个Partition的优先位置(preferredlocation)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,

Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。#计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算2.RDD的五大特性在RDD中,通常就代表和包含了Spark应用程序的输入源数据。在创建了初始的RDD之后,

才可以通过SparkCore提供的transformation算子,对该RDD进行transformation(转换)操作,来获取其他的RDD。SparkCore为我们提供了三种创建RDD的方式,包括:1.从集合(内存)中创建RDD2.从外部存储(文件)创建RDD3.从其他RDD创建3.创建RDDval

rdd1:

RDD[Int]

=sc.makeRDD(Array(1,2,

3,4,

5,6))val

rdd2:

RDD[Int]

=sc.parallelize(Array(1,2,

3,4,

5,6))//从集合(内存)中创建RDD:这种创建方式多用于测试使用//makeRDD和parallelize是可以指定分区数量的,有第二个参数,

默认值是2,也可以指定这个值3.创建RDD从集合(内存)中创建RDD:从集合中创建RDD,

Spark主要提供了两个方法:

parallelize和makeRDDval

rdd3:

RDD[String]

=sc.textFile("hdfs://*****/test.txt")val

rdd4:

RDD[String]

=sc.textFile("file:///home/hadoop/test.txt")从外部存储(文件)创建RDD:由外部存储系统的数据集创建RDD包括:

本地的文件系统,所有Hadoop支持的数据集,

比如HDFS、

HBase等。

Spark提供textFile方法从外部存储创建RDD。3.创建RDD//sc.textFile(文件路径)//如果从本地文件系统存储的文件创建RDD,路径前加

“file:///”。val

rdd4:

RDD[String]

=sc.textFile("file:///home/hadoop/test.txt")val

rdd5:

RDD[String]

=

rdd4.flatMap(_.split("

"))3.创建RDD从其他RDD创建RDD:主要是通过一个RDD运算完后,再产生新的RDD。RDD算子RDD中的所有转换都是延迟加载的,它们并不会直接计算结果。它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。RDD支持两种操作:转换操作(Transformation)和行动操作(Action)

RDD的转换操作是返回一个新的RDD的操作,

比如map和flatMap

,而行动操作则是向Driver返回结果或

将结果写出到外部存在设备,

比如,

collect和saveAsTextFile。1.RDD算子概述转换算子含义map(func)返回一个新的RDD,该RDD由每一个输入元素经过func

函数转换后组成filter(func)返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成flatMap(func)类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)union(otherDataset)对源RDD和参数RDD求并集后返回一个新的RDDintersection(otherDataset)对源RDD和参数RDD求交集后返回一个新的RDDdistinct([numTasks]))对源RDD进行去重后返回一个新的RDDgroupByKey([numTasks])在一个(K,V)的RDD上调用,返回一个(K,

Iterator[V])的

RDD2.转换算子转换算子含义reduceByKey(func,[numTasks])在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,

将相同key的值聚合到一起,

与groupByKey类似,

reduce任务的个数可以通过第二个可选的参数来设置sortByKey([ascending],[numTasks])在一个(K,V)的RDD上调用,

K必须实现Ordered接口,返

回一个按照key进行排序的(K,V)的RDDsortBy(func,[ascending],[numTasks])与sortByKey类似,但是更灵活join(otherDataset,[numTasks])在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD2.转换算子行动算子含义reduce(func)通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的collect()在驱动程序中,以数组的形式返回数据集的所有元素count()返回RDD的元素个数first()返回RDD的第一个元素(类似于take(1))take(n)返回一个由数据集的前n个元素组成的数组saveAsTextFile(path)将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,

Spark将会调用toString方法,将它装换为文件中的文本foreach(func)在数据集的每一个元素上,运行函数func进行更新。3.行动算子Spark内有collect方法,是Action操作里边的一个算子,这个方法可以将RDD类型的数据转化为数组来存放并参与后续运算。defcollect():Array[T]collect用于将一个RDD转换成数组。4.collect算子5.count算子count算子属于行动算子defcount():

Longcount返回RDD中的元素数量。6.first算子first算子属于行动算子。返回数据集中的第一个元素,类似于take(1)。take算子是行动算子,

用于获取RDD中从0到num-1下标的元素(不排序)

。返回RDD中前k个元素,并保存成数组。take:

deftake(num:

Int):Array[T]7.take算子8.map算子map算子是操作算子,将原来

RDD的每个数据项通过

map

中的用户自定义函数f映射转变为一个新的元素。源码中map算子相当于初始化一个

RDD,新

RDD

叫做

MappedRDDreduceByKey属于Transformation算子在一个(K

,V)对的数据集上使用,返回一个(K

,V)对的数据集,

key相同的值,

都被使用指定的reduce函数聚合到一起。9.reduceByKey算子join算子通常有下面几种类型,不同类型的join操作会影响返回的数据结果。INNERJOIN:如果表中有至少一个匹配,则返回行,等同于JOINLEFTJOIN:即使右表中没有匹配,也从左表返回所有的行RIGHTJOIN:即使左表中没有匹配,也从右表返回所有的行FULLJOIN:只要其中一个表中存在匹配,就返回行join属于Transformation算子。join相当于SQL中的内关联join,只返回两个RDD根据K可以关联上的结果,join只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。10.join算子10.join算子RDD依赖RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。

RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。由于每个Transformation操作都会生成一个新的RDD,所以RDD之间就会形成类似流水线的前后依赖关系。在spark中,

RDD之间存在两种类型的依赖关系:窄依赖和宽依赖。1.RDD依赖关系的本质2.依赖关系下的数据流视图如下图是RDD依赖关系下的数据流视图:在spark中,会根据RDD之间的依赖关系将DAG图划分为不同的阶段,对于窄依赖,由于partition依赖关系的确定性,

partition的转换处理就可以在同一个线程里完成,窄依赖就

被spark划分到同一个stage中,而对于宽依赖,只能等父RDDshuffle处理完成后,下一个stage才能开始接下来的计算。因此spark划分stage的整体思路是:

从后往前推,

遇到宽依赖就断开,

划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。2.依赖关系下的数据流视图在spark中,

Task的类型分为2种:

ShuffleMapTask和ResultTask;简单来说,

DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的,而其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask,是因为它需要将自己的计算结果通过shuffle到下一个stage中。2.依赖关系下的数据流视图窄依赖是指1个父RDD分区对应1个子RDD的分区。换句话说,

一个父RDD的分区对应于一个子RDD的分区,或者多个父RDD的分区对应于一个子RDD的分区。3.窄依赖窄依赖分为两种情况:

1个子RDD的分区对应于1个父RDD的分区,比如map

,filter,

union等算子。

1个子RDD的分区对应于N个父RDD的分区,比如co-partioned

join。3.窄依赖1.依赖往往对应着Shuffle操作,

需要在运行过程中将同一个父RDD的分区传入到不同的子RDD分区中,中间可能涉及多个节点之间的数据传输;

而窄依赖的每个父RDD的分区只会传入到一个子RDD分区中,

通常可以在一个节点内完成转换。2.当RDD分区丢失时(某个节点故障)

Spark会对数据进行重算。3.对于窄依赖,由于父RDD的一个分区只对应一个子RDD分区,

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论