释放双眼,带上耳机,听听看~!
首先声明:
RPC调用会增加系统复杂度以及调试难度,
容易使代码混乱,系统缓慢。
尽量避免使用RPC,转而使用异步管道替代。
rabbitMQ实现的RPC主要流程:
- 客户端启动后,创建一个匿名唯一的回调队列
- 对于一个RPC请求,客户端发送一个消息和2个属性,一个是replyto用来设置回调队列,另一个是correlationId,每个请求的值都是唯一的
- RPC worker(也是被调用的服务方)从请求队列中获取值,每出现一个请求,就处理它,并把结果发送到回调队列返回给 客户端。使用replyto字段
- 客户端在回调队列中等待结果,检查correlationId,符合条件的返还给应用。
实现代码(服务方):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 1public static void main(String[] args) {
2 try(Connection connection=getConnection("guest","guest","localhost",5672,"/");
3 Channel channel=connection.createChannel()) {
4
5 channel.queueDeclare(queueName, false, false, false, null);
6 channel.basicQos(1);
7
8 System.out.println(" [x] Awaiting RPC requests");
9
10 Consumer consumer = new DefaultConsumer(channel) {
11 @Override
12 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException, IOException {
13 AMQP.BasicProperties replyProps = new AMQP.BasicProperties
14 .Builder()
15 .correlationId(properties.getCorrelationId())
16 .build();
17
18 String response = "";
19
20 try {
21 String message = new String(body,"UTF-8");
22 int n = Integer.parseInt(message);
23
24 System.out.println(" [.] fib(" + message + ")");
25 response += fib(n);
26 }
27 catch (RuntimeException e){
28 System.out.println(" [.] " + e.toString());
29 }
30 finally {
31 channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
32
33 channel.basicAck(envelope.getDeliveryTag(), false);
34
35 // RabbitMq consumer worker thread notifies the RPC server owner thread
36 synchronized(this) {
37 System.out.println(this.getClass().getName());
38 this.notify();
39 }
40 }
41 }
42
43 private int fib(int n) {
44 if (n == 0) return 0;
45 if (n == 1) return 1;
46 return fib(n-1) + fib(n-2);
47 }
48 };
49
50
51 channel.basicConsume(queueName, false, consumer);
52
53 while (true){
54 synchronized (consumer){
55 consumer.wait();
56 }
57
58 }
59 } catch (Exception e) {
60 throw new RuntimeException(e);
61 }
62
63 }
64
客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 1public static void main(String[] args) {
2
3 try (Connection connection = getConnection("guest", "guest", "localhost", 5672, "/");
4 Channel channel = connection.createChannel()) {
5
6 String r=call("5",channel,"rpc_queue");
7 System.out.println(r);
8 } catch (Exception e) {
9 throw new RuntimeException(e);
10 }
11
12
13 }
14 public static String call(String message,Channel channel,String requestQueueName) throws IOException, InterruptedException {
15 String replyQueueName=channel.queueDeclare().getQueue();
16
17 final String corrId= UUID.randomUUID().toString();
18 AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder().correlationId(corrId)
19 .replyTo(replyQueueName)
20 .build();
21 channel.basicPublish("",requestQueueName,properties,message.getBytes("utf-8"));
22
23 final BlockingQueue<String> response=new ArrayBlockingQueue<String>(1);
24 channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel){
25 @Override
26 public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
27 if (properties.getCorrelationId().equals(corrId)){
28 response.offer(new String(bytes,"utf-8"));
29 }
30 }
31 });
32 return response.take();
33 }
34