Java使用RabbitMQ(九)–RPC

释放双眼,带上耳机,听听看~!

首先声明:
RPC调用会增加系统复杂度以及调试难度,
容易使代码混乱,系统缓慢。

尽量避免使用RPC,转而使用异步管道替代。

rabbitMQ实现的RPC主要流程:

  1. 客户端启动后,创建一个匿名唯一的回调队列
  2. 对于一个RPC请求,客户端发送一个消息和2个属性,一个是replyto用来设置回调队列,另一个是correlationId,每个请求的值都是唯一的
  3. RPC worker(也是被调用的服务方)从请求队列中获取值,每出现一个请求,就处理它,并把结果发送到回调队列返回给 客户端。使用replyto字段
  4. 客户端在回调队列中等待结果,检查correlationId,符合条件的返还给应用。

实现代码(服务方):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
1public static void main(String[] args) {
2        try(Connection connection=getConnection("guest","guest","localhost",5672,"/");
3            Channel channel=connection.createChannel()) {
4
5            channel.queueDeclare(queueName, false, false, false, null);
6            channel.basicQos(1);
7
8            System.out.println(" [x] Awaiting RPC requests");
9
10            Consumer consumer = new DefaultConsumer(channel) {
11                @Override
12                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException, IOException {
13                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties
14                            .Builder()
15                            .correlationId(properties.getCorrelationId())
16                            .build();
17
18                    String response = "";
19
20                    try {
21                        String message = new String(body,"UTF-8");
22                        int n = Integer.parseInt(message);
23
24                        System.out.println(" [.] fib(" + message + ")");
25                        response += fib(n);
26                    }
27                    catch (RuntimeException e){
28                        System.out.println(" [.] " + e.toString());
29                    }
30                    finally {
31                        channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
32
33                        channel.basicAck(envelope.getDeliveryTag(), false);
34
35                        // RabbitMq consumer worker thread notifies the RPC server owner thread
36                        synchronized(this) {
37                            System.out.println(this.getClass().getName());
38                            this.notify();
39                        }
40                    }
41                }
42
43                private int fib(int n) {
44                    if (n == 0) return 0;
45                    if (n == 1) return 1;
46                    return fib(n-1) + fib(n-2);
47                }
48            };
49
50
51            channel.basicConsume(queueName, false, consumer);
52
53            while (true){
54                synchronized (consumer){
55                    consumer.wait();
56                }
57
58            }
59        } catch (Exception e) {
60            throw new RuntimeException(e);
61        }
62
63    }
64

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1public static void main(String[] args) {
2
3        try (Connection connection = getConnection("guest", "guest", "localhost", 5672, "/");
4            Channel channel = connection.createChannel()) {
5
6            String r=call("5",channel,"rpc_queue");
7            System.out.println(r);
8        } catch (Exception e) {
9            throw new RuntimeException(e);
10        }
11
12
13    }
14    public static String call(String message,Channel channel,String requestQueueName) throws IOException, InterruptedException {
15        String replyQueueName=channel.queueDeclare().getQueue();
16
17        final String corrId= UUID.randomUUID().toString();
18        AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder().correlationId(corrId)
19                .replyTo(replyQueueName)
20                .build();
21        channel.basicPublish("",requestQueueName,properties,message.getBytes("utf-8"));
22
23        final BlockingQueue<String> response=new ArrayBlockingQueue<String>(1);
24        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel){
25            @Override
26            public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
27                if (properties.getCorrelationId().equals(corrId)){
28                    response.offer(new String(bytes,"utf-8"));
29                }
30            }
31        });
32        return response.take();
33    }
34

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全资讯

报告称美国只有4%的iOS用户选择接受广告追踪

2021-8-16 15:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索