




已阅读5页,还剩22页未读, 继续免费阅读
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1 JMSJMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。1.1 JMS的基本构件1.1.1 连接工厂连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。1.1.2 连接JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。1.1.3 会话JMS Session是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。1.1.4 目的地目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种消息传递域:点对点(PTP)消息传递域和发布/订阅消息传递域。点对点消息传递域的特点如下:l 每个消息只能有一个消费者。l 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。发布/订阅消息传递域的特点如下:l 每个消息可以有多个消费者。l 生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。1.1.5 消息生产者消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。1.1.6 消息消费者消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种方法之一:l 同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。l 异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。1.1.7 消息JMS消息由以下三部分组成:l 消息头。每个消息头字段都有相应的getter和setter方法。l 消息属性。如果需要除消息头字段以外的值,那么可以使用消息属性。l 消息体。JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。1.2 JMS的可靠性机制1.2.1 消息确认JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:l Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。l Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。l Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝第确认消息的提交。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true。1.2.2 持久性JMS 支持以下两种消息提交模式:l PERSISTENT。指JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失。l NON_PERSISTENT。不要求JMS provider持久保存消息。1.2.3 优先级可以使用消息优先级来指示JMS provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要注意的是,JMS provider并不一定保证按照优先级的顺序提交消息。1.2.4 消息过期可以设置消息在一定时间后过期,默认是永不过期。1.2.5 本地事务在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMS Session接口提供了commit和rollback方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发送操作才会真正执行。需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。2 ActiveMQActiveMQ是apache旗下开源消息中间件,是目前最流行的开源的消息中间件。ActiveMQ功能特点:l 支持跨语言的客户端,如:Java,C和C + +,C,Ruby,Perl,Python和PHP。l 在JMS客户端和消息代理都全面支持企业集成模式。l 支持许多高级功能,如:信息组,虚拟目的地,通配符和复合目的地。l 完全支持JMS 1.1和J2EE1.4 。l 支持Spring,以便ActiveMQ可以很容易地嵌入到Spring应用程序和使用Spring的XML配置机制 。l 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试。 l 支持可插拔传输协议,如:in-VM,TCP, SSL, NIO, UDP, multicast, JGroups and JXTA 传输。l 支持通过JDBC和journal提供高速的消息持久化。l 设计基于高性能集群,客户端服务器,对等通信。l Ajax支持网络流媒体,支持使用纯DHTML的Web浏览器,允许Web浏览器成为消息传递结构的一部分。l 支持CXF和Axis,以便ActiveMQ可以很容易地进入这些Web服务并提供可靠的消息传递。l 可作为一个在内存中的JMS提供者,是JMS的单元测试的理想选择。2.1 安装ActiveMQ 可以从官方网站(/)下载最新版的ActiveMQ。(最新版的为:5.4.2)2.2 启动ActiveMQ在下载最新的ActiveMQ将其解压到相应的目录就可以了。需要启动ActiveMQ只要找到$activemq.home/bin目录,双击运行activemq.bat就可以了(windows版本)。启动后的界面如上图。(ActiveMQ5.3.0版本启动后的截图,不同版本会有所不同)Listening for connections at: tcp:/74:61616(这里的IP显示的是机器名称)tcp:/74:61616表示监听的端口地址,也就是编写程序时,获取连接进用到的URL。ActiveMQ Console at 74:8161/admin 74:8161/admin这个是新版本的activeMQ提供的管理工具访问地址,可以查看队列详情,生产者,消费者等信息。2.3 配置ActiveMQ2.3.1 基本配置说明ActiveMQ默认使用的是XML格式配置,配置文件在$activemq.home/conf目录下,文件名为activemq.xml file:$activemq.base/conf/perties !- The element is used to configure the ActiveMQ broker. - producerFlowControl=true memoryLimit=1mb producerFlowControl=true memoryLimit=1mb !- Use VM cursor for better latency For more information, see: /message-cursors.html - !- The systemUsage controls the maximum amount of space the broker will use before slowing down producers. For more information, see: /producer-flow-control.html - Kahadb配置说明配置示例:KahaDB的各个可配置属性:属性默认值描述directoryactivemq-data保存message store数据文件的目录indexWriteBatchSize1000批量更新索引的阀值,当要更新的索引到达这个索引时,批量更新到metadata store中indexCacheSize10000指定metadata cache的大小enableIndexWriteAsyncfalse写入索引文件到metadata store中的方式是否采用异步写入journalMaxFileLength32mb消息持久数据文件的大小enableJournalDiskSyncstrue如果为true,保证使用同步写入的方式持久化消息到journal文件中cleanupInterval30000清除(清除或归档)不再使用的journal 文件的时间周期(毫秒)。checkpointInterval5000写入索引信息到metadata store中的时间周期(毫秒)ignoreMissingJournalfilesfalse是否忽略丢失的journal文件。如果为false,当丢失了journal文件时,broker启动时会抛异常并关闭checkForCorruptJournalFilesfalse如果为true,broker在启动的时候会检测journal文件是否损坏,若损坏便尝试恢复它。checksumJournalFilesfalse如果为true。KahaDB为journal文件生产一个checksum,以便能够检测journal文件是否损坏。archiveDataLogsfalse如果为true,当达到cleanupInterval周期时,会归档journal文件而不是删除directoryArchivenull指定归档journal文件存放的路径databaseLockedWaitDelay10000在使用主从数据库备份时,等待获取DB上的lock的延迟时间。maxAsyncJobs10000等待写入journal文件的任务队列的最大数量。应该大于或等于最大并发producer的数量。配合并行存储转发属性使用。concurrentStoreAndDispatchTransactionsfalse如果为true,转发消息的时候同时提交事务concurrentStoreAndDispatchTopicsfalse如果为true,转发Topic消息的时候同时存储消息的message store中。concurrentStoreAndDispatchQueuestrue如果为true,转发Queue消息的时候同时存储消息到message store中。2.3.2 安全性配置说明(以5.3.0的版本为例,各个版本配置会有所不同,需查阅相应资料)访问activeMQ权限配置是在$activemq.home/conf目录下的activemq.xml中配置的,默认是没有进行配置,任何用户都是可以访问activeMQ。如果需要对访问权限进行控制可以进行相应配置,可以指定到相应队列的访问权限。1. 配置activemq.xml配置权限需要在,activemq.xml的元素中加入以下部分。 /queue=对应的是所有队列/ read=”admins” 表示admins组的所有用户都可以消费前面queue属性指定的队列的消息/ write=admins表示admins组的所有用户都可以向前面queue属性指定的队列发送消息/ admin=admins表示admins组的所有用户可以在前面queue属性指定的队列不存在的情况创建队列 read=admins write=admins admin=admins / queue=TEST.表示队列名称所有以”TEST.”(包含“.”)开头的队列,(这里建议队列名称以“.”作分隔符) read=users write=users admin=users / read=users write=users admin=users / write=guests,users admin=guests,users可以同时指定多个组 read=guests write=guests,users admin=guests,users /主题相关配置,与队列配置类似 read=admins write=admins admin=admins / read=users write=users admin=users / read=guests write=guests,users admin=guests,users / read=guests,users write=guests,users admin=guests,users/ 2. 用户配置在$activemq.home/conf目录下增加 login.config、perties、perties三个文件。l login.config内容activemq-domain org.apache.activemq.jaas.PropertiesLoginModule required debug=true perties.user=perties perties.group=perties; ;perties.user=perties指定用户对应的配置文件perties.group=perties;指定用户组对应的配置文件l perties内容admins=systemusers=zengjun,llguests=guestusers=zengjun,llusers表示用户组名,与activemq.xml安全性配置中的read,write,admin属性值对应。zengjun,ll表示两个用户名zengjun和ll,这里表示users组下有两个用户zengjun和ll。l perties内容system=managerzengjun=zjll=llzengjun=zjzengjun表示用户名,与perties配置用户名对应zj表示等号左边用户对应的密码。注:在配置安全性配置后,在写代码创建连接时需要加上对应的用户名与密码。如:Connection connection = connectionFactory.createConnection(zengjun,zj);2.4 点对点域2.4.1 生产消息生产消息都步骤如上图的深色部分所示。首先需要从连接工厂中获取到连接,然后通过连接来创建会话,再通过会话来创建目的地,再用会话与目的地来创建生产者。需要发送的消息也是通过会话来创建的。最后通过生产者来发送消息。示例代码:/初使化连接工厂ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/74:61616); Connection connection = null; Session session = null; MessageProducer producer = null;Destination destination = null;/创建连接connection = connectionFactory.createConnection();/创建会话 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);/创目的地destination = session.createQueue(TEST_QUEUE_ZJ);/创生产者producer = session.createProducer(destination);/设置消息的持久模式producer.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();/创建消息TextMessage message = session.createTextMessage();/设置消息属性 double d = Math.random();message.setStringProperty(ID, String.valueOf(d);message.setText(tttt111);message.setText(tttt22222222);/发送消息producer.send(message);2.4.2 消费消息消费消息如上图深色部分所示。首先需要从连接工厂创建连接,然后再通过连接创建会话,然后通过会话创建目的地,再通过会话与目的地来创消费者,然后消费者调用接口来消费消息。代码示例:/初始连接工厂 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/74:61616); Session session = null; MessageConsumer consumer = null; Connection connection = null; Message message = null;Destination destination=null;/创建连接connection = connectionFactory.createConnection();connection.start();/创建会话,指定消息的确认模式,(确认模式请参1.2.1)session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);/创建目的地destination = session.createQueue(TEST_QUEUE_ZJ);/创建消息者consumer = session.createConsumer(destination);/消息者调用接收消息接口来接收消息。/ 无时间参数表示一直等待,直到收到消息。/ message = consumer.receive();/ 有时间参数表示指定时间后没有消息则结束时,如果存在消息就在取完消息后结束message = consumer.receive(5 * 1000);/ 如果没有收到消息,不会等待,立即往下执行,不建议使用/ message = consumer.receiveNoWait();/判断是否收到消息。if (message != null) /判断消息的类型if (message instanceof TextMessage) TextMessage textMessage = (TextMessage) message;String text = textMessage.getText();System.out.println(TEXT: + text);/ 确认消息textMessage.acknowledge(); else if (message instanceof StreamMessage) StreamMessage streamMessage = (StreamMessage) message;String strId = streamMessage.getStringProperty(ID);System.out.println(streammessage ID: + strId);/确认消息streamMessage.acknowledge(); else System.out.println(没有收到消息);还可以采用监听的方式来消费消息。采用监听的方式,需要实现MessageListener接口,创建消费者的方式与前面描消费消息的步骤一致,在建创建好消费者后需要设置实现MessageListener接口的监听器,当监听器监听到消费者对应的目的地上有消息时,会自动调用onMessage方法,在onMessage方法中可以得到消息。代码示例:/实现MessageListener接口public class AmqConsumerOnMessage implements MessageListener public static void main(String args) new AmqConsumerOnMessage().createConsumer(TEST_queuE);/实现onMessage方法 public void onMessage(Message message) try if (message != null) if (message instanceof TextMessage) TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println(TEXT: + text); else if (message instanceof StreamMessage) StreamMessage streamMessage = (StreamMessage) message; String strId = streamMessage.getStringProperty(ID); System.out.println(streammessage ID: + strId); /确认消息 message.acknowledge(); else System.out.println(没有收到消息); catch (JMSException e) e.printStackTrace(); /创建消费者 public void createConsumer(String queue) MessageConsumer consumer = null; Session session = null; Connection connection = null; try ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/74:61616); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination objDestination = null; objDestination = session.createQueue(queue); consumer = session.createConsumer(objDestination);/设置监听器,来监听消息 consumer.setMessageListener(this); catch (JMSException e) e.printStackTrace(); finally ConnectionUtil.closeAll(connection, session, consumer); 2.5 发布/订阅域2.5.1 发布消息发布消息与点对点域的生产消息类似。只是将队列换成了主题,将生产者换成发布者代码示例:/初始化连接工厂ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/74:61616);TopicConnection connection = null;ActiveMQTopicSession session = null;ActiveMQTopicPublisher publisher = null;ActiveMQTopic topic = null; /创建连接connection = connectionFactory.createTopicConnection(zengjun, zj);/创建会话session = (ActiveMQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);/创建主题topic = (ActiveMQTopic) session.createTopic(TEST.topic.zj);/创建发布者publisher = (ActiveMQTopicPublisher) session.createPublisher(topic);/设置消息持久方式,如果要实现持久订阅,持久方法必须是DeliveryMode.PERSISTENTpublisher.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();TextMessage message = session.createTextMessage();mes
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 广东省紫金县2026届化学高一第一学期期末调研模拟试题含解析
- 情景转述课件
- 2026届山东省莒县第二中学实验班化学高一上期中质量检测试题含解析
- 威海市重点中学2026届高二化学第一学期期中复习检测模拟试题含解析
- 园林绿化个人年度工作方案
- 医院医生年度工作方案
- 成功的茶叶营销策划方案
- 社区三八妇女节活动方案
- 识字试卷测试题及答案
- 鼻肠管留置操作流程
- 施工技术管理考核内容及评分标准研究
- 《电磁感应现象解析》课件
- 小儿过敏性紫癜患者的护理课件
- 《新型冠状病毒肺炎诊治要点》课件
- 门诊分诊知识培训课件
- 武汉市2025年高三语文四调10篇高分作文范文:去过与感动过
- 行政执法三项制度培训课件
- 射阳县卫生健康委员会直属事业单位招聘考试真题2024
- 普通铣床基础知识
- 《混凝土路面施工技术》课件
- 2025年策划资金注入框架协议书
评论
0/150
提交评论