Posted 2010-01-08 11:14. Original work by inkfish; please do not repost for commercial purposes, and credit the source (/inkfish) when reposting.

This post implements multi-file output for Hadoop's new org.apache.hadoop.mapreduce API: a custom MultipleOutputFormat that picks each output file's name, including its extension, from the record's key, the value, and the job Configuration. A WordCount example is used to test the result (test environment: Server VM, JDK 1.6.0_16-b01).

Step 1: LineRecordWriter. This is TextOutputFormat.LineRecordWriter lifted out into a stand-alone class so the custom output format can reuse it:

```java
package inkfish.hadoop.study;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/** Copied from TextOutputFormat.LineRecordWriter and made a top-level class. */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    static {
        try {
            newline = "\n".getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t");
    }

    /** Write a Text object's raw bytes, or any other object's UTF-8 string form. */
    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
```

Step 2: MultipleOutputFormat. Subclasses implement the single abstract method generateFileNameForKeyValue(K key, V value, Configuration conf); that is, the output file name (including extension) is decided from the key, the value, and the conf settings:

```java
package inkfish.hadoop.study;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> {
    private MultiRecordWriter writer = null;

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,
            InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /** Determine the output file name (with extension) from key, value, and conf. */
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);

    public class MultiRecordWriter extends RecordWriter<K, V> {
        /** Cache of RecordWriters, one per distinct output file name. */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /** Output directory. */
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // Work out which file this record belongs in
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
                        GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(
                        codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
```
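The AlphabetOutputFormat used in the next step ignores the conf argument, so as a minimal sketch of why the Configuration is threaded through at all, here is a hypothetical subclass (not from the original post) that reads a made-up property, output.filename.prefix, and prepends it to every file name:

```java
package inkfish.hadoop.study;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/** Illustrative sketch, not part of the original post: derive the file name
 *  from both the key and a job-level setting. */
public class PrefixOutputFormat extends MultipleOutputFormat<Text, IntWritable> {
    @Override
    protected String generateFileNameForKeyValue(Text key, IntWritable value, Configuration conf) {
        // "output.filename.prefix" is a hypothetical property; defaults to "part" when unset.
        String prefix = conf.get("output.filename.prefix", "part");
        return prefix + "-" + key.toString() + ".txt";
    }
}
```

Since generateFileNameForKeyValue is called once per record with the live job Configuration, any job-level knob can be folded into the naming decision this way.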
Step 3: testing it in WordCount. Output files are named after the first letter of each word, so words beginning with 'a' are saved in a.txt, words beginning with 'b' in b.txt, and so on; everything else is saved in "other.txt":

```java
package inkfish.hadoop.study;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException,
                InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static class AlphabetOutputFormat extends MultipleOutputFormat<Text, IntWritable> {
        @Override
        protected String generateFileNameForKeyValue(Text key, IntWritable value, Configuration conf) {
            char c = key.toString().toLowerCase().charAt(0);
            if (c >= 'a' && c <= 'z') {
                return c + ".txt";
            }
            return "other.txt";
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        // Standard WordCount wiring, plus the custom output format
        job.setReducerClass(IntSumReducer.class);
        job.setOutputFormatClass(AlphabetOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
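For concreteness, here is an illustrative listing (not from the original post) of what the job would leave in the output directory, assuming a single reduce task and an input consisting of "apple banana apple 123"; the comma comes from the keyValueSeparator hard-coded in getBaseRecordWriter:

```
a.txt:      apple,2
b.txt:      banana,1
other.txt:  123,1
```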