




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
智建领航者智建领航者厦门大学林子雨编著《大数据技术原理与应用》进阶学习自学教程Spark快速入门指南–Spark安装与基础使用主讲教师:林子雨厦门大学数据库实验室二零一六年一月智建领航者智建领航者
目录1 前言 12 准备工作 13 安装Spark 14 运行Spark示例 25 通过SparkShell进行交互分析 35.1 基础操作 45.2 RDD的更多操作 55.3 缓存 66 SparkSQL和DataFrames 67 SparkStreaming 88 独立应用程序(Self-ContainedApplication) 98.1 应用程序代码 98.2 安装sbt 108.3 使用sbt打包Scala程序 128.4 通过spark-submit运行程序 139 进阶学习 13附录1:任课教师介绍 13附录2:课程教材介绍 14附录3:中国高校大数据课程公共服务平台介绍 15智建领航者智建领航者
《大数据技术原理与应用》Spark快速入门指南–Spark安装与基础使用主讲教师:林子雨E-mail:ziyulin@个人主页:/linziyu前言ApacheSpark是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象。Spark正如其名,最大的特点就是快(Lightning-fast),可比HadoopMapReduce的处理速度快100倍。此外,Spark提供了简单易用的API,几行代码就能实现WordCount。本教程主要参考官网快速入门教程,介绍了Spark的安装,Sparkshell、RDD、SparkSQL、SparkStreaming等的基本使用。本教程的具体运行环境如下:CentOS6.4Spark1.6Hadoop2.6.0JavaJDK1.7Scala2.10.5准备工作运行Spark需要JavaJDK1.7,CentOS6.x系统默认只安装了JavaJRE,还需要安装JavaJDK,并配置好JAVA_HOME变量。此外,Spark会用到HDFS与YARN,因此请先安装Hadoop,具体请浏览Hadoop安装教程,在此就不再复述。安装Spark待Hadoop安装好之后,我们再开始安装Spark。官网下载地址:/downloads.html本教程选择的是Spark1.6.0版本,选择packagetype为“Pre-buildwithuser-providedHadoop[canusewithmostHadoopdistributions]”,再点击给出的下载连接/dyn/closer.lua/spark/spark-1.6.0/spark-1.6.0-bin-without-hadoop.tgz就可以下载了,如下图所示:PackagetypeSourcecode:Spark源码,需要编译才能使用,另外Scala2.11需要使用源码编译才可使用Pre-buildwithuser-providedHadoop:“Hadoopfree”版,可应用到任意Hadoop版本Pre-buildforHadoop2.6andlater:基于Hadoop2.6的预先编译版,需要与本机安装的Hadoop版本对应。可选的还有Hadoop2.4andlater、Hadoop2.3、Hadoop1.x,以及CDH4。为方便,本教程选择的是Pre-buildwithuser-providedHadoop,简单配置后可应用到任意Hadoop版本。下载后,执行如下命令进行安装:sudotar-zxf~/下载/spark-1.6.0-bin-without-hadoop.tgz-C/usr/local/cd/usr/localsudomv./spark-1.6.0-bin-without-hadoop/./sparksudochown-Rhadoop:hadoop./spark#此处的hadoop为你的用户名安装后,需要在./conf/spark-env.sh中修改Spark的Classpath,执行如下命令拷贝一个配置文件:cd/usr/local/sparkcp./conf/spark-env.sh.template./conf/spark-env.sh编辑./conf/spark-env.sh(vim./conf/spark-env.sh),在最后面加上如下一行:exportSPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoopclasspath)保存后,Spark就可以启动、运行了。运行Spark示例注意,必须安装Hadoop才能使用Spark,但如果使用Spark过程中没用到HDFS,不启动Hadoop也是可以的。此外,接下来教程中出现的命令、目录,若无说明,则一般以Spark的安装目录(/usr/local/spark)为当前路径,请注意区分。在./examples/src/main目录下有一些Spark的示例程序,有Scala、Java、Python、R等语言的版本。我们可以先运行一个示例程序SparkPi(即计算π的近似值),执行如下命令:cd/usr/local/spark./bin/run-exampleSparkPi执行时会输出非常多的运行信息,输出结果不容易找到,可以通过grep命令进行过滤(命令中的2>&1可以将所有的信息都输出到stdout中,否则由于输出日志的性质,还是会输出到屏幕中):./bin/run-exampleSparkPi2>&1|grep"Piisroughly"过滤后的运行结果如下图所示,可以得到π的5位小数近似值:Python版本的SparkPi则需要通过spark-submit运行:./bin/spark-submitexamples/src/main/python/pi.py通过SparkShell进行交互分析Sparkshell提供了简单的方式来学习API,也提供了交互的方式来分析数据。SparkShell支持Scala和Python,本教程选择使用Scala来进行介绍。Scala是一门现代的多范式编程语言,志在以简练、优雅及类型安全的方式来表达常用编程模式。它平滑地集成了面向对象和函数语言的特性。Scala运行于Java平台(JVM,Java虚拟机),并兼容现有的Java程序。Scala是Spark的主要编程语言,如果仅仅是写Spark应用,并非一定要用Scala,用Java、Python都是可以的。使用Scala的优势是开发效率更高,代码更精简,并且可以通过SparkShell进行交互式实时查询,方便排查问题。执行如下命令启动SparkShell:./bin/spark-shell启动成功后如图所示,会有“scala>”的命令提示符。基础操作Spark的主要抽象是分布式的元素集合(distributedcollectionofitems),称为RDD(ResilientDistributedDataset,弹性分布式数据集),它可被分发到集群各个节点上,进行并行操作。RDDs可以通过HadoopInputFormats创建(如HDFS),或者从其他RDDs转化而来。我们从./README文件新建一个RDD,代码如下(代码中//后的内容为注释,本教程以////开头的内容表示交互式输出结果):valtextFile=sc.textFile("file:///usr/local/spark/README.md")////textFile:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[1]attextFileat<console>:27代码中通过“file://”前缀指定读取本地文件。Sparkshell默认是读取HDFS中的文件,需要先上传文件到HDFS中,否则会有“org.apache.hadoop.mapred.InvalidInputException:Inputpathdoesnotexist:hdfs://localhost:9000/user/hadoop/README.md”的错误。上述命令的输出结果如下图所示:RDDs支持两种类型的操作:actions:在数据集上运行计算后返回值transformations:转换,从现有数据集创建一个新的数据集下面我们就来演示count()和first()操作:textFile.count()//RDD中的item数量,对于文本文件,就是总行数////res0:Long=95textFile.first()//RDD中的第一个item,对于文本文件,就是第一行内容////res1:String=#ApacheSpark接着演示transformation,通过filtertransformation来返回一个新的RDD,代码如下:vallinesWithSpark=textFile.filter(line=>line.contains("Spark"))//筛选出包含Spark的行linesWithSpark.count()//统计行数////res4:Long=17可以看到一共有17行内容包含Spark,这与通过Linux命令cat./README.md|grep"Spark"-c得到的结果一致,说明是正确的。action和transformation可以用链式操作的方式结合使用,使代码更为简洁:textFile.filter(line=>line.contains("Spark")).count()//统计包含Spark的行数////res4:Long=17RDD的更多操作RDD的actions和transformations可用在更复杂的计算中,例如通过如下代码可以找到包含单词最多的那一行内容共有几个单词:textFile.map(line=>line.split("").size).reduce((a,b)=>if(a>b)aelseb)////res5:Int=14代码首先将每一行内容map为一个整数,这将创建一个新的RDD,并在这个RDD中执行reduce操作,找到最大的数。map()、reduce()中的参数是Scala的函数字面量(functionliterals,也称为闭包closures),并且可以使用语言特征或Scala/Java的库。例如,通过使用Math.max()函数(需要导入Java的Math库),可以使上述代码更容易理解:importjava.lang.MathtextFile.map(line=>line.split("").size).reduce((a,b)=>Math.max(a,b))////res6:Int=14HadoopMapReduce是常见的数据流模式,在Spark中同样可以实现(下面这个例子也就是WordCount):valwordCounts=textFile.flatMap(line=>line.split("")).map(word=>(word,1)).reduceByKey((a,b)=>a+b)//实现单词统计////wordCounts:org.apache.spark.rdd.RDD[(String,Int)]=ShuffledRDD[4]atreduceByKeyat<console>:29wordCounts.collect()//输出单词统计结果////res7:Array[(String,Int)]=Array((package,1),(For,2),(Programs,1),(processing.,1),(Because,1),(The,1)...)缓存Spark支持在集群范围内将数据集缓存至每一个节点的内存中,可避免数据传输,当数据需要重复访问时这个特征非常有用,例如查询体积小的“热”数据集,或是运行如PageRank的迭代算法。调用cache(),就可以将数据集进行缓存:linesWithSpark.cache()SparkSQL和DataFramesSparkSQL是Spark内嵌的模块,用于结构化数据。在Spark程序中可以使用SQL查询语句或DataFrameAPI。DataFrames和SQL提供了通用的方式来连接多种数据源,支持Hive、Avro、Parquet、ORC、JSON、和JDBC,并且可以在多种数据源之间执行join操作。下面仍在Sparkshell中演示一下SparkSQL的基本操作,该部分内容主要参考了SparkSQL、DataFrames和Datasets指南。SparkSQL的功能是通过SQLContext类来使用的,而创建SQLContext是通过SparkContext创建的。在Sparkshell启动时,输出日志的最后有这么几条信息:16/01/1613:25:41INFOrepl.SparkILoop:Createdsparkcontext..Sparkcontextavailableassc.16/01/1613:25:41INFOrepl.SparkILoop:Createdsqlcontext..SQLcontextavailableassqlContext.这些信息表明SparkContent和SQLContext都已经初始化好了,可通过对应的sc、sqlContext变量直接进行访问。使用SQLContext可以从现有的RDD或数据源创建DataFrames。作为示例,我们通过Spark提供的JSON格式的数据源文件./examples/src/main/resources/people.json来进行演示,该数据源内容如下:{"name":"Michael"}{"name":"Andy","age":30}{"name":"Justin","age":19}执行如下命令导入数据源,并输出内容:valdf=sqlContext.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")////df:org.apache.spark.sql.DataFrame=[age:bigint,name:string]df.show()//输出数据源内容////+++////|age|name|////+++////|null|Michael|////|30|Andy|////|19|Justin|////+++接着,我们来演示DataFrames处理结构化数据的一些基本操作:df.select("name").show()//只显示"name"列////++////|name|////++////|Michael|////|Andy|////|Justin|////++df.select(df("name"),df("age")+1).show()//将"age"加1////+++////|name|(age+1)|////+++////|Michael|null|////|Andy|31|////|Justin|20|////+++df.filter(df("age")>21).show()#条件语句////+++////|age|name|////+++////|30|Andy|////+++df.groupBy("age").count().show()//groupBy操作////+++////|age|count|////+++////|null|1|////|19|1|////|30|1|////+++当然,我们也可以使用SQL语句来进行操作:df.registerTempTable("people")//将DataFrame注册为临时表peoplevalresult=sqlContext.sql("SELECTname,ageFROMpeopleWHEREage>=13ANDage<=19")//执行SQL查询result.show()//输出结果////+++////|name|age|////+++////|Justin|19|////+++更多的功能可以查看完整的DataFramesAPI,此外DataFrames也包含了丰富的DataFramesFunction可用于字符串处理、日期计算、数学计算等。SparkStreaming流计算除了使用Storm框架,使用SparkStreaming也是一个很好的选择。基于SparkStreaming,可以方便地构建可拓展、高容错的流计算应用程序。SparkStreaming使用SparkAPI进行流计算,这意味着在Spark上进行流处理与批处理的方式一样。因此,你可以复用批处理的代码,使用SparkStreaming构建强大的交互式应用程序,而不仅仅是用于分析数据。下面以一个简单的SparkStreaming示例(基于流的单词统计)来演示一下SparkStreaming:本地服务器通过TCP接收文本数据,实时输出单词统计结果。该部分内容主要参考了SparkStreaming编程指南。运行该示例需要Netcat(在网络上通过TCP或UDP读写数据),CentOS6.x系统中默认没有安装,经过测试,如果通过yum直接安装,运行时会有“nc:Protocolnotavailable”的错误,需要下载较低版本的nc才能正常使用。我们选择Netcat0.6.1版本,在终端中运行如下命令进行安装:wget/project/netcat/netcat/0.6.1/netcat-0.6.1-1.i386.rpm-O~/netcat-0.6.1-1.i386.rpm#下载sudorpm-iUv~/netcat-0.6.1-1.i386.rpm#安装安装好NetCat之后,使用如下命令建立本地数据服务,监听TCP端口9999:#记为终端1nc-l-p9999启动后,该端口就被占用了,需要开启另一个终端运行示例程序,执行如下命令:#需要另外开启一个终端,记为终端2,然后运行如下命令/usr/local/spark/bin/run-examplestreaming.NetworkWordCountlocalhost9999接着在终端1中输入文本,在终端2中就可以实时看到单词统计结果了。SparkStreaming的内容较多,本教程就简单介绍到这,更详细的内容可查看官网教程。最后需要关掉终端2,并按ctrl+c退出终端1的Netcat。独立应用程序(Self-ContainedApplication)接着我们通过一个简单的应用程序SimpleApp来演示如何通过SparkAPI编写一个独立应用程序。使用Scala编写的程序需要使用sbt进行编译打包,相应的,Java程序使用Maven编译打包,而Python程序通过spark-submit直接提交。应用程序代码在终端中执行如下命令创建一个文件夹sparkapp作为应用程序根目录:cd~#进入用户主文件夹mkdir./sparkapp#创建应用程序根目录mkdir-p./sparkapp/src/main/scala#创建所需的文件夹结构在./sparkapp/src/main/scala下建立一个名为SimpleApp.scala的文件(vim./sparkapp/src/main/scala/SimpleApp.scala),添加代码如下:/*SimpleApp.scala*/importorg.apache.spark.SparkContextimportorg.apache.spark.SparkContext._importorg.apache.spark.SparkConfobjectSimpleApp{defmain(args:Array[String]){vallogFile="file:///usr/local/spark/README.md"//Shouldbesomefileonyoursystemvalconf=newSparkConf().setAppName("SimpleApplication")valsc=newSparkContext(conf)vallogData=sc.textFile(logFile,2).cache()valnumAs=logData.filter(line=>line.contains("a")).count()valnumBs=logData.filter(line=>line.contains("b")).count()println("Lineswitha:%s,Lineswithb:%s".format(numAs,numBs))}}该程序计算/usr/local/spark/README文件中包含“a”的行数和包含“b”的行数。代码第8行的/usr/local/spark为Spark的安装目录,如果不是该目录请自行修改。不同于Sparkshell,独立应用程序需要通过valsc=newSparkContext(conf)初始化SparkContext,SparkContext的参数SparkConf包含了应用程序的信息。该程序依赖SparkAPI,因此我们需要通过sbt进行编译打包。在./sparkapp中新建文件simple.sbt(vim./sparkapp/simple.sbt),添加内容如下,声明该独立应用程序的信息以及与Spark的依赖关系:name:="SimpleProject"version:="1.0"scalaVersion:="2.10.5"libraryDependencies+="org.apache.spark"%%"spark-core"%"1.6.0"文件simple.sbt需要指明Spark和Scala的版本。启动Sparkshell的过程中,当输出到Spark的符号图形时,可以看到相关的版本信息。安装sbtSpark中没有自带sbt,需要手动安装sbt,我们选择安装在/usr/local/sbt中:sudomkdir/usr/local/sbtsudochown-Rhadoop/usr/local/sbt#此处的hadoop为你的用户名cd/usr/local/sbt经笔者测试,按官网教程安装sbt0.13.9后,使用时可能存在网络问题,无法下载依赖包,导致sbt无法正常使用,需要进行一定的修改。为方便,请使用笔者修改后的版本,下载地址:/s/1eRyFddw。下载后,执行如下命令拷贝至/usr/local/sbt中:cp~/下载/sbt-launch.jar.接着在/usr/local/sbt中创建sbt脚本(vim./sbt),添加如下内容:#!/bin/bashSBT_OPTS="-Xms512M-Xmx1536M-Xss1M-XX:+CMSClassUnloadingEnabled-XX:MaxPermSize=256M"java$SBT_OPTS-jar`dirname$0`/sbt-launch.jar"$@"保存后,为./sbt脚本增加可执行权限:chmodu+x./sbt最后检验sbt是否可用(首次运行会处于“Gettingorg.scala-sbtsbt0.13.9…”的下载状态,请耐心等待。笔者等待了7分钟才出现第一条下载提示):./sbtsbt-version下载过程中可能会类似“ServeraccessError:java.security.ProviderException:java.security.KeyExceptionurl=/org/scala-sbt/precompiled-2_9_3/0.13.9/precompiled-2_9_3-0.13.9.jar”的错误,可以忽略。可再执行一次./sbtsbt-version,只要能得到如下图的版本信息就没问题:如果由于网络问题无法下载依赖,导致sbt无法正确运行的话,可以下载笔者提供的离线依赖包sbt-0.13.9-repo.tar.gz到本地中(依赖包的本地位置为~/.sbt和~/.ivy2,检查依赖关系时,首先检查本地,本地未找到,再从网络中下载),下载地址:/s/1sjTQ8yD。下载后,执行如下命令解压依赖包:tar-zxf~/下载/sbt-0.13.9-local-repo.tar.gz~通过这个方式,一般可以解决依赖包缺失的问题(读者提供的依赖包仅适合于Spark1.6版本,不同版本依赖关系不一样)。使用sbt打包Scala程序为保证sbt能正常运行,先执行如下命令检查整个应用程序的文件结构:cd~/sparkappfind.文件结构应如下图所示:接着,我们就可以通过如下代码将整个应用程序打包成JAR(首次运行同样需要下载依赖包,如果这边遇到网络问题无法成功,也请下载上述安装sbt提到的离线依赖包sbt-0.13.9-repo.tar.gz):/usr/local/sbt/sbtpackage打包成功的话,会输出如下图内容:生成的jar包的位置为~/sparkapp/target/scala-2.10/simple-project_2.10-1.0.jar。通过spark-submit运行程序最后,我们就可以将生成的jar包通过spark-submit提交到Spark中运行了,命令如下:/usr/local/spark/bin/spark-submit\--class"SimpleApp"~/sparkapp/target/scala-2.10/simple-project_2.10-1.0.jar#输出信息太多,可以通过如下命令过滤直接查看结果/usr/local/spark/bin/spark-submit\--class"SimpleApp"~/sparkapp/target/scala-2.10/simple-project_2.10-1.0.jar2>&1\|grep"Lineswitha:"最终得到的结果如下:Lineswith
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 电梯管理证-试题含答案
- 专业素质笔试题及答案
- 武术教练劳务合同6篇
- 商业调研及分析服务合同
- 疾病预防检验题库及答案
- 2025年街舞理论考题题库及答案
- 普通生物中考试题及答案
- 2025年新高考 听力试题及答案
- 2025年大学生心理健康教育试题库+答案(附解析)
- 环境物品清洁消毒及灭菌培训题库(试题及答案)
- 普外科21个病种临床路径-
- 地质年代周期表
- 新闻编辑(修改版)马工程课件 第六章
- GB/T 2930.8-2017草种子检验规程水分测定
- 勘察设计工作大纲
- GB/T 17188-1997农业灌溉设备滴灌管技术规范和试验方法
- 关于国有集团公司采购管理办法【五篇】
- 2022年资阳市雁江区社区工作者招聘考试笔试试题及答案解析
- 帮助卧床老年人使用便器排便课件
- 【高考英语精品专题】必修1 Unit 1 Life Choices-高考英语-一轮总复习备考方略课件PPT(新教材北师大版)
- 中国传媒大学-新媒体概论(刘行芳)-课件
评论
0/150
提交评论