C# 操作rabbitmq(一)

释放双眼,带上耳机,听听看~!

更多资料参考 http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

一、
简单的Helloworld


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
1 #region pulisher
2
3{
4      var factory = new ConnectionFactory() { HostName = "localhost", UserName = "jesen", Password = "jesen", Port = 5672 };
5      using (var connection = factory.CreateConnection())
6      {
7          while (true)
8          {
9              var sendMessage = Console.ReadLine();
10              if (sendMessage.Equals("exit")) return;
11              using (var channel = connection.CreateModel())
12              {
13                  channel.QueueDeclare(queue: "helloworld", durable: false, exclusive: false, autoDelete: false, arguments: null); //声明一个队列
14
15                  string message = sendMessage;
16                  var body = Encoding.UTF8.GetBytes(message);
17                  channel.BasicPublish(exchange: "", routingKey: "helloworld", basicProperties: null, body: body);
18
19                  Console.WriteLine($"send message : {message}");
20              }
21        }            
22    }
23}
24
25 #endregion
26
27#region consumer
28{
29    var factory = new ConnectionFactory() { HostName = "localhost", UserName = "jesen", Password = "jesen", Port = 5672 };
30
31    using (var connection = factory.CreateConnection())
32    {
33          using (var channel = connection.CreateModel())
34          {
35              channel.QueueDeclare(queue: "helloworld",
36                                   durable: false,
37                                   exclusive: false,
38                                   autoDelete: false,
39                                   arguments: null);
40
41              var consumer = new EventingBasicConsumer(channel);
42
43              consumer.Received += (model, ea) =>
44              {
45                   var body = ea.Body;
46                   var message = Encoding.UTF8.GetString(body);
47                   Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId.ToString("00") } : Received Message {message}");
48              };
49
50                   channel.BasicConsume(queue: "helloworld",
51                                        autoAck: true,
52                                        consumer: consumer);
53
54                   Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId.ToString("00") } : Press [enter] to exit.");
55
56                   Console.ReadLine();
57           }
58     }
59}
60
61#endregion
62

二、
Work Queue 在多个workers中分发耗时的任务


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
1//publisher
2{
3    var factory = new ConnectionFactory() { HostName = "localhost", UserName = "jesen", Password = "jesen", Port = 5672 };
4     using (var connection = factory.CreateConnection())
5     {
6         while (true)
7         {
8             var sendMessage = Console.ReadLine(); //命令行已空格隔开,加...模拟将要耗时的时间
9             if (sendMessage.Equals("exit")) return;
10             using (var channel = connection.CreateModel())
11             {
12                 var message = sendMessage;
13                 var body = Encoding.UTF8.GetBytes(message);
14
15                 var properties = channel.CreateBasicProperties();
16                 properties.Persistent = true;
17
18                 channel.BasicPublish(exchange: "",
19                                      routingKey: "task_queue",
20                                      basicProperties: properties,
21                                      body: body);
22
23                 Console.WriteLine($"send message : {message}");
24             }
25         }
26
27     }
28}
29
30//consumer
31{
32 //var factory = new ConnectionFactory() { HostName = "localhost", UserName = "jesen", Password = "jesen", Port = 5672 };
33
34     using (var connection = factory.CreateConnection())
35     {
36         using (var channel = connection.CreateModel())
37         {
38             channel.QueueDeclare(queue: "task_queue",
39                                  durable: false,
40                                  exclusive: false,
41                                  autoDelete: false,
42                                  arguments: null);
43
44             var consumer = new EventingBasicConsumer(channel);
45
46             consumer.Received += (model, ea) =>
47             {
48                 var body = ea.Body;
49                 var message = Encoding.UTF8.GetString(body);
50                 Console.WriteLine(" [x] Received {0}", message);
51
52                 int dots = message.Split('.').Length - 1;  //模拟耗时任务
53                 Thread.Sleep(dots * 1000);
54
55                 Console.WriteLine(" [x] Done");
56             };
57
58             channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);
59             Console.ReadLine();
60         }
61     }
62}
63

打开两个Consumer端,查看结果,可以看到分发是按循环分发的,即给一个consumer1,接下来给consumer2,再给consumer1,再给consumer2 ….这样如果consumber1处理耗时很长的任务,而consumer2处理很快,就得等consumer处理完,才能获取到下个task,而且如果有一个挂掉了,rabbitmq发给消费折后就把队列的消息删除了,因此消息也丢失了。很不合适业务

为了解决上述问题,rabbitmq引入了消息确认机制 Message acknowledgment
修改上面消费者的代码,可以确保即使工作中的消费者挂掉了,消息也不会丢失,可以继续分发,消息确认回复必须是在同一个channel里面,在不同的channel回复确认会抛异常


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1var consumer = new EventingBasicConsumer(channel);
2consumer.Received += (model, ea) =>
3{
4    var body = ea.Body;
5    var message = Encoding.UTF8.GetString(body);
6    Console.WriteLine(" [x] Received {0}", message);
7
8    int dots = message.Split('.').Length - 1;
9    Thread.Sleep(dots * 1000);
10
11    Console.WriteLine(" [x] Done");
12
13    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
14};
15channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
16

虽然上面的确认机制可以消费者挂掉不丢失数据,可是如果rabbitmq服务器挂掉了呢?

rabbitmq提供了消息持久化机制,Message Durablity
rabbitmq不允许重复声明已经存在的队列,因此设置的时候必须是第一次声明队列的时候


1
2
3
4
5
6
7
8
9
10
11
1//确保rabbitmq服务器挂掉不丢失数据
2channel.QueueDeclare(queue: "task_queue",
3                     durable: true,
4                     exclusive: false,
5                     autoDelete: false,
6                     arguments: null);
7
8//确保rabbitmq服务器重启不丢失数据
9var properties = channel.CreateBasicProperties();
10properties.Persistent = true;
11

合理的分发 Fair Dispatch


1
2
3
1//下面的代码告诉rabbitmq一次只给同一个worker一条消息,也就是说要等到接收到确认消息时再分发消息给worker,相反的,将消息分发给下一个处于空闲的worker进行处理。
2channel.BasicQos(0, 1, false);
3

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全资讯

股价暴增好几倍!NVIDIA逆袭背后:老黄太牛了

2016-12-26 22:16:37

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索