厦门大学数据库实验室MapReduce连接教学课件_第1页
厦门大学数据库实验室MapReduce连接教学课件_第2页
厦门大学数据库实验室MapReduce连接教学课件_第3页
厦门大学数据库实验室MapReduce连接教学课件_第4页
厦门大学数据库实验室MapReduce连接教学课件_第5页
已阅读5页,还剩42页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

厦门大学数据库实验室MapReduce连接16、人民应该为法律而战斗,就像为了城墙而战斗一样。——赫拉克利特17、人类对于不公正的行为加以指责,并非因为他们愿意做出这种行为,而是惟恐自己会成为这种行为的牺牲者。——柏拉图18、制定法律法令,就是为了不让强者做什么事都横行霸道。——奥维德19、法律是社会的习惯和思想的结晶。——托·伍·威尔逊20、人们嘴上挂着的法律,其真实含义是财富。——爱献生厦门大学数据库实验室MapReduce连接厦门大学数据库实验室MapReduce连接16、人民应该为法律而战斗,就像为了城墙而战斗一样。——赫拉克利特17、人类对于不公正的行为加以指责,并非因为他们愿意做出这种行为,而是惟恐自己会成为这种行为的牺牲者。——柏拉图18、制定法律法令,就是为了不让强者做什么事都横行霸道。——奥维德19、法律是社会的习惯和思想的结晶。——托·伍·威尔逊20、人们嘴上挂着的法律,其真实含义是财富。——爱献生

厦门大学数据库实验室

MapReduce连接报告人:李雨倩导师:林子雨

2014.07.12连接简介MapReduce连接策略连接关系图连接实例连接简介MapReduce连接策略连接连接是关系运算,可以用于合并关系。在数据库中,一般是表连接操作;在MapReduce中,连接可以用于合并两个或多个数据集。例如,用户基本信息和用户活动详情。用户基本信息来自于OLTP数据库。用户活动详情来自于日志文件。MapReduce的连接welcometousethesePowerPointtemplates,NewContentdesign,10yearsexperienceMapReduce连接的应用场景用户的人口统计信息的聚合操作(例如:青少年和中年人的习惯差异)当用户超过一定时间没有使用网站后,发邮件提醒他们。分析用户的浏览习惯,让系统可以提示用户有哪些网站特性还没有使用到,形成一个反馈循环。MapReduce中的连接策略重分区连接复制连接半连接——reduce端连接。使用场景:连接两个或多个大型数据集。——map端连接。使用场景:待连接的数据集中有一个数据集小到可以完全放在缓存中。——map端连接。使用场景:待连接的数据集中有一个数据集非常大,但同时这个数据集可以被过滤成小到可以放在缓存中。重分区连接重分区连接利用MapReduce的排序-合并机制来分组数据。它被实现为使用一个单独的MapReduce任务,并支持多路连接(这里的多路指的是多个数据集)。Map阶段负责从多个数据集中读取数据,决定每个数据的连接值,将连接值作为输出键。输出值则包含将在reduce阶段被合并的值。Reduce阶段,一个reducer接收map函数传来的一个输出键的所有输出值,并将数据分为多个分区。在此之后,reducer对所有的分区进行笛卡尔积连接运算,并生成全部的结果集。在如下示例中,用户数据中有用户姓名,年龄和所在州$cattest-data/ch4/users.txt

anne22NY

joe39CO

alison35NY

mike69VA

marie27OR

jim21OR

bob71CA

mary53NY

dave36VA

dude50CA用户活动日志中有用户姓名,进行的动作,来源IP。这个文件一般都要比用户数据要大得多。$cattest-data/ch4/user-logs.txt

jimlogout93.24.237.12

mikenew_tweet87.124.79.252

bobnew_tweet58.133.120.100

mikelogout55.237.104.36

jimnew_tweet93.24.237.12

marieview_user122.158.130.90$hadoopfs-puttest-data/ch4/user-logs.txtuser-logs.txt$bin/run.shcom.manning.hip.ch4.joins.improved.SampleMainusers.txt,user-logs.txtoutput$hadoopfs-catoutput/part*

bob71CAnew_tweet58.133.120.100

jim21ORlogout93.24.237.12

jim21ORnew_tweet93.24.237.12

jim21ORlogin198.184.237.49

marie27ORlogin58.133.120.100

marie27ORview_user122.158.130.90

mike69VAnew_tweet87.124.79.252

mike69VAlogout55.237.104.36重分区连接过滤(

filter)指的是将map极端的输入数据中不需要的部分丢弃。投影(

project)是关系代数的概念。投影用于减少发送给reducer的字段。优化重分区连接传统重分区方法的实现空间效率低下。它需要将连接的所有的输出值都读取到内存中,然后进行多路连接。事实上,如果仅仅将小数据集读取到内存中,然后用小数据集来遍历大数据集,进行连接,这样将更加高效。下图是优化后的重分区连接的流程图。Map输出的组合键和组合值上图说明了map输出的组合键和组合值。二次排序将会根据连接键(joinkey)进行分区,但会用整个组合键来进行排序。组合键包括一个标识源数据集(较大或较小)的整形值,因此可以根据这个整形值来保证较小源数据集的值先于较大源数据的值被reducer接收。优化重分区连接上图是实现的类图。类图中包含两个部分,一个通用框架和一些类的实现样例。使用这个连接框架需要实现抽象类OptimizedDataJoinMapperBase和OptimizedDataJoinReducerBase。OptimizedDataJoinMapperBaseprotectedabstractTextgenerateInputTag(StringinputFile);protectedabstractbooleanisInputSmaller(StringinputFile);publicvoidconfigure(JobConfjob){this.inputFile=job.get("map.input.file");this.inputTag=generateInputTag(this.inputFile);if(isInputSmaller(this.inputFile)){smaller=newBooleanWritable(true);outputKey.setOrder(0);}else{smaller=newBooleanWritable(false);outputKey.setOrder(1);}}这个类的作用是辨认出较小的数据集,并生成输出键和输出值。Configure方法在mapper创建期调用。Configure方法的作用之一是标识每一个数据集,让reducer可以区分数据的源数据集。另一个作用是辨认当前的输入数据是否是较小的数据集。OptimizedDataJoinMapperBase(续)protectedabstractOptimizedTaggedMapOutputgenerateTaggedMapOutput(Objectvalue);protectedabstractStringgenerateGroupKey(Objectkey,OptimizedTaggedMapOutputaRecord);publicvoidmap(Objectkey,Objectvalue,OutputCollectoroutput,Reporterreporter)throwsIOException{OptimizedTaggedMapOutputaRecord=generateTaggedMapOutput(value);if(aRecord==null){return;}aRecord.setSmaller(smaller);StringgroupKey=generateGroupKey(aRecord);if(groupKey==null){return;}outputKey.setKey(groupKey);output.collect(outputKey,aRecord);}

Map方法首先调用自定义的方法(generateTaggedMapOutput)来生成OutputValue对象。这个对象包含了在连接中需要使用的值,和一个标识较大或较小数据集的布尔值。如果map方法可以调用自定义的方法(generateGroupKey)来得到可以在连接中使用的键,那么这个键就作为map的输出键。OptimizedDataJoinReducerBasepublicvoidreduce(Objectkey,Iteratorvalues,OutputCollectoroutput,Reporterreporter)throwsIOException{CompositeKeyk=(CompositeKey)key;Listsmaller=newArrayList();while(values.hasNext()){Objectvalue=values.next();OptimizedTaggedMapOutputcloned=((OptimizedTaggedMapOutput)value).clone(job);if(cloned.isSmaller().get()){smaller.add(cloned);}else{joinAndCollect(k,smaller,cloned,output,reporter);}}}Map端处理后已经可以保证较小源数据集的值将会先于较大源数据集的值被接收。这里就可以将所有的较小源数据集的值放到缓存中。在开始接收较大源数据集的值的时候,就开始和缓存中的值做连接操作。OptimizedDataJoinRuducerBase(续)protectedabstractOptimizedTaggedMapOutputcombine(Stringkey,OptimizedTaggedMapOutputvalue1,OptimizedTaggedMapOutputvalue2);privatevoidjoinAndCollect(CompositeKeykey,Listsmaller,OptimizedTaggedMapOutputvalue,OutputCollectoroutput,Reporterreporter)throwsIOException{if(smaller.size()<1){OptimizedTaggedMapOutputcombined=combine(key.getKey(),null,value);collect(key,combined,output,reporter);}else{for(OptimizedTaggedMapOutputsmall:smaller){OptimizedTaggedMapOutputcombined=combine(key.getKey(),small,value);collect(key,combined,output,reporter);}}}方法joinAndCollect包含了两个数据集的值,并输出它们。优化重分区连接实例例如,需要连接用户详情数据和用户活动日志。第一步,判断两个数据集中哪一个比较小。对于一般的网站来说,用户详情数据会比较小,用户活动日志会比较大。首先,实现抽象类OptimizedDataJoinMapperBase。这个将在map端被调用。这个类将创建map的输出键和输出值。同时,它还将提示整个框架,当前处理的文件是不是比较小的那个。Map端实现代码publicclassSampleMapextendsOptimizedDataJoinMapperBase{privatebooleansmaller;OverrideprotectedTextgenerateInputTag(StringinputFile){//tagtherowwithinputfilename(datasource)smaller=inputFile.contains("users.txt");returnnewText(inputFile);}OverrideprotectedStringgenGroupKey(Objectkey,OutputValueoutput){returnkey.toString();}OverrideprotectedbooleanisInputSmaller(StringinputFile){returnsmaller;}OverrideprotectedOutputValuegenMapOutputValue(Objecto){returnnewTextTaggedOutputValue((Text)o);}}Reduce端实现代码第二步,你需要实现抽象类OptimizedDataJoinReducerBase。它将在reduce端被调用。在这个类中,将从map端传入不同数据集的输出键和输出值,然后返回reduce的输出数组。publicclassSampleReduceextendsOptimizedDataJoinReducerBase{privateTextTaggedOutputValueoutput=newTextTaggedOutputValue();privateTexttextOutput=newText();OverrideprotectedOutputValuecombine(Stringkey,OutputValuesmallValue,OutputValuelargeValue){if(smallValue==null||largeValue==null){returnnull;}Object[]values={smallValue.getData(),largeValue.getData()};textOutput.set(StringUtils.join(values,"\t"));output.setData(textOutput);returnoutput;}任务的主代码最后,任务的主代码需要指明InputFormat类,并设置二次排序。job.setInputFormat(KeyValueTextInputFormat.class);job.setMapOutputKeyClass(CompositeKey.class);job.setMapOutputValueClass(TextTaggedOutputValue.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setPartitionerClass(CompositeKeyPartitioner.class);job.setOutputKeyComparatorClass(CompositeKeyComparator.class);job.setOutputValueGroupingComparator(CompositeKeyOnlyComparator.class);MapReduce中的连接策略重分区连接复制连接半连接——reduce端连接。使用场景:连接两个或多个大型数据集。——map端连接。使用场景:待连接的数据集中有一个数据集小到可以完全放在缓存中。——map端连接。使用场景:待连接的数据集中有一个数据集非常大,但同时这个数据集可以被过滤成小到可以放在缓存中。复制连接复制连接得名于它的具体实现:连接中最小的数据集将会被复制到所有的map主机节点。复制连接有一个假设前提:在被连接的数据集中,有一个数据集足够小到可以缓存在内存中。MapReduce的复制连接的工作原理如下:使用分布式缓存将这个小数据集复制到所有运行map任务的节点。用各个map任务初始化方法将这个小数据集装载到一个哈希表中。逐条用大数据集中的记录遍历这个哈希表,逐个判断是否符合连接条件输出符合连接条件的结果。复制连接一个复制连接通用框架该复制连接框架可以支持任意类型的数据集。这个框架中同样提供了一个优化的小功能:动态监测分布式缓存内容和输入块的大小,并判断哪个更大。如果输入块较小,那么就需要将map的输入块放到内存缓冲中,然后在mapper的cleanup方法中执行连接操作。下图为该框架的类图。并且提供了连接类(GenericReplicatedJoin)的具体实现,假设前提:每个数据文件的第一个标记是连接键。连接框架的算法Mapper的setup方法判断在map的输入块和分布式缓存的内容中哪个大。如果分布式缓存的内容比较小,那么它将被装载到内存缓存中。Map函数开始连接操作。如果输入块比较小,map函数将输入块的键\值对装载到内存缓存中。Map的cleanup方法将从分布式缓存中读取记录,逐条记录和在内存缓存中的键\值对进行连接操作。。GenericReplicatedJoin以下代码为GenericReplicatedJoin中的setup方法,它是在map的初始化阶段调用的。这个方法判断分布式缓存中的文件和输入块哪个大。如果文件比较小,则将文件装载到HashMap中。protectedvoidsetup(Contextcontext)throwsIOException,InterruptedException{distributedCacheFiles=DistributedCache.getLocalCacheFiles(context.getConfiguration());intdistCacheSizes=0;for(PathdistFile:distributedCacheFiles){FiledistributedCacheFile=newFile(distFile.toString());distCacheSizes+=distributedCacheFile.length();}if(context.getInputSplit()instanceofFileSplit){FileSplitsplit=(FileSplit)context.getInputSplit();longinputSplitSize=split.getLength();distributedCacheIsSmaller=(distCacheSizes<inputSplitSize);}else{distributedCacheIsSmaller=true;}if(distributedCacheIsSmaller){for(PathdistFile:distributedCacheFiles){FiledistributedCacheFile=newFile(distFile.toString());DistributedCacheFileReaderreader=getDistributedCacheReader();reader.init(distributedCacheFile);for(Pairp:(Iterable<Pair>)reader){addToCache(p);}reader.close();}}}GenericReplicatedJoin(续)以下代码为GenericReplicatedJoin中的Map方法。它将会根据setup方法是否将了分布式缓存的内容装载到内存的缓存中来选择行为。如果分布式缓存的内容被装载到内存中,那么map方法就将输入块的记录和内存中的缓存做连接操作。如果分布式缓存的内容没有被装载到内存中,那么map方法就将输入块的记录装载到内存中,然后在cleanup方法中使用。protectedvoidmap(Objectkey,Objectvalue,Contextcontext)throwsIOException,InterruptedException{Pairpair=readFromInputFormat(key,value);if(distributedCacheIsSmaller){joinAndCollect(pair,context);}else{addToCache(pair);}}publicvoidjoinAndCollect(Pairp,Contextcontext)throwsIOException,InterruptedException{List<Pair>cached=cachedRecords.get(p.getKey());if(cached!=null){for(Paircp:cached){Pairresult;if(distributedCacheIsSmaller){result=join(p,cp);}else{result=join(cp,p);}if(result!=null){context.write(result.getKey(),result.getData());}}}}publicPairjoin(PairinputSplitPair,PairdistCachePair){StringBuildersb=newStringBuilder();if(inputSplitPair.getData()!=null){sb.append(inputSplitPair.getData());}sb.append("\t");if(distCachePair.getData()!=null){sb.append(distCachePair.getData());}returnnewPair<Text,Text>(newText(inputSplitPair.getKey().toString()),newText(sb.toString()));}GenericReplicatedJoin(续)当所有的记录都被传输给map方法,MapReduce将会调用cleanup方法。如果分布式缓存的内容比输入块大,连接将会在cleanup中进行。连接的对象是map函数的缓存中的输入块的记录和分布式缓存中的记录。protectedvoidcleanup(Contextcontext)throwsIOException,InterruptedException{if(!distributedCacheIsSmaller){for(PathdistFile:distributedCacheFiles){FiledistributedCacheFile=newFile(distFile.toString());DistributedCacheFileReaderreader=getDistributedCacheReader();reader.init(distributedCacheFile);for(Pairp:(Iterable<Pair>)reader){joinAndCollect(p,context);}reader.close();}}}GenericReplicatedJoin(续)最后,任务的驱动代码必须指定需要装载到分布式缓存中的文件。以下的代码可以处理一个文件,也可以处理MapReduce输入结果的一个目录。Configurationconf=newConfiguration();FileSystemfs=smallFilePath.getFileSystem(conf);FileStatussmallFilePathStatus=fs.getFileStatus(smallFilePath);if(smallFilePathStatus.isDir()){for(FileStatusf:fs.listStatus(smallFilePath)){if(f.getPath().getName().startsWith("part")){DistributedCache.addCacheFile(f.getPath().toUri(),conf);}}}else{DistributedCache.addCacheFile(smallFilePath.toUri(),conf);}MapReduce中的连接策略重分区连接复制连接半连接——reduce端连接。使用场景:连接两个或多个大型数据集。——map端连接。使用场景:待连接的数据集中有一个数据集小到可以完全放在缓存中。——map端连接。使用场景:待连接的数据集中有一个数据集非常大,但同时这个数据集可以被过滤成小到可以放在缓存中。半连接假设一个场景,需要连接两个很大的数据集,例如用户日记和OLTP的用户数据。任何一个数据集都不是足够小到可以缓存在mapjob的内存中。如此看来,似乎就不得不应用reduce端的连接了。这时候,可以看一下题目本身:若是一个数据集中有的记录因为无法连接到另一个数据集的记录,将会被移除,还需要将全部数据集放到内存中吗?在这个例子中,在用户日记中的用户仅仅是OLTP用户数据中的用户中的很小的一个项目组。那么就可以从OLTP用户数据中取出存在于用户日记中的那项目组用户的用户数据。如许就可以获得足够小到可以放在内存中的数据集。如许的解决规划就叫做半连接。例子¥bin/run.shcom.manning.hip.ch4.joins.semijoin.Mainusers.txtuser-logs.txtoutput¥hadoopfs-lsoutput/user/aholmes/output/filtered/user/aholmes/output/result/user/aholmes/output/unique¥hadoopfs-catoutput/unique/part*

bob

jim

marie

mike¥hadoopfs-catoutput/filtered/part*

mike69VA

marie27OR

jim21OR

bob71CA¥hadoopfs-catoutput/result/part*

jimlogout93.24.237.1221OR

mikenew_tweet87.124.79.25269VA

bobnew_tweet58.133.120.10071CA

mikelogout55.237.104.3669VA

jimnew_tweet93.24.237.1221OR

marieview_user122.158.130.9027OR

jimlogin198.184.237.4921OR

marielogin58.133.120.10027OR半连接的实现——3个MapReducejob半连接的实现——job1第一个MapReducejob的功能是从日记文件中提取出用户名,用这些用户名生成一个用户名唯一的凑集(Set)。这经由过程让map函数履行了用户名的投影操作。然后用reducer输出用户名。为了削减在map阶段和reduce简短之间传输的数据量,就在map任务中采取哈希集来保存用户名,在cleanup办法中输出哈希集的值。Job1的实现代码publicstaticclassMapextendsMapper<Text,Text,Text,NullWritable>{privateSet<String>keys=newHashSet<String>();@Overrideprotectedvoidmap(Textkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{keys.add(key.toString());}

@Overrideprotectedvoidcleanup(Contextcontext)throwsIOException,InterruptedException{TextoutputKey=newText();for(Stringkey:keys){outputKey.set(key);context.write(outputKey,NullWritable.get());}}}publicstaticclassReduceextendsReducer<Text,NullWritable,Text,NullWritable>{

@Overrideprotectedvoidreduce(Textkey,Iterable<NullWritable>values,Contextcontext)throwsIOException,InterruptedException{context.write(key,NullWritable.get());}}半连接的实现——job2第二步是一个进行过滤的MapReducejob。目标是从全部用户的用户数据集中移除不存在于日记文件中的用户。这是一个只包含map的功课。它用到了复制连接来缓存存在于日记文件中的用户名,并把它们和全部用户的数据集连接。来自于job1的唯一的数据集要远远小于全部用户的数据集。很自然的就把小的那个数据集放到缓

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论