




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、UDP传输工具类(server/client)。UDP不适合传输大数据,所以每个数据包要尽量小。UDP传输中可能会丢包,如有需要,可以多次发送同一个包,以保证包能安全到达;接收端可以对收到的包进行CRC校验。javaviewplaincopypackageorg.sl.udp.beans;.DatagramPacket;/* 处理udp请求的接口* authorshanl*/publicinterfaceIUdpRequestHandler/*解析请求数据包*paramrequestPack*/voidparse(DatagramPacketrequestPack);javaviewplaincopypack
2、ageorg.sl.udp.beans;importjava.io.ByteArrayInputStream;importjava.io.ByteArrayOutputStream;importjava.io.ObjectInputStream;importjava.io.ObjectOutputStream;importjava.io.PrintStream;importjava.util.Arrays;importjava.util.Map;importjava.util.Properties;importjava.util.Set;* 传输包格式<br/>* 具备头校验功能。
3、* authorshanl* /publicclassPacketFormat/*请求头,主要用于校验*/publicstaticfinalbyteREQUEST_HEADER=S,H,A,N;privatePropertiesprop=null;/*构造一个用于发送请求的包结构*/publicPacketFormat()p=newProperties();/*解析并从请求中加载数据* parambuff数据包* paramoffset偏移量* paramlen长度* returntrue:解析成功false:解析失败*/publicbooleanparse(bytebuff,
4、intoffset,intlen)booleandone=false;ByteArrayInputStreamdataCache=null;ObjectInputStreamobjectIn=null;ByteArrayInputStreampropCache=null;trydataCache=newByteArrayInputStream(buff,offset,len);byted_header=newbyteREQUEST_HEADER.length;dataCache.read(d_header);/校验请求头if(!Arrays.equals(d_header,REQUEST_HE
5、ADER)returnfalse;objectIn=newObjectInputStream(dataCache);/得到数据长度shortdataLen=objectIn.readShort();bytepropBys=newbytedataLen;objectIn.read(propBys);propCache=newByteArrayInputStream(propBys);/加载数据p.load(propCache);done=true;catch(Exceptione)done=false;finallytryif(null!=propCache)propCache.
6、close();catch(Exceptionex)tryif(null!=objectIn)objectIn.close();catch(Exceptionex)tryif(null!=dataCache)dataCache.close();catch(Exceptionex)returndone;/*设置数据*paramkey*paramvalue*/publicvoidsetProperty(Stringkey,Stringvalue)p.setProperty(key,value);/*取数据*paramkey*return*/publicStringgetProper
7、ty(Stringkey)p.getProperty(key,);/*返回keyset*return*/publicSet<Object>keySet()p.keySet();/*将内容转换为byte数组*return*/publicbytetoBytes()bytedataCacheBys=null;ByteArrayOutputStreamdataCache=null;ObjectOutputStreamdataOut=null;StringBuilderpropContent=newStringBuilder();byt
8、epropBys=null;Set<Map.Entry<Object,Object>>items=p.entrySet();for(Map.Entry<Object,Object>i:items)propContent.append(String)i.getKey();propContent.append(=);propContent.append(String)i.getValue();propContent.append(n);propBys=propContent.toString().getBytes();trydataCache=n
9、ewByteArrayOutputStream();dataCache.write(REQUEST_HEADER);dataOut=newObjectOutputStream(dataCache);/写入数据长度dataOut.writeShort(propBys.length);/写入数据dataOut.write(propBys,0,propBys.length);dataOut.flush();dataCacheBys=dataCache.toByteArray();catch(Exceptionex)thrownewRuntimeException(ex);finallytryif(n
10、ull!=dataOut)dataOut.close();catch(Exceptionex)if(null!=dataCache)dataCache.close();catch(Exceptionex)returndataCacheBys;/* 得到请求数据* parambuff数据缓存,缓存大小>=128* paramoffset偏移量* return包长度=请求头长度+2+数据* /publicintgetBytes(bytebuff,intoffset)intlen=0;/ByteArrayOutputStreampropCache=null;ByteArrayOutputStr
11、eamdataCache=null;ObjectOutputStreamdataOut=null;StringBuilderpropContent=newStringBuilder();bytepropBys=null;/propCache=newByteArrayOutputStream(buff.length-REQUEST_HEADER.length-2);/p.list(newPrintStream(propCache);/propBys=propCache.toByteArray();/catch(Exceptionex)/thrownewRuntimeExcepti
12、on(ex);/finally/try/if(null!=propCache)propCache.close();/catch(Exceptionex)/这段代码与上面这段功能相同,但Properties的list()会写入额外的字节Set<Map.Entry<Object,Object>>items=p.entrySet();for(Map.Entry<Object,Object>i:items)propContent.append(String)i.getKey();propContent.append(=);propContent.ap
13、pend(String)i.getValue();propContent.append(n);propBys=propContent.toString().getBytes();try/写入头dataCache=newByteArrayOutputStream(buff.length);dataCache.write(REQUEST_HEADER);len+=REQUEST_HEADER.length;dataOut=newObjectOutputStream(dataCache);/写入数据长度dataOut.writeShort(propBys.length);len+=2;/写入数据da
14、taOut.write(propBys,0,propBys.length);dataOut.flush();bytedataCacheBys=dataCache.toByteArray();System.arraycopy(dataCacheBys,0,buff,offset,dataCacheBys.length);len+=dataCacheBys.length;catch(Exceptionex)thrownewRuntimeException(ex);finallytryif(null!=dataOut)dataOut.close();catch(Exceptionex)tryif(n
15、ull!=dataCache)dataCache.close();catch(Exceptionex)returnlen;/* 返回crc32校验码* return* /longcrc32()CRC32crc32=newCRC32();crc32.update(toBytes();returncrc32.getValue();javaviewplaincopypackageorg.sl.udp.client;importjava.io.IOException;.DatagramPacket;.DatagramSocket;.SocketException;importjava.util.Ran
16、dom;importjava.util.concurrent.BlockingDeque;importjava.util.concurrent.LinkedBlockingDeque;/* UDP发送请求服务* authorshanl*/publicclassUDPSenderServiceprivateBlockingDeque<RequestObject>requestPool=null;privatelongintervalTime=200L;privateDatagramSocketudpSender=null;privatebooleanshutdown=true;pri
17、vateObjectlockObj=newObject();publicUDPSenderService()requestPool=newLinkedBlockingDeque<RequestObject>();/* parampoolSize请求缓冲池上限* paramintervalTime间隔时间* /publicUDPSenderService(intpoolSize,intintervalTime)requestPool=newLinkedBlockingDeque<RequestObject>(poolSize);ervalTime=inte
18、rvalTime;/*过程*/voidprocess()dolongnow=System.currentTimeMillis();trysynchronized(lockObj)lockObj.wait(intervalTime);if(!requestPool.isEmpty()send(now);catch(Exceptione)e.printStackTrace();while(!shutdown);/intc=0;privatevoidsend(longtime)throwsIOExceptionRequestObjectro=null;tryfor(inti=0;i<10;i+
19、)ro=requestPool.poll();/ro=requestPool.take();/ro=requestPool.pollFirst();if(null!=ro)if(ro.getTime()<=time)/System.out.println(c+);udpSender.send(ro.getDataPacket();elserequestPool.put(ro);/requestPool.offerLast(ro);elsebreak;catch(Exceptione)e.printStackTrace();* 重复多次发送一个udp请求* paramrequest请求包*
20、 paramrepeatCount重复次数* paraminterval重复发送请求包间隔* /publicvoidaddRepeatingRequest(DatagramPacketrequest,intrepeatCount,longinterval)addImmediateRequest(request);/timingRequest(System.currentTimeMillis(),request);bytereqData=newbyterequest.getLength()-request.getOffset();System.arraycopy(request.getData(
21、),request.getOffset(),reqData,request.getOffset(),request.getLength();longnow=System.currentTimeMillis();DatagramPacketdpClone=null;for(longi=0,nextTime=interval;i<repeatCount;i+,nextTime+=interval)dpClone=newDatagramPacket(reqData,request.getOffset(),request.getLength();dpClone.setSocketAddress(
22、request.getSocketAddress();addTimingRequest(now+nextTime,dpClone);/* 添加一个定时的请求,在一个近似的时间执行发送.<br/>* 如果这个请求为过期的请求,则会在下一个时间被执行.* paramtime* paramrequest* /publicvoidaddTimingRequest(longtime,DatagramPacketrequest)try/requestPool.offerLast(newRequestObject(time,request),300,TimeUnit.MILLISECONDS);
23、requestPool.put(newRequestObject(time,request);/requestPool.offerLast(newRequestObject(time,request);catch(Exceptione)e.printStackTrace();synchronized(lockObj)lockObj.notify();/try/Thread.sleep(1);/catch(InterruptedExceptione)/* 立即发送一个请求* paramrequest* /publicvoidaddImmediateRequest(DatagramPacketre
24、quest)udpSender.send(request);catch(Exceptionex)ex.printStackTrace();/*启动服务*/publicvoidstart()if(this.shutdown)tryif(null=udpSender)udpSender=newDatagramSocket();catch(SocketExceptione1)e1.printStackTrace();return;tryThreadt=newThread(newProcessService(),UDPSenderService-+newRandom().nextInt(999);t.
25、start();catch(Exceptione)thrownewRuntimeException(e.getMessage();this.shutdown=false;/*中止服务*/publicvoidshutdown()this.shutdown=true;tryThread.sleep(1000*10);catch(Exceptionex)trythis.udpSender.disconnect();catch(Exceptionex)trythis.udpSender.close();catch(Exceptionex)publicvoidsetDatagramSocket(Data
26、gramSocketsender)this.udpSender=sender;privateclassProcessServiceimplementsRunnablepublicvoidrun()process();staticclassRequestObjectprivateLongtime;privateDatagramPacketdataPacket=null;publicRequestObject(DatagramPacketdataPacket)this.time=System.currentTimeMillis();publicRequestObject(Longtime,Data
27、gramPacketdataPacket)this.time=time;this.dataPacket=dataPacket;publicLonggetTime()returntime;publicDatagramPacketgetDataPacket()returndataPacket;javaviewplaincopypackageorg.sl.udp.server;.DatagramPacket;.DatagramSocket;.InetSocketAddress;importorg.sl.udp.beans.IUdpRequestHandler;/* udp接收器* authorsha
28、nl* /publicclassUDPReceptorimplementsRunnableprivateStringhostname=localhost;privateintport=7777;privateintrecePacketSize=512;privateIUdpRequestHandlerrequestHandler=null;/*过程*/publicvoidrun()DatagramSocketudpRece=null;DatagramPacketdataPack=null;bytebuff=null;tryudpRece=newDatagramSocket(newInetSoc
29、ketAddress(this.hostname,this.port);udpRece.setReceiveBufferSize(this.recePacketSize);for(;)buff=newbytethis.recePacketSize;dataPack=newDatagramPacket(buff,this.recePacketSize);udpRece.receive(dataPack);if(null!=requestHandler)requestHandler.parse(dataPack);catch(Exceptionex)ex.printStackTrace();/*注
30、入请求处理* paramrequestHandler请求处理*/publicvoidsetRequestHandler(IUdpRequestHandlerrequestHandler)this.requestHandler=requestHandler;/*设置接收包大小* paramudpPacketSize*/publicvoidsetRecePacketSize(intudpPacketSize)this.recePacketSize=udpPacketSize;publicvoidsetHostname(Stringhostname)this.hostname=hostname;pu
31、blicvoidsetPort(intport)this.port=port;javaviewplaincopypackageorg.sl.udp.demo;.DatagramPacket;importjava.util.Set;importjava.util.concurrent.ExecutorService;importorg.sl.udp.beans.PacketFormat;importorg.sl.udp.beans.IUdpRequestHandler;publicclassDemoMultiThreadUDPRequestHandlerImplimplementsIUdpReq
32、uestHandlerprivateExecutorServicethreadPool=null;publicvoidparse(DatagramPacketrequestPack)threadPool.execute(newHandlerService(requestPack);publicvoidsetThreadPool(ExecutorServicethreadPool)this.threadPool=threadPool;privatestaticclassHandlerServiceextendsThreadprivateDatagramPacketrequestPack=null
33、;publicHandlerService(DatagramPacketrequestPack)this.requestPack=requestPack;publicvoidrun()tryPacketFormatreqPack=newPacketFormat();if(!reqPack.parse(requestPack.getData(),requestPack.getOffset(),requestPack.getLength()return;Set<Object>keyset=reqPack.keySet();System.out.println(value:+reqPac
34、k.getProperty(value);System.out.println(=time:+System.currentTimeMillis();for(Objectkey:keyset)System.out.println(key+:+reqPack.getProperty(String)key);catch(Exceptionex)ex.printStackTrace();javaviewplaincopypackageorg.sl.udp.demo;.DatagramPacket;importjava.util.Set;importorg.sl.udp.beans.PacketForm
35、at;importorg.sl.udp.beans.IUdpRequestHandler;publicclassDemoUDPRequestHandlerImplimplementsIUdpRequestHandlerprivateDatagramPacketrequestPack=null;publicvoidparse(DatagramPacketrequestPack)this.requestPack=requestPack;process();privatevoidprocess()tryPacketFormatreqPack=newPacketFormat();if(!reqPack
36、.parse(requestPack.getData(),requestPack.getOffset(),requestPack.getLength()return;Set<Object>keyset=reqPack.keySet();synchronized(UDPReceptorDemo.iset)UDPReceptorDemo.iset.add(Integer.parseInt(reqPack.getProperty(value);/System.out.println(value:+reqPack.getProperty(value);/System.out.println
37、(=time:+System.currentTimeMillis();/for(Objectkey:keyset)/System.out.println(key+:+reqPack.getProperty(String)key);/catch(Exceptionex)ex.printStackTrace();javaviewplaincopypackageorg.sl.udp.demo;importjava.util.HashSet;importjava.util.Set;importjava.util.concurrent.Executors;importorg.sl.udp.beans.I
38、UdpRequestHandler;importorg.sl.udp.server.UDPReceptor;publicclassUDPReceptorDemopublicstaticvoidmain(Stringargs)/t1();t2();t3();publicstaticSet<Integer>iset=newHashSet<Integer>(10000);staticvoidt3()for(;)System.out.println(iset.size();tryThread.sleep(3000);catch(InterruptedExceptione)e.p
39、rintStackTrace();staticvoidt2()System.err.println(启动udp监听,端口7777.);IUdpRequestHandlerhandlerImpl=newDemoUDPRequestHandlerImpl();UDPReceptorrece=newUDPReceptor();rece.setRequestHandler(handlerImpl);rece.setHostname(3);rece.setPort(7777);Threadt=newThread(rece,udpRece_port7777);t.start();/*
40、以普通实例启动*/staticvoidt1()System.err.println(启动udp监听,端口7777.);DemoMultiThreadUDPRequestHandlerImplhandlerImpl=newDemoMultiThreadUDPRequestHandlerImpl();handlerImpl.setThreadPool(Executors.newFixedThreadPool(5);UDPReceptorrece=newUDPReceptor();rece.setRequestHandler(handlerImpl);rece.setHostname(192.168
41、.2.23);rece.setPort(7777);rece.run();javaviewplaincopypackageorg.sl.udp.demo;.DatagramPacket;.DatagramSocket;.InetSocketAddress;.SocketException;importjava.util.Random;importorg.sl.udp.beans.PacketFormat;importorg.sl.udp.client.UDPSenderService;publicclassUDPSenderServiceDemopublicstaticvoidmain(Str
42、ingargs)/t1();/t2();/t3();t4();staticvoidt4()UDPSenderServiceudpSenderService=newUDPSenderService();udpSenderService.start();System.err.println(启动udp发送服务);Randomrand=newRandom();bytebuff=null;longtime=System.currentTimeMillis();DatagramPacketreq=null;PacketFormatpf=null;for(inti=0,endi=300;i<endi
43、;i+)/time=(rand.nextInt(100)+1)*100L+System.currentTimeMillis();pf=newPacketFormat();pf.setProperty(msg,test);pf.setProperty(msg,test);pf.setProperty(type,cpu);pf.setProperty(value,+i);buff=newbyte512;intpackLen=pf.getBytes(buff,0);req=newDatagramPacket(buff,0,packLen);/System.out.println(value:+pf.
44、getProperty(value);req.setSocketAddress(newInetSocketAddress(3,7777);/有一定几率丢包/for(intn=0;n<3;n+)/udpSenderService.addImmediateRequest(req);/多次重发保证可靠性/udpSenderService.addRepeatingRequest(req,2,1L);/有一定几率丢包udpSenderService.addTimingRequest(time,req);/try/if(i%10=0)/Thread.sleep(3);/rand
45、.setSeed(time);/catch(Exceptione)/tryThread.sleep(1000*60);catch(Exceptionex)udpSenderService.shutdown();System.err.println(udp服务关闭.);staticvoidt3()UDPSenderServiceudpSenderService=newUDPSenderService();udpSenderService.start();System.err.println(启动udp发送服务);Randomrand=newRandom();bytebuff=newbyte512
46、;longtime=0;for(inti=0,endi=3*100*100;i<endi;i+)PacketFormatpf=newPacketFormat();pf.setProperty(msg,test);pf.setProperty(msg,test);pf.setProperty(type,cpu);pf.setProperty(value,33.05);intpackLen=pf.getBytes(buff,0);DatagramPacketdata=newDatagramPacket(buff,0,packLen);data.setSocketAddress(newInet
47、SocketAddress(3,7777);time=(rand.nextInt(4)+1)*1000L+System.currentTimeMillis();/udpSenderService.timingRequest(time,data);udpSenderService.addTimingRequest(time,data);tryif(i%50=0)Thread.sleep(1);rand.setSeed(time);catch(Exceptione)tryThread.sleep(1000*10);catch(Exceptionex)udpSenderServ
48、ice.shutdown();System.err.println(udp服务关闭.);staticvoidt2()UDPSenderServiceudpSenderService=newUDPSenderService();try/InetSocketAddressserviceAddress=newInetSocketAddress(localhost,7777);/DatagramSocketudpSocket=newDatagramSocket(serviceAddress);/udpSenderService.setDatagramSocket(udpSocket);udpSenderService.start();System.err.printl
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD、CAXA、PROE、UG、SolidWorks等。压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2025年中小学教师招聘考试预测题及答题技巧
- 2025年中央空调系统中级操作员考试模拟试题大全
- 抢救车核对课件
- 抢救药物速记讲解课件
- 2025年城市管道天然气项目建议书
- 抗震安全培训通知课件
- 2025年检重秤项目发展计划
- 2025年坦克玻璃系列项目合作计划书
- 2025年抗重症肌无力药项目建议书
- 黑龙江省鹤岗市绥滨县2025-2026学年八年级上学期开学考试生物试题 (含答案)
- 万用表使用方法课件
- 转基因生物安全审定程序
- 教学课件-现代酒店管理基础
- 日语作文細やかな(细小)幸せにも感謝の気持ち 讲义-高考日语二轮复习
- 2009-2022历年河南省郑州市市属事业单位公开招聘考试《行政职业能力测试》笔试试题含答案带详解2022-2023上岸资料汇编3
- 新老物业移交表格(全套)
- 改装课件b737增压系统终定版
- 中国石化集团公司油气田企业清洁生产评价指标体系
- 改造工程电气工程施工组织设计方案
- 非计划再次手术制度
- 辽宁医院明细.xls
评论
0/150
提交评论