




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
RDD编程RDD概念RDD概念DatasourceDatasourceRDDRDDRDDRDDRDDOutput创建创建转换转换转换转换动作RDD特性(1)高效的容错性(2)中间结果持久化到内存,数据在内存中的多个RDD操作之间进行传递,避免了不必要的读写磁盘开销(3)存放的数据可以是Java对象,避免了不必要的对象序列化和反序列化RDD运行原理RDD1RDD2窄依赖RDD3窄依赖RDD7RDD5RDD6RDD4RDD运行原理宽依赖宽依赖RDD8RDD9RDD运行过程Rdd1.join(rdd2).groupBy().filter(…)RDDobjectsDAGScheduler集群资源管理器TaskSchedulerThreadsBlockmanageWorkerDAG
任务集任务PairRDD键值对RDD(PairRDD)是指每个RDD元素都是(key,value)键值对类型,应用场景很多。创建PairRDD(1)从文件系统中加载数据生成RDD,然后使用map()函数转换为PairRDD
scala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")scala>valpairRDD=lines.flatMap(line=>line.split("")).map(word=>(word,1))scala>pairRDD.foreach(println)(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)创建PairRDD(2)通过并行集合创建PairRDD
scala>vallist=List("Hadoop","Spark","Hive","Spark")list:List[String]=List(Hadoop,Spark,Hive,Spark)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[11]scala>valpairRDD=rdd.map(word=>(word,1))pairRDD:org.apache.spark.rdd.RDD[(String,Int)]scala>pairRDD.foreach(println)(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)PairRDD转换操作reduceByKey(func)
scala>pairRDD.reduceByKey((a,b)=>a+b).foreach(println)(Spark,2)(Hive,1)(Hadoop,1)PairRDD转换操作groupByKey(func)scala>scala>pairRDD.groupByKey().foreach(println)(Spark,CompactBuffer(1,1))(Hive,CompactBuffer(1))(Hadoop,CompactBuffer(1))PairRDD转换操作pairRDD.keys
scala>pairRDD.keys.foreach(println)HadoopSparkHiveSparkPairRDD转换操作pairRDD.values
scala>pairRDD.values.foreach(println)1111PairRDD转换操作sortByKey()
scala>pairRDD.sortByKey().foreach(println)(Hadoop,1)(Hive,1)(Spark,1)(Spark,1)小例子scala>valrdd=sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))scala>rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()res22:Array[(String,Int)]=Array((spark,4),(hadoop,5))给定一组键值对("spark",2),("hadoop",6),("hadoop",4),("spark",6),键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。RDD操作RDDRDDRDDRDDRDDOutput创建创建转换转换转换转换动作DatasourceDatasourceRDD综合案例1)
文本文件读取,示例:18001,Spark,10018001,Python,5018001,Hadoop,3018001,Software,9418002,DataBase,1818002,Python,8218002,Hadoop,7618003,Hadoop,92RDD综合案例2)选择RDD或者PairRDD操作(1) 总共有多少学生;(2)共开设来多少门课程;(3)Spark课程共有多少人选修;(4)某位同学的总成绩平均分是多少。RDD综合案例(1) 总共有多少学生;vallines=sc.textFile("file:///usr/local/spark/students.txt")18001,Spark,10018001,Python,5018001,Hadoop,3018001,Software,9418002,DataBase,1818002,Python,8218002,Hadoop,7618003,Hadoop,92。。。。。。。RDD综合案例(1) 总共有多少学生;vallines=sc.textFile("file:///usr/local/spark/students.txt")valstudents=lines.map(row=>row.split(",")(0))180011800118001180011800218002
。。。。。。。RDD综合案例(1) 总共有多少学生;vallines=sc.textFile("file:///usr/local/spark/students.txt")valstudents=lines.map(row=>row.split(",")(0))valstudents_num=students.distinct()//去重操作180011800218003RDD综合案例(1) 总共有多少学生;vallines=sc.textFile("file:///usr/local/spark/students.txt")valstudents=lines.map(row=>row.split(",")(0))valstudents_num=students.distinct()//去重操作students_num.count//取得总数RDD综合案例(2)共开设来多少门课程;vallines=sc.textFile("file:///usr/local/spark/students.txt")18001,Spark,10018001,Python,5018001,Hadoop,3018001,Software,9418002,DataBase,1818002,Python,8218002,Hadoop,7618003,Hadoop,92。。。。。。。RDD综合案例(2)共开设来多少门课程;vallines=sc.textFile("file:///usr/local/spark/students.txt")valcourses=lines.map(row=>row.split(",")(1))SparkPythonHadoopSoftwarePythonHadoop。。。。。。。RDD综合案例(2)共开设来多少门课程;vallines=sc.textFile("file:///usr/local/spark/students.txt")valcourses=lines.map(row=>row.split(",")(1))valcourses_num=num.distinct()//去重操作SparkPythonHadoopSoftwareRDD综合案例(2)共开设来多少门课程;vallines=sc.textFile("file:///usr/local/spark/students.txt")valcourses=lines.map(row=>row.split(",")(1))valcourses_num=num.distinct()//去重操作courses_num.count//取得总数RDD综合案例(3)Spark课程共有多少人选修;vallines=sc.textFile("file:///usr/local/spark/students.txt")18001,Spark,10018001,Python,5018001,Hadoop,3018001,Software,9418002,DataBase,1818002,Python,8218002,Hadoop,7618003,Hadoop,92。。。。。。。RDD综合案例(3)Spark课程共有多少人选修;vallines=sc.textFile("file:///usr/local/spark/students.txt")valSpark=lines.fliter(row=>row.split(",")(1)==“Spark”)Spark.countRDD综合案例(4)某位同学的总成绩平均分是多少。vallines=sc.textFile("file:///usr/local/spark/students.txt")valrdd1=lines.map(row=>(row.split(",")(0),row.split(",")(2).toInt))(18001,100)(18001,50)(18001,30)(18001,94)。。。。。。。RDD综合案例(4)某位同学的总成绩平均分是多少。vallines=sc.textFile("file:///usr/local/spark/students.txt")valrdd1=lines.map(row=>(row.split(",")(0),row.split(",")(2).toInt))valrdd2=rdd1.mapValues(x=>(x,1))(18001,100,1)(18001,50,1)(18001,30,1)(18001,94,1)。。。。。。。RDD综合案例(4)某位同学的总成绩平均分是多少。vallines=sc.textFile("file:///usr/local/spark/students.txt")valrdd1=lines.map(row=>(row.split(",")(0),row.split(",")(2).toInt))valrdd2=rdd1.mapValues(x=>(x,1))valres=rdd2.reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))(18001,274,4)(18002,……)RDD综合案例(4)某位同学的总成绩平均分是多少。vallines=sc.textFile("file:///usr/local/spark/students.txt")valrdd1=lines.map(row=>(row.split(",")(0),row.split(",")(2).toInt))valrdd2=rdd1.mapValues(x=>(x,1))valres=rdd2.reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))res.mapValues(x=>x._1/x._2).collect()RDD操作——创建RDD(1)从本地文件系统中加载数据
(2)从分布式文件系统HDFS中加载数据scala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")scala>vallines=sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")RDD操作——创建RDD(3)调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。
scala>valarray=Array(1,2,3,4,5)array:Array[Int]=Array(1,2,3,4,5)scala>valrdd=sc.parallelize(array)rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[13]atparallelizeat<console>:29RDD操作——创建RDD调用SparkContext的parallelize方法,也可以从列表中创建scala>vallist=List(1,2,3,4,5)list:List[Int]=List(1,2,3,4,5)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[14]atparallelizeat<console>:29RDD操作——转换操作lines.filter(line=>line.contains(“spark”))RDD(lines)rdd.filter()IlikehadoopSparkisfastIlikeSparkSparkisfastIlikeSparkRDDRDD操作——转换操作lines.map(line=>line.split(“”))RDD(lines)rdd.map()IlikehadoopSparkisfastIlikeSparkRDDArray(“I”,”like”,”hadoop”)Array(“Spark”,”is”,”fast”)Array(“I”,”like”,”spark”)RDD操作——转换操作lines.flatmap(line=>line.split(“”))RDD(lines)rdd.flatmap()IlikehadoopSparkisfastIlikeSparkRDD“I””like”“hadoop”“Spark””is”“fast”“I””like”“Spark”RDD操作——转换操作rdd.reduceByKey((a,b)=>a+b)RDD(“Iike”,1)(“hadoop”,1)(“Spark”,1)(“fast”,1)(”like”,1)(“Spark”,1)rdd.reduceByKey()(“Iike”,2)(“hadoop”,1)(“Spark”,2)(“fast”,1)RDD中间结果key,Value-list例如:(spark,(1,1))RDD操作——转换操作rdd.reduceByKey((a,b)=>a+b)RDD(“Iike”,1)(“hadoop”,1)(“Spark”,1)(“fast”,1)(”like”,1)(“Spark”,1)rdd.reduceByKey()(“Iike”,2)(“hadoop”,1)(“Spark”,2)(“fast”,1)RDDspark,(1,1))RDD操作——动作操作count()
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 卫生谚语教学课件
- 毛概材料分析试题及答案
- 2024年广告设计师证书全貌试题信息
- 往年云南特岗试题及答案
- 篮球比赛编排试题及答案
- 陈列理论考试题库及答案
- 部队文书考试题库及答案
- 人民相关面试题及答案
- 名校调研初中试题及答案
- 2024年纺织品行业勃兴与挑战试题及答案
- 设备现场调试记录表完整
- 液化天然气接收站安全管理规定
- GB/T 18760-2002消费品售后服务方法与要求
- GB/T 1443-1996机床和工具柄用自夹圆锥
- 影像诊断与手术后符合率统计表
- 中考语文作文专题复习:以小见大的写作技巧
- 高三主题班会三轮复习动员 冲刺高考课件
- 机械厂降压变电所的电气设计概述
- 历史小剧场《万隆会议》剧本
- 国家开放大学《社区护理学(本)》形考任务1-5参考答案
- 施工进度计划网络图及横道图
评论
0/150
提交评论