_Tip:_此篇已加入.NET Core微服务基础系列文章索引
一、消息队列与RabbitMQ
1.1 消息队列
“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中,“消息队列”是在消息的传输过程中保存消息的容器。
消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:
当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。
消息队列主要解决了应用耦合、异步处理、流量削锋等问题。当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能。更多详细内容请参考:《消息队列及其应用场景介绍》
我也在前几年写过一篇基于Redis做消息队列的文章,对消息队列的一个应用场景做了介绍,没有了解过的童鞋可以看看。
1.2 RabbitMQ
RabbitMQ是一款基于AMQP(高级消息队列协议),由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、Erlang等。
网上有很多性能比较的文章,例如在1百万条1k的消息下,每秒种的收发情况如下图所示:
这里不过多介绍RabbitMQ,有关RabbitMQ的一些需要了解的概念你可以通过下面的文章了解:
颜圣杰,《RabbitMQ知多少》
如果你想了解RabbitMQ与Kafka的对比,可以阅读这篇文章:《开源软件成熟度评测报告-分布式消息中间件》
而EasyNetQ呢,它是一款基于RabbitMQ.Client封装的API库,正如其名,使用起来比较Easy,它把原RabbitMQ.Client中的很多操作都进行了再次封装,让开发人员减少了很多工作量。
二、RabbitMQ的安装
2.1 Linux下的安装
这里不演示如何在Linux下安装,但推荐生产环境使用Linux,下面是一些参考资料:
mcgrady,《Linux下RabbitMQ的安装》
晓晨Master,《.NET Core使用RabbitMQ》
牛头人,《Linux安装RabbitMQ》
一只猪儿虫,《RabbitMQ Linux安装》
2.2 Windows下的安装
开发环境下,我一般使用Windows Server虚拟机,所以这里说明下如何在Windows下安装:
(1)下载Erlang和RabbitMQ (这里我选则的并非最新版本,而是etp20.3和rabbitmq3.7.5)
(2)首先安装Erlang,然后添加环境变量(如果添加了,则skip这一步)并加到PATH中
(3)其次安装RabbitMQ,一路Next,安装完成后也为其添加环境变量并添加到PATH中
(4)检查是否安装成功:rabbitmqctl status
这里我碰到了如下的错误:
解决方法:
更正erlang.cookie文件,详情请参考:https://blog.csdn.net/u012637358/article/details/80078610
最终状态:
检查Windows服务,发现已经自动注册了一个服务:
(5)激活Web管理插件,然后检查是否可见(http://127.0.0.1:15672)
2.3 一些必要的配置
(1)使用默认账号:guest/guest登录进去,添加一个新用户(Administrator权限),并设置其Permission
(2)添加新的虚拟机(默认为/,这里我添加一个名为EDCVHOST的虚拟机)
(3)绑定新添加的用户到新的虚拟机上,接下来在我们的程序中就主要使用admin这个用户和EDCVHOST这个虚拟机
*.当然,为了安全考虑,你也可以把guest用户remove掉
三、Quick Start:第一个消息队列
3.1 项目准备
这里为了快速的演示如何使用EasyNetQ,我们来一个QuickStart,准备三个项目:两个Console程序和一个Class Library。
其中,对Publisher和Subscriber项目安装EasyNetQ:
NuGet>Install-Package EasyNetQ
针对Messages类库,新增一个class如下:
1
2
3
4
5 1 public class TextMessage
2 {
3 public string Text { get; set; }
4 }
5
3.2 我是Publisher
添加以下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1 public class Program
2 {
3 public static void Main(string[] args)
4 {
5 var connStr = "host=192.168.80.71;virtualHost=EDCVHOST;username=admin;password=edison";
6
7 using (var bus = RabbitHutch.CreateBus(connStr))
8 {
9 var input = "";
10 Console.WriteLine("Please enter a message. 'Quit' to quit.");
11 while ((input = Console.ReadLine()) != "Quit")
12 {
13 bus.Publish(new TextMessage
14 {
15 Text = input
16 });
17 }
18 }
19 }
20 }
21
可以看到,我们在其中使用EasyNetQ高度封装的接口创建了一个IBus接口的实例,通过这个IBus实例我们可以通过一个超级Easy的Publish接口进行发布消息。这里主要是读取用户在控制台中输入的消息字符串进行发送。实际中,发送的一般都是一个或多个复杂的实体对象。
3.3 我是Subscriber
添加如下所示代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 1 public class Program
2 {
3 public static void Main(string[] args)
4 {
5 var connStr = "host=192.168.80.71;virtualHost=EDCVHOST;username=admin;password=edison";
6
7 using (var bus = RabbitHutch.CreateBus(connStr))
8 {
9 bus.Subscribe<TextMessage>("my_test_subscriptionid", HandleTextMessage);
10
11 Console.WriteLine("Listening for messages. Hit <return> to quit.");
12 Console.ReadLine();
13 }
14 }
15
16 public static void HandleTextMessage(TextMessage textMessage)
17 {
18 Console.ForegroundColor = ConsoleColor.Red;
19 Console.WriteLine("Got message: {0}", textMessage.Text);
20 Console.ResetColor();
21 }
22 }
23
这里主要是通过IBus实例去订阅消息(这里是除非用户关闭程序否则一直处于监听状态),当发布者发布了指定类型的消息之后,这里就把它打印出来(红色字体显示)。
3.4 简单测试
通过控制台信息查看结果:
通过RabbitMQ管理界面查看:
(1)通过Connections Tab可以发现我们的两个客户端都在Running中
(2)通过Queues Tab查看目前已有的队列=>可以看到目前我们只注册了一个队列
四、在ASP.NET Core中的使用
4.1 案例结构与说明
这里假设有这样一个场景,客户通过浏览器提交了一个保单,这个保单中包含一些客户信息,ClientService将这些信息处理后发送一个消息到RabbitMQ中,NoticeService和ZAPEngineService订阅了这个消息。NoticeService会将客户信息取出来并获取一些更多信息为客户发送Email,而ZAPEngineService则会根据客户的一些关键信息(比如:年龄,是否吸烟,学历,年收入等等)去数据库读取一些规则来生成一份Question List并存入数据库。
4.2 项目准备工作
创建上面提到的这几个项目,这里我选择ASP.NET Core WebAPI类型。
分别为这几个项目通过NuGet安装EasyNetQ组件,并且通过以下代码注入统一的IBus实例对象:
1
2
3
4
5
6
7 1 public IServiceProvider ConfigureServices(IServiceCollection services)
2 {
3 // IoC - EventBus
4 services.AddSingleton(RabbitHutch.CreateBus(Configuration["MQ:Dev"]));
5 ......
6 }
7
这里我将连接字符串写到了配置文件中,请参考上面的QuickStart中的内容。
下面是这个demo用到的一个消息对象实体:通过标签声明队列名称。
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1 [Queue("Qka.Client", ExchangeName = "Qka.Client")]
2 public class ClientMessage
3 {
4 public int ClientId { get; set; }
5 public string ClientName { get; set; }
6 public string Sex { get; set; }
7 public int Age { get; set; }
8 // N: Non-Smoker, S: Smoker
9 public string SmokerCode { get; set; }
10 // Bachelor, Master, Doctor
11 public string Education { get; set; }
12 public decimal YearIncome { get; set; }
13 }
14
此外,为了充分简化代码量,EasyNetQ提供了一个AutoSubscriber的方式,可以通过接口和标签快速地让一个类成为Consumer。详细内容参考:https://github.com/EasyNetQ/EasyNetQ/wiki/Auto-Subscriber
这里为了快速的在项目中使用Subscriber,添加一个扩展方法,它会从注入的服务中取出IBus实例对象,并自动帮我们进行Subscriber(那些实现了IConsume接口的类)的注册。具体用法见后面的介绍。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1 public static class AppBuilderExtension
2 {
3 public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly)
4 {
5 var services = appBuilder.ApplicationServices.CreateScope().ServiceProvider;
6
7 var lifeTime = services.GetService<IApplicationLifetime>();
8 var bus = services.GetService<IBus>();
9 lifeTime.ApplicationStarted.Register(() =>
10 {
11 var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
12 subscriber.Subscribe(assembly);
13 subscriber.SubscribeAsync(assembly);
14 });
15
16 lifeTime.ApplicationStopped.Register(() => bus.Dispose());
17
18 return appBuilder;
19 }
20 }
21
4.3 Publisher:ClientService
ClientService作为消费者,这里假设我们在API中处理完业务代码后,将message发布给RabbitMQ:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 1 [Produces("application/json")]
2 [Route("api/Client")]
3 public class ClientController : Controller
4 {
5 private readonly IClientService clientService;
6 private readonly IBus bus;
7
8 public ClientController(IClientService _clientService, IBus _bus)
9 {
10 clientService = _clientService;
11 bus = _bus;
12 }
13
14 ......
15
16 [HttpPost]
17 public async Task<string> Post([FromBody]ClientDTO clientDto)
18 {
19 // Business Logic here...
20 // eg.Add new client to your service databases via EF
21 // Sample Publish
22 ClientMessage message = new ClientMessage
23 {
24 ClientId = clientDto.Id.Value,
25 ClientName = clientDto.Name,
26 Sex = clientDto.Sex,
27 Age = 29,
28 SmokerCode = "N",
29 Education = "Master",
30 YearIncome = 100000
31 };
32 await bus.PublishAsync(message);
33
34 return "Add Client Success! You will receive some letter later.";
35 }
36 }
37
当然,你可以使用同步方法:bus.Publish(message);
4.4 Subscriber: NoticeService & ZAPEngineService
(1)NoticeService:新增一个实现IConsume接口的Consumer类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1 public class ClientMessageConsumer: IConsumeAsync<ClientMessage>
2 {
3 [AutoSubscriberConsumer(SubscriptionId = "ClientMessageService.Notice")]
4 public Task ConsumeAsync(ClientMessage message)
5 {
6 // Your business logic code here
7 // eg.Build one email to client via SMTP service
8 // Sample console code
9 System.Console.ForegroundColor = System.ConsoleColor.Red;
10 System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will send one email to client.", message.ClientName);
11 System.Console.ResetColor();
12
13 return Task.CompletedTask;
14 }
15 }
16
这里为了演示效果,增加了一些输出信息的代码,下面的ZAPEngineService也是一样,不再赘述。
(2)ZAPEngineService:新增一个实现IConsume接口的Consumer类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1 public class ClientMessageConsumer : IConsumeAsync<ClientMessage>
2 {
3 [AutoSubscriberConsumer(SubscriptionId = "ClientMessageService.ZapQuestion")]
4 public Task ConsumeAsync(ClientMessage message)
5 {
6 // Your business logic code here
7 // eg.Generate one ZAP question records into database and send to client
8 // Sample console code
9 System.Console.ForegroundColor = System.ConsoleColor.Red;
10 System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will generate one ZAP question list to client", message.ClientName);
11 System.Console.ResetColor();
12
13 return Task.CompletedTask;
14 }
15 }
16
注意两个Consumer的SubscriptionId不能一样,否则无法接受到消息。
(3)为两个Consumer使用扩展方法:UseSubscribe
1
2
3
4
5
6
7
8 1 public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
2 {
3 ......
4
5 // easyNetQ
6 app.UseSubscribe("ClientMessageService", Assembly.GetExecutingAssembly());
7 }
8
4.5 简单测试
(1)借助Postman向ClientService发起Post请求
(2)查看NoticeService的日志信息
(3)查看ZAPEngineService的日志信息
(4)查看RabbitMQ的管理控制台:
五、小结
本篇超级简单地介绍了一下消息队列与RabbitMQ,通过使用EasyNetQ这个基于RabbitMQ.Client的客户端做了一个QuickStart演示了在.NET Core环境下如何进行消息的发布与订阅,并通过一个微服务的小案例演示了如何在ASP.NET Core环境下如何基于EasyNetQ完成消息的发布与订阅,看起来就像一个类似于简单的事件总线。当然,本篇的内容都十分基础,如果要应用好RabbitMQ,还得把那些基础概念(如:Channel,Exchange等)弄清楚,然后去理解一下事件总线的概念,实际中还得考虑数据一致性等等,路途漫漫,继续加油吧!
示例代码
Click Here => 点我下载
参考资料
EasyNetQ官方文档:https://github.com/EasyNetQ/EasyNetQ/wiki/Introduction
focus-lei,《.net core使用EasyNetQ做EventBus》
常山造纸农,《RabbitMQ安装配置和基于EasyNetQ驱动的基础使用》