版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
项目4RDD编程实践学习目标(1)(2)(5)掌握如何创建RDD。熟悉Spark基本转换操作。掌握数据分区与持久化方法。学习目标(3)熟悉Spark基本动作操作。(4)掌握Spark如何读取不同格式的文件数据。任务1RDD编程基础4.1.1RDD创建从外部存储设备创建RDD是指直接读取一个存放在文件系统的数据文件来创建RDD,这是用于实践操作的常用方法。Spark采用textFile()方法来从文件系统中加载数据创建RDD,该方法把文件的URI作为参数,这个URI可以是本地文件系统的地址、分布式文件系统HDFS的地址等。4.1.1RDD创建1.从HDFS文件创建RDD这种方式最为简单,也最为常用,直接通过textFile命令读取HDFS文件的位置即可。在HDFS上有一个文件“/data/result_bigdata.txt”,读取该文件创建一个RDD。代码如下。scala>vallines=sc.textFile("/data/result_bigdata.txt")lines:org.apache.spark.rdd.RDD[String]=/data/result_bigdata.txtMapPartitionsRDD[1]attextFileat<console>:244.1.1RDD创建2.从Linux本地文件创建RDD本地文件读取也是通过sc.textFile("路径")方法来完成的,在路径前加上“file://”表示从本地Linux文件系统读取。例如,在本地目录/usr/local/spark/mycode/rdd下读取student.txt文件,并统计student.txt的数据行数。在spark-shell交互式环境中,执行如下命令。scala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/student.txt")lines:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/mycode/rdd/student.txtMapPartitionsRDD[1]attextFileat<console>:24scala>lines.count()res0:Long=124.1.1RDD创建3.从内存中已有数据创建RDD可以调用SparkContext中的parallelize方法,从一个已经存在的集合(数组)上创建RDD,命令如下。scala>valdata=Array(1,2,3,4,5)data:Array[Int]=Array(1,2,3,4,5)scala>valdistData=sc.parallelize(data)distData:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[2]atparallelizeat<console>:264.1.2
RDD操作SparkRDD提供了丰富的操作方法用于操作分布式数据集合。其中包含了转换操作(Transformation)和行动(Action)两部分。对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头进行物理的转换操作,如图4-1所示。4.1.2
RDD操作图4-1SparkRDD操作4.1.2
RDD操作表4-1常用的RDD转换操作API1.转换操作转换操作通过某种函数将一个RDD转换成一个新的RDD,但是转换操作是懒操作,不会立刻执行计算。行动操作是用于触发转换操作的操作,这时才会真正开始进行计算。表4-1给出了常用的RDD转换操作API,其中很多操作都是高阶函数。例如,filter(func)就是一个高阶函数,这个函数的输入参数func也是一个函数。4.1.2
RDD操作1.转换操作1)filter(func)filter是一种基础的RDD转换操作,filter(func)会筛选出满足函数func的元素,并返回一个新的数据集。例如:scala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")lines:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/mycode/rdd/word.txtMapPartitionsRDD[4]attextFileat<console>:24scala>vallinesWithSpark=lines.filter(line=>line.contains("Spark"))linesWithSpark:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[5]atfilterat<console>:264.1.2
RDD操作1.转换操作上述语句的执行过程如图4-2所示。图4-2filter()操作实例执行过程示意图4.1.2
RDD操作1.转换操作2)map(func)map(func)操作将每个元素传递到函数func中,并将结果返回为一个新的数据集。例如:scala>valdata=Array(1,2,3,4,5)data:Array[Int]=Array(1,2,3,4,5)scala>valrdd1=sc.parallelize(data)rdd1:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[6]atparallelizeat<console>:26scala>valrdd2=rdd1.map(x=>x+10)rdd2:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[7]atmapat<console>:284.1.2
RDD操作1.转换操作上述语句的执行过程如图4-3所示。图4-3map()操作实例执行过程示意图4.1.2
RDD操作1.转换操作3)flatMap(func)flatMap(func)与map()相似,但每个输入元素都可以映射到0或者多个输出结果。例如:scala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")lines:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/mycode/rdd/word.txtMapPartitionsRDD[8]attextFileat<console>:24scala>valwords=lines.flatMap(line=>line.split(""))words:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[9]atflatMapat<console>:264.1.2
RDD操作1.转换操作上述语句的执行过程如图4-4所示。图4-4flatMap()操作实例执行过程示意图4.1.2
RDD操作1.转换操作4)groupByKey()groupByKey()应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集。如图4-5所示,名称为words的RDD中包含9个元素,每个元素都是<String,Int>类型,也就是(K,V)键值对类型。words.groupByKey()操作执行后,所有key相同的键值对,它们的value都被归并到一起。例如,("is",1)、("is",1)、("is",1)这3个键值对的key相同,就会被归并成一个新的键值对("is",(1,1,1)),其中,key是"is",value是(1,1,1),而且value会被封装成Iterable(一种可迭代集合)。4.1.2
RDD操作1.转换操作4)groupByKey()图4-5groupByKey()操作实例执行过程示意图4.1.2
RDD操作1.转换操作5)reduceByKey(func)reduceByKey(func)应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合后得到的结果。如图4-6所示,名称为words的RDD中包含9个元素,每个元素都是<String,Int>类型,也就是(K,V)键值对类型。words.reduceByKey((a,b)=>a+b)操作执行后,所有key相同的键值对,它们的4.1.2
RDD操作1.转换操作5)reduceByKey(func)图4-6reduceByKey()操作实例执行过程示意图4.1.2
RDD操作2.行动操作行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。表4-2列出了RDD行动操作API。4.1.2
RDD操作2.行动操作表4-2常用的RDD行动操作API4.1.2
RDD操作3.惰性机制所谓惰性机制,是指整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时才会触发“从头到尾”的真正的计算。这里给出一段简单的语句来解释Spark的惰性机制。scala>vallines=sc.textFile("data.txt")scala>vallineLengths=lines.map(s=>s.length())scala>valtotalLength=lineLengths.reduce((a,b)=>a+b)任务2RDD应用——学生成绩分析任务2RDD应用——学生成绩分析每年学校都会有许多考试,现有学生的相关信息,以及大数据技术和Python数据分析两门课程的成绩,分别存储在student.txt、result_bigdata.txt和result_python.txt3个文件中。文件内容如图4-7所示。图4-7学生信息及成绩图4.2.1创建RDD分别读取两个学生成绩表中的数据来创建RDD,bigdata为大数据技术成绩表创建的RDD,python为Python数据分析成绩表创建的RDD。scala>valbigdata=sc.textFile("/data/result_bigdata.txt")bigdata:org.apache.spark.rdd.RDD[String]=/data/result_bigdata.txtMapPartitionsRDD[1]attextFileat<console>:24scala>valpython=sc.textFile("/data/result_python.txt")python:org.apache.spark.rdd.RDD[String]=/data/result_python.txtMapPartitionsRDD[2]attextFileat<console>:244.2.2查找每门课程成绩排名前3的同学先将两个成绩表的RDD中的数据进行转换,每条数据被分割成3列,表示学生ID、课程和成绩,分隔符为“\\t”,存储为三元组格式,将成绩转化为Int类型(可以直接通过toInt来转化):scala>valbigdata=sc.textFile("/data/result_bigdata.txt")bigdata:org.apache.spark.rdd.RDD[String]=/data/result_bigdata.txtMapPartitionsRDD[3]attextFileat<console>:24scala>valpython=sc.textFile("/data/result_python.txt")python:org.apache.spark.rdd.RDD[String]=/data/result_python.txtMapPartitionsRDD[4]attextFileat<console>:244.2.2查找每门课程成绩排名前3的同学scala>valm_bigdata=bigdata.map{x=>valline=x.split("\\t");(line(0),line(1),line(2).toInt)}m_bigdata:org.apache.spark.rdd.RDD[(String,String,Int)]=MapPartitionsRDD[5]atmapat<console>:26scala>valm_python=python.map{x=>valline=x.split("\\t");(line(0),line(1),line(2).toInt)}m_python:org.apache.spark.rdd.RDD[(String,String,Int)]=MapPartitionsRDD[6]atmapat<console>:264.2.2查找每门课程成绩排名前3的同学通过sortBy对元组中的成绩列降序排序,排序位置是每个元组的第3位的成绩:scala>valsort_bigdata=m_bigdata.sortBy(x=>x._3,false)sort_bigdata:org.apache.spark.rdd.RDD[(String,String,Int)]=MapPartitionsRDD[7]atsortByat<console>:28scala>valsort_python=m_python.sortBy(x=>x._3,false)sort_python:org.apache.spark.rdd.RDD[(String,String,Int)]=MapPartitionsRDD[8]atsortByat<console>:284.2.2查找每门课程成绩排名前3的同学通过take()操作取出每个RDD的前3个值,即成绩排在前3的学生:scala>sort_bigdata.take(3)res5:Array[(String,String,Int)]=Array((1003,大数据技术,100),(1007,大数据技术,100),(1004,大数据技术,99))scala>sort_python.take(3)res6:Array[(String,String,Int)]=Array((1003,Python数据分析,100),(1004,Python数据分析,100),(1001,Python数据分析,96))4.2.3输出单科成绩为100分的学生ID首先创建RDD并转换,成绩为Int类型。scala>valbigdata=sc.textFile("/data/result_bigdata.txt").map{x=>valline=x.split("\\t");(line(0),line(1),line(2).toInt)}bigdata:org.apache.spark.rdd.RDD[(String,String,Int)]=MapPartitionsRDD[9]atmapat<console>:24scala>valpython=sc.textFile("/data/result_python.txt").map{x=>valline=x.split("\\t");(line(0),line(1),line(2).toInt)}python:org.apache.spark.rdd.RDD[(String,String,Int)]=MapPartitionsRDD[10]atmapat<console>:244.2.3输出单科成绩为100分的学生ID通过filter操作过滤出成绩为100分的学生数据,并通过map操作提取学生ID。scala>valbigdata_Id=bigdata.filter(x=>x._3==100).map(x=>x._1)bigdata_Id:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[11]atmapat<console>:26scala>valpython_Id=python.filter(x=>x._3==100).map(x=>x._1)python_Id:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[12]atmapat<console>:264.2.3输出单科成绩为100分的学生ID将两个表得到的学生ID通过union操作合并到一个RDD中,并利用distinct操作去重,就可以得到所有至少有一科成绩为100分的学生的ID。scala>valid=bigdata_Id.union(python_Id).distinct()id:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[16]atdistinctat<console>:32scala>id.collectres9:Array[String]=Array(1003,1007,1004)任务3持久化与数据分区4.3.1持久化由于SparkRDD是惰性求值的,因此,如果需要多次使用一个RDD,那么调用行动操作时每次都需要重复计算RDD及它的依赖。在迭代算法计算中,由于常常需要对同一组数据多次使用,因此消耗会很大。下面是多次计算同一个RDD的例子。scala>vallist=List("American","China","Russia","British","France")list:List[String]=List(American,China,Russia,British,France)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[18]atparallelizeat<console>:26scala>println(rdd.count())//行动操作,触发一次真正从头到尾的计算5scala>println(rdd.collect().mkString(","))//行动操作,触发一次真正从头到尾的计算American,China,Russia,British,France4.3.1持久化RDD的持久化操作可以通过persist()方法和cache()方法来完成。persist()方法可以把一个RDD标记为持久化,之所以说“标记为持久化”,是因为出现persist()语句的地方并不会马上计算生成RDD并把它持久化,而是要等到第一个行动操作触发真正计算以后才会把计算结果进行持久化,持久化后的RDD将会被保留在计算节点的内存中以供后面的行动操作重复使用。4.3.1持久化persist()的圆括号中包含的是持久化级别参数,可以有如下不同的级别。(1)persist(MEMORY_ONLY):表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。(2)persist(MEMORY_AND_DISK):表示将RDD作为反序列化的对象存储在JVM中,内存不足,超出的分区将会被存放在磁盘上。4.3.1持久化使用cache()方法时,会调用persist(MEMORY_ONLY)。针对上面的实例,增加持久化语句后的执行过程如下。scala>vallist=List("American","China","Russia","British","France")list:List[String]=List(American,China,Russia,British,France)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[18]atparallelizeat<console>:26scala>rdd.cache()res14:rdd.type=ParallelCollectionRDD[19]atparallelizeat<console>:264.3.1持久化使用cache()方法时,会调用persist(MEMORY_ONLY)。针对上面的实例,增加持久化语句后的执行过程如下。//第1次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd//放到缓存中scala>println(rdd.count())5//第2次行动操作,不需要触发从头到尾的计算,只需要重复上面缓存中的rdd即可scala>println(rdd.collect().mkString(","))American,China,Russia,British,France4.3.2数据分区SparkRDD是由多个分区组成的数据集合,在分布式程序中,通信的代价是很大的,因此控制数据分区、减少网络传输是提高整体性能的一个重要方面。通常RDD会很大,被分成多个分区,分别保存在不同的节点上。如图4-8所示,一个集群包含4个工作节点(WorkerNode),分别是WorkerNode1、WorkerNode2、WorkerNode3和WorkerNode4,假设有两个RDD,即rdd1和rdd2,其中rdd1包含5个分区(p1、p2、p3、p4和p5),rdd2包含3个分区(p6、p7和p8)。4.3.2数据分区图4-8RDD分区被保存到不同节点上4.3.2数据分区Spark的系统分区方式有两种:1)一种是哈希分区(hashpartitioner),即根据哈希值分区;1)另一种是范围分区(rangepartitioner),即将一定范围的数据映射到一个分区中。4.3.2数据分区要实现自定义分区器,需要继承org.apache.spark.Partitioner类并实现其中的3个方法。这个方法返回想要创建的分区个数。(1)defnumPartitions:Int:
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 肠道健康修复膳食干预方案
- 淡水池塘高密度养殖水质管理规范
- 小针刀微创治疗操作规范流程
- 用坐标表示地理位置(教学课件)2025-2026学年人教版七年级数学下册
- 家政入户礼仪培训标准化手册
- 肠胃调理食疗汤品指引
- 痛经调理问诊话术指导手册
- 生鲜农产品冷链运输操作规程
- 玉米蚜虫绿色防控技术指引
- 电力新能源行业市场前景及投资研究报告:未来产业投资地图“氢能”储能
- GB/T 47355-2026外包指南
- 2025年村公共服务专员招聘笔试试题及答案
- 中国鼻咽癌诊治指南(2026版)
- 市场监督管理局全流程市场监管工作手册(标准版)
- 国航股份信息管理部校园招聘笔试题库2026
- 2026贵州磷化集团社会招聘77人笔试历年备考题库附带答案详解
- 雨课堂学堂在线学堂云《人工智能导论(复旦)》单元测试考核答案
- 水利站人员培训考核制度
- 统编人教版五年级语文下册《田忌赛马》示范教学课件
- 急性气管支气管炎课件教学
- 压力仪表培训课件
评论
0/150
提交评论