接着上一篇继续分析rabbitmq的使用,此篇介绍rabbitmq的publish/Subcribe
一、
Exchange
rabbitmq建议消息的producer不要直接的把消息发送给queue,而是把消息发送给Exchange。
Exchange一边接收来自producer的消息,一边将消息push给queue。
rabbitmq提供了四种exchange type:
- direct :第三篇
- topic :第四篇
- headers :很少使用,rabbitmq官网没有给出例子
此类型的exchange和以上三个都不一样,其路由的规则是根据header来判断,其中的header就是以下方法的arguments参数:
1
2
3
4
5
6
7
8
9
10 1Dictionary<string, object> aHeader = new Dictionary<string, object>();
2aHeader.Add("format", "pdf");
3aHeader.Add("type", "report");
4aHeader.Add("x-match", "all");
5channel.QueueBind(queue: "queue.A",
6 exchange: "agreements",
7 routingKey: string.Empty,
8 arguments: aHeader);
9
10
其中的x-match为特殊的header,可以为all则表示要匹配所有的header,如果为any则表示只要匹配其中的一个header即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42 1var factory = new ConnectionFactory() { HostName = "localhost" };
2using (var connection = factory.CreateConnection())
3using (var channel = connection.CreateModel())
4{
5 // Headers类型的exchange, 名称 agreements
6 channel.ExchangeDeclare(exchange: "agreements",
7 type: ExchangeType.Headers,
8 durable: true,
9 autoDelete: false,
10 arguments: null);
11
12 // 创建queue.A队列
13 channel.QueueDeclare(queue: "queue.A", durable: true, exclusive: false, autoDelete: false, arguments: null);
14
15 /*发布者*/
16 string message1 = "hello world";
17 var body = Encoding.UTF8.GetBytes(message1);
18 var properties = channel.CreateBasicProperties();
19 properties.Persistent = true;
20 Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
21 mHeader1.Add("format", "pdf");
22 mHeader1.Add("type", "report");
23 properties.Headers = mHeader1;
24
25 //queue.A 的binding (format=pdf, type=report, x-match=all)
26 channel.BasicPublish(exchange: "agreements",
27 routingKey: string.Empty,
28 basicProperties: properties,
29 body: body);
30
31 //接收者
32 Dictionary<string, object> aHeader = new Dictionary<string, object>();
33 aHeader.Add("format", "pdf");
34 aHeader.Add("type", "report");
35 aHeader.Add("x-match", "all");
36 channel.QueueBind(queue: "queue.A",
37 exchange: "agreements",
38 routingKey: string.Empty,
39 arguments: aHeader);
40}
41
42
-
fanout : 将接收到消息广播给它所知道的queue
1
2
3
4
5
6
7
8
9
10 1channel.ExchangeDeclare("logs", "fanout"); //声明一个fanout类型的exchange ,名称为logs
2
3var message = GetMessage(args);
4var body = Encoding.UTF8.GetBytes(message);
5channel.BasicPublish(exchange: "logs",
6 routingKey: "",
7 basicProperties: null,
8 body: body);
9
10
1
2
3
4
5
6
7
8
9 1//create a non-durable, exclusive, autodelete queue with a generated name:
2var queueName = channel.QueueDeclare().QueueName;
3
4//将queue与exchange绑定
5channel.QueueBind(queue: queueName,
6 exchange: "logs",
7 routingKey: "");
8
9
publish
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 1using System;
2using RabbitMQ.Client;
3using System.Text;
4
5class EmitLog
6{
7 public static void Main(string[] args)
8 {
9 var factory = new ConnectionFactory() { HostName = "localhost" };
10 using(var connection = factory.CreateConnection())
11 using(var channel = connection.CreateModel())
12 {
13 channel.ExchangeDeclare(exchange: "logs", type: "fanout");
14
15 var message = GetMessage(args);
16 var body = Encoding.UTF8.GetBytes(message);
17 channel.BasicPublish(exchange: "logs",
18 routingKey: "",
19 basicProperties: null,
20 body: body);
21 Console.WriteLine(" [x] Sent {0}", message);
22 }
23
24 Console.WriteLine(" Press [enter] to exit.");
25 Console.ReadLine();
26 }
27
28 private static string GetMessage(string[] args)
29 {
30 return ((args.Length > 0)
31 ? string.Join(" ", args)
32 : "info: Hello World!");
33 }
34}
35
Subcribe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 1using System;
2using RabbitMQ.Client;
3using RabbitMQ.Client.Events;
4using System.Text;
5
6class ReceiveLogs
7{
8 public static void Main()
9 {
10 var factory = new ConnectionFactory() { HostName = "localhost" };
11 using(var connection = factory.CreateConnection())
12 using(var channel = connection.CreateModel())
13 {
14 channel.ExchangeDeclare(exchange: "logs", type: "fanout");
15
16 var queueName = channel.QueueDeclare().QueueName;
17 channel.QueueBind(queue: queueName,
18 exchange: "logs",
19 routingKey: "");
20
21 Console.WriteLine(" [*] Waiting for logs.");
22
23 var consumer = new EventingBasicConsumer(channel);
24 consumer.Received += (model, ea) =>
25 {
26 var body = ea.Body;
27 var message = Encoding.UTF8.GetString(body);
28 Console.WriteLine(" [x] {0}", message);
29 };
30 channel.BasicConsume(queue: queueName,
31 autoAck: true,
32 consumer: consumer);
33
34 Console.WriteLine(" Press [enter] to exit.");
35 Console.ReadLine();
36 }
37 }
38}
39