三个主流消息中间件区别.docx_第1页
三个主流消息中间件区别.docx_第2页
三个主流消息中间件区别.docx_第3页
三个主流消息中间件区别.docx_第4页
三个主流消息中间件区别.docx_第5页
已阅读5页,还剩11页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

市场上的消息中间件:mom4jmom4j是一个完全实现JMS1.1规范的消息中间件并且向下兼容JMS1.0与1.02.它提供了自己的消息处理存储使它独立于关系数据与语言,所以它的客户端可以用任何语言开发.OpenJMSOpenJMS是一个开源的Java Message Service API 1.0.2 规范的实现,它包含有以下特性: *. 它既支持点到点(point-to-point)(PTP)模型和发布/订阅(Pub/Sub)模型。 *. 支持同步与异步消息发送 *. JDBC持久性管理使用数据库表来存储消息 *. 可视化管理界面。 *. Applet支持。 *. 能够与Jakarta Tomcat这样的Servlet容器结合。 *. 支持RMI, TCP, HTTP 与SSL协议。 *. 客户端验证 *. 提供可靠消息传输、事务和消息过滤UberMQUberMQ完全实现了Java Message Service 规范。UberMQ是因为现有的许多JMS提供商已经违背了分布式计算的核心原则:快速与简单而开发的。Hermes JMS利用它提供的Swing UI可以很好的实现监控JMS providers。ActiveMQActiveMQ是一个开放源码基于Apache 2.0 licenced 发布并实现了JMS 1.1。它能够与Geronimo,轻量级容器和任Java应用程序无缝的给合。SomnifugiSomnifugi使得工作在同一个java虚拟机中的线程能实现消息互发。MantaRayMantaRay基于peer-2-peer 技术。它具有以下特性: 1.它既支持点对点(point-to-point)的域,又支持发布/订阅(publish/subscribe)类型的域。 2.并且提供对下列类型的支持:经认可的消息传递,事务型消息的传递,一致性消息和具有持久性的订阅者支持。 3.消息过滤体制。 4.能与WebLogic and WebSphere 给合。 5.支持TCP, UDP 与 HTTP传输协。PresumoPresumo也是一个实现Java Message Service API的JMS消息中间件。JORAMJORAM一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。JMS4SpreadJMS4Spread是一个消息系统.它部分地实现了Java消息服务(JMS) API.Open Message QueueOpen Message Queue是Sun Java System Message Queue的一个开源版本。Open message queue是一个企业级,可升级,非常成熟的消息服务器。它为面向消息的系统集成提供一套完整的JMS(Java Message Service )实现。由于Open MQ源自Sun的Java Message Queue,所以其具有Java System Message Queue拥有的所有特性,功能和性能 。FFMQFFMQ是一个轻量级,高性能,快速的Native JMS1.1开源实现。支持SSL远程连接,自动防故障的持久化机制,基于模板定义目的地(Destination),采用模式匹配自动创建目的地(Destination)。 MQSSave/MQSLoadMQSSave是一个简单的Java程序,能够读取MQSeries队列的消息保存至文件中。而MQSLoad是一相反的Java程序,能够读取文件中的消息然后加载至MQSeries队列中。HornetQHornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性。在不久的将来更多的协议将被HornetQ支持。 HornetQ拥有超高的性能,HornetQ在持久化消息方面的性能可以轻易的超于其它常见的非持久化消息引擎的性能。当然,HornetQ的非持久化消息的性能会表现的更好! HornetQ完全使用POJO,纯POJO的设计让HornetQ可以尽可能少的以来第三方的包。从设计模式来说,HornetQ这样的设计入侵性也最小。HornetQ既可以独立运行,也可以与其它Java应用程序服务器集成使用。 HornetQ拥有完善的错误处理机制,HornetQ提供服务器复制和故障自动转移功能,该功能可以消除消息丢失或多个重复信息导致服务器出错。 HornetQ提供了灵活的集群功能,通过创建HornetQ集群,您可以享受到到消息的负载均衡带来的性能提升。您也可以通过集群,组成一个全球性的消息网络。您也可以灵活的配置消息路由。 HornetQ拥有强大的管理功能。HornetQ提供了大量的管理API和监控服务器。它可以无缝的与应用程序服务器整合,并共同工作在一个HA环境中。Apache QpidApache Qpid是最新开放企业信息标准AMQP(Advanced Message Queuing Protocol)的一个开源实现。Java版实现完全支持JMS标准,可运行在任意Java平台上。此外Qpid还提供AMQP Client APIs的各种语言实现包括: C+ Java, fully conformant with JMS 1.1 C# .NET, 0-10 using WCF Ruby PythonSpring AMQPSpring AMQP是一个用于替换原先Spring JMS支持的消息解决方案。提供收发消息的模板,还支持基于消息驱动的POJO。用法和配置与Spring中对JMS的支持一样。这个项目包含Java和.NET两个版本。 KafkaKafka是一个高吞吐量分布式消息系统。linkedin开源的kafka。 Kafka就跟这个名字一样,设计非常独特。首先,kafka的开发者们认为不需要在内存里缓存什么数据,操作系统的文件缓存已经足够完善和强大,只要你不搞随机写,顺序读写的性能是非常高效的。kafka的数据只会顺序append,数据的删除策略是累积到一定程度或者超过一定时间再删除。Kafka另一个独特的地方是将消费者信息保存在客户端而不是MQ服务器,这样服务器就不用记录消息的投递过程,每个客户端都自己知道自己下一次应该从什么地方什么位置读取消息,消息的投递过程也是采用客户端主动pull的模型,这样大大减轻了服务器的负担。Kafka还强调减少数据的序列化和拷贝开销,它会将一些消息组织成Message Set做批量存储和发送,并且客户端在pull数据的时候,尽量以zero-copy的方式传输,利用sendfile(对应java里的 FileChannel.transferTo/transferFrom)这样的高级IO函数来减少拷贝开销。可见,kafka是一个精心设计,特定于某些应用的MQ系统,这种偏向特定领域的MQ系统我估计会越来越多,垂直化的产品策略值的考虑。 play-rabbitmq这是Play! Framework开发框架的一个扩展模块。用于生产和消费RabbitMQ消息。 队列消息系统 FQueueFQueue是一个高性能、基于磁盘持久存储的队列消息系统。兼容memcached协议,能用memcached的语言都可以良好的与它通信。 FQueue为你提供一个不需要特别优化,高性能的一个消息系统。 特性1. 基于磁盘持久化存储。 2. 支持memcached协议。 3. 支持多队列,密码验证功能。 4. 高性能,能达到数十万qps。 5. 低内存消耗。100-300M内存即可工作得很好。 6. 高效率IO读写算法,IO效率高。 7. 纯JAVA代码。支持进程内JVM级别的直接调用。 8. 在不需要强顺序的场景下,支持多机负载均衡。 不支持1. 不支持topic方式的订阅功能。 2. 不支持主从复制。 主流消息中间件及选型推荐:ActiveMQ:(还有升级版叫Apollo, 由于转向Scala,原来的架构都要改掉。但是只支持storm协议,不支持JMS),在网络上别人反映,消息量越来越大时,当出现消息堆积时,性能争骤下降,主要卡在磁盘写入,用了硬件加速,也还是不能忍受。消息中间件的技术选型心得RabbitMQ、ActiveMQ和ZeroMQ作者:chszs,转载需注明。博客主页:/chszsRabbitMQ、ActiveMQ和ZeroMQ都是极好的消息中间件,但是我们在项目中该选择哪个更适合呢?很多开发者面临这个烦恼。下面我会对这三个消息中间件做一个比较,看了后你们就心中有数了。RabbitMQ是AMQP协议领先的一个实现,它实现了代理(Broker)架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得RabbitMQ易于使用和部署,适宜于很多场景如路由、负载均衡或消息持久化等,用消息队列只需几行代码即可搞定。但是,这使得它的可扩展性差,速度较慢,因为中央节点增加了延迟,消息封装后也比较大。ZeroMQ是一个非常轻量级的消息系统,专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常可以发现它。与RabbitMQ相比,ZeroMQ支持许多高级消息场景,但是你必须实现ZeroMQ框架中的各个块(比如Socket或Device等)。ZeroMQ非常灵活,但是你必须学习它的80页的手册(如果你要写一个分布式系统,一定要阅读它)。ActiveMQ居于两者之间,类似于ZemoMQ,它可以部署于代理模式和P2P模式。类似于RabbitMQ,它易于实现高级场景,而且只需付出低消耗。它被誉为消息中间件的“瑞士军刀”。要注意一点,ActiveMQ的下一代产品为Apollo。最终,这三个产品:1. 都有客户端API且支持多种编程语言;2. 都有大量的文档;3. 都提供了积极的支持。ActiveMQRabbitMQZeroMQ遵循规范JMS1.1及j2ee1.4AMPQ-架构模型消息代理架构Broker消息代理架构Brokerc/s架构实现语言JavaErlangc/c+支持消息协议StompAMPQ、Stomp等-主要推动力量Apache、RedhatLshift、Vmware、SpringSourceIMatix支持编程语言C、Java、PythonC、Java、PythonC、Java、Python编程复杂度复杂简单中等持久化支持支持,不支持第三方数据库发送端缓存性能一般一般高三个经典消息中间件的比较 对于消息中间件,绝大多数熟悉的是 MQ(IBM公司出品),这是目前使用最广泛的中间件产品。还有两个也比较流行,他们是JMS和RV。JMS即JAVA消息服务(Java Message Service)应用程序接口是一个JAVA平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信,是一个与具体平台无关的API。TIBCO Rendezvous(或称为TIBCO RV)也是一种中间件,具有发布/订阅(Publish/Subscribe)、基于主题寻址(Subject-Based Addressing) 和自定义数据信息(Self-Describing Data Messages)等专利技术功能,使不同应用平台上的信息在一个共享的虚拟总线Information Bus(TIB)上进行传输交换。 先总结一下消息中间件的功能,以上的三类中间件都实现了这些功能。 ?0l 实现消息的异步发送接收,发布订阅,使得两端的应用解耦(减少或解除应用程序之间的耦合度)。 ?0l 实现消息持久化机制,保证消息可靠性传输。 ?0l 优化网络传输,支持断点续传。 、区别之是否分布式 RV 和 MQ 都是分布式结构的, 和 JMS 消息中间件的星型结构不同。分布式消息中间件的Sever在应用环境里都会部署多个,彼此互联,没有主备之分。JMS消息中间件的应用部署一般都是主备两个Server,消息的发送和接收应用平时和主Server相连,有问题时切换到备 Server,主备Server共用公共的存储设备来保存消息。 、区别之是否接收端主动 MQ 和 JMS 消息中间件都采用消息接收端主动接收消息的方式。消息从发送端发出后,首先会缓存到Server上,接收端应用发起一个接收消息的请求,Server把消息作为应答返回给接收端。接收端不执行接收动作,消息就会一直在Server上保存。RV 和这两种消息中间件都不同,使用的是发送端主动的消息推送模式。消息从发送端发出后,并不在Server上缓存,Server只做路由把消息推送给消息接收端。消息接收端只要连接上Server,订阅要接收的消息,这些消息就会源源不断地从Server那里推送过来,消息先缓存到接收客户端的队列里,接收端应用再从队列里取消息。RV的最大特点就是把一个数据生产者的数据以最快的速度推送到多个数据消费者那里。RV从金融市场数据系统的需求中产生而来,正是这些特点使得它在证券系统得到最广泛的应用。 、区别之是否便于一对多分布MQ 和 JMS 消息中间件在 IP 层都使用点对点的一对一传输方式,而RV在IP层使用的是广播或者组播的一对多方式。使用广播或者组播可以直接实现一对多的发布订阅形式,发布应用(发送端)发布消息到RV网络上,这些消息会广播到网络的每一个节点上,每一个订阅应用(接收端)都会收到这些消息。而MQ和JMS实现一对多发布订阅就要麻烦的很多,都是在Server按消息的Topic(主题)来缓存消息,为每一个订阅者拷贝每一条消息的引用。当所有订阅者都从 Server 上取走某条消息,这条消息才可在Server上删除。 、区别之是否在传输层使用TCP MQ和JMS 消息中间件不论是Server和Server的通信,还是Server和Client的通信,在传输层都使用 TCP 协议,保证消息传输连接的可靠性。而RV在Server和Server之间的通信使用了UDP协议,牺牲可靠性来达到高实时性的需求。RV有两种可靠性级别,RV Reliable 和 RVCM 。 RV Reliable 模式使用基于UDP增加了一定可靠机制的TRDP协议,在一定范围内具有消息包的检查和重传机制,保证了一定程度的消息可靠性,但不保证消息不丢失。RVCM 在 RV Reliable 基础上更进一步,在消息级别具有消息确认和重传机制,可以保证消息绝对不丢失。对于长度在 1500个字节以下的消息, RV Reliable 发布消息能达到150万笔消息每秒,接收也能达到50万笔消息每秒,传输消息的性能非常好。 、区别之是否用Subject做收发端的匹配 RV使用消息的Subject 来做消息发送端和接收端的匹配。RV不在Server端缓存消息,也没有Server端的Queue和Topic。每个消息都有 Subject,Subject格式是多个字符串的串接,没有数目或者长度的限制。比如在市场数据系统里,行情数据消息的Subject里包含金融品种的名字,这样的 Subject 可以有上百万个。消息订阅端可以细到只接收某个市场的某个品种的行情数据,所以RV能使用细粒度的消息分类。MQ和JMS消息中间件在Server端按Queue 和Topic来缓存消息,消息的发送端和接收端按Queue和Topic的名字来匹配。每个Server能创建的Queue和Topic是有限的,这也就限制了使用 MQ和JMS消息中间件构建的应用,这些应用在做消息收发处理的时候只能使用粗粒度的消息分类。 、区别之中间件结构7、区别之典型应用场景实例 MQ已知的典型应用场景是商业银行向人民银行报送监管信息; JMS已知的典型应用场景是异步发送邮件; RV已知的典型应用场景是金融市场数据提供商(如路透、彭博、道琼斯)向银行、大型企业提供证券、外汇等金融市场信息。淘宝开源框架Metaq:淘宝吸收其它消息中间件(Apache Kafka)而自己开发的一款消息中间件.原名(Metamorphosis)MetaQ作为一个分布式的消息中间件,需要依赖zookeeper,对于一些规模不大、单机应用的场景,我个人并不是特别支持尝试用MetaQ,因为多一个依赖系统,其实就是多一份风险,在这些简单场景下,可能类似memcacheq、kestrel甚至redis等轻量级MQ就非常合适。而MetaQ一开始就是为大规模分布式系统设计的,如果不当使用,可能没有带来好处,反而多出一堆问题。开发者需要根据自己面对的场景,团队的技术能力,做出一个合适的选择。MetaQ初探 博客分类: 开源MetaQ(全称Metamorphosis)是一个高性能、高可用、可扩展的分布式消息中间件,,MetaQ具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,METAQ在阿里巴巴各个子公司被广泛应用,每天转发250亿+条消息。主要应用于异步解耦,Mysql数据复制,收集日志等场景。总体结构主要特点 生产者、服务器和消费者都可分布式 消息存储顺序写 性能极高,吞吐量大 支持消息顺序 支持本地和XA事务 客户端pull,随机读,利用sendfile系统调用,zero-copy ,批量拉数据 支持消费端事务 支持消息广播模式 支持异步发送消息 支持http协议 支持消息重试和recover 数据迁移、扩容对用户透明 消费状态保存在客户端 支持同步和异步复制两种HA主要特性数据完整性消息生产者发送的消息,meta服务器收到后在做必要的校验和检查之后的第一件事就是写入磁盘,写入成功之后返回应答给生产者,生产者发送消息返回SendResult,如果isSuccess返回为true,则表示消息已经确认发送到服务器并被服务器接收存储。整个发送过程是一个同步的过程。保证消息送达服务器并返回结果。因此,可以确认每条发送结果为成功的消息服务器都是写入磁盘的。写入磁盘,不意味着数据落到磁盘设备上,毕竟我们还隔着一层os,os对写有缓冲。Meta有两个特性来保证数据落到磁盘上:每1000条(可配置),即强制调用一次force来写入磁盘设备。每隔10秒(可配置),强制调用一次force来写入磁盘设备。因此,Meta通过配置可保证在异常情况下(如磁盘掉电)10秒内最多丢失1000条消息。当然通过参数调整你甚至可以在掉电情况下不丢失任何消息。虽然消息在发送到broker之后立即写入磁盘才返回客户端告诉消息生产者消息发送成功,通过unflushThreshold和unflushInterval两个参数的控制,可以保证单机消息数据的安全性,只要机器的磁盘没有永久损坏,消息总可以在重启后恢复并正常投递给消费者们。但是,如果遇到了磁盘永久损坏或者数据文件永久损坏的情况,那么该broker上的消息数据将可能永久丢失。为了防止这种情况的发生,一个可行的方案就是将消息数据复制到多台机器,类似mysql的主从复制功能(异步复制和同步功能)数据可靠性服务器通常组织为一个集群,一条从生产者过来的消息可能按照路由规则存储到集群中的某台机器。Meta已经实现高可用的HA方案,类似mysql的同步和异步复制,将一台meta服务器的数据完整复制到另一台slave服务器,并且slave服务器还提供消费功能(同步复制不提供消费)。消息的消费者是一条接着一条地消费消息,只有在成功消费一条消息后才会接着消费下一条。如果在消费某条消息失败(如异常),则会尝试重试消费这条消息(默认最大5次),超过最大次数后仍然无法消费,则将消息存储在消费者的本地磁盘,由后台线程继续做重试。而主线程继续往后走,消费后续的消息。因此,只有在MessageListener确认成功消费一条消息后,meta的消费者才会继续消费另一条消息。由此来保证消息的可靠消费。消费者的另一个可靠性的关键点是offset的存储,也就是拉取数据的偏移量。我们目前提供了以下几种存储方案zookeeper,默认存储在zoopkeeper上,zookeeper通过集群来保证数据的安全性。mysql,可以连接到您使用的mysql数据库,只要建立一张特定的表来存储。完全由数据库来保证数据的可靠性。file,文件存储,将offset信息存储在消费者的本地文件中。Offset会定期保存,并且在每次重新负载均衡前都会强制保存一次下载、配置、运行首先需要安装配置Zookeeper,如果不知道怎么配置的,看我的文章,有一章讲解过!/p/meta-queue/downloads/list选择最新版本的服务器并下载到本地解压缩文件,bin目录存放的脚本文件,日志在logs目录,而配置文件主要是conf目录下server.ini,lib存放所有的依赖jar包。进入bin/env.sh,修改JAVA_HOME,JMX等变量。根据需要修改conf/server.ini文件(列出了所有的配置):zk.zkEnable=true是否注册到zk,默认为truezk.zkConnect=localhost:2181 zk的服务器列表zk.zkSessionTimeoutMs=30000 zk心跳超时,单位毫秒,默认30秒 zk.zkSessionTimeoutMs=30000 zk.zkConnectionTimeoutMs=30000 zk连接超时时间,单位毫秒,默认30秒zk.zkSyncTimeMs=5000 zk数据同步时间,单位毫秒,默认5秒 brokerId:服务器ID(必须是集群内唯一) serverPort:服务器端口hostName:默认将取本机IP (多机网卡,需要指明)dataLogPath:日志数据文件路径,默认跟dataPath一样dataPath:于指定默认的数据存储路径(慎重设置,默认在user.home/meta下) numPartitions:默认topic的分区数目(慎重设置)maxSegmentSize:单个文件的最大大小,实际会超过此值,默认1GmaxTransferSize:传输给客户端每次最大的缓冲区大小,默认1MunflushThreshold:最大允许的未flush间隔时间,毫秒,默认10秒putProcessThreadCount:;处理put请求线程数,默认cpus*10deletePolicy=delete,168(数据删除策略,默认超过7天即删除,这里的168是小时,10s表示10秒,10m表示10分钟,10h表示10小时,默认为小时)deleteWhen: 何时执行删除策略的cron表达式,默认是0 0 6,18 * * ?,也就是每天的早晚6点执行处理策略。deleteWhen: 删除策略的执行时间,cron表达式maxCheckpoints: 最大保存事务checkpoint数目,默认为3checkpointInterval: 事务checkpoint时间间隔,单位毫秒,默认1小时(3600000)maxTxTimeoutTimerCapacity=30000最大事务超时事件数,用于监控事务超时maxTxTimeoutInSeconds=60最大事务超时时间,单位秒flushTxLogAtCommit=1事务日志的同步设置,0表示让操作系统决定,1表示每次commit都同步,2表示每隔1秒同步一次,此参数严重影响事务性能,可根据你需要的性能和可靠性之间权衡做出一个合理的选择。通常建议设置为2,表示每隔1秒刷盘一次,也就是最多丢失一秒内的运行时事务。这样的可靠级别对大多数服务是足够的。最安全的当然是设置为1,但是将严重影响事务性能。而0的安全级别最低。安全级别上 1=20,而性能则是0 = 2 1。 diamondZKDataId=metamorphosis.zkConfig zk在diamond中配置存储的dataIddiamondZKGroup=DEFAULT_GROUP zk在diamond中配置存储的groupacceptPublish: 是否接收消息,默认为true;如果为false,则不会注册发送信息到zookeeper上,客户端当然无法发送消息到该broker。本参数可以被后续的topic配置覆盖。acceptSubscribe: 与acceptPublish类似,默认也为true;如果为false,则不会注册消费信息到zookeeper上,消费者无法发现该broker,当然无法从该broker消费消息。本参数可以被后续的topic配置覆盖。 unflushThreshold: 每隔多少条消息做一次磁盘sync,强制将更改的数据刷入磁盘。默认为1000。也就是说在掉电情况下,最多允许丢失1000条消息。可设置为0,强制每次写入都sync。在设置为0的情况下,服务器会自动启用group commit技术,将多个消息合并成一次sync来提升IO性能。经过测试,group commit情况下消息发送者的TPS没有受到太大影响,但是服务端的负载会上升很多。unflushInterval: 间隔多少毫秒定期做一次磁盘sync,默认是10秒。也就是说在服务器掉电情况下,最多丢失10秒内发送过来的消息。不可设置为小于或者等于0JAVA客户端代码生产者:Java代码 1. duct;2. 3. importjava.io.BufferedReader;4. importjava.io.InputStreamReader;5. 6. importcom.taobao.metamorphosis.Message;7. importcom.taobao.metamorphosis.client.MessageSessionFactory;8. importcom.taobao.metamorphosis.client.MetaClientConfig;9. importcom.taobao.metamorphosis.client.MetaMessageSessionFactory;10. ducer.MessageProducer;11. ducer.SendResult;12. importcom.taobao.metamorphosis.utils.ZkUtils.ZKConfig;13. publicclassProducts14. publicstaticvoidmain(Stringargs)throwsException15. finalMetaClientConfigmetaClientConfig=newMetaClientConfig();16. finalZKConfigzkConfig=newZKConfig();17. zkConfig.zkConnect=1:2181;18. metaClientConfig.setZkConfig(zkConfig);19. /由这个工厂创建生产者或者消费者20. /1.服务的查找和发现,通过diamond和zookeeper帮你查找日常的meta服务器地址列表21. /2.连接的创建和销毁,自动创建和销毁到meta服务器的连接,并做连接复用,也就是到同一台meta的服务器在一个工厂内只维持一个连接。22. /3.消息消费者的消息存储和恢复,后续我们会谈到这一点。23. /4.协调和管理各种资源,包括创建的生产者和消费者的。24. MessageSessionFactorysessionFactory=newMetaMessageSessionFactory(metaClientConfig);25. /消息生产者的接口,MessageProducer是线程安全的,MessageProducer创建的代价昂贵,每次都需要通过zk26. /查找服务器并创建tcp长连接,通过它来发送消息,每个消息对象都是Message类的实例,Message表示一个消息对象,它包含这么几个属性:27. /id:Long型的消息id,消息的唯一id,系统自动产生,用户无法设置,在发送成功后由服务器返回,发送失败则为0。28. /topic:消息的主题,订阅者订阅该主题即可接收发送到该主题下的消息,生产者通过指定发布的topic查找到需要连接的服务器地址,必须。29. /data:消息的有效载荷,二进制数据,也就是消息内容,meta永远不会修改消息内容,你发送出去是什么样子,接收到就是什么样子。30. /消息内容通常限制在1M以内,我的建议是最好不要发送超过上百K的消息,必须。数据是否压缩也完全取决于用户。31. /attribute:消息属性,一个字符串,可选。发送者可设置消息属性来让消费者过滤。32. MessageProducerproducer=sessionFactory.createProducer();33. finalStringtopic=test;34. producer.publish(topic);35. BufferedReaderreader=newBufferedReader(newInputStreamReader(System.in);36. Stringline=qiujinyong;37. while(line=reader.readLine()!=null)38. /sendmessage39. SendResultsendResult=producer.sendMessage(newMessage(topic,line.getBytes();40. /checkresult41. if(!sendResult.isSuccess()42. System.err.println(Sendmessagefailed,errormessage:+sendResult.getErrorMessage();43. 44. else45. System.out.println(Sendmessagesuccessfully,sentto+sendResult.getPartition();46. 47. 48. 49. package duct;import java.io.BufferedReader;import java.io.InputStreamReader;import com.taobao.metamorphosis.Message;import com.taobao.metamorphosis.client.MessageSessionFactory;import com.taobao.metamorphosis.client.MetaClientConfig;import com.taobao.metamorphosis.client.MetaMessageSessionFactory;import ducer.MessageProducer;import ducer.SendResult;import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;public class Products public static void main(String args) throws Exception final MetaClientConfig metaClientConfig = new MetaClientConfig(); final ZKConfig zkConfig = new ZKConfig(); zkConfig.zkConnect = 1:2181; metaClientConfig.setZkConfig(zkConfig); / 由这个工厂创建生产者或者消费者 /1.服务的查找和发现,通过diamond和zookeeper帮你查找日常的meta服务器地址列表 /2.连接的创建和销毁,自动创建和销毁到meta服务器的连接,并做连接复用,也就是到同一台meta的服务器在一个工厂内只维持一个连接。 /3.消息消费者的消息存储和恢复,后续我们会谈到这一点。 /4.协调和管理各种资源,包括创建的生产者和消费者的。 MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig); /消息生产者的接口,MessageProducer是线程安全的,MessageProducer创建的代价昂贵,每次都需要通过zk /查找服务器并创建tcp长连接,通过它来发送消息,每个消息对象都是Message类的实例,Message表示一个消息对象,它包含这么几个属性: /id: Long型的消息id,消息的唯一id,系统自动产生,用户无法设置,在发送成功后由服务器返回,发送失败则为0。 /topic: 消息的主题,订阅者订阅该主题即可接收发送到该主题下的消息,生产者通过指定发布的topic查找到需要连接的服务器地址,必须。 /data: 消息的有效载荷,二进制数据,也就是消息内容,meta永远不会修改消息内容,你发送出去是什么样子,接收到就是什么样子。 /消息内容通常限制在1M以内,我的建议是最好不要发送超过上百K的消息,必须。数据是否压缩也完全取决于用户。 /attribute: 消息属性,一个字符串,可选。发送者可设置消息属性来让消费者过滤。 MessageProducer producer = sessionFactory.createProducer(); final String topic = test; producer.publish(topic); BufferedReader reader = new BufferedReader(new InputStreamReader(System.in); String line = qiujinyong; while (line = reader.readLine() != null) / send message SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes(); / check result if (!sendResult.isSuccess() System.err.println(Send message failed,error message: + sendResult.getErrorMessage(); else System.out.println(Send message successfully,sent to + sendResult.getPartition(); 消费者:Java代码 1. packagecom.metaq.consum;2. 3. importjava.util.concurrent.Executor;4. 5. importcom.taobao.metamorphosis.Message;6. importcom.taobao.metamorphosis.client.MessageSessionFactory;7. importcom.taobao.metamorphosis.client.MetaClientConfig;8. importcom.taobao.metamorphosis.client.MetaMessageSessionFactory;9. importcom.taobao.metamorphosis.client.consumer.ConsumerConfig;10. importcom.taobao.metamorphosis.client.consumer.MessageConsumer;11. importcom.taobao.metamorphosis.client.consumer.MessageListener;12. importcom.taobao.metamorphosis.utils.ZkUtils.ZKConfig;13. 14. publicclassAsyncConsum15. publicstaticvoidmain(Stringargs)throwsException16. finalMetaClientConfigmetaClientConfig=newMetaClientConfig();17. finalZKConfigzkConfig=newZKConfig();18. zkConfig.zkConnect=1:2181;19. metaClientConfig.setZkConfig(zkConfig);20. MessageSessionFactorysessionFactory=newMetaMessageSessionFactory(metaClientConfig);21. finalStringtopic=test;22. finalStringgroup=

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论