MINA2实用手册.docx_第1页
MINA2实用手册.docx_第2页
MINA2实用手册.docx_第3页
MINA2实用手册.docx_第4页
MINA2实用手册.docx_第5页
免费预览已结束,剩余12页可下载查看

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

MINA2实用手册作者:李庆丰 Email:MINA框架是对java的NIO包的一个封装,简化了NIO程序开发的难度,封装了很多底层的细节,然开发者把精力集中到业务逻辑上来,最近做了一个相关的项目,为了备忘对MINA做一个总结。一、 服务端初始化及参数配置MINA2初始化很简单。基本的初始化参数如下:/初始化Acceptor可以不指定线程数量,MINA2里面默认是CPU数量+2 NioSocketAcceptor acceptor = new NioSocketAcceptor(5); java.util.concurrent.Executor threadPool = Executors.newFixedThreadPool(1500);/建立线程池 /加入过滤器(Filter)到Acceptor acceptor.getFilterChain().addLast(exector, new ExecutorFilter(threadPool); /编码解码器acceptor.getFilterChain().addLast(codec, new ProtocolCodecFilter(new WebDecoder(),new XmlEncoder(); /日志 LoggingFilter filter = new LoggingFilter(); filter.setExceptionCaughtLogLevel(LogLevel.DEBUG); filter.setMessageReceivedLogLevel(LogLevel.DEBUG); filter.setMessageSentLogLevel(LogLevel.DEBUG); filter.setSessionClosedLogLevel(LogLevel.DEBUG); filter.setSessionCreatedLogLevel(LogLevel.DEBUG); filter.setSessionIdleLogLevel(LogLevel.DEBUG); filter.setSessionOpenedLogLevel(LogLevel.DEBUG); acceptor.getFilterChain().addLast(logger, filter); acceptor.setReuseAddress(true);/设置的是主服务监听的端口可以重用 acceptor.getSessionConfig().setReuseAddress(true);/设置每一个非主监听连接的端口可以重用 MINA2中,当启动一个服务端的时候,要设定初始化缓冲区的长度,如果不设置这个值,系统默认为2048,当客户端发过来的消息超过设定值的时候,MINA2的机制是分段接受的,将字符是放入缓冲区中读取,所以在读取消息的时候,需要判断有多少次。这样的好处就是可以节省通讯的流量。acceptor.getSessionConfig().setReceiveBufferSize(1024);/设置输入缓冲区的大小 acceptor.getSessionConfig().setSendBufferSize(10240);/设置输出缓冲区的大小 /设置为非延迟发送,为true则不组装成大包发送,收到东西马上发出 acceptor.getSessionConfig().setTcpNoDelay(true); /设置主服务监听端口的监听队列的最大值为100,如果当前已经有100个连接,再新的连接来将被服务器拒绝 acceptor.setBacklog(100); acceptor.setDefaultLocalAddress(new InetSocketAddress(port); /加入处理器(Handler)到Acceptor acceptor.setHandler(new YourHandler(); acceptor.bind(); 二、 初始化客户端客户端的初始化和服务器端其实是一样的,就是初始化类不一样,客户端是作为发送者的SocketConnector connector = new NioSocketConnector();connector.getFilterChain().addLast(codec,new ProtocolCodecFilter(new XmlCodecFactory(Charset1. .forName(charsetName), null, sertType);/指定线程池connector.getFilterChain().addLast(executor, new 、ExecutorFilter(); /指定业务处理类connector.setHandler(this);三、 处理流程NioSocketAcceptor是MINA的适配器,一切都是从这里开始的。MINA中有个过滤器和处理器的概念,过滤器用来过滤数据,处理器用来处理数据。具体来说MINA的处理模型就是request-过滤器A-过滤器B-处理器-过滤器B-过滤器A-response,这里的request和response类似serlvet的request和response。acceptor.getFilterChain().addLast(codec, new ProtocolCodecFilter(new WebDecoder(),new XmlEncoder(); /request-WebDecoder-XmlHander-WebEncode-response acceptor.getFilterChain().addLast(codec, new ProtocolCodecFilter(new WebDecoder(),new XmlEncoder();/这里是处理逻辑的关键部位,请求的处理都是在WebDecoder类和XmlEncoder类中处理,可以明显从命名上看出来一个是用来解码,另一个是用来编码,requet过来后先进入WebDecoder类(实现了ProtocolDecoder接口)进行解码处理,这里可以加入自己的逻辑把传进来的流解码成自己需要的信息。而XmlEncoder类(实现了ProtocolEncoder接口)是进行编码,在这个类里面加入自己的逻辑把处理后的信息组装发送给客户端(response)。而在解码和编码过程中XmlHander(扩展了IoHandlerAdapter抽象类)起到了处理器的作用。现在详细描述一下request-WebDecoder-XmlHander-WebEncode-response的过程:客户端发送一个请求到MINA服务器,这里相当于来了一个requet。请求首先来到WebDecoder类(实现了ProtocolDecoder接口)中的 boolean decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception方法 /* 参数in:用户请求信息全存在这里,读数据就从in这里读。 参数out:用来输出处理后的数据到Filter的下一个过滤器,如果没有过滤器了就输出到XmlHander,这里有点和 servelt的过滤器类似。利用out.write(Object object);这个函数可以把数据传到下一个Filter。我们可以自己定义一个对象,我们假设为Request,用它来传递消息,那末这里就可以写成out.write(new RequsetMessage(); 如果这个方法返回false,就是说当前逻辑包还没接收完(也就是当前的IoBuffer并没有包含足够的数据),需要再次 执行decode方法(再次获取新的IoBuffer),用来获取足够的数据。如果返回值为true就表示可以不执行decode方 法了,但是要激活handler方法,必须要调用out.write方法。 public class RequestMessage/这里什么也不做 */ */然后到XmlHander(扩展了IoHandlerAdapter抽象类)中的 void messageReceived(IoSession session, Object message) throws Exception方法 WriteFuture future = session.write(response);/session中必须加入这个代码,才会激活encode方法 future.addListener(IoFutureListener.CLOSE);/这个的作用是发送完毕后关闭连接,加了就是短连接,不然是长连接;在XmlHanler类中可以在重载sessionIdle方法,这个方法判断整个SOCKET连接通道是否空闲,可以再这里间隔(在服务店启动的时候设置idleTime)发送心跳包来保持各个长连接:/* *当网络通道空闲时此方法被调用,在这里可以判断是读空闲、写空闲还是两个都空闲,以便做出正确的处理 一般的网络通讯程序都要与服务器端保持长连接,所以这里可以发一下网络测试数据以保持与服务器端的连接 * param session 会话信息 * param status 状态 * throws Exception 异常 */ Overridepublic void sessionIdle(IoSession session, IdleStatus status) throws ExceptionIoFutureListener里面有个operationComplete(IoFuture future)方法,当流发送完成之后才调用这个方法。 /* 参数message:用来获取Filter传递过来的对象.对应代码RequestMessage request = (RequestMessage) message; 参数session:用来发送数据到Filter.对应代码session.write(new ResponseMessage(); public class ResponseMessage/这里什么也不做,假设存放处理后的数据 注意:对于一个MINA程序而言,对于XmlHander类只生成一个对象,所以要考虑线程安全问题 */然后到XmlEncoder类(实现了ProtocolEncoder接口)中的 boolean encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception 方法 /* 参数message:用来获取上一个Filter节点的数据或者处理器的数据(如果这个过滤器为最靠近处理器的那个) ResponseMessage response = (ResponseMessage)message; 参数out:用来输出数据到下一个Filter节点过或者到客户端,用out.write(Object encodedMessage)把数据发送 出去,但是要注意的是,如果这个Filter下一个节点如果是客户端的话,那个这个encodedMessage数据必须为 IoBuffer类型的,可以利用IoBuffer.wrap(byte byteArray)这个方法来格式化输出数据 */四、 大容量包的处理MINA2中(MINA2 RC版本,MINA2.0正式版已经发布)服务端接受数据默认有一定长度的缓冲区(可以在启动的时候设置)。那么对于大报文,怎么处理呢?比如说超过1024,甚至更多?MINA2为了节省网络流量,提高处理效率,会将大报文自动拆分(可能是存放MINA2中的缓冲区里面):比如2048字节的报文,就会拆分成两次;那么在接受的时候,就有一个如何判断是完整报文的问题,或者说是一个拆包组包的问题。 MINA2中初始化服务的时候是可以设置输入和输出的缓冲区的: acceptor.getSessionConfig().setReadBufferSize(1024);a) acceptor.getSessionConfig().setReadBufferSize(1024); MINA2提供的案例是,在IoSession中设置一个类似于session,存在在当前IoSession中的全局变量,在此IoSession中有效。 privatefinalAttributeKeyTEST=newAttributeKey(getClass(),TEST); 五、 private final AttributeKey TEST = new AttributeKey(getClass(), TEST); 大家都知道,通过 SOCKET TCP/IP传输过来的报文是不知道边界的,所以一般会约定在前端固定长度的字节加上报文长度,让SERVER来根据这个长度来确定整个报文的边界,在我前面的博文有提到。其实MINA2中有: prefixedDataAvailable(4) int 方法,来判断固定长度的报文长度,但是参数只能是1,2,4;该方法很好用。判断前四字节的整型值是否大于等于整个缓冲区的数据。可以方便的判断一次 messageReceived 过来的数据是否完整。(前提是自己设计的网络通讯协议前四字节等于发送数据的长度) ,如果你不是设定1,2,4字节来作为长度的话,那么就没辙了。 在你的解码操作中,MINA2的缓冲区发多少次报文,你的decode方法就会调用多少次。 上面设置了session之后,可以采用一个方法: /* * *paramsession *会话信息 *return返回session中的累积 */privateContextgetContext(IoSessionsession) Contextctx=(Context)session.getAttribute(CONTEXT); if(ctx=null) ctx=newContext(); session.setAttribute(CONTEXT,ctx); returnctx; /* param session会话信息return 返回session中的累积*/private Context getContext(IoSession session) Context ctx = (Context) session.getAttribute(CONTEXT);if (ctx = null) ctx = new Context();session.setAttribute(CONTEXT, ctx);return ctx;然后在你的decode方法中,首先从session取出数据对象,进行拼接: Contextctx=getContext(session); /先把当前buffer中的数据追加到Context的buffer当中 ctx.append(ioBuffer); /把position指向0位置,把limit指向原来的position位置 IoBufferbuf=ctx.getBuffer(); buf.flip();Context ctx = getContext(session);/ 先把当前buffer中的数据追加到Context的buffer当中ctx.append(ioBuffer);/ 把position指向0位置,把limit指向原来的position位置IoBuffer buf = ctx.getBuffer();buf.flip(); 接着读取每次报文的总长度: /读取消息头部分 bytebLeng=newbytepackHeadLength; buf.get(bLeng); intlength=-1; try length=Integer.parseInt(newString(bLeng); catch(NumberFormatExceptionex) ex.printStackTrace(); if(length0) ctx.setMsgLength(length); / 读取消息头部分byte bLeng = new bytepackHeadLength;buf.get(bLeng);int length = -1;try length = Integer.parseInt(new String(bLeng); catch (NumberFormatException ex) ex.printStackTrace();if (length 0) ctx.setMsgLength(length);在读取到每次报文的长度之后,就接着循环判断BUF里面的字节数据是否已经全部接受完毕了,如果没有接受完毕,那么就不处理;下面是完整处理的代码: while(buf.remaining()=packHeadLength) buf.mark(); /设置总长度 if(ctx.getMsgLength()0) ctx.setMsgLength(length); /读取消息头部分 intlength=ctx.getMsgLength(); /检查读取的包头是否正常,不正常的话清空buffer if(lengthmaxPackLength2) buf.clear(); out.write(ERROR!); break; /读取正常的消息包,并写入输出流中,以便IoHandler进行处理 elseif(lengthpackHeadLength&buf.remaining()=length) /完整的数据读取之后,就可以开始做你自己想做的操作了 else /如果消息包不完整 /将指针重新移动消息头的起始位置 buf.reset(); break; if(buf.hasRemaining()/如果有剩余的数据,则放入Session中 /将数据移到buffer的最前面 IoBuffertemp=IoBuffer.allocate(2048).setAutoExpand( true); temp.put(buf); temp.flip(); buf.clear(); buf.put(temp); else/如果数据已经处理完毕,进行清空 buf.clear(); while (buf.remaining() = packHeadLength) buf.mark();/ 设置总长度if (ctx.getMsgLength() 0) ctx.setMsgLength(length);/ 读取消息头部分int length = ctx.getMsgLength();/ 检查读取的包头是否正常,不正常的话清空bufferif (length maxPackLength2) buf.clear();out.write(ERROR!);break;/ 读取正常的消息包,并写入输出流中,以便IoHandler进行处理 else if (length packHeadLength & buf.remaining() = length) /完整的数据读取之后,就可以开始做你自己想做的操作了 else / 如果消息包不完整/ 将指针重新移动消息头的起始位置buf.reset();break;if (buf.hasRemaining() / 如果有剩余的数据,则放入Session中/ 将数据移到buffer的最前面IoBuffer temp = IoBuffer.allocate(2048).setAutoExpand(true);temp.put(buf);temp.flip();buf.clear();buf.put(temp); else / 如果数据已经处理完毕,进行清空buf.clear();为了便于操作,最好设置一个内部类: privateclassContext privatefinalCharsetDecoderdecoder; privateIoBufferbuf; privateintmsgLength=0; privateintoverflowPosition=0; /* * * */privateContext() decoder=charset.newDecoder(); buf=IoBuffer.allocate(80).setAutoExpand(true); /* * * *returnCharsetDecoder */publicCharsetDecodergetDecoder() returndecoder; /* * * *returnIoBuffer */publicIoBuffergetBuffer() returnbuf; /* * * *returnoverflowPosition */publicintgetOverflowPosition() returnoverflowPosition; /* * * *returnmatchCount */publicintgetMsgLength() returnmsgLength; /* * * *parammatchCount *报文长度 */publicvoidsetMsgLength(intmsgLength) this.msgLength=msgLength; /* * * */publicvoidreset() this.buf.clear(); this.overflowPosition=0; this.msgLength=0; this.decoder.reset(); /* * *paramin *输入流 */publicvoidappend(IoBufferin) getBuffer().put(in); 五 多个SOCKET通讯的处理在MINA2中两个SOCKET SERVER进行通讯,可以采用虚拟机内部的管道的方式。在MINA2的源码包里面自带了这个例子: IoAcceptoracceptor=newVmPipeAcceptor(); VmPipeAddressaddress=newVmPipeAddress(8080); /Setupserver acceptor.setHandler(newTennisPlayer(); acceptor.bind(address); /Connecttotheserver. VmPipeConnectorconnector=newVmPipeConnector(); connector.setHandler(newTennisPlayer(); ConnectFuturefuture=connector.connect(address); future.awaitUninterruptibly(); IoSessionsession=future.getSession(); /Sendthefirstpingmessage session.write(newTennisBall(10); /Waituntilthematchends. session.getCloseFuture().awaitUninterruptibly(); acceptor.unbind();六、 IoAcceptor acceptor = new VmPipeAcceptor();1. VmPipeAddress address = new VmPipeAddress(8080);2. / Set up server3. acceptor.setHandler(new TennisPlayer();4. acceptor.bind(address);5. / Connect to the server.6. VmPipeConnector connector = new VmPipeConnector();7. connector.setHandler(new TennisPlayer();8. ConnectFuture future = connector.connect(address);9. future.awaitUninterruptibly();10. IoSession session = future.getSession();11. / Send the first ping message12. session.write(new TennisBall(10);13. / Wait until the match ends.14. session.getCloseFuture().awaitUninterruptibly();15. acceptor.unbind();也可以将IoSession对方放入全局的线程安全的Map中去,当需要发送的时候根据KEY取出来,然后write出去。六 MIN2的BUG我们知道,在MINA2中,发送和接受时两个独立的工作线程,但是可以设置一个参数,当服务端发送消息之后同步读取客户端的返回: session.getConfig().setUseReadOperation(true);七、 session.getConfig().setUseReadOperation(true);近日,采用MINA2(RC)的同步读取方法,发现无法真的同步读取客户端的返回; 场景是:服务端发送一个消息给客户端,需要同步等待客户端的一个消息回执,然后服务端的程序继续执行; 但是实际在使用的时候这个设置无效。 sendSession.getConfig().setUseReadOperation(true); WriteFuturefuture=sendSession.write(xmlMsgBean);/发送数据 future.awaitUninterruptibly();/等待发送数据操作完成 if(future.getException()!=null) thrownewAppException(future.getException().getMessage(); if(future.isWritten() /数据已经被成功发送 logger.debug(数据已经被成功发送); ReadFuturereadFuture=sendSession.read(); readFuture.awaitUninterruptibly(); if(readFuture.getException()!=null) thrownewAppException(readFuture.getException().getMessage(); sendSession.getConfig().setUseReadOperation(false); return(XmlMsgBean)readFuture.getMessage().getStrErrMsg(); else /数据发送失败 logger.debug(数据发送失败); 后来用GOOGLE搜索了一下,发现在MINA的官网上,老外同样问了一个一模一样的问题,并且提了一个BUG上去,但是目前BUG的状态还是open; /jira/browse/DIRMINA-777 Imattemptingtoperformasynchronouswrite/readinademux-basedclientapplicationwithMINA2.0RC1,butitseemstogetstuck.Hereismycode: code publicbooleanlogin(finalStringusername,finalStringpassword) /blockinboundmessages session.getConfig().setUseReadOperation(true); /sendtheloginrequest finalLoginRequestloginRequest=newLoginRequest(username,password); finalWriteFuturewriteFuture=session.write(loginRequest); writeFuture.awaitUninterruptibly(); if(writeFuture.getException()!=null) session.getConfig().setUseReadOperation(false); returnfalse; /retrievetheloginresponse finalReadFuturereadFuture=session.read(); readFuture.awaitUninterruptibly(); if(readFuture.getException()!=null) session.getConfig().setUseReadOperation(false); returnfalse; /stopblockinginboundmessages session.getConfig().setUseReadOperation(false); /determineifthelogininfoprovidedwasvalid finalLoginResponseloginResponse=(LoginResponse)readFuture.getMessage(); returnloginResponse.getSuccess(); code IcanseeontheserversidethattheLoginRequestobjectisretrieved,andaLoginResponsemessageissent.Ontheclientside,theDemuxingProtocolCodecFactoryreceivestheresponse,butafterthrowinginsomelogging,IcanseethattheclientgetsstuckonthecalltoreadFuture.awaitUninterruptibly(). Icantforthelifeofmefigureoutwhyitisstuckherebaseduponmyowncode.Iproperlysetthereadoperationtotrueonthesessionconfig,meaningthatmessagesshouldbeblocked.However,itseemsasifthemessagenolongerexistsbytimeItrytoreadresponsemessagessynchronously.Im attempting to perform a synchronous write/read in a demux-based client application with MINA 2.0 RC1, but it seems to get stuck. Here is my code: code public boolean login(final String username, final String password) / block inbound messages session.getConfig().setUseReadOperation(true); / send the login re

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论