版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
项目四Mapreduce分布式计算编程实战CONTENTS目录01
项目导读02
知识目标03
技能目标04
素质(思政)目标05
任务一MapReduce基本案例-单词统计CONTENTS目录06
任务二MapReduce基本案例-数据去重07
任务三MapReduce基本案例-求平均成绩08
项目总结09
项目考核项目导读01项目导读MapReduce分布式计算实战MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的海量数据集。目前很多公司还都在用这个计算引擎,后续要讲的Hive原生支持的计算引擎也是MapReduce,故本章以3个经典案例来讲解Mapreduce分布式计算编程实战。MapReduce采用”分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。简单地说,MapReduce就是“任务的分解与结果的汇总”知识目标02知识目标
了解MapReduce优缺点
熟悉MapReduce计算模型
熟悉shuffle
熟悉MapReduce常用编程组件技能目标03技能目标
掌握Mapper类的编写
掌握Reducer类的编写
掌握使用MapReduce解决实际问题的逻辑思路素质(思政)目标04素质(思政)目标
培养严谨细致的工匠精神
厚植技术报国梦
培养逻辑思维能力
培养学以致用解决问题的能力任务一MapReduce基本案例-单词统计05任务工单
任务场景现在需要统计一系列文本文件中每个单词出现的次数,使用Mapreduce编程完成
任务准备全班学生以4人左右为一组,各组选出组长。请组长组织组员查找相关资料,进行需求分析和系统设计问题1:描述单词统计程序的设计思路问题2:javamain函数里面的args参数分别有什么含义必备知识技能:一、MapReduce概述MapReduce编程模型简介MapReduce是一个用于大规模数据处理的分布式计算模型,最初由Google工程师设计并实现的,Google已经将完整的MapReduce论文公开发布了。其中的定义是,MapReduce是一个编程模型,是一个用于处理和生成大规模数据集的相关的实现。用户定义一个map函数来处理一个Key-Value对以生成一批中间的Key-Value对,再定义一个reduce函数将所有这些中间的有相同Key的Value合并起来。很多现实世界中的任务都可用这个模型来表达必备知识技能:二、MapReduce优缺点
优点
可伸缩性:MapReduce可以处理大规模的数据集,通过将数据分割为多个小块进行并行处理,可以有效地利用集群的计算资源。它可以在需要处理更大数据集时进行水平扩展,而不需要对现有的代码进行修改
容错性:MapReduce具有高度的容错性。当某个节点发生故障时,作业可以自动重新分配给其他可用的节点进行处理,从而保证作业的完成
灵活性:MapReduce允许开发人员使用自定义的Mapper和Reducer来处理各种类型的数据和计算任务。它提供了灵活的编程模型,可以根据具体需求进行定制和扩展
易于使用:MapReduce提供了高级抽象,隐藏了底层的并行和分布式处理细节。开发人员只需要关注数据的转换和计算逻辑,而不需要关心并发和分布式算法的实现细节必备知识技能:二、MapReduce优缺点
缺点
适用性有限:MapReduce适用于一些需要进行大规模数据处理和分析的场景,但对于一些需要实时计算和交互式查询的场景,MapReduce的延迟较高,不太适合
复杂性:尽管MapReduce提供了高级抽象,但对于开发人员来说,编写和调试MapReduce作业仍然是一项复杂的任务。需要熟悉MapReduce的编程模型和框架,并理解分布式计算的概念和原理
磁盘IO开销:在MapReduce中,数据需要在Map和Reduce阶段之间进行磁盘IO,这可能会导致性能瓶颈。尽管可以通过合理的数据分区和调优来减少磁盘IO的开销,但仍然需要考虑和处理数据移动和复制的开销
综上所述,MapReduce是一种适用于大规模数据处理的编程模型和计算框架,具有可伸缩性、容错性、灵活性和易用性等优点。然而,它在实时计算和交互式查询等场景下的适用性有限,同时开发和调试MapReduce作业的复杂性也需要考虑必备知识技能:三、MapReduce核心思想
MapReduce原理与应用MapReduce的思想核心是“分而治之”,适用于大规模复杂的任务处理场景(大规模数据处理场景)。Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间没有依赖关系。Reduce负责“合”,即对Map阶段的结果进行全局汇总必备知识技能:四、MapReduce计算模型
MapReduce计算模型解析我们知道MapReduce计算模型主要由三个阶段构成:Map、Shuffle、Reduce。Map是映射,负责数据的过滤和分区,将原始数据转化为键值对;Reduce是合并,将具有相同Key值的Value进行处理后再输出新的键值对作为最终结果。为了让Reduce可以并行处理Map的结果,必须对Map的输出进行一定的排序与分割,然后再交给对应的Reduce,而这个将Map输出进行进一步整理并交给Reduce的过程就是Shuffle任务实施:一、准备数据步骤1准备输入数据。在Windows的D盘根目录底下新建一个目录,名为input。在input目录底下新建一个名为words.txt的文件,内容如下Helloworld单击此处添加项正文hellohadoop单击此处添加项正文hellobob单击此处添加项正文hadooptom单击此处添加项正文tomspark单击此处添加项正文任务实施:二、编写代码
步骤1编写Mapper类。在src-java下面创建包cn.jscfa.wordcount,在该包下新建一个Java文件WordCountMapper.java,清空该文件并输入如下代码packagecn.jscfa.wordcountimportorg.apache.hadoop.io.IntWritableimportorg.apache.hadoop.io.LongWritableimportorg.apache.hadoop.io.Textimportorg.apache.hadoop.mapreduce.Mapper任务实施:二、编写代码importjava.io.IOException
单击此处添加项正文/**
单击此处添加项正文Mapper类实现
*这里就是MapReduce程序Map阶段业务逻辑实现的类Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>*KEYIN
表示mapper数据输入时key的数据类型,在默认读取数据组件下,叫作InputFormat,它的行为是每行读取待处理的数据KEYIN数据类型解释
*读取一行,就返回一行给MR程序,这种情况下KEYIN就表示每一行的起始偏移,因此数据类型是Long*VALUEIN
表示mapper数据输入的时候value的数据类型,在默认读取数据组件下,VALUEIN就表示读取的一行内容,因此数据类型是String任务实施:二、编写代码01*KEYOUT表示Map阶段数据输出的时候key的数据类型,在本案例中输出的key是单词,因此数据类型是String02*VALUEOUT表示mapper阶段数据输出的时候value的数据类型,在本案例中输出的value是单词的次数,因此数据类型是Integer03数据序列化效率问题*这里所说的数据类型String,Long都是JDK自带的类型,数据在分布式系统中跨网络传输就需要将数据序列化,默认JDK序列化时效率低下,因此04HadoopSerializationTypes*使用Hadoop封装的序列化类型。long--LongWritableString--TextIntegerIntWritable05*/单击此处添加项正文06WordCountMapperClasspublicclassWordCountMapperextendsMapper<LongWritable,Text,Text,IntWritable>{任务实施:二、编写代码
/***这里就是Map阶段具体业务逻辑实现的方法。该方法的调用取决于读取数据的组件有没有给MR传入数据*如果有数据传入,每一个<k,v>对,map就会被调用一次*/@OverrideMapFunctioninHadoopprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{任务实施:二、编写代码
//拿到传入进来的一行内容,把数据类型转换为String
//key-k1,value-v1
//key=0,value="helloworld"
Stringline=value.toString()
//将这行内容按照分隔符切割
String[]words=line.split("")任务实施:二、编写代码//words[0]="hello";words[1]="world"//遍历数组,每出现一个单词就标记一个数值1例如:〈单词,1>for(Stringword:words){//使用MR上下文context,把Map阶段处理的数据发送给Reduce阶段作为输入数据context.write(newText(word),newIntWritable(1))//k2="hello",v2=1;k2="world",v2=1任务实施:二、编写代码
//<hello,1><world,1>HadoopSparkDataTransmission//第一行hadoophadoopspark发送出去的是<hadoop,1><hadoop,1><spark,1>步骤2编写Combiner类,在cn.jscfa.wordcount包下新建java文件WordCountCombiner.java,清空该文件并输入如下代码packagecn.jscfa.wordcountimportorg.apache.hadoop.io.IntWritableimportorg.apache.hadoop.io.Text任务实施:二、编写代码
importorg.apache.hadoop.mapreduce.Reducerimportjava.io.IOExceptionWordCountCombinerClasspublicclassWordCountCombinerextendsReducer<Text,IntWritable,Text,IntWritable>{@OverrideReduceFunctionExplanationprotectedvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{//输入k2="hello"v2=<1,1,1><"hello",<1,1,1>任务实施:二、编写代码//1.局部汇总//key="hello";values=[1,1,1]intcount=0//求和V2for(IntWritablev:values){count+=v.get();count=count+v.get()任务实施:二、编写代码
//count为values这个数组的所有元素之和
context.write(key,newIntWritable(count))
//k2="hello",v2=3;<hello,3>
//输入
//<k2,v2>
//<hello,1>任务实施:二、编写代码
//<world,1>//<hello,1>//<hadoop,1>//<hello,1>//<bob,1>//<hadoop,1>任务实施:二、编写代码
01//<tom,1>单击此处添加项正文
02//<tom,1>单击此处添加项正文
03//<spark,1>单击此处添加项正文
04//<hello,<1,1,1>单击此处添加项正文
05步骤3编写Reducer类,在cn.jscfa.wordcount包下新建java文件WordCountReducer.java,清空该文件并输入如下代码
06packagecn.jscfa.wordcount单击此处添加项正文任务实施:二、编写代码
importorg.apache.hadoop.io.IntWritableimportorg.apache.hadoop.io.Textimportorg.apache.hadoop.mapreduce.Reducerimportjava.io.IOException编程模型继承Reducer//都要继承基类Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>这就是我们所说的编程模型/**任务实施:二、编写代码
*这里是MR程序Reducer阶段处理的类*KEYIN:就是Reducer阶段输入的数据key类型,对应Mapper阶段输出KEY类型*VALUEIN就是Reducer阶段输入的数据value类型,对应Mapper阶段输出VALUE类型,在本案例中就是个数*KEYOUT:就是Reducer阶段输出的数据key类型,在本案例中,就是单词Text*VALUEOUT:reducer阶段输出的数据value类型,在本案例中,就是单词的总次数*/任务实施:二、编写代码
WordCountReducerClasspublicclassWordCountReducerextendsReducer<Text,IntWritable,Text,IntWritable>{/***这里是REDUCE阶段具体业务类的实现方法HadoopSparkDataTransmission*第一行hadoophadoopspark发送出去的是<hadoop,1><hadoop,1><spark,1>*Reduce阶段接收所有来自Map阶段处理的数据之后,按照Key的字典序进行排序*按照key是否相同作为一组去调用reduce方法任务实施:二、编写代码
*本方法的key就是这一组相同的kv对共同的Key*迭代器:<hadoop,[1,1]>*/@OverrideReduceFunctionExplanationprotectedvoidreduce(Textkey,Iterable<IntWritable>valueIOExceptionandInterruptedExceptionhandlinginContextContextcontext)throwsIOException,InterruptedException{任务实施:二、编写代码//定义一个计数器intcount=0//遍历一组迭代器,把每个数量1累加起来就构成了单词总次数//for(IntWritableiw:value){count+=iw.get()任务实施:二、编写代码
context.write(key,newIntWritable(count))//输入k2,v2;输出k3,v3步骤4编写主类,在cn.jscfa.wordcount包下新建java文件WordCountDriver.java,清空该文件并输入如下代码packagecn.jscfa.wordcountimportorg.apache.hadoop.conf.Configurationimportorg.apache.hadoop.fs.Path任务实施:二、编写代码
importorg.apache.hadoop.io.IntWritable单击此处添加项正文
importorg.apache.hadoop.io.Text单击此处添加项正文
importorg.apache.hadoop.mapreduce.Job单击此处添加项正文
FileInputFormatImportimportorg.apache.hadoop.mapreduce.lib.input.FileInputFormat
FileOutputFormatImportimportorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat
/**单击此处添加项正文任务实施:二、编写代码
*Driver类就是MR程序运行的主类,本类中组装了一些程序运行时所需要的信息*比如:使用的Mapper类是什么,Reducer类,输入、输出数据存放在哪里*/publicclassWordCountDriver{JavaProgramStartpublicstaticvoidmain(String[]args)throwsException{if(args==null||args.length==0){任务实施:二、编写代码args=newString[2]args[0]="D:\\wordcount\\input"args[1]="D:\\wordcount\\output"Configurationconf=newConfiguration()conf.set("","local")Jobwcjob=Job.getInstance(conf)任务实施:二、编写代码单击此处添加正文
//指定MRJobjar包运行主类
wcjob.setJarByClass(WordCountDriver.class)
wcjob.setMapperClass(WordCountMapper.class)
wcjob.setReducerClass(WordCountReducer.class)
//设置我们业务逻辑Mapper类的输出key和value的数据类型任务实施:二、编写代码
wcjob.setMapOutputValueClass(IntWritable.class)
//设置我们业务逻辑Reducer类的输出key和value的数据类型
wcjob.setOutputKeyClass(Text.class)
wcjob.setOutputValueClass(IntWritable.class)
//设置Combiner组件
wcjob.setCombinerClass(WordCountCombiner.class)任务实施:二、编写代码
//指定要处理的数据所在的位置单击此处添加项正文
InputPathsSettingFileInputFormat.setInputPaths(wcjob,newPath(args[0]))
//指定处理完成之后的结果所保存的位置单击此处添加项正文
SetOutputPathFileOutputFormat.setOutputPath(wcjob,newPath(args[1]))
//提交程序并且监控打印程序执行情况单击此处添加项正文任务实施:二、编写代码
booleanres=wcjob.waitForCompletion(true)
System.exit(res?0:1)任务实施:三、运行程序
WordCount结果分析本地运行,在主类WordCountDriver里面用鼠标右键单击,单击运行子命令,程序开始执行,在D:\wordcount目录下生成了一个output目录,在output目录下面有4个文件,分别是“_SUCCESS.crc”“part-r-00000.crc”“_SUCCESS”“part-r-00000”,结果存放在“part-r-00000”文件里面,其内容如下
bob1单击此处添加项正文
hadoop2单击此处添加项正文
hello3单击此处添加项正文任务实施:三、运行程序
spark1tom2world1任务二MapReduce基本案例-数据去重06任务工单
任务场景现在需要删掉目录下所有文本文件中的重复行,重复行只保留一个,使用Mapreduce编程完成
任务准备全班学生以4人左右为一组,各组选出组长。请组长组织组员查找相关资料,进行需求分析和系统设计问题1:描述数据去重的设计思路问题2:描述map阶段工作流程问题3:描述reduce阶段工作流程必备知识技能:一、MapshuffleMapshuffle在Map端的shuffle过程是对Map的结果进行分区、排序、分割,然后将属于同一划分(分区)的输出合并在一起一并写在磁盘上,最终得到一个分区有序的文件,分区有序的含义是map输出的键值对按分区进行排列,具有相同partition值的键值对存储在一起,每个分区里面的键值对又按key值进行升序排列(默认)必备知识技能:一、Mapshuffle
Partition对于map输出的每一个键值对,系统都会给定一个partition,partition值默认是通过计算key的hash值后对Reducetask的数量取模获得如果一个键值对的partition值为1,意味着这个键值对会交给第一个Reducer处理我们知道每一个Reducer的输出都是有序的,但是将所有Reducer的输出合并到一起却并非全局有序的,如果要做到全局有序,我们该怎么做呢?最简单的方式,只设置一个Reducetask,但是这样完全发挥不出集群的优势,而且能应对的数据量也很受限。最佳的方式是自己定义一个Partitioner,用输入数据的最大值除以系统Reducetask数量的商作为分割边界,也就是说分割数据的边界为此商的1倍、2倍至numPartitions-1倍,这样就能保证执行partition后的数据是整体有序的另一种需要我们自己定义一个Partitioner的情况是各个Reducetask处理的键值对数量极不平衡。对于某些数据集,由于很多不同的key的hash值都一样,导致这些键值对都被分给同一个Reducer处理,而其他的Reducer处理的键值对很少,从而拖延整个任务的进度。当然,编写自己的Partitioner必须保证具有相同key值的键值对分发到同一个Reducer必备知识技能:一、Mapshuffle
CollectorMap的输出结果是由collector处理的,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用树形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据这个数据结构其实就是个字节数组,叫Kvbuffer,名如其义,但是这里面不光放置了数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个IntBuffer(字节序采用的是平台自身的字节序)的马甲。数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点不是一成不变的,而是每次Spill之后都会更新一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长在HadoopMapReduce的MapOutputBuffer内存管理机制中,kvbuffer、Kvmeta、bufindex和Kvindex构成了一个精妙协同的四元体系。kvbuffer作为主数据缓冲区,通过bufindex指针以前向增长的方式存储序列化后的键值对原始数据,bufindex随着每个键值对的写入而按实际数据字节数递增,当达到缓冲区边界时执行环形回绕。与此同时,Kvmeta作为元数据索引区,采用固定格式的四元组结构记录每个键值对的定位信息,包括值起始位置、键起始位置、分区标识符和值长度。核心组件Kvindex作为元数据写入指针,采用与bufindex相反的逆向递减机制:初始时指向元数据区末端,每当需要添加新的键值对索引时,指针首先递减四个整型单位,为新的四元组预留存储空间,随后依次填充各个字段值。这种bufindex正向增长与Kvindex反向递减的双指针机制形成了对称平衡的内存布局,通过实时计算两指针的相对位置监控缓冲区使用状态。当bufindex与Kvindex指针相遇或达到预设阈值时,系统自动触发溢出写磁盘操作,确保内存资源的有效循环利用。整个机制通过双指针的精确协同控制和空间的高效划分,为MapReduce框架的大规模数据处理提供了稳定可靠的内存管理基础,在保证性能的同时实现了资源的动态优化分配Kvbuffer的大小可以通过io.sort.mb设置,默认大小为100M。但不管怎么设置,Kvbuffer的容量都是有限的,键值对和索引不断地增加,加着加着,Kvbuffer总有不够用的那天,那怎么办?把数据从内存刷到磁盘上再接着往内存写数据,把Kvbuffer中的数据刷到磁盘上的过程就叫Spill,多么明了的叫法,内存中的数据满了就自动地Spill到具有更大空间的磁盘关于Spill触发的条件,也就是Kvbuffer用到什么程度开始Spill,还是要讲究一下的。如果把Kvbuffer用得死死的,一点缝都不剩的时候再开始Spill,那Map任务就需要等Spill完成腾出空间之后才能继续写数据;如果Kvbuffer只是满到一定程度,比如80%的时候就开始Spill,那在Spill的同时,Map任务还能继续写数据,如果Spill够快,Map可能都不需要为空闲空间而发愁。两利相权取其重,一般选择后者。Spill的门限可以通过io.sort.spill.percent设置,默认是0.8必备知识技能:一、MapshuffleSort当Spill触发后,SortAndSpill先把Kvbuffer中的数据按照partition值和key两个关键字升序排序,移动的只是索引数据,排序结果是Kvmeta中数据以partition为单位聚集在一起,同一partition内的按照key有序必备知识技能:一、Mapshuffle
SpillSpill线程为这次Spill过程创建一个磁盘文件:从所有的本地目录中轮询查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out”的文件。Spill线程根据排过序的Kvmeta挨个partition地把数据吐到这个文件中,一个partition对应的数据吐完之后顺序地吐下一个partition,直到把所有的partition遍历完。一个partition在文件中对应的数据也叫段(segment)。在这个过程中如果用户配置了combiner类,那么在写之前会先调用combineAndSpill(),对结果进行进一步合并后再写出。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。那哪些场景才能使用Combiner呢?Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加、最大值等。Combiner的使用一定得慎重,如果用得好,它对job执行效率有帮助,反之会影响reduce的最终结果所有的partition对应的数据都放在这个文件里,虽然是顺序存放的,但是怎么直接知道某个partition在这个文件中存放的起始位置呢?强大的索引又出场了。有一个三元组记录某个partition对应的数据在这个文件中的索引:起始位置、原始数据长度、压缩之后的数据长度,一个partition对应一个三元组。然后把这些索引信息存放在内存中,如果内存中放不下了,后续的索引信息就需要写到磁盘文件中了:从所有的本地目录中轮询查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out.index”的文件,文件中不光存储了索引数据,还存储了crc32的校验数据。spill12.out.index不一定在磁盘上创建,如果内存(默认1M空间)中能放得下就放在内存中,即使在磁盘上创建了,它和spill12.out文件也不一定在同一个目录下。每一次Spill过程就会至少生成一个out文件,有时还会生成index文件,Spill的次数也烙印在文件名中。索引文件和数据文件的对应关系如下图4-2-1所示在Spill线程如火如荼地进行SortAndSpill工作的同时,Map任务不会因此而停歇,而是一如既往地进行着数据输出。Map还是把数据写到kvbuffer中,那问题就来了:只顾着闷头按照bufindex指针向上增长,kvmeta只顾着按照Kvindex向下增长,是保持指针起始位置不变继续跑呢,还是另谋出路?如果保持指针起始位置不变,很快bufindex和Kvindex就碰头了,碰头之后再重新开始或者移动内存都比较麻烦,不可取。Map任务取kvbuffer中剩余空间的中间位置,用这个位置设置为新的分界点,bufindex指针移动到这个分界点,Kvindex移动到这个分界点的-16位置,然后两者就可以和谐地按照自己既定的轨迹放置数据了,当Spill完成,空间腾出之后,不需要做任何改动继续前进。分界点的转换如下图4-2-2所示。Map任务总要把输出的数据写到磁盘上,即使输出数据量很小,在内存中全部能装得下,在最后也会把数据刷到磁盘上必备知识技能:一、Mapshuffle
MergeMap任务如果输出数据量很大,可能会进行好几次Spill,out文件和Index文件会产生很多,分布在不同的磁盘上。最后把这些文件进行合并的merge过程闪亮登场Merge过程怎么知道产生的Spill文件都在哪了呢?从所有的本地目录上扫描得到产生的Spill文件,然后把路径存储在一个数组里。Merge过程又怎么知道Spill的索引信息呢?没错,也是从所有的本地目录上扫描得到Index文件,然后把索引信息存储在一个列表里。到这里,又遇到了一个值得纳闷的地方。在之前Spill过程中的时候为什么不直接把这些信息存储在内存中呢,何必又多了这步扫描的操作?特别是Spill的索引数据,当内存超限之后把数据写到磁盘,现在又要从磁盘把这些数据读出来?在MapReduce的Spill过程中不将索引信息完全保存在内存中,而采用磁盘索引机制,本质上是受限于内存资源的有限性——当Map任务需要处理海量数据并可能经历多次Spill时,若将所有Spill的索引数据都保留在内存中,其累积占用的内存空间将随Spill次数线性增长,极易导致内存耗尽而使任务失败;虽然从磁盘重新读取索引需要额外的I/O操作,但这种设计通过将内存占用控制在稳定范围内,确保了系统在处理任意规模数据时的可靠性和可扩展性,这是在大数据场景下为保障任务成功执行而做出的必要权衡。(对于内存空间较大的土豪来说,用内存来省却这两个I/O步骤还是值得考虑的。)为merge过程创建一个叫file.out的文件和一个叫file.out.Index的文件用来存储最终的输出和索引,一个partition一个partition的进行合并输出。对于某个partition来说,从索引列表中查询这个partition对应的所有索引信息,每个对应一个段插入到段列表中。也就是这个partition对应一个段列表,记录所有的Spill文件中对应的这个partition那段数据的文件名、起始位置、长度等等对这个partition对应的所有的segment进行合并,目标是合并成一个segment。当这个partition对应很多个segment时,会分批地进行合并:先从segment列表中把第一批取出来,以key为关键字放置成最小堆,然后从最小堆中每次取出最小的输出到一个临时文件中,这样就把这一批段合并成一个临时的段,把它加回到segment列表中;再从segment列表中把第二批取出来合并输出到一个临时segment,把它加入列表中;这样往复执行,直到剩下的段是一批,输出到最终的文件中。最终的索引数据仍然输出到Index文件中,如图4-2-3必备知识技能:二、ReduceshuffleReduceshuffle在Reduce端,shuffle主要分为复制Map输出、排序合并两个阶段CopyReduce任务通过HTTP协议向各个已完成的Map任务节点主动拉取(Pull)其对应分区的数据。Map任务成功完成后,会通知父TaskTracker状态已经更新,TaskTracker进而通知JobTracker(这些通知在心跳机制中进行)。所以,对于指定作业来说,JobTracker能记录Map输出和TaskTracker的映射关系。Reduce会定期向JobTracker获取Map的输出位置,一旦拿到输出位置,Reduce任务就会从此输出对应的TaskTracker上复制输出到本地,而不会等到所有的Map任务结束必备知识技能:二、ReduceshuffleMergeSort在ReduceShuffle阶段,拉取(Pull)过来的数据会首先被存入一个内存缓冲区。当缓冲区中的数据量达到设定的阈值(例如,占用缓冲区容量的特定比例)时,系统会将缓冲区内所有数据(这些数据来自不同的Map任务,但属于同一个Reduce分区)根据键进行排序,然后溢写(Spill)到磁盘,生成一个临时的、内部有序的文件。如果配置了Combiner,并且其适用于Reduce端(此场景较少,Combiner主要用在Map端),它可能会在溢写前或后续的归并阶段被调用以化简数据。这个过程会重复多次,最终在磁盘上生成多个这样的临时文件。随后,Reduce任务会启动一个多路归并(MergeSort)流程,将这些临时文件与内存中剩余的数据合并,最终生成一个面向Reduce函数的、整体有序的输入文件必备知识技能:三、MapReduce中Mapper、Partition、Reducer数目的确定与关系
Mapper由客户端分片情况决定,客户端获取到输入路径的所有文件,依次对每个文件执行分片,分片大小通过最大分片大小、最小分片大小、HDFS的blocksize综合确定,分片结果写入job.split提交给YARN,对每个分片分配一个Mapper,即确定了数目
Partition由PartitionerClass中的逻辑确定,默认情况下使用的HashPartitioner中使用了hash值与reducerNum的余数,即由reducerNum决定,等于Reducer数目
自定义Partitioner逻辑如果自定义的PartitionerClass中有其他逻辑,例如逻辑是固定的,也可以与Reducer数目无关
Reducer数量设置指南Reducer(ReduceTask)的默认数量通常是1。设置ReduceTask数量时,首先要考虑业务逻辑。例如,计算全局汇总结果时,必须设置为1。ReduceTask数量与Map阶段输出的分区数密切相关,但并不要求严格相等必备知识技能:三、MapReduce中Mapper、Partition、Reducer数目的确定与关系
ReduceTask与分区数匹配策略ReduceTask数量=分区数(1:1或N:N):这是最理想的情况,每个ReduceTask处理一个分区的数据,负载均衡最好。ReduceTask数量<分区数(N:1或N:M,N>M):当设置为1时(N:1),所有分区的数据都由一个ReduceTask处理;当设置为M(M>1)时,框架会通过规则(如分区号%M)将多个分区的数据分配给同一个ReduceTask,这可能导致数据倾斜和性能下降,但分区机制依然有效。ReduceTask数量>分区数(1:N或M:N,M>N):会产生(M-N)个没有数据的空ReduceTask,它们会生成空输出文件。这浪费资源,但不影响计算结果的确定性。因此,应避免将ReduceTask数量设置得远多于分区数,也应避免设置得过少(除非业务需要,如全局计算),以免导致资源浪费或数据倾斜。通常建议将ReduceTask数量设置为与分区数相等,或根据集群资源和数据量进行合理调整任务实施:一、准备数据
步骤1准备输入数据。在Windows的D盘根目录底下新建一个目录,名为input。在input目录底下新建2个文本文件file1.txt和file2.txt,file1.txt内容如下
hello单击此处添加项正文
world单击此处添加项正文
say单击此处添加项正文
goodbye单击此处添加项正文
goodgoodstudy单击此处添加项正文任务实施:一、准备数据
daydayup
good
hello
world
file2.txt内容如下
hello任务实施:一、准备数据saysaygoodbyegoodgoodstudydaydayup任务实施:一、准备数据
good
goodbye
world任务实施:二、编写代码
步骤1编写Mapper类,在src/java下面创建包cn.jscfa.dedup,在该包下新建java文件DedupMapper.java,清空该文件并输入如下代码packagecn.jscfa.dedupimportorg.apache.hadoop.io.LongWritableimportorg.apache.hadoop.io.NullWritableimportorg.apache.hadoop.io.Textimportorg.apache.hadoop.mapreduce.Mapper任务实施:二、编写代码
importjava.io.IOExceptionDedupMapperClassDefinitionpublicclassDedupMapperextendsMapper<LongWritable,Text,Text,NullWritable>{privatestaticTextfield=newText()//<0,2018-3-3c><11,2018-3-4d>@OverrideMapFunctioninHadoopprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{任务实施:二、编写代码
//k1-key=70;v1-value=“2018-3-3c”;2018-3-3cfield=valuecontext.write(field,NullWritable.get())//v2=null,k2=field=v1//<2018-3-3c,null><2018-3-4d,null>步骤2编写Reducer类,在cn.jscfa.dedup包下新建java文件DedupReducer.java,清空该文件并输入如下代码:其内容如下任务实施:二、编写代码
packagecn.jscfa.dedupimportorg.apache.hadoop.io.NullWritableimportorg.apache.hadoop.io.Textimportorg.apache.hadoop.mapreduce.Reducerimportjava.io.IOExceptionDedupReducerClasspublicclassDedupReducerextendsReducer<Text,NullWritable,Text,NullWritable>{任务实施:二、编写代码
由于待提炼的正文内容为日期和无法识别的标记,没有实际意义或主题,无法提炼出符合要求的标题。根据规则,当正文内容字数小于规定字数且无实际意义时,可直接输出原文。但考虑到输出应为有意义的信息,此处不适用直接输出原文的规定。鉴于此情况,建议在实际应用中,若遇到类似无实际信息的文本,可以返回“无效信息”或“无标题”等提示,以表明无法从给定文本中提炼出有效标题。但在本例中,由于规则限制,我们无法提供一个合适的标题,故不输出任何内容。在实际操作中,可考虑返回“无标题”作为默认处理方式。然而,根据当前规则,正确做法是不输出任何内容//<2018-3-3c,null><2018-3-4d,null><2018-3-4d,null>@OverrideReduceFunctionExplanationprotectedvoidreduce(Textkey,Iterable<NullWritable>values,Contextcontext)throwsIOException,InterruptedException{//k2-key?="2018-3-3c";v2-values?=nullcontext.write(key,NullWritable.get())任务实施:二、编写代码单击此处添加正文//k3=key="2018-3-3c";v3=null单击此处添加项正文步骤3编写主类,在cn.jscfa.dedup包下新建java文件DedupRunner.java,清空该文件并输入如下代码packagecn.jscfa.dedup单击此处添加项正文importorg.apache.hadoop.conf.Configuration单击此处添加项正文importorg.apache.hadoop.fs.Path单击此处添加项正文importorg.apache.hadoop.io.NullWritable单击此处添加项正文任务实施:二、编写代码
importorg.apache.hadoop.io.Text单击此处添加项正文
importorg.apache.hadoop.mapreduce.Job单击此处添加项正文
FileInputFormatImportimportorg.apache.hadoop.mapreduce.lib.input.FileInputFormat
FileOutputFormatImportimportorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat
importjava.io.IOException单击此处添加项正文
publicclassDedupRunner{单击此处添加项正文任务实施:二、编写代码
main函数定义publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{if(args==null||args.length==0){args=newString[2]args[0]="D:\\Dedup\\input"args[1]="D:\\Dedup\\output"Configurationconf=newConfiguration()任务实施:二、编写代码
Jobjob=Job.getInstance(conf)
job.setJarByClass(DedupRunner.class)
job.setMapperClass(DedupMapper.class)
job.setReducerClass(DedupReducer.class)
job.setOutputKeyClass(Text.class)
job.setOutputValueClass(NullWritable.class)任务实施:二、编写代码
01SetInputPathsforJobFileInputFormat.setInputPaths(job,newPath(args[0]))
02//指定处理完成之后的结果所保存的位置单击此处添加项正文
03SetOutputPathFileOutputFormat.setOutputPath(job,newPath(args[1]))
04job.waitForCompletion(true)单击此处添加项正文任务实施:三、运行程序WordCount结果分析本地运行,在主类WordCountDriver上用鼠标右键单击,单击运行子命令,程序开始执行,在D:\wordcount目录下生成了一个output目录,在output目录下面有4个文件,分别是“_SUCCESS.crc”“part-r-00000.crc”“_SUCCESS”“part-r-00000”,结果存放在“part-r-00000”文件里面,其内容如下daydayup单击此处添加项正文good单击此处添加项正文goodgoodstudy单击此处添加项正文goodbye单击此处添加项正文任务实施:三、运行程序
hello
say
world任务三MapReduce基本案例-求平均成绩07任务工单01任务场景某高中期末考试后,有一份某年级学生的成绩表,现要求统计本次期末考各个科目平均成绩,要求使用MapReduce编程实现02任务准备全班学生以4人左右为一组,各组选出组长。请组长组织组员查找相关资料,进行需求分析和系统设计问题1:描述求平均成绩的设计思路问题2:MaskTask的个数由什么决定的问题3:切片是什么必备知识技能本节我们需要熟悉MapReduce常用编程组件必备知识技能:一、InputFormatInputFormat组件解析InputFormat是MapReduce当中用于处理数据输入的一个组件,是顶级的一个抽象父类,主要用于解决各个地方的数据源的数据输入问题。其中InputFormat的UML类图可以通过IDEA进行查看(只有商业版本才有这个功能)必备知识技能:二、FileInputFormat常用类介绍FileInputFormat类介绍FileInputFormat类也是InputFormat的一个子类,如果需要操作HDFS上面的文件,基本上都是通过FileInputFormat类来实现的。可以通过FileInputFormat来实现各种格式的文件操作,FileInputFormat的子类如表4-3-1必备知识技能:三、CombineTextInputFormat类
MapTask的个数由什么决定的在运行我们的MapReduce程序的时候,我们可以清晰地看到会有多个MapTask的运行,那么MapTask的个数究竟与什么有关,是不是MapTask越多越好,或者说是不是MapTask的个数越少越好呢?我们可以通过MapReduce的源码进行查看MapTask的个数究竟是如何决定的。在MapReduce当中,每个MapTask处理一个切片split的数据量,因此,一个切片对应一个MapTask必备知识技能:三、CombineTextInputFormat类
切片是什么切片与block块的概念很像,但是block块是HDFS当中存储数据的单位,切片split是MapReduce当中每个MapTask处理数据量的单位一个切片对应一个MapTask,一个job事务的map阶段的并行度由提交job时的切片数,切片不考虑数据集整体,而是针对每一个文件单独切片,数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储HDFS上面如果有以下两个文件,文件大小分别为300M和10M,那么会启动多少个MapTaskfile1.txt300Mfile2.txt10M经过FileInputFormat的切片机制运算后,形成的切片信息如下file1.txt.split1--0~128Mfile1.txt.split2--128~256Mfile1.txt.split3--256~300Mfile2.txt.split1--0~10M//针对每一个文件单独切片,这个文件很小,但是会对应一个切片,一个MapTask默认的切片机制是对任务按照文件规划切片,不管文件多小,都会是单独的切片,交给一个MapTask,如果有大量的小文件,就会产生大量的MapTask,处理效率比较低。在小文件过多的场景使用CombineTextInputFormat可以将多个小文件从逻辑上规划到一个切片中,这样多个小文件就可以交给一个MapTask处理必备知识技能:三、CombineTextInputFormat类
切片机制在MapReduce框架中,切片主要是使用CombineTextInputFormat类来实现的,其机制旨在优化小文件处理效率,过程可分为虚拟存储与物理切片两个阶段必备知识技能:三、CombineTextInputFormat类
3.切片机制虚拟存储过程该阶段执行逻辑划分,将输入文件集转换为若干虚拟存储块(VirtualStorageBlock)。具体规则如下•若文件大小不大于设定的最大分片大小(maxSplitSize,如4MB),则直接作为一个虚拟块•若文件大小超过maxSplitSize但小于其两倍,则均分为两个虚拟块,以避免产生过小碎片•若文件大小超过maxSplitSize的两倍,则按maxSplitSize连续切分,直至剩余部分落入上述两种情形之一示例说明设maxSplitSize=4MB,若输入文件为8.02MB,则切分为•一个4MB的虚拟块•剩余4.02MB文件因处于[maxSplitSize,2×maxSplitSize]区间,被进一步均分为两个2.01MB的虚拟块必备知识技能:三、CombineTextInputFormat类
物理切片过程本阶段将虚拟块合并为实际分片(Split),规则如下•按文件顺序依次处理虚拟块,若某虚拟块文件大小≥maxSplitSize,则直接转为独立分片•若虚拟块文件大小小于maxSplitSize,则与后续虚拟块合并,直至总大小≥maxSplitSize或无剩余虚拟块示例说明设五个文件大小分别为1.7MB、0.1MB、5.1MB、3.4MB、6.8MB,经虚拟存储后生成七个虚拟块[1.7,0.1,(2.55,2.55),3.4,(3.4,3.4)]最终合并为三个分片•分片1:1.7+0.1+2.55=4.35MB•分片2:2.55+3.4=5.95MB•分片3:3.4+3.4=6.8MB必备知识技能:四、自定义InputFormat
自定义MapReduceInputFormatMapReduce框架当中提供了很多的文件输入类,用于处理文件数据的输入,如果提供的数据类不足以实现我们的需求,可以通过自定义的InputFormat来实现文件数据的输入
自定义InputFormat和RecordReader自定义MyInputFormat继承InputFormat,重写isSplitable方法(是否需要分片),重写createRecordReader方法,自定义MyRecordReader继承RecordReader,实现抽象方法必备知识技能:五、partitioner详解
MapReducePartitioner详解partition(分区)主要是将相同的数据发送到同一个reduceTask里面去,在MapReduce当中有一个抽象类叫作Partitioner,默认使用的实现类是HashPartitioner
自定义partitioner继承Partitioner类重写getPartition方法单击此处添加项正文
调整ReduceTask数量根据设定的partition设置相应的reducetask数量。提示:partition大于reduce数量报错,partition小于reduce数量会生成空文件必备知识技能:六、MapReduce中的排序
MapReduce排序机制解析排序是MapReduce框架中的重要操作之一,MapTask和ReduceTask均会对数据按照key排序,该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认按照字典排序,且实现方法为快排MapTask数据处理流程对于MapTask,它会将处理结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,当数据处理完毕后,对磁盘上所有文件进行归并排序ReduceTask数据处理流程对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写到磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件。如果内存中文件大小或者数目超过一定阈值,则进行一次合并将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序部分排序:根据输入记录的键对数据集进行排序,保证输出的每个文件内部都有序必备知识技能:六、MapReduce中的排序全排序最终输出结果只有一个文件,通过只设置一个ReduceTask实现,这样MapReduce所提供的并行架构就丧失了分组排序Reduce阶段的关键机制,用于控制如何将已排序的键分组并传递给reduce函数,分组排序定义了在Reduce阶段哪些键应该被视为同一组,从而决定何时调用reduce函数。分组排序二次排序:自定义排序过程中,compareTo判断条件为两个即为二次排序单击此处添加项正文必备知识技能:七、Combiner详解
Combiner(规约)是MapReduce中Mapper和Reducer之外的一种组件CombinerandReducerDifferenceCombiner继承自Reducer,但是Combiner是在每一个MapTask所在的节点上运行,而Reducer是接收全局所有Mapper的输出结果Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量Combiner应用的前提是不能影响最终的业务逻辑,且输出的k应该与Reducer的输入k类型对应自定义Combiner设置使用自定义Combiner继承Reducer类,重写reduce方法,使用job.setCombinerClass()设置自定义Combiner组件必备知识技能:八、GroupingComparator分组详解
GroupingComparator在MapReduce中的作用GroupingComparator是MapReduce当中reduce端的一个功能组件,主要作用是决定哪些数据为一组调用一次reduce的逻辑,默认是每个不同的key作为不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一组,调用一次reduce逻辑。分组本质是排序,步骤如下自定义类继承WritableComparator创建一个构造方法将比较对象传给父类重写compare方法(3)重写compare(WritableComparablea,WritableComparableb)方法设置分组:job.setGroupingComparatorClass(MyGroup)tip重写compare(WritableComparablea,WritableComparableb),不是compare(Objecta,Objectb)必备知识技能
自定义outputFormat可以自定义MyOutputFormat继承FileOutputFormat类,重写getRecordWriter方法;自定义MyRecordWriter继承RecordWriter类,实现其抽象方法任务实施
准备数据步骤1:准备输入数据。在windows的D盘根目录下新建一个目录,名为avg。在avg目录下新建一个score.csv,该文件数据如表4-3-2任务实施:二、编写代码
步骤1编写Mapper类。Mapper阶段的主要任务是将输入数据解析为键值对的形式输出给Reducer阶段。在本例中,我们假设输入数据是CSV格式的文件,每行包含学生ID、课程名和成绩。Mapper类WordCountMap继承自Mapper,并定义了输入和输出的键值对类型为LongWritable、Text和Text、IntWritable。在map方法中,我们首先将输入的Text对象转换为字符串,并使用StringTokenizer按逗号作为分隔符进行解析。我们跳过第一个字段(学生ID),然后获取课程名和成绩,并将解析结果分别赋值给course和score。最后,我们通过context.write方法将课程名和成绩作为键值对输出新建WordCountMap.java文件在cn.jscfa.avg包下新建java文件WordCountMap.java,清空该文件并输入如下代码packagecn.jscfa.avgimportorg.apache.hadoop.io.IntWritableimportorg.apache.hadoop.io.LongWritableimportorg.apache.hadoop.io.Text任务实施:二、编写代码
importorg.apache.hadoop.mapreduce.Mapperimportjava.io.IOExceptionimportjava.util.StringTokenizerWordCountMapClassDefinitionpublicclassWordCountMapextendsMapper<LongWritable,Text,Text,IntWritable>{//声明Text类型的变量course,用于存储课程名称privateTextcourse=newText()任务实施:二、编写代码
//声明IntWritable类型的变量score,用于存储分数privateIntWritablescore=newIntWritable()//重写map方法,处理输入的键值对@OverrideMapFunctioninHadoopprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{//将Text类型的value转换为String类型任务实施:二、编写代码单击此处添加正文Stringline=value.toString()单击此处添加项正文//使用StringTokenizer对行进行分词,按逗号分隔单击此处添加项正文StringTokenizerInitializationStringTokenizeritr=newStringTokenizer(line,",")//如果分词后的数量大于等于3单击此处添加项正文if(itr.countTokens()>=3){单击此处添加项正文//跳过第一个分词(假设是学生ID)单击此处添加项正文任务实施:二、编写代码
itr.nextToken()
//获取课程名称
StringcourseName=itr.nextToken()
//获取分数并转换为整数类型
Stringfen=itr.nextToken()
intscoreValue=Integer.parseInt(fen)任务实施:二、编写代码//将课程名称设置到Text类型的变量course中course.set(courseName)//将分数设置到IntWritable类型的变量score中score.set(scoreValue)//将课程名称和分数作为输出键值对写入Context中context.write(course,score)任务实施:二、编写代码
步骤2编写Reducer类,在cn.jscfa.avg包下新建java文件WordCountMap.java,清空该文件并输入如下代码
packagecn.jscfa.avg单击此处添加项正文
importorg.apache.hadoop.io.IntWritable单击此处添加项正文
importorg.apache.hadoop.io.Text单击此处添加项正文
importorg.apache.hadoop.mapreduce.Reducer单击此处添加项正文
importjava.io.IOException单击此处添加项正文任务实施:二、编写代码
importjava.util.Iterator//定义WordCountReduce类,继承自Reducer类WordCountReduceClassDefinitionpublicclassWordCountReduceextendsReducer<Text,IntWritable,Text,IntWritable>{//声明IntWritable类型的变量average,用于存储平均值privateIntWritableaverage=newIntWritable()//重写reduce方法,对相同键的值进行合并任务实施:二、编写代码
@Override单击此处添加项正文
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 国庆-大学活动策划方案(3篇)
- 施工进度管理及控制制度
- 罕见血液病患者的营养支持方案
- 2026广东佛山市顺德区顺盛投资开发有限公司招聘1人备考题库及完整答案详解一套
- 甘肃省武威市第二中学 2026届英语高三上期末达标测试试题含解析
- 2026上半年贵州事业单位联考大方县招聘210人备考题库及答案详解(考点梳理)
- 销售部回款规定制度
- 2025浙江宁波文旅会展集团有限公司招聘9人备考题库及完整答案详解一套
- 农村小学食堂财务制度
- 家用电器财务制度范本
- 研究受试者知情同意书
- 2025年水利工程质量检测员考试(混凝土工程)全真模拟试题及答案及答案(云南省)
- 2025年3D建模服务保密协议
- 战场适应性训练
- 各种挖机租赁合同范本
- 油料运输应急预案
- 自来水维修抢修知识培训课件
- 2025浙江绍兴市新闻传媒中心(传媒集团)招聘6人笔试题库历年考点版附带答案详解
- 第四单元民族关系与国家关系(任务型复习课件)历史统编版选择性必修1
- 20kV及以下配电网工程设备材料价格信息(2025年上半年)
- 铁科院试验员培训课件
评论
0/150
提交评论