hadoop[1].源码阅读总结.doc_第1页
hadoop[1].源码阅读总结.doc_第2页
hadoop[1].源码阅读总结.doc_第3页
hadoop[1].源码阅读总结.doc_第4页
hadoop[1].源码阅读总结.doc_第5页
已阅读5页,还剩6页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

commonipc/rpc server大概流程基于nio,listener关注op_accept事件,当有客户端连接过来,accept后,从readers中选取一个reader将客户端channel注册到reader中的nio selector,并新建一个connection对象关联客户端channel,reader关注op_read事件.客户端建立连接后,首先发送的是connnectionheader包含协议名,用户组信息,验证方法,connection会根据以上信息进行校验.之后将是先读取4位的长度代表这次请求的数据的长度,然后一直等待事件触发读取够长度,将读取的数据 解码为调用id和param,新建一个call对象(关联connection)放入call队列中,handlers中的handler会将call中callquene中取走,call中的param实际为invocation对象,包含调用方法名,参数名,参数类型,由这些信息使用java反射api调用server的instance对象,获取返回值,组织返回数据,写入call的response属性中,马上调用responder的dorespond方法,将call加入到connection的responsequene中,如果responsequene长度等于1做一次nio写操作,如果不能一次性能够将数据写完,将客户端channel注册到responder关注写事件op_write,下一次继续写,如果长度不为1证明该channel已经注册到了responder了直接加入队列,由responder线程后续处理.note:客户端关闭流后出发一次读操作,返回为-1,server关闭连接curcall与获取客户端iphandler获取一个call后,会将server的curcall(threadlocal类型)设置为当前的call,调用instance方法实际是在handler线程中,在instance的方法内就可以使用server提供的方法来获取客户端ipping在上述过程中如果读取的4位长度为-1代表是客户端ping操作清理空闲连接如果一定时间ipc.client.connection.maxidletime没有读取到数据并且当前连接现有call数目为0,则视为空闲连接,listener会在每次接受完新连接之后进行一次清理,最多清理ipc.client.kill.max个连接,如果出现outofmemoryerror则强制清理全部空闲连接datanode,tasktractor设置的心跳时间需小于空闲时间.清理长时间未发送到客户端的响应注册到responder的call如果长时间没有发送到客户端,每隔一段时间会清理掉涉及参数ipc.server.handler.queue.sizecallquene队列大小,随集群增大而增大,ipc.server.max.response.size如果返回的结果序列化后大小大于这个值,重置缓冲区bytearrayoutputstream释放内存.ipc.server.read.threadpool.sizereaders个数ipc.server.tcpnodelaytcp优化参数nagles algorithmhandlercounthandler个数,由构造参数指定,在dn,tt中配置socketsendbuffersizesocket设置ipc.server.listen.queue.sizesocket设置socket.bind(address, backlog)ipc.client.idlethreshold总连接数超过多少后,开始清理空闲连接ipc.client.kill.max一次最多清理多少个空闲连接ipc/rpc clientclient代理模式,调用rpc.getproxy实际上返回的一个代理对象,当调用方法的时候实际调用的是invoker, invoker将协议,调用的方法名,参数,参数类型封装成invocation对象经过client发送到server,并读取返回流,根据流中的id,判断是服务器返回的是那次调用的结果.connection线程负责读取server返回值,在读取的过程中,调用client的线程会wait直到connection获取到返回值.读取时候如果超时(erval)就发送一次ping,如果没有出现ioexception就继续读取,conneciton可以根据标识(地址,用户组信息,协议)共用.ipc/rpc authinghdfsname协议clientprotocol:客户端调用协议,涉及文件操作,dfs管理,升级(dfsadmin)datanodeprotocol:dn与nn通讯协议,注册,blockreport,心跳,升级namenodeprotocol: sn和nn通讯协议,通知nn使用新的fsimage和editrefreshauthorizationpolicyprotocol, refreshusermappingsprotocol fsnamesystem数据结构lightweightgsetgset 类似set但提供get方法,主要用于存储blockinfo,根据block找到blockinfo其中一个实现lightweightgset,利用数组存储元素,用链表解决冲突问题,类似hashmap但是没有rehash操作blocksmap初始化lightweightgset时候,会根据jvm内存将数组的大小初始为最大能占用的内存数(4194304 -xmx1024m)加上高效的hash映射算法, lightweightgset在blockinfo数量比较小的时候get性能逼近数组.blockinfo继承block,没有重写hashcode和equals方法,在block中equals方法只要求传入的对象是block实例并且blockid相等,就认为两个对象相等,故存储blockinfo时候分配的在数组中的index和get时候由block的hashcode定位是一致的.blockmapsblockmaps负责管理block以及block相关的元数据block 有3个long型的属性blockid(随机生成)numbytes(块大小),generationstampblockinfo继承block添加了2个属性,实现了用户lightweightgset的linkedelement接口inode:引用文件inodetriplets:3xreplication的数组,即replication 个组,每组有3个元素,第一个指向datanodedescriptor,代表在这个dn上有一个block,第二个和第三个分别代表dn上的上一个blockinfo和下一个blockinfodatanodedescriptor有一个属性blocklist指向一个blockinfo,因为每个blockinfo中的triplets中有一组记录着对应的dn上的上一个,下一个blockinfo,所以从这个角度来看blockinfo是一个双向链表.新建文件打开输入流后,写入,会在namenode中分配blockinfo,当block写入到分配的dn后,dn在发送心跳时候会将新接受到的块报告给nn,此时nn在将triplets可用的组关联到dn(dd).(例子前提假设:新建的集群没有文件,操作是在dn1上,此时很大可能性每次分配块的时候都会首选本地dn1,bkl_* *实际为随机数)namenode中分配blockinfo 并加入gset中,blk_1,0-64m,此时dn1的blocklist为nulldn1向nn报告接收了新的block blk_1 ,nn从blocksmap中根据block blk_1找到blockinfo blockinfo1 将triplets的可用组(=null)的第一位关联到dn1(datanodedescriptor1),将dn1的blocklist指向blockinfo1此时blocklist指向的是blockinfo1nn分配blockinfo2,dn1向nn报告接受到了信的block blk_2,nn找到blockinfo2后1,将triplets的可用组(=null)的第一位关联到dn1(datanodedescriptor1)2,将第三位指向blocklist即blockinfo1,2,将blockinfo1的对应dn1的组的第二位指向blockinfo24,将dn1的blocklist指向blockinfo1升级loadbalance磁盘占用,还是分布策略 可能出现一个dn上两个相同的block么.mapreduce命令行运行bin/hadoop jarjarfilemainclassargs.设置jvm启动参数,将lib,conf等加入classpath,启动jvm运行runjarrunjar阶段:1,设定mainclass如果jar设置了manifest,则作为mainclass否则取第二个参数2,在临时目录(hadoop.tmp.dir)中建立临时目录(file.createtempfile(hadoop-unjar, , tmpdir),并注册钩子jvm退出时候删除.3,将jar解压到建立的临时目录中4,将目录hadoop-unjar38923742,目录hadoop-unjar38923742/class, 目录hadoop-unjar38923742/lib中的每个文件作为urlclassloader参数,构造一个classloader.5,将当前线程的上下文classloader设置为classloader6,以上5步都是为mainclass启动做准备,最后应用反射启动mainclass,将args作为参数mainclassbin/hadoop jar -libjars testlib.jar -archives test.tgz -files file.txt inputjar argsjob.setxxx均将kv设置到了conf实例中(传值,例如将-file指定的文件设定到conf中,在submit中获取,从本地复制到hdfs)在job.submit方法中会向hdfs写入以下信息目录:hdfs:/$mapreduce.jobtracker.staging.root.dir/$user/.staging/$jobid/hdfs:/tmp/hadoop/mapred/staging/$user/.staging/$jobid/目录下文件:job.split-(split信息)由writesplits方法写入job.splitmetainfo-( split信息元数据,版本,个数,索引)由writesplits方法写入job.xml-conf对象job.jar-inputjarfiles/-参数-files 逗号分割,交给(distributedcache管理)archives/-参数-archives 逗号分割, 交给(distributedcache管理)libjars/-参数-libjars 逗号分割, 交给(distributedcache管理)split,splitmetainfo(filesplit)设计目的job.splitmetainfo中保存有split的在那几个机器上有副本,jt读取这个文件用,用来分配task使task能够读取本地磁盘文件.job.split保存具体的split,不保存位置信息,因为tt不需要(hdfs决定)jt调度capacitytaskscheduler,tttt启动时候,启动线程maplauncher(用于启动maptask),reducelauncher(用于启动reducetask), taskcleanupthread(用于清理task或者job),tt 通过心跳从jt获得heartbeatresponse,包含tasktrackeraction,具体有5种操作launch_task启动任务,将launchtaskaction中包装的task与conf对象和tasklauncher组合成taskinprogres,然后添加到maplauncher或者reducelauncher中的队列中.tasklauncher构造参数numslots代表当前tasktractor能同时执行多少个task,由参数mapred.tasktracker.map.tasks.maximum, mapred.tasktracker.reduce.tasks.maximum设定,slot意思为: 槽,位置 将tasktractor的资源抽象化,一般情况下一个task占用一个slot,如果有对资源需求大的task也可以通过参数来控制(调度器capacitytaskscheduler设置,未开放给user?)tasklauncher根据剩余空闲的槽位(numfreeslots)和队列情况,来从队列中取出task来运行(synchronized, wait, notify).kill_task杀死任务kill_job杀死和job相关的任务,放入taskstocleanup队列中reinit_tracker重新启动ttcommit_task提交任务(1, speculative execution 2,need commit file?) outputcommitterreinit_tracker 重启tt, startnewtask 新的jvm(不是tt的jvm,错误处理,gc)执行child.class,通过main参数argsmap过程maptask中会根据jobconf记录的hdfs上的job.split文件以及jt分配的splitindex获取inputsplit,根据jobconf的配置新建map和inputformat,由inputformat获取recordreader来读取inputsplit,生成原始original_key, original_value交给mapper.map方法处理生成gen_key,gen_value,根据partitioner生成partition,成对的(gen_key,gen_value, partition)会先放入一个缓冲区,如图,这个缓冲区分为3级索引(排序kvoffset,复制效率)等这个缓冲区到达一定阀值之后,并不是缓冲区慢之后,splitthread会标记当前前后界,对界内数据进行排序(现根据partition在根据kv),并写入到磁盘文件中(split.x.out)并记录各个partition段的位置,部分存到内存部分存到磁盘,在这个过程中,map仍然继续进行,如果缓冲区满之后,map线程暂时wait,到splitthread完毕.当输入读取完毕,随之的splitthread也结束后,磁盘中中间文件为split.1.out - split.n.out ,索引部分存在内存里面,超过1024*1024个,作为索引文件spill.n.out.index(避免内存不够用).然后通过合并排序将分段的文件(split.x.n)合并排序成一个文件file.out,file.out.index记录partition信息.(详细见mergequeue)这样在reduce过程中,通过http请求tt其中需要的partition段(参数reduce),tt根据file.out.index记录的索引信息将file.out的partition段,生成http响应.如果有combinersortandspillkvoffset达到临界点softrecordlimit,例如100个,设定80个为临界点.kvbuffer达到临界点softbufferlimit,例如100m,当80m为临界点.目的是为了不让map过程停止浪费时间,但由于io map可能会慢一点(进一步多磁盘负载).io.sort.mb配置的是图中kvoffset,kvindices,kvbuffer占用的空间总大小mb.上述参数都可以通过conf.setxxx来配置,根据特定job的特点来设定.来减少spill次数,同时避免内存溢出.reduce过程jobinprogress初始化mapred.reduce.tasks个reducetask 用参数partition区别.然后jt在心跳过程中,将reducetask分给tt执行.reducetask有shuffle, sort, reduce三个阶段shuffle这一阶段是reducetask初始化阶段,新建了n(参数控制)个下载线程,来获取map的输出,tasktracker中有一个线程会不断的从jt中获取在本tt运行的reudcetask(s)的job的map完成事件. reducetask不断从tt中获取job的map完成事件,然后将事件中的map输出位置交给下载线程来获取.下载的时候,从http响应头获取文件的大小,决定是放在内存中还是写入磁盘.在内存中的数据,满足一定条件会在后台将内存中的数据merger写入硬盘,在硬盘中的数据,满足一定条件(数目超过了2 * iosortfactor - 1)会在后台做merger.所有的map输出下载完毕,并且后台merger线程也结束后,进入sort阶段.sort这个阶段还是merger,将内存和硬盘中的数据,做合并排序(iosortfactor),使能够高效率的输出key ,values.然后进入redeuce阶段.reduce这一阶段只要是将上述产生的key value通过reducecontext转化成key values(valueitera

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论