版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
第七章MapReduce提要7.1概述7.2MapReduce体系构造7.3MapReduce工作流程7.4实例分析:WordCount7.5MapReduce的详细应用7.6MapReduce编程实践7.1 概述7.1.1 分布式并行编程7.1.2 MapReduce模型简介7.1.3 Map和Reduce函数7.1.1 分布式并行编程“摩尔定律”,CPU性能大约每隔18个月翻一番从2023年开始摩尔定律逐渐失效,需要处理的数据量迅速增长,人们开始借助于分布式并行编程来提升程序性能分布式程序运营在大规模计算机集群上,能够并行执行大规模数据处理任务,从而取得海量的计算能力google企业最先提出了分布式并行编程模型MapReduce,HadoopMapReduce是它的开源实现,后者比前者使用门槛低诸多7.1.1 分布式并行编程问题:在MapReduce出现之前,已经有像MPI这么非常成熟的并行计算框架了,那么为何Google还需要MapReduce?MapReduce相较于老式的并行计算框架有什么优势?传统并行计算框架MapReduce集群架构/容错性共享式(共享内存/共享存储),容错性差非共享式,容错性好硬件/价格/扩展性刀片服务器、高速网、SAN,价格贵,扩展性差普通PC机,便宜,扩展性好编程/学习难度what-how,难what,简单适用场景实时、细粒度计算、计算密集型批处理、非实时、数据密集型7.1.2 MapReduce模型简介MapReduce将复杂的、运营于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce编程轻易,不需要掌握分布式并行编程细节,也能够很轻易把自己的程序运营在分布式系统上,完毕海量数据的计算MapReduce采用“分而治之”策略,一种存储在分布式文件系统中的大规模数据集,会被切提成许多独立的分片(split),这些分片能够被多种Map任务并行处理MapReduce设计的一种理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,因为,移动数据需要大量的网络传播开销MapReduce框架采用了Master/Slave架构,涉及一种Master和若干个Slave。Master上运营JobTracker,Slave上运营TaskTrackerHadoop框架是用Java实现的,但是,MapReduce应用程序则不一定要用Java来写7.1.3 Map和Reduce函数函数输入输出说明Map<k1,v1>如:<行号,”abc”>List(<k2,v2>)如:<“a”,1><“b”,1><“c”,1>1.将小数据集进一步解析成一批<key,value>对,输入Map函数中进行处理2.每一个输入的<k1,v1>会输出一批<k2,v2>。<k2,v2>是计算的中间结果Reduce<k2,List(v2)>如:<“a”,<1,1,1>><k3,v3><“a”,3>输入的中间结果<k2,List(v2)>中的List(v2)表示是一批属于同一个k2的value7.2MapReduce的体系构造MapReduce体系构造主要由四个部分构成,分别是:Client、JobTracker、TaskTracker以及Task7.2MapReduce的体系构造MapReduce主要有如下4个部分构成:1)Client顾客编写的MapReduce程序经过Client提交到JobTracker端顾客可经过Client提供的某些接口查看作业运营状态2)JobTrackerJobTracker负责资源监控和作业调度JobTracker监控全部TaskTracker与Job的健康情况,一旦发觉失败,就将相应的任务转移到其他节点JobTracker会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源7.2MapReduce的体系构造3)TaskTrackerTaskTracker会周期性地经过“心跳”将本节点上资源的使用情况和任务的运营进度报告给JobTracker,同步接受JobTracker发送过来的命令并执行相应的操作(如开启新任务、杀死任务等)TaskTracker使用“slot”等量划分本节点上的资源量(CPU、内存等)。一种Task获取到一种slot后才有机会运营,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot分为Mapslot和Reduceslot两种,分别供MapTask和ReduceTask使用4)TaskTask分为MapTask和ReduceTask两种,均由TaskTracker开启7.3 MapReduce工作流程7.3.1 工作流程概述7.3.2 MapReduce各个执行阶段7.3.3 Shuffle过程详解7.3.1 工作流程概述图7-1MapReduce工作流程Shuffle7.3.1 工作流程概述不同的Map任务之间不会进行通信不同的Reduce任务之间也不会发生任何信息互换顾客不能显式地从一台机器向另一台机器发送消息全部的数据互换都是经过MapReduce框架本身去实现的7.3.2 MapReduce各个执行阶段7.3.2 MapReduce各个执行阶段HDFS以固定大小的block为基本单位存储数据,而对于MapReduce而言,其处理单位是split。split是一种逻辑概念,它只涉及某些元数据信息,例如数据起始位置、数据长度、数据所在节点等。它的划分措施完全由顾客自己决定。有关Split(分片)7.3.2 MapReduce各个执行阶段Reduce任务的数量最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目一般设置比reduce任务槽数目稍微小某些的Reduce任务个数(这么能够预留某些系统资源处理可能发生的错误)Map任务的数量Hadoop为每个split创建一种Map任务,split的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一种HDFS块7.3.3 Shuffle过程详解图7-3Shuffle过程
1.Shuffle过程简介7.3.3 Shuffle过程详解2.Map端的Shuffle过程每个Map任务分配一种缓存MapReduce默认100MB缓存设置溢写百分比0.8分区默认采用哈希函数排序是默认的操作排序后能够合并(Combine)合并不能变化最终止果在Map任务全部结束之迈进行归并归并得到一种大的文件,放在本地磁盘文件归并时,假如溢写文件数量不小于预定值(默认是3)则能够再次开启Combiner,少于3不需要JobTracker会一直监测Map任务的执行,并告知Reduce任务来领取数据合并(Combine)和归并(Merge)的区别:两个键值对<“a”,1>和<“a”,1>,假如合并,会得到<“a”,2>,假如归并,会得到<“a”,<1,1>>7.3.3 Shuffle过程详解3.Reduce端的Shuffle过程Reduce任务经过RPC向JobTracker问询Map任务是否已经完毕,若完毕,则领取数据Reduce领取数据先放入缓存,来自不同Map机器,先归并,再合并,写入磁盘多种溢写文件归并成一种或多种大文件,文件中的键值对是排序的当数据极少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce7.3.3 Shuffle过程详解3.Reduce端的Shuffle过程图7-5Reduce端的Shuffle过程7.3.4 MapReduce应用程序执行过程7.4 实例分析:WordCount7.4.1 WordCount程序任务7.4.2 WordCount设计思绪7.4.3 一种WordCount执行过程的实例7.4.1 WordCount程序任务表7-2WordCount程序任务程序WordCount输入一个包含大量单词的文本文件输出文件中每个单词及其出现次数(频数),并按照单词字母顺序排序,每个单词和其频数占一行,单词和频数之间有间隔表7-3一种WordCount的输入和输出实例输入输出HelloWorldHelloHadoopHelloMapReduceHadoop1Hello3MapReduce1World17.4.2 WordCount设计思绪首先,需要检验WordCount程序任务是否能够采用MapReduce来实现其次,拟定MapReduce程序的设计思绪最终,拟定MapReduce程序的执行过程7.4.3 一种WordCount执行过程的实例图7-7Map过程示意图7.4.3 一种WordCount执行过程的实例图7-8顾客没有定义Combiner时的Reduce过程示意图7.4.3 一种WordCount执行过程的实例图7-9顾客有定义Combiner时的Reduce过程示意图7.5MapReduce的详细应用MapReduce能够很好地应用于多种计算问题关系代数运算(选择、投影、并、交、差、连接)分组与聚合运算矩阵-向量乘法矩阵乘法7.5MapReduce的详细应用用MapReduce实现关系的自然连接7.5MapReduce的详细应用用MapReduce实现关系的自然连接假设有关系R(A,B)和S(B,C),对两者进行自然连接操作使用Map过程,把来自R的每个元组<a,b>转换成一种键值对<b,<R,a>>,其中的键就是属性B的值。把关系R涉及到值中,这么做使得我们能够在Reduce阶段,只把那些来自R的元组和来自S的元组进行匹配。类似地,使用Map过程,把来自S的每个元组<b,c>,转换成一种键值对<b,<S,c>>全部具有相同B值的元组被发送到同一种Reduce进程中,Reduce进程的任务是,把来自关系R和S的、具有相同属性B值的元组进行合并Reduce进程的输出则是连接后的元组<a,b,c>,输出被写到一种单独的输出文件中7.5MapReduce的详细应用用MapReduce实现关系的自然连接7.6 MapReduce编程实践7.6.1 任务要求7.6.2 编写Map处理逻辑7.6.3 编写Reduce处理逻辑7.6.4 编写main措施7.6.5 编译打包代码以及运营程序7.6.6Hadoop中执行MapReduce任务的几种方式7.6.1任务要求文件A的内容如下:ChinaismymotherlandIloveChina文件B的内容如下:IamfromChina期望成果如右侧所示:I2is1China3my1love1am1from1motherland17.6.2编写Map处理逻辑Map输入类型为<key,value>期望的Map输出类型为<单词,出现次数>Map输入类型最终拟定为<Object,Text>Map输出类型最终拟定为<Text,IntWritable>publicstaticclassMyMapperextendsMapper<Object,Text,Text,IntWritable>{privatefinalstaticIntWritableone=newIntWritable(1);privateTextword=newText();publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{ StringTokenizeritr=newStringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word,one); }}}7.6.3编写Reduce处理逻辑在Reduce处理数据之前,Map的成果首先经过Shuffle阶段进行整顿Reduce阶段的任务:对输入数字序列进行求和Reduce的输入数据为<key,Iterable容器>Reduce任务的输入数据:<”I”,<1,1>><”is”,1>……<”from”,1><”China”,<1,1,1>>publicstaticclassMyReducerextendsReducer<Text,IntWritable,Text,IntWritable>{privateIntWritableresult=newIntWritable();publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{ intsum=0; for(IntWritableval:values){ sum+=val.get(); } result.set(sum);context.write(key,result);}}7.6.3编写Reduce处理逻辑7.6.4编写main措施publicstaticvoidmain(String[]args)throwsException{Configurationconf=newConfiguration();//程序运营时参数String[]otherArgs=newGenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length!=2){System.err.println("Usage:wordcount<in><out>");System.exit(2);}Jobjob=newJob(conf,"wordcount");//设置环境参数job.setJarByClass(WordCount.class);//设置整个程序的类名job.setMapperClass(MyMapper.class);//添加MyMapper类job.setReducerClass(MyReducer.class);//添加MyReducer类job.setOutputKeyClass(Text.class);//设置输出类型job.setOutputValueClass(IntWritable.class);//设置输出类型FileInputFormat.addInputPath(job,newPath(otherArgs[0]));//设置输入文件FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));//设置输出文件System.exit(job.waitForCompletion(true)?0:1);}importjava.io.IOException;importjava.util.StringTokenizer;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;publicclassWordCount{//WordCount类的详细代码见下一页}完整代码publicclassWordCount{publicstaticclassMyMapperextendsMapper<Object,Text,Text,IntWritable>{privatefinalstaticIntWritableone=newIntWritable(1);privateTextword=newText();publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{StringTokenizeritr=newStringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word,one); }}} publicstaticclassMyReducerextendsReducer<Text,IntWritable,Text,IntWritable>{privateIntWritableresult=newIntWritable();publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{ intsum=0; for(IntWritableval:values){ sum+=val.get(); } result.set(sum); context.write(key,result);}} publicstaticvoidmain(String[]args)throwsException{ Configurationconf=newConfiguration(); String[]otherArgs=newGenericOptionsParser(conf,args).getRemainingArgs(); if(otherArgs.length!=2){ System.err.println("Usage:wordcount<in><out>"); System.exit(2); } Jobjob=newJob(conf,"wordcount"); job.setJarByClass(WordCount.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,newPath(otherArgs[0])); FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));System.exit(job.waitForCompletion(true)?0:1);}}7.6.5编译打包代码以及运营程序试验环节:使用java编译程序,生成.class文件将.class文件打包为jar包运营jar包(需要开启Hadoop)查看成果7.6.5编译打包代码以及运营程序Hadoop2.x版本中的依赖jarHadoop2.x版本中jar不再集中在一种hadoop-core*.jar中,而是提成多种jar,如使用Hadoop
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 莱施尼汉综合征护理查房
- 颈部皮肤良性肿瘤护理查房
- IC卡智能电表项目可行性研究报告
- 扩张器外露护理查房
- 大熊产品运营方案设计
- 公司运营培训班方案
- 杰士邦运营方案
- 运营群游戏互动方案策划
- 潮玩直播运营方案设计
- 海边帐篷营地运营方案
- 2025中国铁路南宁局集团有限公司招聘高校毕业生53人笔试历年参考题库附带答案详解
- 四川省内江市高2026届适应性训练试题(内江三模)历史+答案
- 2026中共仁寿县委政法委员会招聘专职网格员184人备考题库(四川)附答案详解(模拟题)
- (二模)呼和浩特市2026年高三年级第二次模拟考试英语试卷(含答案)
- 2026上半年安徽黄山市休宁城乡建设投资集团有限公司及权属子公司招聘18人笔试历年参考题库附带答案详解
- 统编人教五年级语文下册《杨氏之子》教学课件
- 编制说明-矿产资源规划数据质量检查与汇交规范
- 充电桩日常维护手册
- 2026届新高考语文三轮热点复习:二元思辨作文指导
- 河北省石家庄市2026年小升初入学分班考试数学试卷解析及答案
- 煤矿乳化泵维修培训课件
评论
0/150
提交评论