ActiveMQ持久化方式.doc_第1页
ActiveMQ持久化方式.doc_第2页
ActiveMQ持久化方式.doc_第3页
ActiveMQ持久化方式.doc_第4页
ActiveMQ持久化方式.doc_第5页
已阅读5页,还剩14页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

ActiveMQ持久化方式消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送。消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动以后首先要检查制定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。ActiveMQ持久化方式:AMQ、KahaDB、JDBC、LevelDB。1、AMQAMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,如果一条消息的大小超过了32M,那么这个值必须设置大一点。当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本。默认配置如下:?123属性如下:属性名称默认值描述directoryactivemq-data消息文件和日志的存储目录useNIOtrue使用NIO协议存储消息syncOnWritefalse同步写到磁盘,这个选项对性能影响非常大maxFileLength32Mb一个消息文件的大小persistentIndextrue消息索引的持久化,如果为false,那么索引保存在内存中maxCheckpointMessageAddSize4kb一个事务允许的最大消息量cleanupInterval30000清除操作周期,单位msindexBinSize1024索引文件缓存页面数,缺省为1024,当amq扩充或者缩减存储时,会锁定整个broker,导致一定时间的阻塞,所以这个值应该调整到比较大,但是代码中实现会动态伸缩,调整效果并不理想。indexKeySize96索引key的大小,key是消息IDindexPageSize16kb索引的页大小directoryArchivearchive存储被归档的消息文件目录archiveDataLogsfalse当为true时,归档的消息文件被移到directoryArchive,而不是直接删除2、KahaDBKahaDB是基于文件的本地数据库储存形式,虽然没有AMQ的速度快,但是它具有强扩展性,恢复的时间比AMQ短,从5.4版本之后KahaDB做为默认的持久化方式。默认配置如下:?123KahaDB的属性如下:属性名称默认值描述directoryactivemq-data消息文件和日志的存储目录indexWriteBatchSize1000一批索引的大小,当要更新的索引量到达这个值时,更新到消息文件中indexCacheSize10000内存中,索引的页大小enableIndexWriteAsyncfalse索引是否异步写到消息文件中journalMaxFileLength32mb一个消息文件的大小enableJournalDiskSyncstrue是否讲非事务的消息同步写入到磁盘cleanupInterval30000清除操作周期,单位mscheckpointInterval5000索引写入到消息文件的周期,单位msignoreMissingJournalfilesfalse忽略丢失的消息文件,false,当丢失了消息文件,启动异常checkForCorruptJournalFilesfalse检查消息文件是否损坏,true,检查发现损坏会尝试修复checksumJournalFilesfalse产生一个checksum,以便能够检测journal文件是否损坏。5.4版本之后有效的属性:archiveDataLogsfalse当为true时,归档的消息文件被移到directoryArchive,而不是直接删除directoryArchivenull存储被归档的消息文件目录databaseLockedWaitDelay10000在使用负载时,等待获得文件锁的延迟时间,单位msmaxAsyncJobs10000同个生产者产生等待写入的异步消息最大量concurrentStoreAndDispatchTopicsfalse当写入消息的时候,是否转发主题消息concurrentStoreAndDispatchQueuestrue当写入消息的时候,是否转发队列消息5.6版本之后有效的属性:archiveCorruptedIndexfalse是否归档错误的索引每个KahaDB的实例都可以配置单独的适配器,如果没有目标队列提交给filteredKahaDB,那么意味着对所有的队列有效。如果一个队列没有对应的适配器,那么将会抛出一个异常。配置如下:?12345678910111213141516171819如果filteredKahaDB的perDestination属性设置为true,那么匹配的目标队列将会得到自己对应的KahaDB实例。配置如下:?1234567891011123、JDBC可以将消息存储到数据库中,例如:Mysql、SQL Server、Oracle、DB2。配置JDBC适配器:?123dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。?12345678910111213141516171819202122232425262728293031323334Mysql持久化bean:SQL Server持久化bean:Oracle持久化bean:DB2持久化bean:4、LevelDB这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库储存形式,但是它提供比KahaDB更快的持久性。与KahaDB不同的是,它不是使用传统的B-树来实现对日志数据的提前写,而是使用基于索引的LevelDB。默认配置如下:?123属性如下:属性名称默认值描述directoryLevelDB数据文件的存储目录readThreads10系统允许的并发读线程数量synctrue同步写到磁盘logSize104857600 (100 MB)日志文件大小的最大值logWriteBufferSize4194304 (4 MB)日志数据写入文件系统的最大缓存值verifyChecksumsfalse是否对从文件系统中读取的数据进行校验paranoidChecksfalse尽快对系统内部发生的存储错误进行标记indexFactoryorg.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory在创建LevelDB索引时使用indexMaxOpenFiles1000可供索引使用的打开文件的数量indexBlockRestartInterval16Number keys between restart points for delta encoding of keys.indexWriteBufferSize6291456 (6 MB)内存中索引数据的最大值indexBlockSize4096 (4 K)每个数据块的索引数据大小indexCacheSize268435456 (256 MB)使用缓存索引块允许的最大内存indexCompressionsnappy适用于索引块的压缩类型logCompressionnone适用于日志记录的压缩类型5、 下面详细介绍一下如何将消息持久化到Mysql数据库中需要将mysql的驱动包放置到ActiveMQ的lib目录下修改activeMQ的配置文件:?123在配置文件中的broker节点外增加:?12345678从配置中可以看出数据库的名称是activemq,需要手动在MySql中建立这个数据库。然后重新启动activeMQ,会发现activemq多了三张表:1:activemq_acks2:activemq_lock3:activemq_msgs点到点类型Sender类:?12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender private static final int SEND_NUMBER =2000;public static void main(String args) / ConnectionFactory :连接工厂,JMS用它创建连接ConnectionFactory connectionFactory;/ Connection :JMS客户端到JMS Provider的连接Connection connection =null;/ Session:一个发送或接收消息的线程Session session;/ Destination :消息的目的地;消息发送给谁.Destination destination;/ MessageProducer:消息发送者MessageProducer producer;/ TextMessage message;/ 构造ConnectionFactory实例对象,此处采用ActiveMq的实现connectionFactory =new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,tcp:/localhost:61616);try/ 构造从工厂得到连接对象connection = connectionFactory.createConnection();/启动connection.start();/获取操作连接session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);/获取session,FirstQueue是一个服务器的queue destination = session.createQueue(FirstQueue);/ 得到消息生成者【发送者】producer = session.createProducer(destination);/设置不持久化producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);/构造消息sendMessage(session, producer);/mit();connection.close();catch(Exception e)e.printStackTrace();finallyif(null != connection)try connection.close();catch (JMSException e) / TODO Auto-generatedcatch blocke.printStackTrace();public static void sendMessage(Session session, MessageProducer producer)throws Exceptionfor(int i=1; i=SEND_NUMBER; i+)TextMessage message = session.createTextMessage(ActiveMQ发送消息+i);System.out.println(发送消息:ActiveMQ发送的消息+i);producer.send(message);Receiver类:?1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver public static void main(String args) / ConnectionFactory :连接工厂,JMS用它创建连接ConnectionFactory connectionFactory;/ Connection :JMS客户端到JMS Provider的连接Connection connection =null;/ Session:一个发送或接收消息的线程Session session;/ Destination :消息的目的地;消息发送给谁.Destination destination;/ 消费者,消息接收者MessageConsumer consumer;connectionFactory = newActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,tcp:/localhost:61616);try /得到连接对象connection =connectionFactory.createConnection();/ 启动connection.start();/ 获取操作连接session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/ 创建Queuedestination = session.createQueue(FirstQueue);consumer =session.createConsumer(destination);while(true)/设置接收者接收消息的时间,为了便于测试,这里定为100sTextMessagemessage = (TextMessage)consumer.receive(100000);if(null != message)System.out.println(收到消息 +message.getText();else break;catch(Exception e)e.printStackTrace();finally try if (null != connection)connection.close();catch (Throwable ignore) 测试:测试一:A、 先运行Sender类,待运行完毕后,运行Receiver类B、 在此过程中activemq数据库的activemq_msgs表中没有数据C、 再次运行Receiver,消费不到任何信息测试二:A、 先运行Sender类B、 重启电脑C、 运行Receiver类,无任何信息被消费测试三:A、 把Sender类中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改为producer.setDeliveryMode(DeliveryMode.PERSISTENT);B、 先运行Sender类,待运行完毕后,运行Receiver类C、 在此过程中activemq数据库的activemq_msgs表中有数据生成,运行完Receiver类后,数据清除测试四:A、 把Sender类中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改为producer.setDeliveryMode(DeliveryMode.PERSISTENT);B、 运行Sender类C、 重启电脑D、 运行Receiver类,有消息被消费结论:通过以上测试,可以发现,在P2P类型中当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中,而当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。而且P2P中消息一旦被Consumer消费就从broker中删除。发布/订阅类型Sender类:?123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender private static final int SEND_NUMBER =100;public static void main(String args) / ConnectionFactory :连接工厂,JMS用它创建连接ConnectionFactory connectionFactory;/ Connection :JMS客户端到JMS Provider的连接Connection connection =null;/ Session:一个发送或接收消息的线程Session session;/ MessageProducer:消息发送者MessageProducer producer;/ TextMessage message;/ 构造ConnectionFactory实例对象,此处采用ActiveMq的实现connectionFactory =new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,tcp:/localhost:61616);try/得到连接对象connection = connectionFactory.createConnection();/启动connection.start();/获取操作连接session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic(MQ_test);/ 得到消息生成者【发送者】producer = session.createProducer(topic);/设置持久化producer.setDeliveryMode(DeliveryMode.PERSISTENT);/构造消息sendMessage(session, producer);/mit();connection.close();catch(Exception e)e.printStackTrace();finallyif(null != connection)try connection.close();catch (JMSException e) / TODO Auto-generatedcatch blocke.printStackTrace();public static void sendMessage(Session session, MessageProducer producer)throws Exceptionfor(int i=1; i=SEND_NUMBER; i+)TextMessage message = session.createTextMessage(ActiveMQ发送消息+i);System.out.println(发送消息:ActiveMQ发送的消息+i);producer.send(message);Receiver类:?12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver public static void main(String args) / ConnectionFactory :连接工厂,JMS用它创建连接ConnectionFactory connectionFactory;/ Conn

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论