RabbitMQ+Python入门经典:兔子和兔子窝(下)

日期: 2010-12-02 翻译:张聪 来源:TechTarget中国 英文

  在《RabbitMQ+Python入门经典:兔子和兔子窝(上)》中,我们介绍了AMQP/RabbitMQ和如何使用Python访问的教程,并说明所有的概念如何在Python当中被组合起来。下面我们将继续介绍相关内容。

  持久化这些小东西们

  你花了大量的时间来创建队列、交换机和绑定,然后,砰~服务器程序挂了。你的队列、交换机和绑定怎么样了?还有,放在队列里面但是尚未处理的消息们呢?

  放松~如果你是用默认参数构造的这一切的话,那么,他们,都,biu~,灰飞烟灭了。是的,RabbitMQ重启之后会干净的像个新生儿。你必须重做所有的一切,亡羊补牢,如何避免将来再度发生此类杯具?

  队列和交换机有一个创建时候指定的标志durable,直译叫做坚固的。durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。那么如何才能做到不只是队列和交换机,还有消息都是持久的呢?

  但是首先一个问题是,你真的需要消息是持久的吗?对于一个需要在重启之后回复的消息来说,它需要被写入到磁盘上,而即使是最简单的磁盘操作也是要消耗时间的。如果和消息的内容相比,你更看重的是消息处理的速度,那么不要使用持久化的消息。不过对于我们@DigiTar来说,持久化很重要。

  当你将消息发布到交换机的时候,可以指定一个标志“Delivery Mode”(投递模式)。根据你使用的AMQP的库不同,指定这个标志的方法可能不太一样(我们后面会讨论如何用Python搞定)。简单的说,就是将 Delivery Mode设置成2,也就是持久的(persistent)即可。一般的AMQP库都是将Delivery Mode设置成1,也就是非持久的。所以要持久化消息的步骤如下:

  •   将交换机设成 durable。
  •   将队列设成 durable。
  •   将消息的 Delivery Mode 设置成2 。

  就这样,不是很复杂,起码没有造火箭复杂,不过也有可能犯点小错误。

  下面还要罗嗦一个东西……绑定(Bindings)怎么办?我们无法在创建绑定的时候设置成durable。没问题,如果你绑定了一个 durable的队列和一个durable的交换机,RabbitMQ会自动保留这个绑定。类似的,如果删除了某个队列或交换机(无论是不是 durable),依赖它的绑定都会自动删除。

  注意两点:

  RabbitMQ不允许你绑定一个非坚固(non-durable)的交换机和一个durable的队列。反之亦然。要想成功必须队列和交换机都是durable的。 一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。因此,最好仔细检查创建的标志。

  开始喂蛇了~【译注】说喂蛇是因为Python的图标是条蛇。

  AMQP的一个空白地带是如何在Python当中使用。对于其他语言有一大坨材料。

以下是引用片段:
Java – http://www.rabbitmq.com/java-client.html 
.NET – http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.5.0/rabbitmq-dotnet-client-1.5.0-user-guide.pdf 
Ruby – http://somic.org/2008/06/24/ruby-amqp-rabbitmq-example/ 

  但是对Python老兄来说,你需要花点时间来挖掘一下。所以我写了这个,这样别的家伙们就不需要经历我这种抓狂的过程了。

  首先,我们需要一个Python的AMQP库。有两个可选:

  •   py-amqplib——通用的AMQP
  •   txAMQP——使用 Twisted 框架的AMQP库,因此允许异步I/O。

  根据你的需求,py-amqplib或者txAMQP都是可以的。因为是基于Twisted的,txAMQP可以保证用异步IO构建超高性能的 AMQP程序。但是Twisted编程本身就是一个很大的主题……因此清晰起见,我们打算用 py-amqplib。更新:请参见 Esteve Fernandez关于txAMQP的使用和代码样例的回复 。

  AMQP支持在一个TCP连接上启用多个MQ通信channel,每个channel都可以被应用作为通信流。每个AMQP程序至少要有一个连接和一个channel。

以下是引用片段:
view plaincopy to clipboardprint?
<SPAN style=”FONT-WEIGHT: bold; COLOR: #ff7700″>from</SPAN>   
 amqplib <SPAN style=”FONT-WEIGHT: bold; COLOR: #ff7700″>import</SPAN>   
 client_0_8 <SPAN style=”FONT-WEIGHT: bold; COLOR: #ff7700″>as</SPAN>   
 amqp<BR>   
conn = amqp.<SPAN style=”COLOR: black”>Connection</SPAN>   
<SPAN style=”COLOR: black”>(</SPAN>   
host=<SPAN style=”COLOR: #483d8b”>”localhost:5672 “</SPAN>   
, userid=<SPAN style=”COLOR: #483d8b”>”guest”</SPAN>   
,<BR>   
password=<SPAN style=”COLOR: #483d8b”>”guest”</SPAN>   
, virtual_host=<SPAN style=”COLOR: #483d8b”>”/”</SPAN>   
, insist=<SPAN style=”COLOR: #008000″>False</SPAN>   
<SPAN style=”COLOR: black”>)</SPAN>   
<BR>   
chan = conn.<SPAN style=”COLOR: black”>channel</SPAN>   
<SPAN style=”COLOR: black”>(</SPAN>   
<SPAN style=”COLOR: black”>)</SPAN>  
from
 amqplib import
 client_0_8 as
 amqp
conn = amqp.Connection
(
host=”localhost:5672 ”
, userid=”guest”
,
password=”guest”
, virtual_host=”/”
, insist=False
)
chan = conn.channel
(
)

  每个channel都被分配了一个整数标识,自动由Connection()类的.channel()方法维护。或者,你可以使用.channel(x)来指定channel标识,其中x是你想要使用的channel标识。通常情况下,推荐使用.channel()方法来自动分配 channel标识,以便防止冲突。

  现在我们已经有了一个可以用的连接和channel。现在,我们的代码将分成两个应用,生产者(producer)和消费者(consumer)。我们先创建一个消费者程序,他会创建一个叫做“po_box”的队列和一个叫“sorting_room”的交换机:

以下是引用片段:
view plaincopy to clipboardprint?
chan.<SPAN style=”COLOR: black”>queue_declare</SPAN>   
<SPAN style=”COLOR: black”>(</SPAN>   
queue=<SPAN style=”COLOR: #483d8b”>”po_box”</SPAN>   
, durable=<SPAN style=”COLOR: #008000″>True</SPAN>   
, exclusive=<SPAN style=”COLOR: #008000″>False</SPAN>   
, auto_delete=<SPAN style=”COLOR: #008000″>False</SPAN>   
<SPAN style=”COLOR: black”>)</SPAN>   
<BR>   
chan.<SPAN style=”COLOR: black”>exchange_declare</SPAN>   
<SPAN style=”COLOR: black”>(</SPAN>   
exchange=<SPAN style=”COLOR: #483d8b”>”sorting_room”</SPAN>   
, <SPAN style=”COLOR: #008000″>type</SPAN>   
=<SPAN style=”COLOR: #483d8b”>”direct”</SPAN>   
, durable=<SPAN style=”COLOR: #008000″>True</SPAN>   
, auto_delete=<SPAN style=”COLOR: #008000″>False</SPAN>   
,<SPAN style=”COLOR: black”>)</SPAN>  
chan.queue_declare
(
queue=”po_box”
, durable=True
, exclusive=False
, auto_delete=False
)
chan.exchange_declare
(
exchange=”sorting_room”
, type
=”direct”
, durable=True
, auto_delete=False
,)

  这段代码干了啥?首先,它创建了一个名叫“po_box ”的队列,它是durable的(重启之后会重新建立),并且最后一个消费者断开的时候不会自动删除(auto_delete=False )。在创建durable的队列(或者交换机)的时候,将auto_delete设置成false是很重要的,否则队列将会在最后一个消费者断开的时候消失,与durable与否无关。如果将durable和auto_delete都设置成True,只有尚有消费者活动的队列可以在RabbitMQ意外崩溃的时候自动恢复。

  (你可以注意到了另一个标志,称为“exclusive”。如果设置成True,只有创建这个队列的消费者程序才允许连接到该队列。这种队列对于这个消费者程序是私有的)。

  还有另一个交换机声明,创建了一个名字叫“sorting_room”的交换机。auto_delete和durable的含义和队列是一样的。但是,.excange_declare() 还有另外一个参数叫做type,用来指定要创建的交换机的类型(如前面列出的): fanout , direct 和 topic .

  到此为止,你已经有了一个可以接收消息的队列和一个可以发送消息的交换机。不过我们需要创建一个绑定,把它们连接起来。

  chan.queue_bind(queue=”po_box”, exchange=”sorting_room”, routing_key=”jason”)这个绑定的过程非常直接。任何送到交换机“sorting_room ”的具有路由键“jason ” 的消息都被路由到名为“po_box ” 的队列。

  现在,你有两种方法从队列当中取出消息。第一个是调用chan.basic_get() ,主动从队列当中拉出下一个消息(如果队列当中没有消息,chan.basic_get()会返回None, 因此下面代码当中print msg.body 会在没有消息的时候崩掉):

以下是引用片段:
view plaincopy to clipboardprint?
msg = chan.<SPAN style=”COLOR: black”>basic_get</SPAN>   
<SPAN style=”COLOR: black”>(</SPAN>   
<SPAN style=”COLOR: #483d8b”>”po_box”</SPAN>   
<SPAN style=”COLOR: black”>)</SPAN>   
<BR>   
<SPAN style=”FONT-WEIGHT: bold; COLOR: #ff7700″>print</SPAN>   
 msg.<SPAN style=”COLOR: black”>body</SPAN>   
<BR>   
chan.<SPAN style=”COLOR: black”>basic_ack</SPAN>   
<SPAN style=”COLOR: black”>(</SPAN>   
msg.<SPAN style=”COLOR: black”>delivery_tag</SPAN>   
<SPAN style=”COLOR: black”>)</SPAN>  
msg = chan.basic_get
(
“po_box”
)
print
 msg.body
chan.basic_ack
(
msg.delivery_tag
)

  但是如果你想要应用程序在消息到达的时候立即得到通知怎么办?这种情况下不能使用chan.basic_get() ,你需要用chan.basic_consume() 注册一个新消息到达的回调。

以下是引用片段:
view plaincopy to clipboardprint?
<SPAN style=”FONT-WEIGHT: bold; COLOR: #ff7700″>def</SPAN>   
 recv_callback<SPAN style=”COLOR: black”>(</SPAN>   
msg<SPAN style=”COLOR: black”>)</SPAN>   
:<BR>   
    <SPAN style=”FONT-WEIGHT: bold; COLOR: #ff7700″>print</SPAN>   
 <SPAN style=”COLOR: #483d8b”>’Received: ‘</SPAN>   
 + msg.<SPAN style=”COLOR: black”>body</SPAN>   
<BR>   
chan.<SPAN style=”COLOR: black”>basic_consume</SPAN>   
<SPAN style=”COLOR: black”>(</SPAN>   
queue=<SPAN style=”COLOR: #483d8b”>’po_box'</SPAN>   
, no_ack=<SPAN style=”COLOR: #008000″>True</SPAN>   
,<BR>   
callback=recv_callback, consumer_tag=<SPAN style=”COLOR: #483d8b”>”testtag”</SPAN>   
<SPAN style=”COLOR: black”>)</SPAN>   
<BR>   
<SPAN style=”FONT-WEIGHT: bold; COLOR: #ff7700″>while</SPAN>   
 <SPAN style=”COLOR: #008000″>True</SPAN>   
:<BR>   
    chan.<SPAN style=”COLOR: black”>wait</SPAN>   
<SPAN style=”COLOR: black”>(</SPAN>   
<SPAN style=”COLOR: black”>)</SPAN>   
<BR>   
chan.<SPAN style=”COLOR: black”>basic_cancel</SPAN>   
<SPAN style=”COLOR: black”>(</SPAN>   
<SPAN style=”COLOR: #483d8b”>”testtag”</SPAN>   
<SPAN style=”COLOR: black”>)</SPAN>  
def
 recv_callback(
msg)
:
    print
 ‘Received: ‘
 + msg.body
chan.basic_consume
(
queue=’po_box’
, no_ack=True
,
callback=recv_callback, consumer_tag=”testtag”
)
while
 True
:
    chan.wait
(
)
chan.basic_cancel
(
“testtag”
)

  chan.wait()放在一个无限循环里面,这个函数会等待在队列上,直到下一个消息到达队列。chan.basic_cancel()用来注销该回调函数。参数consumer_tag 当中指定的字符串和chan.basic_consume()注册的一直。在这个例子当中chan.basic_cancel()不会被调用到,因为上面是个无限循环…… 不过你需要知道这个调用,所以我把它放在了代码里。

  需要注意的另一个东西是no_ack参数。这个参数可以传给chan.basic_get() 和chan.basic_consume(),默认是false。当从队列当中取出一个消息的时候,RabbitMQ需要应用显式地回馈说已经获取到了该消息。如果一段时间内不回馈,RabbitMQ 会将该消息重新分配给另外一个绑定在该队列上的消费者。另一种情况是消费者断开连接,但是获取到的消息没有回馈,则RabbitMQ同样重新分配。如果将no_ack 参数设置为true,则py-amqplib会为下一个AMQP请求添加一个no_ack属性,告诉AMQP服务器不需要等待回馈。但是,大多数时候,你也许想要自己手工发送回馈,例如,需要在回馈之前将消息存入数据库。回馈通常是通过调用chan.basic_ack() 方法,使用消息的delivery_tag 属性作为参数。参见chan.basic_get() 的实例代码。

  好了,这就是消费者的全部代码。(下载:amqp_consumer.py )

  不过没有人发送消息的话,要消费者何用?所以需要一个生产者。下面的代码示例表明如何将一个简单消息发送到交换区“sorting_room ”,并且标记为路由键“jason ” :

以下是引用片段:
view plaincopy to clipboardprint?
msg = amqp.<SPAN style=”COLOR: black”>Message</SPAN>   
<SPAN style=”COLOR: black”>(</SPAN>   
<SPAN style=”COLOR: #483d8b”>”Test message!”</SPAN>   
<SPAN style=”COLOR: black”>)</SPAN>   
<BR>   
msg.<SPAN style=”COLOR: black”>properties</SPAN>   
<SPAN style=”COLOR: black”>[</SPAN>   
<SPAN style=”COLOR: #483d8b”>”delivery_mode”</SPAN>   
<SPAN style=”COLOR: black”>]</SPAN>   
 = <SPAN style=”COLOR: #ff4500″>2</SPAN>   
<BR>   
chan.<SPAN style=”COLOR: black”>basic_publish</SPAN>   
<SPAN style=”COLOR: black”>(</SPAN>   
msg,exchange=<SPAN style=”COLOR: #483d8b”>”sorting_room”</SPAN>   
,routing_key=<SPAN style=”COLOR: #483d8b”>”jason”</SPAN>   
<SPAN style=”COLOR: black”>)</SPAN>  
msg = amqp.Message
(
“Test message!”
)
msg.properties
[
“delivery_mode”
]
 = 2
chan.basic_publish
(
msg,exchange=”sorting_room”
,routing_key=”jason”
)

  你也许注意到我们设置消息的delivery_mode 属性为2,因为队列和交换机都设置为durable的,这个设置将保证消息能够持久化,也就是说,当它还没有送达消费者之前如果RabbitMQ重启则它能够被恢复。

  剩下的最后一件事情(生产者和消费者都需要调用的)是关闭channel和连接:

以下是引用片段:
view plaincopy to clipboardprint?
chan.<SPAN style=”COLOR: black”>close</SPAN>   
<SPAN style=”COLOR: black”>(</SPAN>   
<SPAN style=”COLOR: black”>)</SPAN>   
<BR>   
conn.<SPAN style=”COLOR: black”>close</SPAN>   
<SPAN style=”COLOR: black”>(</SPAN>   
<SPAN style=”COLOR: black”>)</SPAN>  
chan.close
(
)
conn.close
(
)

  很简单吧。(下载:amqp_publisher.py )

  来真实地跑一下吧……

  现在我们已经写好了生产者和消费者,让他们跑起来吧。假设你的RabbitMQ在localhost上安装并且运行。

  打开一个终端,执行python ./amqp_consumer.py 让消费者运行,并且创建队列、交换机和绑定。

  然后在另一个终端运行python ./amqp_publisher.py “AMQP rocks.” 。如果一切良好,你应该能够在第一个终端看到输出的消息。

  付诸使用吧

  我知道这个教程是非常粗浅的关于AMQP/RabbitMQ和如何使用Python访问的教程。希望这个可以说明所有的概念如何在Python当中被组合起来。如果你发现任何错误,请联系原作者(williamsjj@digitar.com ) 【译注:如果是翻译问题请联系译者】。同时,我很高兴回答我知道的问题。【译注:译者也是一样的】。接下来是,集群化(clustering)!不过我需要先把它弄懂再说。

我们一直都在努力坚持原创.......请不要一声不吭,就悄悄拿走。

我原创,你原创,我们的内容世界才会更加精彩!

【所有原创内容版权均属TechTarget,欢迎大家转发分享。但未经授权,严禁任何媒体(平面媒体、网络媒体、自媒体等)以及微信公众号复制、转载、摘编或以其他方式进行使用。】

微信公众号

TechTarget微信公众号二维码

TechTarget

官方微博

TechTarget中国官方微博二维码

TechTarget中国

翻译

张聪
张聪

Vision Solutions解决方案架构师

相关推荐