
Introduction to Big Data Technology — Lab Report

Name: Guo Liqiang
Major: Engineering Management
Student ID: 2015E8009064028

Contents
1. Experiment requirements
2. Environment
 2.1 Hardware
 2.2 Software
 2.3 Cluster configuration
3. Experiment design
 3.1 Part 1 design
 3.2 Part 2 design
4. Program code
 4.1 Part 1 code
 4.2 Part 2 code
5. Experiment input and results (the input and output files are in the corresponding directories of the accompanying archive)

1. Experiment requirements

Part 1: Using the secondary-sort design pattern, read N network-flow input files and compute, for each source IP address, the number of distinct destination IP addresses it connects to; that is, deduplicate the destination addresses per source address and count them. Each input line is a <source IP, destination IP> tuple. (The sample tuples and sample output are illegible in the source.)

Part 2: Given N input files, build an inverted index with detailed information. For example, for the four input files

d1.txt: cat dog cat fox
d2.txt: cat bear cat cat fox
d3.txt: fox wolf dog
d4.txt: wolf hen rabbit cat sheep

the required index format is:

cat->3:4:(d1.txt,2,4),(d2.txt,3,5),(d4.txt,1,5)

that is, word -> number of files containing the word : total number of files : (name of a file containing the word, occurrences of the word in that file, total words in that file), ...

2. Environment

2.1 Hardware
Processor: Intel Core i3-2350M CPU, 2.3 GHz x 4
Memory: 2 GB
Disk: 60 GB

2.2 Software
Operating system: Ubuntu 14.04 LTS, 32-bit
Java: 1.7.0_85
Eclipse: 3.8
Hadoop Eclipse plugin: hadoop-eclipse-plugin-2.6.0.jar
Hadoop: (version illegible in the source)

2.3 Cluster configuration
Pseudo-distributed mode with a single node.

3. Experiment design

3.1 Part 1 design
Two MapReduce jobs are used. The first job reads the records and removes duplicates; the second follows the secondary-sort design pattern, grouping records by source address and counting destination addresses.

First job: define a custom StringPair type holding (source address, destination address) and implementing WritableComparable. The map phase reads the input files and emits <StringPair, NullWritable>; the reduce phase drops duplicate records and emits <StringPair.toString(), NullWritable>.
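The end result this two-job pipeline must produce — distinct destination counts per source — can be sanity-checked with a minimal plain-Java sketch. The IP pairs below are invented for illustration; they are not taken from the experiment input.

```java
import java.util.*;

public class DistinctDestCount {
    // For each source IP, count the number of distinct destination IPs,
    // mirroring what the dedupe + count jobs compute together.
    public static Map<String, Integer> count(List<String[]> pairs) {
        Map<String, Set<String>> dests = new HashMap<>();
        for (String[] p : pairs) {
            dests.computeIfAbsent(p[0], k -> new HashSet<>()).add(p[1]);
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : dests.entrySet()) {
            result.put(e.getKey(), e.getValue().size());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> pairs = Arrays.asList(
            new String[]{"10.0.0.1", "10.0.0.2"},
            new String[]{"10.0.0.1", "10.0.0.3"},
            new String[]{"10.0.0.1", "10.0.0.2"},   // duplicate record
            new String[]{"10.0.0.4", "10.0.0.2"});
        System.out.println(count(pairs));  // {10.0.0.1=2, 10.0.0.4=1}
    }
}
```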

Second job:
1. The map phase reads the first job's output, splits each value, and uses the resulting source and destination addresses to initialize a StringPair object as the output key; the output value is 1.

```java
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] records = value.toString().split("\t");
    String sourceip = records[0];
    String desip = records[1];
    context.write(new StringPair(sourceip, desip), one);
}
```

2. Define a GroupComparator class that extends WritableComparator and overrides its compare method so that map output is compared on StringPair.first only, which groups the records by source address.

```java
public static class GroupComparator extends WritableComparator {
    protected GroupComparator() {
        super(StringPair.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        StringPair ip1 = (StringPair) w1;
        StringPair ip2 = (StringPair) w2;
        return ip1.getFirst().compareTo(ip2.getFirst());
    }
}
```

3. The reduce phase sums all values in a group, yielding the number of distinct destination addresses connected to by each source address.

```java
public void reduce(StringPair key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    statistic.set(sum);
    context.write(key.getFirst(), statistic);
}
```
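The essence of this secondary-sort setup — a full sort on (source, destination) but grouping on source alone — can be mimicked without Hadoop. A plain-Java sketch with hypothetical keys:

```java
import java.util.*;

public class SecondarySortSketch {
    // Sort pairs on (first, second), then walk them emitting one group key
    // per distinct first element, as the grouping comparator would decide.
    public static List<String> groupKeys(List<String[]> pairs) {
        Comparator<String[]> byFirstThenSecond =
                Comparator.comparing((String[] p) -> p[0])
                          .thenComparing((String[] p) -> p[1]);
        pairs.sort(byFirstThenSecond);
        List<String> groups = new ArrayList<>();
        String current = null;
        for (String[] p : pairs) {
            if (!p[0].equals(current)) {  // group boundary: first element changed
                current = p[0];
                groups.add(current);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String[]> pairs = new ArrayList<>(Arrays.asList(
                new String[]{"b", "2"}, new String[]{"a", "2"},
                new String[]{"a", "1"}, new String[]{"b", "1"}));
        System.out.println(groupKeys(pairs));  // [a, b]
    }
}
```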

3.2 Part 2 design
Two MapReduce jobs are used. The first job counts every word's occurrences in each file as well as each file's total word count; the second job processes those statistics into the inverted index.

First job:
1. In the map phase, override map and use StringTokenizer to split the text stored in value into individual words; also obtain the current file name, and emit records in two formats, <filename+word, 1> and <filename, 1>.

```java
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    // Get the name of the file this split belongs to
    FileSplit fileSplit = (FileSplit) context.getInputSplit();
    String fileName = fileSplit.getPath().getName();
    // Count word occurrences per file, and the file's total word count
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
        String word = removeNonLetters(itr.nextToken().toLowerCase());
        String fileWord = fileName + "\001" + word;
        if (!word.equals("")) {
            context.write(new Text(fileWord), new IntWritable(1));
            context.write(new Text(fileName), new IntWritable(1));
        }
    }
}
```
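The tokenize-and-emit step above can be exercised outside Hadoop. A plain-Java sketch with an invented sample line; the "\001" in the listings is the ASCII 0x01 separator between file name and word:

```java
import java.util.*;

public class TokenizeSketch {
    // Strip non-letter characters, as the report's removeNonLetters helper does
    public static String removeNonLetters(String original) {
        StringBuilder sb = new StringBuilder(original.length());
        for (int i = 0; i < original.length(); i++) {
            if (Character.isLetter(original.charAt(i))) {
                sb.append(original.charAt(i));
            }
        }
        return sb.toString();
    }

    // Render the <fileName\001word, 1> and <fileName, 1> pairs the map phase
    // emits for one input line, one "key\t1" string per emission
    public static List<String> emit(String fileName, String line) {
        List<String> out = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            String word = removeNonLetters(itr.nextToken().toLowerCase());
            if (!word.equals("")) {
                out.add(fileName + "\001" + word + "\t1");
                out.add(fileName + "\t1");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 4 valid tokens, so 8 emissions: one per-word count and one file-total count each
        System.out.println(emit("d1.txt", "Cat dog, cat fox!"));
    }
}
```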

2. In the reduce phase, sum the values to obtain each word's occurrence count in each file and each file's total word count, and emit <key, count>.

```java
public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    context.write(key, new IntWritable(sum));
}
```

Second job:
1. The map phase reads the first job's output and splits each value, recombining it so that the output key is the fixed Text value "index" and the output value is filename+word+count or filename+count.

```java
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    String valStr = value.toString();
    String[] records = valStr.split("\t");
    context.write(new Text("index"), new Text(records[0] + "\001" + records[1]));
}
```

2. The reduce phase defines four HashMaps:
- Map<String, Integer> wordinfilescount: key is word+filename, value is the word's occurrence count in that file;
- Map<String, Integer> filescount: key is a file name, value is that file's total word count;
- Map<String, Integer> wordinfiles: key is a word, value is the number of files the word occurs in;
- Map<String, String> indexes: key is a word, value is its inverted-index entry.

The reduce reads the values and splits each one on the separator. If the split yields 2 fields, the record is filename + file word total, and the pair goes into filescount. If it yields 3 fields, the record is filename + word + occurrence count: the filename+word key and its count go into wordinfilescount, and wordinfiles is updated to track how many files the word occurs in. Then wordinfilescount is traversed to build, for each word, the value "word -> number of files containing the word : total number of files : (file name, occurrences of the word in that file, total words in that file)" in indexes. Finally indexes is traversed and every index entry is emitted.

```java
public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    // Parse the input: which files each word occurs in and how often,
    // each file's word total, and the total number of files
    for (Text val : values) {
        String valStr = val.toString();
        String[] records = valStr.split("\001");
        switch (records.length) {
        case 2:
            filescount.put(records[0], Integer.parseInt(records[1]));
            break;
        case 3:
            wordinfilescount.put(valStr, Integer.parseInt(records[2]));
            if (!wordinfiles.containsKey(records[1])) {
                wordinfiles.put(records[1], 1);
            } else {
                wordinfiles.put(records[1], wordinfiles.get(records[1]) + 1);
            }
            break;
        }
    }
    // Assemble the inverted index
    for (Entry<String, Integer> entry : wordinfilescount.entrySet()) {
        String valStr = entry.getKey();
        String[] records = valStr.split("\001");
        String word = records[1];
        if (!indexes.containsKey(word)) {
            StringBuilder sb = new StringBuilder();
            sb.append(word).append("->")
              .append(wordinfiles.get(word)).append(":")
              .append(filescount.size()).append(":")
              .append("(").append(records[0]).append(",")
              .append(entry.getValue()).append(",")
              .append(filescount.get(records[0])).append(")");
            indexes.put(word, sb.toString());
        } else {
            StringBuilder sb = new StringBuilder();
            sb.append(",(").append(records[0]).append(",")
              .append(entry.getValue()).append(",")
              .append(filescount.get(records[0])).append(")");
            indexes.put(word, indexes.get(word) + sb.toString());
        }
    }
    for (Entry<String, String> entry : indexes.entrySet()) {
        context.write(new Text(entry.getValue()), NullWritable.get());
    }
}
```
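The switch on records.length is the crux of the merge: both record kinds arrive interleaved under the single "index" key and are told apart only by their field count. In isolation (assuming the \001 separator the listings use):

```java
public class RecordDispatch {
    // Classify a record from the first job's output:
    //   fileName\001count         -> per-file word total  (2 fields)
    //   fileName\001word\001count -> per-file word count  (3 fields)
    public static String classify(String record) {
        int fields = record.split("\001").length;
        if (fields == 2) return "file-total";
        if (fields == 3) return "word-count";
        return "unknown";
    }

    public static void main(String[] args) {
        System.out.println(classify("d1.txt\0014"));         // file-total
        System.out.println(classify("d1.txt\001cat\0012"));  // word-count
    }
}
```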

4. Program code

4.1 Part 1 code

1. IpStatistics.java

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class IpStatistics {

    // First job's map: parse each <source,destination> record, for deduplication
    public static class RemoveMapper
            extends Mapper<Object, Text, StringPair, NullWritable> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String nextToken = itr.nextToken();
                String[] records = nextToken.split(",");
                String sourceip = records[0].replace("<", "");
                String destinationip = records[1].replace(">", "");
                context.write(new StringPair(sourceip, destinationip), NullWritable.get());
            }
        }
    }

    // Second job's map: split the deduplicated records and emit <StringPair, 1>
    public static class StatisticsMapper
            extends Mapper<Object, Text, StringPair, IntWritable> {
        IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] records = value.toString().split("\t");
            String sourceip = records[0];
            String desip = records[1];
            context.write(new StringPair(sourceip, desip), one);
        }
    }

    // Group reduce input by source address only
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(StringPair.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            StringPair ip1 = (StringPair) w1;
            StringPair ip2 = (StringPair) w2;
            return ip1.getFirst().compareTo(ip2.getFirst());
        }
    }

    // First job's reduce: write each distinct StringPair key once
    // (value type corrected to NullWritable to match what RemoveMapper emits)
    public static class RemoveReducer
            extends Reducer<StringPair, NullWritable, Text, NullWritable> {
        public void reduce(StringPair key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(key.toString()), NullWritable.get());
        }
    }

    // Second job's reduce: count the values in each source-address group
    public static class StatisticsReducer
            extends Reducer<StringPair, IntWritable, Text, IntWritable> {
        private IntWritable statistic = new IntWritable();

        public void reduce(StringPair key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            statistic.set(sum);
            context.write(key.getFirst(), statistic);
        }
    }

    // Deduplication job
    public static void RemoveTask(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String master = "";  // master host name (blank in the source)
        conf.set("fs.defaultFS", "hdfs://" + master + ":9000");
        conf.set("hadoop.job.user", "hadoop");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.address", master + ":8032");
        conf.set("yarn.resourcemanager.scheduler.address", master + ":8030");
        conf.set("mapred.jar", "ipstatistics.jar");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            System.err.println("Usage: ipstatistics <in> <tmp> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "ipstatistics Removing");
        job.setMapperClass(RemoveMapper.class);
        job.setReducerClass(RemoveReducer.class);
        job.setOutputKeyClass(StringPair.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.waitForCompletion(true);
    }

    // Counting job
    public static void StatisticsTask(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String master = "";  // master host name (blank in the source)
        conf.set("fs.defaultFS", "hdfs://" + master + ":9000");
        conf.set("hadoop.job.user", "hadoop");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.address", master + ":8032");
        conf.set("yarn.resourcemanager.scheduler.address", master + ":8030");
        conf.set("mapred.jar", "ipstatistics.jar");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            System.err.println("Usage: ipstatistics <in> <tmp> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "ipstatistics Statistics");
        job.setMapperClass(StatisticsMapper.class);
        job.setGroupingComparatorClass(GroupComparator.class);
        job.setReducerClass(StatisticsReducer.class);
        job.setOutputKeyClass(StringPair.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        RemoveTask(args);
        StatisticsTask(args);
    }
}
```

2. StringPair.java

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class StringPair implements WritableComparable<StringPair> {
    private Text first;
    private Text second;

    public StringPair() {
        this.first = new Text();
        this.second = new Text();
    }

    public StringPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public StringPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof StringPair) {
            StringPair ip = (StringPair) obj;
            return first.toString().equals(ip.first.toString())
                    && second.toString().equals(ip.second.toString());
        }
        return false;
    }

    // compareTo and toString are required by the rest of the code but are
    // missing from the source listing; restored here: sort by first, then
    // second, and print the pair tab-separated (the format StatisticsMapper
    // splits on).
    @Override
    public int compareTo(StringPair other) {
        int cmp = first.compareTo(other.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(other.second);
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}
```
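RemoveTask and StatisticsTask chain two jobs through an intermediate directory: dedupe first, then count. The same two-phase flow can be simulated in plain Java (the tab-separated records below are invented samples):

```java
import java.util.*;

public class TwoPhaseSketch {
    // Phase 1 (RemoveTask's role): deduplicate "src\tdst" records
    public static Set<String> dedupe(List<String> lines) {
        return new TreeSet<>(lines);
    }

    // Phase 2 (StatisticsTask's role): with duplicates gone, each remaining
    // record counts one distinct destination for its source
    public static Map<String, Integer> count(Set<String> deduped) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : deduped) {
            String src = line.split("\t")[0];
            counts.merge(src, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList(
            "1.1.1.1\t2.2.2.2", "1.1.1.1\t2.2.2.2",
            "1.1.1.1\t3.3.3.3", "4.4.4.4\t2.2.2.2");
        System.out.println(count(dedupe(raw)));  // {1.1.1.1=2, 4.4.4.4=1}
    }
}
```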

4.2 Part 2 code

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class InverseV3 {

    /*
     * First job's map: count each word's occurrences per file. Input keys are
     * line offsets; output is <filename\001word, 1> or <filename, 1>.
     */
    public static class statisticsMap extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get the name of the file this split belongs to
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            // Count word occurrences per file, and the file's total word count
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String word = removeNonLetters(itr.nextToken().toLowerCase());
                String fileWord = fileName + "\001" + word;
                if (!word.equals("")) {
                    context.write(new Text(fileWord), new IntWritable(1));
                    context.write(new Text(fileName), new IntWritable(1));
                }
            }
        }
    }

    // Strip non-letter characters from a string
    public static String removeNonLetters(String original) {
        StringBuffer aBuffer = new StringBuffer(original.length());
        char aCharacter;
        for (int i = 0; i < original.length(); i++) {
            aCharacter = original.charAt(i);
            if (Character.isLetter(aCharacter)) {
                aBuffer.append(aCharacter);
            }
        }
        return new String(aBuffer);
    }

    // First job's reduce: sum to get each word's count per file
    // and each file's total word count
    public static class statisticsReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Second job's map: funnel all records to one reducer under the fixed key "index"
    public static class InverseMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String valStr = value.toString();
            String[] records = valStr.split("\t");
            context.write(new Text("index"), new Text(records[0] + "\001" + records[1]));
        }
    }

    // Second job's reduce: assemble the inverted index
    public static class InverseReducer extends Reducer<Text, Text, Text, NullWritable> {
        // key: word + file name; value: the word's occurrence count in that file
        private Map<String, Integer> wordinfilescount = new HashMap<String, Integer>();
        // key: file name; value: the file's total word count
        private Map<String, Integer> filescount = new HashMap<String, Integer>();
        // key: word; value: the number of files the word occurs in
        private Map<String, Integer> wordinfiles = new HashMap<String, Integer>();
        // key: word; value: the word's inverted-index entry
        private Map<String, String> indexes = new HashMap<String, String>();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Parse the input: which files each word occurs in and how often,
            // each file's word total, and the total number of files
            for (Text val : values) {
                String valStr = val.toString();
                String[] records = valStr.split("\001");
                switch (records.length) {
                case 2:
                    filescount.put(records[0], Integer.parseInt(records[1]));
                    break;
                case 3:
                    wordinfilescount.put(valStr, Integer.parseInt(records[2]));
                    if (!wordinfiles.containsKey(records[1])) {
                        wordinfiles.put(records[1], 1);
                    } else {
                        wordinfiles.put(records[1], wordinfiles.get(records[1]) + 1);
                    }
                    break;
                }
            }
            // Assemble the inverted index
            for (Entry<String, Integer> entry : wordinfilescount.entrySet()) {
                String valStr = entry.getKey();
                String[] records = valStr.split("\001");
                String word = records[1];
                if (!indexes.containsKey(word)) {
                    StringBuilder sb = new StringBuilder();
                    sb.append(word).append("->")
                      .append(wordinfiles.get(word)).append(":")
                      .append(filescount.size()).append(":")
                      .append("(").append(records[0]).append(",")
                      .append(entry.getValue()).append(",")
                      .append(filescount.get(records[0])).append(")");
                    indexes.put(word, sb.toString());
                } else {
                    StringBuilder sb = new StringBuilder();
                    sb.append(",(").append(records[0]).append(",")
                      .append(entry.getValue()).append(",")
                      .append(filescount.get(records[0])).append(")");
                    indexes.put(word, indexes.get(word) + sb.toString());
                }
            }
            for (Entry<String, String> entry : indexes.entrySet()) {
                context.write(new Text(entry.getValue()), NullWritable.get());
            }
        }
    }

    // Word-count job: word occurrences per file and per-file word totals
    public static void StatisticsTask(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String master = "";  // master host name (blank in the source)
        conf.set("fs.defaultFS", "hdfs://" + master + ":9000");
        conf.set("hadoop.job.user",
            // ... (the listing is truncated at this point in the source)
```
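As a closing sanity check, the index the two jobs should produce for the four example files in section 1 can be computed locally. This is a plain-Java simulation of the end result, not the MapReduce implementation itself:

```java
import java.util.*;

public class LocalInvertedIndex {
    // Build "word->filesWithWord:totalFiles:(file,countInFile,wordsInFile),..."
    // from an in-memory map of file name -> token list; LinkedHashMaps keep
    // file order deterministic so the output strings are reproducible.
    public static Map<String, String> build(LinkedHashMap<String, List<String>> files) {
        Map<String, Integer> totalWords = new LinkedHashMap<>();
        Map<String, LinkedHashMap<String, Integer>> perFile = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> f : files.entrySet()) {
            totalWords.put(f.getKey(), f.getValue().size());
            for (String w : f.getValue()) {
                perFile.computeIfAbsent(w, k -> new LinkedHashMap<>())
                       .merge(f.getKey(), 1, Integer::sum);
            }
        }
        Map<String, String> index = new LinkedHashMap<>();
        for (Map.Entry<String, LinkedHashMap<String, Integer>> e : perFile.entrySet()) {
            StringBuilder sb = new StringBuilder(e.getKey())
                .append("->").append(e.getValue().size())
                .append(":").append(files.size()).append(":");
            boolean first = true;
            for (Map.Entry<String, Integer> fe : e.getValue().entrySet()) {
                if (!first) sb.append(",");
                first = false;
                sb.append("(").append(fe.getKey()).append(",")
                  .append(fe.getValue()).append(",")
                  .append(totalWords.get(fe.getKey())).append(")");
            }
            index.put(e.getKey(), sb.toString());
        }
        return index;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, List<String>> files = new LinkedHashMap<>();
        files.put("d1.txt", Arrays.asList("cat", "dog", "cat", "fox"));
        files.put("d2.txt", Arrays.asList("cat", "bear", "cat", "cat", "fox"));
        files.put("d3.txt", Arrays.asList("fox", "wolf", "dog"));
        files.put("d4.txt", Arrays.asList("wolf", "hen", "rabbit", "cat", "sheep"));
        System.out.println(build(files).get("cat"));
        // cat->3:4:(d1.txt,2,4),(d2.txt,3,5),(d4.txt,1,5)
    }
}
```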
