




已阅读5页,还剩39页未读, 继续免费阅读
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
Hadoop平台简介,肖韬 南京大学计算机科学与技术系 2010,使用Hadoop的Java API接口,在Hadoop文件系统中的文件是由一个Hadoop Path对象来表示的,可以把一个Path对象想象成一个Hadoop文件系统的URI, 例如hdfs:/localhost:9000/user/xt/input/text.dat,通过2个静态工厂方法从抽象的Hadoop文件系统中抽取出一个具体的实现的实例。 public static FileSystem get(Configuration conf) throws IOException; 返回默认的文件系统(在conf/core-site.xml中指定),或者本地的文件系统(如果该文件中没有指定) public static FileSystem get(URI uri, Configuration conf) throws IOException; 返回由uri决定的文件系统,或者默认的文件系统(如果uri无效),新旧API变化的对比,以0.20.0版本为分水岭,有一些API在新的版本中被舍弃了,且推荐不使用,而是改为使用新的API 下面将以WordCount程序为例进行说明,0.20.0之前的WordCount程序,public WordCount public static void main(String args) throws Throwable JobConf conf = new JobConf(WordCount.class); conf.setJobName(“A Sample WordCount Example”); FileInputFormat.addInputPath(conf, new Path(args0); FileInputFormat.setOutputPath(conf, new Path(args1); conf.setMapperClass(WordCountMapper.class); conf.setReducerClass(WordCountReducer.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); JobClient.runJob(conf); ,class WordCountMapper extends MapReduceBase implements Mapper public void map(LongWritable offset, Text line, OutputCollector collector, Reporter reporter) throws IOException StringTokenizer tokenzier = new StringTokenizer(line.toString(); while (tokenizer.hasMoreTokens() collector.collect(new Text(tokenizer.nextToken(), new IntWritable(1); ,class WordCountReducer extends MapReduceBase implements Reducer public void reduce(Text word, Iterator counts, OutputCollector collector, Reporter reporter) throws IOException int sum = 0; while (counts.hasNext() sum += counts.next().get(); collector.collect(word, new IntWritable(sum); ,0.20.0之后的WordCount程序,public class WordCount public static void main(String args) throw Exception Configuration conf = new Configuration(); Job job = new Job(conf, “A Sample WordCount Example”); job.setJarByClass(WordCount.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args0); FileOutputFormat.setOutputPath(job, new Path(args1); job.waitForCompletion(true); ,class WordCountMapper extends Mapper public void map(LongWritable offset, Text line, Context context) throws IOException, InterruptedException StringTokenizer tokenizer = new StringTokenizer(line.toString(); while (tokenizer.hasMoreTokens() context.write(new Text(tokenizer.nextToken(), new IntWritable(1); ,class WordCountReducer extends Reducer public void reduce(Text word, Iterator counts, Context context) throws IOException, InterruptedException int sum = 0; while (counts.hasNext() sum += counts.next().get(); context.write(word, new IntWritable(sum); ,Shuffle and Sort,MapReduce保证每一个reduce task的输入基于key排序的。 (MapReduce makes the guarantee that the input to every reducer is sorted by key). 系统进行排序的过程(包括将map的输出转换为reduce的输入) 被称为shuffle. The process by which the system performs the sort and transfers the map outputs to the reducer as inputs is known as the shuffle.,shuffle过程 map task中生成了3个spill file,每个spill file中有3个partition,shuffle过程: map task side,当一个map task开始产生它的输出时,输出并非不经处理 被直接就写到磁盘上去的。 每一个map task都有一个circular memory buffer,缺省大小为100MB,map task会将它产生的输出(key-value pairs)写入到它的memory buffer中去。 当map task写入到memory buffer的数据占memory buffer的大小百分比到达一个阈值(缺省为80%)时,一个background thread(记为thread)将开始把memory buffer中的内容spill到磁盘上去。 在thread将memory buffer中的数据spill到磁盘中之前,thread首先将这些数据分成若干partition,每一个partition将被发送至一个reducer。,在每一个partition内,thread将根据key对该partition内的数据(即key-value pairs)进行in-memory sort,如果指定了combiner function,那么该combiner function将会被作用于上述in-memory sort的输出。 每当memory buffer中的数据达到一个阈值时,就会产生一个spill file,所以在map task输出了所有的record之后,就会存在多个spill files. (1个record即1个key-value pair) 在map task结束之前,所有的spill files将被merge到一个单独的output file中,该output file在结构上由多个partition组成,每一个partition内的数据都是排好序的,且每一个partition将被送至对应的一个reduce task. 如果指定了combiner function并且spill的数量不低于3个,那么在生成output file之前,combiner function将会作用于将要被写入到output file里的每一个partition内的数据。,reduce task side,map task的输出存储在map task节点所在机器的本地文件系统中,reduce task会自己所需的某个partition数据复制到自己所在的HDFS中,且一个reduce task将会从多个map task复制其所需要的partition(这些partition都是同一类的)。 reducer 怎样知道从哪些map tasktracker那里去取自己所需要的partition(亦即map task的输出)? 当map task成功完成后,它会将状态更新通知它所属的tasktracker,该tasktracker进而又会通知其所属的jobtracker。 这些通知是通过heartbeat通信机制实现的。这样,对于一个job而言,jobtracker知道map output与tasktracker之间的映射关系。 reducer中的一个线程会周期性地向jobtracker询问map output所在的位置,直到该reducer接收了所有的map output.,combiner function 与 partitioner function,当存在多个reducer时,map tasks将会对它们的输出进行partition,每一个mask task都会为每一个reduce task生成一个partition. 在每一个partition内都可能会有很多keys(以及相应的values),但是对于任一个key而言,它的records都在一个partition内。 partition的过程可以由用户定义的partitioning函数来控制,但是一般来说,默认的partitioner函数(根据key进行hash映射)已经可以令人满意。,存在多个reduce task时的partitioning partition的数量与reducer的数量是一致的,定制个性化的partitioner,自定义的partitioner function需要继承于一个抽象类 Partitioner controls the partitioninig of the keys of the intermediate map-outputs .The key(or a subset of key) is used to derive the partition, typically by a hash function. This controls which of the m reduce tasks the intermediate key (and hence the record) is sent for reduction.,实现Partitioner中的getPartition函数 原型 abstract int getPartition(KEY key, VALUE value, int numPatitions); 其中, key和value是mapper输出的intermediate output。例如,在WordCount例子中就分别是word与1。 numPartitions是reducers的数量。 返回值是该record将被发送至的reducer的编号(0, 1, , m-1),指定多个reducers,bin/hadoop jar WordCount D mapred.reduce.tasks=3 input output 这样,在reduce阶段会有3个reduce tasks运行。,speculative execution(默认打开),当多个task并行运行时,可能若干个task运行明显比其他task要慢。这种情况下,Hadoop将会为这些运行较慢的task启动一个相同的backup task,称为speculative execution. 一个task及其speculative task不会同时运行,以避免竞争。 在一个job的所有task都已经启动的情况下,对于那些同时满足 1)已经运行了一段时间(至少1分钟) 2)运行的速度明显慢于其余task的平均速度 的task,一个speculative task才会被启动。 对于original task及其speculative task而言,如果任何一方先运行结束,则另一方将被killed.,Skipping bad records,当一个task失败时(原因可能是硬件故障、待处理数据非法等),该task将会被retried,但是如果该task失败的次数达到4次,那么该task所属的整个job就将被标记为failed。 当map task读到一个bad record时,可能会因为抛出异常而失败,进而整个job可能会失败。有时,第三方的库可能有bug,导致task因读取了某个bad record而失败,而这个第三方的库又无法修改。 这时,可以使用Hadoop的skip mode,以使得读取输入文件使自动地跳过bad records.,在打开了skipping mode之后,task会将其所处理的records报告给tasktracker。当task失败时,tasktracker会retry该task,并跳过引起失败的records。 为了减少skipping mode带来的带宽及记账信息(bookkeeping)的消耗,当一个task失败达到2次时,才会开启skipping mode。,如果一个task因为某个bad record而持续地失败,那么task tracker将会以下列的结果执行task attempts: task 失败. task 再次失败. skipping mode被打开. task仍然失败,但是bad record被tasktracker记录下来. skipping mode处于使能状态,task因为跳过了前面导致失败的bad record而成功. skipping mode是默认关闭的。 注意,对于每一个task attempt,skipping mode只能发现一个bad record.,Task side-effect files,要保证一个task的多个instance不会试图向同一个文件进行写操作: 1)如果某个task失败了(失败前已经向输出文件中写了一部分数据),那么当其再次运行(retry)时,必须先将旧的文件删掉。 2)当speculative execution被使能时,某个original task与它的speculative task可能会试图向同一个文件进行写操作。 Hadoop为每一个task attempt指定了一个临时目录,每一个task attempt的输出就会被写到这个目录中去,从而避免了上述的问题。 这个目录就是$mapred.output.dir,Input Format,map: (k1, v1) list(k2, v2) combine: (k2, list(v2) list(k2, v2) reduce: (k2, list(v2) list(k3, v3) 可以看出,如果使用combiner,那么它的输入/输出格式与reducer是完全一样的(同时也是Reducer的子类),只不过combiner的输出是intermediate key-value pairs(这将是reducer的输入)。,Input types由Input format决定,例如TextInputFormat决 定了输入的key的类型是LongWritable(首字符在文件中的偏移量),value的类型是Text(一行文本内容). 如果希望产生其他类型的输入,可以显式地调用JobConf 的方法。否则,若不显式地(set explicitly)设置,则不论是否使用combiner, intermediate types默认与最终的输出类型相同(即LongWritable与Text)。 所以,若k2和k3相同,则不需要调用setMapKeyOutputClass(),因为intermediate key type已经被setOutputKeyClass()设置好了。同理,若v2和v3相同,则只需要调用setOutputValueClass()即可。,为什么要为intermediate key-value pairs和最终的output指定类型?,似乎通过mapper与reducer就可以确定intermediate key-value pairs和最终的output的类型了。 原因:Java的泛型机制中的type erasure使得这些类型信息在运行时是不可知的,所以必须显式地为Hadoop指定这些类型。,InputFormat class hierarchy,Input Split,什么是 input split? 1个input split是input file中的1个chunk,该chunk将被1个单独的map进行处理。每一个map处理一个input split. 每一个split可被划分为若干records,1个record即1个key-value pair,map依次处理每一个record. Input split由一个Java 抽象类代表,即 org.apache.hadoop.mapreduce; abstract class InputSplit,InputSplit represents the data to be processed by an individual mapper. Typically, it presents a byte-oriented view on the input and is the responsibility of RecordReader of the job to process this and present a record-oriented view. 注意,InputSplit并不包含input data,而只是指向input data的一个reference。 Map/Reduce系统利用 getLocations()所得到的storage locations信息来将map tasks放置在尽可能靠近input split数据的地方;利用getLength()得到的size信息对splits进行排序,使得最大的spilt先被处理,试图来最小化job的运行时间。,Input file, input split and record,input file,Input split,record,key-value pair,MapReduce应用程序开发者不需要直接处理InputSplit, 因为它是由一个InputFormat生成的。InputFormat负责生成 input splits,并把它们划分为records.,0.20.0之前的定义如下 public interface InputFormat InputSplit getSplits(JobConf job, int numSplits) throws IOException; RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException; 其实跟新的还是很类似的。,对于旧版InputFormat的解释,The JobClient calls the getSplits() method, passing the desired number of map tasks as the numSplits argument. This number is treated as a hint, as InputFormat implementations are free to return a different number of splits to the number specified in numSplits. Having calculated the splits, the client sends them to the jobtracker, which uses their storage locations to schedule map tasks to process them on the tasktrackers. On a tasktracker, the map task passes the split to the getRecordReader() method on InputFormat to obtain a RecordReader for that split. A RecordReader is little more than an iterator over records, and the map task uses one to generate record key-value pairs, which it passes to the map function.,The abstract InputFormat class,The Map/Reduce framwork relies on the InputFormat of the job to : 1. Validate the input-specification of the job. 2. Split-up the input file(s) into logical InputSplits, each of which is then assigned to an individual Mapper. Provide the RecordReader implementation to be used to glean input records from logical InputSplit for processing by a Mapper.,org.apache.hadoop.mapred Interface RecordReader,RecordReader reads pairs from an InputSplit. RecordReader, typically, converts the byte-oriented view of the input provided by the InputSplit, and presents a record-oriented view for the Mapper & Reducer tasks for processing. It t
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
评论
0/150
提交评论