0°

基于Python语言使用RabbitMQ消息队列(六)

远程过程调用(RPC)

在第二节里我们学会了如何使用工作队列在多个工人中分布时间消耗性任务。
但如果我们想要运行存在于远程计算机上的方法并等待返回结果该如何去做呢?这就不太一样了,这种模式就是常说的远程过程调用(RPC)。
在本节我们会

在本节我们会使用RabbitMQ创建一个RPC系统:一个客户端和一个可扩展(scalable)的RPC服务。由于我们没什么真正的时间消耗型任务去分配,我们就创建一个摆样子的RPC服务,它可以返回Fibonacci数。

客户端接口

为了阐释RPC服务如何使用,我们将创建一个简单的客户端类,这个类暴露一个叫做call的方法,来发送RPC请求,并等待返回答案:


1
2
3
4
1fibonacci_rpc = FibonacciRpcClient()
2result = fibonacci_rpc.call(4)
3print("fib(4) is %r" % result)
4

RPC注意事项

虽然RPC在计算机科学中是一个相当常见的模式,但它也经常饱受批评。但一个程序员没有弄清楚一个方法调用是本地还是缓慢的RPC,问题就来了。像这种迷惑就会导致一个无法预测的系统并给调试带来不必要的复杂性,相比较简化软件的思想,乱用RPC会带来难维护的一团糟的代码
把上面这些问题记在脑里,考虑以下建议:

确认哪个方法调用是本地哪个调用是远程能够很明了。 使你的系统文档化,组件之间的依赖很清晰。
应对错误情况,当RPC服务宕机很久时客户端应该如何反应? 什么时候考虑避免使用 RPC.
如果可以你应该使用异步管线-而不是RPC-把结果推到下一个计算阶段。

回调队列

通常在 RabbitMQ上用RPC都很容易。客户端发送请求消息,服务端用响应消息回应。为了接收响应,客户端需要用这个请求发送一个 ‘callback’ 队列的地址。我们试一下:


1
2
3
4
5
6
7
8
9
10
11
12
1result = channel.queue_declare(exclusive=True)
2callback_queue = result.method.queue
3
4channel.basic_publish(exchange='',
5                      routing_key='rpc_queue',
6                      properties=pika.BasicProperties(
7                            reply_to = callback_queue,
8                            ),
9                      body=request)
10
11# ... 此处为从callback_queue队列读取响应消息的代码 ...
12

Message properties

AMQP 0-9-1 协议为一个消息预定义了一组包含14个属性的集合,大多数属性都很少使用,除了以下几种:

delivery_mode: 标记一个消息为持久的(值为2)或者暂时的(任何其他值)。你可能记得这个属性在第二节中。
content_type: 常用来描述编码的mime-type。例如常用的JSON格式最好把这个属性设置为:application/json。
reply_to:通常用来命名一个回调队列。
correlation_id: 让请求(requests)关联到RPC响应(responses)时很有用。

Correlation id

在上面呈现的方法中,我们建议为每个RPC调用创建一个回调队列。那会相当没效率,幸运的是有一种更好的方法——我们每个客户端创建一个单一回调队列。

但又出现了一个新的问题,在这个队列中接收到一个响应,不清楚这个响应属于哪个请求。这时correlation_id 属性就派上用场了。我们将为每个请求设定一个唯一值。稍后当我们从回调队列收到消息我们会查看这个属性,基于此我们能把这个响应和一个请求匹配。如果我们发现了一个不认识的correlation_id值,我们可以平安无事地忽略掉这条消息。——它不属于我们的请求。

你可能会问,为什么我们要忽略回调队列中不认识的消息,而不是提出个错误?那是由于服务端会有竞争条件(race condition)的可能。RPC服务在把结果发回来后,但还没有发送请求的通知消息之前就死掉了,虽然不太可能,但如果发生了,重启的RPC服务会再次处理之前的请求。所以我们要在客户端优雅地处理重复的响应,并且RPC应该是等幂的。

总结

我们的RPC工作流程如下:

当客户端启动,创建一个匿名的专用的回调队列。对于一个RPC请求,客户端发送一个带有两个属性的消息:reply_to,这个设置给回调队列;correlation_id,这个为每个请求设置唯一值。
请求发送给 rpc_queue 队列。
RPC工人 (也叫做服务)等待那个队列上的请求。当一个请求到达,它完成任务并且发送一个带有结果的消息返回给客户端,使用源自reply_to域的队列。

整合

rpc_server.py完整代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1#!/usr/bin/env python
2import pika
3
4connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
5
6channel = connection.channel()
7
8channel.queue_declare(queue='rpc_queue')
9
10def fib(n):
11    if n == 0:
12        return 0
13    elif n == 1:
14        return 1
15    else:
16        return fib(n-1) + fib(n-2)
17
18def on_request(ch, method, props, body):
19    n = int(body)
20
21    print(" [.] fib(%s)" % n)
22    response = fib(n)
23
24    ch.basic_publish(exchange='',
25                     routing_key=props.reply_to,
26                     properties=pika.BasicProperties(correlation_id = \
27                                                         props.correlation_id),
28                     body=str(response))
29    ch.basic_ack(delivery_tag = method.delivery_tag)
30
31channel.basic_qos(prefetch_count=1)
32channel.basic_consume(on_request, queue='rpc_queue')
33
34print(" [x] Awaiting RPC requests")
35channel.start_consuming()
36

rpc_client.py完整代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1#!/usr/bin/env python
2import pika
3import uuid
4
5class FibonacciRpcClient(object):
6    def __init__(self):
7        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
8
9        self.channel = self.connection.channel()
10
11        result = self.channel.queue_declare(exclusive=True)
12        self.callback_queue = result.method.queue
13
14        self.channel.basic_consume(self.on_response, no_ack=True,
15                                   queue=self.callback_queue)
16
17    def on_response(self, ch, method, props, body):
18        if self.corr_id == props.correlation_id:
19            self.response = body
20
21    def call(self, n):
22        self.response = None
23        self.corr_id = str(uuid.uuid4())
24        self.channel.basic_publish(exchange='',
25                                   routing_key='rpc_queue',
26                                   properties=pika.BasicProperties(
27                                         reply_to = self.callback_queue,
28                                         correlation_id = self.corr_id,
29                                         ),
30                                   body=str(n))
31        while self.response is None:
32            self.connection.process_data_events()
33        return int(self.response)
34
35fibonacci_rpc = FibonacciRpcClient()
36
37print(" [x] Requesting fib(30)")
38response = fibonacci_rpc.call(30)
39print(" [.] Got %r" % response)
40

现在我们的RPC服务已经准备好了。我们可以启动服务:


1
2
3
1python rpc_server.py
2# => [x] Awaiting RPC requests
3

在客户端运行请求一个fibonacci数:


1
2
3
1python rpc_client.py
2# => [x] Requesting fib(30)
3

当前的设计并不是RPC服务的唯一实现,但它有一些重要优势:

如果服务太慢,你只需要再运行一个服务进行扩展。试着在新控制台再运行一个rpc_server.py。
在客户端,RPC只需要发送和接收一个消息,不需要像queue_declare这样的异步调用。这样RPC客户端的单个RPC请求只需要一次网络往返。
我们的代码已经相当简化,没有试图处理更复杂(但重要)的问题,如:

如果没有正在运行的服务客户端该如何反应?
客户端需要为RPC设置超时吗?
如果服务功能故障或抛出异常,它应给被返回给客户端么?
在处理之前防止无效的输入消息。

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!