《Hadoop核心技术与大数据平台搭建》课件 单元11 大数据内存计算Spark_第1页
《Hadoop核心技术与大数据平台搭建》课件 单元11 大数据内存计算Spark_第2页
《Hadoop核心技术与大数据平台搭建》课件 单元11 大数据内存计算Spark_第3页
《Hadoop核心技术与大数据平台搭建》课件 单元11 大数据内存计算Spark_第4页
《Hadoop核心技术与大数据平台搭建》课件 单元11 大数据内存计算Spark_第5页
已阅读5页,还剩58页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

单元11

大数据内存计算Spark学习目标知识目标技能目标1.了解Spark与Hadoop的区别。2.了解Spark应用场景。3.掌握Spark特点。4.掌握Spark生态系统。5.掌握Spark原理与架构。6.掌握RDD概念、设计与运行原理1.了解Spark的6种运行模式与配置方法。2.掌握SparkonYARN模式的安装与配置方法。3.掌握Spark命令的使用和RDD的创建与运用单元任务任务11.1认识Spark任务11.2Spark原理与架构任务11.3Spark集群的安装与部署

(1)Spark应用场景。(2)Spark的特点。(3)Spark与MapReduce对比。(4)大数据处理类型与Spark生态系统。通过对Hadoop的学习,读者已经熟悉MapReduce的运用,但HadoopMapReduce存在表达能力有限、磁盘/IO开销大和延迟高等不足。本任务通过介绍Spark应用场景、Spark特点、Spark与MapReduce对比和Spark的生态系统等知识,使读者对Spark有一个清晰的认知。Spark的生态系统可形成一个完整的大数据处理和分析平台,可支持批处理、实时流处理、交互式查询、机器学习等多种数据处理方式。【任务描述】【关键步骤】任务11.1认识Spark

11.1.1Spark应用场景11.1.2Spark的特点11.1.3Spark与HadoopMapReduce对比11.1.4大数据处理类型与Spark生态系统任务11.1认识Spark11.1.1Spark应用场景

Spark最初由美国加利福尼亚大学伯克利分校(UCBerkeley)的AMP实验室于2009年开发。Spark是一种基于内存的快速、通用、可扩展的大数据计算引擎,是一站式解决方案,集批处理、实时流处理、交互式查询、图计算与机器学习于一体,可用于构建大型、低延迟的数据分析应用程序。Spark的场景有:(1)批处理,可用于ETL(抽取、转换、加载);(2)机器学习,可用于自动判断淘宝类商务应用的买家评论是好评还是差评;(3)交互式分析,可用于查询Hive数据仓库;(4)实时流处理,可用于页面单击流分析、推荐系统、舆情分析等实时业务。舆情分析是指对互联网敏感、热点信息进行监控、分析处理。

上述这些应用,因为数据已经在内存中,需要反复操作的次数越多,所需读取的数据量越大,受益就越大。11.1.1Spark应用场景11.1.2Spark的特点11.1.3Spark与HadoopMapReduce对比11.1.4大数据处理类型与Spark生态系统任务11.1认识Spark11.1.2Spark的特点

Spark作为一种基于内存计算的分布式计算框架,具有“轻、快、灵、巧”等特点,主要体现在以下几个方面。(1)轻。Spark的核心代码较轻量级。(2)快。Spark对小数据集可达到亚秒级的延迟。Spark支持在内存中进行数据处理,这大大减少了磁盘读/写的开销,从而显著提高处理速度。(3)灵。Spark提供了不同层面的灵活性。Spark提供了多种运行模式,包括独立的集群模式、Hadoop模式及云环境(如AmazonEC2)等。它还可以访问多种数据源,如HDFS、Cassandra、HBase和Hive等。这种灵活的运行模式和广泛的数据源支持使得Spark能够适应不同的环境和需求。(4)巧。巧妙借力现有大数据组件。Spark借Hadoop之势,与Hadoop无缝结合。Spark不仅支持多种编程语言(如Scala、Java、Python和R),还提供了丰富的API和工具库,包括SQL查询、流式计算、机器学习和图算法等,这使得Spark在数据处理和分析方面非常强大和灵活。11.1.1Spark应用场景11.1.2Spark的特点11.1.3Spark与HadoopMapReduce对比11.1.4大数据处理类型与Spark生态系统任务11.1认识Spark11.1.3Spark与HadoopMapReduce对比表达能力有限。计算必须转化成Map和Reduce两个操作,但这并不适用于所有的情况,难以描述复杂的数据处理过程。磁盘I/O开销大。每次执行时都需要从磁盘读取数据,并且在计算完成后需要将中间结果写入磁盘,I/O开销较大。延迟高。一次计算可能需要被分解成一系列按顺序执行的MapReduce任务,任务之间的衔接由于涉及I/O开销,因此会产生较高延迟。而且,在前一个任务执行完成之前,其他任务无法开始,因此难以胜任复杂、多阶段的计算任务。HadoopMapReduce的执行流程HadoopMapReduce存在如下缺点:11.1.3Spark与HadoopMapReduce对比Spark的计算模式为MapReduce,但不局限于Map和Reduce,还提供了多种数据集操作类型,编程模型比MapReduce更灵活,开发效率更高。Spark提供了内存计算,中间结果可被直接放在内存中,带来了更高的迭代运算效率,使得延迟更低,全内存计算性能提升高于100倍。Spark基于DAG的任务调度执行机制,要优于MapReduce的迭代执行机制。Spark的最大特点就是将计算数据、中间结果都存储在内存中,大大减少了I/O开销,因而更适用于迭代计算比较多的数据挖掘与机器学习运算。Spark的执行流程Spark主要具有如下优点:11.1.3Spark与HadoopMapReduce对比项目MapReduceSparkSpark数据大小102.5TB102TB1000TB耗时72分钟23分钟234分钟节点数2100206206Cores50

40065926592Rate1.4TB/min4.27TB/min4.27TB/minRate/node0.67GB/min20.7GB/min22.5GB/minDaytonaGray类别排序基准规则是是是MapReduce与Spark在排序运算过程中资源使用和执行效率比较11.1.1Spark应用场景11.1.2Spark的特点11.1.3Spark与HadoopMapReduce对比11.1.4大数据处理类型与Spark生态系统任务11.1认识Spark11.1.4大数据处理类型与Spark生态系统复杂的批量数据处理:通常时间跨度在数十分钟到数小时之间基于历史数据的交互式查询:通常时间跨度在数十秒到数分钟之间基于实时数据流的数据处理:通常时间跨度在数百毫秒到数秒之间这样做难免会带来一些问题:不同场景之间输入输出数据无法做到无缝共享,通常需要进行数据格式的转换不同的软件需要不同的开发和维护团队,带来了较高的使用成本比较难以对同一个集群中的各个系统进行统一的资源协调和分配在实际应用中,大数据处理主要包括以下三个类型:当同时存在以上三种场景时,就需要同时部署三种不同的软件MapReduce/Impala/Storm11.1.4大数据处理类型与Spark生态系统Spark的设计遵循“一个软件栈满足不同应用场景”的理念,逐渐形成了一套完整的生态系统既能够提供内存计算框架,也可以支持SQL即席查询、实时流式计算、机器学习和图计算等Spark可以部署在资源管理器YARN之上,提供一站式的大数据解决方案Spark所提供的生态系统足以应对上述三种场景,即同时支持批处理、交互式查询和流数据处理11.1.4大数据处理类型与Spark生态系统Spark生态系统已经成为伯克利数据分析软件栈BDAS(BerkeleyDataAnalyticsStack)的重要组成部分Spark的生态系统主要包含了SparkCore、SparkSQL、SparkStreaming、MLLib和GraphX等组件(1)Spark运行架构。(2)RDD的设计与运行原理。(3)RDD的算子。(4)SparkSQL。本任务通过介绍Spark的基本概念、Spark架构设计方法、Spark运行基本流程、RDD运行原理和SparkSQL设计等知识,帮助读者深入了解Spark的工作原理和架构设计方法,为后续应用打下基础。

【任务描述】【关键步骤】任务11.2Spark原理与架构

11.2.1

Spark运行架构11.2.2RDD的设计与运行原理11.2.3RDD的算子11.2.4SparkSQL任务11.2Spark原理与架构

11.2.1Spark运行架构RDD:是ResillientDistributedDataset(弹性分布式数据集)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型DAG:DirectedAcyclicGraph(有向无环图)的简称,反映RDD之间的依赖关系Executor:是运行在工作节点(WorkerNode)的一个进程,负责运行TaskApplication:用户编写的Spark应用程序Task:运行在Executor上的工作单元

Job:一个Job包含多个RDD及作用于相应RDD上的各种操作Stage:是Job的基本调度单位,一个Job会分为多组Task,每组Task被称为Stage,或者也被称为TaskSet,代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集Spark基本概念11.2.1Spark运行架构Spark运行架构包括集群资源管理器(ClusterManager)、运行作业任务的工作节点(WorkerNode)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor)资源管理器可以自带或Mesos或YARNMesos是Apache下的开源分布式资源管理框架,它被称为是分布式系统的内核。Mesos最初是由加州大学伯克利分校的AMPLab开发的,后在Twitter得到广泛使用。与HadoopMapReduce计算框架相比,Spark所采用的Executor有两个优点:(1)利用多线程来执行具体的任务,减少任务的启动开销;(2)Executor中有一个BlockManager存储模块,会将内存和磁盘共同作为存储设备,有效减少IO开销。Spark架构设计11.2.1Spark运行架构一个Application由一个Driver和若干个Job构成;一个Job由多个Stage构成;一个Stage由多个没有Shuffle关系的Task组成。Spark架构设计11.2.1Spark运行架构(1)首先为应用构建起基本的运行环境,即由Driver创建一个SparkContext,进行资源的申请、任务的分配和监控(2)资源管理器为Executor分配资源,并启动Executor进程(3)SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAGScheduler解析成Stage,然后把一个个TaskSet提交给底层调度器TaskScheduler处理;Executor向SparkContext申请Task,TaskScheduler将Task发放给Executor运行,并提供应用程序代码(4)Task在Executor上运行,把执行结果反馈给TaskScheduler,然后反馈给DAGScheduler,运行完毕后写入数据并释放所有资源

Spark运行基本流程11.2.1Spark运行架构总之,Spark运行架构具有以下特点:(1)每个Application都有自己专属的Executor进程,并且该进程在Application运行期间一直驻留。Executor进程以多线程的方式运行Task(2)Spark运行过程与资源管理器无关,只要能够获取Executor进程并保持通信即可(3)Task采用了数据本地性和推测执行等优化机制11.2.1

Spark运行架构11.2.2RDD的设计与运行原理11.2.3RDD的算子11.2.4SparkSQL任务11.2Spark原理与架构

11.2.2RDD的设计与运行原理许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘工具,共同之处是,不同计算阶段之间会重用中间结果目前的MapReduce框架都是把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销RDD就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,不同RDD之间的转换操作形成依赖关系,可以实现管道化,避免中间数据存储RDD的设计背景11.2.2RDD的设计与运行原理一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和groupby)而创建得到新的RDDRDD概念(1)11.2.2RDD的设计与运行原理RDD提供了一组丰富的操作以支持常见的数据运算,分为“动作”(Action)和“转换”(Transformation)两种类型RDD提供的转换接口都非常简单,都是类似map、filter、groupBy、join等粗粒度的数据转换操作,而不是针对某个数据项的细粒度修改(不适合网页爬虫)表面上RDD的功能很受限、不够强大,实际上RDD已经被实践证明可以高效地表达许多框架的编程模型(比如MapReduce、SQL、Pregel)Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作RDD概念(2)11.2.2RDD的设计与运行原理RDD读入外部数据源进行创建RDD经过一系列的转换(Transformation)操作,每一次都会产生不同的RDD,供给下一个转换操作使用最后一个RDD经过“动作”操作进行转换,并输出到外部数据源

RDD概念(3)--RDD典型的执行过程如下:这一系列处理称为一个Lineage(血缘关系),即DAG拓扑排序的结果优点:惰性调用、管道化、避免同步等待、不需要保存中间结果、每次操作变得简单动作转换转换转换转换转换创建创建11.2.2RDD的设计与运行原理(1)高效的容错性现有容错机制:数据复制或者记录日志RDD:血缘关系、重新计算丢失分区、无需回滚系统、重算过程在不同节点之间并行、只记录粗粒度的操作(2)中间结果持久化到内存,数据在内存中的多个RDD操作之间进行传递,避免了不必要的读写磁盘开销(3)存放的数据可以是Java对象,避免了不必要的对象序列化和反序列化RDD特性---Spark采用RDD以后能够实现高效计算的原因主要在于:11.2.2RDD的设计与运行原理RDD之间的依赖关系窄依赖表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区宽依赖则表现为存在一个父RDD的一个分区对应一个子RDD的多个分区11.2.2RDD的设计与运行原理Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分Stage,具体划分方法是:在DAG中进行反向解析,遇到宽依赖就断开遇到窄依赖就把当前的RDD加入到Stage中将窄依赖尽量划分在同一个Stage中,可以实现流水线计算Stage的划分11.2.2RDD的设计与运行原理流水线操作实例分区7通过map操作生成的分区9,可以不用等待分区8到分区10这个map操作的计算结束,而是继续进行union操作,得到分区13,这样流水线执行大大提高了计算的效率Stage的划分被分成三个Stage,在Stage2中,从map到union都是窄依赖,这两步操作可以形成一个流水线操作11.2.2RDD的设计与运行原理RDD运行过程通过上述对RDD概念、依赖关系和Stage划分的介绍,结合之前介绍的Spark运行基本流程,再总结一下RDD在Spark架构中的运行过程:(1)创建RDD对象;(2)SparkContext负责计算RDD之间的依赖关系,构建DAG;(3)DAGScheduler负责把DAG图分解成多个Stage,每个Stage中包含了多个Task,每个Task会被TaskScheduler分发给各个WorkerNode上的Executor去执行。11.2.1

Spark运行架构11.2.2RDD的设计与运行原理11.2.3RDD的算子11.2.4SparkSQL任务11.2Spark原理与架构

11.2.3RDD的算子SparkRDD支持两种类型的操作:动作(action):在数据集上进行运算,返回计算值转换(transformation):基于现有的数据集创建一个新的数据集常用的几个TransformationAPI介绍常用的几个ActionAPI介绍11.2.1

Spark运行架构11.2.2RDD的设计与运行原理11.2.3RDD的算子11.2.4SparkSQL任务11.2Spark原理与架构

11.2.4SparkSQLSparkSQL

和Hive相比,主要区别:SparkSQL的执行引擎为SparkCore,Hive默认的执行引擎为MapReduce;SparkSQL的执行速度是Hive的10~100倍;SparkSQL不支持buckets,而Hive支持。SparkSQL

和Hive的联系:SparkSQL依赖于Hive的元数据;SparkSQL兼容绝大部分Hive的语法和函数;SparkSQL可以使用Hive的自定义函数,完美兼容Hive的函数。SparkSQL和Hive对比11.2.4SparkSQLSparkSQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据,也就是说,从HQL被解析成抽象语法树(AST)起,就全部由SparkSQL接管了。SparkSQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责11.2.4SparkSQL图SparkSQL支持的数据格式和编程语言SparkSQL增加了SchemaRDD(即带有Schema信息的RDD),使用户可以在SparkSQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据SparkSQL目前支持Scala、Java、Python三种语言,支持SQL-92规范(1)Spark运行模式。(2)SparkonYARN模式集群安装。(3)SparkRDD操作实战。本任务介绍Spark的6种运行模式,并选择SparkonYARN模式进行安装与配置。完成Spark安装后,利用SparkShell命令进行RDD的创建,介绍常用的转换操作和动作操作命令,以便更好地解决实际问题。【任务描述】【关键步骤】任务11.3Spark集群的安装与部署

11.3.1

Spark运行模式11.3.2SparkonYARN模式集群安装11.3.3SparkRDD操作实战任务11.3Spark集群的安装与部署

11.3.1Spark运行模式Local模式(Spark将作为一个单独的Java进程在本地运行,不需要启动额外的集群资源)Standalone(Spark自带的简单集群管理器,允许Spark自身作为一个独立的集群运行)SparkonYARN(Spark可以作为YARN上的一个应用程序运行,通过YARN向Hadoop集群申请资源并执行作业)SparkonMesos(和Spark有血缘关系,更好支持Mesos,Mesos负责Spark资源调度,在实际应用中相对少见)Kubernetes(K8s)模式(Spark可以作为一个容器化应用部署在Kubernetes集群上)Windows模式(方便用户在Windows环境下学习和使用Spark)Spark支持6种运行模式11.3.1

Spark运行模式11.3.2SparkonYARN模式集群安装11.3.3SparkRDD操作实战任务11.3Spark集群的安装与部署

11.3.2SparkonYARN模式集群安装安装Spark之前需要安装Java环境和Hadoop环境。第1步:下载软件包下载地址:

https:///dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz第2步:解压tar-zxvfspark-2.3.1-bin-hadoop2.7.tgz-C/opt/modules/

重命名(cd/opt/modules)mvspark-2.3.0-bin-hadoop2.7spark工具准备11.3.2SparkonYARN模式集群安装第3步:配置环境变量,修改/etc/profile文件,命令如下:vi/etc/profile在profile文件末尾添加如下环境变量:exportSPARK_HOME=/opt/modules/sparkexportPATH=$SPARK_HOME/bin:$PATH使环境变量生效,命令如下:source/etc/profile分发环境变量到其余节点上,命令如下:scp/etc/profileroot@slave1:/etc/profilescp/etc/profileroot@slave2:/etc/profile切换窗口,使slave1、slave2节点上的环境变量生效,命令如下:source/etc/profile11.3.2SparkonYARN模式集群安装第4步:修改配置文件。进入配置文件目录spark/conf,修改Spark配置文件spark-env.sh,命令如下:cd/opt/modules/spark/conf/cpspark-env.sh.templatespark-env.shvispark-env.sh在spark-env.sh文件末尾添加如下代码:exportJAVA_HOME=/opt/modules/jdkexportSPARK_MASTER_IP=masterexportSPARK_MASTER_PORT=7077exportSPARK_WORDER_INSTANCES=1exportSPARK_WORKER_CORES=1exportHADOOP_HOME=/opt/modules/hadoopexportHADOOP_CONF_DIR=/opt/modules/hadoop/etc/hadoop保存并退出。修改集群节点信息slaves,命令如下:cpslaves.templateslavesvislaves删除localhost,并添加节点主机名,内容如下:slave1slave2保存并退出。11.3.2SparkonYARN模式集群安装第5步:分发安装文件。分发安装文件到slave1、slave2节点上,命令如下:scp-r/opt/modules/sparkroot@slave1:/opt/modules/scp-r/opt/modules/sparkroot@slave2:/opt/modules/第6步:启动Spark。在启动Spark前先启动Hadoop。在master节点上执行如下启动命令:start-all.sh在master节点上再启动Spark,命令如下:cd/opt/modules/spark/sbin./start-all.sh查看是否启动成功,如果master节点中有Master进程,slave1、slave2节点中有Worker进程,则说明启动成功。分别在master、slave1、slave2三个节点上执行以下命令:jps进入/opt/module/spark/bin目录,执行命令:spark-shell若出现scala>提示符,则说明spark-shell启动成功。11.3.2SparkonYARN模式集群安装在浏览器中访问29:8080,可查看SparkWebUI启动spark-shell11.3.1

Spark运行模式11.3.2SparkonYARN模式集群安装11.3.3SparkRDD操作实战任务11.3Spark集群的安装与部署

11.3.3SparkRDD操作RDD的创建(1)从内存中读取数据创建RDD。从内存中读取数据创建RDD有两种常用的方式:第一种是将内存中已有的Seq集合转换为RDD;第二种是把已有RDD转换成新的RDD。SparkContext类中主要提供了两个方法:parallelize()方法和makeRDD()方法。命令如下:scala>valrdd=sc.parallelize(Array(1,2,3,4,5,6,7,8))//使用parallelize()方法从集合创建scala>valrdd1=sc.makeRDD(Array(1,2,3,4,5,6,7,8))//使用makeRDD()方法从集合创建11.3.3SparkRDD操作RDD的创建(2)从外部存储系统中读取数据创建RDD。从外部存储系统中读取数据创建RDD是指直接读取存储在文件系统的数据文件中的数据创建RDD。可通过SparkContext类的textFile()方法读取数据集。具体操作如下。先创建一个文本文件text.txt,命令如下:cd/opt/modulesvitext.txt在文件中输入以下单词内容:hadoopmapreducehiveflumehbasesparkstormflumesqoophadoophivekafkasparkhadoopstorm将text.txt上传到HDFS的/user/root目录下,通过从文件中读取数据创建RDD,命令如下:hdfsdfs-mkdir-p/user/root//在HDFS上创建目录/user/roothdfsdfs-puttext.txt/user/root//将text.txt上传到HDFS的/user/root目录下spark-shell//启动spark-shellvalrdd2=sc.textFile("/user/root/test.txt")//通过从HDFS文件中读取数据创建RDDvalrdd3=sc.textFile("file:///opt/modules/test.txt")//通过从Linux本地文件中读取数据创建RDD11.3.3SparkRDD操作RDD常用转换操作1)map(func)操作map功能:返回一个新的RDD,该RDD由每个输入元素经过func函数转换后组成。【例11.3.1】创建一个元素为1~10的数组的RDD,将所有元素乘2形成新的RDD。命令如下:scala>varsource=sc.parallelize(1to10)//创建一个元素为1~10的数组的RDDscala>source.collect()//遍历RDD中的每个元素scala>valmapadd=source.map(_*2)//将source中的所有元素乘2scala>source.map(_*2).collect()//遍历RDD中的每个元素11.3.3SparkRDD操作RDD常用转换操作2)flatMap(func)操作flatMap:类似于map,但是每个输入元素可以被映射为0个或多个输出元素(所以func函数应该返回一个序列,而不是单一元素)。【例11.3.2】创建一个元素为1~5的RDD,运用flatMap创建一个新的RDD,新的RDD为原RDD的每个元素的扩展(1->1,2->1,2,3->1,2,3,4->1,2,3,4,5->1,2,3,4,5)。命令如下:scala>valsourceFlat=sc.parallelize(1to5)//创建RDDscala>sourceFlat.collect()//遍历RDD中的每个元素scala>valflatMap=sourceFlat.flatMap(1to_)//根据原RDD创建新RDD(1->1,2->1,2……5->1,2,3,4,5)scala>flatMap.collec11.3.3SparkRDD操作RDD常用转换操作3)groupBy(func)操作groupBy功能:分组,按照传入函数的返回值进行分组。将相同的Key对应的值放入一个迭代器。【例11.3.3】创建一个RDD,按照元素模2的值进行分组。scala>valrdd=sc.parallelize(1to4)//创建RDDscala>valgroup=rdd.groupBy(_%2)//按照元素模2的值进行分组scala>group.collect//遍历RDD中的每个元素11.3.3SparkRDD操作RDD常用转换操作4)filter(func)操作filter功能:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。【例11.3.4】创建一个RDD(由字符串组成),过滤出一个新RDD(包含“xiao”子串)。命令如下:scala>varsourceFilter=sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))//创建RDDscala>sourceFilter.collect()//遍历每个元素scala>valfilter=sourceFilter.filter(_.contains("xiao"))//过滤出一个新的RDD(包含“xiao”子串)scala>filter.collect()//遍历RDD中的每个元素11.3.3SparkRDD操作RDD常用转换操作5)reduceByKey(func,[numTasks])操作reduceByKey功能:在一个(K,V)形式的数据集上调用,返回一个(K,V)的数据集,使用指定的reduce函数,按照相同Key值聚合,在shuffle之前进行combine(预聚合)操作,返回结果是RDD[k,v],reduce任务的个数可以通过第二个可选的参数来设置。【例11.3.5】创建一个pairRDD,计算相同Key对应值的相加结果。命令如下:scala>valrdd=sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))//创建一个pairRDDscala>valreduce=rdd.reduceByKey((x,y)=>x+y)//计算相同Key对应值的相加结果scala>reduce.collect()11.3.3SparkRDD操作RDD常用转换操作6)groupByKey()操作groupByKey功能:groupByKey也是对每个Key进行操作,按照相同Key值进行分组,直接进行shuffle,但只生成一个Seq集合。【例11.3.6】创建一个pairRDD,将相同Key对应值聚合到一个Seq集合中,并计算相同Key对应值的相加结果。命令如下:scala>valwords=Array("one","two","two","three","three","three")//创建一个pairRDDscala>valwordPairsRDD=sc.parallelize(words).map(word=>(word,1))scala>valgroup=wordPairsRDD.groupByKey()//将相同Key对应值聚合到一个Seq集合中scala>group.collect()scala>group.map(t=>(t._1,t._2.sum))//计算相同Key对应值的相加结果scala>group.map(t=>(t._1,t._2.sum)).collect()11.3.3SparkRDD操作RDD常用动作操作1)reduce(func)操作reduce功能:通过func函数聚合RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。【例11.3.7】创建一个RDD,将所有元素聚合后得到结果。命令如下:scala>valrdd1=sc.makeRDD(1to10)//创建一个RDD[Int]scala>rdd1.reduce(_+_)//聚合RDD[Int]中的所有元素scala>valrdd2=sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))//创建一个RDD[String]scala>rdd2.collect()//遍历元素scala>rdd2.reduce((x,y)=>

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论