




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
WebSocketHTTP一些课程中有讲到的相对高级的功能,如应用层心跳、ACK机制等。希望通过期末整体技术实现上的升级,你能更深刻地体会到IM系统升级前后,对使用方和 支持基于WebSocket支持消息推送的ACK支持客户端的心跳机制和双端的idleWebSocketHTTP接。为了方便Web端的演示,这里我建议你可以使用WebSocket来实现。WebSocket,JS(JavaScript)HTML5APIif(window.WebSocket)websocket=newwebsocket.onmessage=function(event) 6//websocket.onopen=function()
//websocket.onclose=function() //websocket.onerror=function() 23}else alert("您的浏览器不支持WebSocket页面打开时,JS先通过服务端的WebSocket地址建立长连接。要注意这里服务端连接的地址是ws://开头的,不是http://的了;如果是使用加密的WebSocket协议,那么相应的地址应该是以wss://开头的。建立长连之后,要针对创建的WebSocket对象进行的,我们只需要在各种API主要支持的几种有:长连接通道建立完成后,通过onopn来进行用户信息的上报绑定;通过onmessage,对接收到的所有该连接上的数据进行处理,也是最消息的处辑;,在接通生异误,连接被关闭时,可以分别通过onerror和onclose两个来进行处理。WebSocket(消息)。这个功能在实现上也非常简单,你只需要调用WebSocket对象的send方法就OK了。1varsendMsgJson='{"type":3,"data":{"senderUid":'+sender_id+23此外,针对WebSocket在服务端的实现,如果你是使用JVM(JavaVirtualMachine,Java虚拟机)系列语言的话,我推荐你使用比较成JavaNIO框架Netty来做实现。因为Netty本身对WebSocket的支持就很完善了,各种编器和WebSocket的处理采用Netty实现WebSocketServer的代码,你可以参考下面的示例代码EventLoopGroupbossGroupnewEpollEventLoopGroup(serverConfig.bossThreads,new3EventLoopGroupworkerGroupnewEpollEventLoopGroup(serverConfig.workerThreads,new67ServerBootstrapserverBootstrap=newServerBootstrap().group(bossGroup,8ChannelInitializer<SocketChannel>initializer=newprotectedvoidinitChannel(SocketChannelch)throwsExceptionChannelPipelinepipeline=//先添加WebSocket相关的编器和协议处理pipeline.addLast(newpipeline.addLast(newpipeline.addLast(newpipeline.addLast(newWebSocketServerProtocolHandler("/",null,////服务端添加一个idle处理器,如果一段时间Socketpipeline.addLast(newIdleStateHandler(0,0, 24首先创建服务器的ServerBootstrap对象。Netty作为服务端,从ServerBootstrap启动,ServerBootstrap对象主要用于在服务端的某一个端口进行,并接受客户端的连接着,通过ChannelInitializer对象,初始化连接管道中用于处理数据的各种编器和业务逻辑处理器。比如这里,我们就需要添加为了处理WebSocket协议相关的编最后,ServerBootstrapbindsync建立好WebSocket长连接后,我们再来看一下最的消息收发是怎么处理的WebSocketRouterHandlerprotectedvoidchannelRead0(ChannelHandlerContextctx,WebSocketFrameframe)throws//如果是文本类型的WebSocketif(frameinstanceofTextWebSocketFrame)//Stringmsg=((TextWebSocketFrame)//再用JSONJSONObjectmsgJson=inttype=JSONObjectdata= longsenderUid= longrecipientUid= Stringcontent= intmsgType= //调用业务层的Service MessageVOmessageContent=messageService.sendNewMsg(senderUid,recipientUid,(messageContent!=null)JSONObjectjsonObject=newjsonObject.put("type", 26
jsonObject.put("data",JSONObject.toJSON(messageContent));ctx.writeAndFlush(newTextWebSocketFrame(JSONObject.toJSONStrin}这里的WebSocketRouterHandler,我们也是采用机制来实现。由于这里需要处理“接收到”的消息,所以我们只需要实现channelRead0方法就可以。WebSocketFrameUIDwriteAndFlush因此,我们可以在messageService.sendNewMsg方法中,等待消息、未读变更都完1privatestaticfinalConcurrentHashMap<Long,Channel>userChannel=new2 protectedvoidchannelRead0(ChannelHandlerContextctx,WebSocketFrameframe)56longloginUid=7userChannel.put(loginUid,8}9publicvoidpushMsg(longrecipientUid,JSONObjectmessage)14
Channelchannel=if(channel!=null&&channel.isActive()&&{channel.writeAndFlush(new}publicclassNewMessageListenerimplementsMessageListenerpublicvoidonMessage(Messagemessage,byte[]pattern)Stringtopic=//从订阅到的RedisStringjsonMsg=("MessageReceived-->pattern:{},topic:{},message:{}",newJSONObjectmsgJson=//解析出消息接收人的longotherUid=JSONObjectpushJson=newpushJson.put("type",pushJson.put("data",//websocketRouterHandler.pushMsg(otherUid, 19publicMessageVOsendNewMsg(longsenderUid,longrecipientUid,Stringcontent,int//先对发送消息进行、加未读等操//然后将待推送消息发布到redisTemte.convertAndSend(Constants.WEBSOCKET_MSG_TOPIC,Redis在业务层进行发送消息逻辑处理的最后,会将这条消息发布到Redis的一个Topic中,这个Topic被NewMessageListener一直着,如果有消息发布,那么器会马上感知到,然后再将消息提交给WebSocketRouterHandler,来进行最终消息的下推。消息推送的我在“04|ACK机制:如何保证消息的可靠投递?”中有讲到,当系统有消息下推后,我们会依赖客户端响应的ACK包,来保证消息推送的可靠性。如果消息下推后一段时间,服务端没有收到客户端的ACK包,那么服务端会认为这条消息没有正常投递下去,就会触发关于publicvoidpushMsg(longrecipientUid,JSONObjectmessage)channel.writeAndFlush(new//消息推送下去后,将这条消息加入到待ACKaddMsgToAckBuffer(channel,5publicvoidaddMsgToAckBuffer(Channelchannel,JSONObjectmsgJson)nonAcked.put(msgJson.getLong("tid"),//定时器针对下推的这条消息在5s后进行"是否ACK"executorService.schedule(()->if(channel.isActive())//检查是否被ACK,如果没有收到ACKcheckAndResend(channel, },5000,15longtid=privatevoidcheckAndResend(Channelchannel,JSONObjectmsgJson)longtid=//重推2inttryTimes=while(tryTimes>0)if(nonAcked.containsKey(tid)&&tryTimes>0)channel.writeAndFlush(newtry}catch(InterruptedExceptione) 33用户在上线完成后,服务端会在这个连接维度的里,初始化一个起始值为0的序(tid),每当有消息推送给客户端时,服务端会针对这个序号进行加1操作,下推消息时消息推送后,服务端会将当前消息加入到一个“待ACKBuffer”中,这个ACKBuffer的实现,我们可以简单地用一个ConcurrentHashMap来实现,Key就是这条消息携带的序号,Value是消息本身。当消息加入到这个“待ACKBuffer”时,服务端会同时创建一个定时器,在一定的时间后,会触发“检查当前消息是否被ACK”的逻辑;如果客户端有回ACK,那么服务端就会从这个“待ACKBuffer”中移除这条消息,否则如果这条消息没有被ACK,那么就会触发在了解了如何通过WebSocket长连接,来完成最的消息收发功能之后,我们再来看8//每2varheartBeat= timeout:timeoutObj:serverTimeoutObj:reset:function() start:function()varself=this.timeoutObj=setTimeout(function()varsender_id=varsendMsgJson='{"type":0,"data":{"uid":'+sender_id+self.serverTimeoutObj=setTimeout(function()$("#ws_status").text("},}, 23客户端通过一个定时器,每2分钟通过长连接给服务端发送一次心跳包,如果在2分钟内接收到服务端的消息或者响应,那么客户端的下次2分钟定时器的计时,会进行重置,重新计算;如果发送的心跳包在2分钟后没有收到服务端的响应,客户端会断开当前protectedvoidchannelRead0(ChannelHandlerContextctx,WebSocketFrameframe)throwslonguid=longtimeout=("[heartbeat]:uid={},currenttimeoutis{}ms,channel={}",ctx.writeAndFlush(newTextWebSocketFrame("{\"type\":0,\"timeout\":"+timeout+7 所以,实战课程的设计和示例代码只能做到挂一漏万,我尽量通过最简化的代码,
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 高中性安全知识培训总结课件
- 高中女生工作课件
- 高一必修走进细胞课件
- 离婚双方房产租赁及物业管理协议范本
- 夫妻离婚房产分割及子女抚养、监护协议模板
- 离婚协议书中关于知识产权归属的详细范本
- 租赁合同担保法律风险防范与违约责任追究
- 教育培训机构租赁合同担保与教育资源整合协议
- 广告创意提案及代理落地合同
- 骨骼知识培训课件
- 银川文化园全民健身体育运动馆地块土壤污染状况调查报告
- 天然药物活性成分的发现与筛选课件
- 干部选拔任用全流程解析
- 明厨亮灶协议书
- 新药研究与开发技术 课件3.新药的工艺与质量研究
- 2025-2030中国工程监理行业市场深度调研及竞争格局与发展趋势研究报告
- “厂中厂”安全生产管理协议书(未修改版)7篇
- 《智能制造技术》课件 第4章 智能设计
- 类风湿关节炎健康教育
- 2025年装维智企工程师(三级)复习模拟100题及答案
- 中外建筑(教学设计)浙教版六年级上册综合实践活动
评论
0/150
提交评论