Ch6-高级MapReduce编程技术_第1页
Ch6-高级MapReduce编程技术_第2页
Ch6-高级MapReduce编程技术_第3页
Ch6-高级MapReduce编程技术_第4页
Ch6-高级MapReduce编程技术_第5页
已阅读5页,还剩96页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

Ch.7.高级MapReduce编程技术,南京大学计算机科学与技术系主讲人:黄宜华,深入理解大数据-大数据处理与编程实践,鸣谢:本课程得到Google(北京)与Intel公司中国大学合作部精品课程计划资助,Ch.7.高级MapReduce编程技术,1.复合键值对的使用2.用户自定义数据类型3.用户自定义输入输出格式4.用户自定义Partitioner和Combiner5.迭代完成MapReduce计算6.组合式MapReduce程序设计7.多数据源的连接8.全局参数/数据文件的传递9.其它处理技术,用复合键让系统完成排序map计算过程结束后进行Partitioning处理时,系统自动按照map的输出键进行排序,因此,进入Reduce节点的(key,value)对将保证是按照key进行排序的,而value则不保证是排好序的。但有些应用中恰恰需要value列表中的value也是排好序的。为了解决这个问题,可以在Reduce过程中对value列表中的各个value进行本地排序。但当value列表数据量巨大、无法在本地内存中进行排序时,将出现问题。一个更好的办法是,将value中需要排序的部分加入到key中形成复合键,这样将能利用MapRecue系统的排序功能完成排序。但需要实现一个新的Partitioner,保证原来同一key值的键值对最后分区到同一个Reduce节点上。,1.复合键值对的使用,用复合键让系统完成排序,复合键值对的使用,带频率的倒排索引示例1:classMapper2:procedureMap(dociddn,docd)3:FnewAssociativeArray4:foralltermtdocddo5:FtFt+16:foralltermtFdo7:Emit(termt,posting)1:classReducer2:procedureReduce(termt,postings,)3:PnewList4:forallpostingpostings,do5:Append(P,)6:Sort(P);/进入Reduce节点的postings不保证按照文档序/号排序,因而需要对postings进行一个本地排序7:Emit(termt;postingsP),用复合键让系统完成排序,复合键值对的使用,带频率的倒排索引示例为了能利用系统自动对docid进行排序,解决方法是:代之以生成(term,)键值对,map时将term和docid组合起来形成复合键。但会引起新的问题,同一个term下的所有posting信息无法被分区到同一个Reduce节点,为此,需要实现一个新的Partitioner:从中取出term,以term作为key进行分区。,Arevisedexample,复合键值对的使用,InvertedIndexing,Arevisedexample(cont.),CustomizedPartitioner,复合键值对的使用,进入reduce的键值对按照(term,docid)排序,把小的键值对合并成大的键值对通常一个计算问题会产生大量的键值对,为了减少键值对传输和排序的开销,一些问题中的大量小的键值对可以被合并成一些大的键值对(pairs-stripes)。单词同现矩阵算法一个Map可能会产生单词a与其它单词间的多个键值对,这些键值对可以在Map过程中合并成右侧的一个大的键值对(条):然后,在Reduce阶段,把每个单词a的键值对(条)进行累加:,复合键值对的使用,(a,b)1(a,c)2(a,d)5(a,e)3(a,f)2,ab:1,c:2,d:5,e:3,f:2,ab:1,d:5,e:3ab:1,c:2,d:2,f:2ab:2,c:2,d:7,e:3,f:2,+,把小的键值对合并成大的键值对单词同现矩阵算法,复合键值对的使用,Clustersize:38coresDataSource:AssociatedPressWorldstream(APW)oftheEnglishGigawordCorpus(v3),whichcontains2.27milliondocuments(1.8GBcompressed,5.7GBuncompressed),把小的键值对合并成大的键值对用JobTracker的WebGUI可以观察优化执行后各项统计数据,复合键值对的使用,Hadoop内置的数据类型这些数据类型都实现了WritableComparable接口,以便进行网络传输和文件存储,以及进行大小比较。,2.用户自定义数据类型,用户自定义数据类型需要实现Writable接口,作为key或者需要比较大小时则需要实现WritableComparable接口,用户自定义数据类型,publicclassPoint3DimplementsWritableComparableprivateintx,y,z;publicintgetX()returnx;publicintgetY()returny;publicintgetZ()returnz;publicvoidwrite(DataOutputout)throwsIOExceptionout.writeFloat(x);out.writeFloat(y);out.writeFloat(z);publicvoidreadFields(DataInputin)throwsIOExceptionx=in.readFloat();y=in.readFloat();z=in.readFloat();publicintcompareTo(Point3Dp)/comparesthis(x,y,z)withp(x,y,z)andoutputs-1,0,1,若用户需要自定义数据类型,实现WritableComparable接口publicclassEdgeimplementsWritableComparableprivateStringdepartureNode;privateStringarrivalNode;publicStringgetDepartureNode()returndepartureNode;OverridepublicvoidreadFields(DataInputin)throwsIOExceptiondepartureNode=in.readUTF();arrivalNode=in.readUTF();Overridepublicvoidwrite(DataOutputout)throwsIOExceptionout.writeUTF(departureNode);out.writeUTF(arrivalNode);OverridepublicintcompareTo(Edgeo)return(departureNpareTo(o.departureNode)!=0)?departureNpareTo(o.departureNode):arrivalNpareTo(o.arrivalNode);,用户自定义数据类型,Hadoop内置的文件输入格式,3.用户自定义输入输出格式,InterfaceInputFormat,TheMap-ReduceframeworkreliesontheInputFormatofthejobto:Validatetheinput-specificationofthejob.Split-uptheinputfile(s)intologicalInputSplits,eachofwhichisthenassignedtoanindividualMapper.ProvidetheRecordReaderimplementationtobeusedtogleaninputrecordsfromthelogicalInputSplitforprocessingbytheMapper.,HadoopMapReduce主要组件,HadoopMapReduce的基本工作原理,输入数据分块InputSplits,InputSplit定义了输入到单个Map任务的输入数据一个MapReduce程序被统称为一个Job,可能有上百个任务构成InputSplit将文件分为64MB的大小配置文件hadoop-site.xml中的mapred.min.split.size参数控制这个大小mapred.tasktracker.map.taks.maximum用来控制某一个节点上所有map任务的最大数目,ClassInputSplit,lnputformat将输入文件分成逻辑上的lnputSplit片,每一个InputSplit为相对应的Mapper程序提供服务。InputSplit和HDFS块的区别:后者代表对磁盘上输入数据的物理划分,而前者者是Mapper程序所用的数据的逻辑划分。InputSplit可以用一种用户透明的方式包含许多HDFS块。例如,对于TextlnputFomiat,不能保证每一行只占据一个块,有些行会跨越多个块。LineRecoderReader是TextInutFormat的RecoderReader,为用户处理RecoderReader。InputSplit必须要从不同于HDFS块中获取一条记录的一个片段,HadoopMapReduce主要组件,HadoopMapReduce的基本工作原理,数据记录读入RecordReader,InputSplit定义了一个数据分块,但是没有定义如何读取数据记录RecordReader实际上定义了如何将数据记录转化为一个(key,value)对的详细方法,并将数据记录传给Mapper类TextInputFormat提供了LineRecordReader,读入一个文本行数据记录,ClassRecordReader,Hadoop内置的文件输入格式,用户自定义输入输出格式,AutoInputFormat,CombineFileInutFormat,CompositeInputFormat,DBInputFormat,FileInputFormat,KeyValueTextInputFormat,LineDocInputFormat,MultiFileInputFormat,NLineInputFormat,SequenceFileAsBinaryInputFormat,SequenceFileAsTextInputFormat,SequenceFileInputFilter,SequenceFileInputFormat,StreamInputFormat,TextInputFormat,Hadoop内置的RecordReader,用户自定义输入输出格式,Hadoop内置的RecordReader,用户自定义输入输出格式,CombineFileRecordReader,DBInputFormat.DBRecordReader,InnerJoinRecordReader,JoinRecordReader,KeyValueLineRecordReader,LineDocRecordReader,MultiFilterRecordReader,OuterJoinRecordReader,OverrideRecordReader,SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader,SequenceFileAsTextRecordReader,SequenceFileRecordReader,StreamBaseRecordReader,StreamXmlRecordReader,WrappedRecordReader,用户自定义InputFormat和RecordReader示例,简单的文档倒排索引importjava.io.IOException;importjava.util.StringTokenizer;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;publicclassInvertedIndexMapperextendsMapperOverrideprotectedvoidmap(Textkey,Textvalue,Contextcontext)throwsIOException,InterruptedException/defaultRecordReader:LineRecordReader;key:lineoffset;value:linestringTextword=newText();FileSplitfileSplit=(FileSplit)context.getInputSplit();StringfileName=fileSplit.getPath().getName();TextfileName_lineOffset=newText(fileName+”#”+key.toString();StringTokenizeritr=newStringTokenizer(value.toString();for(;itr.hasMoreTokens();)word.set(itr.nextToken();context.write(word,fileName_lineOffset);,用户自定义输入输出格式,由于采用了缺省的TextInputFormat和LineRecordReader,需要增加此段代码完成特殊处理,用户自定义InputFormat和RecordReader示例,简单的文档倒排索引可以自定义一个InputFormat和RecordReader实现同样的效果,用户自定义输入输出格式,publicclassFileNameOffsetInputFormatextendsFileInputFormatOverridepublicRecordReadercreateRecordReader(InputSplitsplit,TaskAttemptContextcontext)FileNameOffsetRecordReaderfnrr=newFileNameOffsetRecordReader();tryfnrr.initialize(split,context);catch(IOExceptione)e.printStackTrace();catch(InterruptedExceptione)e.printStackTrace();returnfnrr;,用户自定义InputFormat和RecordReader示例简单的文档倒排索引,用户自定义输入输出格式,publicclassFileNameOffsetRecordReaderextendsRecordReaderStringfileName;LineRecordReaderlrr=newLineRecordReader();OverridepublicTextgetCurrentKey()throwsIOException,InterruptedExceptionreturnnewText(+fileName+“#+lrr.getCurrentKey()+);OverridepublicTextgetCurrentValue()throwsIOException,InterruptedExceptionreturnlrr.getCurrentValue();Overridepublicvoidinitialize(InputSplitarg0,TaskAttemptContextarg1)throwsIOException,InterruptedExceptionlrr.initialize(arg0,arg1);fileName=(FileSplit)arg0).getPath().getName();,简单的文档倒排索引publicclassInvertedIndexerpublicstaticvoidmain(Stringargs)tryConfigurationconf=newConfiguration();job=newJob(conf,invertindex);job.setJarByClass(InvertedIndexer.class);job.setInputFormatClass(FileNameOffsetInputFormat.class);job.setMapperClass(InvertedIndexMapper.class);job.setReducerClass(InvertedIndexReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job,newPath(args0);FileOutputFormat.setOutputPath(job,newPath(args1);System.exit(job.waitForCompletion(true)?0:1);catch(Exceptione)e.printStackTrace();,用户自定义输入输出格式,用户自定义InputFormat和RecordReader示例,用户自定义InputFormat和RecordReader示例,简单的文档倒排索引importjava.io.IOException;importjava.util.StringTokenizer;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;publicclassInvertedIndexMapperextendsMapperOverrideprotectedvoidmap(Textkey,Textvalue,Contextcontext)throwsIOException,InterruptedException/InputFormat:FileNameOffsetInputFormat/RecordReader:FileNameOffsetRecordReader;key:filename#lineoffset;value:linestringTextword=newText();StringTokenizeritr=newStringTokenizer(value.toString();for(;itr.hasMoreTokens();)word.set(itr.nextToken();context.write(word,key);,用户自定义输入输出格式,用户自定义OutputFormat和RecordWriterHadoop内置的OutputFormat和RecordWriter,用户自定义输入输出格式,DBOutputFormat,FileOutputFormat,FilterOutputFormat,IndexUpdateOutputFormat,LazyOutputFormat,MapFileOutputFormat,MultipleOutputFormat,MultipleSequenceFileOutputFormat,MultipleTextOutputFormat,NullOutputFormat,SequenceFileAsBinaryOutputFormat,SequenceFileOutputFormat,TextOutputFormat,用户自定义OutputFormat和RecordWriterHadoop内置的OutputFormat和RecordWriter与InputFormat和RecordReader类似,用户可以根据需要定制OutputFormat和RecordWriter,用户自定义输入输出格式,DBOutputFormat.DBRecordWriter,FilterOutputFormat.FilterRecordWriter,TextOutputFormat.LineRecordWriter,MultipleInputs:多个输入文件使用不同的InputFormat和Mapper,用户自定义Partitioner,哪个key到哪个Reducer的分配过程,即shuffle过程,是由Partitioner规定的。,默认Partitioner:HashPartitioner,默认的分区函数,是通过key的hashCode对numReduceTasks求模得到。,定制Partitioner程序员可以根据需要定制Partitioner来改变Map中间结果到Reduce节点的分区方式,并在Job中设置新的Partitioner,4.用户自定义Partitioner,ClassNewPartitionerextendsHashPartitioner/overridethemethodgetPartition(Kkey,Vvalue,intnumReduceTasks)term=key.toString().split(“,”)0;/=termsuper.getPartition(term,value,numReduceTasks);并在Job中设置新的Partitioner:Job.setPartitionerClass(NewPartitioner),定制Combiner程序员可以根据需要定制Combiner来减少网络数据传输量,提高系统效率,并在Job中设置新的Combiner每年申请美国专利的国家数统计Patentdescriptiondataset“apat63_99.txt”“PATENT”,”GYEAR”,”GDATE”,”APPYEAR”,”COUNTRY”,”POSTATE”,”ASSIGNEE”,”ASSCODE”,”CLAIMS”,”NCLASS”,”CAT”,”SUBCAT”,”CMADE”,”CRECEIVE”,”RATIOCIT”,”GENERAL”,”ORIGINAL”,”FWDAPLAG”,”BCKGTLAG”,”SELFCTUB”,”SELFCTLB”,”SECDUPBD”,”SECDLWBD”3070801,1963,1096,”BE”,”,1,269,6,69,1,0,3070802,1963,1096,”US”,”TX”,1,2,6,63,0,3070803,1963,1096,”US”,”IL”,1,2,6,63,9,0.3704,3070804,1963,1096,”US”,”OH”,1,2,6,63,3,0.6667,3070805,1963,1096,”US”,”CA”,1,2,6,63,1,0,用户自定义和Combiner,定制Combiner每年申请美国专利的国家数统计1.Map中用作为key输出,Emit(,1)(,1),(,1),(,1),2.实现一个定制的Partitioner,保证同一年份的数据划分到同一个Reduce节点3.Reduce中对每一个(,1,1,1,)输入,忽略后部的出现次数,仅考虑key部分:问题:Map结果(,1,1,1,)数据通信量较大解决办法:实现一个Combiner将1,1,1,合并为1,用户自定义Partitioner和Combiner,定制Combiner每年申请美国专利的国家数统计publicstaticclassNewCombinerextendsReducerpublicvoidreduce(Textkey,Iterablevalues,Contextcontext)throwsIOException,InterruptedException/忽略(,1,1,1,)后部很长的数据串,/归并为的1次出现context.write(key,newIntWritable(1);/输出key:;value:1,用户自定义Partitioner和Combiner,37,通过在Driver程序中使用Job.setComparatorClass指定排序方式若没有指定,SortComparator默认使用Key的compareTo方法排序,GroupingComparator,GroupingComparator类:在Reducer端对键进行聚拢,它是用来确定Grouping标准的。Grouping标准决定从Mapper发往Reducer的哪些值将在一次reduce调用中被处理。GroupingComparator类只关心它的compare方法是返回0值,还是非0值。0表示两个键需要进行聚拢,非0就表示不聚拢使用GroupingComparator,reduce函数的输入key为第一个被聚拢的键值对中的键,航空公司数据集,每一行包括如下数据,二次排序,要求问题定义:在某次航班延迟到达一个机场的报告中,给出该机场上次航班延迟的信息输出格式|难点同一个机场的记录应该分配到同一个Reducer当中记录要按照到达时间排序解决方案错误方案:使用为Map的输出键,利用Hadoop默认排序错误方案:以为key,在Reducer端排序二次排序,代码,自定义ArrivalFlightKey:两个成员destinationAirport,arrivalDtTimehashCode():destinationAirport.hashCode()CompareTo():destinationAirport.CompareTo()自定义ArrivalFlightKeyBasedPartitionerMath.abs(key.destinationAirport.hashCode()%numPartitions,SortingComparator类,GroupComparator类,Mapper,Reducer,run,基本问题一些求解计算需要用迭代方法求得逼近结果(求解计算必须是收敛性的)。当用MapReduce进行这样的问题求解时,运行一趟MapReduce过程无法完成整个求解过程,因此,需要采用迭代方法循环运行该MapReduce过程,直到达到一个逼近结果。页面排序算法PageRank随机浏览模型:假设一位上网者随机地浏览一些网页有可能从当前网页点击一个链接继续浏览(概率为d);有可能随机跳转到其它N个网页中的任一个(概率为1-d)。每个网页的PageRank值可以看成该网页被随机浏览的概率:L(pj)为网页pj上的超链个数,5.迭代MapReduce计算,页面排序算法PageRank问题是在求解PR(pi)时,需要递归调用PR(pj),而PR(pj)本身也是待求解的。因此,我们只能先给每个网页赋一个假定的PR值,如0.5。但这样求出的PR(pi)肯定不准确。然而,当用求出的PR值反复进行迭代计算时,会越来越趋近于最终的准确结果。因此,需要用迭代方法循环运行MapReduce过程,直至第n次迭代后的结果与第n-1次的结果小于某个指定的阀值时结束。,迭代MapReduce计算,基本问题一些复杂任务难以用一趟MapReduce处理过程来完成,需要将其分为多趟简单些的MapReduce子任务组合完成。如:专利文献引用直方图统计,需要先进行被引次数统计,然后在被引次数上再进行被引直方图统计,6.组合式MapReduce程序设计,“CITING”,”CITED”3858241,9562033858241,13242343858241,33984063858241,35573843858241,36348893858242,15157013858242,33192613858242,36687053858242,3707004.,12(2,1)(2,3)100001(1,1)(1,9)1000001(1,1)10000061(1,1)(3,1)10000071(1,1)10000111(1,1)10000171(1,1)10000261(1,1)10000332(2,1)10000431(1,1)10000442(2,1)10000451(1,1)10000463(3,1),MapReduce1,MapReduce2,MapReduce子任务的顺序化执行多个MapReduce子任务可以用手工逐一执行,但更方便的做法是将这些子任务串起来,前面MapReduce任务的输出作为后面MapReduce的输入,自动地完成顺序化的执行,如:mapreduce-1=mapreduce-2=mapreduce-3=.,组合式MapReduce程序设计,单个MapReduce作业控制执行代码:Configurationjobconf=newConfiguration();job=newJob(jobconf,invertindex);job.setJarByClass(InvertedIndexer.class);FileInputFormat.addInputPath(job,newPath(args0);FileOutputFormat.setOutputPath(job,newPath(args1);job.waitForCompletion(true);,MapReduce子任务的顺序化执行同样,链式MapReduce中的每个子任务需要提供独立的jobconf,并按照前后子任务间的输入输出关系设置输入输出路径,而任务完成后所有中间过程的输出结果路径都可以删除掉。,组合式MapReduce程序设计,Configurationjobconf1=newConfiguration();job1=newJob(jobconf1,“Job1);job1.setJarByClass(jobclass1);FileInputFormat.addInputPath(job1,inpath1);FileOutputFormat.setOutputPath(job1,outpath1);job1.waitForCompletion(true);,Configurationjobconf2=newConfiguration();job2=newJob(jobconf2,“Job2);job2.setJarByClass(jobclass2);FileInputFormat.addInputPath(job2,outpath1);FileOutputFormat.setOutputPath(job2,outpath2);job2.waitForCompletion(true);,Configurationjobconf3=newConfiguration();job3=newJob(jobconf3,“Job3);job3.setJarByClass(jobclass3);FileInputFormat.addInputPath(job3,outpath2);FileOutputFormat.setOutputPath(job3,outpath3);job3.waitForCompletion(true);,具有数据依赖关系的MapReduce子任务的执行设一个MapReduce任务由子任务x、y和z构成,其中x和y是相互独立的,但z依赖于x和y,因此,x,y和z不能按照顺序执行。如:x处理一个数据集Dx,y处理另一个数据集Dy,然后z需要将Dx和Dy进行一个jion处理,则z一定要等到x和y执行完毕才能开始执行。,组合式MapReduce程序设计,Jobx,Joby,Jobz,Job,具有数据依赖关系的MapReduce子任务的执行Hadoop通过Job和JobControl类提供了一种管理这种具有数据依赖关系的子任务的MapReduce作业的执行。Job除了维护Conf信息外,还能维护job间的依赖关系。jobx=newJob(jobxconf,“Jobx);joby=newJob(jobyconf,“Joby);jobz=newJob(jobzconf,“Jobz);jobz.addDependingJob(jobx);/jobz将等待jobx执行完毕jobz.addDependingJob(joby);/jobz将等待joby执行完毕,组合式MapReduce程序设计,具有数据依赖关系的MapReduce子任务的执行JobControl类用来控制整个作业的执行。把所有子任务的作业加入到JobControl中,执行JobControl的run()方法即可开始整个作业的执行jobx=newJob(jobxconf,“Jobx);joby=newJob(jobyconf,“Joby);jobz=newJob(jobzconf,“Jobz);jobz.addDependingJob(jobx);/jobz将等待jobx执行完毕jobz.addDependingJob(joby);/jobz将等待joby执行完毕JobControlJC=newJobControl(“XYZJob”);JC.addJob(jobx);JC.addJob(joby);JC.addJob(jobz);JC.run();,组合式MapReduce程序设计,MapReduce前处理和后处理步骤的链式执行一个MapReduce作业可能会有一些前处理和后处理步骤,比如,文档倒排索引处理前需要一个去除Stop-word的前处理,倒排索引处理后需要一个变形词后处理步骤(making,made-make)。将这些前后处理步骤实现为单独的MapReduce任务可以达到目的,但多个独立的MapReduce作业的执行开销较大,且增加很多I/O操作,因而效率不高。一个办法是在核心的Map和Reduce过程之外,把这些前后处理步骤实现为一些辅助的Map和Reduce过程,将这些辅助Map和Reduce过程与核心Map和Reduce过程合并为一个过程链,从而完成整个作业的执行。,组合式MapReduce程序设计,MapReduce前处理和后处理步骤的链式执行Hadoop提供了链式Mapper(ChainMapper)和链式Reducer(ChainReducer)来完成这种处理。ChainMapper和ChainReducer分别提供了addMapper方法加入一系列Mapper:ChainMapper.addMapper()ChainReducer.addMapper()publicstaticvoidaddMapper(Jobjob,/主作业Classmclass,/待加入的mapclassClassinputKeyClass,/待加入的map输入键classClassinputValueClass,/待加入的map输入键值classClassoutputKeyClass,/待加入的map输出键classClassoutputValueClass,/待加入的map输出键值classorg.apache.hadoop.conf.ConfigurationmapperConf/待加入的map的conf)throwsIOException,组合式MapReduce程序设计,MapReduce前处理和后处理步骤的链式执行设有一个完整的MapReduce作业,由Map1,Map2,Reduce,Map3,Map4构成。Configurationconf=newConfiguration();Jobjob=newJob(conf);job.setJobName(“ChainJob”);job.setInputFormat(TextInputFormat.class);job.setOutputFormat(TextOutputFormat.class);FileInputFormat.setInputPaths(job,in);FileOutputFormat.setOutputPath(job,out);JobConfmap1Conf=newJobConf(false);ChainMapper.addMapper(job,Map1.class,LongWritable.class,Text.class,Text.class,Text.class,true,map1Conf);JobConfmap2Conf=newJobConf(false);ChainMapper.addMapper(job,Map2.class,Text.class,Text.class,LongWritable.class,Text.class,true,map2Conf);,组合式MapReduce程序设计,MapReduce前处理和后处理步骤的链式执行(接前页)JobConfreduceConf=newJobConf(false);ChainReducer.setReducer(job,Reduce.class,LongWritable.class,Text.class,Text.class,Text.class,true,reduceConf);JobConfmap3Conf=newJobConf(false);ChainReducer.addMapper(job,Map3.class,Text.class,Text.class,LongWritable.class,Text.class,true,map3Conf);JobConfmap4Conf=newJobConf(false);ChainReducer.addMapper(job,Map4.class,LongWritable.class,Text.class,LongWritable.class,Text.class,true,map4Conf);JobClient.runJob(job);,组合式MapReduce程序设计,本例引自ChuckLam,HadoopinAction,2010,ManningPublications,基本问题一个MapReduce任务很可能需要访问和处理两个甚至多个数据集,比如,专利文献数据分析中,当需要统计每个国家的专利引用率时,将需要同时访问“专利引用数据集cite75_99.txt”和专利描述数据集“apat63_99.txt”。在关系数据库中,这将是两个或多个表的连接(join)处理,且join操作完全由数据库系统负责处理。但Hadoop系统没有关系数据库中那样强大的join处理功能,因此多数据源的连接处理比关系数据库中要复杂一些。根据不同的需要和权衡,可以有几种不同的连接方法。,7.多数据源的连接,本例引自ChuckLam,HadoopinAction,2010,ManningPublications,设有两个数据集,一个是顾客数据集:CustomerID,Name,PhoneNumber1,王二,025-1111-11112,张三,021-2222-22223,李四,025-3333-33334,孙五,010-4444-4444第二个是顾客的订单数据集为:CustomerID,OrderID,Price,PurchaseDate3,订单1,90,2011.8.11,订单2,130,2011.8.62,订单3,220,2011.8.103,订单4,160,2011.8.18以CustomerID进行内连接(innerjoin)后的数据记录是:CustomerID,Name,PhoneNumber,OrderID,Price,PurchaseDate1,王二,025-1111-1111,订单2,90,2011.8.62,张三,021-2222-2222,订单3,220,2011.8.103,李四,025-3333-3333,订单1,100,2011.8.13,李四,025-3333-3333,订单4,160,2011.8.18,多数据源的连接,用DataJoin类实现Reduce端Join,多数据源的连接,map()将为每个记录附加一个数据源标签,以便后续能辨别每个记录来自哪个数据源,用DataJoin类实现Reduce端Join,多数据源的连接,用DataJoin类实现Reduce端JoinHadoop提供了一个实现多数据源连接的基本框架DataJoin类,帮助程序员完成一些必要的连接处理;DataJoin框架包含以下三个抽象类,使用DataJoin框架后,程序员仅需要实现几个抽象方法:DataJoinMapperBase:由程序员实现的Mapper类继承,该基类已实现了map()方法用以完成标签化数据记录的生成和输出,因此程序员仅需实现三个产生数据源标签、GroupKey(key)和标签化记录(value)所需要的抽象方法DataJoinReducerBase:由程序员实现的Reducer类继承,该基类已实现了reduce()方法用以完成多数据源记录的叉积组合生成TaggedMapOutput:描述一个标签化数据记录,实现了getTag(),setTag()方法;作为Mapper的key-value输出中的value的数据类型,由于需要进行I/O,程序员需要继承并实现Writable接口,并实现抽象的getData()方法用以读取记录数据。,多数据源的连接,用DataJoin类实现Reduce端JoinMapper需要实现的抽象方法abstractTextgenerateInputTag(StringinputFile)由程序员决定如何产生记录的数据源标签如:使用数据集的文件名为数据源标签时(如Customers):protectedTextgenerateInputTag(StringinputFile)returnnewText(inputFile);当一个数据集包含多个文件时(如part-0000,part-0001,等)、以致无法直接用文件名时,可以从这些文件名定制一个标签名,比如protectedTextgenerateInputTag(StringinputFile)/取“-”前的“part”作为标签名Stringdatasource=inputFile.split(-)0;returnnewText(datasource);,多数据源的连接,用DataJoin类实现Redu

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论