hadoop rpc服务端初始化和调用过程详解.docx_第1页
hadoop rpc服务端初始化和调用过程详解.docx_第2页
hadoop rpc服务端初始化和调用过程详解.docx_第3页
hadoop rpc服务端初始化和调用过程详解.docx_第4页
hadoop rpc服务端初始化和调用过程详解.docx_第5页
已阅读5页,还剩4页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

本文主要描述了hadoop rpc服务端的初始化和调用过程,相比客户端的初始化,rpc服务端感觉会简单点,但是调用过程却比客户端复杂一些。本文还是以namenode为例,namenode会在执行main方法的时候,创建一个namenode实例,及完成一系列的初始化过程,其中就包括了rpc的初始化过程。rpc服务端的初始化上面已经提到我们这里主要借用了namenode的远程服务,先来看看相关代码:?12345678910111213141516171819public class NameNode implements NameNodeStatusMXBean public static void main(String argv) throws Exception NameNode namenode = createNameNode(argv, null); protected NameNode(Configuration conf, NamenodeRole role)throws IOException initialize(conf);protected void initialize(Configuration conf) throws IOException rpcServer = createRpcServer(conf);startCommonServices(conf); /相当重要protected NameNodeRpcServer createRpcServer(Configuration conf)throws IOException return new NameNodeRpcServer(conf, this);我们的linux的终端执行hadoop的启动命令的时候,最终的命令是调用NameNode的main方法,所以我们追踪代码的切入点是NameNode的main方法,方法比较简单,就是调用NameNode的构造函数创建一个NameNode,然后执行初始化方法initialize,这个方法相对来说,是我们关注的重点,包括rpc服务在内的初始化操作都放在这个方法里面。特定于rpc,他执行了两个相关的方法createRpcServer和startCommonServices,第一个方法见名思意,不多说,先简单介绍下后面的方法,该方法的作用就是启动namenode的rpc服务,稍后我给出代码。好的,从上面的代码可以看到,我们的rpcServer功能都放在了类NameNodeRpcServer里面,现在让我们来看看这个类里面相关的代码:?123456789101112131415161718192021222324252627282930313233343536class NameNodeRpcServer implements NamenodeProtocols public NameNodeRpcServer(Configuration conf, NameNode nn)throws IOException RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,ProtobufRpcEngine.class);ClientNamenodeProtocolServerSideTranslatorPB clientProtocolServerTranslator = new ClientNamenodeProtocolServerSideTranslatorPB(this);BlockingService clientNNPbService = ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf); / fs.defaultFSString bindHost = nn.getRpcServerBindHost(conf);if (bindHost = null) bindHost = rpcAddr.getHostName();LOG.info(RPC server is binding to + bindHost + : + rpcAddr.getPort();this.clientRpcServer = new RPC.Builder(conf).setProtocol(tocolPB.ClientNamenodeProtocolPB.class).setInstance(clientNNPbService).setBindAddress(bindHost).setPort(rpcAddr.getPort().setNumHandlers(handlerCount).setVerbose(false).setSecretManager(namesystem.getDelegationTokenSecretManager().build();/ Add all the RPC protocols that the namenode implementsDFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,clientRpcServer);DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,clientRpcServer);DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,clientRpcServer);在NameNodeRpcServer的构造函数里面最重要的一件事情是实例化clientRpcServer,这里面我最想说明的是,NameNode宣称自己实现了三个协议:ClientProtocol、DatanodeProtocol和NamenodeProtocol,在服务端的实现基本上就靠ClientNamenodeProtocolServerSideTranslatorPB之类的类型了,特别在实例化ClientNamenodeProtocolServerSideTranslatorPB的时候有传入一个形参,这个形参就是NameNodeRpcServer实例,看代码:?12345678910111213141516171819202122public ClientNamenodeProtocolServerSideTranslatorPB(ClientProtocol server)throws IOException this.server = server;Overridepublic GetBlockLocationsResponseProto getBlockLocations(RpcController controller, GetBlockLocationsRequestProto req)throws ServiceException try LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(),req.getLength();Builder builder = GetBlockLocationsResponseProto.newBuilder();if (b != null) builder.setLocations(PBHelper.convert(b).build();return builder.build(); catch (IOException e) throw new ServiceException(e);上面代码中的getBlockLocations也一定程度上说明了刚才的观点。现在让我们回过头看看NameNode中initialize方法中执行的startCommonServices方法,这个方法用来启动clientRpcServer下面的线程,包括listener,handler、response,具体看代码:?123456789101112131415161718192021222324252627public class NameNode implements NameNodeStatusMXBean private void startCommonServices(Configuration conf) throws IOException rpcServer.start();class NameNodeRpcServer implements NamenodeProtocols void start() clientRpcServer.start();if (serviceRpcServer != null) serviceRpcServer.start(); public abstract class Server public synchronized void start() responder.start();listener.start();handlers = new HandlerhandlerCount;for (int i = 0; i handlerCount; i+) handlersi = new Handler(i);handlersi.start();代码看到这里,启动过程中rpc相关的代码就结束了。rpc服务端的调用过程现在让我们来看看rpc被调用的过程,先来认识下Server的关键结构:?1234567891011121314151617public abstract class Server private Listener listener = null;private Responder responder = null;private Handler handlers = null;private class Responder extends Thread private class Listener extends Thread private class Handler extends Thread 在初始化的时候,就启动listener、responder和handlers下面的所有线程。其中listener线程里面启动了一个socker服务,专门用来接受客户端的请求,handler下面的线程用来处理具体的请求,responder写请求结果,具体过程可以看下下面的代码:?123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110public abstract class Server private Listener listener = null;private Responder responder = null;private Handler handlers = null;private class Listener extends Thread public Listener() throws IOException address = new InetSocketAddress(bindAddress, port);/ Create a new server socket and set to non blocking modeacceptChannel = ServerSocketChannel.open();acceptChannel.configureBlocking(false);/ Bind the server socket to the local host and portbind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);port = acceptChannel.socket().getLocalPort(); /Could be an ephemeral port/ create a selector;selector= Selector.open();readers = new ReaderreadThreads;for (int i = 0; i readThreads; i+) Reader reader = new Reader(Socket Reader # + (i + 1) + for port + port);readersi = reader;reader.start();/ Register accepts on the server socket with the selector.acceptChannel.register(selector, SelectionKey.OP_ACCEPT);this.setName(IPC Server listener on + port);this.setDaemon(true);public void run() while (running) doAccept(key);void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError Reader reader = getReader();Connection c = connectionManager.register(channel);key.attach(c); / so closeCurrentConnection can get the objectreader.addConnection(c);private class Reader extends Thread public void run() doRunLoop();private synchronized void doRunLoop() while (running) Connection conn = pendingConnections.take();conn.channel.register(readSelector, SelectionKey.OP_READ, conn);readSelector.select();doRead(key);void doRead(SelectionKey key) throws InterruptedException Connection c = (Connection)key.attachment();count = c.readAndProcess();public class Connection public int readAndProcess()processOneRpc(data.array();private void processOneRpc(byte buf)processRpcRequest(header, dis);private void processRpcRequest(RpcRequestHeaderProto header,DataInputStream dis) throws WrappedRpcServerException,InterruptedException Call call = new Call(header.getCallId(), header.getRetryCount(),rpcRequest, this, ProtoUtil.convert(header.getRpcKind(), header.getClientId().toByteArray();callQueue.put(call);private class Handler extends Thread public void run() final Call call = callQueue.take();value = call(call.rpcKind, tocolName, call.rpcRequest, call.timestamp);setupResponse(buf, call, returnStatus, detailedErr, value, errorClass, error);responder.doRespond(call);private class Responder extends Thread void doRespond(Call call) throws IOException processResponse(call.connection.responseQueue, true);private boolean processResponse(LinkedList responseQueue,boolean inHandler) throws IOException int numBytes = channelWrite(channel, call.rpcResponse);done = true;这里给出了一个比较完整版Server的rpc调用过程,从listener都构造函数开始,在他的构造函数中起了几个reader线程,当监听器收到访问请求的时候,由reader请请求中读取数据,reader中实际上调用的是connection的readAndProcess方法,在这个方法中,会往RPC server中的callQueue添加call对象,之后,handler这个家伙从队列中取出当前call,具体的处理过程,用到了Server类的call方法,这地方有些玄机,仔细跟过代码的人才知道,因为server的实例类不再是org.apache.hadoop.ipc.Server,而是Protobuf的一个实现类,org.apache.hadoop.ipc.RPC.Server,而且call方法是被重写过的,代码如下:?123456Overridepublic Writable call(RPC.RpcKind rpcKind, String protocol,Writable rpcRequest, long receiveTime) throws Exception return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,receiveTime);继续追踪下,差不多就可以到底了:?1234567891011121314public class ProtobufRpcEngine imple

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论