net core 使用 rabbitmq

释放双眼,带上耳机,听听看~!

目录(?)[+]

  • 安装

  • 基本用法

  • RabbitMQ消费失败的处理

  • 使用RabbitMQ的Exchange

  • Direct Exchange

    • Fanout Exchange
    • Topic Exchange
  • 问题:

  • None of the specified endpoints were reachable

 

windows环境安装:
https://www.daimajiaoliu.com/series/rabbitmq_basic/479c68a9c100403
.NET Core 使用RabbitMQ
https://www.daimajiaoliu.com/daima/4871fea0e100400

安装

"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.9\sbin\rabbitmq-plugins.bat" enable rabbitmq_management

net stop RabbitMQ && net start RabbitMQ

创建用户,密码,绑定角色

查看已有用户及用户的角色:
rabbitmqctl.bat list_users

新增一个用户:
rabbitmqctl.bat add_user username password
示例:
rabbitmqctl.bat add_user tangsansan 123456

rabbitmqctl.bat set_user_tags username administrator

示例:
rabbitmqctl.bat set_user_tags tangsansan administrator

基本用法

引入:RabbitMQ.Client

消费者


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1//创建连接工厂
2            var factory = new ConnectionFactory()
3            {
4                UserName = "tangsansan",//用户名
5                Password = "123456",//密码
6                HostName = "localhost"//rabbitmq ip
7            };
8
9            //创建连接
10            var connection = factory.CreateConnection();
11            //创建通道
12            var channel = connection.CreateModel();
13            //定义一个队列
14            channel.QueueDeclare("hello", false, false, false, null);
15            Console.WriteLine("\nRabbitMQ连接成功,请输入消息,输入exit退出!");
16
17            string input;
18            do
19            {
20                input = Console.ReadLine();
21
22                var sendBytes = Encoding.UTF8.GetBytes(input);
23                //发布消息
24                channel.BasicPublish("", "hello", null, sendBytes);
25
26            } while (input.Trim().ToLower() != "exit");
27
28            channel.Close();
29            connection.Close();
30

生产者


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1//创建连接工厂
2            var factory = new ConnectionFactory()
3            {
4                UserName = "tangsansan",//用户名
5                Password = "123456",//密码
6                HostName = "localhost"//rabbitmq ip
7            };
8
9            //创建连接
10            var connection = factory.CreateConnection();
11            //创建通道
12            var channel = connection.CreateModel();
13            //定义一个队列
14            channel.QueueDeclare("hello", false, false, false, null);
15            Console.WriteLine("\nRabbitMQ连接成功,请输入消息,输入exit退出!");
16
17            string input;
18            do
19            {
20                input = Console.ReadLine();
21
22                var sendBytes = Encoding.UTF8.GetBytes(input);
23                //发布消息
24                channel.BasicPublish("", "hello", null, sendBytes);
25
26            } while (input.Trim().ToLower() != "exit");
27
28            channel.Close();
29            connection.Close();
30

启动了一个生产者,两个消费者,可以看见两个消费者都能收到消息,消息投递到哪个消费者是由RabbitMQ决定的。
net core 使用 rabbitmq

RabbitMQ消费失败的处理

RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1//接收到消息事件
2consumer.Received += (ch, ea) =>
3{
4    var message = Encoding.UTF8.GetString(ea.Body);
5
6    Console.WriteLine($"收到消息: {message}");
7
8    Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
9    Thread.Sleep(10000);
10    //确认该消息已被消费
11    channel.BasicAck(ea.DeliveryTag, false);
12    Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
13};
14

net core 使用 rabbitmq

使用RabbitMQ的Exchange

前面我们可以看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)
net core 使用 rabbitmq

Direct Exchange

net core 使用 rabbitmq


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
1string exchangeName = "TestChange";
2            string queueName = "hello";
3            string routeKey = "helloRouteKey";
4
5            //创建连接工厂
6            var factory = new ConnectionFactory()
7            {
8                UserName = "tangsansan",//用户名
9                Password = "123456",//密码
10                HostName = "localhost"//rabbitmq ip
11            };
12
13            //创建连接
14            var connection = factory.CreateConnection();
15            //创建通道
16            var channel = connection.CreateModel();
17
18            //定义一个Direct类型交换机
19            channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
20
21            //定义一个队列
22            channel.QueueDeclare(queueName, false, false, false, null);
23
24            //将队列绑定到交换机
25            channel.QueueBind(queueName, exchangeName, routeKey, null);
26
27           Console.WriteLine($"\nRabbitMQ连接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey},\n\n请输入消息,输入exit退出!");
28
29            string input;
30            do
31            {
32                input = Console.ReadLine();
33
34                var sendBytes = Encoding.UTF8.GetBytes(input);
35                //发布消息
36                channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
37
38            } while (input.Trim().ToLower() != "exit");
39
40            channel.Close();
41            connection.Close();
42

net core 使用 rabbitmq

Fanout Exchange

net core 使用 rabbitmq
所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

所以,Fanout Exchange 转发消息是最快的。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
1static void Main(string[] args)
2        {
3            string exchangeName = "TestFanoutChange";
4            string queueName1 = "hello1";
5            string queueName2 = "hello";
6            string routeKey = "";
7
8            //创建连接工厂
9            ConnectionFactory factory = new ConnectionFactory
10            {
11                UserName = "tangsansan",//用户名
12                Password = "123456",//密码
13                HostName = "localhost"//rabbitmq ip
14            };
15
16            //创建连接
17            var connection = factory.CreateConnection();
18            //创建通道
19            var channel = connection.CreateModel();
20
21            //定义一个Direct类型交换机
22            channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);
23
24            //定义队列1
25            channel.QueueDeclare(queueName1, false, false, false, null);
26            //定义队列2
27            channel.QueueDeclare(queueName2, false, false, false, null);
28
29            //将队列绑定到交换机
30            channel.QueueBind(queueName1, exchangeName, routeKey, null);
31            channel.QueueBind(queueName2, exchangeName, routeKey, null);
32
33            //生成两个队列的消费者
34            ConsumerGenerator(queueName1);
35            ConsumerGenerator(queueName2);
36
37
38            Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!");
39
40            string input;
41            do
42            {
43                input = Console.ReadLine();
44
45                var sendBytes = Encoding.UTF8.GetBytes(input);
46                //发布消息
47                channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
48
49            } while (input.Trim().ToLower() != "exit");
50            channel.Close();
51            connection.Close();
52        }
53
54        /// <summary>
55        /// 根据队列名称生成消费者
56        /// </summary>
57        /// <param name="queueName"></param>
58        static void ConsumerGenerator(string queueName)
59        {
60            //创建连接工厂
61            ConnectionFactory factory = new ConnectionFactory
62            {
63                UserName = "tangsansan",//用户名
64                Password = "123456",//密码
65                HostName = "localhost"//rabbitmq ip
66            };
67
68            //创建连接
69            var connection = factory.CreateConnection();
70            //创建通道
71            var channel = connection.CreateModel();
72
73            //事件基本消费者
74            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
75
76            //接收到消息事件
77            consumer.Received += (ch, ea) =>
78            {
79                var message = Encoding.UTF8.GetString(ea.Body);
80
81                Console.WriteLine($"Queue:{queueName}收到消息: {message}");
82                //确认该消息已被消费
83                channel.BasicAck(ea.DeliveryTag, false);
84            };
85            //启动消费者 设置为手动应答消息
86            channel.BasicConsume(queueName, false, consumer);
87            Console.WriteLine($"Queue:{queueName},消费者已启动");
88        }
89

Topic Exchange

net core 使用 rabbitmq
所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上,

Exchange 将路由进行模糊匹配。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“XiaoChen.#”能够匹配到“XiaoChen.pets.cat”,但是“XiaoChen.” 只会匹配到“XiaoChen.money”。

所以,Topic Exchange 使用非常灵活。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
1static void Main(string[] args)
2        {
3            string exchangeName = "TestTopicChange";
4            string queueName = "hello";
5            string routeKey = "TestRouteKey.*";
6
7            //创建连接工厂
8            ConnectionFactory factory = new ConnectionFactory
9            {
10                UserName = "tangsansan",//用户名
11                Password = "123456",//密码
12                HostName = "localhost"//rabbitmq ip
13            };
14
15            //创建连接
16            var connection = factory.CreateConnection();
17            //创建通道
18            var channel = connection.CreateModel();
19
20            //定义一个Direct类型交换机
21            channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);
22
23            //定义队列1
24            channel.QueueDeclare(queueName, false, false, false, null);
25
26            //将队列绑定到交换机
27            channel.QueueBind(queueName, exchangeName, routeKey, null);
28
29
30
31            Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!");
32
33            string input;
34            do
35            {
36                input = Console.ReadLine();
37
38                var sendBytes = Encoding.UTF8.GetBytes(input);
39                //发布消息
40                channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);
41
42            } while (input.Trim().ToLower() != "exit");
43            channel.Close();
44            connection.Close();
45        }
46

问题:

None of the specified endpoints were reachable

这个异常在创建连接时抛出(CreateConnection()),原因一般是ConnectionFactory参数设置不对,比如HostName、UserName、Password
未设置VirtualHost的权限
设置方法:RabbitmqWeb管理网站–>Admin

 

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全运维

docker安装mysql

2021-12-19 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索