




已阅读5页,还剩2页未读, 继续免费阅读
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
前一篇文章分析了netty的服务端流程,接下来分析一下客户端的大致流程,客户端启动代码如下Java代码1. ClientBootstrapbootstrap=newClientBootstrap(newNioClientSocketChannelFactory(Executors.newCachedThreadPool(),2. Executors.newCachedThreadPool();3. 4. bootstrap.setPipelineFactory(newChannelPipelineFactory()5. 6. Override7. publicChannelPipelinegetPipeline()throwsException8. ChannelPipelinepipleline=pipeline();9. pipleline.addLast(encode,newObjectEncoder(1048576*16);10. pipleline.addLast(decode,newObjectDecoder(1048576*16,ClassResolvers.weakCachingConcurrentResolver(null);11. pipleline.addLast(handler,handler);12. returnpipleline;13. 14. );15. 16. bootstrap.setOption(receiveBufferSize,1048576*64);17. bootstrap.setOption(child.tcpNoDelay,true);/关闭Nagle算法18. /tcp定期发送心跳包比如IM里边定期探测对方是否下线19. /只有tcp长连接下才有意义20. /bootstrap.setOption(child.keepAlive,true);21. ChannelFuturefuture=bootstrap.connect(newInetSocketAddress(address,port);22. Channelchannel=future.awaitUninterruptibly().getChannel();客户端事件处理顺序如下:UpStream.ChannelState.OPEN(已经open)DownStream.ChannelState.BOUND(需要绑定)DownStream.CONNECTED(需要连接)UpStream.ChannelState.BOUND(已经绑定)-UpStream.CONNECTED(连接成功)在connect的时候做了如下处理Java代码1. publicChannelFutureconnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress)2. 3. if(remoteAddress=null)4. thrownewNullPointerException(remoteAddress);5. 6. 7. ChannelPipelinepipeline;8. try9. pipeline=getPipelineFactory().getPipeline();10. catch(Exceptione)11. thrownewChannelPipelineException(Failedtoinitializeapipeline.,e);12. 13. 14. /Settheoptions.先创建Channel15. Channelch=getFactory().newChannel(pipeline);16. ch.getConfig().setOptions(getOptions();17. 18. /Bind.19. if(localAddress!=null)20. ch.bind(localAddress);21. 22. 23. /Connect.再进行连接24. returnch.connect(remoteAddress);25. 首先要创建出ChannelJava代码1. NioClientSocketChannel(2. ChannelFactoryfactory,ChannelPipelinepipeline,3. ChannelSinksink,NioWorkerworker)4. 5. super(null,factory,pipeline,sink,newSocket(),worker);6. fireChannelOpen(this);7. 紧接着会fire一个ChannelOpen事件,Java代码1. if(channel.getParent()!=null)2. fireChildChannelStateChanged(channel.getParent(),channel);3. 4. 5. channel.getPipeline().sendUpstream(6. newUpstreamChannelStateEvent(7. channel,ChannelState.OPEN,Boolean.TRUE);这样会出发Upstream的ChannelState.OPEN事件。接下来要继续connect了Java代码1. if(remoteAddress=null)2. thrownewNullPointerException(remoteAddress);3. 4. ChannelFuturefuture=future(channel,true);5. channel.getPipeline().sendDownstream(newDownstreamChannelStateEvent(6. channel,future,ChannelState.CONNECTED,remoteAddress);7. returnfuture;这样就会出发Downstream的ChannelState.CONNECTED事件。接下来就要由NioClientSocketPipelineSink来进行处理了Java代码1. switch(state)2. caseOPEN:3. if(Boolean.FALSE.equals(value)4. channel.worker.close(channel,future);5. 6. break;7. caseBOUND:8. if(value!=null)9. bind(channel,future,(SocketAddress)value);10. else11. channel.worker.close(channel,future);12. 13. break;14. caseCONNECTED:15. if(value!=null)16. connect(channel,future,(SocketAddress)value);17. else18. channel.worker.close(channel,future);19. 20. break;21. caseINTEREST_OPS:22. channel.worker.setInterestOps(channel,future,(Integer)value).intValue();23. break;下面看下channel注册到worker的代码,连接的时候是在内部的一个Boss类里处理的所有的连接connect操作都被封装成一个RegisterTask对象,Boss类持有registerTask队列,在loop中不断的去进行selectJava代码1. privatestaticfinalclassRegisterTaskimplementsRunnable2. privatefinalBossboss;3. privatefinalNioClientSocketChannelchannel;4. 5. RegisterTask(Bossboss,NioClientSocketChannelchannel)6. this.boss=boss;7. this.channel=channel;8. 9. 10. publicvoidrun()11. try12. channel.socket.register(13. boss.selector,SelectionKey.OP_CONNECT,channel);14. catch(ClosedChannelExceptione)15. channel.worker.close(channel,succeededFuture(channel);16. 17. 18. intconnectTimeout=channel.getConfig().getConnectTimeoutMillis();19. if(connectTimeout0)20. channel.connectDeadlineNanos=System.nanoTime()+connectTimeout*1000000L;21. 22. 23. register方法Java代码1. voidregister(NioClientSocketChannelchannel)2. RunnableregisterTask=newRegisterTask(this,channel);3. Selectorselector;4. 5. synchronized(startStopLock)6. if(!started)7. /Openaselectorifthisworkerdidntstartyet.8. try9. this.selector=selector=Selector.open();10. catch(Throwablet)11. thrownewChannelException(12. Failedtocreateaselector.,t);13. 14. 15. /StarttheworkerthreadwiththenewSelector.16. booleansuccess=false;17. try18. DeadLockProofWorker.start(19. bossExecutor,20. newThreadRenamingRunnable(21. this,NewI/Oclientboss#+id+-+subId);22. success=true;23. finally24. if(!success)25. /ReleasetheSelectoriftheexecutionfails.26. try27. selector.close();28. catch(Throwablet)29. logger.warn(Failedtocloseaselector.,t);30. 31. this.selector=selector=null;32. /Themethodwillreturntothecalleratthispoint.33. 34. 35. else36. /Usetheexistingselectorifthisworkerhasbeenstarted.37. selector=this.selector;38. 39. 40. assertselector!=null&selector.isOpen();41. 42. started=true;43. booleanoffered=registerTaskQueue.offer(registerTask);44. assertoffered;45. RegisterTask,放到Boss类持有的registerTaskQueue之后,Boss类会从boss executer线程池中取出一个线程不断地处理队列、选择准备就绪的键等。然后run方法处理感兴趣的事件Java代码1. publicvoidrun()2. booleanshutdown=false;3. Selectorselector=this.selector;4. longlastConnectTimeoutCheckTimeNanos=System.nanoTime();5. for(;)6. wakenUp.set(false);7. 8. try9. intselectedKeyCount=selector.select(500);10. .11. 12. 13. processRegisterTaskQueue();14. 15. if(selectedKeyCount0)16. processSelectedKeys(selector.selectedKeys();17. 在loop中,processRegisterTaskQueue会处理需要注册的任务,processSelectedKeys处理连接事件Java代码1. privatevoidprocessSelectedKeys(SetselectedKeys)2. for(Iteratori=selectedKeys.iterator();i.hasNext();)3. SelectionKeyk=i.next();4. i.remove();5. 6. if(!k.
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 车床操作规程培训
- 少先队大队委培训
- 二零二五年电商代运营品牌形象授权使用合同
- 二零二五年度变电工程高空作业安全防护合同
- 二零二五年度互联网+农业服务平台建设合同
- 二零二五年度材料代购及绿色环保验收合同范本
- 二零二五年度船舶抵押贷款合同规范文本
- 二零二五年新型电商代运营服务合同示范文本
- 二零二五年度物流产业贷款融资居间合同
- 2025版智能建筑系统设计咨询中介服务合同
- 留疆战士考试题库及答案
- 16J914-1 公用建筑卫生间
- GB/T 7324-2010通用锂基润滑脂
- GB/T 20000.1-2014标准化工作指南第1部分:标准化和相关活动的通用术语
- 哲学导论(完整版)
- 气瓶检验站乙炔瓶检验
- 工艺美术专业人才培养方案调研报告
- 《上海市城镇职工基本医疗保险综合减负申请表》
- 汉语拼音发音表(适合初学者和老年人)
- 购物中心商场商户促销活动管理制度
- 中国工商银行个人贷款申请表
评论
0/150
提交评论