消息的订阅和发布是使用消息队列的常用场景。在上一篇文章中,虽然有多个消费者,但是一个消息只会有一个消费者来处理。而订阅和发布则是每个订阅该消息的消费者都会收到这个消息。RabbitMQ的路由机制让我们实现这个功能轻而易举。
要了解RabbitMQ的路由机制,exchange是一个关键。exchange可以叫做交换机,也似乎可以叫做路由器,反正它是用来选择路由的。前文说到,RabbitMQ的核心思想就是消息的发布者不是直接把消息发送到目标队列中的,事实上,通常它并不知道消息要发到哪个队列中,它只知道把消息队列发送到exchange中。exchange一边接收发送者发过来的消息,而另一边则把消息发送到目标队列中去。exchange一定知道哪些队列需要接收这个消息,是加到一个队列里还是加到好几个队列里,还是直接扔掉。下图中的X就是exchange。
RabbitMQ的exchange有一些类型,这些类型决定了exchange的行为。分别是 direct, topic, headers 和 fanout四种类型。在这一篇文章中介绍的最简单的发送和接收例子中使用的如下代码中
1
2 1//指定发送的路由,通过默认的exchange直接发送到指定的队列中。
2
1
2 1channel.BasicPublish("", "esbtest.rmq.consoleserver", null, bytes);
2
第一个参数我们输入了空字符串来代表一个exchange。空字符串的exchange在RabbitMQ中时默认的exchange,类型是direct。在这个例子中它会直接将消息发送到第二个参数route_key定义的同名的队列中。
而我们这篇介绍的订阅和发布是用了fanout这个类型。由于默认类型是direct的,所以需要使用fanout就需要额外定义。如下代码就是定义了一个名字叫publish的fanout类型的exchange。
1
2 1channel.ExchangeDeclare("publish", "fanout");
2
这种exchange有分发的意思。那分发到哪些队列中呢?因为发布者不需要知道,所以这段代码页就在订阅者那边来实现。来看发送方得代码片段:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 1using RabbitMQ.Client;
2using RabbitMQ.Client.Events;
3using System;
4using System.Collections.Generic;
5using System.IO;
6using System.Linq;
7using System.Text;
8using System.Threading;
9using System.Threading.Tasks;
10using System.Xml.Serialization;
11
12namespace SendService
13{
14 class Program
15 {
16 static void Main(string[] args)
17 {
18 var factory = new ConnectionFactory();
19 factory.HostName = "192.168.12.111";
20 factory.Port = 5672;
21 factory.UserName = "admin";
22 factory.Password = "admin";
23 //定义要发送的数据
24
25 List<RequestMessage> messages = new List<RequestMessage>();
26 for (int i = 0; i < 100; i++)
27 {
28 RequestMessage message = new RequestMessage() { MessageId = Guid.NewGuid(), Message = "Send1this is a 请求。" + i, Type="email" };
29 messages.Add(message);
30 }
31 for (int i = 0; i < 100; i++)
32 {
33 RequestMessage message = new RequestMessage() { MessageId = Guid.NewGuid(), Message = "Send1this is a 请求。" + i, Type = "sms" };
34 messages.Add(message);
35 }
36
37 using (var connection = factory.CreateConnection())
38 {
39 using (var channel = connection.CreateModel())
40 {
41 channel.ExchangeDeclare("publish-topic", "topic", true);//定义一个交换机,且采用广播类型,并持久化该交换机
42 string smsqueue = channel.QueueDeclare("all.sms.message", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列
43 //绑定到名字叫publish的exchange上
44 channel.QueueBind(smsqueue, "publish-topic", "sms");
45 string emailqueue = channel.QueueDeclare("all.email.message", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列
46 channel.QueueBind(emailqueue, "publish-topic", "email");
47 var properties = channel.CreateBasicProperties();
48 properties.DeliveryMode = 2;//1表示不持久,2.表示持久化
49 foreach (var item in messages)
50 {
51 if (item.Type == "sms")
52 {
53 XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
54 using (MemoryStream ms = new MemoryStream())
55 {
56 xs.Serialize(ms, item);
57 byte[] bytes = ms.ToArray();
58 channel.BasicPublish("publish-topic", "sms", properties, bytes); //发送消息,这里指定了交换机名称,且routeKey会被忽略
59 Console.WriteLine(" [x] Sent {0}", item.Message);
60 }
61 }
62 if (item.Type == "email")
63 {
64 XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
65 using (MemoryStream ms = new MemoryStream())
66 {
67 xs.Serialize(ms, item);
68 byte[] bytes = ms.ToArray();
69 channel.BasicPublish("publish-topic", "email", properties, bytes); //发送消息,这里指定了交换机名称,且routeKey会被忽略
70 Console.WriteLine(" [x] Sent {0}", item.Message);
71 }
72 }
73 }
74 }
75 }
76 }
77 }
78
79 public class RequestMessage
80 {
81 public Guid MessageId { set; get; }
82 public string Message { set; get; }
83 public string Type { set; get; }
84 }
85}
86
87
接受方SMS:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 1using RabbitMQ.Client;
2using RabbitMQ.Client.Events;
3using System;
4using System.Collections.Generic;
5using System.IO;
6using System.Linq;
7using System.Text;
8using System.Threading;
9using System.Threading.Tasks;
10using System.Xml.Serialization;
11
12namespace ReceiveSMSService
13{
14 class Program
15 {
16 static void Main(string[] args)
17 {
18 var factory = new ConnectionFactory();
19 factory.HostName = "192.168.12.111";
20 factory.Port = 5672;
21 factory.UserName = "admin";
22 factory.Password = "admin";
23
24 using (var connection = factory.CreateConnection())
25 {
26 using (var channel = connection.CreateModel())
27 {
28 //交换机持久化
29 channel.ExchangeDeclare("publish-topic", "topic", true);
30 bool durable = true;//队列持久化
31 string queue_name = channel.QueueDeclare("all.sms.message", durable, false, false, null);//hello是queue的名字
32 //绑定到名字叫publish的exchange上
33 channel.QueueBind(queue_name, "publish-topic", "sms");
34 //定义这个队列的消费者
35 QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
36 channel.BasicConsume(queue_name, false, consumer);
37 Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C");
38 while (true)
39 {
40 //阻塞函数,获取队列中的消息
41 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//挂起的操作
42 //模拟长时间运行
43 Thread.Sleep(3000);
44
45 byte[] body = ea.Body;
46 XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
47 using (MemoryStream ms = new MemoryStream(body))
48 {
49
50 RequestMessage message = (RequestMessage)xs.Deserialize(ms);
51
52 Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message + " Type:" + message.Type);
53
54 }
55 //发送应答包,消息持久化时候使用
56 channel.BasicAck(ea.DeliveryTag, false);
57 }
58 }
59 }
60 }
61 }
62
63 public class RequestMessage
64 {
65 public Guid MessageId { set; get; }
66 public string Message { set; get; }
67 public string Type { set; get; }
68 }
69}
70
71
接收方邮件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 1using RabbitMQ.Client;
2using RabbitMQ.Client.Events;
3using System;
4using System.Collections.Generic;
5using System.IO;
6using System.Linq;
7using System.Text;
8using System.Threading;
9using System.Threading.Tasks;
10using System.Xml.Serialization;
11
12namespace ReceiveEmailService
13{
14 class Program
15 {
16 static void Main(string[] args)
17 {
18 var factory = new ConnectionFactory();
19 factory.HostName = "192.168.12.111";
20 factory.Port = 5672;
21 factory.UserName = "admin";
22 factory.Password = "admin";
23
24 using (var connection = factory.CreateConnection())
25 {
26 using (var channel = connection.CreateModel())
27 {
28 channel.ExchangeDeclare("publish-topic", "topic", true);
29 bool durable = true;//队列持久化
30 string queue_name = channel.QueueDeclare("all.email.message", durable, false, false, null);//hello是queue的名字
31 //绑定到名字叫publish的exchange上
32 channel.QueueBind(queue_name, "publish-topic", "email");
33 //定义这个队列的消费者
34 QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
35 channel.BasicConsume(queue_name, false, consumer);
36 Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C");
37 while (true)
38 {
39 //阻塞函数,获取队列中的消息
40 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//挂起的操作
41 //模拟长时间运行
42 Thread.Sleep(3000);
43
44 byte[] body = ea.Body;
45 XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
46 using (MemoryStream ms = new MemoryStream(body))
47 {
48
49 RequestMessage message = (RequestMessage)xs.Deserialize(ms);
50
51 Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message + " Type:" + message.Type);
52
53 }
54 //发送应答包,消息持久化时候使用
55 channel.BasicAck(ea.DeliveryTag, false);
56 }
57 }
58 }
59 }
60 }
61
62 public class RequestMessage
63 {
64 public Guid MessageId { set; get; }
65 public string Message { set; get; }
66 public string Type { set; get; }
67 }
68}
69
70
同样先定义一个fanout的exchange。对于为什么要在发布者和订阅者都要定义同名的exchange,我的理解是如果没有定义的一方先启动的话则会报错说找不到那个exchange。我测试过如果有定义的先启动,没定义的后启动也是没有问题的。
因为每个订阅者都需要一个队列来存放发给自己的消息,所以需要创建一个队列。通过QueueBind来和exchange关联了。所有发送给名字为publish的exchange的消息,都会被它分发给所有与之绑定的队列中,这样,每个对应的消费者都会收到一个副本。
在浏览器的管理界面上我们可以看见,RabbitMQ为每一个消费者(订阅者)创建了一个队列,而没有为发送者创建队列。