Hadoop数据分析与应用MapReduce_第1页
Hadoop数据分析与应用MapReduce_第2页
Hadoop数据分析与应用MapReduce_第3页
Hadoop数据分析与应用MapReduce_第4页
Hadoop数据分析与应用MapReduce_第5页
已阅读5页,还剩47页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第3章MapReduce(一)Hadoop数据分析与应用回顾Hadoop分布式模式配置与启动步骤:配置Hadoop配置文件,包括core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml和slaves远程拷贝资源,将Master上的jdk文件夹、hadoop文件夹、hadoop-record文件夹以及环境变量hadoop-eco.sh远程拷贝到其他机器上关闭服务器防火墙格式化NameNode系统启动HDFS启动YARN通过网页管理端访问回顾HDFS的设计特点:能够存储超大文件流式数据访问商用硬件不能处理低时间延迟的数据访问不能存放大量小文件无法高效实现多用户写入或者任意修改文件封装MapReduce执行流程MapReduce编写与运行Hadoop序列化与反序列化Hadoop分区使用Hadoop内置jar文件进行单词计数统计编写MapReduce实现气象数据最高温与最低温统计使用Hadoop序列化与反序列化进行用户流量统计使用Hadoop分区根据用户类型进行分区统计定义属性MapReduce的概念MapReduce的功能MapReduce键值对MapReduce执行流程MapReduce程序运行使用Hadoop内置jar文件进行单词计数统计25203.1.1MapReduce的概念MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念Map(映射)和Reduce(归约)是它们的主要思想,都是从函数式和矢量编程语言里借来的特性,它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。HadoopMapReduce是一个易于编写应用程序的软件框架,该应用程序以可靠、容错的方式并行处理大量硬件(数千个节点)上的大量数据(多兆字节数据集)。3.1.2MapReduce的功能MapReduce提供了以下主要功能:数据拆分和计算任务调度数据与代码互定位系统优化出错检测和恢复3.1.3MapReduce键值对MapReduce的<key,value>存储模型能够存储任意格式的数据,Map()方法和reduce()方法可以进行各种复杂的数据处理,这也使得程序员的负担加重,在对上层业务的开发效率上不如SQL简单。MapReduce在速度上会占有一定优势,MapReduce是为非结构化大数据的复杂处理而设计的,这些业务具有一次性处理的特点;此外由于采取了全数据扫描的模式以及对中间结果逐步汇总的策略,使其在拥有良好扩展能力和容错能力的同时,也导致了较高的磁盘和网络I/O的负载以及较高的数据解析代价。3.1.4MapReduce执行流程MapReduce执行流程是先执行Map后执行Reduce的过程3.1.4MapReduce执行流程Map任务处理读取HDFS中的文件,每一行解析成一个<k,v>,每一个键值对调用一次map()方法覆盖map()方法,接收上一步产生的<k,v>进行处理,转换为新的<k,v>输出对上一步输出的<k,v>进行分区,并默认分为一个区对不同分区中的数据按照k进行排序和分组。分组指的是相同key的value放到一个集合中对分组后的数据进行归约(可选)3.1.4MapReduce执行流程Reduce任务处理多个Map任务的输出,按照不同的分区,通过网络Copy到不同的Reduce节点上对多个Map的输出进行合并和排序将Reduce输出的<k,v>写到HDFS中3.1.5MapReduce程序运行Hadoop的发布包中内置了一个hadoop-mapreduce-examples-2.7.3.jar供用户使用,该安装包位于安装目录下的share/hadoop/mapreduce文件夹中,该jar包中有各种MR示例程序3.1.5MapReduce程序运行以单词计数为例,可以通过以下步骤运行:启动HDFS与YARN。上传需要统计的文件到HDFS下的data目录。在集群中的任意一台服务器上启动执行程序。输入:hadoopjarhadoop-mapreduce-example-2.7.3.jarwordcount/data/output3.1.6学生实践练习上传一个英文文档,使用hadoop-mapreduce-example-2.7.3.jar对英文文档进行单词计数的统计。253.1.6学生实践练习上传文件到HDFS系统;使用“hadoopfs-put<localsrc>...<dst>”命令将文件上传到HDFS。使用“hadoopjar”命令执行Java程序;使用hadoop-mapreduce-example-2.7.3.jar这个jar包中的wordcount()方法进行单词计数。最后使用浏览器通过50070端口访问站点,并在文件系统中下载文件,查看统计结果。Hadoop分布式模式搭建新旧API的差异MapReduceJavaAPI的介绍编写MapReduce程序运行MapReduce程序Combiner的介绍编写MapReduce实现气象数据最高温与最低温统计25203.2.1新旧API的差异adoop在0.20.0版本中第一次使用新的API,部分早期的Hadoop0.20.0版本不支持使用旧的API,但在接下来的Hadoop1.x和Hadoop2.x版本中新旧API都可以使用3.2.1新旧API的差异新旧API的差异主要有以下9点:新的API放在org.apache.hadoop.mapreduce包中,旧的API放在org.apache.hadoop.mapred中新API倾向于使用抽象类,而不是接口,更有利于扩展新API充分使用上下文对象context,允许用户能与MapReduce系统通信键值对记录在这两类API中都被推给了Mapper和Reducer,除此之外,新的API通过重写run()方法,允许Mapper和Reducer控制数据执行流程,允许数据按条处理或者分批处理。旧的API只在Map中允许。新的API中,作业控制由Job类实现,而非旧API中的JobClient类,新的API中删除了JobClient类新增的API实现了配置的统一输出文件的命名方式略有不同新API中的用户重载函数(java.lang.InterruptedException)被声明为抛出异常,这意味着可以用代码来实现中断响应。在新的API中,reduce()传递的值是java.lang.Iterable类型的,而非旧API中传递的java.lang.Iterator类型。这一改变使我们更容易通过Java的for-each循环结构来迭代这些值。3.2.2MapReduceJavaAPI的介绍org.apache.hadoop.mapreduce包提供了Mapper和Reduce基类。在大多数情况下,MapReduce作业会使用以上两个基类实现针对该作业的Mapper和Reduce的子类。Mapper类:Mappermaps将键值对输入到一组中间的键值对。maps是将输入记录转换为中间记录的单个任务,转换后的中间记录与输入记录的类型不一定是相同的。常见方法:map()、setup()、cleanup()、run()Reducer类:Reducerreduces是一组中间值,这些值通过key组成较小的单元。作业的reduces数由用户通过Java.StUnNoTrimeTebug(int)方法设置。常见方法:reduce()、setup()、cleanup()、run()Driver类:负责与Hadoop框架通信,并指定运行MapReduce作业所需的配置元素的驱动程序3.2.3编写MapReduce程序实现一个WordCount程序,并将WordCount程序打包发布到Hadoop集群中运行。publicclassWCMapextendsMapper<LongWritable,Text,Text,LongWritable>{@Overrideprotectedvoidmap(LongWritablekey,Textvalue,Mapper<LongWritable,Text,Text,LongWritable>.Contextcontext)throwsIOException,InterruptedException{ String[]splited=value.toString().split(""); for(Stringstr:splited) { context.write(newText(str),newLongWritable(1)); }}}创建Mapper子类继承Mapper基类3.2.3编写MapReduce程序publicclassWCReduceextendsReducer<Text,LongWritable,Text,LongWritable>{@Overrideprotectedvoidreduce(Texttext,Iterable<LongWritable>iterable,Reducer<Text,LongWritable,Text,LongWritable>.Contextcontext)throwsIOException,InterruptedException{ longtime=0; for(LongWritablelw:iterable) { time+=lw.get(); } context.write(text,newLongWritable(time));}}创建Reducer子类继承Reducer基类3.2.3编写MapReduce程序publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{Configurationconf=newConfiguration();Jobjob=Job.getInstance(conf);job.setJarByClass(MapReduceDemo.class);job.setMapperClass(WCMap.class);job.setReducerClass(WCReduce.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);PathinputPath=newPath("/data");PathoutputPath=newPath("/output");FileInputFormat.setInputPaths(job,inputPath);FileOutputFormat.setOutputPath(job,outputPath);booleanwaitForCompletion=job.waitForCompletion(true);System.exit(waitForCompletion?0:1);}创建Driver对象并封装MapReduce作业类的相关方法3.2.4运行MapReduce程序程序在Hadoop平台上运行的步骤如下:导出jar文件执行jar文件使用“put”命令上传需要统计的文档到HDFS中执行jar文件的命令:hadoopjarwc.jarcom.purple_bull.bigdata.mr.MapReduceDemo3.2.5Combiner的介绍在MapReduce中,当Map生成的数据过大时,带宽就成了瓶颈,当Map发送给Reduce时,对数据进行一次本地合并来减少数据传输量以提高网络I/O性能;Combiner最基本的作用是实现本地key的聚合,有“本地Reduce”之称,本质上就是一个Reducer3.2.5Combiner的介绍修改Driver类,在Mapper类和Reducer类定义之间加入下列内容:job.setCombinerClass(WCReduce.class); 3.2.6学生实践练习根据气象信息,获取全年的最高温和最低温信息253.2.6学生实践练习上传气象数据到HDFS中根据气象信息数据的相关特点,编写Mapper类根据Mapper输出的信息编写Reducer创建Driver对象并封装MapReduce作业类的相关方法使用hadoopjar命令运行程序,进行最高温和最低温的统计通过50070端口进入网页,下载结果数据Hadoop序列化与反序列化Java序列化和Hadoop序列化简介Writable的分析与使用WritableComparable的分析与使用使用Hadoop序列化与反序列化进行用户流量统计25203.3.1Java序列化和Hadoop序列化简介Java序列化需要实现serializable接口,进而实现序列化与反序列化,Hadoop序列化与反序列化需要实现Writable的接口Java序列化与反序列化优点:实现简便,对于循环引用和重复引用的情况也能处理,允许一定程度上类成员的改变。支持加密和验证缺点:序列化后的对象占用空间过大,数据膨胀。反序列化会不断创建新的对象。同一个类的对象的序列化结果只输出一份元数据,导致了文件不能分割。Hadoop序列化与反序列化排列紧凑:尽量减少带宽,加快数据交换速度处理快速:进程间通信需要大量的数据交互,使用大量的序列化机制,必须减少序列化和反序列的开支跨语言:可以支持不同语言间的数据交互可扩展:当系统协议升级,类定义发生变化时,序列化机制需要支持这些升级和变化3.3.2Writable的分析与使用Writable位于org.apache.hadoop.io包下,该接口有两个方法,分别为write()方法和readFields()方法,接口定义如下:publicinterfaceWritable{voidwrite(DataOutputout)throwsIOException;voidreadFields(DataInputin)throwsIOException;}3.3.2Writable的分析与使用在开发过程中经常涉及到复杂类型参数的传递,此时类的序列化和反序列化就需要实现Writable中的方法以全国高考成绩为例,获取文科和理科平均分最高的考生信息,数据结构说明见表:姓名考号身份证号语文数学英语综合文/理张*梅

030000000****34209842001082****01421321402800刘*晓022000324****26209842001082****01261029419813.3.2Writable的分析与使用publicclassScoreEntityimplementsWritable{privateStringuserName;privateStringuserCandidateNumber;privateStringuserIdCard;privateIntegeruserChinese;privateIntegeruserMathematical;privateIntegeruserEnglish;privateIntegeruserSynthesis;privatefloatuserAverageScore;privateIntegeruserIs;publicScoreEntity(){}//构造方法省略…//接下页}考生成绩信息实体类3.3.2Writable的分析与使用publicclassScoreEntityimplementsWritable{//接上页@Overridepublicvoidwrite(DataOutputout)throwsIOException{ out.writeUTF(userName); out.writeUTF(userCandidateNumber); out.writeUTF(userIdCard); out.writeInt(userChinese); out.writeInt(userMathematical); out.writeInt(userEnglish); out.writeInt(userSynthesis); out.writeFloat(userAverageScore); out.writeInt(userIs);}//接下页}序列化方法3.3.2Writable的分析与使用publicclassScoreEntityimplementsWritable{//接上页@OverridepublicvoidreadFields(DataInputin)throwsIOException{ userName=in.readUTF(); userCandidateNumber=in.readUTF(); userIdCard=in.readUTF(); userChinese=in.readInt(); userMathematical=in.readInt(); userEnglish=in.readInt(); userSynthesis=in.readInt(); userAverageScore=in.readFloat(); userIs=in.readInt();}//toString方法省略…//省略属性的get和set方法}反序列化方法3.3.2Writable的分析与使用publicclassScoreEntityimplementsWritable{//接上页@OverridepublicvoidreadFields(DataInputin)throwsIOException{ userName=in.readUTF(); userCandidateNumber=in.readUTF(); userIdCard=in.readUTF(); userChinese=in.readInt(); userMathematical=in.readInt(); userEnglish=in.readInt(); userSynthesis=in.readInt(); userAverageScore=in.readFloat(); userIs=in.readInt();}//toString方法省略…//省略属性的get和set方法}反序列化方法3.3.2Writable的分析与使用publicclassScoreMapperextendsMapper<LongWritable,Text,Text,ScoreEntity>{protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{String[]fields=value.toString().split("\t");try{ ScoreEntityentity=newScoreEntity(); entity.setUserName(fields[0]); entity.setUserCandidateNumber(fields[1]); entity.setUserIdCard(fields[2]); entity.setUserChinese(Integer.parseInt(fields[3])); entity.setUserMathematical(Integer.parseInt(fields[4])); entity.setUserEnglish(Integer.parseInt(fields[5])); entity.setUserSynthesis(Integer.parseInt(fields[6])); entity.setUserIs(Integer.parseInt(fields[7])); entity.setUserAverageScore((entity.getUserChinese()+entity.getUserEnglish()+entity. getUserMathematical()+entity.getUserSynthesis())/4); if(Integer.parseInt(fields[7])==1){ context.write(newText("文科"),entity); }else{ context.write(newText("理科"),entity); }}catch(Exceptione){ CountercountPrint=context.getCounter("Map-Exception",e.toString()); countPrint.increment(1l); }}}创建Mapper子类继承Mapper基类3.3.2Writable的分析与使用publicclassScoreReduceextendsReducer<Text,ScoreEntity,Text,ScoreEntity>{@Overrideprotectedvoidreduce(Texttext,Iterable<ScoreEntity>iterable,Contextcontext)throwsIOException,InterruptedException{try{ ScoreEntityentity=null; if(text.toString().equals("文科")){ for(ScoreEntityscoreEntity:iterable){ if(entity==null){ entity=scoreEntity; }else{ if(entity.getUserAverageScore()<scoreEntity.getUserAverageScore()){ entity=scoreEntity;}}} context.write(newText("文科"),entity); }else{ for(ScoreEntityscoreEntity:iterable){ if(entity==null){ entity=scoreEntity; }else{ if(entity.getUserAverageScore()<scoreEntity.getUserAverageScore()){ entity=scoreEntity;}}} context.write(newText("理科"),entity);}}catch(Exceptione){ CountercountPrint=context.getCounter("Reduce-OutValue",e.getMessage()); countPrint.increment(1l);}}}创建Reducer子类继承Reducer基类3.3.2Writable的分析与使用publicclassScoreDemo{publicstaticvoidmain(String[]args)throwsException{ Configurationconf=newConfiguration(); Jobjob=Job.getInstance(conf); job.setJarByClass(ScoreDemo.class); job.setMapperClass(ScoreMapper.class); job.setReducerClass(ScoreReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(ScoreEntity.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(ScoreEntity.class); PathinputPath=newPath("/data"); PathoutputPath=newPath("/output"); outputPath.getFileSystem(conf).delete(outputPath,true); FileInputFormat.setInputPaths(job,inputPath); FileOutputFormat.setOutputPath(job,outputPath); booleanwaitForCompletion=job.waitForCompletion(true); System.exit(waitForCompletion?0:1);}}创建Driver对象并封装MapReduce作业类的相关方法3.3.3WritableComparable的分析与使用在进行MapReduce编程时,key键往往用于分组或排序,在进行某些操作时,当Hadoop内置的key键数据类型无法满足需求时,或当它针对用例优化自定义数据类型时,可以通过实现org.apache.hadoop.io.WritableComparable接口定义一个自定义的WritableComparable类型,并将其作为MapReduce计算模型的key键数据类型WritableComparable位于org.apache.hadoop.io包下,该接口有3个方法,分别为write()方法、readFields()方法和compareTo()方法。3.3.3WritableComparable的分析与使用以全国各中学学生们的语文、数学和英语的成绩进行汇总排序姓名语文数学英语XXX

142132140XX126102943.3.3WritableComparable的分析与使用privateStringuserName;privateIntegeruserChinese;privateIntegeruserMathematical;privateIntegeruserEnglish;privateIntegeruserTotalScore;@OverridepublicintcompareTo(ScoreSortEntityo){ returnthis.userTotalScore>o.userTotalScore?-1:1;}创建考生成绩信息实体类,实现compareTo()方法3.3.3WritableComparable的分析与使用ScoreSortEntityentity=newScoreSortEntity( fields[0], Integer.parseInt(fields[1]), Integer.parseInt(fields[2]), Integer.parseInt(fields[3]));context.write(entity,newText(fields[0]));创建Mapper子类继承Mapper基类context.writ

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论