此篇介绍rabbitmq的Topic
一、
虽然Routing可以让我们用更适合于工作业务的方式操作消息队列,但还是存在一定的局限性,例如不能同时适应多个,比如我们不仅想要监听来自cron的错误消息,也要坚定kern的消息,而Topic类型的exchange可以解决这个问题。
二、
Topic exchange
topic exchange 发送的消息,routing-key不能是任意的,必须是一个以…为界的列表,列表的单词可以是任意的。有效的routing-key例如:
”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”. 可以是多个单词组成,但是不能参过255个字符。
* (star) 可以取代其中的一个word.
# (hash) 可以取代0或多个word.
建立这样的一个routingKey : “< speed>.< colour>.< species>”.
Q1 对于颜色为orange的感兴趣,订阅了它
Q2 想监听关于rabbit是的所有和光宇lazy的一切.
如果我们使用一个或四个word,像”orange”或”quick.orange.male.rabbit”,因为匹配不到,消息会丢失
“lazy.orange.male.rabbit”, 则会匹配最后一个binding规则,会被分发给Q2
三、实例
publish
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 1using System;
2using System.Linq;
3using RabbitMQ.Client;
4using System.Text;
5
6class EmitLogTopic
7{
8 public static void Main(string[] args)
9 {
10 var factory = new ConnectionFactory() { HostName = "localhost" };
11 using(var connection = factory.CreateConnection())
12 using(var channel = connection.CreateModel())
13 {
14 channel.ExchangeDeclare(exchange: "topic_logs",
15 type: "topic");
16
17 var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
18 var message = (args.Length > 1)
19 ? string.Join(" ", args.Skip( 1 ).ToArray())
20 : "Hello World!";
21 var body = Encoding.UTF8.GetBytes(message);
22 channel.BasicPublish(exchange: "topic_logs",
23 routingKey: routingKey,
24 basicProperties: null,
25 body: body);
26 Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
27 }
28 }
29}
30
subscribe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 1using System;
2using RabbitMQ.Client;
3using RabbitMQ.Client.Events;
4using System.Text;
5
6class ReceiveLogsTopic
7{
8 public static void Main(string[] args)
9 {
10 var factory = new ConnectionFactory() { HostName = "localhost" };
11 using(var connection = factory.CreateConnection())
12 using(var channel = connection.CreateModel())
13 {
14 channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
15 var queueName = channel.QueueDeclare().QueueName;
16
17 if(args.Length < 1)
18 {
19 Console.Error.WriteLine("Usage: {0} [binding_key...]",
20 Environment.GetCommandLineArgs()[0]);
21 Console.WriteLine(" Press [enter] to exit.");
22 Console.ReadLine();
23 Environment.ExitCode = 1;
24 return;
25 }
26
27 foreach(var bindingKey in args)
28 {
29 channel.QueueBind(queue: queueName,
30 exchange: "topic_logs",
31 routingKey: bindingKey);
32 }
33
34 Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
35
36 var consumer = new EventingBasicConsumer(channel);
37 consumer.Received += (model, ea) =>
38 {
39 var body = ea.Body;
40 var message = Encoding.UTF8.GetString(body);
41 var routingKey = ea.RoutingKey;
42 Console.WriteLine(" [x] Received '{0}':'{1}'",
43 routingKey,
44 message);
45 };
46 channel.BasicConsume(queue: queueName,
47 autoAck: true,
48 consumer: consumer);
49
50 Console.WriteLine(" Press [enter] to exit.");
51 Console.ReadLine();
52 }
53 }
54}
55