netty源代码解析(2)——客户端流程.doc_第1页
netty源代码解析(2)——客户端流程.doc_第2页
netty源代码解析(2)——客户端流程.doc_第3页
netty源代码解析(2)——客户端流程.doc_第4页
netty源代码解析(2)——客户端流程.doc_第5页
已阅读5页,还剩2页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

前一篇文章分析了netty的服务端流程,接下来分析一下客户端的大致流程,客户端启动代码如下Java代码1. ClientBootstrapbootstrap=newClientBootstrap(newNioClientSocketChannelFactory(Executors.newCachedThreadPool(),2. Executors.newCachedThreadPool();3. 4. bootstrap.setPipelineFactory(newChannelPipelineFactory()5. 6. Override7. publicChannelPipelinegetPipeline()throwsException8. ChannelPipelinepipleline=pipeline();9. pipleline.addLast(encode,newObjectEncoder(1048576*16);10. pipleline.addLast(decode,newObjectDecoder(1048576*16,ClassResolvers.weakCachingConcurrentResolver(null);11. pipleline.addLast(handler,handler);12. returnpipleline;13. 14. );15. 16. bootstrap.setOption(receiveBufferSize,1048576*64);17. bootstrap.setOption(child.tcpNoDelay,true);/关闭Nagle算法18. /tcp定期发送心跳包比如IM里边定期探测对方是否下线19. /只有tcp长连接下才有意义20. /bootstrap.setOption(child.keepAlive,true);21. ChannelFuturefuture=bootstrap.connect(newInetSocketAddress(address,port);22. Channelchannel=future.awaitUninterruptibly().getChannel();客户端事件处理顺序如下:UpStream.ChannelState.OPEN(已经open)DownStream.ChannelState.BOUND(需要绑定)DownStream.CONNECTED(需要连接)UpStream.ChannelState.BOUND(已经绑定)-UpStream.CONNECTED(连接成功)在connect的时候做了如下处理Java代码1. publicChannelFutureconnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress)2. 3. if(remoteAddress=null)4. thrownewNullPointerException(remoteAddress);5. 6. 7. ChannelPipelinepipeline;8. try9. pipeline=getPipelineFactory().getPipeline();10. catch(Exceptione)11. thrownewChannelPipelineException(Failedtoinitializeapipeline.,e);12. 13. 14. /Settheoptions.先创建Channel15. Channelch=getFactory().newChannel(pipeline);16. ch.getConfig().setOptions(getOptions();17. 18. /Bind.19. if(localAddress!=null)20. ch.bind(localAddress);21. 22. 23. /Connect.再进行连接24. returnch.connect(remoteAddress);25. 首先要创建出ChannelJava代码1. NioClientSocketChannel(2. ChannelFactoryfactory,ChannelPipelinepipeline,3. ChannelSinksink,NioWorkerworker)4. 5. super(null,factory,pipeline,sink,newSocket(),worker);6. fireChannelOpen(this);7. 紧接着会fire一个ChannelOpen事件,Java代码1. if(channel.getParent()!=null)2. fireChildChannelStateChanged(channel.getParent(),channel);3. 4. 5. channel.getPipeline().sendUpstream(6. newUpstreamChannelStateEvent(7. channel,ChannelState.OPEN,Boolean.TRUE);这样会出发Upstream的ChannelState.OPEN事件。接下来要继续connect了Java代码1. if(remoteAddress=null)2. thrownewNullPointerException(remoteAddress);3. 4. ChannelFuturefuture=future(channel,true);5. channel.getPipeline().sendDownstream(newDownstreamChannelStateEvent(6. channel,future,ChannelState.CONNECTED,remoteAddress);7. returnfuture;这样就会出发Downstream的ChannelState.CONNECTED事件。接下来就要由NioClientSocketPipelineSink来进行处理了Java代码1. switch(state)2. caseOPEN:3. if(Boolean.FALSE.equals(value)4. channel.worker.close(channel,future);5. 6. break;7. caseBOUND:8. if(value!=null)9. bind(channel,future,(SocketAddress)value);10. else11. channel.worker.close(channel,future);12. 13. break;14. caseCONNECTED:15. if(value!=null)16. connect(channel,future,(SocketAddress)value);17. else18. channel.worker.close(channel,future);19. 20. break;21. caseINTEREST_OPS:22. channel.worker.setInterestOps(channel,future,(Integer)value).intValue();23. break;下面看下channel注册到worker的代码,连接的时候是在内部的一个Boss类里处理的所有的连接connect操作都被封装成一个RegisterTask对象,Boss类持有registerTask队列,在loop中不断的去进行selectJava代码1. privatestaticfinalclassRegisterTaskimplementsRunnable2. privatefinalBossboss;3. privatefinalNioClientSocketChannelchannel;4. 5. RegisterTask(Bossboss,NioClientSocketChannelchannel)6. this.boss=boss;7. this.channel=channel;8. 9. 10. publicvoidrun()11. try12. channel.socket.register(13. boss.selector,SelectionKey.OP_CONNECT,channel);14. catch(ClosedChannelExceptione)15. channel.worker.close(channel,succeededFuture(channel);16. 17. 18. intconnectTimeout=channel.getConfig().getConnectTimeoutMillis();19. if(connectTimeout0)20. channel.connectDeadlineNanos=System.nanoTime()+connectTimeout*1000000L;21. 22. 23. register方法Java代码1. voidregister(NioClientSocketChannelchannel)2. RunnableregisterTask=newRegisterTask(this,channel);3. Selectorselector;4. 5. synchronized(startStopLock)6. if(!started)7. /Openaselectorifthisworkerdidntstartyet.8. try9. this.selector=selector=Selector.open();10. catch(Throwablet)11. thrownewChannelException(12. Failedtocreateaselector.,t);13. 14. 15. /StarttheworkerthreadwiththenewSelector.16. booleansuccess=false;17. try18. DeadLockProofWorker.start(19. bossExecutor,20. newThreadRenamingRunnable(21. this,NewI/Oclientboss#+id+-+subId);22. success=true;23. finally24. if(!success)25. /ReleasetheSelectoriftheexecutionfails.26. try27. selector.close();28. catch(Throwablet)29. logger.warn(Failedtocloseaselector.,t);30. 31. this.selector=selector=null;32. /Themethodwillreturntothecalleratthispoint.33. 34. 35. else36. /Usetheexistingselectorifthisworkerhasbeenstarted.37. selector=this.selector;38. 39. 40. assertselector!=null&selector.isOpen();41. 42. started=true;43. booleanoffered=registerTaskQueue.offer(registerTask);44. assertoffered;45. RegisterTask,放到Boss类持有的registerTaskQueue之后,Boss类会从boss executer线程池中取出一个线程不断地处理队列、选择准备就绪的键等。然后run方法处理感兴趣的事件Java代码1. publicvoidrun()2. booleanshutdown=false;3. Selectorselector=this.selector;4. longlastConnectTimeoutCheckTimeNanos=System.nanoTime();5. for(;)6. wakenUp.set(false);7. 8. try9. intselectedKeyCount=selector.select(500);10. .11. 12. 13. processRegisterTaskQueue();14. 15. if(selectedKeyCount0)16. processSelectedKeys(selector.selectedKeys();17. 在loop中,processRegisterTaskQueue会处理需要注册的任务,processSelectedKeys处理连接事件Java代码1. privatevoidprocessSelectedKeys(SetselectedKeys)2. for(Iteratori=selectedKeys.iterator();i.hasNext();)3. SelectionKeyk=i.next();4. i.remove();5. 6. if(!k.

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论