C# 操作rabbitmq(三)

释放双眼,带上耳机,听听看~!

此篇介绍rabbitmq的Routing

一、
使用exchange时,会广播给所有连接的consumer,而有时候我们需要订阅消息的子集就好,例如只需要其中的一些错误信息写入到日志当中

二、
Bindings
在上一篇中我们创建了exchange和queue的绑定,其中有一个属性routingKey,该属性依赖于exchange的类型,fanout会忽略改属性。
direct类型的exchange可以实现这种需求

上图中,设置了orange为routingKey的exchange会被路由到Q1,设置了black或green的routingKey会被路由到Q2,其他未设置的会被放弃掉

三、
Multiple bindings

绑定多个queue到同一个routingKey中是合法的,相当于fanout类型的exchange,会广播到绑定的响应queue中

四、例子

Publish


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1using System;
2using System.Linq;
3using RabbitMQ.Client;
4using System.Text;
5
6class EmitLogDirect
7{
8    public static void Main(string[] args)
9    {
10        var factory = new ConnectionFactory() { HostName = "localhost" };
11        using(var connection = factory.CreateConnection())
12        using(var channel = connection.CreateModel())
13        {
14            channel.ExchangeDeclare(exchange: "direct_logs",
15                                    type: "direct");
16
17            var severity = (args.Length > 0) ? args[0] : "info";
18            var message = (args.Length > 1)
19                          ? string.Join(" ", args.Skip( 1 ).ToArray())
20                          : "Hello World!";
21            var body = Encoding.UTF8.GetBytes(message);
22            channel.BasicPublish(exchange: "direct_logs",
23                                 routingKey: severity,
24                                 basicProperties: null,
25                                 body: body);
26            Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
27        }
28
29        Console.WriteLine(" Press [enter] to exit.");
30        Console.ReadLine();
31    }
32}
33

Subscribe


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
1using System;
2using RabbitMQ.Client;
3using RabbitMQ.Client.Events;
4using System.Text;
5
6class ReceiveLogsDirect
7{
8    public static void Main(string[] args)
9    {
10        var factory = new ConnectionFactory() { HostName = "localhost" };
11        using(var connection = factory.CreateConnection())
12        using(var channel = connection.CreateModel())
13        {
14            channel.ExchangeDeclare(exchange: "direct_logs",
15                                    type: "direct");
16            var queueName = channel.QueueDeclare().QueueName;
17
18            if(args.Length < 1)
19            {
20                Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
21                                        Environment.GetCommandLineArgs()[0]);
22                Console.WriteLine(" Press [enter] to exit.");
23                Console.ReadLine();
24                Environment.ExitCode = 1;
25                return;
26            }
27
28            foreach(var severity in args)
29            {
30                channel.QueueBind(queue: queueName,
31                                  exchange: "direct_logs",
32                                  routingKey: severity);
33            }
34
35            Console.WriteLine(" [*] Waiting for messages.");
36
37            var consumer = new EventingBasicConsumer(channel);
38            consumer.Received += (model, ea) =>
39            {
40                var body = ea.Body;
41                var message = Encoding.UTF8.GetString(body);
42                var routingKey = ea.RoutingKey;
43                Console.WriteLine(" [x] Received '{0}':'{1}'",
44                                  routingKey, message);
45            };
46            channel.BasicConsume(queue: queueName,
47                                 autoAck: true,
48                                 consumer: consumer);
49
50            Console.WriteLine(" Press [enter] to exit.");
51            Console.ReadLine();
52        }
53    }
54}
55

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全漏洞

Spring Framework 多个安全漏洞预警

2018-4-7 11:12:22

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索