分布式并行计算之Hadoop应用.doc_第1页
分布式并行计算之Hadoop应用.doc_第2页
分布式并行计算之Hadoop应用.doc_第3页
分布式并行计算之Hadoop应用.doc_第4页
分布式并行计算之Hadoop应用.doc_第5页
已阅读5页,还剩7页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

重庆邮电大学堂下论文分布式并行计算摘要:早先那种多线程,多任务分解的日志分析设计,其实是分布式计算的一个单机版缩略,如何将这种单机的工作分拆,变成集群工作协同,其实就是分布式计算框架设计所涉及的。具体的计算任务交由哪一台机器执行,执行后由谁来汇总,这都由分布式框架的Master来抉择,而使用者只需简单的将待分析内容的提供给分布式计算系统作为输入,就可以得到分布式计算后的结果。Hadoop是Apache开源组织的一个分布式计算开源框架,随着并行处理硬件性能的迅速提高,人们对并行算法的兴趣也日益增加。所谓并行算法是指一次可执行多个操作的算法。对并行算法的研究现在已发展为一个独立的研究领域。很多用串行算法解决的问题也已经有了相应的并行算法。关键字:分布式;并行计;,Hadoop框架;MapReduce和HDFSAbstract:The earlier the kind of multi-threaded, multi-task decomposition of the log analysis and design, distributed computing is actually a stand-alone version of abbreviations, how to break this stand-alone work, collaborative work into clusters, in fact, the framework of distributed computing involved in the design. Specific computing tasks by which machine execution, who after the execution summary, this framework by the Master to distributed choice, the user simply content to be analyzed will be provided to the distributed computing system as the input to distributed computing can be the result. Hadoop is an Apache open source open source distributed computing framework, with the performance of parallel processing hardware, the rapid improvement of people interested in the parallel algorithm is also increasing. The so-called parallel algorithm is an algorithm can perform multiple operations. Now the parallel algorithm has been developed as an independent field of study. A lot of problems with the serial algorithm has been a corresponding parallel algorithms. Keywords: distributed; parallel count;Hadoop framework; MapReduce and HDFS 1 Hadoop简介 Hadoop 是一个开源的可运行于大规模集群上的分布式并行编程框架,由于分布式存储对于分布式编程来说是必不可少的,这个框架中还包含了一个分布式文件系统 HDFS( Hadoop Distributed File System )。Hadoop框架中最核心设计就是:MapReduce和HDFS。MapReduce的思想是由Google的一篇论文所提及而被广为流传的,简单的一句话解释MapReduce就是任务的分解与结果的汇总。HDFS是Hadoop分布式文件系统的缩写,为分布式计算存储提供了底层支持。基于 Hadoop,你可以轻松地编写可处理海量数据的分布式并行程序,并将其运行于由成百上千个结点组成的大规模计算机集群上。MapReduce从它名字上来看就大致可以看出个缘由,两个动词Map, Reduce,Map(展开)就是将一个任务分解成为多个任务,Reduce就是将分解后多任务处理的结果汇总起来,得出最后的分析结果。这不是什么新思想,其实在前面提到了多线程,多任务的设计就可以找到这种思想的影子。不论是现实社会,还是在程序设计中,一项工作往往可以被拆分成为多个任务,任务之间的关系可以分为两种:一种是不相关的任务,可以并行执行;另一种是任务之间有相互的依赖,先后顺序不能够颠倒,这类任务是无法并行处理的。回到过去,大学老师上课时让大家去分析关键路径,无非就是找最省时的任务分解执行方式。在分布式系统中,机器集群就可以看作硬件资源池,将并行的任务拆分交由每一个空闲机器资源去处理,能够极大地提高计算效率,同时这种资源无关性,对于计算集群的扩展无疑提供了最好的设计保证。任务分解处理以后,那就需要将处理以后的结果在汇总起来,这就是Reduce要做的工作。从目前的情况来看,Hadoop 注定会有一个辉煌的未来:云计算是目前灸手可热的技术名词,全球各大 IT 公司都在投资和推广这种新一代的计算模式,而 Hadoop 又被其中几家主要的公司用作其云计算环境中的重要基础软件。1.1 MapReduce计算模型MapReduce 是 Google 公司的核心计算模型,它将复杂的运行于大规模集群上的并行计算过程高度的抽象到了两个函数,Map 和 Reduce, 这是一个令人惊讶的简单却又威力巨大的模型。适合用 MapReduce 来处理的数据集(或任务)有一个基本要求: 待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。如图1-1 MapReduce 计算流程所示。图1-1 MapReduce 计算流程图一说明了用 MapReduce 来处理大数据集的过程, 这个 MapReduce 的计算过程简而言之,就是将大数据集分解为成百上千的小数据集,每个(或若干个)数据集分别由集群中的一个结点(一般就是一台普通的计算机)进行处理并生成中间结果,然后这些中间结果又由大量的结点进行合并, 形成最终结果。 计算模型的核心是 Map 和 Reduce 两个函数,这两个函数由用户负责实现,功能是按一定的映射规则将输入的 对转换成另一个或一批 对输出。 表一 Map 和 Reduce 函数函数输入输出说明MapList()1. 将小数据集进一步解析成一批 对,输入 Map 函数中进行处理。2. 每一个输入的 会输出一批 。 是计算的中间结果。 Reduce输入的中间结果 中的 List(v2) 表示是一批属于同一个 k2 的 value以一个计算文本文件中每个单词出现的次数的程序为例, 可以是 ,经 Map 函数映射之后,形成一批中间结果 , 而 Reduce 函数则可以对中间结果进行处理,将相同单词的出现次数进行累加,得到每个单词的总的出现次数。 基于 MapReduce 计算模型编写分布式并行程序非常简单,程序员的主要编码工作就是实现 Map 和 Reduce 函数,其它的并行编程中的种种复杂问题,如分布式存储,工作调度,负载平衡,容错处理,网络通信等,均由 MapReduce 框架(比如 Hadoop )负责处理,程序员完全不用操心。 1.2 HDFS 模型HDFS是分布式计算的存储基石,Hadoop的分布式文件系统和其他分布式文件系统有很多类似的特质。分布式文件系统基本的几个特点:1.对于整个集群有单一的命名空间。2.数据一致性。适合一次写入多次读取的模型,客户端在文件没有被成功创建之前是无法看到文件存在。3.文件会被分割成多个文件块,每个文件块被分配存储到数据节点上,而且根据配置会有复制文件块来保证数据的安全性。如下图1-2所示。图1-2 HDFS结构图上图中展现了整个HDFS三个重要角色:NameNode,DataNode,Client。NameNode可以看作是分布式文件系统中的管理者,主要负责管理文件系统的命名空间,集群配置信息,存储块的复制。Name Node会存储文件系统的Meta-data在内存中,这些信息主要包括了文件信息,每一个文件对应的文件块的信息,每一个文件块在Data Node的信息。DataNode是文件存储的基本单元。它存储Block在本地文件系统中,保存了Block的Meta-data,同时周期性的发送所有存在的block的报告给NameNode。Client就是需要获取分布式文件系统文件的应用程序。2 Hadoop 环境配置对于Hadoop的集群来说,可以分成两大类角色,Master和Slave,前者主要配置NameNode和JobTracker的角色,负责总管分布式数据和分解任务的执行,后者配置DataNode和TaskTracker的角色,负责分布式数据存储以及任务的执行。Hadoop 支持 Linux 及 Windows 操作系统, 但其官方网站声明 Hadoop 的分布式操作在 Windows 上未做严格测试,建议只把 Windows 作为 Hadoop 的开发平台。在 Windows 环境上的安装步骤如下( Linux 平台类似,且更简单一些): 1在 Windows 下,需要先安装 Cgywin, 安装 Cgywin 时注意一定要选择安装 openssh (在 Net category )。安装完成之后,把 Cgywin 的安装目录如 c:cygwinbin 加到系统环境变量 PATH 中,这是因为运行 Hadoop 要执行一些 Linux 环境下的脚本和命令。 2安装 Java 1.5.x,并将 JAVA_HOME 环境变量设置为 Java 的安装根目录如 C:Program FilesJavajdk1.5.0_01。 3到 Hadoop 官方网站 下载Hadoop Core, 最新的稳定版本是 0.16.0. 将下载后的安装包解压到一个目录,本文假定解压到 c:hadoop-0.16.0。 4修改 conf/hadoop-env.sh 文件,在其中设置 JAVA_HOME 环境变量: export JAVA_HOME=C:Program FilesJavajdk1.5.0_01” 。3 集群上的并行计算 MapReduce 计算模型非常适合在大量计算机组成的大规模集群上并行运行。每一个 Map 任务和每一个 Reduce 任务均可以同时运行于一个单独的计算结点上,可想而知其运算效率是很高的,那么这样的并行计算是如何做到的呢? 3.1数据分布存储Hadoop 中的分布式文件系统 HDFS 由一个管理结点 ( NameNode )和N个数据结点 ( DataNode )组成,每个结点均是一台普通的计算机。在使用上同我们熟悉的单机上的文件系统非常类似,一样可以建目录,创建,复制,删除文件,查看文件内容等。但其底层实现上是把文件切割成 Block,然后这些 Block 分散地存储于不同的 DataNode 上,每个 Block 还可以复制数份存储于不同的 DataNode 上,达到容错容灾之目的。NameNode 则是整个 HDFS 的核心,它通过维护一些数据结构,记录了每一个文件被切割成了多少个 Block,这些 Block 可以从哪些 DataNode 中获得,各个 DataNode 的状态等重要信息。 3.2 分布式并行计算Hadoop 中有一个作为主控的 JobTracker,用于调度和管理其它的 TaskTracker, JobTracker 可以运行于集群中任一台计算机上。TaskTracker 负责执行任务,必须运行于 DataNode 上,即 DataNode 既是数据存储结点,也是计算结点。 JobTracker 将 Map 任务和 Reduce 任务分发给空闲的 TaskTracker, 让这些任务并行运行,并负责监控任务的运行情况。如果某一个 TaskTracker 出故障了,JobTracker 会将其负责的任务转交给另一个空闲的 TaskTracker 重新运行。 3.3 本地计算数据存储在哪一台计算机上,就由这台计算机进行这部分数据的计算,这样可以减少数据在网络上的传输,降低对网络带宽的需求。在 Hadoop 这样的基于集群的分布式并行系统中,计算结点可以很方便地扩充,而因它所能够提供的计算能力近乎是无限的,但是由是数据需要在不同的计算机之间流动,故网络带宽变成了瓶颈,是非常宝贵的,“本地计算”是最有效的一种节约网络带宽的手段,业界把这形容为“移动计算比移动数据更经济”,如图3-1所示。 图 3-1. 分布存储与并行计算4 代码范例:业务场景描述:可设定输入和输出路径(操作系统的路径非HDFS路径),根据访问日志分析某一个应用访问某一个API的总次数和总流量,统计后分别输出到两个文件中。仅仅为了测试,因此没有去细分很多类,将所有的类都归并于一个类便于说明问题。如图4-1所示的测试代码类图。图4-1测试代码类图 LogAnalysiser就是主类,主要负责创建,提交任务,并且输出部分信息。内部的几个子类用途可以参看流程中提到的角色职责。具体的看看几个类和方法的代码片断:LogAnalysiser:MapClasspublicstaticclassMapClassextendsMapReduceBaseimplementsMapperpublicvoidmap(LongWritable key, Text value, OutputCollector output, Reporter reporter)throwsIOExceptionString line = value.toString();/没有配置RecordReader,所以默认采用line的实现,key就是行号,value就是行内容if(line =null| line.equals()return;String words = line.split(,);if(words =null| words.length 8)return;String appid = words1;String apiName = words2;LongWritable recbytes =newLongWritable(Long.parseLong(words7);Text record =newText();record.set(newStringBuffer(flow:).append(appid).append(:).append(apiName).toString();gress();output.collect(record, recbytes);/输出流量的统计结果,通过flow:作为前缀来标示。record.clear();record.set(newStringBuffer(count:).append(appid).append(:).append(apiName).toString();output.collect(record,newLongWritable(1);/输出次数的统计结果,通过count:作为前缀来标示LogAnalysiser: PartitionerClasspublicstaticclassPartitionerClassimplementsPartitionerpublicintgetPartition(Text key, LongWritable value,intnumPartitions)if(numPartitions = 2)/Reduce个数,判断流量还是次数的统计分配到不同的Reduceif(key.toString().startsWith(flow:)return0;elsereturn1;elsereturn0;publicvoidconfigure(JobConf job)LogAnalysiser: CombinerClass参看ReduceClass,通常两者可以使用一个,不过这里有些不同的处理就分成了两个。在ReduceClass中蓝色的行表示在CombinerClass中不存在。LogAnalysiser: ReduceClasspublicstaticclassReduceClassextendsMapReduceBaseimplementsReducerpublicvoidreduce(Text key, Iterator values,OutputCollector output, Reporter reporter)throwsIOExceptionText newkey =newText();newkey.set(key.toString().substring(key.toString().indexOf(:)+2);LongWritable result =newLongWritable();longtmp = 0;intcounter = 0;while(values.hasNext()/累加同一个key的统计结果tmp = tmp + values.next().get();counter = counter +1;/担心处理太久,JobTracker长时间没有收到报告会认为TaskTracker已经失效,因此定时报告一下if(counter = 1000)counter = 0;gress();result.set(tmp);output.collect(newkey, result);/输出最后的汇总结果LogAnalysiserpublicstaticvoidmain(String args)tryrun(args);catch(Exception e)e.printStackTrace();publicstaticvoidrun(String args)throwsExceptionif(args =null| args.length= 0)shortin = shortin.substring(shortin.lastIndexOf(File.separator);if(shortout.indexOf(File.separator) = 0)shortout = shortout.substring(shortout.lastIndexOf(File.separator);SimpleDateFormat formater =newSimpleDateFormat(yyyy.MM.dd);shortout =newStringBuffer(shortout).append(-).append(formater.format(newDate().toString();if(!shortin.startsWith(/)shortin =/+ shortin;if(!shortout.startsWith(/)shortout =/+ shortout;shortin =/user/root+ shortin;shortout =/user/root+ shortout;File inputdir =newFile(inputpath);File outputdir =newFile(outputpath);if(!inputdir.exists() | !inputdir.isDirectory() System.out.println(inputpath not exist or isnt dir!);return;if(!outputdir.exists()newFile(outputpath).mkdirs();JobConf conf =newJobConf(newConfiguration(),LogAnalysiser.class);/构建ConfigFileSystem fileSys = FileSystem.get(conf);fileSys.copyFromLocalFile(newPath(inputpath),newPath(shortin);/将本地文件系统的文件拷贝到HDFS中conf.setJobName(analysisjob);conf.setOutputKeyClass(Text.class);/输出的key类型,在OutputFormat会检查conf.setOutputValueClass(LongWritable.class);/输出的value类型,在OutputFormat会检查conf.setMapperClass(MapClass.class);conf.setCombinerClass(CombinerClass.class);conf.setReducerClass(ReduceClass.class);conf.setPartitionerClass(PartitionerClass.class);conf.set(mapred.reduce.tasks,2);/强制需要有两个Reduce来分别处理流量和次数的统计FileInputFormat.setInputPaths(conf, shortin);/hdfs中的输入路径FileOutputFormat.setOutputPath(conf,newPath(shortout);/hdfs中输出路径Date startTime =newDate();System.out.println(Job started: + startTime);JobClient.runJob(conf);Date end_time =newDate();System.out.println(Job ended: + end_time);System.out.println(The job took + (end_time.getTime() - startTime.getTime() /1000 + seconds.);/删除输入和输出的临时文件fileSys.copyToLocalFile(newPath(shortout),newPath(outputpath);fileSys.delete(newPath(shortin),true);fileSys.delete(newPath(shortout),true);以上的代码就完成了所有的逻辑性代码,然后还需要一个注册驱动类来注册业务Class为一个可标示的命令,让hadoop jar可以执行。publicclassExampleDriver publicstaticvoidmain(String argv)ProgramDriver pgd =newProgramDriver();trypgd.addClass(analysislog, LogAnalysiser.class,A map/reduce program that analysis log .);pgd.driver(argv);catch(Throwable e)e.printStackTrace();将代码打成jar,并且设置jar的mainClass为ExampleDriver这个类。在分布式环境启动以后执行如下语句:hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out在/home/wenchu/test-in中是需要分析的日志文件,执行后就会看见整个执行过程,包括了Map,Reduce的进度。执行完毕会在/home/wenchu/test-out下看到输出的内容。有两个文件:part-00000和part-00001分别记录了统计后的结果。如果需要看执行的具体情况,可以看在输出目录下的_logs/history/xxxx_analysisjob,里面罗列了所有的Map,Reduce的创建情况以及执行情况。在运行期也可以通过浏览器来查看Map,Reduce的情况:http:/MasterIP:50030/jobtracker.jsp5 Hadoop集群测试首先这里使用上面的范例作为测试,也没有做太多的优化配置,这个测试结果只是为了看看集群的效果,以及一些参数配置的影响。文件复制数为1,blocksize5MSlave数处理记录数(万条)执行时间(秒)295382950337495244950178695216950114Blocksize5MSlave数处理记录数(万条)执行时间(秒)2(文件复制数为1)950

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论