付费下载
下载本文档
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、大数据技术概论实验报告姓名:郭利强专业:工程管理专业学号:2015E8009064028目录1. 实验要求32. 环境说明42.1 系统硬件42.2 系统软件42.3 集群配置43. 实验设计43.1 第一部分设计43.2 第二部分设计64. 程序代码114.1 第一部分代码114.2 第二部分代码175. 实验输入和结果21实验输入输出结果见压缩包中对应目录211.实验要求第一部分:采用辅助排序的设计方法,对于输入的N个IP网络流量文件,计算得到文件中的各个源IP地址连接的不同目的IP地址个数,即对各个源IP地址连接的目的IP地址去重并计数举例如下:,3156.121
2、,28,178,115973.162.8,1159.1623131,1<156,122.23,34.17+119.8><156,121,28,178,129.87.122,1><,131.8L5,26><156.122,23,34,17,119.2><156,122.23.1,34.17.119心<159.1623131,137.23.S,123><156,12223.1,34.17.1191><156,121.28,178,129,87,122.1>第二部分:输入N个文件,生成带
3、详细信息的倒排索引举例如下,有4个输入文件: d1.txt:catdogcatfox d2.txt:catbearcatcatfox d3.txt:foxwolfdog d4.txt:wolfhenrabbitcatsheep要求建立如下格式的倒排索引: cat>3:4:(d1.txt,2,4),(d2.txt,3,5),(d4.txt,1,5)一单词一>由现该单词的文件个数:总文件个数:(由现该单词的文件名,单词在该文件中的由现次数,该文件的总单词数),2 .环境说明2.1 系统硬件处理器:IntelCorei3-2350MCPU2.3GHzX4内存:2GB磁盘:60GB2.2
4、系统软件操作系统:Ubuntu14.04LTS操作系统类型:32位Java版本:1.7.0_85Eclipse版本:3.8Hadoop插件:hadoop-eclipse-plugin-2.6.0.jarHadoop: 集群配置集群配置为伪分布模式,节点数量一个3 .实验设计3.1 第一部分设计利用两个Map/Reduce过程,在第一个MR中,读取记录并去除重复记录,第二个MR®照辅助排序设计方法,根据源地址进行分组,统计目的地址数量。第一个MRS计:自定义StringPair源地址,目的地址类型,实现WritableComparable,在map过程读取文件,输由&l
5、t;StringPair,NullWritable>,reduce过程去除重复记录输由<StringPair.toString,NullWritable>。在第二个MRS计:1 .在Map过程读取第一个MR勺输由,对value值进行拆分,并以拆分得到的源地址和目的地址初始化StringPair对象作为输由键,输由值为1。publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedExceptionString口records=value.toString().split("
6、;t");Stringsourceip=records0;Stringdesip=records1;context.write(newStringPair(sourceip,desip),one);2 .定义GroupComparator类,继承WritableComparator类,并重载compare方法,对Map过程输由按照StringPair.first排序,完成按照源地址分组。publicstaticclassGroupComparatorextendsWritableComparatorprotectedGroupComparator()super(StringPair.
7、class,true);Overridepublicintcompare(WritableComparablew1,WritableComparablew2)StringPairip1=(StringPair)w1;StringPairip2=(StringPair)w2;returnip1.getFirst().compareTo(ip2.getFirst();3 .在Reduce过程统计分组中的所有值,得到源地址连接不同目的地址数量。publicvoidreduce(StringPairkey,Iterable<IntWritable>values,Contextcontext
8、)throwsIOException,InterruptedExceptionintsum=0;for(IntWritableval:values)sum+=val.get();statistic.set(sum);context.write(key.getFirst(),statistic);3.2 第二部分设计利用两个Map/Reduce过程,第一个乂就计各个文件中的所有单词的由现次数,以及各个文件单词总数,第二个MR艮据统计结果处理加工得到单词倒排索引。第一个MRS计:1 .在Map过程中,重写map类,利用StringTokenizer类,将map方法中的value值中存储的文本,拆分
9、成一个个的单词,并获取文件名,以两种格式进行输由<filename+word,1>或者<filename,1>。publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException/获取文件名FileSplitfileSplit=(FileSplit)context.getInputSplit();StringfileName=fileSplit.getPath().getName();|/获取单词在单个文件中出现次数,及文件单词总数StringTokenizeri
10、tr=newStringTokenizer(value.toString();for(;itr.hasMoreTokens();)Stringword=removeNonLetters(itr.nextToken().toLowerCase();StringfileWord=fileName+"001"+word;if(!word.equals("")context.write(newText(fileWord),newIntWritable(1);context.write(newText(fileName),newIntWritable(1);2 .
11、在Reduce过程中,统计得到每个文件中每个单词的由现次数,以及每个文件的单词总数,输由<key,count>。publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedExceptionintsum=0;for(IntWritableval:values)sum+=val.get();context.write(key,newIntWritable(sum);)第二个M就计:1 .Map过程读取第一个MR的输由,对value值进行拆分
12、,重新组合后输由键为固定Text类型值index,值为filename+word+count或者filename+count。publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedExceptionStringvalStr=value.toString();String口records=valStr.split("t");context.write(newText("index"),newText(records0+"001"+rec
13、ords1);)2 .Reduce过程中定义四个HashMapMap<String,Integer>wordinfilescount,key为单词+文件名,value为单词在该文件中由现的次数;Map<String,Integer>filescount,key为文件名,value为文件的单词总数;Map<String,Integer>wordinfiles,key为单词,value为单词在多少个文件中由现;Map<String,String>indexes,key为单词,value为倒排索引。读取values值,根据设定分隔符拆分,判断拆分后长度
14、如果为2,则该值为文件名+文件单词总数,将拆分后的文件名及文件单词总数,组成键值对放入Map<String,Integer>filescount;拆分后长度如果为3,则该值为文件名+单词+单词在该文件中由现次数,将拆分后的文件名+单词及单词在该文件中出现次数组成键值对放入Map<String,Integer>wordinfilescount,同时统计单词在多少个文件中由现,并组成键值对放入Map<String,Integer>wordinfiles。遍历Map<String,Integer>wordinfilescount,将单词作为键,“单词-
15、>由现该单词的文件个数:总文件个数:(由现该单词的文件名,单词在该文件中的由现次数,该文件的总单词数)”作为值,放入Map<String,String>indexes中。遍历Map<String,String>indexes获取倒排索引并输由全部索引。publicvoidreduce(Textkey,Iterable<Text>values,Contextcontext)throwsIOException,InterruptedException/拆分输入,获取单词出现在几个文件中以及在该文件中出现次数,各个文件的单词总数,总文件数for(Textva
16、l:values)StringvalStr=val.toString();Stringrecords=valStr.split("001");switch(records.length)case2:filescount.put(records0,Integer.parseInt(records1);break;case3:wordinfilescount.put(valStr,Integer.parseInt(records2);if(!wordinfiles.containsKey(records1)wordinfiles.put(records1,1);elseword
17、infiles.put(records1,wordinfiles.get(records1)+1);break;/处理获取倒排索引for(Entry<String,Integer>entry:wordinfilescount.entrySet()StringvalStr=entry.getKey();Stringrecords=valStr.split("001");Stringword=records1;if(!indexes.containsKey(word)StringBuildersb=newStringBuilder();sb.append(word)
18、.append("->")|.append(wordinfiles.get(word).append(":")|.append(filescount.size().append(":")|.append("(")|.append(records0).append(",")|.append(entry.getValue().append(",")|.append(filescount.get(records。).append(")");indexes.
19、put(word,sb.toString();elseStringBuildersb=newStringBuilder();sb.append(",(").append(records0).append(",").append(entry.getValue().append(",").append(filescount.get(records0).append(")");|indexes.put(word,indexes.get(word)+sb.toString();for(Entry<String,Str
20、ing>entry:indexes.entrySet()context.write(newText(entry.getValue()+""),NullWritable.get();)4.程序代码4.1 第一部分代码1.IpStatistics.java/* LicensedtotheApacheSoftwareFoundation(ASF)underone* ormorecontributorlicenseagreements.SeetheNOTICEfile* distributedwiththisworkforadditionalinformation* rega
21、rdingcopyrightownership.TheASFlicensesthisfile* toyouundertheApacheLicense,Version2.0(the* "License");youmaynotusethisfileexceptincompliance* withtheLicense.YoumayobtainacopyoftheLicenseat* http:/licenses/LICENSE-2.0* Unlessrequiredbyapplicablelaworagreedtoinwriting,software*
22、 distributedundertheLicenseisdistributedonan"ASIS"BASIS,* WITHOUTWARRANTIESORCONDITIONSOFANYKIND,eitherexpressorimplied.* SeetheLicenseforthespecificlanguagegoverningpermissionsand* limitationsundertheLicense.*/importjava.io.IOException;importjava.util.ArrayList;importjava.util.Collections
23、;importjava.util.Comparator;importjava.util.HashMap;importjava.util.List;importjava.util.Map;importjava.util.Map.Entry;importjava.util.StringTokenizer;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.
24、Text;importorg.apache.hadoop.io.WritableComparable;importorg.apache.hadoop.io.WritableComparator;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoo
25、p.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;importorg.apache.hadoop.fs.Path;publicclassIpStatistics/第一个Map/Reducemapfe,用于去重publicstaticclassRemoveMapperextendsMapper<Object,Text,StringPair,NullWritable>publicvoidmap(Objectkey,Textvalue,Contextconte
26、xt)throwsIOException,InterruptedExceptionStringTokenizeritr=newStringTokenizer(value.toString();while(itr.hasMoreTokens()StringnextToken=itr.nextToken();String口records=nextToken.split(",");Stringsourceip=records0.replace("<","");Stringdestinationip=records1.replace(&
27、quot;>","");context.write(newStringPair(sourceip,destinationip),NullWritable.get();/第二个Map/Reducedzi程ma酸,用于统计publicstaticclassStatisticsMapperextendsMapper<Object,Text,StringPair,IntWritable>IntWritableone=newIntWritable(1);publicvoidmap(Objectkey,Textvalue,Contextcontext)thr
28、owsIOException,InterruptedExceptionStringrecords=value.toString().split("t");Stringsourceip=records0;Stringdesip=records1;context.write(newStringPair(sourceip,desip),one);/按照源地址分组publicstaticclassGroupComparatorextendsWritableComparatorprotectedGroupComparator()super(StringPair.class,true)
29、;Overridepublicintcompare(WritableComparablew1,WritableComparablew2)StringPairip1=(StringPair)w1;StringPairip2=(StringPair)w2;returnip1.getFirst().compareTo(ip2.getFirst();)/第一个Map/Reducedzi程reduce过程,去重publicstaticclassRemoveReducerextendsReducer<StringPair,IntWritable,Text,NullWritable>public
30、voidreduce(StringPairkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedExceptioncontext.write(newText(key.toString(),NullWritable.get();)/第二个Map/Reducedzi程reduce过程,统计publicstaticclassStatisticsReducerextendsReducer<StringPair,IntWritable,Text,IntWritable>private
31、IntWritablestatistic=newIntWritable();publicvoidreduce(StringPairkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedExceptionintsum=0;for(IntWritableval:values)sum+=val.get();)statistic.set(sum);context.write(key.getFirst(),statistic);)/去重任务publicstaticvoidRemoveTask(S
32、tringargs)throwsExceptionConfigurationconf=newConfiguration();Stringmaster=""conf.set("fs.defaultFS","hdfs::9000");conf.set("hadoop.job.user","hadoop");conf.set("","yarn");conf.set("y
33、arn.resourcemanager.address",master+”:8032");conf.set("yarn.resourcemanager.scheduler.address",master+":8030");conf.set("mapred.jar","ipstatistics.jar");StringotherArgs=newGenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length<3)Sy
34、stem.err.println("Usage:ipstatistics<in><in>.<out>");System.exit(2);)Jobjob=newJob(conf,"ipstatisticsRemoving");job.setMapperClass(RemoveMapper.class);job.setReducerClass(RemoveReducer.class);job.setOutputKeyClass(StringPair.class);job.setOutputValueClass(NullWr
35、itable.class);FileInputFormat.addInputPath(job,newPath(otherArgs0);FileOutputFormat.setOutputPath(job,newPath(otherArgs1);job.waitForCompletion(true);/统计任务publicstaticvoidStatisticsTask(Stringargs)throwsExceptionConfigurationconf=newConfiguration();Stringmaster=""conf.set("fs
36、.defaultFS","hdfs::9000");conf.set("hadoop.job.user","hadoop");conf.set("","yarn");conf.set("yarn.resourcemanager.address",master+”:8032");conf.set("yarn.resourcemanager.scheduler.address"
37、;,master+":8030");conf.set("mapred.jar","ipstatistics.jar");StringotherArgs=newGenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length<3)System.err.println("Usage:ipstatistics<in><in>.<out>");System.exit(2);Jobjob=newJob(con
38、f,"ipstatisticsStatistics");job.setMapperClass(StatisticsMapper.class);job.setGroupingComparatorClass(GroupComparator.class);job.setReducerClass(StatisticsReducer.class);job.setOutputKeyClass(StringPair.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job,newP
39、ath(otherArgs1);FileOutputFormat.setOutputPath(job,newPath(otherArgs2);System.exit(job.waitForCompletion(true)?0:1);publicstaticvoidmain(Stringargs)throwsExceptionRemoveTask(args);StatisticsTask(args);2.StringPair.javaimportorg.apache.hadoop.io.Text;importorg.apache.hadoop.io.WritableComparable;impo
40、rtjava.io.Datalnput;importjava.io.DataOutput;importjava.io.IOException;publicclassStringPairimplementsWritableComparable<StringPair>privateTextfirst;privateTextsecond;publicStringPair()this.first=newText();this.second=newText();publicStringPair(Stringfirst,Stringsecond)set(newText(first),newTe
41、xt(second);publicStringPair(Textfirst,Textsecond)set(first,second);publicvoidset(Textfirst,Textsecond)this.first=first;this.second=second;publicTextgetFirst()returnfirst;publicTextgetSecond()returnsecond;)publicvoidwrite(DataOutputout)throwsIOExceptionfirst.write(out);second.write(out);)publicvoidre
42、adFields(DataInputin)throwsIOExceptionfirst.readFields(in);second.readFields(in);)OverridepublicinthashCode()returnfirst.hashCode()*163+second.hashCode();)Overridepublicbooleanequals(Objectobj)if(objinstanceofStringPair)StringPairip=(StringPair)obj;returnfirst.toString().equals(ip.first.toString()&a
43、mp;&second.toString().equals(ip.second.toString();4.2第二部分代码importjava.io.IOException;importjava.util.HashSet;importjava.util.Map;importjava.util.Set;importjava.util.StringTokenizer;importjava.util.HashMap;importjava.util.Map.Entry;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoo
44、p.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.Mapper.Context;importorg
45、.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.input.FileSplit;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;publicclassInverseV3/*第一个mr的ma暧,获取每个单词在单个文件中出现次数,输入为每个文件行偏移量,输出为<word+filename,1
46、>*或<filename,1>*/publicstaticclassstatisticsMapextendsMapper<Object,Text,Text,IntWritable>privateTextmapKey=newText("key");©Overridepublicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException/获取文件名FileSplitfileSplit=(FileSplit)context.getInpu
47、tSplit();StringfileName=fileSplit.getPath().getName();/获取单词在单个文件中出现次数,及文件单词总数StringTokenizeritr=newStringTokenizer(value.toString();for(;itr.hasMoreTokens();)Stringword=removeNonLetters(itr.nextToken().toLowerCase();StringfileWord=fileName+"001"+word;if(!word.equals("")context.wr
48、ite(newText(fileWord),newIntWritable(l);context.write(newText(fileName),newIntWritable(1);/去掉字符串非字母字符publicstaticStringremoveNonLetters(Stringoriginal)StringBufferaBuffer=newStringBuffer(original.length();charaCharacter;for(inti=0;i<original.length();i+)aCharacter=original.charAt(i);if(Character.
49、isLetter(aCharacter)aBuffer.append(aCharacter);returnnewString(aBuffer);/第一个mr白reduce类,统计汇总出现单词的文件个数,及每个文件中单词出现个数及每个文件单词个数,publicstaticclassstatisticsReduceextendsReducer<Text,IntWritable,Text,IntWritable>Overridepublicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsI
50、OException,InterruptedExceptionintsum=0;for(IntWritableval:values)sum+=val.get();context.write(key,newIntWritable(sum);publicstaticclassInverseMapperextendsMapper<Object,Text,Text,Text>publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedExceptionStringvalStr=value.toS
51、tring();Stringrecords=valStr.split("t");context.write(newText("index"),newText(records0+"001"+records1);publicstaticclassInverseReducerextendsReducer<Text,Text,Text,NullWritable>privateMap<String,Integer>wordinfilescount=newHashMap<String,Integer>();/k
52、ey为单词+文件名,value为单词在该文件中出现的次数privateMap<String,Integer>filescount=newHashMap<String,Integer>();/key为文件名,value为文件的单词总数privateMap<String,Integer>wordinfiles=newHashMap<String,Integer>();/key为单词,value为单词在多少的文件中出现privateMap<String,String>indexes=newHashMap<String,String&g
53、t;();/key为单词,value为倒排索引publicvoidreduce(Textkey,Iterable<Text>values,Contextcontext)throwsIOException,InterruptedException/拆分输入,获取单词出现在几个文件中以及在该文件中出现次数,各个文件的单词总数,总文件数for(Textval:values)StringvalStr=val.toString();String口records=valStr.split("001");switch(records.length)case2:filescou
54、nt.put(records0,Integer.parseInt(records1);break;case3:wordinfilescount.put(valStr,Integer.parseInt(records2);if(!wordinfiles.containsKey(records1)wordinfiles.put(records1,1);elsewordinfiles.put(records1,wordinfiles.get(records1)+1);break;/处理获取倒排索引for(Entry<String,Integer>entry:wordinfilescoun
55、t.entrySet()StringvalStr=entry.getKey();Stringrecords=valStr.split("001");Stringword=records1;if(!indexes.containsKey(word)StringBuildersb=newStringBuilder();sb.append(word).append("->").append(wordinfiles.get(word).append(":").append(filescount.size().append(":
56、").append("(").append(records0).append(",").append(entry.getValue().append(",").append(filescount.get(records0).append(")");indexes.put(word,sb.toString();elseStringBuildersb=newStringBuilder();sb.append(",(").append(records0).append(",&quo
57、t;).append(entry.getValue().append(",").append(filescount.get(records0).append(")");indexes.put(word,indexes.get(word)+sb.toString();for(Entry<String,String>entry:indexes.entrySet()context.write(newText(entry.getValue()+""),NullWritable.get();/统计单词在文件中出现次数及单个文件单词总数publicstaticvoidStatisticsTask(Stringargs)throwsExceptionConfigurationconf=newConfiguration();Stringmaster=""conf.set("fs.defaultFS","hdfs::9000");conf.set("hadoop.job.user",
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2026年吉林省长春市高职单招职业适应性测试考试题库有答案详细解析
- 2026浙江事业单位统考普陀区招聘5人考试备考题库及答案解析
- 2026安徽合肥市包河区童享时光合肥市武汉路幼儿园招聘笔试参考题库及答案解析
- 2026内蒙古乌海海南区事业单位招聘40人笔试备考试题及答案解析
- 2026年杭州万向职业技术学院单招综合素质考试题库附答案详细解析
- 宁波数字产业集团有限公司招聘4人笔试参考题库及答案解析
- 河南省周口市郸城县重点达标名校2025-2026学年初三摸底考试语文试题试卷含解析
- 福建省莆田市仙游县第六片区达标名校2026年中考语文试题仿真题含解析
- 2026年上海市浦东区第四教育署初三练习题二(全国卷I)语文试题含解析
- 广西柳州市柳南区、城中区重点达标名校2025-2026学年校初三4月考语文试题含解析
- 2026广西北海市从“五方面人员”中选拔乡镇领导班子成员25人考试备考题库及答案解析
- 2026杭州市市级机关事业单位编外招聘148人笔试参考题库及答案解析
- 2026年春季贵州人民版(2024)六年级下册综合实践活动《小学毕业留念》教学课件
- 湖北省襄阳市2026届高三下学期3月一模统一调研测试数学试题
- 第4课《坚持才会有收获》课件
- 2026年春季安全教育班会记录表(19周):开学安全第一课-启航安全守护新学期
- 2025年黄山职业技术学院单招职业技能测试题库附答案解析
- 大坝安全监测仪器检验测试规程
- 绿色数据中心 暨对算力行业的一点思考 行业洞察 2026
- 妇产科学精准医学:围产期多组学监测与管理
- 二十届中纪委五次全会知识测试题及答案解析
评论
0/150
提交评论