ASP.NET Core微服务之基于MassTransit实现数据最终一致性(1)

释放双眼,带上耳机,听听看~!

Tip: 此篇已加入.NET Core微服务基础系列文章索引

一、预备知识:数据一致性

关于数据一致性的文章,园子里已经有很多了,如果你还不了解,那么可以通过以下的几篇文章去快速地了解了解,有个感性认识即可。

(1)左正,《保证分布式系统数据一致性的6种方案》

(2)成金之路,《分布式系统的数据一致性解决方案》

(3)E_Star,《分布式环境下数据一致性的设计总结》

(4)Itegel,《分布式事务?No,最终一致性》

必须要了解的点:ACID、CAP、BASE、强一致性、弱一致性、最终一致性

CAP理论由加州大学伯克利分校的计算机教授Eric Brewer在2000年提出,其核心思想是任何基于网络的数据共享系统最多只能满足数据一致性(Consistency)、可用性(Availability)和网络分区容忍(Partition Tolerance)三个特性中的两个(由此我们知道在分布式系统中,同时满足CAP三个特性是不可能的),三个特性的定义如下:

C:数据一致性(Consistency):如果系统对一个写操作返回成功,那么之后的读请求都必须读到这个新数据;如果返回失败,那么所有读操作都不能读到这个数据,对调用者而言数据具有强一致性(Strong Consistency)(又叫原子性Atomic或线性一致性Linerizable Consistency)
A:服务可用性(Availability):所有读写请求在一定时间内得到响应,可终止、不会一直等待
P:分区容错性(Partition-Tolerance):在网络分区的情况下,被分隔的节点仍能正常对外服务  

  • 强一致性:当更新操作完成之后,任何多个后续进程或者线程的访问都会返回最新的更新过的值。这种是对用户最友好的,就是用户上一次写什么,下一次就保证能读到什么。根据 CAP 理论,这种实现需要牺牲可用性。=> 在传统单体式应用中,大部分都是强一致性的应用,想想我们写过多少工作单元模式的Code?
  • 弱一致性:系统并不保证续进程或者线程的访问都会返回最新的更新过的值。系统在数据写入成功之后,不承诺立即可以读到最新写入的值,也不会具体的承诺多久之后可以读到。
  • 最终一致性:弱一致性的特定形式。系统保证在没有后续更新的前提下,系统最终返回上一次更新操作的值。在没有故障发生的前提下,不一致窗口的时间主要受通信延迟,系统负载和复制副本的个数影响。

为保证可用性,互联网分布式架构中经常将强一致性需求转换成最终一致性的需求,并通过系统执行幂等性的保证,保证数据的最终一致性

在微服务架构中,各个微服务之间通常会使用事件驱动通信和发布订阅系统实现最终一致性。

更多背景知识,还是得看上面列出的参考文章,这里不再赘述。

二、MassTransit极简介绍

MassTransit 是一个自由、开源、轻量级的消息总线, 用于使用. NET 框架创建分布式应用程序。MassTransit 在现有消息传输上提供了一组广泛的功能, 从而使开发人员能够友好地使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠和可扩展的方式。

官网地址:http://masstransit-project.com/,GitHub地址:https://github.com/MassTransit/MassTransit (目前:1590Star,719Fork)

类似的国外开源组件还有NServiceBus,没有用过,据说MassTransit比NServiceBus更加轻量级,并且在开发之初就选用了RabbitMQ作为消息传输组件,当然MassTransit还支持Azure Service Bus。类似的国内开源组件则有园友savorboard(杨晓东)的CAP,这个我会在MassTransit学习结束后去使用使用,CAP在GitHub上已经有了超过1000个Star,是NCC的几个千星项目之一。另外,张善友大队长在他的NanoFabric项目中推荐我们使用Rebus和Ray,如下图所示:

由于时间和精力,以及文档资料的可见性,我在我的POC和这个系列博文的准备中,只会使用到MassTransit和CAP这两个开源项目。

三、MassTransit Quick Start

这里以MassTransit + RabbitMQ为例子,首先请确保安装了RabbitMQ,如果没有安装,可以阅读我的《基于EasyNetQ使用RabbitMQ消息队列》去把RabbitMQ先安装到你的电脑上。另外,RabbitMQ的背景知识也有一堆,有机会也还是要了解下Exchange,Channel、Queue等内容。

3.1 最简单的发送/接收实例

(1)准备两个控制台程序,一个为Sender(发送者),一个为Receiver(接收者),并分别通过NuGet安装MassTransit以及MassTransit.RabbitMQ

NuGet>Install-Package MassTransit
NuGet>Install-Package MassTransit.RabbitMQ  

(2)编写Sender


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1    public class Program
2    {
3        public static void Main(string[] args)
4        {
5            Console.Title = "MassTransit Client";
6
7            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
8            {
9                var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
10                {
11                    hst.Username("admin");
12                    hst.Password("edison");
13                });
14            });
15
16            var uri = new Uri("rabbitmq://192.168.80.71/EDCVHOST/Qka.MassTransitTest");
17            var message = Console.ReadLine();
18
19            while (message != null)
20            {
21                Task.Run(() => SendCommand(bus, uri, message)).Wait();
22                message = Console.ReadLine();
23            }
24
25            Console.ReadKey();
26        }
27
28        private static async void SendCommand(IBusControl bus, Uri sendToUri, string message)
29        {
30            var endPoint = await bus.GetSendEndpoint(sendToUri);
31            var command = new Client()
32            {
33                Id = 100001,
34                Name = "Edison Zhou",
35                Birthdate = DateTime.Now.AddYears(-18),
36                Message = message
37            };
38
39            await endPoint.Send(command);
40
41            Console.WriteLine($"You Sended : Id = {command.Id}, Name = {command.Name}, Message = {command.Message}");
42        }
43    }
44

这里首先连接到我的RabbitMQ服务,然后向指定的Queue发送消息(这里是一个Client类型的实例对象)。

(3)编写Receiver


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1    public class Program
2    {
3        public static void Main(string[] args)
4        {
5            Console.Title = "MassTransit Server";
6
7            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
8            {
9                var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
10                {
11                    hst.Username("admin");
12                    hst.Password("edison");
13                });
14
15                cfg.ReceiveEndpoint(host, "Qka.MassTransitTest", e=>
16                {
17                    e.Consumer<TestConsumerClient>();
18                    e.Consumer<TestConsumerAgent>();
19                });
20            });
21
22            bus.Start();
23            Console.WriteLine("Press any key to exit.");
24            Console.ReadKey();
25
26            bus.Stop();
27        }
28    }
29

对于Receiver,要做的事就只有两件:一是连接到RabbitMQ,二是告诉RabbitMQ我要接收哪个消息队列的什么类型的消息。下面是TestConsumerClient和TestConsumerAgent的定义:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1    public class TestConsumerClient : IConsumer<Client>
2    {
3        public async Task Consume(ConsumeContext<Client> context)
4        {
5            Console.ForegroundColor = ConsoleColor.Red;
6            await Console.Out.WriteLineAsync($"Receive message: {context.Message.Id}, {context.Message.Name}, {context.Message.Birthdate.ToString()}");
7            Console.ResetColor();
8        }
9    }
10
11    public class Client
12    {
13        public int Id { get; set; }
14        public string Name { get; set; }
15        public DateTime Birthdate { get; set; }
16        public string Message { get; set; }
17    }
18
19    public class TestConsumerAgent : IConsumer<Agent>
20    {
21        public async Task Consume(ConsumeContext<Agent> context)
22        {
23            Console.ForegroundColor = ConsoleColor.Red;
24            await Console.Out.WriteLineAsync($"Receive message: {context.Message.AgentCode}, {context.Message.AgentName}, {context.Message.AgentRole}");
25            Console.ResetColor();
26        }
27    }
28
29    public class Agent
30    {
31        public int AgentCode { get; set; }
32        public string AgentName { get; set; }
33        public string AgentRole { get; set; }
34        public string Message { get; set; }
35    }
36

(4)测试一下:

3.2 最简单的发布/订阅实例

除了简单的发送/接收模式外,我们用的更多的是发布/订阅这种模式。

(1)准备下图所示的类库和控制台项目,并对除Messages类库之外的其他项目安装MassTransit以及MassTransit.RabbitMQ。

(2)Messages类库:准备需要传输的Message


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1    public class TestBaseMessage
2    {
3        public string Name { get; set; }
4
5        public DateTime Time { get; set; }
6
7        public string Message { get; set; }
8    }
9
10    public class TestCustomMessage
11    {
12        public string Name { get; set; }
13
14        public DateTime Time { get; set; }
15
16        public int Age { get; set; }
17
18        public string Message { get; set; }
19    }
20

(3)Publisher:接收我的消息吧骚年们


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
1    public class Program
2    {
3        public static void Main(string[] args)
4        {
5            Console.Title = "MassTransit Publisher";
6
7            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
8            {
9                var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST/"), hst =>
10                {
11                    hst.Username("admin");
12                    hst.Password("edison");
13                });
14            });
15
16            do
17            {
18                Console.WriteLine("Please enter your message, if want to exit please press q.");
19                string message = Console.ReadLine();
20                if (message.ToLower().Equals("q"))
21                {
22                    break;
23                }
24
25                bus.Publish(new TestBaseMessage()
26                {
27                    Name = "Edison Zhou",
28                    Time = DateTime.Now,
29                    Message = message
30                });
31
32                bus.Publish(new TestCustomMessage()
33                {
34                    Name = "Leo Dai",
35                    Age = 27,
36                    Time = DateTime.Now,
37                    Message = message
38                });
39            } while (true);
40
41            bus.Stop();
42        }
43

这里向RabbitMQ发布了两个不同类型的消息(TestBaseMessage和TestCustomMessage)

(4)SubscriberA:我只接收TestBaseMessage类型的消息,其他的我不要


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1    public class Program
2    {
3        public static void Main(string[] args)
4        {
5            Console.Title = "MassTransit SubscriberA";
6
7            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
8            {
9                var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
10                {
11                    hst.Username("admin");
12                    hst.Password("edison");
13                });
14
15                cfg.ReceiveEndpoint(host, "Qka.MassTransitTestv2.CA", e =>
16                {
17                    e.Consumer<ConsumerA>();
18                });
19            });
20
21            bus.Start();
22            Console.ReadKey(); // press Enter to Stop
23            bus.Stop();
24        }
25    }
26

这里ConsumerA的定义如下:可以看出,它实现了一个泛型接口IConsumer,然后指定了TestBaseMessage为消费的消息类型。


1
2
3
4
5
6
7
8
9
10
1    public class ConsumerA : IConsumer<TestBaseMessage>
2    {
3        public async Task Consume(ConsumeContext<TestBaseMessage> context)
4        {
5            Console.ForegroundColor = ConsoleColor.Red;
6            await Console.Out.WriteLineAsync($"SubscriberA => ConsumerA received message : {context.Message.Name}, {context.Message.Time}, {context.Message.Message}, Type:{context.Message.GetType()}");
7            Console.ResetColor();
8        }
9    }
10

(5)SubscriberA:我只接收TestCustomMessage类型的消息,其他的我不要


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1    public class Program
2    {
3        public static void Main(string[] args)
4        {
5            Console.Title = "MassTransit SubscriberB";
6
7            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
8            {
9                var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
10                {
11                    hst.Username("admin");
12                    hst.Password("edison");
13                });
14
15                cfg.ReceiveEndpoint(host, "Qka.MassTransitTestv2.CB", e =>
16                {
17                    e.Consumer<ConsumerA>();
18                });
19            });
20
21            bus.Start();
22            Console.ReadKey(); // press Enter to Stop
23            bus.Stop();
24        }
25    }
26

这里的ConsumerA的定义如下;它实现的接口是IConsumer


1
2
3
4
5
6
7
8
9
10
1    public class ConsumerA : IConsumer<TestCustomMessage>
2    {
3        public async Task Consume(ConsumeContext<TestCustomMessage> context)
4        {
5            Console.ForegroundColor = ConsoleColor.Red;
6            await Console.Out.WriteLineAsync($"SubscriberB => ConsumerA received message : {context.Message.Name}, {context.Message.Time}, {context.Message.Message}, Age: {context.Message.Age}, Type:{context.Message.GetType()}");
7            Console.ResetColor();
8        }
9    }
10

(6)测试一下:由于Publisher发送了两个不同类型的消息,两个Subscriber均只接受其中的一个类型的消息。

3.3 带返回状态消息的示例

之前的例子都是发布之后,不管订阅者有没有收到以及收到后有没有处理成功(即有没有返回消息,类似于HTTP请求和响应),在MassTransit中提供了这样的一种模式,并且还可以结合GreenPipes的一些扩展方法实现重试、限流以及熔断机制。这一部分详见官方文档:http://masstransit-project.com/MassTransit/usage/request-response.html

(1)准备下图所示的三个项目:通过NuGet安装MassTransit以及MassTransit.RabbitMQ

(2)Messages:准备请求和响应的消息传输类型


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1    public interface IRequestMessage
2    {
3        int MessageId { get; set; }
4        string Content { get; set; }
5    }
6
7    public class RequestMessage : IRequestMessage
8    {
9        public int MessageId { get; set; }
10        public string Content { get; set; }
11
12        public int RequestId { get; set; }
13    }
14
15    public interface IResponseMessage
16    {
17        int MessageCode { get; set; }
18        string Content { get; set; }
19    }
20
21    public class ResponseMessage : IResponseMessage
22    {
23        public int MessageCode { get; set; }
24        public string Content { get; set; }
25
26        public int RequestId { get; set; }
27    }
28

(3)Sender 请求发送端


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
1    public class Program
2    {
3        public static void Main(string[] args)
4        {
5            Console.Title = "Masstransit Request Side";
6
7            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
8            {
9                var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
10                {
11                    hst.Username("admin");
12                    hst.Password("edison");
13                });
14                // Retry : 重试
15                cfg.UseRetry(ret =>
16                {
17                    ret.Interval(3, 10); // 消费失败后重试3次,每次间隔10s
18                });
19                // RateLimit : 限流
20                cfg.UseRateLimit(1000, TimeSpan.FromMinutes(1)); // 1分钟以内最多1000次调用访问
21                // CircuitBreaker : 熔断
22                cfg.UseCircuitBreaker(cb =>
23                {
24                    cb.TrackingPeriod = TimeSpan.FromMinutes(1);
25                    cb.TripThreshold = 15; // 当失败的比例至少达到15%才会启动熔断
26                    cb.ActiveThreshold = 10; // 当失败次数至少达到10次才会启动熔断
27                    cb.ResetInterval = TimeSpan.FromMinutes(5);
28                }); // 当在1分钟内消费失败率达到15%或调用了10次还是失败时,暂停5分钟的服务访问
29            });
30
31            bus.Start();
32
33            SendMessage(bus);
34
35            bus.Stop();
36        }
37
38        private static void SendMessage(IBusControl bus)
39        {
40            var mqAddress = new Uri($"rabbitmq://192.168.80.71/EDCVHOST/Qka.MassTransitTestv3");
41            var client = bus.CreateRequestClient<IRequestMessage, IResponseMessage>(mqAddress,
42                TimeSpan.FromHours(10)); // 创建请求客户端,10s之内木有回馈则认为是超时(Timeout)
43
44            do
45            {
46                Console.WriteLine("Press q to exit if you want.");
47                string value = Console.ReadLine();
48                if (value.ToLower().Equals("q"))
49                {
50                    break;
51                }
52
53                Task.Run(async () =>
54                {
55                    var request = new RequestMessage()
56                    {
57                        MessageId = 10001,
58                        Content = value,
59                        RequestId = 10001
60                    };
61
62                    var response = await client.Request(request);
63
64                    Console.WriteLine($"Request => MessageId={request.MessageId}, Content={request.Content}");
65                    Console.WriteLine($"Response => MessageCode={response.MessageCode}, Content={response.Content}");
66                }).Wait();
67            } while (true);
68        }
69    }
70

这里不再解释,请看注释。

(4)Receiver 接收端


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1    public class Program
2    {
3        public static void Main(string[] args)
4        {
5            Console.Title = "MassTransit Response Side";
6
7            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
8            {
9                var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
10                {
11                    hst.Username("admin");
12                    hst.Password("edison");
13                });
14
15                cfg.ReceiveEndpoint(host, "Qka.MassTransitTestv3", e =>
16                {
17                    e.Consumer<RequestConsumer>();
18                });
19            });
20
21            bus.Start();
22            Console.WriteLine("Press any key to exit.");
23            Console.ReadKey();
24
25            bus.Stop();
26        }
27    }
28

其中,RequestConsumer的定义如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1    public class RequestConsumer : IConsumer<IRequestMessage>
2    {
3        public async Task Consume(ConsumeContext<IRequestMessage> context)
4        {
5            Console.ForegroundColor = ConsoleColor.Red;
6            await Console.Out.WriteLineAsync($"Received message: Id={context.Message.MessageId}, Content={context.Message.Content}");
7            Console.ResetColor();
8
9            var response = new ResponseMessage
10            {
11                MessageCode = 200,
12                Content = $"Success",
13                RequestId = context.Message.MessageId
14            };
15
16            Console.ForegroundColor = ConsoleColor.Green;
17            Console.WriteLine($"Response message: Code={response.MessageCode}, Content={response.Content}, RequestId={response.RequestId}");
18            Console.ResetColor();
19            await context.RespondAsync(response);
20        }
21    }
22

(5)测试一下:

可以看出,请求调用方收到了来自接收方返回的状态消息,我们可以借助返回值去check一些状态。这里不再演示发生异常从而启用重试、熔断等的示例,有兴趣的园友可以自行测试。

3.4 Observer模式的发布/订阅示例

   在某些场景中,我们需要针对一个消息进行类似于AoP(面向切面编程)或者监控的操作,比如在发送消息之前和结束后记日志等操作,我们可以借助MassTransit中的Observer模式来实现。(在MassTransit的消息接收中,可以通过两种模式来实现:一种是基于实现IConsumer接口,另一种就是基于实现IObserver接口)关于这一部分,详见官方文档:http://masstransit-project.com/MassTransit/usage/observers.html

(1)准备以下图所示的项目:

(2)Messages:定义要使用的Consumer和Observer

Consumer:


1
2
3
4
5
6
7
8
9
10
1    public class TestMessageConsumer : IConsumer<TestMessage>
2    {
3        public async Task Consume(ConsumeContext<TestMessage> context)
4        {
5            Console.ForegroundColor = ConsoleColor.Red;
6            await Console.Out.WriteLineAsync($"TestMessageConsumer => Type:{context.Message.GetType()}, ID:{context.Message.MessageId}, Content:{context.Message.Content}");
7            Console.ResetColor();
8        }
9    }
10

Observer:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
1    public class PublishObserver : IPublishObserver
2    {
3        public Task PrePublish<T>(PublishContext<T> context) where T : class
4        {
5            Console.WriteLine("------------------PrePublish--------------------");
6            var message = context.Message as TestMessage;
7            Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
8            Console.WriteLine("----------------------------------------------------");
9
10            return Task.CompletedTask;
11        }
12
13        public Task PostPublish<T>(PublishContext<T> context) where T : class
14        {
15            Console.WriteLine("------------------PostPublish--------------------");
16            var message = context.Message as TestMessage;
17            Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
18            Console.WriteLine("----------------------------------------------------");
19
20            return Task.CompletedTask;
21        }
22
23        public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
24        {
25            Console.WriteLine("------------------PublishFault--------------------");
26            var message = context.Message as TestMessage;
27            Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
28            Console.WriteLine("------------------------------------------------------");
29
30            return Task.CompletedTask;
31        }
32    }
33
34    public class SendObserver : ISendObserver
35    {
36        public Task PreSend<T>(SendContext<T> context) where T : class
37        {
38            Console.WriteLine("------------------PreSend--------------------");
39            var message = context.Message as TestMessage;
40            Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
41            Console.WriteLine("-------------------------------------------------");
42
43            return Task.CompletedTask;
44        }
45
46        public Task PostSend<T>(SendContext<T> context) where T : class
47        {
48            Console.WriteLine("------------------PostSend-------------------");
49            var message = context.Message as TestMessage;
50            Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
51            Console.WriteLine("-------------------------------------------------");
52
53            return Task.CompletedTask;
54        }
55
56        public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
57        {
58            Console.WriteLine("------------------SendFault-----------------");
59            var message = context.Message as TestMessage;
60            Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
61            Console.WriteLine("-------------------------------------------------");
62
63            return Task.CompletedTask;
64        }
65    }
66
67    public class ReceiveObserver : IReceiveObserver
68    {
69        public Task PreReceive(ReceiveContext context)
70        {
71            Console.WriteLine("------------------PreReceive--------------------");
72            Console.WriteLine(Encoding.Default.GetString(context.GetBody()));
73            Console.WriteLine("--------------------------------------");
74
75            return Task.CompletedTask;
76        }
77
78        public Task PostReceive(ReceiveContext context)
79        {
80            Console.WriteLine("------------------PostReceive--------------------");
81            Console.WriteLine(Encoding.Default.GetString(context.GetBody()));
82            Console.WriteLine("------------------------------------------------------");
83
84            return Task.CompletedTask;
85        }
86
87        public Task ReceiveFault(ReceiveContext context, Exception exception)
88        {
89            Console.WriteLine("------------------ReceiveFault--------------------");
90            Console.WriteLine(Encoding.Default.GetString(context.GetBody()));
91            Console.WriteLine("-------------------------------------------------------");
92
93            return Task.CompletedTask;
94        }
95
96        public Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType) where T : class
97        {
98            Console.WriteLine("------------------PostConsume--------------------");
99            var message = context.Message as TestMessage;
100            Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
101            Console.WriteLine("--------------------------------------------------------");
102
103            return Task.CompletedTask;
104        }
105
106        public Task ConsumeFault<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception) where T : class
107        {
108            Console.WriteLine("------------------ConsumeFault-------------------");
109            var message = context.Message as TestMessage;
110            Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
111            Console.WriteLine("--------------------------------------------------------");
112
113            return Task.CompletedTask;
114        }
115    }
116

(3)ObserverPublish


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1    public class Program
2    {
3        public static void Main(string[] args)
4        {
5            Console.Title = "Masstransit Observer Publisher";
6
7            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
8            {
9                var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
10                {
11                    hst.Username("admin");
12                    hst.Password("edison");
13                });
14            });
15
16            var observer1 = new SendObserver();
17            var handle1 = bus.ConnectSendObserver(observer1);
18
19            var observer2 = new PublishObserver();
20            var handle2 = bus.ConnectPublishObserver(observer2);
21
22            bus.Start();
23
24            do
25            {
26                Console.WriteLine("Presss q if you want to exit this program.");
27                string message = Console.ReadLine();
28                if (message.ToLower().Equals("q"))
29                {
30                    break;
31                }
32
33                bus.Publish(new TestMessage
34                {
35                    MessageId = 10001,
36                    Content = message,
37                    Time = DateTime.Now
38                });
39            } while (true);
40
41            bus.Stop();
42        }
43    }
44

可以看到,这里我们使用了两个用于发送消息的Observer:SendObserver和PublishObserver

(3)ObserverReceive


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1    public class Program
2    {
3        public static void Main(string[] args)
4        {
5            Console.Title = "Masstransit Observer Receiver";
6
7            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
8            {
9                var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
10                {
11                    hst.Username("admin");
12                    hst.Password("edison");
13                });
14
15                cfg.ReceiveEndpoint(host, "Qka.MassTransitTestv4", e =>
16                {
17                    e.Consumer<TestMessageConsumer>();
18                });
19            });
20
21            var observer = new ReceiveObserver();
22            var handle = bus.ConnectReceiveObserver(observer);
23
24            bus.Start();
25            Console.ReadKey();
26            bus.Stop();
27        }
28    }
29

(4)测试一下:

由此看出,我们可以借助Observer这种方式来截取到消息的一个lifecycle,用以进行不同阶段的业务逻辑操作。

四、小结

本篇极简的介绍了一下数据一致性和MassTransit这个开源的组件,通过几个例子介绍了在.NET环境下如何使用MassTransit操作RabbitMQ实现消息的接收/发送以及发布/订阅。后续我会继续使用MassTransit结合Quartz.Net和Polly在ASP.NET Core环境下实现一个简单的基于补偿机制的数据一致性的小案例,希望到时也可以和各位园友分享。

示例代码

Click Here => 点我下载

参考资料

(1)桂素伟,《基于.NET Core的微服务》

(2)richieyangs(张阳),《如何优雅的使用RabbitMQ》,《使用Masstransit开发基于消息传递的分布式应用》

(3)青客宝团队,《MassTransit&Sagas分布式服务开发ppt分享》

(4)成天,《MassTransit实现应用程序间的交互》

(5)娃娃都会打酱油了,《MassTransit学习记录》

(6)MassTransit 官方文档,http://masstransit-project.com/MassTransit/

作者:周旭龙

出处:http://edisonchou.cnblogs.com

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接。

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全经验

架构设计:负载均衡层设计方案(6)——Nginx + Keepalived构建高可用的负载层

2021-11-28 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索