First, let us summarize the architecture of HDFS in order to identify the scenarios that require RPC communication. HDFS uses a master/slave architecture: an HDFS cluster consists of one Namenode and multiple Datanodes, and there is only a single Namenode in the cluster. As the central server of the cluster, the Namenode is mainly responsible for:
- managing the file system namespace of the cluster;
- managing the mapping between file blocks and Datanodes;
- managing the status reports sent by Datanodes.

The main functions of a Datanode are:
- reading and writing the data stored on that node;
- reporting its status to the Namenode;
- performing pipelined replication.

As the description above shows, an HDFS cluster runs three main kinds of processes: the Namenode process, the Datanode processes and the file system client processes. All of them communicate through the RPC mechanism implemented in Hadoop, an IPC model based on the client/server pattern. The following end-to-end interactions therefore exist among these processes:
1. (Client) Datanode / Namenode (Server)
2. (Client) DFS Client / Namenode (Server)
3. (Client) DFS Client / Datanode (Server)
4. (Client) Datanode A / Datanode B (Server)

1. Client-side analysis

The most basic service the Client class provides is making RPC calls. It offers two ways to do so: a single serial call and a parallel call. Both are described below. The serial single call is implemented by the method call:

```java
public Writable call(Writable param, InetSocketAddress addr,
                     Class<?> protocol, UserGroupInformation ticket)
    throws InterruptedException, IOException {
  Call call = new Call(param);
  // Build a ConnectionId from addr, protocol and ticket and look it up in the
  // client's connections pool. If a connection already exists it is returned;
  // otherwise a new Connection is created and added to the pool. The call is
  // then added to that connection's calls table, and setupIOstreams is called.
  // setupIOstreams connects to the server and sets up the I/O streams. It then
  // sends a header to the server and starts the connection thread that waits
  // for responses.
  Connection connection = getConnection(addr, protocol, ticket, call);
  connection.sendParam(call);                 // send the parameter
  boolean interrupted = false;
  synchronized (call) {
    while (!call.done) {
      try {
        // Wait for the result; normally woken up by callComplete().
        // Object.wait() makes the current thread wait until another thread
        // invokes notify() or notifyAll() on this object.
        call.wait();
      } catch (InterruptedException ie) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }

    if (interrupted) {
      // set the interrupt flag now that we are done waiting
      Thread.currentThread().interrupt();
    }

    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      } else { // local exception
        throw wrapException(addr, call.error);
      }
    } else {
      return call.value;
    }
  }
}
```

An overloaded call method implements the parallel call. Its documentation comment describes the contract. It makes a set of calls in parallel, sending each parameter to the corresponding address. When all values are available, or have timed out or errored, the collected results are returned in an array. The array contains nulls for calls that timed out or errored. The implementation begins as follows:
```java
public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
                       Class<?> protocol, UserGroupInformation ticket)
    throws IOException {
  if (addresses.length == 0) return new Writable[0];

  ParallelResults results = new ParallelResults(params.length);
  synchronized (results) {
    for (int i = 0; i …
```

The connection thread, for its part, waits until it has work to do. At the end of that waiting logic, it returns true if calls are still awaiting responses. Otherwise it marks the connection as closed and returns false:

```java
// …
if (timeout > 0) {
  try {
    wait(timeout);
  } catch (InterruptedException e) {}
}
// …
if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
  return true;   // the calls table still holds requests awaiting responses
} else if (shouldCloseConnection.get()) {
  return false;
} else if (calls.isEmpty()) { // idle connection closed or stopped
  markClosed(null);           // sets shouldCloseConnection to true
  return false;
} else { // get stopped but there are still pending requests
  markClosed((IOException)new IOException().initCause(
      new InterruptedException()));
  return false;
}
```

receiveResponse reads the result of an RPC call:

```java
private void receiveResponse() {
  if (shouldCloseConnection.get()) {
    return;
  }
  touch();

  try {
    int id = in.readInt();          // try to read an id; the call id comes first

    if (LOG.isDebugEnabled())
      LOG.debug(getName() + " got value #" + id);

    Call call = calls.get(id);

    int state = in.readInt();       // read call status
    if (state == Status.SUCCESS.state) {
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      value.readFields(in);         // read value
      call.setValue(value);         // stores the result, then calls callComplete() to wake the caller
      calls.remove(id);
    } else if (state == Status.ERROR.state) {
      call.setException(new RemoteException(WritableUtils.readString(in),
                                            WritableUtils.readString(in)));
    } else if (state == Status.FATAL.state) {
      // Close the connection
      markClosed(new RemoteException(WritableUtils.readString(in),
                                     WritableUtils.readString(in)));
    }
  } catch (IOException e) {
    markClosed(e);
  }
}
```
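The blocking in the serial call() is a wait/notify rendezvous. The caller sleeps on the Call object until the connection thread stores a value or an exception and calls callComplete() to notify it. The sketch below isolates that pattern. PendingCall is a hypothetical, simplified stand-in for Client.Call: its setValue, setException and callComplete methods are modeled on the names mentioned above, not copied from Hadoop. The sketch assumes that exactly one thread waits on each call.

```java
import java.io.IOException;

/** Hypothetical, simplified stand-in for Client.Call: one outstanding request. */
class PendingCall<T> {
  private boolean done;       // guarded by this
  private T value;            // result on success
  private IOException error;  // failure, if any

  /** Receiver thread: a successful response arrived. */
  synchronized void setValue(T value) {
    this.value = value;
    callComplete();
  }

  /** Receiver thread: an error response arrived. */
  synchronized void setException(IOException error) {
    this.error = error;
    callComplete();
  }

  /** Marks the call done and wakes the single thread blocked in await(). */
  private synchronized void callComplete() {
    done = true;
    notify();
  }

  /**
   * Caller thread: blocks until the call completes. As in Client.call(), an
   * interrupt does not abort the wait; it is remembered and restored afterwards.
   */
  synchronized T await() throws IOException {
    boolean interrupted = false;
    while (!done) {           // loop guards against spurious wakeups
      try {
        wait();
      } catch (InterruptedException ie) {
        interrupted = true;
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
    if (error != null) {
      throw error;
    }
    return value;
  }
}
```

The method loops on `done` instead of waiting once, which protects against spurious wakeups. Remembering an interrupt, rather than bailing out, lets an in-flight call finish while still preserving the interrupt status for the caller.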
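Only the opening of the parallel call() survives here, but its documentation comment states the contract. There is one result slot per address, failed or timed-out calls leave null, and the caller blocks until every slot has reported back. The sketch below shows a minimal collector with that contract. ParallelCollector and its methods are hypothetical names. Hadoop's own helper is the ParallelResults class referenced above, and its body is not shown in this text.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical collector: one slot per call; the caller blocks until all have reported. */
final class ParallelCollector<T> {
  private final Object[] values;
  private int count;  // number of calls that have completed (guarded by this)

  ParallelCollector(int size) {
    values = new Object[size];
  }

  /** Worker side: call {@code index} finished; pass null for an error or timeout. */
  synchronized void callComplete(int index, T value) {
    values[index] = value;
    count++;
    if (count == values.length) {
      notifyAll();  // wake the thread waiting in awaitAll()
    }
  }

  /** Caller side: blocks until every call has reported, then returns results in order. */
  @SuppressWarnings("unchecked")
  synchronized List<T> awaitAll() throws InterruptedException {
    while (count < values.length) {
      wait();
    }
    List<T> out = new ArrayList<>(values.length);
    for (Object v : values) {
      out.add((T) v);
    }
    return out;
  }
}
```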
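receiveResponse depends on a fixed field order: the call id, the status code, then either the serialized value or two error strings (exception class name and message). The server writes the same order in setupResponse. The round trip below sketches both sides under two assumptions. First, the payload is a String written with DataOutputStream.writeUTF in place of a Writable and WritableUtils.writeString. Second, the status codes are 0/1/-1 for SUCCESS/ERROR/FATAL. RpcStatus, RpcResponse and ResponseCodec are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Assumed status codes (hypothetical stand-in for Hadoop's Status enum). */
enum RpcStatus {
  SUCCESS(0), ERROR(1), FATAL(-1);
  final int state;
  RpcStatus(int state) { this.state = state; }
}

/** Hypothetical decoded response frame. */
final class RpcResponse {
  final int id;
  final RpcStatus status;
  final String value;       // non-null only on SUCCESS
  final String errorClass;  // non-null only on ERROR/FATAL
  final String error;

  RpcResponse(int id, RpcStatus status, String value, String errorClass, String error) {
    this.id = id;
    this.status = status;
    this.value = value;
    this.errorClass = errorClass;
    this.error = error;
  }
}

final class ResponseCodec {
  /** Server side, in the spirit of setupResponse(). writeUTF rejects null, so pass non-null strings. */
  static byte[] encode(int id, RpcStatus status, String value,
                       String errorClass, String error) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeInt(id);            // call id first
    out.writeInt(status.state);  // then the status
    if (status == RpcStatus.SUCCESS) {
      out.writeUTF(value);
    } else {
      out.writeUTF(errorClass);
      out.writeUTF(error);
    }
    out.flush();
    return buf.toByteArray();
  }

  /** Client side, in the spirit of receiveResponse(): read the fields in the same order. */
  static RpcResponse decode(DataInputStream in) throws IOException {
    int id = in.readInt();
    int state = in.readInt();
    if (state == RpcStatus.SUCCESS.state) {
      return new RpcResponse(id, RpcStatus.SUCCESS, in.readUTF(), null, null);
    }
    RpcStatus status;
    if (state == RpcStatus.ERROR.state) {
      status = RpcStatus.ERROR;
    } else if (state == RpcStatus.FATAL.state) {
      status = RpcStatus.FATAL;
    } else {
      throw new IOException("unknown status " + state);
    }
    String errorClass = in.readUTF();
    String error = in.readUTF();
    return new RpcResponse(id, status, null, errorClass, error);
  }
}
```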
2. Server-side analysis

The Server class handles RPCs coming from clients: it receives the parameters, performs the call and sends back the result. The server is started as follows:

```java
public synchronized void start() throws IOException {
  responder.start();
  listener.start();
  handlers = new Handler[handlerCount];

  for (int i = 0; i < handlerCount; i++) {
    handlers[i] = new Handler(i);
    handlers[i].start();
  }
}
```

This method starts the responder thread, the listener thread and each handler thread. Let us first look at how the listener thread is created and how it works:

```java
public Listener() throws IOException {
  address = new InetSocketAddress(bindAddress, port);
  // Create a new server socket and set to non blocking mode
  acceptChannel = ServerSocketChannel.open();
  acceptChannel.configureBlocking(false);

  // Bind the server socket to the local host and port;
  // backlogLength is the familiar listen() backlog
  bind(acceptChannel.socket(), address, backlogLength);
  port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port
  // create a selector
  selector = Selector.open();

  // Register accepts on the server socket with the selector.
  acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
  this.setName("IPC Server listener on " + port);
  this.setDaemon(true);
}

public void run() {
  // This loop is easiest to follow by analogy with the familiar select/poll mechanisms
  LOG.info(getName() + ": starting");
  SERVER.set(Server.this);
  while (running) {
    SelectionKey key = null;
    try {
      // handle the ready sockets one by one
      selector.select();
      Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isValid()) {
            if (key.isAcceptable())
              doAccept(key); // new connection: create a Connection object and add it to connectionList
            else if (key.isReadable())
              doRead(key);   // new data: get the Connection attached to the key, set its
                             // last-contact time to now, then call c.readAndProcess()
          }
        } catch (IOException e) {
        }
        key = null;
      }
    } catch (OutOfMemoryError e) {
      // we can run out of memory if we have too many threads
      // log the event and sleep for a minute and give
      // some thread(s) a chance to finish
      LOG.warn("Out of Memory in server select", e);
      closeCurrentConnection(key, e);
      cleanupConnections(true);
      try { Thread.sleep(60000); } catch (Exception ie) {}
    } catch (InterruptedException e) {
      if (running) {                          // unexpected -- log it
        LOG.info(getName() + " caught: " +
                 StringUtils.stringifyException(e));
      }
    } catch (Exception e) {
      closeCurrentConnection(key, e);
    }
    cleanupConnections(false);
  }
  LOG.info("Stopping " + this.getName());

  synchronized (this) {
    try {
      acceptChannel.close();
      selector.close();
    } catch (IOException e) { }

    selector = null;
    acceptChannel = null;

    // clean up all connections
    while (!connectionList.isEmpty()) {
      closeConnection(connectionList.remove(0));
    }
  }
}
```

connection.readAndProcess() parses the data read from the network in the order in which the client wrote it:
- HEADER;
- CURRENT_VERSION;
- the length and serialized form of the ConnectionHeader;
- dataLength, the length of the data that follows;
- the data itself (call.id and the serialized parameter).

processHeader reads the ConnectionHeader and obtains the protocol and ugi information from it. processData extracts the parameter:

```java
private void processData() throws IOException, InterruptedException {
  DataInputStream dis =
    new DataInputStream(new ByteArrayInputStream(data.array()));
  int id = dis.readInt();                    // try to read an id

  if (LOG.isDebugEnabled())
    LOG.debug(" got #" + id);

  // deserialize the parameter object on the server side
  Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param
  param.readFields(dis);

  // Create a Call and put it on callQueue. callQueue is a BlockingQueue,
  // so put() may block.
  Call call = new Call(id, param, this);
  callQueue.put(call);                       // queue the call; maybe blocked here
}
```

Once all the information for a call has arrived, a Call object is created and added to callQueue. How are these calls then processed? To answer that, we turn to the Handler class, whose main loop works as follows:

```java
// take a Call from the queue
final Call call = callQueue.take(); // pop the queue; maybe blocked here
// execute the call
value = call(call.connection.protocol, call.param, call.timestamp);
// build the response for the call
setupResponse(buf, call,
              (error == null) ? Status.SUCCESS : Status.ERROR,
              value, errorClass, error);
// send the call's result back
responder.doRespond(call);
```

setupResponse builds the response stream according to whether the call succeeded and stores it in the Call's response field. doRespond then queues the response on the connection:

```java
private void setupResponse(ByteArrayOutputStream response,
                           Call call, Status status,
                           Writable rv, String errorClass, String error)
    throws IOException {
  response.reset();
  DataOutputStream out = new DataOutputStream(response);
  out.writeInt(call.id);                // write call id
  out.writeInt(status.state);           // write status

  if (status == Status.SUCCESS) {
    rv.write(out);
  } else {
    WritableUtils.writeString(out, errorClass);
    WritableUtils.writeString(out, error);
  }
  call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}

void doRespond(Call call) throws IOException {
  synchronized (call.connection.responseQueue) {
    call.connection.responseQueue.addLast(call);
    // If this is the only response in the queue, handle it right away;
    // otherwise leave it to the responder thread.
    if (call.connection.responseQueue.size() == 1) {
      // processResponse is what actually writes the response data to the socket
      processResponse(call.connection.responseQueue, true);
    }
  }
}
```

Next, the run method of the responder thread:

```java
public void run() {
  LOG.info(getName() + ": starting");
  SERVER.set(Server.this);
  long lastPurgeTime = 0;   // last check for old calls.

  while (running) {
    try {
      waitPending();     // If a channel is being registered, wait. A channel becomes
                         // pending when it is re-registered after an incomplete write.
      writeSelector.select(PURGE_INTERVAL); // wait at most PURGE_INTERVAL, then handle ready keys
      Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
      while (iter.hasNext()) {
        SelectionKey key = iter.next();
        iter.remove();
        try {
          if (key.isValid() && key.isWritable()) {
            doAsyncWrite(key);   // write the data asynchronously
          }
        } catch (IOException e) {
          LOG.info(getName() + ": doAsyncWrite threw exception " + e);
        }
      }
      long now = System.currentTimeMillis();
      if (now < lastPurgeTime + PURGE_INTERVAL) {
        continue;        // normal path: no periodic cleanup is due yet
      }
      // periodic cleanup starts here
      lastPurgeTime = now;
      //
      // If there were some calls that have not been
      // sent out for a long time, discard them.
      //
      LOG.debug("Checking for old call responses.");
      ArrayList<Call> calls;

      // get the list of channels from list of keys.
      synchronized (writeSelector.keys()) {
        calls = new ArrayList<Call>(writeSelector.keys().size());
        iter = writeSelector.keys().iterator();
        while (iter.hasNext()) {
          SelectionKey key = iter.next();
          Call call = (Call)key.attachment();
          if (call != null && key.channel() == call.connection.channel) {
            calls.add(call);
          }
        }
      }

      for (Call call : calls) {
        try {
          doPurge(call, now);  // discard responses that have not been sent for too long
        } catch (IOException e) {
          LOG.warn("Error in purging old calls " + e);
        }
      }
    } catch (OutOfMemoryError e) {
      // we can run out of memory if we have too many threads
      // log the event and sleep for a minute and give
      // some thread(s) a chance to finish
      LOG.warn("Out of Memory in server select", e);
      try { Thread.sleep(60000); } catch (Exception ie) {}
    } catch (Exception e) {
      LOG.warn("Exception in Responder " +
               StringUtils.stringifyException(e));
    }
  }
  LOG.info("Stopping " + this.getName());
}
```

doAsyncWrite does its work by calling processResponse internally, but with the second argument set to false, so a Call whose processing fails is not re-added to the queue. doPurge essentially checks the call's timestamp and closes the connection:

```java
if (now > call.timestamp + PURGE_INTERVAL) {
  closeConnection(call.connection);
}
```

3. RPC analysis

To analyze Invoker, we need some background on how Java reflection implements dynamic proxies. A dynamic proxy is built from two classes: java.lang.reflect.Proxy and java.lang.reflect.InvocationHandler, the latter being an interface. A dynamic proxy class is generated at run time. When generating it, you supply a set of interfaces, and the generated class then declares that it implements them. A dynamic proxy is a typical instance of the Proxy pattern: it does no real work itself. When you create an instance, you must supply a handler that takes over the actual work. In Hadoop's RPC, this handler is the Invoker object.

Put simply, you generate a class from an interface, and every method call on that class is forwarded to the InvocationHandler you supplied when generating it. In Hadoop's RPC, Invoker implements the invoke method of InvocationHandler (invoke is the only method of InvocationHandler). Invoker packages everything related to the call, namely the method name, the list of parameter types and the list of arguments. It then uses the Client analyzed earlier to send this package to the server over a socket. In other words, any call you make on the proxy is sent through the Client to the remote server.

Invoker uses Invocation, which encapsulates all the information about one remote call. Its main fields are:
- methodName, the name of the method being called;
- parameterClasses, the list of the method's parameter types;
- parameters, the call's arguments.

Note that Invocation implements the Writable interface and can therefore be serialized.

RPC.Server extends org.apache.hadoop.ipc.Server; through RPC, it lets you turn an object into a server. A request received by the server (as an Invocation) is deserialized into a method name, a list of parameter types and a list of arguments. Using Java reflection, the server can then invoke the corresponding method on the object. The result is sent back to the client over the socket. The client unpacks it and returns it to the user of the dynamic proxy.

The process starts with waitForProxy, which obtains a proxy connection to a remote server ("Get a proxy connection to a remote server"):

```java
static VersionedProtocol waitForProxy(Class<?> protocol,
                                      long clientVersion,
                                      InetSocketAddress addr,
                                      Configuration conf,
                                      long timeout)
    throws IOException {
  long startTime = System.currentTimeMillis();
  IOException ioe;
  while (true) {
    try {
      return getProxy(protocol, clientVersion, addr, conf); // does the actual work
    } catch (ConnectException se) {       // namenode has not been started
      LOG.info("Server at " + addr + " not available yet, Zzzzz.");
      ioe = se;
    } catch (SocketTimeoutException te) { // namenode is busy
      LOG.info("Problem connecting to server: " + addr);
      ioe = te;
    }
    // check if timed out
    if (System.currentTimeMillis() - timeout >= startTime) {
      throw ioe;
    }

    // wait for retry
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ie) {
      // IGNORE
    }
  }
}

public static VersionedProtocol getProxy(Class<?> protocol,
    long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
    Configuration conf, SocketFactory factory) throws IOException {
  // From here on, calls on the VersionedProtocol object are handled by
  // Invoker.invoke, which sends them to the server as remote calls.
  VersionedProtocol proxy =
      (VersionedProtocol) Proxy.newProxyInstance(
          protocol.getClassLoader(), new Class[] { protocol },
          new Invoker(addr, ticket, conf, factory));
  long serverVersion = proxy.getProtocolVersion(protocol.getName(),
                                                clientVersion);
  if (serverVersion == clientVersion) {
    return proxy;
  } else {
    throw new VersionMismatch(protocol.getName(), clientVersion,
                              serverVersion);
  }
}
```

Invoker implements invoke as follows:

```java
public Object invoke(Object proxy, Method method, Object[] args)
    throws Throwable {
  final boolean logDebug = LOG.isDebugEnabled();
  long startTime = 0;
  if (logDebug) {
    startTime = System.currentTimeMillis();
  }

  // make the call; note the arguments passed to client.call
  ObjectWritable value = (ObjectWritable)
    client.call(new Invocation(method, args), address,
                method.getDeclaringClass(), ticket);
  if (logDebug) {
    long callTime = System.currentTimeMillis() - startTime;
    LOG.debug("Call: " + method.getName() + " " + callTime);
  }
  return value.get();
}
```

The server deserializes and executes the call. The server-side call is implemented as follows:

```java
public Writable call(Class<?> protocol, Writable param, long receivedTime)
    throws IOExcepti…
```
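The Listener in section 2 follows the standard Java NIO pattern. A non-blocking ServerSocketChannel is registered for OP_ACCEPT, accepted channels are registered for OP_READ, and a single thread iterates over selectedKeys(), removing each key as it goes. The sketch below keeps only that skeleton. MiniListener and serveOnce are hypothetical names, and the sketch assumes each client sends a single 4-byte int. It returns as soon as one complete int has arrived. Hadoop's readAndProcess() instead reassembles length-prefixed frames and keeps the connection open.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/** Hypothetical single-threaded accept/read loop in the style of Server.Listener. */
final class MiniListener {
  /** Accepts connections and returns the first complete int read from any of them. */
  static int serveOnce(ServerSocketChannel acceptChannel) throws IOException {
    try (Selector selector = Selector.open()) {
      acceptChannel.configureBlocking(false);
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      while (true) {
        selector.select();
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
          SelectionKey key = iter.next();
          iter.remove();  // selected keys must be removed by hand
          if (!key.isValid()) {
            continue;
          }
          if (key.isAcceptable()) {
            // like doAccept(): register the new connection for reads,
            // with a per-connection buffer as the key's attachment
            SocketChannel ch = ((ServerSocketChannel) key.channel()).accept();
            if (ch == null) {
              continue;
            }
            ch.configureBlocking(false);
            ch.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(4));
          } else if (key.isReadable()) {
            // like doRead(): append whatever bytes are available to the buffer
            SocketChannel ch = (SocketChannel) key.channel();
            ByteBuffer buf = (ByteBuffer) key.attachment();
            if (ch.read(buf) < 0) {  // peer closed before sending a full int
              key.cancel();
              ch.close();
              continue;
            }
            if (!buf.hasRemaining()) {  // all 4 bytes have arrived
              buf.flip();
              int value = buf.getInt();
              ch.close();
              return value;
            }
          }
        }
      }
    }
  }
}
```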
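processData() expects each request body to be preceded by its length (dataLength) and to start with the call id, followed by the serialized parameter. The round trip below sketches that framing under stated assumptions. The parameter is a String written with writeUTF in place of a Writable, and the connection header that precedes the first request is left out. RequestFraming and ParsedRequest are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical parsed request: the call id plus a String parameter. */
final class ParsedRequest {
  final int id;
  final String param;
  ParsedRequest(int id, String param) { this.id = id; this.param = param; }
}

/** Hypothetical framing: [int dataLength][data = int callId + serialized param]. */
final class RequestFraming {
  /** Client side: serialize id + param into a buffer, then send it length-prefixed. */
  static void writeRequest(DataOutputStream out, int callId, String param)
      throws IOException {
    ByteArrayOutputStream body = new ByteArrayOutputStream();
    DataOutputStream d = new DataOutputStream(body);
    d.writeInt(callId);
    d.writeUTF(param);          // stand-in for param.write(d) on a Writable
    d.flush();
    out.writeInt(body.size());  // dataLength
    body.writeTo(out);          // data
    out.flush();
  }

  /** Server side: read one whole frame, then parse it the way processData() does. */
  static ParsedRequest readRequest(DataInputStream in) throws IOException {
    int dataLength = in.readInt();
    byte[] data = new byte[dataLength];
    in.readFully(data);         // blocks until the whole frame has been read
    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));
    int id = dis.readInt();     // the call id comes first
    String param = dis.readUTF();
    return new ParsedRequest(id, param);
  }
}
```

The length prefix lets the server collect a complete request before deserializing it, even when it arrives across several network reads. Hadoop's non-blocking readAndProcess() relies on the same framing.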
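The reader/Handler split described in section 2 is a producer/consumer pipeline over a BlockingQueue. The reader put()s parsed calls, blocking when the queue is full. Each handler take()s one call, executes it and passes the result on for the response. The sketch below models this with java.util.concurrent. HandlerPool, its nested Call class, the service function and the responses list are hypothetical stand-ins for Server.Call, the RPC target object and the Responder.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

/** Hypothetical reader -> callQueue -> handler threads pipeline. */
final class HandlerPool {
  /** Hypothetical queued call: an id and a parameter. */
  static final class Call {
    final int id;
    final String param;
    Call(int id, String param) { this.id = id; this.param = param; }
  }

  private final BlockingQueue<Call> callQueue;
  private final List<String> responses = Collections.synchronizedList(new ArrayList<>());
  private final List<Thread> handlers = new ArrayList<>();

  HandlerPool(int handlerCount, int queueCapacity, Function<String, String> service) {
    callQueue = new ArrayBlockingQueue<>(queueCapacity);
    for (int i = 0; i < handlerCount; i++) {
      Thread t = new Thread(() -> {
        try {
          while (true) {
            Call call = callQueue.take();              // pop the queue; may block
            String value = service.apply(call.param);  // "execute" the call
            responses.add(call.id + ":" + value);      // stand-in for responder.doRespond(call)
          }
        } catch (InterruptedException e) {
          // interrupted by stop(): let the thread exit
        }
      }, "handler-" + i);
      t.setDaemon(true);
      t.start();
      handlers.add(t);
    }
  }

  /** Reader side, like processData(): blocks while the queue is full. */
  void enqueue(Call call) throws InterruptedException {
    callQueue.put(call);
  }

  /** Snapshot of the responses produced so far. */
  List<String> responses() {
    synchronized (responses) {
      return new ArrayList<>(responses);
    }
  }

  void stop() {
    for (Thread t : handlers) {
      t.interrupt();
    }
  }
}
```

A bounded queue gives natural back-pressure: when handlers fall behind, the reader thread blocks in put() instead of buffering requests without limit.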
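To make the Invoker/Invocation mechanism from section 3 concrete, the sketch below wires java.lang.reflect.Proxy to an InvocationHandler. The handler packs the method name, parameter types and arguments into an Invocation. A "server" then looks the method up by reflection and invokes it on the target object. The sketch simplifies under these assumptions:
- the network hop through Client is replaced by a direct in-process call;
- Invocation is a plain class rather than a Writable;
- Calculator and ReflectiveServer are hypothetical, and the Invoker and Invocation here are simplified stand-ins for Hadoop's classes of the same name.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** Hypothetical protocol interface. */
interface Calculator {
  int add(int a, int b);
}

/** Simplified Invocation: what would travel over the wire. */
final class Invocation {
  final String methodName;
  final Class<?>[] parameterClasses;
  final Object[] parameters;

  Invocation(Method method, Object[] args) {
    this.methodName = method.getName();
    this.parameterClasses = method.getParameterTypes();
    this.parameters = (args == null) ? new Object[0] : args;
  }
}

/** Hypothetical server side: resolve the method by reflection and invoke it. */
final class ReflectiveServer {
  private final Object instance;

  ReflectiveServer(Object instance) { this.instance = instance; }

  Object call(Class<?> protocol, Invocation inv) throws Throwable {
    Method m = protocol.getMethod(inv.methodName, inv.parameterClasses);
    try {
      return m.invoke(instance, inv.parameters);
    } catch (InvocationTargetException e) {
      throw e.getCause();  // rethrow what the target method itself threw
    }
  }
}

/** Simplified Invoker: every proxy method call becomes an Invocation. */
final class Invoker implements InvocationHandler {
  private final ReflectiveServer server;  // stands in for Client + socket

  Invoker(ReflectiveServer server) { this.server = server; }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    return server.call(method.getDeclaringClass(), new Invocation(method, args));
  }

  static <T> T getProxy(Class<T> protocol, ReflectiveServer server) {
    return protocol.cast(Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class<?>[] {protocol}, new Invoker(server)));
  }
}
```

Usage: `Calculator remote = Invoker.getProxy(Calculator.class, new ReflectiveServer(impl));` after which `remote.add(2, 3)` travels through Invoker.invoke and ReflectiveServer.call before reaching `impl`.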
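waitForProxy() is a retry loop with a deadline. Failures caused by a server that has not started (ConnectException) or is busy (SocketTimeoutException) are remembered, the loop sleeps, and the last exception is rethrown once the timeout has elapsed. The generic sketch below captures that logic. Connector and RetryUntil are hypothetical names, and the retry interval is a parameter where the original hard-codes 1000 ms. The sketch also deviates in one respect: instead of ignoring an interrupt during the sleep as the original does, it restores the interrupt flag and gives up with an InterruptedIOException.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;

/** Hypothetical factory for whatever waitFor() is waiting on (e.g. a proxy). */
interface Connector<T> {
  T connect() throws IOException;
}

final class RetryUntil {
  static <T> T waitFor(Connector<T> connector, long timeoutMs, long retryIntervalMs)
      throws IOException {
    long startTime = System.currentTimeMillis();
    IOException ioe;
    while (true) {
      try {
        return connector.connect();
      } catch (ConnectException se) {        // server not started yet
        ioe = se;
      } catch (SocketTimeoutException te) {  // server busy
        ioe = te;
      }
      // give up once the deadline has passed, rethrowing the last failure
      if (System.currentTimeMillis() - timeoutMs >= startTime) {
        throw ioe;
      }
      try {
        Thread.sleep(retryIntervalMs);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();  // preserve the interrupt for the caller
        throw new InterruptedIOException("interrupted while waiting to retry");
      }
    }
  }
}
```

Only the two "try again later" exceptions are retried. Any other IOException, such as the VersionMismatch thrown by getProxy, propagates immediately.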