版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
认识MapReduce编程模型认识MapReduce编程模型1主要内容MapReduce编程模型简介WordCount编程实例HadoopMapReduce架构MapReduce实战开发主要内容MapReduce编程模型简介2MapReduce编程模型简介MapReduce是一种可用于数据处理的编程模型。该模型比较简单,但用于编写有用的程序并不简单。Hadoop可以运行由各种语言编写的MapReduce程序。例如:Java、Ruby、Python和C++语言等。最重要的是,MapReduce程序本质上是并行运行的,因此可以将大规模的数据分析任务交给任何一个拥有足够多机器的运行商。MapReduce的优势在于处理大规模数据集。MapReduce编程模型简介MapReduce是一种可用于3MapReduce编程模型简介1、从MapReduce自身的命名特点可以看出,MapReduce由两个阶段组成:Map和Reduce。用户只需map()和reduce()两个函数,即可完成简单的分布式程序设计。2、map()函数以key/value对作为输入,产生另外一系列key/value对作为中间输出写入本地磁盘。MapReduce框架会自动将这些中间数据按照key值进行聚合,且key值相同的数据被统一交给reduce()函数处理。3、reduce()函数以key及对应的value列表作为输入,经合并key相同的value值后,产生另外一系列key/value对作为最终输出写入HDFS。MapReduce编程模型简介1、从MapReduce自身的4MapReduce编程模型简介MapReduce设计目的:易于编程良好的扩展性高容错性MapReduce编程模型简介MapReduce设计目的:5WordCount编程实例Mapper类:publicclassWordMapperextendsMapper<Object,Text,Text,IntWritable>{ publicstaticfinalIntWritableval=newIntWritable(1); publicstaticfinalTextword=newText(); publicvoidmap(Objectkey,Textvalue,Contextcontext) throwsInterruptedException,IOException{ Stringline=value.toString(); String[]arr=line.split("\t"); for(Stringwd:arr){ word.set(wd); context.write(word,val); } }}WordCount编程实例Mapper类:6WordCount编程实例Reducer类publicclassWordReducerextendsReducer<Text,IntWritable,Text,IntWritable>{ publicIntWritableval=newIntWritable(); publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext) throwsInterruptedException,IOException{ intsum=0; for(IntWritablevalue:values){ sum+=value.get(); } val.set(sum); context.write(key,val); }}WordCount编程实例Reducer类7WordCount编程实例main类:publicclassWordCount{ publicstaticvoidmain(String[]args)throwsIOException, ClassNotFoundException,InterruptedException{ Stringintput=null; Stringoutput=null; if(null!=args&&args.length==2){ intput=args[0]; output=args[1]; Jobjob=newJob(newConfiguration(),"wordcount");//创建一个job
//以jar包的形式运行 job.setJarByClass(WordCount.class);
//设置Mapper类和Reducer类 job.setMapperClass(Mapper.class); job.setReducerClass(Reducer.class);
WordCount编程实例main类:8WordCount编程实例//设置输出的key/value的输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入输出的格式 FileInputFormat.addInputPath(job,newPath(intput)); FileOutputFormat.setOutputPath(job,newPath(output)); System.exit(job.waitForCompletion(true)?0:1); }else{ System.err.println("<Urage>wordcount<intput><output>"); } }}WordCount编程实例9运行结果运行结果10WordCount编程实例用户编写完MapReduce程序后,按照一定的规则指定程序的输入和输出目录,并提交到Hadoop集群中,作业在Hadoop中的执行过程如图所示。Hadoop将输入数据切分成若干个输入分片(inputsplit),并将每个split交给一个MapTask处理;MapTask不断的从对应的split中解析出一个个key/value,并调用map()函数处理,处理完之后根据ReduceTask个数将结果分成若干个分区(partition)写到本地磁盘;同时,每个ReduceTask从每个MapTask上读取属于自己的那个partition,然后基于排序的方法将key相同的数据聚集在一起,调用reduce()函数处理,并将结果输出到文件中。WordCount编程实例用户编写完MapReduce程序后11WordCount编程实例流程图如下:WordCount编程实例流程图如下:12HadoopMapReduce架构HadoopMapReduce架构13HadoopMapReduce架构1)Client用户编写的MapReduce程序通过Client提交到JobTracker端;同时,用户可通过Client提供的一些接口查看作业的运行状态。在Hadoop内部用“作业”(Job)表示MapReduce程序。一个MapReduce程序可对应若干个作业,而每个作业会被分解成若干个Map/Reduce任务(Task)。2)JobTrackerJobTracke负责资源监控和作业调度。JobTracker监控所有TaskTracker与job的健康状况,一旦发现失败,就将相应的任务转移到其他节点;同时,JobTracker会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。HadoopMapReduce架构1)Client14HadoopMapReduce架构3)TaskTrackerTaskTracker会周期性地通过Heartbeat将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等)。一个Task获取到一个slot后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot分为Mapslot和Reduceslot两种,分别供MapTask和ReduceTask使用。TaskTracker通过slot数目(可配置参数)限定Task的并发度。HadoopMapReduce架构3)TaskTracke15HadoopMapReduce架构4)TaskTask分为MapTask和ReduceTask两种,均由TaskTracker启动。HDFS以固定大小的block为基本单位存储数据,而对于MapReduce而言,其处理单位是split。split是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意的是,split的多少决定了MapTask的数目,因为每个split只会交给一个MapTask处理。HadoopMapReduce架构4)Task16HadoopMapReduce架构MapTask执行过程如下图所示。由该图可知,MapTask先将对应的split迭代解析成一个个key/value对,依次调用用户自定义的map()函数进行处理,最终将临时结果存放到本地磁盘上,其中临时数据被分成若干个partition,每个partition将被一个ReduceTask处理。HadoopMapReduce架构MapTask执行过17HadoopMapReduce架构ReduceTask执行过程下图所示。该过程分为三个阶段:①从远程节点上读取MapTask中间结果(称为“Shuffle阶段”);②按照key对key/value对进行排序(称为“Sort阶段”);③依次读取<key,valuelist>,调用用户自定义的reduce()函数处理,并将最终结果存到HDFS上(称为“Reduce阶段”)。HadoopMapReduce架构ReduceTask18MapReduce实战开发数据源sogou500w数据或sogou4000w数据数据字段描述Time:用户访问时间Uid:用户的idKeyword:访问的关键字Rank:点击排名Order:页数Url:网址MapReduce实战开发数据源19条件过滤统计出搜索过包含有“仙剑奇侠传”内容的UID及搜索关键字记录详细代码见XjUid.javarank<3并且order>2的所有UID及数量代码见UidByRank.java条件过滤统计出搜索过包含有“仙剑奇侠传”内容的UID及搜索关20搜索过‘仙剑奇侠传’内容的UID、搜索记录staticclassUidMapextendsMapper<LongWritable,Text,Text,Text>{ Textuid=newText(); protectedvoidmap(LongWritablekey,Textvalue,org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,Text>.Contextcontext)throwsjava.io.IOException,InterruptedException{ String[]lines=value.toString().split("\t"); if(lines!=null&&lines.length==6){ Stringkw=lines[2]; if(kw.indexOf(“仙剑奇侠传")>=0){ uid.set(lines[1]); context.write(uid,newText(kw)); } } };}搜索过‘仙剑奇侠传’内容的UID、搜索记录staticcl21搜索过‘仙剑奇侠传’内容的UID、搜索记录publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{ if(args.length!=2&&args==null){ System.err.println("PleaseIputRightPath!"); System.exit(0); } Configurationconfiguration=newConfiguration(); Jobjob=newJob(configuration,BaiduUid.class.getSimpleName()); job.setJarByClass(BaiduUid.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job,newPath(args[0])); FileOutputFormat.setOutputPath(job,newPath(args[1])); job.setMapperClass(UidMap.class); job.setNumReduceTasks(0); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); }搜索过‘仙剑奇侠传’内容的UID、搜索记录publicst22
条件查询上午7-9点之间,搜索过“赶集网”的用户详细代码请见GjTest.java条件查询上午7-9点之间,搜索过“赶集网”的用户23本章小结MapReduce主要分为input、splitting、Mapping、Shuffling、Reducing、Finalreduce这几个阶段。HadoopMapReduce处理的数据一般位于底层分布式文件系统中。该系统往往将用户的文件切分为若干个固定大小的block存储到不同的节点上。默认情况下,MapReduce的每个Task处理一个block。MapReduce主要由四个组件构成,分别是Client,JobTracker,TaskTracker和Task,它们共同保障一个作业的成功运行。本章小结MapReduce主要分为input、splitti24作业用MapReduce实现WordCount作业用MapReduce实现WordCount25谢谢谢谢26认识MapReduce编程模型认识MapReduce编程模型27主要内容MapReduce编程模型简介WordCount编程实例HadoopMapReduce架构MapReduce实战开发主要内容MapReduce编程模型简介28MapReduce编程模型简介MapReduce是一种可用于数据处理的编程模型。该模型比较简单,但用于编写有用的程序并不简单。Hadoop可以运行由各种语言编写的MapReduce程序。例如:Java、Ruby、Python和C++语言等。最重要的是,MapReduce程序本质上是并行运行的,因此可以将大规模的数据分析任务交给任何一个拥有足够多机器的运行商。MapReduce的优势在于处理大规模数据集。MapReduce编程模型简介MapReduce是一种可用于29MapReduce编程模型简介1、从MapReduce自身的命名特点可以看出,MapReduce由两个阶段组成:Map和Reduce。用户只需map()和reduce()两个函数,即可完成简单的分布式程序设计。2、map()函数以key/value对作为输入,产生另外一系列key/value对作为中间输出写入本地磁盘。MapReduce框架会自动将这些中间数据按照key值进行聚合,且key值相同的数据被统一交给reduce()函数处理。3、reduce()函数以key及对应的value列表作为输入,经合并key相同的value值后,产生另外一系列key/value对作为最终输出写入HDFS。MapReduce编程模型简介1、从MapReduce自身的30MapReduce编程模型简介MapReduce设计目的:易于编程良好的扩展性高容错性MapReduce编程模型简介MapReduce设计目的:31WordCount编程实例Mapper类:publicclassWordMapperextendsMapper<Object,Text,Text,IntWritable>{ publicstaticfinalIntWritableval=newIntWritable(1); publicstaticfinalTextword=newText(); publicvoidmap(Objectkey,Textvalue,Contextcontext) throwsInterruptedException,IOException{ Stringline=value.toString(); String[]arr=line.split("\t"); for(Stringwd:arr){ word.set(wd); context.write(word,val); } }}WordCount编程实例Mapper类:32WordCount编程实例Reducer类publicclassWordReducerextendsReducer<Text,IntWritable,Text,IntWritable>{ publicIntWritableval=newIntWritable(); publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext) throwsInterruptedException,IOException{ intsum=0; for(IntWritablevalue:values){ sum+=value.get(); } val.set(sum); context.write(key,val); }}WordCount编程实例Reducer类33WordCount编程实例main类:publicclassWordCount{ publicstaticvoidmain(String[]args)throwsIOException, ClassNotFoundException,InterruptedException{ Stringintput=null; Stringoutput=null; if(null!=args&&args.length==2){ intput=args[0]; output=args[1]; Jobjob=newJob(newConfiguration(),"wordcount");//创建一个job
//以jar包的形式运行 job.setJarByClass(WordCount.class);
//设置Mapper类和Reducer类 job.setMapperClass(Mapper.class); job.setReducerClass(Reducer.class);
WordCount编程实例main类:34WordCount编程实例//设置输出的key/value的输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入输出的格式 FileInputFormat.addInputPath(job,newPath(intput)); FileOutputFormat.setOutputPath(job,newPath(output)); System.exit(job.waitForCompletion(true)?0:1); }else{ System.err.println("<Urage>wordcount<intput><output>"); } }}WordCount编程实例35运行结果运行结果36WordCount编程实例用户编写完MapReduce程序后,按照一定的规则指定程序的输入和输出目录,并提交到Hadoop集群中,作业在Hadoop中的执行过程如图所示。Hadoop将输入数据切分成若干个输入分片(inputsplit),并将每个split交给一个MapTask处理;MapTask不断的从对应的split中解析出一个个key/value,并调用map()函数处理,处理完之后根据ReduceTask个数将结果分成若干个分区(partition)写到本地磁盘;同时,每个ReduceTask从每个MapTask上读取属于自己的那个partition,然后基于排序的方法将key相同的数据聚集在一起,调用reduce()函数处理,并将结果输出到文件中。WordCount编程实例用户编写完MapReduce程序后37WordCount编程实例流程图如下:WordCount编程实例流程图如下:38HadoopMapReduce架构HadoopMapReduce架构39HadoopMapReduce架构1)Client用户编写的MapReduce程序通过Client提交到JobTracker端;同时,用户可通过Client提供的一些接口查看作业的运行状态。在Hadoop内部用“作业”(Job)表示MapReduce程序。一个MapReduce程序可对应若干个作业,而每个作业会被分解成若干个Map/Reduce任务(Task)。2)JobTrackerJobTracke负责资源监控和作业调度。JobTracker监控所有TaskTracker与job的健康状况,一旦发现失败,就将相应的任务转移到其他节点;同时,JobTracker会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。HadoopMapReduce架构1)Client40HadoopMapReduce架构3)TaskTrackerTaskTracker会周期性地通过Heartbeat将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等)。一个Task获取到一个slot后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot分为Mapslot和Reduceslot两种,分别供MapTask和ReduceTask使用。TaskTracker通过slot数目(可配置参数)限定Task的并发度。HadoopMapReduce架构3)TaskTracke41HadoopMapReduce架构4)TaskTask分为MapTask和ReduceTask两种,均由TaskTracker启动。HDFS以固定大小的block为基本单位存储数据,而对于MapReduce而言,其处理单位是split。split是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意的是,split的多少决定了MapTask的数目,因为每个split只会交给一个MapTask处理。HadoopMapReduce架构4)Task42HadoopMapReduce架构MapTask执行过程如下图所示。由该图可知,MapTask先将对应的split迭代解析成一个个key/value对,依次调用用户自定义的map()函数进行处理,最终将临时结果存放到本地磁盘上,其中临时数据被分成若干个partition,每个partition将被一个ReduceTask处理。HadoopMapReduce架构MapTask执行过43HadoopMapReduce架构ReduceTask执行过程下图所示。该过程分为三个阶段:①从远程节点上读取MapTask中间结果(称为“Shuffle阶段”);②按照key对key/value对进行排序(称为“Sort阶段”);③依次读取<key,valuelist>,调用用户自定义的reduce()函数处理,并将最终结果存到HDFS上(称为“Reduce阶段”)。HadoopMapReduce架构ReduceTask44MapReduce实战开发数据源sogou500w数据或sogou4000w数据数据字段描述Time:用户访问时间Uid:用户的idKeyword:访问的关键字Rank:点击排名Order:页数Url:网址MapReduce实战开发数据源45条件过滤统计出搜索过包含有“仙剑奇侠传”内容的UID及搜索关键字记录详细代码见XjUid.javarank<3并且order>2的所有UID及数量代码见UidByRank.java条件过滤统计出搜索过包含有“仙剑奇侠传”内容的UID及搜索关46搜索过‘仙剑奇侠传’内容的UID、搜索记录staticclassUidMapextendsMapper<LongWritable,Text,Text,Text>{ Textuid=newText(); protectedvoidmap(LongWritablekey,Textvalue,org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,Text>.Contextcontext)throwsjava.io.IOException,InterruptedException{ String[]lines=value.toString().split("\t"); if(lines!=null&&lines.length==6){ Stringkw=lines[2];
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
评论
0/150
提交评论