C# 操作rabbitmq(二)

释放双眼,带上耳机,听听看~!

接着上一篇继续分析rabbitmq的使用,此篇介绍rabbitmq的Publish/Subscribe(发布/订阅)

一、Exchange
rabbitmq建议消息的producer不要直接把消息发送给queue,而是把消息发送给Exchange。
Exchange一边接收来自producer的消息,一边将消息push给queue。

rabbitmq提供了四种exchange type:

  • direct :第三篇
  • topic :第四篇
  • headers :很少使用,rabbitmq官网没有给出例子

headers类型的exchange和其他三种类型都不一样,其路由规则是根据消息的header来判断的,用于匹配的header就是以下方法的arguments参数:


1
2
3
4
5
6
7
8
9
10
// Binding arguments for the headers exchange: the header pairs to match on,
// plus the special "x-match" entry (set to "all" here).
var bindingArguments = new Dictionary<string, object>
{
    { "format", "pdf" },
    { "type", "report" },
    { "x-match", "all" },
};

// Bind "queue.A" to the "agreements" exchange using those arguments; the
// routing key is left empty.
channel.QueueBind(
    queue: "queue.A",
    exchange: "agreements",
    routingKey: string.Empty,
    arguments: bindingArguments);

其中的x-match是一个特殊的header:值为all表示必须匹配所有的header,值为any表示只要匹配其中任意一个header即可。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // Declare a durable, non-auto-delete headers exchange named "agreements".
    channel.ExchangeDeclare(exchange: "agreements",
                    type: ExchangeType.Headers,
                    durable: true,
                    autoDelete: false,
                    arguments: null);

    // Declare the durable queue "queue.A".
    channel.QueueDeclare(queue: "queue.A", durable: true, exclusive: false, autoDelete: false, arguments: null);

    /* Receiver side */
    // The binding must exist BEFORE the message is published: an exchange
    // discards a message that matches no binding, so publishing first would
    // lose the message on a fresh broker.
    // Binding for queue.A: format=pdf, type=report, x-match=all.
    var bindingHeaders = new Dictionary<string, object>
    {
        { "format", "pdf" },
        { "type", "report" },
        { "x-match", "all" },
    };
    channel.QueueBind(queue: "queue.A",
                exchange: "agreements",
                routingKey: string.Empty,
                arguments: bindingHeaders);

    /* Publisher side */
    string message1 = "hello world";
    var body = Encoding.UTF8.GetBytes(message1);

    // Mark the message persistent and attach the headers that the binding
    // above matches on.
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    properties.Headers = new Dictionary<string, object>
    {
        { "format", "pdf" },
        { "type", "report" },
    };

    // Publish with an empty routing key, as in the binding above.
    channel.BasicPublish(exchange: "agreements",
                    routingKey: string.Empty,
                    basicProperties: properties,
                    body: body);
}
  • fanout : 将接收到的消息广播给它所知道的所有queue


1
2
3
4
5
6
7
8
9
10
// Declare a fanout exchange named "logs".
channel.ExchangeDeclare(exchange: "logs", type: "fanout");

// Build the message text, encode it as UTF-8 and publish it to the "logs"
// exchange with an empty routing key and no message properties.
var message = GetMessage(args);
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
    exchange: "logs",
    routingKey: "",
    basicProperties: null,
    body: body);

1
2
3
4
5
6
7
8
9
// Create a non-durable, exclusive, auto-delete queue with a generated name
// and keep that name for the binding below.
var declaredQueue = channel.QueueDeclare();
var queueName = declaredQueue.QueueName;

// Bind the queue to the "logs" exchange (empty routing key).
channel.QueueBind(
    queue: queueName,
    exchange: "logs",
    routingKey: "");

publish


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
using System;
using RabbitMQ.Client;
using System.Text;

/// <summary>
/// Publisher: sends one log message to the "logs" fanout exchange.
/// </summary>
class EmitLog
{
    /// <summary>
    /// Connects to the broker on localhost, publishes a single message built
    /// from <paramref name="args"/>, then waits for Enter before exiting.
    /// </summary>
    /// <param name="args">Command-line words that make up the message text.</param>
    public static void Main(string[] args)
    {
        var connectionFactory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = connectionFactory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

                var text = GetMessage(args);
                var payload = Encoding.UTF8.GetBytes(text);

                // Empty routing key and no message properties.
                channel.BasicPublish(
                    exchange: "logs",
                    routingKey: "",
                    basicProperties: null,
                    body: payload);
                Console.WriteLine(" [x] Sent {0}", text);
            }
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    /// <summary>
    /// Returns the arguments joined by single spaces, or a default message
    /// when no arguments were supplied.
    /// </summary>
    private static string GetMessage(string[] args)
    {
        if (args.Length > 0)
        {
            return string.Join(" ", args);
        }

        return "info: Hello World!";
    }
}

Subscribe


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
using System;
using System.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

/// <summary>
/// Subscriber: receives every message published to the "logs" fanout
/// exchange and prints it to the console.
/// </summary>
class ReceiveLogs
{
    /// <summary>
    /// Connects to the broker on localhost, binds a freshly declared queue to
    /// the "logs" exchange and prints incoming messages until Enter is pressed.
    /// </summary>
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            // Declare a queue with a generated name and bind it to the exchange.
            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,
                              exchange: "logs",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                // Copy the payload into a byte[] before decoding. In
                // RabbitMQ.Client 6.x ea.Body is ReadOnlyMemory<byte>, which
                // Encoding.GetString does not accept directly; ToArray() works
                // there and, via System.Linq, also on the byte[] body of 5.x.
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            // Keep the connection open while the handler prints messages.
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全运维

DevOps基础-3.2-DevOps组件:精益

2021-10-12 11:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索