




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、RabbitMQ消息队列目录RabbitMQ基础概念详细介绍4引言4RabbitMQ简介4总结11(一) : Detailed Introduction 详细介绍111. 历史112. 应用场景113. 系统架构124. 进一步的细节阐明144.1 使用ack确认Message的正确传递144.2 Reject a message154.3 Creating a queue154.4 Exchanges164.5 Virtual hosts16(二):”Hello, World“161. 环境配置172. Sending183. Receiving194. 最终版本205. 最终运行21(三)
2、:任务分发机制211. 准备222. Round-robin dispatching 循环分发233. Message acknowledgment 消息确认244. Message durability消息持久化265. Fair dispatch 公平分发276. 最终版本28(四):分发到多Consumer(Publish/Subscribe)301. Exchanges302. Temporary queues313. Bindings绑定324. 最终版本33(五):Routing 消息路由351. Bindings绑定352. Direct exchange363. Multipl
3、e bindings364. Emitting logs375. Subscribing376. 最终版本38(六):使用主题进行消息分发401. Topic exchange402. 代码实现423. 运行和结果43(七):适用于云计算集群的远程调用(RPC)441. 客户端接口 Client interface442. 回调函数队列 Callback queue452.1 Message properties453. 相关id Correlation id454. 总结465. 最终实现46(九):Publisher的消息确认机制501. 事务机制 VS Publisher Confirm
4、512. 消息在什么时候确认523. 编程实现53(10)实例54(1)基础验证541、Windows下RabbitMQ的安装542、介绍553、Java入门实例56(二)队列581、 准备592、 消息应答(message acknowledgments)633、 消息持久化(Message durability)664、公平转发(Fair dispatch)675、完整的代码68RabbitMQ基础概念详细介绍引言你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些
5、问题。消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC)。本文将要介绍的RabbitMQ就是当前最主流的消息中间件之一。RabbitMQ简介AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多
6、种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。ConnectionFactory、Connection、ChannelConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻
7、辑。ConnectionFactory为Connection的制造工厂。Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。QueueQueue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消
8、费者进行处理,而不是每个消费者都收到所有的消息并处理。Message acknowledgment在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概
9、念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bugQueue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑另外pub message是没有ack的。Message durability如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率
10、丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。Prefetch count前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者
11、的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。Exchange在上一节我们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。Exchange是按照什么逻辑将消息路由到Queue的?这个将在Binding一节介绍。RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略,这将在Exchange
12、Types一节介绍。routing key生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。BindingRabbitMQ中通过Binding将Exc
13、hange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。Binding key在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。binding key 并不是在所有
14、情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。Exchange TypesRabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),下面分别进行介绍。fanoutfanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。上图中,生产者(P)发送到E
15、xchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。directdirect类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由
16、到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。topic前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:· routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符
17、串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”· binding key与routing key一样也是句点号“. ”分隔的字符串· binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1,routingKey=”lazy.brown
18、.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。headersheaders类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性
19、进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。该类型的Exchange没有用到过(不过也应该很有用武之地),所以不做介绍。RPCMQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。但实际的应用场景中,我们很可能需要一些
20、同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。RabbitMQ中实现RPC的机制是:· 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪
21、条请求被成功执行了或执行失败)· 服务器端收到消息并处理· 服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性· 客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理总结本文介绍了RabbitMQ中个人认为最重要的概念,充分利用RabbitMQ提供的这些功能就可以处理我们绝大部分的异步业务了。(一) : Detailed Introduction 详细介绍1. 历史 &
22、#160; RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。 RabbitMQ是由RabbitMQ Techn
23、ologies Ltd开发并且提供商业支持的。该公司在2010年4月被SpringSource(VMWare的一个部门)收购。在2013年5月被并入Pivotal。其实VMWare,Pivotal和EMC本质上是一家的。不同的是VMWare是独立上市子公司,而Pivotal是整合了EMC的某些资源,现在并没有上市。 RabbitMQ的官网是2. 应用场景 言归正传。RabbitMQ,或者说AMQP解决了什么问题,或者说它的应用场景是什么?
24、 对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块的如何通信?这和传统的IPC有很大的区别。传统的IPC很多都是在单一系统上的,模块耦合性很大,不适合扩展(Scalability);如果使用socket那么不同的模块的确可以部署到不同的机器上,但是还是有很多问题需要解决。比如: 1)信息的发送者和接收者如何维持这个连接,如果一方的连接中断,这期间的数据如何方式丢失? 2)如何降低发送者和接收者的耦合度? 3)如何让Priority高的接收者先接到数据?
25、160;4)如何做到load balance?有效均衡接收者的负载? 5)如何有效的将数据发送到相关的接收者?也就是说将接收者subscribe 不同的数据,如何做有效的filter。 6)如何做到可扩展,甚至将这个通信模块发到cluster上? 7)如何保证接收者接收到了完整,正确的数据? AMDQ协议解决了以上的问题,而RabbitMQ实现了AMQP。3. 系统架构 成为系统架构可能不太合适,可能叫应用场景的系统架构更合适。 这个
26、系统架构图版权属于sunjun041640。 RabbitMQ Server: 也叫broker server,它不是运送食物的卡车,而是一种传输服务。原话是RabbitMQisnt a food truck, its a delivery service. 他的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。但是这个保证也不是100%的保证,但是对于普通的应用来说这已经足够了。当然对于商业系统来说,可以再做一层数据一致性的guard,就可以彻底保证系统的一致性了。
27、 Client A & B: 也叫Producer,数据的发送方。createmessages and publish (send) them to a broker server (RabbitMQ).一个Message有两个部分:payload(有效载荷)和label(标签)。payload顾名思义就是传输的数据。label是exchange的名字或者说是一个tag,它描述了payload,而且RabbitMQ也是通过这个label来决定把这个Message发给哪个Consumer。AMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。
28、60; Client 1,2,3:也叫Consumer,数据的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一个有名字的邮箱。当有Message到达某个邮箱后,RabbitMQ把它发送给它的某个订阅者即Consumer。当然可能会把同一个Message发送给很多的Consumer。在这个Message中,只有payload,label已经被删掉了。对于Consumer来说,它是不知道谁发送的这个信息的。就是协议本身不支持。但是当然了如果Pr
29、oducer发送的payload包含了Producer的信息就另当别论了。 对于一个数据从Producer到Consumer的正确传递,还有三个概念需要明确:exchanges, queues and bindings。 Exchanges are where producers publish their messages. Queuesare where t
30、he messages end up and are received by consumers Bindings are how the messages get routed from the exchange to particular queues. 还有几个概念是上述图中没有标明的,那就是Connection(连接),Channel(通道,频道)。 Connection: 就是一个TCP的连接。Producer和Consum
31、er都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。 Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。 那么,为什么使用Channel,而不是直接使用TCP连接? 对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制
32、,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。有实验表明,1s的数据可以Publish10K的数据包。当然对于不同的硬件环境,不同的数据包大小这个数据肯定不一样,但是我只想说明,对于普通的Consumer或者Producer来说,这已经足够了。如果不够用,你考虑的应该是如何细化split你的设计。4. 进一步的细节阐明4.1 使用ack确认Message的正确传递 默认情况下,如果Message 已经
33、被某个Consumer正确的接收到了,那么该Message就会被从queue中移除。当然也可以让同一个Message发送到很多的Consumer。 如果一个queue没被任何的Consumer Subscribe(订阅),那么,如果这个queue有数据到达,那么这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被立即发送到这个Consumer,这个数据被Consumer正确收到时,这个数据就被从queue中删除。 那么什么是正确收到呢?通过ack。每个Message都要被acknowledg
34、ed(确认,ack)。我们可以显示的在程序中去ack,也可以自动的ack。如果有数据没有被ack,那么: RabbitMQ Server会把这个信息发送到下一个Consumer。 如果这个app有bug,忘记了ack,那么RabbitMQ Server不会再发送数据给它,因为Server认为这个Consumer处理能力有限。 而且ack的机制可以起到限流的作用(Benefitto throttling):在Consumer处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的b
35、alance Consumer的load。 当然对于实际的例子,比如我们可能会对某些数据进行merge,比如merge 4s内的数据,然后sleep 4s后再获取数据。特别是在监听系统的state,我们不希望所有的state实时的传递上去,而是希望有一定的延时。这样可以减少某些IO,而且终端用户也不会感觉到。4.2 Reject a message 有两种方式,第一种的Reject可以让RabbitMQ Server将该Message 发送到下一个Consumer。第二种是从queue中立即删除该Message。4.3 Creating a qu
36、eue Consumer和Procuder都可以通过 queue.declare 创建queue。对于某个Channel来说,Consumer不能declare一个queue,却订阅其他的queue。当然也可以创建私有的queue。这样只有app本身才可以使用这个queue。queue也可以自动删除,被标为auto-delete的queue在最后一个Consumer unsubscribe后就会被自动删除。那么如果是创建一个已经存在的queue呢?那么不会有任何的影响。需要注意的是没有任何的影响,也就是说第二次创建如果参数和
37、第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改。 那么谁应该负责创建这个queue呢?是Consumer,还是Producer?如果queue不存在,当然Consumer不会得到任何的Message。但是如果queue不存在,那么Producer Publish的Message会被丢弃。所以,还是为了数据不丢失,Consumer和Producer都try to create the queue!反正不管怎么样,这个接口都不会出问题。 queue对load balance的处理是完美的。对于多个Consum
38、er来说,RabbitMQ 使用循环的方式(round-robin)的方式均衡的发送给不同的Consumer。4.4 Exchanges 从架构图可以看出,Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。 有三种类型的Exchanges:direct, fanout,topic。 每个实现
39、了不同的路由算法(routing algorithm)。· Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。· Fanout exchange: 会向响应的queue广播。·
40、160; Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。4.5 Virtual hosts 每个virtual host本质上都是一个RabbitMQ Server,拥有它自己的queue,exchagne,和bings rule等等。这保证了你可以在多个不同的application中使用RabbitMQ。 接下来我会使用Python来说明RabbitMQ的使用方法。请移步 RabbitMQ消息队列(二):”Hello, Wor
41、ld“ (二):”Hello, World“ 本文将使用Python(pika 0.9.8)实现从Producer到Consumer传递数据”Hello, World“。 首先复习一下上篇所学:RabbitMQ实现了AMQP定义的消息队列。它实现的功能”非常简单“:从Producer接收数据然后传递到Consumer。它能保证多并发,数据安全传递,可扩展。 和任何的Hello world一样,它们都不复杂。我们将会设计两个程序,一个发送Hello world,另一个接收这个数据
42、并且打印到屏幕。 整体的设计如下图:1. 环境配置RabbitMQ 实现了AMQP。因此,我们需要安装AMPQ的library。幸运的是对于多种编程语言都有实现。我们可以使用以下lib的任何一个:· py-amqplib · txAMQP· pika在这里我们将使用pika. 可以通过 pip 包管理工具来安装:plain view plaincopy1. $ sudo pip install pika=0.9.8
43、 这个安装依赖于pip和git-core。· On Ubuntu:· $ sudo apt-get install python-pip git-core· On Debian:· $ sudo apt-get install python-setuptools git-core· $ sudo easy_install pip· On Windows:To install easy_install, run the MS Windows Installer for setuptools· &
44、gt; easy_install pip· > pip install pika=0.9.82. Sending第一个program send.py:发送Hello world 到queue。正如我们在上篇文章提到的,你程序的第一句话就是建立连接,第二句话就是创建channel:python view plaincopy1. #!/usr/bin/env python 2. import pika 3. 4. connection = pika.Blo
45、ckingConnection(pika.ConnectionParameters( 5. 'localhost') 6. channel = connection.channel() 创建连接传入的参数就是RabbitMQ Server的ip或者name。关于谁创建queue,上篇文章也讨论过:Producer和C
46、onsumer都应该去创建。接下来我们创建名字为hello的queue:cpp view plaincopy1. channel.queue_declare(queue='hello') 创建了channel,我们可以通过相应的命令来list queue:plain view plaincopy1. $ sudo rabbitmqctl list_queues 2. Listing queues . 3. hello
47、160; 0 4. .done. 现在我们已经准备好了发送了。从架构图可以看出,Producer只能发送到exchange,它是不能直接发送到queue的。现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。routing_key就是指定的queue名字。python view plaincopy1. channel.basic_publish(exchange='', 2.
48、 routing_key='hello', 3. body='Hello Wo
49、rld!') 4. print " x Sent 'Hello World!'" 退出前别忘了关闭connection。python view plaincopy1. connection.close() 3. Receiving第二个program receive.py 将从queue中获取Message并且打印到屏幕。第一步还是创建connection。第二步创建channel。第三步创建queue,n
50、ame = hello:python view plaincopy1. channel.queue_declare(queue='hello') 接下来要subscribe了。在这之前,需要声明一个回调函数来处理接收到的数据。python view plaincopy1. def callback(ch, method, properties, body): 2. print " x Re
51、ceived %r" % (body,) subscribe:python view plaincopy1. channel.basic_consume(callback, 2. queue='hello',
52、60;3. no_ack=True) 最后,准备好无限循环监听吧:python view plaincopy1. print ' * Waiting for messages. To exit press
53、CTRL+C' 2. channel.start_consuming() 4. 最终版本send.py:python view plaincopy1. #!/usr/bin/env python 2. import pika 3. 4. connection = pika.BlockingConnection(pika.ConnectionParameters( 5.
54、 host='localhost') 6. channel = connection.channel() 7. 8. channel.queue_declare(queue='hello') 9. 10. channel.basic_publish(exchange='', 11.
55、60; routing_key='hello', 12. body='Hello
56、0;World!') 13. print " x Sent 'Hello World!'" 14. connection.close() receive.py:python view plaincopy1. #!/usr/bin/env python 2. import pika 3. 4. connection =&
57、#160;pika.BlockingConnection(pika.ConnectionParameters( 5. host='localhost') 6. channel = connection.channel() 7. 8. channel.queue_declare(queue='hello') 9. 1
58、0. print ' * Waiting for messages. To exit press CTRL+C' 11. 12. def callback(ch, method, properties, body): 13. print " x Received %r"
59、0;% (body,) 14. 15. channel.basic_consume(callback, 16. queue='hello', 17.
60、; no_ack=True) 18. 19. channel.start_consuming() 5. 最终运行先运行 send.py program:python view plaincopy1. $ python send.py 2. x Sent
61、0;'Hello World!' send.py 每次运行完都会停止。注意:现在数据已经存到queue里了。接收它:python view plaincopy1. $ python receive.py 2. * Waiting for messages. To exit press CTRL+C 3. x Received 'Hello World!
62、9; 接下来,就要奉上更接近实际环境的例子。取决与我的课余时间啊。(三):任务分发机制<= RabbitMQ消息队列(二):”Hello, World“ 在上篇文章中,我们解决了从发送端(Producer)向接收端(Consumer)发送“Hello World”的问题。在实际的应用场景中,这是远远不够的。从本篇文章开始,我们将结合更加实际的应用场景来讲解更多的高级用法。 当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load。试
63、想一下,对于web application来说,在一个很多的HTTP request里是没有时间来处理复杂的运算的,只能通过后台的一些工作线程来完成。接下来我们分布讲解。 应用场景就是RabbitMQ Server会将queue的Message分发给不同的Consumer以处理计算密集型的任务:1. 准备 在上一篇文章中,我们简单在Message中包含了一个字符串"Hello World"。现在为了是Consumer做的是计算密集型的工作,那就不能简单的字符串了。在现实应用中,Consumer有可能做的是一个图片的
64、resize,或者是pdf文件的渲染或者内容提取。但是作为Demo,还是用字符串模拟吧:通过字符串中的.的数量来决定计算的复杂度,每个.都会消耗1s,即sleep(1)。 还是复用上篇文章中的code,根据“计算密集型”做一下简单的修改,为了辨别,我们把send.py 的名字换成new_task.pypython view plaincopy1. import sys 2. 3. message = ' '.join(sys.argv1:)
65、0;or "Hello World!" 4. channel.basic_publish(exchange='', 5. routing_key='hello', 6.
66、160; body=message) 7. print " x Sent %r" % (message,) 同样的道理,把receive.py的名字换成worker.py,并且根据Message中的.的数量进行计算密集型模拟:python
67、view plaincopy1. import time 2. 3. def callback(ch, method, properties, body): 4. print " x Received %r" % (body,) 5. time.sleep( body.co
68、unt('.') ) 6. print " x Done" 2. Round-robin dispatching 循环分发 RabbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的。如果现在load加重,那么只需要创建更多的Consumer来进行任务处理即可。当然了,对于负载还要加大怎么办?我没有遇到过这种情况,那就可以创建多个vir
69、tual Host,细化不同的通信类别了。 首先开启两个Consumer,即运行两个worker.py。Console1:python view plaincopy1. shell1$ python worker.py 2. * Waiting for messages. To exit press CTRL+C Consule2:python view plaincopy1. sh
70、ell2$ python worker.py 2. * Waiting for messages. To exit press CTRL+C Producer new_task.py要Publish Message了:python view plaincopy1. shell3$ python new_task.py First message. 2. shell3$
71、python new_task.py Second message. 3. shell3$ python new_task.py Third message. 4. shell3$ python new_task.py Fourth message. 5. shell3$ python new_task.py Fifth message. 注意一下:.代表的s
72、leep(1)。接着开一下Consumer worker.py收到了什么:Console1:python view plaincopy1. shell1$ python worker.py 2. * Waiting for messages. To exit press CTRL+C 3. x Received 'First message.' 4.
73、;x Received 'Third message.' 5. x Received 'Fifth message.' Console2:python view plaincopy1. shell2$ python worker.py 2. * Waiting for messages. To exit press CTRL+C 3. x Received 'Second message.' 4. x Received 'Fourth message.' 默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robi
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2025年模具设计工程师考试试题及答案
- 2025年家庭教育指导师考试题及答案
- 2025年货币政策与宏观经济管理能力的考试题及答案
- 2025年电子信息工程师考试试卷及答案
- 2025年公共卫生安全管理考试试题及答案
- 2025年甘肃省天水市秦安县中医医院招聘编外人员34人笔试参考题库及参考答案详解1套
- 物资采购公司管理制度
- 物资集散中心管理制度
- 特殊人员羁押管理制度
- 特殊工种人员管理制度
- 上海高一数学教材电子版
- 数字通信系统课件
- 内功四经内功真经真本全书
- 2021年度中国一线城市出行平台调研报告
- 贵州省毕节市各县区乡镇行政村村庄村名明细居民村民委员会
- 幼儿园小班社会:《红绿灯》 课件
- isa-381g站用变接地保护测控装置技术使用说明书南网版v3
- 六年级劳动教育7.青椒炒肉丝(课件)
- 油气藏类型、典型的相图特征和识别实例
- 《议程设置理论》
- 取力器的设计设计说明书
评论
0/150
提交评论