版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
Hadoop的RPC设计分析之前鼓捣Hbase的时候,觉得单机和伪分布式模式太low了,就在笔记本上用三个虚拟机搭建了一个“完全分布式”的Hbase环境(心疼破本子一秒钟)。刚好趁这个元旦假期,我就研究了一下\o""hadoop。Hadoop也算是个巨无霸了,涉及了很多方面的功能。个人工作中有多个RPCclient管理以及交互的场景,一直觉得设计的不太好。所以心里一直想研究一下优秀项目的多路RPC是如何实现的,然后计划一直搁置到现在。难得小假期,就拿手上的Hadoop开刀吧!1.宏观背景Hadoop的RPC确实挺复杂的,就单单以HDFS为例,client与NameNode,client与DataNode,NameNode与DataNode以及DataNode与其他DataNode。如果要提到Hadoopmap/reduce,那么事情就更不简单了。虽然Hadoop的RPC如此复杂,但是这些RPC都是基于同一个RPC框架,这个RPC框架是Hadoop自己实现的。不同的RPC只需要在这个RPC框架上实现自己的通信协议即可。这篇文章里,我打算主要分析这个底层的RPC框架是如何实现的。2.Client实现RPC的client端实现在org.apache.hadoop.ipc这个包里面。至于这个包为啥叫ipc,我也不太明白,这个ipc也该不是interprocesscommunication的缩写。前文已经说过了,一个Client(不仅仅是Hadoop的客户端,也可能是DataNode等等)会存在多个客户端连接。这个情况下,Hadoop的Client的内部会持有多个连接。Client有Connection、ConnectionId这样的一些内部类。其中ConnectionId包含IntetSocketAddress和一些配置信息;而Connection则就是一个Thread的子类,负责接收和发送消息。privateConcurrentMap<ConnectionId,Connection>connections=newConcurrentHashMap<>();这个connections就是Client的成员变量,代表着Client所建立的所有连接。此外Client还有一个叫Call内部类。Call代表一次RCP调用,虽然Hadoop的RPC是直接基于TCP的,但上层使用起来和REST之类的RPC还是非常相似的。Call的代码片段如下:/***ClassthatrepresentsanRPCcall*/staticclassCall{finalintid;//callidfinalintretry;//retrycountfinalWritablerpcRequest;//theserializedrpcrequestWritablerpcResponse;//nullifrpchaserrorIOExceptionerror;//exception,nullifsuccessfinalRPC.RpcKindrpcKind;//RpcEngineKindbooleandone;//truewhencallisdoneprivatefinalObjectexternalHandler;privateCall(RPC.RpcKindrpcKind,Writableparam){this.rpcKind=rpcKind;this.rpcRequest=param;finalIntegerid=callId.get();if(id==null){this.id=nextCallId();}else{callId.set(null);this.id=id;}finalIntegerrc=retryCount.get();if(rc==null){this.retry=0;}else{this.retry=rc;}this.externalHandler=EXTERNAL_CALL_HANDLER.get();}}id表示这次RPC的调用的编号,因为这里的TCPRPC是全双工的,所以需要一个序列标识。为了保证Call的id在单个连接中唯一,Client定义了几个AtomicInteger变量。每个RPCCall都会把这个id带上,call的response里面也会带上这个id,这样客户端可以分发消息了。一个Client主要的数据结构如下图所示:其实这个逻辑结构显得挺简单的,主要的工作还是在Connection类中完成的。Connection作为一个Thread的子类,它的run()方法其实就是不断的read,然后根据Response中的Callid分发返回消息。在具体实现中,Connection的run方法就是在while循环中不断receiveRpcResponse()。privatevoidreceiveRpcResponse(){if(shouldCloseConnection.get()){return;}touch();try{ByteBufferbb=ipcStreams.readResponse();RpcWritable.Bufferpacket=RpcWritable.Buffer.wrap(bb);RpcResponseHeaderProtoheader=packet.getValue(RpcResponseHeaderProto.getDefaultInstance());checkResponse(header);intcallId=header.getCallId();if(LOG.isDebugEnabled())LOG.debug(getName()+"gotvalue#"+callId);RpcStatusProtostatus=header.getStatus();if(status==RpcStatusProto.SUCCESS){Writablevalue=packet.newInstance(valueClass,conf);finalCallcall=calls.remove(callId);call.setRpcResponse(value);}//verifythatpacketlengthwascorrectif(packet.remaining()>0){thrownewRpcClientException("RPCresponselengthmismatch");}if(status!=RpcStatusProto.SUCCESS){//RpcRequestfailedfinalStringexceptionClassName=header.hasExceptionClassName()?header.getExceptionClassName():"ServerDidNotSetExceptionClassName";finalStringerrorMsg=header.hasErrorMsg()?header.getErrorMsg():"ServerDidNotSetErrorMsg";finalRpcErrorCodeProtoerCode=(header.hasErrorDetail()?header.getErrorDetail():null);if(erCode==null){LOG.warn("Detailederrorcodenotsetbyserveronrpcerror");}RemoteExceptionre=newRemoteException(exceptionClassName,errorMsg,erCode);if(status==RpcStatusProto.ERROR){finalCallcall=calls.remove(callId);call.setException(re);}elseif(status==RpcStatusProto.FATAL){//ClosetheconnectionmarkClosed(re);}}}catch(IOExceptione){markClosed(e);}}看了接收逻辑,那么发送RPCcall的逻辑也必不可少。有一点值得注意的是,发送RPCcall都不是connection线程,所以这里需要一些线程同步方法。一般来说,会使用消息队列的方式来缓存call,然后一个发送线程不断发送call。不过Hadoop不是这样做的,它使用的是一个线程池,然后传输给线程池的是一个包装发送Call的Runnable。为什么采用这种完全taskbase的方法,我也没太明白。不过话说回来,也没有明显的缺点,反而是把消息队列的工作扔给线程池了,减少了一定工作量。这里简单的贴一点代码:publicvoidsendRpcRequest(finalCallcall)throwsInterruptedException,IOException{if(shouldCloseConnection.get()){return;}//Serializethecalltobesent.Thisisdonefromtheactual//callerthread,ratherthanthesendParamsExecutorthread,RpcRequestHeaderProtoheader=ProtoUtil.makeRpcRequestHeader(call.rpcKind,OperationProto.RPC_FINAL_PACKET,call.id,call.retry,clientId);finalResponseBufferbuf=newResponseBuffer();header.writeDelimitedTo(buf);RpcWritable.wrap(call.rpcRequest).writeTo(buf);synchronized(sendRpcRequestLock){Future<?>senderFuture=sendParamsExecutor.submit(newRunnable(){@Overridepublicvoidrun(){try{synchronized(ipcStreams.out){if(shouldCloseConnection.get()){return;}if(LOG.isDebugEnabled()){LOG.debug(getName()+"sending#"+call.id);}//RpcRequestHeader+RpcRequestipcStreams.sendRequest(buf.toByteArray());ipcStreams.flush();}}catch(IOExceptione){//exceptionatthispointwouldleavetheconnectioninan//unrecoverablestate(eghalfacallleftonthewire).//So,closetheconnection,killinganyoutstandingcallsmarkClosed(e);}finally{//thebufferisjustanin-memorybuffer,butitisstillpoliteto//closeearlyIOUtils.closeStream(buf);}}});}}3.Server实现前面大致分析了一遍Client,然后这里就轮到了Server的实现了。Server和Client在一个包,不过这个Server是个抽象类。Server唯一的一个抽象方法就是call方法,这个方法就是处理具体请求的。不同功能的Server会有不同的业务逻辑,所以它们需要实现这个函数。通过之前的Client分析,Server的实现也应该能猜出一二了。Server类的逻辑结构图如下:Server类的Connection、Call与Client的非常相似,所以这里就不再赘述。相对Client来说,Server的线程模型更复杂一些。Server类有很多内部类,Listener,Responder,Handler这几个内部类
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2026-2030全球与中国环保水瓶行业深度调查与投资营销策略建议报告
- 2026-2030招聘系统产业深度调研及发展趋势与投资战略研究报告
- 小儿腹泻的睡眠护理
- 急救护理技术与操作
- 护理课件分享
- 2025年智能家居阳台功能集成方案
- 某汽车厂涂装车间管理
- 某纺织厂加班审批细则
- 宠物眼睛清洁
- 某塑料厂产品追溯准则
- 2026四川广安爱众股份限公司招聘5人(第四批次)易考易错模拟试题(共500题)试卷后附参考答案
- 2026广东肇庆市端州区教育局招聘中小学教师75人笔试备考题库及答案详解
- 2026年幼儿园学前班课件完整版
- 2026湖南长沙市雨花区东塘街道办事处公开招聘工作人员3人笔试备考试题及答案详解
- 2026年全国护士执业资格考试试题及答案
- GB/T 45355-2025无压埋地排污、排水用聚乙烯(PE)管道系统
- 部门年度工作目标分解与工作计划模板
- 个体餐饮员工的规章制度
- 万象天地详情
- GB/T 7704-2017无损检测X射线应力测定方法
- 采油气井口及阀门知识
评论
0/150
提交评论