《Spark大数据分析与实战》课件项目三 Spark RDD分析交通违章记录_第1页
《Spark大数据分析与实战》课件项目三 Spark RDD分析交通违章记录_第2页
《Spark大数据分析与实战》课件项目三 Spark RDD分析交通违章记录_第3页
《Spark大数据分析与实战》课件项目三 Spark RDD分析交通违章记录_第4页
《Spark大数据分析与实战》课件项目三 Spark RDD分析交通违章记录_第5页
已阅读5页,还剩84页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

BigDataAnalyticswithSpark项目三SparkRDD分析交通违章记录项目概述当前,机动车保有量持续上升,机动车成为居民出行、物流运输的主力军;某市部署了数百组交通监控设备,用于采集本辖区内的各类交通违法行为。抽取部分数据得到3张表格:违章记录表用于记录违章详情,包括违章日期、监控设备编号、车牌号、违法类型代码;车主信息表记录车主信息,包括车牌号、车主姓名、手机号;违章条目对照表记录交通违法条目信息,包括违法类型代码、扣分数、罚款金额、违法信息名称。现要求使用SparkRDD完成交通违法数据分析,为相关部门提供各类有用信息。数据样式如下所示:项目效果使用SparkRDD,对交通违法数据分析发现,可以得到如下有价值的信息:(1)单次扣分最多的交通违法行为(违章代码、扣分数、罚款金额、违章名称)(2)某车辆的3次违章记录(日期、监控设备号、车牌号、违章代码)(3)累计扣12分以上的车主信息(车牌号、车主姓名、手机号)目录任务1根据交通违章数据创建RDD找出扣分最高的交通违法条目查找某车辆的违章记录查找违章次数3次以上车辆任务2任务3任务4查找累计扣12分以上车辆信息任务5任务6各类文件的读写操作思维导图根据交通违章数据

创建RDD任务1RDD的概念以及SparkRDD创建方法。弹性分布式数据集(ResilientDistributedDataset,简称RDD)是Spark中最基本的数据抽象,它代表一个不可变、可分区、所含元素可并行计算的集合。、RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。作为一个容错且可以执行并行操作的元素的集合,SparkRDD屏蔽了复杂的底层分布式计算,为用户提供了一组方便的数据转换与求值方法。Spark中的计算过程可以简单抽象为对RDD的创建、转换和行动(返回操作结果)的过程,如下所示。1.1认识RDDmakeRDD:可以通过调用SparkContext的textFile()方法来读取文件(本地文件或HDFS文件等)并创建一个RDD,也可以对输入数据集合通过调用SparkContext的parallelize()方法来创建一个RDD。RDD被创建后不可被改变,只可以对RDD执行Transformation及Action操作。Transformation(转换)操作:对已有的RDD中的数据执行计算进行转换,并产生新的RDD,在这个过程中有时会产生中间RDD。Spark对于Transformation采用惰性计算机制,即在Transformation过程并不会立即计算结果,而是在Action才会执行计算过程。如map、filter、groupByKey、cache等方法,只执行Transformation操作,而不计算结果。Action(行动)操作:对已有的RDD中的数据执行计算产生结果,将结果返回Driver程序或写入到外部物理存储(如HDFS)。如reduce、collect、count、saveAsTextFile等方法,会对RDD中的数据执行计算。1.1认识RDD使用SparkRDD技术进行分布式数据处理,首先要创建RDD;RDD的创建方法主要有三类:(1)从Seq集合中创建RDD;(2)从外部存储创建RDD;(3)从其他RDD创建。从集合中创建RDD,Spark主要提供了两种方法:parallelize和makeRDD;这两种方法都是利用内存中已有的数据,复制集合中的元素后创建一个可用于并行计算的分布式数据集RDD。1.2从Seq集合创建RDD1.parallelize方法parallelize方式适用于做简单的spark测试、Spark学习,其定义下所示。1.2从Seq集合创建RDD它有两个输入参数:Seq集合、分区(Partitions)数量;一个RDD在物理上可以被切分为多个Partition(即数据分区),这些Partition可以分布在不同的节点上。Partition是Spark计算任务的基本处理单位(每一个分区,都会被一个Task任务处理),有多少个分区就会有多少个任务,因此分区决定了并行计算的粒度。具体用法示例如下所示。1.2从Seq集合创建RDDSpark-shell本身就是一个Driver(驱动节点),它会初始化一个SparkContext对象sc作为Spark程序的入口,用户可以直接调用;如下图,由sc调用parallelize(people,3)方法,创建了一个含有3个分区的RDD。一般情况下,需要为RDD设置合理的分区数量;如果partition数量太少,则直接影响是计算资源不能被充分利用。例如分配8个CPU内核,但partition数量为4,则将有一半的核没有利用到。如果partition数量太多,计算资源能够充分利用,但会导致task数量过多,而task数量过多也会影响执行效率。根据SparkRDDProgrammingGuide上的建议,集群节点的每个核分配2-4个partitions比较合理。1.2从Seq集合创建RDD2.makeRDDR方法makeRDD的使用方法跟parallelize类似,也有两个参数Seq集合、分区数量,如下所示;其具体用法如下所示。1.2从Seq集合创建RDD实践中,常需由数据文件创建RDD;SparkContext采用textFile()方法来从文件系统中加载数据创建RDD,该方法以文件的URI作为参数;该方法支持多种数据源,这个URI可以是本地文件系统的地址,也可以是分布式文件系统HDFS的地址,或者是AmazonS3的地址等。(1)从Linux本地文件创建RDD由文件创建RDD,采用sc.textFile(“文件路径”)方式,路径前面需要加入“file://”以表示本地文件。在IntelliJIDEA开发环境中可以直接读取本地文件,但在Spark-shell环境下,要求所有节点的相同位置均保存该文件。下图所示由本地文件“/home/hadoop/textfile.txt”创建RDD,并计算textfile.txt的行数(即RDD的元素数);1.3从外部存储创建RDD由下图所示,文件myfile.txt的每一行都变成了textFileRDD的一个元素。1.3从外部存储创建RDD(2)从HadoopHDFS文件创建RDD从HDFS文件创建RDD是大数据开发中最为常用的方法,直接通过textFile读取HDFS文件位置即可。为了演示该方法,使用如下命令将本地文件myfile.txt文件上传到hdfs文件系统中。接下来由hdfs文件myfile创建RDD,如图下图所示:1.3从外部存储创建RDDcd/usr/local/hadoop/sbin./start-all.sh#启动hdfs服务cd/usr/local/hadoop/bin./hdfsdfs-put/home/hadoop/myfile.txt/user/hadoop/#上传到hdfs文件系统(1)如果使用了本地文件系统的路径,必须保证所有的工作节点在相同的路径下能够访问该文件,可以将文件复制到所有工作节点的相同目录下,或者也可以使用网络挂载共享文件系统。(2)textFile()方法的输入参数,可以是文件名,也可以是目录,也可以是压缩文件等。比如,textFile(“/my/directory”),textFile(“/my/directory/*.txt”),andtextFile(“/my/directory/*.gz”).(3)textFile()方法也可以接受第2个输入参数(可选),用来指定分区的数目。默认情况下,Spark会为HDFS的每个数据块(block,每个block大小默认值为128MB)创建一个分区,用户也可以提供一个比block数量更大的值作为分区数目。1.3从外部存储创建RDD实际业务中交通违章数据量较大,适合由HDFS文件创建RDD;这里首先将交通违法数据文件上传到HDFS文件系统的/user/hadoop目录下。然后,分别读取HDFS上的records.txt、owner.txt、violation.txt三个文件,创建RDD,如下图所示。1.4交通违法数据文件创建RDD./hdfsdfs-mkdir-ptraffic./hdfsdfs-put/home/hadoop/records.txt/user/hadoop/traffic./hdfsdfs-put/home/hadoop/owner.txt/user/hadoop/traffic./hdfsdfs-put/home/hadoop/violation.txt/user/hadoop/traffic找出扣分最高的

交通违法条目任务2使用RDD相关操作对违章数据进行分析,找出单项扣分最多的交通违法条目。map操作是最常用的转换操作,该操作将RDD中的每个元素传入自定义函数后获取一个新的元素,然后用新的元素组成新的RDD。其目的是将现有的RDD中的每一个元素通过某种函数进行转换并返回一个新的RDD。因为RDD是不可变的数据集(val),所以要对其数据做任何改变,必然产生一个新的RDD。2.1map操作上述代码第一行“valdata=List(1,2,3,4,5,6)”,创建了一个包含6个Int类型元素的数组data;第二行“valdataRDD=sc.parallelize(data)”,由列表data生成数据集dataRDD,它包含6个Int类型元素(1、2、3、4、5、6);第三行“valnewDataRDD=dataRDD.map(x=>x*2)”,逐个取出dataData的元素,并将其交匿名函数“x=>x*2”处理,返回的结果作为一个元素放到新RDD(即newDataRDD)中。最终生成newDataRDD它同样包含6个元素,分别为2、4、6、8、10、12;执行过程如下所示。RDD的map算子与Scala集合的map操作非常相似,但Scala的map是处理单机数据的,而RDD的map算子是处理分布式数据的。2.1map操作collect是一个行动操作,在测试代码时经常使用该算子查看RDD内元素的值;collect操作把RDD的所有元素转换成数组并返回给Driver端,适合小规模数据处理后的返回。该过程需要从集群各个节点收集数据,而后经过网络传输,并加载到Driver内测中;因此如数据量过大,将会给网络传输带来压力,同时可能带来内存溢出风险。2.2使用collect查看元素take方法与collect方法原理类似,collect方法返回RDD中所有元素,而take方法则返回指定的前N个数据。2.3使用take(N)方法查询RDD中的N个值first操作用于返回RDD的第一个元素值,与take(1)返回结果相同,但返回的数据类型为元素类型。2.4使用first操作得到第一个元素值flatMap与map操作类似,它也是一个转换操作;map是将函数用于RDD中的每个元素,将返回值构成新的RDD;而flatmap是将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD,它常用于分割字符串、切分单词。下图示例展示了二者的区别:如果使用map操作,则wordArrayRDD的元素为Array[String];而使用flatMap操作,则wordArrayRDD的元素为String。2.5

flatMap操作flatMap操作执行如下图-21所示,可以将该过程看做两个步骤,首先对textRDD元素(字符串类型)进行切分,每个字符串变为一个数组(由若干单词组成);然后对得到的两个数组进行“flat拍扁”,将两个Array的元素“拍扁”到一起后组成新的RDD。2.5

flatMap操作sortBy是对RDD元素进行排序,并返回排好序的新RDD的转换操作,其定义如下所示;它有3个参数:参数1:f:(T)⇒K,左边为要排序的RDD的每一个元素,右边返回要进行排序的值。参数2:ascending(可选项),升序或降序排列标识,默认为true、升序排列,若要降序排列则需写false。参数3:numPartitions(可选项),排序后新RDD的分区数量,默认分区数量与原RDD相同。2.6使用sortBy操作对RDD的元素进行排序2.6使用sortBy操作对RDD的元素进行排序sortBy方法的使用方法如下图,dataRDD.sortBy(x=>x,false)是将dataRDD的元素按照元素值降序排列,从而的到一个新RDD(newDataRDD);peopleRDD.sortBy(x=>x._2,true,2)是按照二元组(peopleRDD元素为二元组)的第二个元素的值进行升序排列,返回一个新的RDD(newPeopleRDD),且这个新的RDD分区数为2。

文件“violation.txt”为违章条目对照表,内含违章代码、扣分数、罚款金额、违法内容等,样式如下所示。(1)首先由violation.txt文件生成RDD;要想获取扣分最高的交通违法条目,需要对RDD的每一个元素进行字符串切割:按照tab(“/t”)分割为违章代码、扣分数、罚款金额、违法内容4项,将其存储为四元组。要找出扣分最多的条目Top3,需要将“扣分数”转换为Int类型(字符串切割后,数据类型仍为字符串,可以使用toInt方法进行强制转换)。2.7

找出扣分最高的交通违法条目Top3(2)接下来使用sortBy方法进行排序,排序位置是元祖的第2位的“扣分数”,得到排序后的RDD(sort_violation),如下所示:(3)对于排序后的RDD(sort_violation),使用take方法获取扣分最高的三种交通违法行为;如下所示,X03、X09、X12三种行为扣分最多。2.7

找出扣分最高的交通违法条目Top3查找某车辆的

违章记录任务3records.txt文件记录了本市车辆违章信息(包括:日期、监控设备编号、车牌号、违章类型代码)。recordsCityB.txt记录相邻的B城市车辆违章信息(包括:日期、监控设备编号、车牌号、违章类型代码)。根据有关部门需要,需要找出某车辆(车牌号AK0803)在两个城市的所有违章记录。filter是一个转换操作,可用于筛选出满足特定条件元素,返回一个新的RDD;该RDD由经过f函数计算后值为true的输入元素组成,即返回符合条件的所有元素构成。下图演示filter操作过滤出dataRDD中的偶数元素,并返回一个新RDD(newDataRDD)。3.1

filter操作过滤RDD的元素下图演示filter操作过滤出含有“Spark”字符串的元素后,通过collect操作检查结果。3.1

filter操作过滤RDD的元素distinct是一个转换操作,用于RDD的元素去重(去除重复的元素后,返回一个新RDD)。如下图所示,创建一个带有重复元素的RDD(dataRDD),使用distinct方法去重后,得到一个不含重复元素的新RDD(newDataRDD)。3.2distinct方法进行元素去重union是一个转换操作,可将两个RDD的元素合并为一个新RDD,但该操作不进行去重;如下图所示,dataRDD1、dataRDD2通过union操作得到一个新的RDD(该RDD有重复元素“3”)。需要注意的是,要合并的两个RDD,其结构(元素的类型、元素值得数目)必须相同,否则报错。如下图所示,RDD2元素为(String,Int,Int)类型的三元组,两个RDD结构不同。3.3union方法进行RDD合并intersection是两个RDD求交集后返回一个新的RDD,即intersection返回两个RDD的共同元素。如下图所示,data1、data2有共同元素6、8,经过intersection操作后返回一个新RDD(interRDD,含有元素6、8);people1、people2有共同元素(“tom”,20),经过intersection操作后返回一个新RDD,其元素为(“tom”,20)。3.4intersection方法求两个RDD的共同元素subtract的参数为一个RDD,用于返回不在参数RDD中的元素,可以看做集合的求补操作;如下图所示,对于rdd1.subtract(rdd2),返回rdd1中除去与rdd2相同元素后剩余元素组成的RDD;注意,其结果与rdd2.subtract(rdd1)不一样。3.5

subtract求两个RDD的补cartesian用于求两个RDD的笛卡尔积,将两个集合元素组合成一个新的RDD。如下图所示,对于rdd1元素为1、3、5、7,rdd2元素为apple、orange、banana,rdd1.cartesian(rdd2)返回的新RDD共有12个元素。3.6cartesian求两个RDD的笛卡尔积按照要求,需要查找车辆AK0803在本市及临市B的交通违章记录,因此首先由records.txt(本市违章记录)、recordsCityB.txt(临市B违章记录)生成RDD;分析发现,两个RDD包含的信息结构相同(均为日期+监控设备编号+车牌号+违章类型码,均以“\t”作为分隔符),因此可以将上述两个RDD合并(union操作)一个RDD,如下图所示。3.7查找车辆AK0803的违章记录合并后的新RDD元素为字符串(例如“2020-1-05A301CZ8463X04”),接下来对其元素进行字符串切分;应用filter方法,过滤出车牌号为“AK0803”的违章记录,可以发现该车辆违章记录共有3条,如下所示。3.7查找车辆AK0803的违章记录查找违章次数3次

以上车辆任务4键值对RDD的reduceByKey、mapValues、groupByKey等操作。根据交通安全检查工作需要,查找本市违章记录数据(records.txt)中,1月份违章次数3次以上车辆予以重点关注。“键值对”是一种比较常见的RDD元素类型,分组和聚合操作中经常会用到;所谓键值对RDD(PairRDD)是指每个RDD元素都是(Key,Value)键值类型;普通RDD里面存储的数据类型是Int、String等,而“键值对RDD”里面存储的数据类型是“键值对”。键值对RDD的生成主要有两种方法,一种是通过map方法将普通RDD转为PairRDD,另一种是直接通过List创建PairRDD。(1)由普通RDD通过map转换为PairRDD4.1键值对RDD通过List直接创建PairRDD4.1键值对RDDkeys操作会把键值对RDD中的所有key返回形成一个新的RDD。由四个键值对(“spark”,1)、(“hadoop”,2)、(“flink”,3)和(“storm”,4)构成的RDD,采用keys后得到的结果是一个RDD[String],内容是“spark”,“hadoop”,“flink”,“storm”。4.2keys操作得到一个新RDDvalues会把键值对RDD中的所有value返回,形成一个新的RDD。由四个键值对(“spark”,1)、(“hadoop”,2)、(“flink”,3)和(“storm”,4)构成的RDD,采用keys后得到的结果是一个RDD[Int],其元素为1、2、3、4。4.3values操作得到一个新RDDlookup用于查找指定key的所有value值;对于people,people.lookup(“tom”)可以查找健为“tom”的所有value值,返回一个数组。4.4lookup查找valuegroupByKey的功能是,对具有相同键的值进行分组。如图3-44,对5个键值对(“apple”,5.5)、(“orange”,3.0)、(“apple”,8.2)、(“banana”,2.7)、(“orange”,4.2),采用groupByKey后得到的新RDD(gruped),其元素为(banana,CompactBuffer(2.7))、(orange,CompactBuffer(3.0,4.2))、(apple,CompactBuffer(5.5,8.2))。4.5groupByKeygroupByKey的功能是,对具有相同键的值进行分组。如图3-44,对5个键值对(“apple”,5.5)、(“orange”,3.0)、(“apple”,8.2)、(“banana”,2.7)、(“orange”,4.2),采用groupByKey后得到的新RDD(gruped),其元素为(banana,CompactBuffer(2.7))、(orange,CompactBuffer(3.0,4.2))、(apple,CompactBuffer(5.5,8.2))。reduceByKey(func)的功能是,使用func函数合并具有相同键的值。如图3-45,furitsRDD有5个元素:(apple,5.5)、(orange,3.0)、(apple,8.2)、(banana,2.7)、(orange,4.2),调用reduceByKey((a,b)=>a+b)方法后,对具有相同key的键值对进行合并后的结果就为(banana,2.7)、(orange,7.2)、(apple,13.7)。可以看出,(a,b)=>a+b这个匿名函数中,a和b都是指value,其作用既是将key相同的所有value累加。4.6reduceByKeyredreduceByKey操作的用法4.6reduceByKey实际业务中,可能遇到只想对键值对RDD的value部分进行处理,但不对key进行处理的情况;可以使用mapValues(func),它的功能是对键值对RDD中的每个value都应用一个函数、完成相应的处理,但key不做任何改变。例如对5个键值对(“apple”,5.5)、(“orange”,3.0)、(“apple”,8.2)、(“banana”,2.7)、(“orange”,4.2)构成的键值对RDD,如果执行mapValues(x=>x+5),就会得到一个新的键值对RDD,它包含下面5个键值对(“apple”,10.5)、(“orange”,8.0)、(“apple”,13.2)、(“banana”,7.7)、(“orange”,9.2),可以看出原有的key部分不变,但value加了5。4.7mapValues对键值对RDD的value进行处理sortByKey是根据key进行排序,即返回一个根据键排序的RDD;对5个键值对(5.5,“apple”)、(3.0,“orange”)、(8.2,“apple”)、(2.7,“banana”)、(4.2,“orange”)构成的键值对RDD,执行sortByKey操作,则返回排序后的RDD。sortByKey可以加入布尔型参数,默认值为true,标识升序排列;若要降序排列,则sortByKey(false)。4.8sortByKey排序违章记录(records.txt)每一行代表一个违章记录,因此只要找出车牌号出现3次以上车辆即可。首先由records.txt创建RDD,而后将其转换为包含车牌号的键值对,键值对格式:(车牌号,1)。4.9查找违章次数3次以上的车辆而后使用redueByKey操作,统计各车牌出现的次数;最后过滤出违章3次以上车辆,车牌号CZ8463、MU0066。4.9查找违章次数3次以上的车辆实际业务中,解决问题的方法有很多,本项任务也可以采用groupByKey等其他方法,亦可达到同样的效果。4.9查找违章次数3次以上的车辆查找累计扣12分

以上车辆信息任务5从本市违章记录数据(records.txt)中,使用join等操作找出相关车辆,输出相关信息:车牌号、车主姓名、车主电话。根据车主预留电话,模拟发一条短信(打印一句话),提醒其到交管部门协助调查。zip操作是一个转换操作,可以将两个元素数量相同、分区数相同的RDD组合成一个键值对RDD。两个非键值对RDD,通过zip操作,生成一个新的键值对RDD。需要注意的是,如果两个RDD元素数量、分区数量不同,进行zip操作则抛出异常。5.1zip操作将两个RDD组合成键值对RDDjoin概念来自于关系数据库领域,SparkRDD中的join的类型也和关系数据库中的join类型一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。Spark中,join就表示内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。下图给出了rdd1键值对集合{("tom",1),("jerry",2),("ken",3)},rdd2键值对集合{("tom",4),("jerry",5),("apple",6)},rdd1.join(rdd2)得到新RDD,其元素为(“tom”,(1,4)),(“jerry”,(2,5)).5.2join连接两个RDDrightOuterJoin为根据两个RDD的健进行右连接,rightOuterJoin类似于SQL中的右外关联rightouterjoin,返回结果以右面(第二个)的RDD为主,关联不上的记录为空(None值)。rightOuterJoin只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。如下图所示,rdd1、rdd2右连接,则以rdd2为主,如rdd1中没有对应的键,显示None值;如rdd1中有相应的键,则显示Some类型。5.3rightOuterJoin右连接leftOuterJoin为根据两个RDD的健进行右连接,leftOuterJoin类似于SQL中的左外关联leftouterjoin,返回结果以左面(第一个)的RDD为主,关联不上的记录为空(None值)。leftOuterJoin只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。如下图所示,rdd1、rdd2进行左连接,则以rdd1为主,如rdd2中没有对应的键,显示None值;如rdd2中有相应的键,则显示Some类型。5.4leftOuterJoin左连接fullOuterJoin是全连接,会保留两个RDD的所有键连接结果。5.5fullOuterJoin全连接针对车辆违章数据,要想找出累计扣12分以上车辆,并输出车牌号、车主姓名、车主电话等相关信息,则需要用到records.txt、violation.txt、owner.txt3个文件;具体步骤如下:(1)生成RDD5.6查找扣分超过12分车辆信息(2)records信息与violation信息内连接5.6查找扣分超过12分车辆信息(3)找出扣分超过12分车牌号5.6查找扣分超过12分车辆信息(4)与owner信息内连接,找出车主姓名、车主电话将车主信息RDD转为键值对形式。车主信息与penalize_over12内连接,确定扣分超过12分的车主姓名、电话等,输出格式为:车牌号、总扣分、车主姓名、车主电话。5.6查找扣分超过12分车辆信息给车主发短信提示5.6查找扣分超过12分车辆信息各类文件的读写操作任务6各类文件的读取并生成RDD及保存为相应格式文件。文本文件生成RDD,可以直接使用SparkContext类的textFile(“文件位置”)方法;新建一个文本文件myfile.txt,内容如下:myfile.txt文件的置于/home/hadoop/data目录下,读取myfile.txt并生成RDD。6.1读写文本文件Ilikesparkandbigdata!Helikesspark.Shelikesspark,too.在Spark中,可以通过saveAsTextFile方法将RDD中的数据保存成普通文本文件;saveAsTextFile接收一个存储路径,该路径可以是HDFS,也可以是本地文件系统(Linux、Windows等),下图演示RDD保存为文本文件。6.1读写文本文件执行完毕后,我们发现/home/hadoop目录下产生了一个out文件夹,打开该文件夹,发现有两个文件part_00000、_SUCCESS,其中part_00000存储了fruitsRDD的内容数据。6.1读写文本文件对于JSON格式的文件,可以使用Spark自带的Json解析工具读取其数据并生成RDD,也可以使用其他解析包。新建一个JSON文件employees.json,文件中每一行都是一个完整JSON字符串,内容如下:employees.json文件的置于/home/hadoop目录下,使用Spark自带的scala.util.parsing.json包读取employees.json并生成RDD。6.2读写JSON格式的数据{"name":"Michael","salary":3000}{"name":"Andy","salary":4500}{"name":"Justin","salary":3500}{"name":"Berta","salary":4000}注意content的元素类型为“Option[Any]”,ScalaOption(选项)类型用来表示一个值是可选的(有值或无值)。Option[T]是一个类型为T的可选值的容器:如果值存在,则Option[T]就是一个Some[T];如果不存在,则Option[T]就是对象None;使用getOrElse()方法来获取元组中存在的元素或者使用其默认的值。6.2读写JSON格式的数据将RDD数据保存为JSON文件,其操作跟保存为普通文件类似,仅多了一个步骤:数据写入文件前将数据格式化为JSON格式。假设要输出以下内容的JSON数据:{"name":"Michael","salary":3000,"adress":["地址1",”地址2”]}{"name":"Andy","salary":4500,"adress":["地址3",”地址4”,”地址5”]}{"name":"Justin","salary":3500,"adress":["地址6",”地址7”]}6.2读写JSON格式的数据RDD数据保存为JSON文件6.2读写JSON格式的数据上述代码执行完毕后,可以发现/home/hadoop目录下产生了一个outjson文件夹;打开该文件夹,发现有两个文件part_00000、_SUCCESS,其中part_00000存储了dataRDD的内容数据。6.2读写JSON格式的数据CSV(commaseparatedvalues)逗号分隔值、TSV(tabseparatedvalues)制表符分割值文件是常用的文件格式;其读取方式与普通文本文件基本一致。逗号分割值每行数据以英文“,”分割,现有CSV文件author.csv(/home/hadoop/author.csv),数据格式如下:下图中,由文件author.csv生成RDD,为做进一步数据分析,使用匿名函数x=>x.split(",")对其元素进行切割。6.3读写CSV、TSV格式文件李清照,女,宋,词人,68陆游,男,宋,词人,73孟浩然,男,唐,诗人,58制表符分割值每行数据以“\t”(即键盘上的Tab键)分割,现有TSV文件author.tsv(/home/hadoop/author.tsv),数据格式如下:下图中,由文件author.tsv生成RDD,为做进一步数据分析,使用匿名函数x=>x.split(",")对其元素进行切割。6.3读写CSV、TSV格式文件对于RDD保存为CSV文件、TSV文件。6.3读写CSV、TSV格式文件执行完毕上述代码后,在/home/hadoop目录下可以看到生成的文件夹,内有保存的csv、tsv格式文件。6.3读写CSV、TSV格式文件SequenceFile格式较为特殊,只有键值

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论