Spark编程基础(Scala版第3版)课件 第5-9章 RDD编程-Spark MLli_第1页
Spark编程基础(Scala版第3版)课件 第5-9章 RDD编程-Spark MLli_第2页
Spark编程基础(Scala版第3版)课件 第5-9章 RDD编程-Spark MLli_第3页
Spark编程基础(Scala版第3版)课件 第5-9章 RDD编程-Spark MLli_第4页
Spark编程基础(Scala版第3版)课件 第5-9章 RDD编程-Spark MLli_第5页
已阅读5页,还剩936页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

RDD

编程第5章目

录01RDD编程基础02键值对RDD03数据读写04综合案例RDD

编程基础5.1RDD编程基础RDD创建0102RDD操作03持久化分区04一个综合实例055.1.1RDD创建0102从文件系统中加载数据创建RDD通过并行集合(数组)创建RDD5.1.1RDD创建底层文件系统数据Spark采用textFile()方法来从文件系统中加载数据创建RDD5.1.1RDD创建textFile()RDDSpark的SparkContext通过textFile()读取数据生成内存中的RDDSparkContext5.1.1RDD创建.textFile()方法支持的数据类型AmazonS3等等分布式文件系统HDFS本地文件系统5.1.1RDD创建(1)从本地文件系统中加载数据创建RDDscala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")lines:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/mycode/rdd/word.txtMapPartitionsRDD[12]attextFileat<console>:275.1.1RDD创建5.1.1RDD创建(2)从分布式文件系统HDFS中加载数据scala>vallines=sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")scala>vallines=sc.textFile("/user/hadoop/word.txt")scala>vallines=sc.textFile("word.txt")三条语句是完全等价的,可以使用其中任意一种方式5.1.1RDD创建通过集合或数组创建RDD5.1.1RDD创建可调用SparkContext的parallelize方法,在Driver中已经存在的集合(数组)上创建scala>valarray=Array(1,2,3,4,5)array:Array[Int]=Array(1,2,3,4,5)scala>valrdd=sc.parallelize(array)rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[13]atparallelizeat<console>:29通过并行集合(数组)创建RDD的实例5.1.1RDD创建通过并行集合(数组)创建RDD:

也可以从列表中创建scala>vallist=List(1,2,3,4,5)list:List[Int]=List(1,2,3,4,5)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[14]atparallelizeat<console>:295.1.1RDD创建5.1.2RDD操作RDD操作转换操作行动操作惰性机制5.1.2RDD操作转换操作5.1.2RDD操作转换类型操作(Transformation)只记录转换的轨迹发生计算5.1.2RDD操作转换类型操作动作类型操作进行从头到尾的计算5.1.2RDD操作filtermapflatmapgroupByKeyreduceByKeyRDD常用转换操作操作含义filter(func)筛选出满足函数func的元素,并返回一个新的数据集map(func)将每个元素传递到函数func中,并将结果返回为一个新的数据集mapPartitions(func)对RDD中的每个分区的迭代器进行操作,并将结果返回为一个新的数据集flatMap(func)与map()相似,但每个输入元素都可以映射到0或多个输出结果groupBy(func)将数据根据指定的规则进行分组groupByKey()应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集reduceByKey(func)应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果5.1.2RDD操作5.1.2RDD操作sortBy(func)根据一定的规则对数据进行排序distinct对RDD内部的元素进行去重,并把去重后的元素放到新的RDD中union对两个RDD进行并集运算,并返回新的RDDintersection对两个RDD进行交集运算,并返回新的RDDsubtract对两个RDD进行差集运算,并返回新的RDDzip把两个RDD中的元素以键值对的形式进行合并5.1.2RDD操作转换操作·filter(func)scala>vallines=sc.textFile(file:///usr/local/spark/mycode/rdd/word.txt)scala>vallinesWithSpark=lines.filter(line=>line.contains("Spark"))5.1.2RDD操作map(func)操作将每个元素传递到函数func中,并将结果返回为一个新的数据集scala>data=Array(1,2,3,4,5)scala>valrdd1=sc.parallelize(data)scala>valrdd2=rdd1.map(x=>x+10)转换操作·map(func)5.1.2RDD操作图map()操作实例执行过程示意图5.1.2RDD操作map(func)操作将每个元素传递到函数func中,并将结果返回为一个新的数据集scala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")scala>valwords=lines.map(line=>line.split(""))转换操作·map(func)5.1.2RDD操作图map()操作实例执行过程示意图5.1.2RDD操作importorg.apache.log4j.PropertyConfiguratorimportorg.apache.spark.{SparkConf,SparkContext}importscala.collection.mutableobjectWordCount{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("SimpleApplication").setMaster("local[*]")valsc=newSparkContext(conf)sc.setLogLevel("error")valrdd1=sc.parallelize(List(1,2,3,4),2)valmpRDD=rdd1.mapPartitions(iter=>{println("=============")iter.map(_*2)})mpRDD.foreach(println)sc.stop()}}转换操作

对RDD中的每个分区的迭代器进行操作·mapPartitions(func)=============68=============24运行结果5.1.2RDD操作scala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")scala>valwords=lines.flatMap(line=>line.split(""))转换操作flatMap(func)

的具体实例如下5.1.2RDD操作图flatMap()操作实例执行过程示意图5.1.2RDD操作valrdd1=sc.parallelize(List(List(1,2),List(3,4)))valflatRDD=rdd1.flatMap(list=>list)flatRDD.foreach(println)转换操作这里再给出flatMap的一个实例上面语句的执行结果如下:1234·flatMap(func)5.1.2RDD操作valrdd1=sc.parallelize(List(List(1,2),3,List(4,5)))valflatRDD=rdd1.flatMap(data=>{datamatch{caselist:List[_]=>listcasevalue=>List(value)}})flatRDD.foreach(println)转换操作RDD中的元素类型可能是不相同的,下面是一个具体实例执行结果如下:12345·flatMap(func)5.1.2RDD操作转换操作groupBy()将数据根据指定的规则进行分组,分区默认不变,但是,数据会被打乱重新组合·groupBy(func)5.1.2RDD操作valrdd1=sc.parallelize(List("Hadoop","Spark","Storm","Flink","Flume"))valgroupRDD=rdd1.groupBy(_.charAt(0))groupRDD.foreach(println)转换操作将List(“Hadoop”,”Spark”,”Storm”,”Flink”,”Flume”)根据单词首写字母进行分组上面语句的执行结果如下:(S,CompactBuffer(Spark,Storm))(H,CompactBuffer(Hadoop))(F,CompactBuffer(Flink,Flume))·groupBy(func)5.1.2RDD操作groupByKey()应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集groupByKey()操作实例执行过程5.1.2RDD操作reduceByKey(func)应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合后得到的结果reduceByKey()操作实例5.1.2RDD操作<“spark”,<1,1,1>>words.reduceByKey(a,b)=>a+b5.1.2RDD操作valrdd1=sc.parallelize(List(3,1,2,4),1)valsortRDD=rdd1.sortBy(value=>value)sortRDD.foreach(println)转换操作sortBy()会根据一定的规则对数据进行排序。下面是一个具体实例这里设置分区数量为1,这样可以可以做到对4个数进行全局排序。如果设置分区数量为2,那么就只会对每个分区内部进行局部排序·sortBy(func)5.1.2RDD操作valrdd1=sc.parallelize(List(("b",8),("d",2),("a",5),("c",4)),1)valsortRDD=rdd1.sortBy(t=>t._1)sortRDD.foreach(println)转换操作下面看一个元素类型是元组的实例执行结果如下:·sortBy(func)(a,5)(b,8)(c,4)(d,2)5.1.2RDD操作重复的元素FlinkSparkSparkrdd1.distinct

distinct操作会对RDD内部的元素进行去重,该方法是对map方法及reduceByKey方法的封装5.1.2RDD操作会对两个RDD进行并集运算,并返回新的RDD,整个过程不会对元素进行去重。具体实例如下scala>valrdd1=sc.parallelize(Array(1,2,3))scala>valrdd2=sc.parallelize(Array(3,4,5))scala>valrdd3=rdd1.union(rdd2)union操作5.1.2RDD操作intersection操作会对两个RDD进行交集运算,并返回新的RDD。具体实例如下scala>valrdd1=sc.parallelize(Array(1,2,3))scala>valrdd2=sc.parallelize(Array(3,4,5))scala>valrdd3=ersection(rdd2)union操作5.1.2RDD操作会对两个RDD进行差集运算,并返回新的RDD,整个过程不会对元素去重。具体实例如下scala>valrdd1=sc.parallelize(Array(1,2,3))scala>valrdd2=sc.parallelize(Array(3,4,5))scala>valrdd3=rdd1.subtract(rdd2)subtract

操作5.1.2RDD操作会把两个RDD中的元素以键值对的形式进行合并。需要注意的是,在使用zip操作时,需要确保两个RDD中的元素个数是相同的。具体实例如下scala>valrdd1=sc.parallelize(Array(1,2,3))scala>valrdd2=sc.parallelize(Array("Hadoop","Spark","Flink"))scala>valrdd3=rdd1.zip(rdd2)zip

操作5.1.2RDD操作行动操作Spark程序执行执行真正的计算5.1.2RDD操作操作含义count()返回数据集中的元素个数collect()以数组的形式返回数据集中的所有元素first()返回数据集中的第一个元素take(n)以数组的形式返回数据集中的前n个元素reduce(func)通过函数func(输入两个参数并返回一个值)聚合数据集中的元素foreach(func)将数据集中的每个元素传递到函数func中运行aggregate(zeroValue,seqOp,combOp)通过指定初始值zeroValue、分区内合并函数seqOp和分区间合并函数combOp,允许自定义复杂聚合逻辑,适用于非交换或非关联运算场景5.1.2RDD操作scala>valrdd=sc.parallelize(Array(1,2,3,4,5))rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[1]atparallelizeat<console>:24scala>rdd.count()res0:Long=5scala>rdd.first()res1:Int=1scala>rdd.take(3)res2:Array[Int]=Array(1,2,3)scala>rdd.reduce((a,b)=>a+b)res3:Int=15scala>rdd.collect()res4:Array[Int]=Array(1,2,3,4,5)scala>rdd.foreach(elem=>println(elem))123455.1.2RDD操作valrdd=sc.parallelize(List(1,2,3,4),2)valresult=rdd.aggregate(10)(_+_,_+_)println(result)下面再给出一个实例介绍行动操作aggregate()的具体用法。aggregate()接收两个函数(seqOp和combOp)和一个初始化值zeroValue。seqOp函数用于聚集每一个分区,combOp用于聚集所有分区聚集后的结果。每一个分区的聚集和最后所有分区的聚集,都需要初始化值的参与。下面是一个具体实例。5.1.3惰性机制转换类型操作行动操作进行从头到尾的计算惰性机制scala>vallines=sc.textFile("data.txt")scala>vallineLengths=lines.map(s=>s.length)scala>valtotalLength=lineLengths.reduce((a,b)=>a+b)vallist=List("Hadoop","Spark","Hive")valrdd0=sc.parallelize(list)valrdd1=rdd0.map(word=>{println("@@@@@")(word,1)})println(rdd1.count())println(rdd1.collect().mkString(","))在选代计算中重复使用同一个RDD的例子5.1.4

持久化@@@@@@@@@@@@@@@3@@@@@@@@@@@@@@@(Hadoop,1),(Spark,1),(Hive,1)执行结果如下:vallist=List("Hadoop","Spark","Hive")valrdd0=sc.parallelize(list)valrdd1=rdd0.map(word=>{println("@@@@@")(word,1)})rdd1.cache()println(rdd1.count())println(rdd1.collect().mkString(","))下面是使用持久化以后的代码5.1.4

持久化@@@@@@@@@@@@@@@3(Hadoop,1),(Spark,1),(Hive,1)执行结果如下:scala>vallist=List("Hadoop","Spark","Hive")list:List[String]=List(Hadoop,Spark,Hive)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[22]atparallelizeat<console>:29scala>println(rdd.count())//行动操作,触发一次真正从头到尾的计算3scala>println(rdd.collect().mkString(","))//行动操作,触发一次真正从头到尾的计算Hadoop,Spark,HiveRDD采用惰性求值的机制遇到行动操作,都会从头开始执行计算。多次计算同一个RDD实例如下5.1.4

持久化5.1.4

持久化数据集用户持久化数据集数据集5.1.4

持久化persist()对一个RDD标记为持久化并不会马上计算生成RDD并把它持久化persist()真正持久化行动操作5.1.4

持久化persist()使用cache()方法时,会调用persist(MEMORY_ONLY)cache()可使用unpersist()方法手动地把持久化的RDD从缓存中移除unpersist()表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容persist(MEMORY_ONLY)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上persist(MEMORY_AND_DISK)5.1.4

持久化scala>vallist=List("Hadoop","Spark","Hive")list:List[String]=List(Hadoop,Spark,Hive)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[22]atparallelizeat<console>:29scala>rdd.cache()//会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成scala>println(rdd.count())//第一次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中3scala>println(rdd.collect().mkString(","))//第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rddHadoop,Spark,Hive持久化—针对上面的实例,增加持久化语句以后的执行过程如下RDDRDDRDD节点节点节点5.1.5

分区RDDRDDRDDRDD分区增加并行度RDD分区被保存到不同节点上5.1.5

分区减少通信开销未分区时对UserData和Events两个表进行连接操作5.1.5

分区减少通信开销

UserDataUserIdUserInfoUserIdLinkInfoEvents连接未分区时对UserData和Events两个表进行连接操作5.1.5

分区LinkInfoUserIDUserInfo未分区时对UserData和Events两个表进行连接操作做连接时不做分区

userData

um

u3

u2

u1……….几千万用户表非常大一台机器放不下放到多台机器上5.1.5

分区减少通信开销5.1.5

分区5.1.5

分区RDD分区原则分区个数集群中CPU核心数目Mesos模式YARN模式Standalone模式Local模式指

定设置spark.default.parallelism参数默认的分区数目5.1.5

分区若设置了local[N]则默认为NLocal模式spark.default.parallelism5.1.5

分区ApacheMesos模式没有设置默认分区数目为85.1.5

分区YARN模式Standalone模式集群中所有CPU核心数目总和“a”5.1.5

分区>“a”“2”默认值5.1.5

分区>默认值“a”“2”5.1.5

分区5.1.5

分区

设置分区的个数—创建RDD时手动指定分区个数在调用textFile()和parallelize()方法时手动指定分区个数即可,语法格式如下sc.textFile(path,partitionNum)指定要加载的文件的地址参数用于指定分区个数path参数partitionNum参数scala>valarray=Array(1,2,3,4,5)scala>valrdd=sc.parallelize(array,2)//设置两个分区5.1.5

分区

设置分区的个数—使用reparititon方法重新设置分区个数通过转换操作得到新RDD时,直接调用repartition方法即可。例如:scala>valdata=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt",2)data:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/mycode/rdd/word.txtMapPartitionsRDD[12]attextFileat<console>:24scala>data.partitions.size//显示data这个RDD的分区数量res2:Int=2scala>valrdd=data.repartition(1)//对data这个RDD进行重新分区rdd:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[11]atrepartitionat:26scala>rdd.partitions.sizeres4:Int=15.1.5

分区HashPartitioner(哈希分区)RangePartitioner(区域分区)自定义分区5.1.5

分区numPartitions:Int返回创建出来的分区数getPartition(key:Any):Int返回给定键的分区编号(0到numPartitions-1)equals()Java判断相等性的标准方法继承org.apache.spark.Partitioner自定义分区方法定义Partitioner类5.1.5

分区10写入到part-0000011写入到part-0000119写入到part-00009……根据key值的最后一位数字,写到不同的文件5.1.5

分区importorg.apache.spark.{Partitioner,SparkContext,SparkConf}//自定义分区类,需要继承org.apache.spark.Partitioner类classMyPartitioner(numParts:Int)extendsPartitioner{//覆盖分区数overridedefnumPartitions:Int=numParts//覆盖分区号获取函数overridedefgetPartition(key:Any):Int={key.toString.toInt%10}}5.1.5

分区objectTestPartitioner{defmain(args:Array[String]){valconf=newSparkConf()valsc=newSparkContext(conf)//模拟5个分区的数据valdata=sc.parallelize(1to10,5)//根据尾号转变为10个分区,分别写到10个文件data.map((_,1)).partitionBy(newMyPartitioner(10)).map(_._1).saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner")}}5.1.6

一个综合实例多个单词构成word.txt文本文本文本文本如何进行词频统计?5.1.6

一个综合实例scala>vallines=sc.//代码一行放不下,可以在圆点后回车,在下行继续输入|textFile("file:///usr/local/spark/mycode/wordcount/word.txt")scala>valwordCount=lines.flatMap(line=>line.split("")).|map(word=>(word,1)).reduceByKey((a,b)=>a+b)scala>wordCount.collect()scala>wordCount.foreach(println)5.1.6

一个综合实例5.1.6

一个综合实例图

在一个集群中同时部署Hadoop和Spark5.1.6

一个综合实例图

在集群中执行词频统计过程示意图键值对RDD5.2键值对RDD提纲常用的键值对RDD转换操作一个综合实例键值对RDD的创建1235.2.1键值对RDD的创建

(1)第一种创建方式:从文件中加载,可采用map()函数创建PairRDDscala>vallines=sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")lines:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/mycode/pairrdd/word.txtMapPartitionsRDD[1]attextFileat<console>:27scala>valpairRDD=lines.flatMap(line=>line.split("")).map(word=>(word,1))pairRDD:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[3]atmapat<console>:29scala>pairRDD.foreach(println)(i,1)(love,1)(hadoop,1)……5.2.1键值对RDD的创建

(2)第二种创建方式:通过并行集合(数组)创建RDDscala>vallist=List("Hadoop","Spark","Hive","Spark")list:List[String]=List(Hadoop,Spark,Hive,Spark)

scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[11]atparallelizeat<console>:29

scala>valpairRDD=rdd.map(word=>(word,1))pairRDD:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[12]atmapat<console>:31

scala>pairRDD.foreach(println)(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)5.2.2常用的键值对RDD转换操作

joinreduceByKey(func)groupByKey()

keysvaluessortByKey()mapValues(func)combineByKey常用的键值对RDD转换操作reduceByKey(func)使用func函数合并具有相同键的值功能5.2.2常用的键值对RDD转换操作scala>pairRDD.reduceByKey((a,b)=>a+b).foreach(println)(Spark,2)(Hadoop,1)(Hive,1)(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)

(reduceByKey(func)5.2.2常用的键值对RDD转换操作5.2.2常用的键值对RDD转换操作功能groupByKey()对具有相同键的值进行分组5.2.2常用的键值对RDD转换操作5.2.1键值对RDD的创建groupByKey()("spark",1)("spark",2)("hadoop",3)("hadoop",5)(“spark”,(1,2))(“hadoop",(3,5))groupByKey()

groupByKey()(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.groupByKey()res15:org.apache.spark.rdd.RDD[(String,Iterable[Int])]=ShuffledRDD[15]atgroupByKeyat<console>:345.2.2常用的键值对RDD转换操作5.2.2常用的键值对RDD转换操作只做分组更进一步reduceByKey和groupByKey的区别groupByKeyreduceByKey(key,value-list)不会进行汇总求和value-list进行汇总求和5.2.2常用的键值对RDD转换操作

reduceByKey和groupByKey的区别,具体实例如下scala>val

words

=

Array("one",

"two",

"two",

"three",

"three",

"three")

scala>val

wordPairsRDD

=

sc.parallelize(words).map(word

=>

(word,

1))

scala>val

wordCountsWithReduce

=

wordPairsRDD.reduceByKey(_

+

_)

scala>val

wordCountsWithGroup

=

wordPairsRDD.groupByKey().map(t

=>

(t._1,

t._2.sum))

得到的wordCountsWithReduce和wordCountsWithGroup是一样的,但内部运算过程不同

keys只会把PairRDD中的key返回形成一个新的RDD(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.keysres17:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[17]atkeysat<console>:34scala>pairRDD.keys.foreach(println)HadoopSparkHiveSpark5.2.2常用的键值对RDD转换操作

values只会把PairRDD中的value返回形成一个新的RDD(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.valuesres0:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[2]atvaluesat<console>:34

scala>pairRDD.values.foreach(println)11115.2.2常用的键值对RDD转换操作

sortByKey()的功能是返回一个根据键排序的RDD(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.sortByKey()res0:org.apache.spark.rdd.RDD[(String,Int)]=ShuffledRDD[2]atsortByKeyat<console>:34scala>pairRDD.sortByKey().foreach(println)(Hadoop,1)(Hive,1)(Spark,1)(Spark,1)5.2.2常用的键值对RDD转换操作5.2.2常用的键值对RDD转换操作

sortByKey()

sortBy()具体实例

scala>vald1=sc.parallelize(Array((“c",8),(“b“,25),(“c“,17),(“a“,42),(“b“,4),(“d“,9),(“e“,17),(“c“,2),(“f“,29),(“g“,21),(“b“,9)))scala>d1.reduceByKey(_+_).sortByKey(false).collectres2:Array[(String,Int)]=Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))scala>vald1=sc.parallelize(Array((“c",8),(“b“,25),(“c“,17),(“a“,42),(“b“,4),(“d“,9),(“e“,17),(“c“,2),(“f“,29),(“g“,21),(“b“,9)))scala>d1.reduceByKey(_+_).sortByKey(false).collectres2:Array[(String,Int)]=Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))

mapValues(func)对键值对RDD中每个value都应用一个函数(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.mapValues(x=>x+1)res2:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[4]atmapValuesat<console>:34scala>pairRDD.mapValues(x=>x+1).foreach(println)(Hadoop,2)(Spark,2)(Hive,2)(Spark,2)5.2.2常用的键值对RDD转换操作5.2.2常用的键值对RDD转换操作

join就表示内连接,只有在两个数据集中都存在的key才会被输出scala>valpairRDD1=sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))pairRDD1:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[24]atparallelizeat<console>:27

scala>valpairRDD2=sc.parallelize(Array(("spark","fast")))pairRDD2:org.apache.spark.rdd.RDD[(String,String)]=ParallelCollectionRDD[25]atparallelizeat<console>:27

scala>pairRDD1.join(pairRDD2)res9:org.apache.spark.rdd.RDD[(String,(Int,String))]=MapPartitionsRDD[28]atjoinat<console>:32

scala>pairRDD1.join(pairRDD2).foreach(println)(spark,(1,fast))(spark,(2,fast))

5.2.2常用的键值对RDD转换操作mergeValuecreateCombinermergeCombinerspartitionermapSideCombinecombineByKey5.2.2常用的键值对RDD转换操作mergeValuecreateCombinermergeCombinerspartitionermapSideCombine第一次遇到key时创建组合器函数,将RDD数据集中V类型值转换成C类型值(V=>C)合并值函数,再次遇到相同的Key时,将createCombiner的C类型值与这次传入的V类型值合并成一个C类型值(C,V)=>C合并组合器函数,将C类型值两两合并成一个C类型值使用已有的或自定义的分区函数,默认是HashPartitioner是否在map端进行Combine操作,默认为true5.2.2常用的键值对RDD转换操作下面通过一个实例来解释如何使用combineByKey操作。假设有一些销售数据,数据采用键值对的形式,即<公司,当月收入>,要求使用combineByKey操作求出每个公司的总收入和每月平均收入,并保存在本地文件中5.2.2常用的键值对RDD转换操作importorg.apache.spark.SparkContextimportorg.apache.spark.SparkConfobjectCombine{defmain(args:Array[String]){valconf=newSparkConf().setAppName("Combine").setMaster("local")valsc=newSparkContext(conf)valdata=sc.parallelize(Array(("company-1",88),("company-1",96),("company-1",85),("company-2",94),("company-2",86),("company-2",74),("company-3",86),("company-3",88),("company-3",92)),3)valres=bineByKey((income)=>(income,1),(acc:(Int,Int),income)=>(acc._1+income,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)).map({case(key,value)=>(key,value._1,value._1/value._2.toFloat)})res.repartition(1).saveAsTextFile("file:///usr/local/spark/mycode/rdd/result")}}5.2.2常用的键值对RDD转换操作Combine.scala代码中combineByKey()的参数值参数名称参数值createCombiner(income)=>(income,1)mergeValue(acc:(Int,Int),income)=>(acc._1+income,acc._2+1)mergeCombiners(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)5.2.2常用的键值对RDD转换操作01设置聚合时的初始值。需要注意的是,初始值并非总是数字,有时候可能是集合zeroValue03跨分区聚合,对数据进行最终的汇总时调用此操作combOp02将值V聚合到类型为U的对象中seqOpaggregateByKey5.2.2常用的键值对RDD转换操作

aggregateByKey

具体实例如下scala>valrdd1=sc.parallelize(Array(("USER1","URL1"),("USER2","URL1"),("USER1","URL1"),("USER1","URL2"),("USER2","URL3")))scala>valrdd2=rdd1.aggregateByKey(collection.mutable.Set[String]())(|(urlSet,url)=>urlSet+url,|(urlSet1,urlSet2)=>urlSet1++=urlSet2)scala>rdd2.collectres12:Array[(String,scala.collection.mutable.Set[String])]=Array((USER1,Set(URL1,URL2)),(USER2,Set(URL1,URL3)))5.2.2常用的键值对RDD转换操作

下面给出一个实例来演示flatMapValues与mapValues的区别rdd1=sc.parallelize(Array(("file1","storm/hadoop/spark/flink"),("file1","hbase/hdfs/spark/flink"),("file2","zookeeper/flink/hadoop/hive"),("file2","flink/hive/flume")))scala>valrdd2=rdd1.flatMapValues(_.split("/"))scala>rdd2.collectres0:Array[(String,String)]=Array((file1,storm),(file1,hadoop),(file1,spark),(file1,flink),(file1,hbase),(file1,hdfs),(file1,spark),(file1,flink),(file2,zookeeper),(file2,flink),(file2,hadoop),(file2,hive),(file2,flink),(file2,hive),(file2,flume))scala>valrdd3=rdd1.mapValues(_.split("/"))scala>rdd3.collectres1:Array[(String,Array[String])]=Array((file1,Array(storm,hadoop,spark,flink)),(file1,Array(hbase,hdfs,spark,flink)),(file2,Array(zookeeper,flink,hadoop,hive)),(file2,Array(flink,hive,flume)))5.2.2常用的键值对RDD转换操作flatMapValues(func)("spark",6)("hadoop",6)("hadoop",4)("spark",2)5.2.3一个综合实例键值对

一个综合实例—计算每种图书的每天平均销量scala>valrdd=sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))rdd:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[38]atparallelizeat<console>:27

scala>rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()res22:Array[(String,Int)]=Array((spark,4),(hadoop,5))5.2.3一个综合实例5.2.3一个综合实例计算图书平均销量过程示意图

一个综合实例—计算每种图书的每天平均销量scala>valrdd=sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))rdd:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[38]atparallelizeat<console>:27

scala>rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()res22:Array[(String,Int)]=Array((spark,4),(hadoop,5))5.2.3一个综合实例数据读写5.3数据读写本地文件系统的数据读写分布式文件系统HDFS的数据读写读写MySQL数据库5.3.1本地文件系统的数据读写020103从文件中读取数据创建RDD把RDD写入到文本文件中JSON文件的读取(1)从文件中读取数据创建RDD具体实例

scala>valtextFile=sc.|textFile("file:///usr/local/spark/mycode/wordcount/word.txt")

因为Spark采用了惰性机制,即使输入错误语句,spark-shell也不会马上报错

scala>valtextFile=sc.|textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")5.3.1本地文件系统的数据读写

(2)把RDD写入到文本文件中具体实例

scala>valtextFile=sc.|textFile("file:///usr/local/spark/mycode/wordcount/word.txt")scala>textFile.|saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")$cd/usr/local/spark/mycode/wordcount/writeback/$lspart-00000__SUCCESS5.3.1本地文件系统的数据读写5.3.2HDFS的数据读写

如下三条语句都是等价的

scala>valtextFile=sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")scala>textFile.first()

从分布式文件系统HDFS中读取数据,也是采用textFile()方法

scala>valtextFile=sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")scala>valtextFile=sc.textFile("/user/hadoop/word.txt")scala>valtextFile=sc.textFile("word.txt")可以使用saveAsTextFile()方法把RDD中的数据保存到HDFS文件中scala>textFile.saveAsTextFile("writeback")5.3.3读写MySQL数据库

在MySQLShell环境中,输入下面SQL语句完成数据库和表的创建

$servicemysqlstart$mysql-uroot-p#屏幕会提示输入密码

安装成功以后,在Linux中启动MySQL数据库,命令如下

mysql>createdatabasespark;mysql>usespark;mysql>createtablestudent(idint(4),namechar(20),genderchar(4),ageint(4));mysql>insertintostudentvalues(1,'Xueqian','F',23);mysql>insertintostudentvalues(2,'Weiliang','M',24);mysql>select*fromstudent;5.3.3读写MySQL数据库

2.从MySQL数据库读取数据:执行如下命令新建一个代码文件ReadMySQL.scala

1.

准备工作:下载MySQL数据库驱动程序文件mysql-connector-java-8.0.30.jar,并把这个文件存放到Spark的安装目录“/usr/local/spark/jars”下$cd~/sparkapp/src/main/scala#假设该目录已经存在$vimReadMySQL.scala5.3.3读写MySQL数据库importjava.sql.DriverManagerimportorg.apache.spark.rdd.JdbcRDDimportorg.apache.spark.{SparkConf,SparkContext}objectReadMySQL{defmain(args:Array[String]){valconf=newSparkConf().setAppName("ReadMySQL").setMaster("local[2]")valsc=newSparkContext(conf)sc.setLogLevel("ERROR")valinputMySQL=newJdbcRDD(sc,()=>{Class.forName("com.mysql.jdbc.Driver")DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?useUnicode=true&characterEncoding=utf8","root","123456")//root是数据库用户名,123456是密码},"SELECT*FROMstudentwhereid>=?andid<=?;",1,//设置条件查询中id的下界2,//设置条件查询中id的上界1,//设置分区数r=>(r.getInt(1),r.getString(2),r.getString(3),r.getInt(4)))inputMySQL.foreach(println)sc.stop()}}5.3.3读写MySQL数据库

执行如下命令对代码进行编译打包name:="SimpleProject"version:="1.0"scalaVersion:="2.12.18"libraryDependencies+="org.apache.spark"%%"spark-core"%"3.5.6"

在sparkapp目录下新建simple.sbt文件并输入以下内容

$cd~/sparkapp$/usr/local/sbt/sbtpackage5.3.3读写MySQL数据库

执行如下命令对代码进行编译打包$cd~/sparkapp$/usr/local/spark/bin/spark-submit\>--class"ReadMySQL"\>./target/scala-2.12/simple-project_2.12-1.0.jar

然后执行如下命令运行程序

(1,Xueqian,F,23)(2,Weiliang,M,24)5.3.3读写MySQL数据库importjava.sql.DriverManagerimportorg.apache.spark.rdd.JdbcRDDimportorg.apache.spark.{SparkConf,SparkContext}objectWriteMySQL{defmain(args:Array[String]){valconf=newSparkConf().setAppName("WriteMySQL").setMaster("local[2]")valsc=newSparkContext(conf)sc.setLogLevel("ERROR")Class.forName("com.mysql.jdbc.Driver")valrddData=sc.parallelize(List((3,"Rongcheng","M",26),(4,"Guanhua","M",27)))

3.

向MySQL数据库写入数据

5.3.3读写MySQL数据库

rddData.foreachPartition((iter:Iterator[(Int,String,String,Int)])=>{valconn=DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?useUnicode=true&characterEncoding=utf8","root","123456")conn.setAutoCommit(false)valpreparedStatement=conn.prepareStatement("INSERTINTOstudent(id,name,gender,age)VALUES(?,?,?,?)")iter.foreach(t=>{preparedStatement.setInt(1,t._1)preparedStatement.setString(2,t._2)preparedStatement.setString(3,t._3)preparedStatement.setInt(4,t._4)preparedStatement.addBatch()})preparedStatement.executeBatch()mit()conn.close()})sc.stop()}}$/usr/local/spark/bin/spark-submit\>--class"WriteMySQL"\>./target/scala-2.12/simple-project_2.12-1.0.jar

对代码进行编译打包,然后执行如下命令运行程序

5.3.3读写MySQL数据库mysql>select*fromstudent;+------+-----------+--------+------+|id|name|gender|age|+------+-----------+--------+------+|1|Xueqian|F|23||2|Weiliang|M|24||3|Rongcheng|M|26||4|Guanhua|M|27|+------+-----------+--------+------+4rowsinset(0.00sec)(2,Weiliang,M,24)

可以到MySQLShell环境中使用SQL语句查询student表5.3.3读写MySQL数据库5.4

综合案例提纲12文件排序求TOP值二次排序35.4.1求TOP值——实例1ordereiduseridpaymentproductid1,1768,50,1552,1218,600,2113,2239,788,2424,3101,28,5995,4899,290,1296,3110,54,12017,4436,259,8778,2369,7890,27100,4287,226,233101,6562,489,124102,1124,33,17103,3267,159,179104,4569,57,125105,1438,37,116file1.txt

file2.txt5.4.1求TOP值——实例1求TopN个payme

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论