activemq-cpp开发手册.doc_第1页
activemq-cpp开发手册.doc_第2页
activemq-cpp开发手册.doc_第3页
activemq-cpp开发手册.doc_第4页
activemq-cpp开发手册.doc_第5页
已阅读5页,还剩79页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

Activemq-cpp开发手册丁靖2008-05-061 引言1.1 编写目的快速学习CMS,提高CMS开发效率,提供一个CMS开发参考手册详细API手册请参考/cms/api_docs/activemqcpp-2.1/1.2 功能介绍Activemq-cpp是一个与ActiveMQ交互通讯的C+ API开发库,为C+开发者提供了一个访问ActiveMQ的接口。Winkeemq-cpp是一个在Activemq-cpp基础上封装的API库,对一些重复机械的初始化及销毁清除及一些不关心的细节进行了封装,从而简化了编程。1.3 术语解析ActiveMQ :开源的消息队列服务器Broker :消息中介,每个消息队列服务器中至少有一个broker,是消息队列的载体Destination :消息在broker上的目的地Queue :消息队列 Topic :主题 Message :消息 Producer :消息产生者 Consumer :消息消费者Client :客户端,生产者和消费者都在客户端上 Server :Activemq服务器BrokerUri :客户端访问服务器上broker时的Uri其它资料请参考/2 开发前准备在开发前必须先安装activemq-cpp及winkeemq-cpp库,具体步骤参考activemq-cpp安装及使用文档.doc3 CMS 3.1 概述CMS(stands for C+ Messaging Service)是一组C+应用程序接口(C+ API),它提供创建、发送、接收、读取消息的服务。定义了一组和Sun公司和它的合作伙伴设计的CMS API相同的公共应用程序接口和相应语法,使得C+程序能够和其他消息组件进行通信。 CMS是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC (Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 CMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。CMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 CMS 客户机向另一个客户机发送消息。消息是 CMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本 (TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。 消息收发系统是异步的,也就是说,CMS 客户机可以发送消息而不必等待回应。比较可知,这完全不同于基于 RPC 的(基于远程过程的)系统,如 EJB 1.1、CORBA 和 Java RMI 的引用实现。在 RPC 中,客户机调用服务器上某个分布式对象的一个方法。在方法调用返回之前,该客户机被阻塞;该客户机在可以执行下一条指令之前,必须等待方法调用结束。在 CMS 中,客户机将消息发送给一个虚拟通道(主题或队列),而其它 CMS 客户机则预订或监听这个虚拟通道。当 CMS 客户机发送消息时,它并不等待回应。它执行发送操作,然后继续执行下一条指令。消息可能最终转发到一个或许多个客户机,这些客户机都不需要作出回应。 CMS的通用接口集合以异步方式发送或接收消息。异步方式接收消息显然是使用间断网络连接的客户 机,诸如移动电话和PDA的最好的选择。另外, CMS采用一种宽松结合方式整合企业系统的方法,其主要的目的就是创建能够使用跨平台数据信息的、可移植的企业级应用程序,而把开发人力解放出来。CMS消息服务支持两种消息模型:Point-to-Point消息(P2P)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub)。CMS规范并不要求供应商同时支持这两种消息模型,但开发者应该熟悉这两种消息模型的优势与缺点。P2P消息模型是在点对点之间传递消息时使用。如果应用程序开发者希望每一条消息都能够被处理,那么应该使用P2P消息模型。与Pub/Sub消息模型不同,P2P消息总是能够被传送到指定的位置。Pub/Sub模型在一到多的消息广播时使用。如果一定程度的消息传递的不可靠性可以被接受的话,那么应用程序开发者也可以使用Pub/Sub消息模型。换句话说,它适用于所有的消息消费程序并不要求能够收到所有的信息或者消息消费程序并不想接收到任何消息的情况。CMS通过允许创建持久订阅来简化时间相关性,即使消息预订者未激活也可以接收到消息。此外,使用 持久订阅还可通过队列提供灵活性和可靠性,而仍然允许消息被发给许多的接收者。 Topic Subscriber topic Subscriber = topicSession.createDurableSubscriber(topic, subscriptionName); Connection对象表示了到两种消息模型中的任一种的消息系统的连接。服务器端和客户机端对象要求管理创建的CMS连接的状态。连接是由 Connection Factory创建的并且通过JNDI查寻定位。 /取得用于 P2P的 QueueConnectionFactory QueueConnectionFactory = queueConnectionFactory(); Context messaging = new InitialContext(); QueueConnectionFactory = (QueueConnectionFactory) Messaging.lookup(“QueueConnectionFactory”); /取得用于 pub/sub的 TopicConnectionFactory TopicConnectonFactory topicConnectionFactory; Context messaging = new InitialContext(); topicConnectionFactory= (TopicConnectionFactory)messaging.lookup(“TopicConnectionFactory”); 注意:用于P2P的代码和用于PublishSubscribe的代码非常相似。如果session被标记为transactional的话,确认消息就通过确认和校正来自动地处理。如果session没有标记为 transactional,你有三个用于消息确认的选项。 AUTO_ACKNOWLEDGE session将自动地确认收到一则消息。 CLIENT_ACKNOWLEDGE 客户端程 序将确认收到一则消息,调用这则消息的确认方法。 DUPS_OK_ACKNOWLEDGE 这个选项命令session“懒散的”确认消息传递,可以想到,这将导致消息提供者传递的一些复制消息可能会出错。这种确认的方式只应当用于消息消费程序 可以容忍潜在的副本消息存在的情况。 queueSession =queueConnection.createQueueSession(false,session.AUTO_ACKNOWLEDGE);/P2P topicSession = topicConnection.createTopicSession(false, session.AUTO_ACKNOWLEDGE); /Pub-Sub注意:在本例中,一个session目的从连结中创建,非值指出session是non-transactional的,并且 session将自动地确认收到一则消息。CMS现在有两种传递消息的方式。标记为NON_PERSISTENT的消息最多投递一次,而标记 为PERSISTENT的消息将使用暂存后再转送的机理投递。如果一个CMS服务离线,那么持久性消息不会丢失但是得等到这个服务恢复联机时才会被传递。 所以默认的消息传递方式是非持久性的。即使使用非持久性消息可能降低内务和需要的存储器,并且这种传递方式只有当你不需要接收所有的消息时才使用。虽然 CMS规范并不需要CMS供应商实现消息的优先级路线,但是它需要递送加快的消息优先于普通级别的消息。CMS定义了从0到9的优先级路线级别,0是最低 的优先级而9则是最高的。更特殊的是0到4是正常优先级的变化幅度,而5到9是加快的优先级的变化幅度。举例来说: topicPublisher.publish (message, DeliveryMode.PERSISTENT, 8, 10000); /Pub-Sub 或 queueSender.send(message, DeliveryMode.PERSISTENT, 8, 10000);/P2P 这个代码片断,有两种消息模型,映射递送方式是持久的,优先级为加快型,生存周期是10000 (以毫秒度量 )。如果生存周期设置为零,这则消息将永远不会过期。当消息需要时间限制否则将使其无效时,设置生存周期是有用的。CMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。 StreamMessage - Java原始值的数据流 MapMessage-一套名称-值对 TextMessage-一个字符串对象 ObjectMessage-一个序列化的 Java对象 BytesMessage-一个未解释字节的数据流CMS应用程序接口提供用于创建每种类型消息和设置荷载的方法例如,为了在一个队列创建并发送一个 TextMessage实例,你可以使用下列语句: TextMessage message = queueSession.createTextMessage(); message.setText(textMsg); 以异步方式接收消息,需要创建一个消息监听器然后注册一个或多个使用MessageConsumer的CMS MessageListener接口。会话(主题或队列)负责产生某些消息,这些消息被传送到使用onMessage方法的监听者那里。 Using namespace cms; class ExampleListener : public MessageListener /把消息强制转化为TextMessage格式 public void onMessage(Message message) TextMessage textMsg = null; / 打开并处理这段消息 当我们创建QueueReceiver和TopicSubscriber时,我们传递消息选择器字符串: /P2P QueueReceiver QueueReceiver receiver; receiver = session.createReceiver(queue, selector); /Pub-Sub TopicSubscriber TopicSubscriber subscriber; subscriber = session.createSubscriber(topic, selector);为了启动消息的交付,不论是Pub/Sub还是P2P,都需要调用start方法。 TopicConnection.start(); /pub-sub QueueConnection.start(); /P2P 当一条消息被捕捉时,这条消息做为一条必须被强制转化为适当消息类型的普通Message对象到达。如TextMessagevoid onMessage(const Message* message) TextMessage txtMsg=dynamic_cast(message); /对 txtMsg做一些处理停止消息的传递,无论是Pub/Sub还是P2P,都调用stop方法。 TopicConnection. stop (); /pub-sub QueueConnection. stop (); /P2P 3.2 接口描述CMS 支持两种消息类型P2P 和Pub/Sub,分别称作:P2P Domain 和Pub/Sub Domain,这两种接口都继承统一的CMS Parent 接口,CMS 主要接口如下所示:CMS ParentConnectionFactoryConnectionDestinationSessionMessageProducerMessageConsumer 以下是对这些接口的简单描述: ConnectionFactory :连接工厂,CMS 用它创建连接 Connection :CMS 客户端到CMS Provider 的连接 Destination :消息的目的地 Session: 一个发送或接收消息的线程 MessageProducer: 由Session 对象创建的用来发送消息的对象 MessageConsumer: 由Session 对象创建的用来接收消息的对象3.3 CMS消息模型CMS 消息由以下几部分组成:消息头,属性,消息体。 消息头(Header) - 消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性如:CMSDestination,CMSMessageID 等。 消息头由谁设置CMSDestinationsend 或 publish 方法CMSDeliveryModesend 或 publish 方法CMSExpirationsend 或 publish 方法CMSPrioritysend 或 publish 方法CMSMessageIDsend 或 publish 方法CMSTimestampsend 或 publish 方法CMSCorrelationID客户CMSReplyTo客户CMSType客户CMSRedeliveredCMS Provider 属性(Properties) - 除了消息头中定义好的标准属性外,CMS 提供一种机制增加新属性到消息头中,这种新属性包含以下几种: 1. 应用需要用到的属性; 2. 消息头中原有的一些可选属性; 3. CMS Provider 需要用到的属性。 标准的CMS 消息头包含以下属性: CMSDestination -消息发送的目的地 CMSDeliveryMode -传递模式, 有两种模式: PERSISTENT 和NON_PERSISTENT,PERSISTENT 表示该消息一定要被送到目的地,否则会导致应用错误。NON_PERSISTENT 表示偶然丢失该消息是被允许的,这两种模式使开发者可以在消息传递的可靠性和吞吐量之间找到平衡点。 CMSMessageID 唯一识别每个消息的标识,由CMS Provider 产生。 CMSTimestamp 一个消息被提交给CMS Provider 到消息被发出的时间。 CMSCorrelationID 用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。 CMSReplyTo 提供本消息回复消息的目的地址。 CMSRedelivered 如果一个客户端收到一个设置了CMSRedelivered 属性的消息,则表示可能该客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。 CMSType 消息类型的识别符。 CMSExpiration 消息过期时间,等于QueueSender 的send 方法中的timeToLive 值或TopicPublisher 的publish 方法中的timeToLive 值加上发送时刻的GMT 时间值。如果timeToLive值等于零,则CMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。 CMSPriority 消息优先级,从0-9 十个级别,0-4 是普通消息,5-9 是加急消息。CMS 不要求CMS Provider 严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。 消息体(Body) - CMS API 定义了4种消息体格式,也叫消息类型,你可以使用不同形式发送接收数据并可以兼容现有的消息格式,下面描述这4种类型: 消息类型消息体TextMessagestring对象,如xml文件内容MapMessage名/值对的集合,名是string对象,值类型可以是c+任何基本类型BytesMessage字节流ObjectMessage对象类型Message没有消息体,只有消息头和属性。下例演示创建并发送一个TextMessage到一个队列: TextMessage message = queueSession.createTextMessage();message.setText(msg_text); / msg_text is a Stringmessage.setCMSType(“text”);queueSender.send(message);下例演示接收消息并转换为合适的消息类型: Message* m = queueReceiver.receive();If (m-getCMSType() = “text”)TextMessage txt=dynamic_cast(m);/ do somethingelse4 消息生产者客户端消息生产者产生消息并将消息发送到broker上的队列或主题中。要使消息生产者生产的消息被消息消费者消费,必须满足两个条件:生产者和消费者必须连接到同一个Broker, 即BrokerUri中主机名和端口相同生产者和消费者必须具有相同的destination, 即同一个队列名或主题名4.1 使用activemq-cpp来创建消息生产者4.1.1 头文件及名字空间#include #include #include #include #include #include #include #include using namespace activemq;using namespace activemq:core;using namespace cms;using namespace std;4.1.2 创建一个生产者类class SimpleProducer private: Connection* connection; /连接对象 Session* session; /会话 Destination* destination; /消息目的地 MessageProducer* producer; /消息生产者 bool useTopic; /是否采用采用主题模式 bool clientAck; /是否自动确认消息接收 unsigned int numMessages; /生产消息数 std:string brokerURI; /连接borker uri std:string destURI; /队列或主题名public:/./构造函数SimpleProducer( const std:string& brokerURI, unsigned int numMessages, const std:string& destURI, bool useTopic = false, bool clientAck = false ) connection = NULL; session = NULL; destination = NULL; producer = NULL; this-numMessages = numMessages; this-useTopic = useTopic; this-brokerURI = brokerURI; this-destURI = destURI; this-clientAck = clientAck;initialize();virtual SimpleProducer() cleanup();4.1.3 初始化及销毁/ 初始化private:Virtual void initialize()try / 创建连接工厂 ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory( brokerURI ); / 创建一个到broker的连接 connection = connectionFactory-createConnection(); connection-start(); / 关闭连接工厂 delete connectionFactory; / 创建一个会话 if( clientAck ) /消息接收后由消费者客户端确认 session= connection-createSession( Session:CLIENT_ACKNOWLEDGE ); else /消息接收自动确认 session = connection-createSession( Session:AUTO_ACKNOWLEDGE ); / 创建一个队列或主题 (Topic or Queue) if( useTopic ) destination = session-createTopic( destURI ); else destination = session-createQueue( destURI ); / 创建生产者并设定消息传送模式 producer = session-createProducer( destination ); producer-setDeliveryMode( DeliveryMode:NON_PERSISTENT );catch ( CMSException& e ) e.printStackTrace(); / 销毁void cleanup() / Destroy resources. try if( destination != NULL ) delete destination; catch ( CMSException& e ) e.printStackTrace(); destination = NULL; try if( producer != NULL ) delete producer; catch ( CMSException& e ) e.printStackTrace(); producer = NULL; / Close open resources. try if( session != NULL ) session-close(); if( connection != NULL ) connection-close(); catch ( CMSException& e ) e.printStackTrace(); try if( session != NULL ) delete session; catch ( CMSException& e ) e.printStackTrace(); session = NULL; try if( connection != NULL ) delete connection; catch ( CMSException& e ) e.printStackTrace(); connection = NULL; 4.1.4 生产一个消息并发送到队列中public :void send() / 消息内容 string text = (string)Hello world! thread ; for( std:size_t ix=0; ixcreateTextMessage( text ); / 发送消息 printf( Sent message #%d n, ix+1 ); producer-send( message ); / 释放消息 delete message;4.1.5 发送消息主程序Int main(void) / broker uristd:string brokerURI = tcp:/:61616 ?wireFormat=openwire &transport.useAsyncSend=true/ 发送消息数unsigned int numMessages = 2000;/ 消息队列名std:string destURI = TEST.FOO;/ 使用队列模式bool useTopics = false; /初始化一个消息生产者对象并发送消息 SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );producer.send();return 0;4.1.6 总结综上例子可知,每次发送一个消息到消息队列中都需要定义一个生产者类,并完成一个机械的初始化及销毁过程。为了提高软件开发效率,可以模仿生产者类定义一个消息发送者类,封装相关细节,并编译成共享库以供使用,简化编程过程。4.2 使用winkeemq-cpp来创建消息生产者4.2.1 头文件及名字空间#include using namespace winkeemq;using namespace std;4.2.2 发送消息主程序int main(int argc, char* argv)/ broker uri std:string brokerURI = tcp:/79:61616 ?wireFormat=openwire &wireFormat.maxInactivityDuration=0 &soKeepAlive=true &transport.useAsyncSend=true;/ 队列名string mqName=mm.mq;/ 创建一个消息发送对象(采用队列模式,每次只发一个消息)MessageSender ms(brokerURI,1,false,mqName);string body=”hello worldn”;/ 创建一个文本消息TextMessage* msg=dynamic_cast (ms.createMessag(MessageSender:TEXT_MESSAGE);/ 设定消息体内容 msg-setText(body); / 发送消息 ms.sendMessage();/ 销毁消息 ms.deleteMessage();4.2.3 总结由上述例子可看出,采用winkeemq-cpp后代码量精简了很多,开发员不需要关心那些机械的初始化细节。要创建一个消息生产者,只需要给定Broker uri, 队列名,消息目的模式,然后调用MessageSender的createMessage()创建一个具体类型的消息,createMessage()的参数是一个在MessageSender中定义的一个无名enum, 指明消息的类型。调用MessageSender的sendMessage()发送消息到broker中,最后销毁消息5 消息消费者客户端消息消费者从Broker上的队列或主题中取出消息并做相应的处理。5.1 使用activemq-cpp来创建消息消息者5.1.1 头文件及名字空间#include #include #include #include #include #include #include #include #include #include #include using namespace activemq;using namespace activemq:core;using namespace cms;using namespace std;5.1.2 创建一个生产者类class SimpleAsyncConsumer : public ExceptionListener, public MessageListener private: Connection* connection; /连接对象 Session* session; /会话 Destination* destination; /消息目的地 MessageConsumer* consumer; /消息消费者 bool useTopic; /是否采用采用主题模式 bool clientAck; /是否自动确认消息接收 std:string brokerURI; /连接borker uri std:string destURI; /队列或主题名public:/./构造函数 SimpleAsyncConsumer( const std:string& brokerURI, const std:string& destURI, bool useTopic = false, bool clientAck = false ) connection = NULL; session = NULL; destination = NULL; consumer = NULL; this-useTopic = useTopic; this-brokerURI = brokerURI; this-destURI = destURI;this-clientAck = clientAck;initialize (); virtual SimpleAsyncConsumer () cleanup();5.1.3 初始化及销毁private:/ 初始化virtual void initialize()try / 创建连接工厂 ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory( brokerURI ); / 创建一个到broker的连接 connection = connectionFactory-createConnection(); connection-start(); / 设置连接异常侦听类connection-setExceptionListener(this); / 关闭连接工厂 delete connectionFactory; / 创建一个会话 if( clientAck ) /消息接收后由消费者客户端确认 session= connection-createSession( Session:CLIENT_ACKNOWLEDGE ); else /消息接收自动确认 session = connection-createSession( Session:AUTO_ACKNOWLEDGE ); / 创建一个队列或主题 (Topic or Queue) if( useTopic ) destination = session-createTopic( destURI ); else destination = session-createQueue( destURI ); / 创建消费者并设定消息接收侦听类 consumer = session-createConsumer( destination ); consumer-setMessageListener( this );catch ( CMSException& e ) e.printStackTrace(); / 销毁void cleanup() / Destroy resources. try if( destination != NULL ) delete destination; catch ( CMSException& e ) e.printStackTrace(); destination = NULL; try if( producer != NULL ) delete producer; catch ( CMSException& e ) e.printStackTrace(); producer = NULL; / Close open resources. try if( session != NULL ) session-close(); if( connection != NULL ) connection-close(); catch ( CMSException& e ) e.printStackTrace(); try if( session != NULL ) delete session; catch ( CMSException& e ) e.printStackTrace(); session = NULL; try if( connection != NULL ) delete connection; catch ( CMSException& e ) e.printStackTrace(); connection = NULL; 5.1.4 从消息队列中异步接收消息如果队列中有消息到来,程序会自动调用onMessage函数,因此只需要在onMessage()中编写对消息的处理。onMessage()函数中的message参数在onMessage()返回后便会自动销毁,可以通过调用Message的clone()方法拷贝自

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论