Reactor模式和NIO.docx_第1页
Reactor模式和NIO.docx_第2页
Reactor模式和NIO.docx_第3页
Reactor模式和NIO.docx_第4页
Reactor模式和NIO.docx_第5页
已阅读5页,还剩2页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

当前分布式计算Web Services盛行天下,这些网络服务的底层都离不开对socket的操作。他们都有一个共同的结构:1. Read request2. Decode request3. Process service4. Encode reply5. Send reply经典的网络服务的设计如下图,在每个线程中完成对数据的处理:但这种模式在用户负载增加时,性能将下降非常的快。我们需要重新寻找一个新的方案,保持数据处理的流畅,很显然,事件触发机制是最好的解决办法,当有事件发生时,会触动handler,然后开始数据的处理。Reactor模式类似于AWT中的Event处理:Reactor模式参与者1.Reactor 负责响应IO事件,一旦发生,广播发送给相应的Handler去处理,这类似于AWT的thread2.Handler 是负责非堵塞行为,类似于AWT ActionListeners;同时负责将handlers与event事件绑定,类似于AWT addActionListener如图:Java的NIO为reactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SlectorKey来告知我们,在此我们实现事件和handler的绑定。我们来看看Reactor模式代码:Java代码 1. public class Reactor implements Runnable 2.3. final Selector selector; 4. final ServerSocketChannel serverSocket; 5.6. Reactor(int port) throws IOException 7. selector = Selector.open(); 8. serverSocket = ServerSocketChannel.open(); 9. InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port); 10. serverSocket.socket().bind(address); 11.12. serverSocket.configureBlocking(false); 13. /向selector注册该channel 14. SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT); 15.16. logger.debug(-Start serverSocket.register!); 17.18. /利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor 19. sk.attach(new Acceptor(); 20. logger.debug(-attach(new Acceptor()!); 21. 22.23.24. public void run() / normally in a new Thread 25. try 26. while (!Terrupted() 27. 28. selector.select(); 29. Set selected = selector.selectedKeys(); 30. Iterator it = selected.iterator(); 31. /Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。 32. while (it.hasNext() 33. /来一个事件 第一次触发一个accepter线程 34. /以后触发SocketReadHandler 35. dispatch(SelectionKey)(it.next(); 36. selected.clear(); 37. 38. catch (IOException ex) 39. logger.debug(reactor stop!+ex); 40. 41. 42.43. /运行Acceptor或SocketReadHandler 44. void dispatch(SelectionKey k) 45. Runnable r = (Runnable)(k.attachment(); 46. if (r != null) 47. / r.run(); 48.49. 50. 51.52. class Acceptor implements Runnable / inner 53. public void run() 54. try 55. logger.debug(-ready for accept!); 56. SocketChannel c = serverSocket.accept(); 57. if (c != null) 58. /调用Handler来处理channel 59. new SocketReadHandler(selector, c); 60. 61. catch(IOException ex) 62. logger.debug(accept stop!+ex); 63. 64. 65. 66. public class Reactor implements Runnablefinal Selector selector;final ServerSocketChannel serverSocket;Reactor(int port) throws IOException selector = Selector.open();serverSocket = ServerSocketChannel.open();InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);serverSocket.socket().bind(address);serverSocket.configureBlocking(false);/向selector注册该channel SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);logger.debug(-Start serverSocket.register!);/利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptorsk.attach(new Acceptor();logger.debug(-attach(new Acceptor()!);public void run() / normally in a new Threadtry while (!Terrupted()selector.select();Set selected = selector.selectedKeys();Iterator it = selected.iterator();/Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。while (it.hasNext()/来一个事件 第一次触发一个accepter线程/以后触发SocketReadHandlerdispatch(SelectionKey)(it.next();selected.clear();catch (IOException ex) logger.debug(reactor stop!+ex);/运行Acceptor或SocketReadHandlervoid dispatch(SelectionKey k) Runnable r = (Runnable)(k.attachment();if (r != null)/ r.run();class Acceptor implements Runnable / innerpublic void run() try logger.debug(-ready for accept!);SocketChannel c = serverSocket.accept();if (c != null)/调用Handler来处理channelnew SocketReadHandler(selector, c);catch(IOException ex) logger.debug(accept stop!+ex);以上代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler。再看看Handler代码:Java代码 1. public class SocketReadHandler implements Runnable 2.3. public static Logger logger = Logger.getLogger(SocketReadHandler.class); 4.5. private Test test=new Test(); 6.7. final SocketChannel socket; 8. final SelectionKey sk; 9.10. static final int READING = 0, SENDING = 1; 11. int state = READING; 12.13. public SocketReadHandler(Selector sel, SocketChannel c) 14. throws IOException 15.16. socket = c; 17.18. socket.configureBlocking(false); 19. sk = socket.register(sel, 0); 20.21. /将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。 22. /参看dispatch(SelectionKey k) 23. sk.attach(this); 24.25. /同时将SelectionKey标记为可读,以便读取。 26. erestOps(SelectionKey.OP_READ); 27. sel.wakeup(); 28. 29.30. public void run() 31. try 32. / test.read(socket,input); 33. readRequest() ; 34. catch(Exception ex) 35. logger.debug(readRequest error+ex); 36. 37. 38.39.40. /* 41. * 处理读取data 42. * param key 43. * throws Exception 44. */ 45. private void readRequest() throws Exception 46.47. ByteBuffer input = ByteBuffer.allocate(1024); 48. input.clear(); 49. try 50.51. int bytesRead = socket.read(input); 52.53. . 54.55. /激活线程池 处理这些request 56. requestHandle(new Request(socket,btt); 57.58. . 59.60.61. catch(Exception e) 62. 63.64. public class SocketReadHandler implements Runnable public static Logger logger = Logger.getLogger(SocketReadHandler.class);private Test test=new Test();final SocketChannel socket;final SelectionKey sk; static final int READING = 0, SENDING = 1;int state = READING;public SocketReadHandler(Selector sel, SocketChannel c)throws IOException socket = c;socket.configureBlocking(false); sk = socket.register(sel, 0);/将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。/参看dispatch(SelectionKey k)sk.attach(this);/同时将SelectionKey标记为可读,以便读取。erestOps(SelectionKey.OP_READ);

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论