工作队列
在第一节我们写了程序来向命名队列发送和接收消息 。在本节我们会创建一个工作队列(Work Queue)用来在多个工人(worker)中分发时间消耗型任务(time-consuming tasks)。
工作队列(又叫做: Task Queues)背后的主体思想是 避免立刻去执行耗时任务并且等待它们完成。 相反我们可以安排这样的任务稍后执行. 我们可以把任务封装成一个消息并发送到队列中. 一个在后台运行的工人进程会接收任务并最终执行工作。当你使很多工人(workers)程序运行时,多个任务就会由它们共同承担。
这个概念在web应用中尤其有用,因为在一次短期的HTTP请求中处理复杂任务几乎是不可能的。
准备
在前一节我们发送了消息 “Hello World!”. 现在我们会发送一个代表复杂任务的字符串. 目前我们没有一个真实情境下的任务,像重置图片大小或者pdf文件渲染,所以我们就做一个伪装,假装我们很忙就行了:通过time.sleep()方法的使用,我们让字符串中存在的点(.)的数量代表任务的复杂性,一个点占用一个工作的一秒钟。例如,“Hello…”会耗用三秒钟。
我们将会稍微修改先前的 send.py 代码, 允许从命令行发送任意的消息. 这个程序会安排任务给我们的工作队列,所以重命名为new_task.py:
1
2
3
4
5
6
7
8
9
10
11 1import sys
2
3message = ' '.join(sys.argv[1:]) or "Hello World!"
4channel.basic_publish(exchange='',
5 routing_key='task_queue',
6 body=message,
7 properties=pika.BasicProperties(
8 delivery_mode = 2, # make message #persistent
9 ))
10print(" [x] Sent %r" % message)
11
我们之前的 receive.py 脚本也需要做些改变: 假装让消息体中的每个点”.”耗费一秒钟的工作。它需要从队列中提取消息并且完成任务 ,我们把它命名为worker.py:
1
2
3
4
5
6
7 1import time
2
3def callback(ch, method, properties, body):
4 print(" [x] Received %r" % body)
5 time.sleep(body.count(b'.'))
6 print(" [x] Done")
7
轮询派发(Round-robin dispatching)
使用任务队列的一个优势是简化并行任务的能力。如果我们正在建立一个后台记录的任务,只需要多添加些工人(worker),这很容易做到。
首先我们同时运行起两个worker.py脚本,它们都会从队列中获取消息,到底是怎么回事呢,我们来看一下 。
你需要打开三个控制台,两个运行worker.py脚本。这两个控制台会成为我们的两个消费者–C1和C2。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 1# shell 1
2python worker.py
3# => [*] Waiting for messages. To exit press CTRL+C
4
5# shell 2
6python worker.py
7# => [*] Waiting for messages. To exit press CTRL+C
8
9# shell 3
10python new_task.py First message.
11python new_task.py Second message..
12python new_task.py Third message...
13python new_task.py Fourth message....
14python new_task.py Fifth message.....
15
下图为在我的Ubuntu终端上的运行结果:
shell1
shell2
shell3
消息通知
RabbitMQ会默认把每条消息按次序发送给下一个消费者,平均每个消费者会获取到相同数量的消息,这种分发消息的方式就是轮询(round-robin),你可以使用三个或者更多工人试一下效果。
做一件任务需要耗费数秒钟的时间。你可能疑惑如果一个消费者开展了一个长时间任务,但只完成了一部分时就死掉了,这时候会发生什么呢? 就我们当前的代码来说,一旦RabbitMQ把消息传递给了它的客户,RabbitMQ会立刻从内存中把这条消息删除掉,这样的话如果你杀死掉一个工人进程,我们就会丢掉它正在处理的这条消息。我们也会丢掉所有派发给这个特定工人进程的还有没被处理的消息。
但我们不想丢掉任何任务,如果一个工人进程死掉了,我们希望任务会被传递给另一个工人。
为了确保消息没有丢,RabbitMQ支持消息通知机制(message acknowledgments)。一条通知(ack)会从消费者处返回来告知RabbitMQ特定的消息已经被接收,被处理并且RabbitMQ可以删掉它。
如果一个消费者挂了(它的渠道(channel)被关闭,连接被关闭或者TCP连接丢失)但没有发送通知,会理解为消息没有被完整地处理并且会重新把它推入队列。这时如果有其他消费者存在,它会迅速重新把它传递给其他消费者。这样的话你就可以确定不会有消息被丢掉,哪怕是工人进程意外挂了。
不会出现任何的消息超时问题,当消费者挂掉RabbitMQ会重新发送消息即便处理一条消息花费了很长很长时间。
消息通知默认是打开的。在前面的例子中我们通过设置no_ack=True 显式地关闭了他们flag. 是时候把它拿掉了,并且一旦完成了一个任务就让工人发送一条通知。
1
2
3
4
5
6
7
8
9 1def callback(ch, method, properties, body):
2 print " [x] Received %r" % (body,)
3 time.sleep( body.count('.') )
4 print " [x] Done"
5 ch.basic_ack(delivery_tag = method.delivery_tag)
6
7channel.basic_consume(callback,
8 queue='task_queue')
9
使用上面的代码我们可以确保什么也不会丢失,即便你通过CTRL+C退出了一个正在处理消息的工人进程。工人进程挂掉后,所有未返回通知的消息都会被重新发送。
忘了通知
一个常见错误是我们忘了basic_ack ,这看上去是个小错误,
但后果很严重。当你退出客户端时消息会重新发送(看上去像是随机发送),但RabbitMQ会吃掉越来越多的内存,因为它不会释放任何未返回通知的消息。调试这种类型的错误你可以使用rabbitmqctl打印messages_unacknowledged字段
sudo rabbitmqctl list_queues name messages_ready
messages_unacknowledged在 Windows上, 不用 sudo:
rabbitmqctl.bat list_queues name messages_ready
messages_unacknowledged
消息持久化(durability)
我们已经了解如何确保即便消费者死掉任务也不会丢失,但是如果RabbitMQ服务停止我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它会遗忘掉队列和消息,除非你告诉它不要这样做。确保消息不会丢失我们有两件事需要做:把队列和消息都标记为持久化的。
首先,我们确保RabbitMQ不会丢失我们的队列,为了达到这个目的需要把它声明为持久化的:
1
2 1channel.queue_declare(queue='hello', durable=True)
2
就这条命令自身来说它是正确的,但在我们的设置中它无法正常工作。因为我们已经定义了一个叫做hello的非持久化的队列。RabbitMQ 不允许你使用不同的参数重新定义一个已经存在的队列并且会向任何试图那样做的程序返回一个错误。 但有一个变通方案(workaround)-我们用不同的名字声明一个队列,例如 task_queue:
1
2 1channel.queue_declare(queue='task_queue', durable=True)
2
这queue_declare 的改变 需要应用到生产者和消费者代码上面(其实我在前面早已经这样做了)
这样我们确定task_queue 队列不会被丢掉即便 RabbitMQ 重启。 现在我们需要标记我们的消息为持久化——通过提供一个值为2的delivery_mode 属性。
1
2
3
4
5
6
7 1channel.basic_publish(exchange='',
2 routing_key="task_queue",
3 body=message,
4 properties=pika.BasicProperties(
5 delivery_mode = 2,
6 ))
7
公平派发
你可能已经注意到派发过程仍然不太合适。例如有两个工人的情况, 当所有编号为偶数的消息是重量级,奇数消息是轻量级时,一个工人进程会持续繁忙,另一个却没做什么工作。好吧,RabbitMQ对此一无所知,并且继续若无其事地派发消息。
发生这种情况是因为当消息进入队列时,RabbitMQ只是进行派发,它不会查看一个消费者的未返回通知的数量。它只是忙目地把第n条消息派发给第n条消费者。
为了应对这种情况,我们可以使用basic.qos方法,设置prefetch_count=1 。这会告诉 RabbitMQ 不要同时给一个工人超过一条消息。或者换句话说,在一个工人处理完先前的消息并且返回通知前不要给他派发新的消息。相反的,它会把消息派发给下一个不忙的工人。
1
2 1channel.basic_qos(prefetch_count=1)
2
注意队列大小
如果所有工人都在繁忙中, 你的队列可能会被填满. 你会留意到这种情况,并且可能添加更多工人或者使用 message TTL(一个队列和消息存活时间的扩展,在此不做过多介绍)
整合
new_task.py脚本完整代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1#!/usr/bin/env python
2import pika
3import sys
4
5connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
6channel = connection.channel()
7
8channel.queue_declare(queue='task_queue', durable=True)
9
10message = ' '.join(sys.argv[1:]) or "Hello World!"
11channel.basic_publish(exchange='',
12 routing_key='task_queue',
13 body=message,
14 properties=pika.BasicProperties(
15 delivery_mode = 2, # make message persistent
16 ))
17print(" [x] Sent %r" % message)
18connection.close()
19
worker.py脚本完成代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1#!/usr/bin/env python
2import pika
3import time
4
5connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
6channel = connection.channel()
7
8channel.queue_declare(queue='task_queue', durable=True)
9print(' [*] Waiting for messages. To exit press CTRL+C')
10
11def callback(ch, method, properties, body):
12 print(" [x] Received %r" % body)
13 time.sleep(body.count(b'.'))
14 print(" [x] Done")
15 ch.basic_ack(delivery_tag = method.delivery_tag)
16
17channel.basic_qos(prefetch_count=1)
18channel.basic_consume(callback,
19 queue='task_queue')
20
21channel.start_consuming()
22
使用消息通知和prefetch_count你可以建立一个工作队列 ,持久化选项会使任务仍然存在即便RabbitMQ重启。
下一节我们会了解如何把相同的消息传递给多个消费者。