阿里云E-MapReduce-最佳实践-D_第1页
阿里云E-MapReduce-最佳实践-D_第2页
阿里云E-MapReduce-最佳实践-D_第3页
阿里云E-MapReduce-最佳实践-D_第4页
阿里云E-MapReduce-最佳实践-D_第5页
已阅读5页,还剩47页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、E-MapReduce开发手册E-MapReduce/开发手册E-MapReduce/开发手册 PAGE 51 PAGE 51开发手册开发准备开发准备简介本文档假定:您已经开通了阿里云服务,并创建了Access Key ID和Access Key Secret。文中的ID指的是Access Key ID,KEY 指的是Access Key Secret。如果您还没有开通或者还不了解OSS,请登录OSS产品主页获取更多的帮助。您已经对Spark,Hadoop,Hive和Pig具备一定的认识。文中不对Spark,Hadoop,Hive和Pig开发 实践做额外的介绍。更多的开发文档资料可以到apac

2、he官网获取。您已经对Scala语法具备一定的认识,文中的某些例子以Scala语言展示。OSS URI在使用E-MapReduce时,用户将会使用到两种OSS URI,分别是:native URI: oss:/accessKeyId:accessKeySecretbucket.endpoint/object/path用户在作业中指定输入输出数据源时使用这种URI,可以类比hdfs:/。用户操作OSS数据时,可 以将accessKeyId,accessKeySecret以及endpoint配置到Configuration中,也可以在URI中直 接指定accessKeyId,accessKeySe

3、cret以及endpoint。ref URI: ossref:/bucket/object/path只在E-MapReduce作业配置时有效,用来指定作业运行需要的资源。例如以下作业配置示例:在使用过程中,需要特别注意URI中scheme的不同。注意E-MapReduce在支持向OSS写数据时使用OSS的multipart分片上传方式。这里需要提醒的是,当作业异常中 断后,OSS中会残留作业已经生产的部分数据,需要您手动删掉。这里的行为和作业输出到HDFS是一致的,作 业异常中断后,HDFS也会残留数据,也需要手动删掉。但有一个区别,OSS对使用multipart上传的文件,它 是先放在碎片管

4、理中,所以您不仅要删除OSS文件管理中的输出目录残留文件,还需要在OSS的碎片管理中清 理一次,否则会产生数据存储费用。示例Project本项目是一个完整的可编译可运行的项目,包括MapReduce,Pig,Hive和Spark示例代码,详情如下:MapReduceHiveWordCountPigsample.hive:表的简单查询Sparksample.pig:Pig处理OSS数据实例SparkPiPiSparkWordCount: 单词统计LinearRegression: 线性回归OSSSampleOSS使用示例ONSSampleONS使用示例ODPSSampleODPS使用示例MNSS

5、ample:MNS使用示例LoghubSample:Loghub使用示例项目下载地址。依赖资源测试数据(data目录下):The_Sorrows_of_Young_Werther.txt:可作为WordCount(MapReduce/Spark)的输入数据patterns.txt:WordCount(MapReduce)作业的过滤字符u.data:sample.hive脚本的测试表数据abalone:线性回归算法测试数据依赖jar包(lib目录下)tutorial.jar:sample.pig作业需要的依赖jar包准备工作本项目提供了一些测试数据,您可以简单地将其上传到OSS中即可使用。其他示

6、例,例如ODPS,MNS,ONS和Loghub等等,需要您自己准备数据如下:LogStore,参考日志服务用户指南。ODPS项目和表,参考ODPS快速开始。ONS,参考消息队列快速开始。MNS,参考消息服务控制台使用帮助。基本概念:OSSURI: oss:/accessKeyId:accessKeySecretbucket.endpoint/a/b/c.txt,用户在作业中指定输入输出数据源时使用,可以类比hdfs:/。阿里云AccessKeyId/AccessKeySecret是您访问阿里云API的密钥,你可以在这里获取。集群运行SparkSparkWordCount: spark-subm

7、it -class SparkWordCount examples-1.0- SNAPSHOT-shaded.jarinputPathoutputPathnumPartitionRDD分片数目SparkPi: spark-submit -class SparkPi examples-1.0-SNAPSHOT-shaded.jarOSSSample:spark-submit -class OSSSample examples-1.0-SNAPSHOT-shaded.jar inputPathnumPartition:输入数据RDD分片数目ONSSample: spark-submit -clas

8、s ONSSample examples-1.0-SNAPSHOT- shaded.jar accessKeyIdAccessKeyIdaccessKeySecret:阿里云AccessKeySecretconsumerIdConsumerID说明topictopicsubExpression消息过滤。parallelism:指定多少个接收器来消费队列消息。ODPSSample: spark-submit -class ODPSSample examples-1.0-SNAPSHOT- shaded.jaraccessKeyIdAccessKeyIdaccessKeySecret:阿里云Acc

9、essKeySecretenvType: 0表示公网环境,1表示内网环境。如果是本地调试选择0,如果是在E-MapReduce上执行请选择1。table:参考ODPS术语介绍。numPartition:输入数据RDD分片数目MNSSample:spark-submit-classMNSSampleexamples-1.0-SNAPSHOT-shaded.jar queueName:队列名,参考MNS名词解释。accessKeyIdAccessKeyIdaccessKeySecret:阿里云AccessKeySecretendpoint:队列数据访问地址LoghubSample: spark-s

10、ubmit -class LoghubSample examples-1.0-SNAPSHOT- shaded.jar slsprojectLogService项目名slslogstoreloghubgroupname:作业中消费日志数据的组名,可以任意取。sls project,sls store相同时,相同组名的作业会协同消费sls store中的数据;不同组名的作业会相互隔离地消费slsstore中的数据。slsendpoint日志服务入口。accessKeyIdAccessKeyIdaccessKeySecret:阿里云AccessKeySecretbatchintervalsecon

11、dsSparkStreaming作业的批次间隔,单位为秒。LinearRegression: spark-submit -class LinearRegression examples-1.0-SNAPSHOT- shaded.jar inputPath:输入数据numPartition:输入数据RDD分片数目MapreduceWordCount: hadoop jar examples-1.0-SNAPSHOT-shaded.jar WordCount - Dwordcount.case.sensitive=true -skip inputPathl:输入数据路径outputPath:输出路

12、径patternPath:过滤字符文件,可以使用data/patterns.txtHivehive -f sample.hive -hiveconfinputPath=inputPath:输入数据路径Pigpig-xmapreduce-fsample.pig-paramtutorial=-param input= -paramresult=tutorialJarPath:依赖Jar包,可使用lib/tutorial.jarinputPath:输入数据路径resultPath:输出路径注意:本地运行如果在-MapReduce上使用时,请将测试数据和依赖jar包上传到OSS中,路径规则遵循OSSU

13、RI定义,见上。如果集群中使用,可以放在机器本地。这里主要介绍如何在本地运行Spark程序访问阿里云数据源,例如OSS等。如果希望本地调试运行,最好借助一 些开发工具,例如IntellijIDEA或者Eclipse。尤其是Windows环境,否则需要在Windows机器上配置Hadoop和Spark运行环境,很麻烦。Intellij IDEA前提:安装IntellijIDEA,MavenIntellijIDEAMaven插件,Scala,IntellijIDEA Scala插件双击进入SparkWordCount.scala从下图箭头所指处进入作业配置界面选择SparkWordCount,在作

14、业参数框中按照所需传入作业参数点击OK点击运行按钮,执行作业查看作业执行日志Scala IDE for Eclipse前提:安装ScalaIDEforEclipse,Maven,EclipseMaven插件导入项目RunAsMavenbuild,快捷键是AltShilftXM;也可以在项目名上右键,Run As选择Mavenbuild等待编译完后,在需要运行的作业上右键,选择RunConfiguration,进入配置页在配置页中,选择ScalaApplication,并配置作业的MainClass和参数等等。点击Run查看控制台输出日志SparkSpark开发准备安装直接在Eclipse中使用

15、JAR包,步骤如下:在官方网站下载E-MapReduceSDK。解压并将emr-sdk_2.10-1.1.2.jar和emr-core-1.1.2.jar拷贝到您的工程文件夹中。在EclipsePropertiesJavaBuildPathAddJARs选择您下载的SDK。经过上面几步之后,您就可以在工程中读写OSS,LogService,MNS,ONS,ODPS等数 据了。Maven地址com.aliyun.emremr-core1.1.2com.aliyun.emremr-sdk_2.10emr-sdk_配置说明Spark代码中按照如下格式配置:属性名默认值说明spark

16、.hadoop.fs.oss.accessKe yId无访问OSS所需的Access Key ID(可选)spark.hadoop.fs.oss.accessKe ySecret无访问OSS所需的Access Key Secret(可选)spark.hadoop.fs.oss.securityT oken无访问OSS所需的STS token(可选)spark.hadoop.fs.oss.endpoint无访问OSS的endpoint(可选)spark.hadoop.fs.oss.multipart.thread.number5并发进行OSS的upload part copy的并发度spark.

17、hadoop.fs.oss.copy.sim ple.max.byte134217728使用普通接口进行OSS内部copy的文件大小上限spark.hadoop.fs.oss.multipart.split.max.byte67108864使用普通接口进行OSS内部copy的文件分片大小上限spark.hadoop.fs.oss.multipart.split.number5使用普通接口进行OSS内部copy的文件分片数目,默认和拷贝并发数目保持一致spark.hadoop.fs.oss.implcom.aliyun.fs.oss.nat.NativeO ssFileSystemOSS文件系统

18、实现类s/mnt/disk1,/mnt/disk2,oss本地临时文件目录,默认使用集群的数据盘spark.hadoop.fs.oss.buffer.dir s.existsfalse是否确保oss临时目录已经存在spark.hadoop.fs.oss.client.co nnection.timeout50000OSS Client端的连接超时时间(单位毫秒)spark.hadoop.fs.oss.client.soc ket.timeout50000OSS Client端的socket超时时间(单位毫秒)spark.hadoop.fs.oss.client.co nnection.ttl-

19、1连接存活时间spark.hadoop.fs.oss.connecti on.max1024最大连接数目spark.hadoop.job.runlocalfalse当数据源是OSS时,如果需要本地调试运行Spark代码,需要设置此项为true,否认为falseerval200Receiver向LogHub取数据的时.millis间间隔rtrue是否有序消费分裂后的Shard数据 lis30000消费进程的心跳保持间隔spark.mns.batchMsg.size16批量拉取MNS消息条数,最大不能超过16spark.mns.pollingWait.secon ds30MNS队列为空时的拉取等待

20、间隔Spark代码本地调试val conf = new SparkConf().setAppName(getAppName).setMaster(local4) conf.set(spark.hadoop.fs.oss.impl, com.aliyun.fs.oss.nat.NativeOssFileSystem) conf.set(spark.hadoop.job.runlocal, true)val conf = new SparkConf().setAppName(getAppName).setMaster(local4) conf.set(spark.hadoop.fs.oss.imp

21、l, com.aliyun.fs.oss.nat.NativeOssFileSystem) conf.set(spark.hadoop.job.runlocal, true)val sc = new SparkContext(conf)val data = sc.textFile(oss:/.) println(scount: $data.count()注意: 这个配置项spark.hadoop.job.runlocal只是针对需要在本地调试Spark代码读写OSS数据的场景,除此之外只需要保持默认即可。三方依赖说明为了支持在E-MapReduce上操作阿里云的数据源(包括OSS,ODPS等)

22、,需要您的作业依赖一些三方包。 您可以参照这个POM文件来增删需要依赖的三方包。垃圾清理Spark作业失败后,需要检查一下OSS输出目 录是否有文件存在。另外您也要 检查OSS碎片管理中是否还有没有提交的碎片存在,如果存在请及时清理掉。简单操作OSS文件使用OSS SDK存在的问题在Spark或者Hadoop作业中无法直接使用OSS SDK来操作OSS中的文件。这是因为OSS SDK中依赖的http- 运行环境中的http-client存在版本冲突。如果要这么做,就必须先解决 这个依赖冲突问题。实际上在E-MapReduce中,Spark和Hadoop已经对OSS做了无缝兼容,可以像使用 HD

23、FS一样来操作OSS文件。推荐做法Scalaimport org.apache.hadoop.conf.Configuration Scalaimport org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path, FileSystemval dir = oss:/accessKeyId:accessKeySecretbucket.endpoint/dir val path = new Path(dir)val conf = new Configuration()conf.set(fs.oss.impl, com

24、.aliyun.fs.oss.nat.NativeOssFileSystem)val fs = FileSystem.get(path.toUri, conf) val fileList = fs.listStatus(path).Javaimport org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem;String dir = oss:/acces

25、sKeyId:accessKeySecretbucket.endpoint/dir; Path path = new Path(dir);Configuration conf = new Configuration();conf.set(fs.oss.impl, com.aliyun.fs.oss.nat.NativeOssFileSystem);FileSystem fs = FileSystem.get(path.toUri(), conf); FileStatus fileList = fs.listStatus(path);.Spark开发快速入门Spark接入OSSval conf

26、= new SparkConf().setAppName(Test OSS) conf.set(spark.hadoop.fs.oss.impl, com.aliyun.fs.oss.nat.NativeOssFileSystem) val conf = new SparkConf().setAppName(Test OSS) conf.set(spark.hadoop.fs.oss.impl, com.aliyun.fs.oss.nat.NativeOssFileSystem) val sc = new SparkContext(conf)val pathIn = oss:/accessKe

27、yId:accessKeySecretbucket.endpoint/path/to/read val inputData = sc.textFile(pathIn)val cnt = inputData.count println(scount: $cnt)val outputPath = oss:/accessKeyId:accessKeySecretbucket.endpoint/path/to/write val outpuData = inputData.map(e = s$e has been processed.) outpuData.saveAsTextFile(outputP

28、ath)附录完整示例代码请看:- Spark接入OSSSpark开发快速入门Spark接入ODPS在这一章中,您将学会如何使用E-MapReduce SDK在Spark中完成一次ODPS数据的读写操作。Step-1. 初始化一个OdpsOpsimport com.aliyun.odps.TableSchema import com.aliyun.odps.data.Recordimport com.aliyun.odps.TableSchema import com.aliyun.odps.data.Recordimport org.apache.spark.aliyun.odps.OdpsO

29、ps import org.apache.spark.SparkContext, SparkConfobject Sample def main(args: ArrayString): Unit = / = Step-1 =val accessKeyId = val accessKeySecret = / 以内网地址为例val urls = HYPERLINK /api Seq(/api, HYPERLINK / )val conf = new SparkConf().setAppName(Test Odps)val sc = new SparkContext(conf)val sc = ne

30、w SparkContext(conf)val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1)/ 下面是一些调用代码/ = Step-2 =./ = Step-3 =./ = Step-2 =/ 方法定义1/ = Step-3 =/ 方法定义2Step-2. 从ODPS中加载表数据到Spark中/ = Step-2 =val project = val table = / = Step-2 =val project = val table = val numPartitions = 2valinputDa

31、ta=odpsOps.readTable(project,table,read,numPartitions) inputData.top(10).foreach(println)/ = Step-3 =.在上面的代码中,您还需要定义一个read函数,用来解析和预处理ODPS表数据,如下:def read(record: Record, schema: TableSchema): String = record.getString(0)def read(record: Record, schema: TableSchema): String = record.getString(0)这个函数的含

32、义是,将ODPS表的第一列加载到Spark运行环境中。Step-3. 将Spark中的结果数据保存到ODPS表中val resultData = inputData.map(e = s$e has been processed.) odpsOps.saveToTable(project, table, dataRDD, write)通过odpsOps对象的saveToTable方法,可以将Spark RDD持久化到ODPS中。val resultData = inputData.map(e = s$e has been processed.) odpsOps.saveToTable(proje

33、ct, table, dataRDD, write)在上面的代码中,您还需要定义一个write函数,用作写ODPS表前数据预处理,如下:defdefwrite(s:String,emptyReord:Record,schema:TableSchema):Unit= val r =emptyReordr.set(0, s)r.set(0, s)这个函数的含义是将RDD的每一行数据写到对应ODPS表的第一列中。附录完整示例代码请看:- Spark接入ODPSSpark开发快速入门Spark接入ONSval Array(cId, topic, subExpression, parallelism,

34、interval) = args val accessKeyId = val accessKeySecret = val Array(cId, topic, subExpression, parallelism, interval) = args val accessKeyId = val accessKeySecret = val numStreams = parallelism.toIntval batchInterval = Milliseconds(interval.toInt)val conf = new SparkConf().setAppName(Test ONS Streami

35、ng) val ssc = new StreamingContext(conf, batchInterval)def func: Message = ArrayByte = msg = msg.getBody val onsStreams = (0 until numStreams).map i = println(sstarting stream $i)OnsUtils.createStream( ssc,cId, topic,subExpression, accessKeyId, accessKeySecret,StorageLevel.MEMORY_AND_DISK_2, func)va

36、l unionStreams = ssc.union(onsStreams) unionStreams.foreachRDD(rdd = rdd.map(bytes = new String(bytes).flatMap(line = line.split( ).map(word = (word, 1).reduceByKey(_ + _).collect().foreach(e = println(sword: $e._1, cnt: $e._2)ssc.start() ssc.start() ssc.awaitTermination()附录完整示例代码请看:- Spark接入ONSSpar

37、k开发快速入门Spark接入Log Serviceif (args.length 6) System.err.println(if (args.length 6) System.err.println(Usage: TestLoghub | .stripMargin)System.exit(1)vallogserviceProject=args(0) /LogService中project名val logStoreName=args(1)/LogService中logstore名val loghubGroupName = args(2) / loghubGroupName相同的作业将共同消费l

38、ogstore的数据val loghubEndpoint = args(3) / 阿里云日志服务数据类API Endpointval accessKeyId/AccessKeyIdval accessKeySecret/AccessKeySecret val numReceiversargs(4).toInt /Receiver来读取logstore中的数据val batchInterval = Milliseconds(args(5).toInt * 1000) / Spark Streaming中每次处理批次时间间隔val conf = new SparkConf().setAppName

39、(Test Loghub Streaming) val ssc = new StreamingContext(conf, batchInterval)val loghubStream = LoghubUtils.createStream( ssc,loghubProject, logStream, loghubGroupName, endpoint, numReceivers, accessKeyId, accessKeySecret,StorageLevel.MEMORY_AND_DISK)loghubStream.foreachRDD(rdd = println(rdd.count()ss

40、c.start() ssc.start() ssc.awaitTermination()几点说明E-MapReduce的机器(除了Master节点)无法连接公网。配置Log Service endpoint时,请注意使用LogService提供的内网endpoint,否则无法请求到LogService。了解更多关于LogService,请查看相关文档。附录完整示例代码请看:Spark接入LogServiceSpark开发快速入门Spark接入MNSval conf = new SparkConf().setAppName(Test MNS Streaming) val batchInterva

41、l = Seconds(10)val ssc = new StreamingContext(conf, batchInterval)val conf = new SparkConf().setAppName(Test MNS Streaming) val batchInterval = Seconds(10)val ssc = new StreamingContext(conf, batchInterval)val queuename = queuename valaccessKeyId=val accessKeySecret = val endpoint = HYPERLINK http:/

42、xxx.yyy.zzzz/abc http:/xxx.yyy.zzzz/abcvalmnsStream=MnsUtils.createPullingStreamAsRawBytes(ssc,queuename,accessKeyId,accessKeySecret, endpoint,StorageLevel.MEMORY_ONLY) mnsStream.foreachRDD( rdd = rdd.map(bytes = new String(bytes).flatMap(line = line.split( ).map(word = (word, 1).reduceByKey(_ + _).

43、collect().foreach(e = println(sword: $e._1, cnt: $e._2)ssc.start() ssc.awaitTermination()附录完整示例代码请看:Spark接入MNSSpark开发快速入门Spark接入Hbase下面这个例子演示了Spark如何向Hbase写数据。需要指出的是,计算集群需要和Hbase集群处于一个安全组内object ConnectionUtil extends Serializable private val conf = HBaseConfiguration.create()conf.set(HConstants.ZOO

44、KEEPER_QUORUM,ecs1,ecs1,ecs3) conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, /hbase) privateobject ConnectionUtil extends Serializable private val conf = HBaseConfiguration.create()conf.set(HConstants.ZOOKEEPER_QUORUM,ecs1,ecs1,ecs3) conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, /hbase) privatevalconnect

45、ion=ConnectionFactory.createConnection(conf)def getDefaultConn: Connection = connection/ 创 建 数 据 流 unionStreams unionStreams.foreachRDD(rdd = rdd.map(bytes = new String(bytes).flatMap(line = line.split( ).map(word = (word, 1).reduceByKey(_ + _).mapPartitions words = val conn = ConnectionUtil.getDefa

46、ultConn val tableName = TableName.valueOf(tname) val t = conn.getTable(tableName)try words.sliding(100, 100).foreach(slice = val puts = slice.map(word = println(sword: $word)val put = new Put(Bytes.toBytes(word._1 + System.currentTimeMillis() put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES

47、,System.currentTimeMillis(), Bytes.toBytes(word._2) put).toList t.put(puts) finally t.close()Iterator.empty.count().count()ssc.start() ssc.awaitTermination()附录完整示例代码请看:Spark接入HbaseSpark Submit参数设置说明在E-MapReduce中,如果我们要运行一个Spark作业,势必要和它一堆的参数配置打交道。 那么我们该如何按照自己的集群配置情况来设置这些参数呢?下面我们就来介绍一下,如何在我们的E-MapReduc

48、e场景下设置spark-submit的参数首先我们假设我们有这样的一个集群:E-MapReduce产品版本1.1.0Hadoop2.6.0Spark1.6.0Master节点8核16G500G1台Worker节点 x 10台8核16G500G- 10台总资源:8核16G(Worker)x 10 + 8核16G(Master)(由于作业提交的时候资源只计算CPU和内存,所以这里磁盘的大小并未计算到总资源中。)Yarn可分配总资源:12核12.8G(worker)x 10(默认情况下,yarn可分配核=机器核x1.5,yarn可分配内存=机器内存x0.8)提交一个作业有了集群,现在我们要提交我们的

49、Spark作业了首先,我们在E-MapReduce中创建一个作业,类似下面这个:-classorg.apache.spark.examples.SparkPi-classorg.apache.spark.examples.SparkPi-masteryarn-deploy-modeclient-driver-memory4g-num-executors 2-executor-memory2g-executor-cores2/opt/apps/spark-1.6.0-bin-hadoop2.6/lib/spark-examples*.jar10参数说明逐个分析一下:参数参考值说明classorg

50、.apache.spark.examples.S parkPi作业的主类masteryarn因为E-MapReduce使用的Yarn的模式,所以这里只能是yarn模式yarn-client等 同 于 -master yarn - deploy-mode client 此时不需要指定deploy-modeyarn-cluster等 同 于 -master yarn - deploy-mode cluster 此时不需要指定deploy-modedeploy-modeclientclient模式表示作业的AM会放在Master节点上运行。要注意的是,如果设置这个参数,那么需要同时指定上面maste

51、r为yarn。clustercluster模式表示AM会随机的在worker节点中的任意一台上启动运行。要注意的是,如果设置这个参数,那么需要同时指定上面master为yarn。driver-memory4gdriver使用的内存,不可超过单机的core总数num-executors2创建多少个executorexecutor-memory2g各个executor使用的最大内存,不可超过单机的最大可使用内存executor-cores2各个executor使用的并发线程数目,也即每个executor最大可并发执行的Task数目资源计算当我们在不同模式,不同的设置下运行的时候,作业使用的资源情况

52、如下表所示,yarn-client模式的资源计算节点资源类型资源量(结果使用上面的例子计算得到)mastercore1memdriver-memroy = 4gworkercorenum-executors * executor- cores = 4memnum-executors * executor- memory = 4g作业主程序(Driver程序)会在master节点上执行。按照作业配置将分配4g(由-driver-memroy指 定)的内存给它(当然实际上可能没有用到)。会在worker节点上起2个(由-num-executors指定)executor,每一个executor最大能

53、分配2g(由-executor-memory指定)的内存,并最大支持2个(由-executor-cores指定)task的并发执行。yarn-cluster模式的资源计算节点资源类型资源量(结果使用上面的例子计算得到)master一个很小的client程序,负责同步job信息,占用很小。workercorenum-executors * executor- cores+spark.driver.cores = 5memnum-executors*executor- memory+driver-memroy= 8g注意:这里的spark.driver.cores默认是1,也可以设置为更多资源使用

54、的优化yarn-client那么假设我们有了一个大作业,使用yarn-client模式,想要多用一些这个集群的资源,我们要如何设置呢? 注意:Spark在分配内存时,会在用户设定的内存值上溢出375M或7%(取大值)。Yarn分配container内存时,遵循向上取整的原则,这里也就是需要满足1G的整数倍。-master yarn-client -driver-memory 5g -num-executors 20 -executor-memory 4g -executor-cores 4下面给出了一个配置值:-master yarn-client -driver-memory 5g -num

55、-executors 20 -executor-memory 4g -executor-cores 4按照我们上面的资源计算公式,mastercore:1mem:6g(5g+375m向上取整为6g)workerscore: 20*4 =80mem20*5g(4g+375m向上取整为5g100G可以看到总的资源没有超过我们的集群的总资源-master yarn-client -driver-memory 5g -num-executors 40 -executor-memory 1g -executor-cores 2-master yarn-client -driver-memory 5g -

56、num-executors 15 -executor-memory 4g -executor-cores 4-master yarn-client -driver-memory 5g -num-executors 40 -executor-memory 1g -executor-cores 2-master yarn-client -driver-memory 5g -num-executors 15 -executor-memory 4g -executor-cores 4-master yarn-client -driver-memory 5g -num-executors 10 -exe

57、cutor-memory 9g -executor-cores 6原则上,按照上面的公式计算出来的需要资源不超过集群的最大资源量就可以 但是在实际场景中,因为系统,hdfs以及E-MapReduce的服务会需要使用core和mem资源,如果把core和mem都占用完了,反而会导致 性能的下降,甚至无法运行。 executor-cores数一般也都会设置的和集群的可使用核一致,因为如果设置的太多,cpu频繁的切换性能并不会提高。yarn-cluster当使用yarn-cluster模式后,Driver程序会被放到worker节点上。资源会占用到worker的资源池子里面,这个 时候想要多用一些这

58、个集群的资源,我们要如何设置呢?-master yarn-cluster -driver-memory 5g -num-executors 15 -executor-memory 4g -executor-cores 4下面给出了一个配置值:-master yarn-cluster -driver-memory 5g -num-executors 15 -executor-memory 4g -executor-cores 4一些配置建议如果将内存设置的很大,要注意gc所产生的消耗。一般我们会推荐一个executor的内存=64g如果是进行HDFS读写的作业,建议是每个executor中使用=

59、5个并发来读写。如果是进行OSS读写的作业,我们建议是将executor分布在不同的ECS上,这样可以将每一个ECS的 带宽都用上。比如有10台ECS,那么就可以配置num-executors=10,并设置合理的内存和并发。如果作业中使用了非线程安全的代码,那么在设置executor-cores的时候需要注意多并发是否会造成 作业的不正常。如果会,那么推荐就设置executor-cores=1Aliyun Spark SDK ReleaseNote说明与OSS数据源的交互,默认已经存在集群的运行环境中,用户作业 将emr-core打进去,或者要保持和集群中的emr-core版本一致。emr-s

60、dk_2.10包:实现Spark与阿里云其他数据源的交互,例如LogService,MNS,ONS和ODPS等等。用户作业打包时 必须 将emr-sdk_2.10打包进去,否则会出现相关类找不到的错。com.aliyun.emremr-core1.1.2com.aliyun.emremr-sdk_v1.1.2解决作业慢读写OSS出现的ConnectionClosedException问题。解决OSS数据源时部分hadoop命令不可用问题。解决java.text.ParseException:Unparseabledate问题。优化emr-core支持本地调试运行。兼容老版本的

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论