更多资料参考 http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
一、
简单的Helloworld
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 1 #region pulisher
2
3{
4 var factory = new ConnectionFactory() { HostName = "localhost", UserName = "jesen", Password = "jesen", Port = 5672 };
5 using (var connection = factory.CreateConnection())
6 {
7 while (true)
8 {
9 var sendMessage = Console.ReadLine();
10 if (sendMessage.Equals("exit")) return;
11 using (var channel = connection.CreateModel())
12 {
13 channel.QueueDeclare(queue: "helloworld", durable: false, exclusive: false, autoDelete: false, arguments: null); //声明一个队列
14
15 string message = sendMessage;
16 var body = Encoding.UTF8.GetBytes(message);
17 channel.BasicPublish(exchange: "", routingKey: "helloworld", basicProperties: null, body: body);
18
19 Console.WriteLine($"send message : {message}");
20 }
21 }
22 }
23}
24
25 #endregion
26
27#region consumer
28{
29 var factory = new ConnectionFactory() { HostName = "localhost", UserName = "jesen", Password = "jesen", Port = 5672 };
30
31 using (var connection = factory.CreateConnection())
32 {
33 using (var channel = connection.CreateModel())
34 {
35 channel.QueueDeclare(queue: "helloworld",
36 durable: false,
37 exclusive: false,
38 autoDelete: false,
39 arguments: null);
40
41 var consumer = new EventingBasicConsumer(channel);
42
43 consumer.Received += (model, ea) =>
44 {
45 var body = ea.Body;
46 var message = Encoding.UTF8.GetString(body);
47 Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId.ToString("00") } : Received Message {message}");
48 };
49
50 channel.BasicConsume(queue: "helloworld",
51 autoAck: true,
52 consumer: consumer);
53
54 Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId.ToString("00") } : Press [enter] to exit.");
55
56 Console.ReadLine();
57 }
58 }
59}
60
61#endregion
62
二、
Work Queue 在多个workers中分发耗时的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 1//publisher
2{
3 var factory = new ConnectionFactory() { HostName = "localhost", UserName = "jesen", Password = "jesen", Port = 5672 };
4 using (var connection = factory.CreateConnection())
5 {
6 while (true)
7 {
8 var sendMessage = Console.ReadLine(); //命令行已空格隔开,加...模拟将要耗时的时间
9 if (sendMessage.Equals("exit")) return;
10 using (var channel = connection.CreateModel())
11 {
12 var message = sendMessage;
13 var body = Encoding.UTF8.GetBytes(message);
14
15 var properties = channel.CreateBasicProperties();
16 properties.Persistent = true;
17
18 channel.BasicPublish(exchange: "",
19 routingKey: "task_queue",
20 basicProperties: properties,
21 body: body);
22
23 Console.WriteLine($"send message : {message}");
24 }
25 }
26
27 }
28}
29
30//consumer
31{
32 //var factory = new ConnectionFactory() { HostName = "localhost", UserName = "jesen", Password = "jesen", Port = 5672 };
33
34 using (var connection = factory.CreateConnection())
35 {
36 using (var channel = connection.CreateModel())
37 {
38 channel.QueueDeclare(queue: "task_queue",
39 durable: false,
40 exclusive: false,
41 autoDelete: false,
42 arguments: null);
43
44 var consumer = new EventingBasicConsumer(channel);
45
46 consumer.Received += (model, ea) =>
47 {
48 var body = ea.Body;
49 var message = Encoding.UTF8.GetString(body);
50 Console.WriteLine(" [x] Received {0}", message);
51
52 int dots = message.Split('.').Length - 1; //模拟耗时任务
53 Thread.Sleep(dots * 1000);
54
55 Console.WriteLine(" [x] Done");
56 };
57
58 channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);
59 Console.ReadLine();
60 }
61 }
62}
63
打开两个Consumer端,查看结果,可以看到分发是按循环分发的,即给一个consumer1,接下来给consumer2,再给consumer1,再给consumer2 ….这样如果consumber1处理耗时很长的任务,而consumer2处理很快,就得等consumer处理完,才能获取到下个task,而且如果有一个挂掉了,rabbitmq发给消费折后就把队列的消息删除了,因此消息也丢失了。很不合适业务
为了解决上述问题,rabbitmq引入了消息确认机制 Message acknowledgment
修改上面消费者的代码,可以确保即使工作中的消费者挂掉了,消息也不会丢失,可以继续分发,消息确认回复必须是在同一个channel里面,在不同的channel回复确认会抛异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1var consumer = new EventingBasicConsumer(channel);
2consumer.Received += (model, ea) =>
3{
4 var body = ea.Body;
5 var message = Encoding.UTF8.GetString(body);
6 Console.WriteLine(" [x] Received {0}", message);
7
8 int dots = message.Split('.').Length - 1;
9 Thread.Sleep(dots * 1000);
10
11 Console.WriteLine(" [x] Done");
12
13 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
14};
15channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
16
虽然上面的确认机制可以消费者挂掉不丢失数据,可是如果rabbitmq服务器挂掉了呢?
rabbitmq提供了消息持久化机制,Message Durablity
rabbitmq不允许重复声明已经存在的队列,因此设置的时候必须是第一次声明队列的时候
1
2
3
4
5
6
7
8
9
10
11 1//确保rabbitmq服务器挂掉不丢失数据
2channel.QueueDeclare(queue: "task_queue",
3 durable: true,
4 exclusive: false,
5 autoDelete: false,
6 arguments: null);
7
8//确保rabbitmq服务器重启不丢失数据
9var properties = channel.CreateBasicProperties();
10properties.Persistent = true;
11
合理的分发 Fair Dispatch
1
2
3 1//下面的代码告诉rabbitmq一次只给同一个worker一条消息,也就是说要等到接收到确认消息时再分发消息给worker,相反的,将消息分发给下一个处于空闲的worker进行处理。
2channel.BasicQos(0, 1, false);
3