版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
RDD应用编程熟悉SparkRDD的常用编程操作掌握Spark实现基本的数据分析掌握Spark的应用程序运行方法实验目标实验说明本实验教程包括了SparkRDD实验案例操作,包括RDD的数据输入,处理,输出,API的应用练习。实验的操作系统是Linux,本实验选择了Ubuntu,其它的Linux操作系统也可以完成本实验,但是涉及到操作系统的具体安装命令可能会有所区别Spark的运行环境可以是单机运行或集群运行,而集群运行方式可通过SparkStandalone、YARN、MESOS等实现。本实验为熟练Spark应用程序的设计与运行原理,基于单机运行Spark应用程序spark命令行运行在Spark/sbin目录中运行以下的命令来启动命令行运行环境:$./spark-shellSparkRDD的常用编程操作练习1.map\flatMapvala=sc.parallelize(List(“bit”,”liu”,”neusoft”,”chengdu”),3)valb=a.map(word=>word.length)valc=a.zip(b)c.collectvala=sc.parallelize(1to10,5)a.flatMap(num=>1tonum).collect//把每一个num映射到从1到num的序列练习2.mapPartitions/mapPartitionWithIndexvala=sc.parallelize(1to9,3)defmyfunc[T](iter:Iterator[T]):Iterator[(T,T)]={ varres=List[(T,T)]() varpre=iter.next while(iter.hasNext){ valcur=iter.next res.::=(pre,cur) pre=cur} res.Iterator}a.mapPartitions(myfunc).collectres3:Array[(Int,Int)]=Array((2,3),(1,2),(5,6),(4,5),(8,9),(7,8))因为有3个分区每个分区数(1,2,3)(4,5,6)(7,8,9),然后调用mapPartitions方法传入myfun,函数是先构建一个空list,输入单元素集合iter,输出双元素Tuple集合,把分区中的一个元素+它下一个元素组成Tuple。因为每个分区最后一个元素没有下一个元素所以(3,4)(6,7)不在结果中1,2,34,5,67,8,91,22,34,55,67,88,9练习2.mapPartitions/mapPartitionWithIndexvala=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)defmyfunc(index:Int,iter:Iterator[Int]):Iterator[String]={ iter.toList.map(x=>index+”,”+x).iterator}a.mapPartitionsWithIndex(myfunc).collectres3:Array[String]=Array(0,1,0,2,0,3,1,4,1,5,1,6,2,7,2,8,2,9,2,10)
分区从0开始,每个分区的数据分别列出练习3、foreach\foreachPartitionvala=sc.parallelize(List(“bit”,”liu”,”neusoft”,”chengdu”),3)a.foreach(x=>println(x+“isneusoft”))打印对象a内容
vala=sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)a.foreachPartition(x=>println((a,b)=>x.reduce(a+b)))61524每个分区的数据分别reduce操作累加练习4、共享变量1.广播变量允许用户将一个只读变量缓存在每一台机器上,而不用复制到每个任务当中。一台机子的不同任务共享该变量,可供一个或多个Spark操作使用。用法:通过对一个类型T的对象调用SparkContext.broadcast创建出一个BroadCast[T]对象,任何可序列化的类型都可以这么实现。通过value属性访问该对象的值变量只会被发到各个节点一次,应作为只读值处理。(修改这个值不会影响到别的节点)练习4、共享变量scala>valbroadcastVar=sc.broadcast(Array(1,2,3))broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]]=Broadcast(0)scala>broadcastVar.valueres0:Array[Int]=Array(1,2,3)练习5、分区数设置加载文件创建RDD时的分区数valtextFile=sc.textFile("/input/README.md")textFile.toDebugStringtextFile.dependenciestextFile.partitions.sizesc.defaultParallelismsc.getConf.getInt(“spark.default.parallelism”,8)//parallelism当前默认并行数,对本地文件以默认并行数为RDD的分区数,设置默认值8,如不设置默认local为总的内核数。实际分区数由加载文件的splits数决定。练习6、分区相关API1.coalease对RDD重新分区(增加分区需要设置shuffle为true)valarrRdd=sc.parallelize(List(“a”,“b”,”c”,”c”,”e”),2)//设置分区数2arrRdd.partitions.size//查看分区数2arrRdd.toDebugString//查看RDD的Lineage关系图arrRdd.coalesce(1).partitions.size//没有指定true,设置更小的分区数不起作用。arrRdd.coalesce(1,true).partitions.size//如果shuffle为true,对应的lineage关系图中会有shuffleRDD,会存在shuffle过程练习6、分区相关API2.repartition通过coalesce来实现,但是shuffle为true,会产生shuffle过程valarrRdd=sc.parallelize(List(“a”,“b”,”c”,”c”,”e”),2)//设置分区数2arrRdd.partitions.size//查看分区数2arrRdd.repartition(1).partitions.size//设置分区数1练习7、集合操作1.cartesian两个RDD的笛卡尔积valstrRdd=sc.parallelize(List(“a”,“b”,”c”,”c”,”e”),2)//设置分区数2valintRdd=sc.parallelize(List(1,2,34,5,6),2)
valfirst=strRddcartesianintRddfirst.collect练习7、集合操作2.unionvalstrRdd=sc.parallelize(List(“a”,“b”,”c”,”c”,”e”),2)//设置分区数2valrightRdd=sc.parallelize(List(“e”,”f”),1)
strRdd.union(rightRdd).collectvala=sc.parallelize(1to4,2)valb=sc.parallelize(2to4,1)(a++b).collect
练习7、集合操作2.unionvalstrRdd=sc.parallelize(List(“a”,“b”,”c”,”c”,”e”),2)//设置分区数2valrightRdd=sc.parallelize(List(“e”,”f”),1)
strRdd.union(rightRdd).collect不会去重,新RDD的分区数是分区数和strRdd.union(rightRdd).partitions.size//3练习7、集合操作3.zip拉链功能将两个RDD的第i个元素组成一个元祖形成k-v类型RDDvalkRdd=sc.parallelize(1to4,2);valstrRdd=sc.parallelize(“abcd”.split(“”),2)//设置分区数2kRdd.zip(vRdd).toDebugString对应分区的个数需一致,元素个数需一致。练习7、集合操作4.groupBy查看以下三种写法效果vala=sc.parallelize(1to9,3)a.groupBy(x=>{if(x%2==0)”even”else“odd”}).collectvala=sc.parallelize(1to9,3)defmyfunc(a:Int):Int={a%2}a.groupBy(myfunc).collectvala=sc.parallelize(1to9,3)defmyfunc(a:Int):Int={a%2}a.groupBy(myfunc(_),1).collect练习7、集合操作5.distinct\subtractvala=sc.parallelize(List(1,2,2,3,4,5,6,6,7,8,9,4,10))a.distinct.collecta.distinct(2).partitions.length//重载参数是分区数使用reduceByKey来去重vala=sc.parallelize(1to9,3)valb=sc.parallelize(1to3,3)valc=a.subtract(b)c.collect//把a中包含b的元素删除掉,根据subtractByKey来删除练习7、集合操作6.cache/persistvalc=sc.parallelize(List(1,2,3,4,5),1)c.cache//把RDD缓存在内存中vala=sc.parallelize(1to5,1)a.persist(StorageLevel.MEMORY_ONLY)//指定持久化等级,等价于cache方法练习7、集合操作7.sample随机对集合元素采样,获得一个新RDDvala=sc.parallelize(1to10000,2)a.sample(false,0.1,0).collecta.sample(true,0.3,0).collect第一个参数true使用放回采样(泊松抽样会产生重复)false不放回(伯努力抽样)第二个参数fraction是百分比第三个参数seed是种子练习8、键值对Transformation操作1.groupByKey类似groupBy把相同key的value集聚形成一个序列。开销较大如果对相同key的value聚合推荐aggregateByKey或者reduceByKeyvala=sc.parallelize(List(“mk”,”liu”,”wuh”,”cd”,”abc”),2)valb=a.keyBy(x=>x.length)//使用keyBy生成key的键值对集合b.groupByKey.colllect//以单词字数为Key的values练习8、键值对Transformation操作2.combineByKey高效将键相同的value合并成序列,能自定义分区器和是否在map端进行聚合操作vala=sc.parallelize(List(“mk”,”liu”,”wuh”,”cd”,”abc”,“zp”,”lp”),2)valb=sc.parallelize(List(1,2,2,3,4,2,2),2)valc=b.zip(a)//合成(1,mk)vald=bineByKey(List(_),(x:List[String],y:String)=>y::x,(x:List[String],y:List[String])=>x:::y)d.collect第一个参数将输入放入List集合中第二个参数把y字符串合并到x链表中第三个参数把x链表合并到y链表中练习8、键值对Transformation操作3.reduceByKey对相同key的value聚集操作,在发送结果给reduce前会在map端执行本地的merge操作,底层就是调用combineByKey方法查看以下两例的结果vala=sc.parallelize(List(“mk”,”liu”,”wuh”,”cd”,”abc”,“zp”,”lp”),2)valb=a.map(x=>(x.length,x))b.reduceByKey((a,b)=>a+b).collectvala=sc.parallelize(List(3,12,123,23,4),2)valb=a.map(x=>(x.toString.length,x))b.reduceByKey(_+_).collect练习8、键值对Transformation操作4.sortByKey对key值进行排序(字母按顺序,数字从小到大。true升序false降序)vala=sc.parallelize(List(“mk”,”liu”,”wuh”,”cd”,”abc”,“zp”,”lp”),2)valb=sc.parallelize(1toa.count.toInt,2)valc=a.zip(b)c.sortByKey(true).collectc.sortByKey(false).collect练习8、键值对Transformation操作5.cogroup根据相同key聚集value,最多3个键值对的RDD。是一个高效的函数vala=sc.parallelize(List(1,2,2,3,1,3),1)valb=a.map(x=>(x,”b”))valc=a.map(y=>(y,”c”))b.cogroup(c).collectvald=a.map(z=>(z,”z”))b.cogroup(c,d).collect练习8、键值对Transformation操作6.join本质是cogroup实现聚集和flatMapValues实现笛卡尔积vala=sc.parallelize(List(“mk”,”liu”,”wuh”,”cd”,”abc”),2)valb=a.keyBy(_.length)//生成key得到k-vRDD(3,liu)valc=sc.parallelize(List(“mk”,”liu”,”wuh”,”cd”,”abc”,“zp”,”lp”),2)vald=c.keyBy(_.length)b.join(d).collect练习9、Action操作1.collect以数组返回vala=parallelize(1to9,3)a.collect2.reduce累加和valb=parallelize(1to10)b.reduce((a,b)=>a+b)3.take取前n个元素valc=sc.parallelize(List(“a”,”b”,”c”,”d”),2)c.take(2)练习9、Action操作4.top得到前k个元素vald=sc.parallelize(Array(1,2,13,4,5,6,7,8,9),3)d.count//返回RDD中的个数d.top(3)//Array(13,9,8)5.takeSample返回固定大小数组采用集vale=sc.parallelize(1to100,2)e.takeSample(true,30,1)//true是否放回,30固定大小,1随机种子6.countByKey根据key计算countvalf=sc.parallelize(List((1,”a”),(2,”bc”),(3,”liu”),(3,”neu”),(3,”cd”)),2)f.countByKey练习9、Action操作7.aggregate先将每个分区里元素进行聚合,然后用聚合函数将每个分区的结果和初始值进行聚合分区0reduce操作max(0,2,3)=3分区1reducemax(0,4,5)=5分区2reducemax(0,6,7)=7最后combine操作是0+3+5+7=15valz=sc.parallelize(List(2,3,4,5,6,7),3)z.aggregate(0)((a,b)=>math.max(a,b),(c,d)=>c+d)//0是初始值,传入两个函数seqOp,combOp。seqOp是一个分区reduce,combOp是第二个reduce聚合各分区valz=sc.parallelize(List(“a”,”b”,”c”,”d”,”e”,”f”),2)z.aggregate(“x”)(_+_,_+_)//xxdefxabc最后的combine=x+xdef+xabc基本的数据分析练习一、
文本数据的操作公司文件大部分以文本文件格式保存,如访问日志,监控日志等。在收集后需要进行各种数据处理,这里需要进行外部文件加载对文件数据进行转换、过滤各种数据统计处理结果存储2.实现过程2.1准备测试数据将README.md上传到Hadoophadoopfs-putREADME.md/input/README.mdhadoopfs-ls/input/加载转换操作练习1.外部文件加载./bin/spark-shell--masterSpark://master:7077scala>valtextFile=sc.textFile("/input/README.md")练习2.修改显示日志scala>importorg.apache.log4j.{Level,Logger}scala>Logger.getLogger("org.apache.spark").setLevel(Level.WARN)练习3.使用take获取textFile前n个元素,first获取第1个元素,使用filter过滤”Spark”,collect获取RDD结果scala>textFile.take(1)scala>textFile.firstscala>textFile.filter(_.contains("Spark"))scala>textFile.filter(_.contains("spark"))scala>textFile.filter(_.contains("spark")).collect运行并查看结果运行并查看结果统计分析操作练习4.使用count统计textFile个数(行数),获取单词数,获取单词个数最多的行的单词个数scala>textFile.countres26:Long=98scala>textFile.flatMap(_.split("")).countres27:Long=527scala>textFile.map(_.split(“”).size).reduce((a,b)=>if(a>b)aelseb)res30:Int=14//对每一行单词计数后reduce对RDD每个分区的数据进行归并操作,最终对每个分区数据的归并结果再进行归并。这里是获取map的RDD元素的最大值就是比较每行包含的单词个数可以改用Math.max(a,b)统计分析操作练习5实现WordCountscala>valwor
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2024年河北劳动关系职业学院马克思主义基本原理概论期末考试真题汇编
- 2025年河北工业大学马克思主义基本原理概论期末考试笔试题库
- 2025年重庆五一职业技术学院马克思主义基本原理概论期末考试笔试真题汇编
- 2024年安徽工业经济职业技术学院马克思主义基本原理概论期末考试真题汇编
- 2024年西安理工大学高科学院马克思主义基本原理概论期末考试笔试题库
- 2025年赤峰职业技术学院马克思主义基本原理概论期末考试笔试真题汇编
- 2024年芜湖学院马克思主义基本原理概论期末考试笔试题库
- 2025年上海建设管理职业技术学院马克思主义基本原理概论期末考试模拟试卷
- 2025年深圳大学马克思主义基本原理概论期末考试真题汇编
- 2024年黄河水利职业技术大学马克思主义基本原理概论期末考试真题汇编
- GB/T 10454-2025包装非危险货物用柔性中型散装容器
- 贵州药品追溯管理办法
- 租车牌车辆抵押合同范本
- 家电基础知识培训内容课件
- 大健康行业趋势
- 电商公司选品管理制度
- 石化企业保密管理制度
- 娱乐直播公司全套管理制度
- 软件开发生命周期考核题及答案
- (高清版)DG∕TJ 08-2299-2019 型钢混凝土组合桥梁设计标准
- 污泥干化项目施工组织设计
评论
0/150
提交评论