




免费预览已结束,剩余3页可下载查看
下载本文档
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
JMS的一些代码文章分类:Java编程 通过这里的例子,可以把消息进行分组,然后进行分组的方式进行发送出去,即相当于不同级别的用户可以收取到不同的消息 现在对于消息的基于推的方式还没有想明白,即服务器向客户端推送消息还不是太明白 监听的方式是可以实现,但这是必须先运行接收端 在发送端代码可以使用 producer.send(message); producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); 在接收端则可以使用consumer.setMessageListener(this);consumer.receive();consumer.receive(Long Timeout); 第一个例子是发布订阅者的例子,使用了选择器(选择器的选择是通过发布者里设置的属性来进行的) 这里使用了Tomcat里的JNDI,使用了ActiveMQ,但在程序里是使用标准来进行编写的,和具体使用的何种MQ没有关系,更换MQ,只需更改JNDI,当然这里没有涉及到分布式的问题,貌似分布式还需要使用JTA的xa吧 发送消息代码: Java代码 1. packagecom.test; 2. 3. importjavax.jms.*; 4. importjavax.naming.*; 5. 6. publicclassTestSendSelectorMain 7. 8. 9. /* 10. *如果客户端需要接收主题上所有发布的消息,包括那些订阅者不活动期间发布的消 11. 息,那么就要使用持久化的TopicSubscriber.持久化的TopicSubscriber可以被Session或者被 12. TopicSession创建。JMS保持这个持久化订阅的记录并且确保所有来自于主题发布者的消息 13. 都会被保持到被持久化订阅者确认,或者过期。 14. */15. publicvoidsendMessage() 16. try 17. InitialContextinitCtx=newInitialContext(); 18. ContextenvContext=(Context)initCtx.lookup(java:comp/env); 19. ConnectionFactoryconnectionFactory=(ConnectionFactory)envContext.lookup(jms/ConnectionFactory); 20. Connectionconnection=connectionFactory.createConnection(); 21. Sessionsession=connection.createSession(false,1); 22. MessageProducerproducer=session.createProducer(Destination)envContext.lookup(jms/topic/MyTopic); 23. MessagetestMessage=session.createMessage(); 24. /设置所需传输的属性,可以使用选择器进行选择 25. testMessage.setStringProperty(testKey,testValue); 26. testMessage.setStringProperty(JMSXGroupID,QS_10/3/11); 27. testMessage.setJMSType(car); 28. producer.send(testMessage); 29. System.out.println(SendEnd); 30. catch(NamingExceptione) 31. System.out.println(NamingException); 32. e.printStackTrace(); 33. catch(JMSExceptione) 34. System.out.println(JMSException); 35. e.printStackTrace(); 36. 37. 38. package com.test;import javax.jms.*;import javax.naming.*;public class TestSendSelectorMain/* * 如果客户端需要接收主题上所有发布的消息,包括那些订阅者不活动期间发布的消息,那么就要使用持久化的TopicSubscriber.持久化的TopicSubscriber可以被Session或者被TopicSession创建。JMS保持这个持久化订阅的记录并且确保所有来自于主题发布者的消息都会被保持到被持久化订阅者确认,或者过期。 */ public void sendMessage() try InitialContext initCtx = new InitialContext(); Context envContext = (Context)initCtx.lookup(java:comp/env); ConnectionFactory connectionFactory = (ConnectionFactory)envContext.lookup(jms/ConnectionFactory); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, 1); MessageProducer producer = session.createProducer(Destination)envContext.lookup(jms/topic/MyTopic); Message testMessage = session.createMessage(); /设置所需传输的属性,可以使用选择器进行选择 testMessage.setStringProperty(testKey, testValue); testMessage.setStringProperty(JMSXGroupID, QS_10/3/11); testMessage.setJMSType(car); producer.send(testMessage); System.out.println(Send End); catch(NamingException e) System.out.println(NamingException); e.printStackTrace(); catch(JMSException e) System.out.println(JMSException); e.printStackTrace(); 下面是接收消息代码: package com.test; Java代码 1. importjavax.jms.*; 2. importjavax.naming.*; 3. 4. /* 5. *异步接收消息(并且可通过类似sql的where条件语句的方式进行选择消息) 6. *如果没有消息,则会进行等待,直到获取消息为止 7. * 8. */9. publicclassTestReceiveSelectorMain 10. 11. /* 12. *1、使用JNDI查找一个ConnectionFactory对象。 13. *2、使用JNDI查找一个或者多个Destination对象。 14. *3、使用ConnectionFactory创建一个JMS连接 15. *4、使用连接创建一个或者多个JMSSessions 16. *5、使用SessionandDestinations创建所需的MessageProducers和MessageConsumers 17. *6、告知Connection开始传送消息。 18. *throwsNamingException 19. *throwsJMSException 20. */21. publicvoidreceiveMessage()throwsNamingException,JMSException 22. InitialContextinitCtx=newInitialContext(); 23. ContextenvContext=(Context)initCtx.lookup(java:comp/env); 24. ConnectionFactoryconnectionFactory=(ConnectionFactory)envContext.lookup(jms/ConnectionFactory); 25. /连接工厂创建一个jmsconnection 26. Connectionconnection=connectionFactory.createConnection(); 27. connection.setClientID(testMessageDurableSubscriber); 28. connection.start(); 29. 30. /是生产和消费的一个单线程上下文。会话用于创建消息的生产者,消费者和消息。会话提供了一个事务性的上下文。 31. Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/不支持事务 32. /目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象,两种消息传递方式:点对点和发布/订阅 33. /会话创建消息的生产者将消息发送到目的地 34. 35. MessageConsumerconsumer=session.createDurableSubscriber(Topic)envContext.lookup(jms/topic/MyTopic),name,JMSType=carANDtestKey=testValue,false); 36. /使用createDurableSubscriber方法之后,当主题里有消息时,receive()方法则进行一条消息一条消息取出来 37. /阻塞式监听consumer.receive() 38. /当主题里没有消息时,则一直等待发送消息 39. /队列的receive(longtimeout)方法用与一次获取一条消息, 40. /当没有消息时,等待一段时间就结束而不会像receive()等待 41. MessagetestMessage=consumer.receive(); 42. if(testMessage!=null) 43. StringreturnMessage=testMessage.getStringProperty(testKey); 44. System.out.println(newStringBuilder(-returnMessage=).append(returnMessage).toString(); 45. 46. session.close(); 47. connection.close(); 48. System.out.println(ReciveEnd); 49. 50. import javax.jms.*;import javax.naming.*;/* * 异步接收消息(并且可通过类似sql的where条件语句的方式进行选择消息) * 如果没有消息,则会进行等待,直到获取消息为止 * */public class TestReceiveSelectorMain/* * 1、使用JNDI查找一个ConnectionFactory对象。 * 2、使用JNDI查找一个或者多个Destination对象。 * 3、使用ConnectionFactory创建一个JMS连接 * 4、使用连接创建一个或者多个JMS Sessions * 5、使用Session and Destinations 创建所需的MessageProducers 和MessageConsumers * 6、告知Connection 开始传送消息。 * throws NamingException * throws JMSException */ public void receiveMessage() throws NamingException, JMSException InitialContext initCtx = new InitialContext(); Context envContext = (Context)initCtx.lookup(java:comp/env); ConnectionFactory connectionFactory = (ConnectionFactory)envContext.lookup(jms/ConnectionFactory); /连接工厂创建一个jms connection Connection connection = connectionFactory.createConnection(); connection.setClientID(testMessageDurableSubscriber); connection.start(); /是生产和消费的一个单线程上下文。会话用于创建消息的生产者,消费者和消息。会话提供了一个事务性的上下文。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);/不支持事务 /目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象,两种消息传递方式:点对点和发布/订阅 /会话创建消息的生产者将消息发送到目的地 MessageConsumer consumer = session.createDurableSubscriber(Topic)envContext.lookup(jms/topic/MyTopic), name, JMSType = car AND testKey=testValue, false); /使用createDurableSubscriber方法之后,当主题里有消息时,receive()方法则进行一条消息一条消息取出来 /阻塞式监听 consumer.receive() /当主题里没有消息时,则一直等待发送消息 /队列的receive(long timeout)方法用与一次获取一条消息, /当没有消息时,等待一段时间就结束而不会像receive()等待 Message testMessage = consumer.receive(); if(testMessage != null) String returnMessage = testMessage.getStringProperty(testKey); System.out.println(new StringBuilder(-returnMessage=).append(returnMessage).toString(); session.close(); connection.close(); System.out.println(Recive End); 剩下的工作就只剩下接收和发送的jsp页面了,页面里仅仅只是调用上面的两个类中的方法而已。 第二个例子是基于队列的有选择器应用的,还传递对象的例子, 发送消息: Java代码 1. packagecom.test; 2. 3. importjavax.jms.*; 4. importjavax.naming.*; 5. 6. publicclassTestJavaxJmsSendMain 7. 8. publicvoidsendMessage() 9. try 10. /这是自定义的所需传输的对方,必须进行序列化 11. MessageObjectmo=newMessageObject(); 12. mo.setKey(testKey); 13. mo.setTitle(标题); 14. mo.setContent(内容); 15. mo.setDescription(描述测试); 16. System.out.println(SendStart); 17. InitialContextinitCtx=newInitialContext(); 18. ContextenvContext=(Context)initCtx.lookup(java:comp/env); 19. ConnectionFactoryconnectionFactory=(ConnectionFactory)envContext.lookup(jms/ConnectionFactory); 20. Connectionconnection=connectionFactory.createConnection(); 21. Sessionsession=connection.createSession(false,1); 22. MessageProducerproducer=session.createProducer(Destination)envContext.lookup(jms/queue/MyQueue); 23. /MessageProducerproducer=session.createProducer(Destination)envContext.lookup(jms/queue/MyQueue); 24. 25. /ObjectMessage可以用来传输对象 26. /ObjectMessage的属性可以用来设置查询的条件 27. /ObjectMessage.setObject(Object)用来设置需要传输的对象 28. ObjectMessagetestMessage=session.createObjectMessage(); 29. testMessage.setJMSType(ObjectSendTest); 30. testMessage.setStringProperty(testKey,testValue); 31. testMessage.setStringProperty(JMSXGroupID,QS_10/3/11); 32. testMessage.setObject(mo); 33. /发送的两种方式 34. /producer.send(message); 35. /producer.send(message,Message.DEFAULT_DELIVERY_MODE,Message.DEFAULT_PRIORITY,Message.DEFAULT_TIME_TO_LIVE); 36. producer.send(testMessage,Message.DEFAULT_DELIVERY_MODE,Message.DEFAULT_PRIORITY,10000); 37. System.out.println(SendEnd); 38. catch(NamingExceptione) 39. System.out.println(NamingException); 40. e.printStackTrace(); 41. catch(JMSExceptione) 42. System.out.println(JMSException); 43. e.printStackTrace(); 44. 45. 46. package com.test;import javax.jms.*;import javax.naming.*;public class TestJavaxJmsSendMain public void sendMessage() try /这是自定义的所需传输的对方,必须进行序列化 MessageObject mo = new MessageObject(); mo.setKey(testKey); mo.setTitle(标题); mo.setContent(内容); mo.setDescription(描述测试); System.out.println(Send Start); InitialContext initCtx = new InitialContext(); Context envContext = (Context)initCtx.lookup(java:comp/env); ConnectionFactory connectionFactory = (ConnectionFactory)envContext.lookup(jms/ConnectionFactory); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, 1); MessageProducer producer = session.createProducer(Destination)envContext.lookup(jms/queue/MyQueue);/ MessageProducer producer = session.createProducer(Destination)envContext.lookup(jms/queue/MyQueue); /ObjectMessage可以用来传输对象 /ObjectMessage的属性可以用来设置查询的条件 /ObjectMessage.setObject(Object)用来设置需要传输的对象 ObjectMessage testMessage = session.createObjectMessage(); testMessage.setJMSType(ObjectSendTest); testMessage.setStringProperty(testKey, testValue); testMessage.setStringProperty(JMSXGroupID, QS_10/3/11); testMessage.setObject(mo); /发送的两种方式/ producer.send(message);/ producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); producer.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 10000); System.out.println(Send End); catch(NamingException e) System.out.println(NamingException); e.printStackTrace(); catch(JMSException e) System.out.println(JMSException); e.printStackTrace(); 接收消息: Java代码 1. packagecom.test; 2. 3. 4. importjavax.jms.*; 5. importjavax.naming.*; 6. 7. publicclassTestJavaxJmsReceiveMainimplementsMessageListener,ExceptionListener 8. 9. publicvoidreceiveMessage() 10. try 11. System.out.println(ReceiveStart); 12. InitialContextinitCtx=newInitialContext(); 13. ContextenvContext=(Context)initCtx.lookup(java:comp/env); 14. ConnectionFactoryconnectionFactory=(ConnectionFactory)envContext.lookup(jms/ConnectionFactory); 15. Connectionconnection=connectionFactory.createConnection(); 16. /connection.start(); 17. Sessionsession=connection.createSession(false,1); 18. MessageConsumerconsumer=session.createConsumer(Destination)envContext.lookup(jms/queue/MyQueue),testKey=testValue); 19. /这种采用consumer.setMessageListener(this)方式不太稳定,有时需要发送很多次,接收端才能收到消息,一下能接收到很多条消息 20. /consumer.setMessageListener(this); 21. 22. /主动去获取消息,实现了阻塞式获取消息consumer.receive()consumer.receive(Longtimeout) 23. ObjectMessagetestMessage=(ObjectMessage)consumer.receive(); 24. System.out.println(consumer.getMessageSelector=+consumer.getMessageSelector(); 25. if(testMessage!=null) 26. System.out.println(testMessage.getJMSType=+testMessage.getJMSType(); 27. /这里接收到了自定义的对象 28. MessageObjectmo=(MessageObject)testMessage.getObject(); 29. System.out.println(mo.getContent=+m
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 内蒙古通辽市库伦旗2026届九上化学期中学业水平测试试题含解析
- 江苏省句容市二中学片区合作共同体2026届英语九上期末质量检测模拟试题含解析
- 幼儿园期末汇报通关
- 安徽省宿州十三校2026届英语九年级第一学期期末统考试题含解析
- 福建省泉州台商投资区五校联考2026届九年级化学第一学期期中质量检测试题含解析
- 2026届辽宁省台安县化学九年级第一学期期中监测试题含解析
- 2026届广东省惠阳市马安中学英语九上期末学业质量监测模拟试题含解析
- 2026届浙江省杭州市余杭区英语九上期末经典试题含解析
- 巢湖市重点中学2026届九上化学期中质量检测试题含解析
- 2025年辅警勤务岗面试题及答案
- 第8课《网络新世界》第一课时-统编版《道德与法治》四年级上册教学课件
- 2025年审计部招聘考试模拟题及答案详解
- 2025年招聘市场年中洞察报告-瀚纳仕
- Bowtie安全分析培训课件
- 退役军人优抚政策课件
- 财务遴选笔试题及答案
- (2025秋新版)人教版二年级数学上册全册教案(教学设计)
- 六年级上册音乐课教案
- 肿瘤病人疼痛评估与干预策略
- 物业管理人员考核制度及标准
- 计算机视觉技术课件
评论
0/150
提交评论