C# 操作rabbitmq(五)

释放双眼,带上耳机,听听看~!

此篇介绍rabbitmq的RPC

一、
虽然我们可以使用work queue给worker发送消息,但是如果我们希望在远程服务器上运行一个方法并且想要得到结果呢?rabbitmq的RPC功能可以实现。

二、
Callback queue,回调队列,在rabbitmq上构建RPC是很容易的,客户端发送请求消息,服务端响应回复消息,为了接收响应消息,我们需要在发送请求的时候附加一个callback 队列地址


1
2
3
4
5
6
7
8
9
10
11
1var props = channel.CreateBasicProperties();
2props.ReplyTo = replyQueueName;
3
4var messageBytes = Encoding.UTF8.GetBytes(message);
5channel.BasicPublish(exchange: "",
6                     routingKey: "rpc_queue",
7                     basicProperties: props,
8                     body: messageBytes);
9
10// ... then code to read a response message from the callback_queue ...
11

Message properties 消息属性:

AMQP 0-9-1协议预定义了一组带有消息的14个属性。大多数属性很少使用,但以下情况除外:

Persistent: : 将消息标记为持久性(值为2)或瞬态(任何其他值)
DeliveryMode: 熟悉协议的人可以选择使用此属性而不是Persistent。他们控制着同样的事情。
ContentType:用于描述编码的mime类型。例如,对于经常使用的JSON编码,将此属性设置为:application / json是一种很好的做法。
ReplyTo: 通常用于命名回调队列。
CorrelationId: 用于将RPC响应与请求相关联

三、
Correlation Id 关联Id

在上面介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这是非常低效的,但幸运的是有更好的方法 – 让我们为每个客户端创建一个回调队列。

这引发了一个新问题,在该队列中收到响应后,不清楚响应属于哪个请求。那是在使用CorrelationId属性的时候 。我们将为每个请求将其设置为唯一值。稍后,当我们在回调队列中收到一条消息时,我们将查看此属性,并根据该属性,我们将能够将响应与请求进行匹配。如果我们看到未知的 CorrelationId值,我们可以安全地丢弃该消息 – 它不属于我们的请求。

您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是失败并出现错误?这是由于服务器端可能存在竞争条件。虽然不太可能,但RPC服务器可能会在向我们发送答案之后,但在发送请求的确认消息之前死亡。如果发生这种情况,重新启动的RPC服务器将再次处理请求。这就是为什么在客户端上我们必须优雅地处理重复的响应,理想情况下RPC应该是幂等(一次请求和多次请求资源的状态是一样的)的。

四、
实例


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
1//Fibonacci
2private static int fib(int n)
3{
4    if (n == 0 || n == 1) return n;
5    return fib(n - 1) + fib(n - 2);
6}
7
8//RPCServer.cs
9using System;
10using RabbitMQ.Client;
11using RabbitMQ.Client.Events;
12using System.Text;
13
14class RPCServer
15{
16    public static void Main()
17    {
18        var factory = new ConnectionFactory() { HostName = "localhost" };
19        using (var connection = factory.CreateConnection())
20        using (var channel = connection.CreateModel())
21        {
22            channel.QueueDeclare(queue: "rpc_queue", durable: false,
23              exclusive: false, autoDelete: false, arguments: null);
24            channel.BasicQos(0, 1, false);
25            var consumer = new EventingBasicConsumer(channel);
26            channel.BasicConsume(queue: "rpc_queue",
27              autoAck: false, consumer: consumer);
28            Console.WriteLine(" [x] Awaiting RPC requests");
29
30            consumer.Received += (model, ea) =>
31            {
32                string response = null;
33
34                var body = ea.Body;
35                var props = ea.BasicProperties;
36                var replyProps = channel.CreateBasicProperties();
37                replyProps.CorrelationId = props.CorrelationId;
38
39                try
40                {
41                    var message = Encoding.UTF8.GetString(body);
42                    int n = int.Parse(message);
43                    Console.WriteLine(" [.] fib({0})", message);
44                    response = fib(n).ToString();
45                }
46                catch (Exception e)
47                {
48                    Console.WriteLine(" [.] " + e.Message);
49                    response = "";
50                }
51                finally
52                {
53                    var responseBytes = Encoding.UTF8.GetBytes(response);
54                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
55                      basicProperties: replyProps, body: responseBytes);
56                    channel.BasicAck(deliveryTag: ea.DeliveryTag,
57                      multiple: false);
58                }
59            };
60
61            Console.WriteLine(" Press [enter] to exit.");
62            Console.ReadLine();
63        }
64    }
65
66    ///
67
68    /// Assumes only valid positive integer input.
69    /// Don't expect this one to work for big numbers, and it's
70    /// probably the slowest recursive implementation possible.
71    ///
72
73    private static int fib(int n)
74    {
75        if (n == 0 || n == 1)
76        {
77            return n;
78        }
79
80        return fib(n - 1) + fib(n - 2);
81    }
82}
83
84//RPCClient.cs
85using System;
86using System.Collections.Concurrent;
87using System.Text;
88using RabbitMQ.Client;
89using RabbitMQ.Client.Events;
90
91public class RpcClient
92{
93    private readonly IConnection connection;
94    private readonly IModel channel;
95    private readonly string replyQueueName;
96    private readonly EventingBasicConsumer consumer;
97    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
98    private readonly IBasicProperties props;
99
100public RpcClient()
101{
102        var factory = new ConnectionFactory() { HostName = "localhost" };
103
104        connection = factory.CreateConnection();
105        channel = connection.CreateModel();
106        replyQueueName = channel.QueueDeclare().QueueName;
107        consumer = new EventingBasicConsumer(channel);
108
109        props = channel.CreateBasicProperties();
110        var correlationId = Guid.NewGuid().ToString();
111        props.CorrelationId = correlationId;
112        props.ReplyTo = replyQueueName;
113
114        consumer.Received += (model, ea) =>
115        {
116            var body = ea.Body;
117            var response = Encoding.UTF8.GetString(body);
118            if (ea.BasicProperties.CorrelationId == correlationId)
119            {
120                respQueue.Add(response);
121            }
122        };
123    }
124
125    public string Call(string message)
126    {
127        var messageBytes = Encoding.UTF8.GetBytes(message);
128        channel.BasicPublish(
129            exchange: "",
130            routingKey: "rpc_queue",
131            basicProperties: props,
132            body: messageBytes);
133
134        channel.BasicConsume(
135            consumer: consumer,
136            queue: replyQueueName,
137            autoAck: true);
138
139        return respQueue.Take(); ;
140    }
141
142    public void Close()
143    {
144        connection.Close();
145    }
146}
147
148public class Rpc
149{
150    public static void Main()
151    {
152        var rpcClient = new RpcClient();
153
154        Console.WriteLine(" [x] Requesting fib(30)");
155        var response = rpcClient.Call("30");
156
157        Console.WriteLine(" [.] Got '{0}'", response);
158        rpcClient.Close();
159    }
160}
161

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全技术

【专访 PHP 之父】PHP7 性能翻倍关键大揭露

2016-12-18 18:12:08

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索