免费预览已结束,剩余1页可下载查看
下载本文档
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
JavaNIO处理长连接 最近恰好要写一个处理长连接的服务,接收日志包,然后打包成syslog形式再转发,所以在它的基础上改了一下。主要改了两个类,一个是Server,因为我们只关注read事件,所以write事件我们暂不处理。另外,在处理完ON_READ事件后,不能执行key.cancel()。Java代码 1. packagenioserver; 2. 3. .InetSocketAddress; 4. .ServerSocket; 5. importjava.nio.channels.SelectionKey; 6. importjava.nio.channels.Selector; 7. importjava.nio.channels.ServerSocketChannel; 8. importjava.nio.channels.SocketChannel; 9. importjava.util.Iterator; 10. importjava.util.LinkedList; 11. importjava.util.List; 12. importjava.util.Set; 13. 14. 15. /* 16. *Title:主控服务线程,采用NIO实现的长连接服务器 17. *authorsxl 18. *version1.0 19. */20. 21. publicclassServerimplementsRunnable 22. privatestaticSelectorselector; 23. privateServerSocketChannelsschannel; 24. privateInetSocketAddressaddress; 25. protectedNotifiernotifier; 26. privateintport; 27. 28. /* 29. *创建主控服务线程 30. *paramport服务端口 31. *throwsjava.lang.Exception 32. */33. publicstaticintMAX_THREADS=4; 34. publicServer(intport)throwsException 35. this.port=port; 36. 37. /获取事件触发器 38. notifier=Notifier.getNotifier(); 39. 40. /创建读写线程池 41. for(inti=0;i0) 69. SetselectedKeys=selector.selectedKeys(); 70. Iteratorit=selectedKeys.iterator(); 71. while(it.hasNext() 72. SelectionKeykey=(SelectionKey)it.next(); 73. it.remove(); 74. /处理IO事件 75. if(key.isAcceptable() 76. /Acceptthenewconnection 77. ServerSocketChannelssc=(ServerSocketChannel)key.channel(); 78. notifier.fireOnAccept(); 79. SocketChannelsc=ssc.accept(); 80. sc.configureBlocking(false); 81. /触发接受连接事件 82. Requestrequest=newRequest(sc); 83. notifier.fireOnAccepted(request); 84. /注册读操作,以进行下一步的读操作 85. sc.register(selector,SelectionKey.OP_READ,request); 86. 87. elseif(key.isReadable() 88. RcessRequest(key);/提交读服务线程读取客户端数据 89. 90. 91. 92. 93. catch(Exceptione) 94. continue; 95. 96. 97. 98. package nioserver;import .InetSocketAddress;import .ServerSocket;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.LinkedList;import java.util.List;import java.util.Set;/* * Title: 主控服务线程,采用NIO实现的长连接服务器 * author sxl * version 1.0 */public class Server implements Runnable private static Selector selector; private ServerSocketChannel sschannel; private InetSocketAddress address; protected Notifier notifier; private int port; /* * 创建主控服务线程 * param port 服务端口 * throws java.lang.Exception */ public static int MAX_THREADS = 4; public Server(int port) throws Exception this.port = port; / 获取事件触发器 notifier = Notifier.getNotifier(); / 创建读写线程池 for (int i = 0; i 0) Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); while (it.hasNext() SelectionKey key = (SelectionKey) it.next(); it.remove(); / 处理IO事件 if (key.isAcceptable() / Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); notifier.fireOnAccept(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); / 触发接受连接事件 Request request = new Request(sc); notifier.fireOnAccepted(request); / 注册读操作,以进行下一步的读操作 sc.register(selector, SelectionKey.OP_READ, request); else if (key.isReadable() RcessRequest(key); / 提交读服务线程读取客户端数据 catch (Exception e) continue; 另一个改动的类是Reader,改变对-1的处理,这里不是break,而是抛出异常。在读取完buffer的数据后,将数据包传递给另外两个线程进行syslog的发送和入库操作。Java代码 1. packagenioserver; 2. 3. importjava.io.IOException; 4. importjava.nio.ByteBuffer; 5. importjava.nio.channels.SelectionKey; 6. importjava.nio.channels.SocketChannel; 7. importjava.util.LinkedList; 8. importjava.util.List; 9. 10. /* 11. *Title:读线程 12. *Description:该线程用于读取客户端数据 13. *authorsxl 14. *version1.0 15. */16. 17. publicclassReaderextendsThread 18. privatestaticListpool=newLinkedList(); 19. privatestaticNotifiernotifier=Notifier.getNotifier(); 20. 21. publicvoidrun() 22. while(true) 23. try 24. SelectionKeykey; 25. synchronized(pool) 26. while(pool.isEmpty() 27. pool.wait(); 28. 29. key=(SelectionKey)pool.remove(0); 30. 31. /读取数据 32. read(key); 33. 34. catch(Exceptione) 35. continue; 36. 37. 38. 39. 40. /* 41. *读取客户端发出请求数据 42. *paramsc套接通道 43. */44. privatestaticintBUFFER_SIZE=1024; 45. publicstaticbytereadRequest(SocketChannelsc)throwsIOException 46. ByteBufferbuffer=ByteBuffer.allocate(BUFFER_SIZE); 47. intoff=0; 48. intr=0; 49. bytedata=newbyteBUFFER_SIZE*10; 50. while(true) 51. buffer.clear(); 52. r=sc.read(buffer); 53. if(r=0)break; 54. if(r=-1)/如果是流的末尾,则抛出异常 55. thrownewIOException(); 56. if(off+r)data.length)/数组扩容 57. data=grow(data,BUFFER_SIZE*10); 58. 59. bytebuf=buffer.array(); 60. System.arraycopy(buf,0,data,off,r); 61. off+=r; 62. 63. bytereq=newbyteoff; 64. System.arraycopy(data,0,req,0,off); 65. returnreq; 66. 67. 68. /* 69. *处理连接数据读取 70. *paramkeySelectionKey 71. */72. publicvoidread(SelectionKeykey) 73. SocketChannelsc=null; 74. try 75. /读取客户端数据 76. sc=(SocketChannel)key.channel(); 77. byteclientData=readRequest(sc); 78. if(clientData.length0)/有数据才处理 79. Requestrequest=(Request)key.attachment(); 80. request.setDataInput(clientData); 81. /提交到数据库写入线程 82. WcessRequest(request); 83. /提交到Syslog线程,发送syslog 84. ScessRequest(request); 85. 86. 87. catch(Exceptione) 88. if(sc!=null) 89. try 90. sc.socket().close(); 91. sc.close(); 92. catch(IOExceptione1) 93. e1.printStackTrace(); 94. 95. 96. 97. 98. /* 99. *处理客户请求,管理用户的联结池,并唤醒队列中的线程进行处理 100. */101. publicstaticvoidprocessRequest(SelectionKeykey) 102. synchronized(pool) 103. pool.add(pool.size(),key); 104. pool.notifyAll(); 105. 106. 107. 108. /* 109. *数组扩容 110. *paramsrcbyte源数组数据 111. *paramsizeint扩容的增加量 112. *returnbyte扩容后的数组 113. */114. publicstaticbytegrow(bytesrc,intsize) 115. bytetmp=newbytesrc.length+size; 116. System.arraycopy(src,0,tmp,0,src.length); 117. returntmp; 118. 119. package nioserver;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;import java.util.LinkedList;import java.util.List;/* * Title: 读线程 * Description: 该线程用于读取客户端数据 * author sxl * version 1.0 */public class Reader extends Thread private static List pool = new LinkedList(); private static Notifier notifier = Notifier.getNotifier(); public void run() while (true) try SelectionKey key; synchronized (pool) while (pool.isEmpty() pool.wait(); key = (SelectionKey) pool.remove(0); / 读取数据 read(key); catch (Exception e) continue; /* * 读取客户端发出请求数据 * param sc 套接通道 */ private static int BUFFER_SIZE = 1024; public static byte readRequest(SocketChannel sc) throws IOException ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); int off = 0; int r = 0; byte data = new byteBUFFER_SIZE * 10; while ( true ) buffer.clear(); r = sc.read(buffer); if(r = 0) break; if(r = -1)/如果是流的末尾,则抛出异常 throw new IOException(); if (off + r) data.length) /数组扩容 data = grow(data, BUFFER_SIZE * 10); byte buf = buffer.array(); System.arraycopy(buf, 0, data, off, r); off += r; byte req = new byteoff; System.arraycopy(data, 0, req, 0, off); return req; /* * 处理连接数据读取 * param key SelectionKey */ public void read(SelectionKey key) SocketChannel sc = null; try / 读取客户端数据 sc = (SocketChannel) key.channel(); byte clientData = readRequest(sc); if(clientData.length 0)/有数据才处理 Request request = (Request)key.attachment(); request.setDataInput(clientData); / 提交到数据库写入线程 WcessRequest(request); / 提交到Syslog线程,发送syslog ScessReq
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2025广西北海市中日友谊中学秋季学期教师招聘2人备考公基题库附答案解析
- 2025广东南粤银行重庆分行招聘历年真题汇编附答案解析
- 参考题库附答案解析
- 2025北京市房山区卫生健康委员会所属事业单位招聘高层次专业技术人才9人模拟试卷附答案解析
- 2026中信证券福建分公司校园招聘备考题库附答案解析
- 2026包头市青山区教育系统校园招聘33人(哈尔滨师范大学考点)笔试备考试卷附答案解析
- 2025广东深圳市公安局第招聘警务辅助人员2356人(十三批)笔试备考试卷带答案解析
- 2025四川省第四地质大队下半年考核招聘工作人员10人历年真题汇编带答案解析
- 2025年下半年芜湖市眼科医院招聘事业编制工作人员4人参考题库带答案解析
- 2025玉溪市峨山县林业和草原局招聘短期综合应急救援队员(11人)模拟试卷附答案解析
- GB/T 5296.1-2012消费品使用说明第1部分:总则
- 《高分子化学》课件
- 世界卫生组织(who)饮用水水质标准
- 保险业反洗钱培训
- 《笔算除法三位数除以一位数》-完整版课件
- (转正申请书)护士入职转正申请书5篇
- 小学生简笔画社团活动记录
- 财务人员登记表参考模板范本
- 教学课件 金属学与热处理-崔忠圻
- Q∕SY 1180.3-2014 管道完整性管理规范 第3部分:管道风险评价
- 安全工器具介绍学习培训课件
评论
0/150
提交评论