RabbitMQ从入门到精通

释放双眼,带上耳机,听听看~!

文章目录

  • Linux 部署运维

  • 1.安装erlang

    • 2.安装socat
    • 3.安装rabbit MQ
    • 4.查看rabbitmq状态
    • 5.rabbitmq启动
    • 6.rabbitmq的管理界面
    • 7.rabbitmq停止
    • 8.rabbitmq关闭
    • 9.rabbitmq重启
    • 10.配置文件位于/etc/rabbitmq
    • 11.用户管理
    • 12.集群
  • Client Demo-HelloWorld (JAVA)

  • 1.Create Maven Project And Import Rabbit MQ amqp-client

    • 2.Create Send Class
    • 3.Create Receive Class
  • Client Demo-HelloWorld(SpringBoot)

  • 1.Use Spring Initializr And Add RabbitMQ Dependency

    • 2.配置 application.yml
    • 3.编写启动类
    • 4.编写Sender
    • 5.编写Receiver
    • 6.编写Amqp配置类
  • Exchanges

  • 交换机类型

  • direct
    * topic
    * headers
    * fanout
    * 匿名队列
    * 绑定
    * Code

  • Notice

  • 报错 connection Refuse

    • 报错reply-code=406, reply-text=PRECONDITION_FAILED

安利HTML网页版观看,详细代码

Linux 部署运维

1.安装erlang

2.安装socat


1
2
3
1$ yum install socat
2
3

3.安装rabbit MQ


1
2
3
1$ rpm -ivh rabbitmq-server-3.7.5-1.el7.noarch.rpm
2
3

4.查看rabbitmq状态


1
2
3
4
5
1$ systemctl status rabbitmq-server.service
2$ bbitmqctl status //当前状态
3$ bbitmqctl cluster_status //集群状态
4
5

5.rabbitmq启动


1
2
3
1$ systemctl start rabbitmq-server.service
2
3

6.rabbitmq的管理界面


1
2
3
4
5
6
7
8
9
10
11
1$ rabbitmq-plugins list //列出所有插件
2$ rabbitmq-plugins enable rabbitmq_management //启动后在http://server-name:15672访问
3$ rabbitmq-plugins disable rabbitmq_management //关闭rabbitmq管理插件
4$ rabbitmq-plugins enable rabbitmq_tracing //启用trace插件
5$ rabbitmqctl trace_on //打开trace的开关
6$ rabbitmqctl trace_on -p vhost
7$ rabbitmqctl trace_off //关闭trace的开关
8$ rabbitmq-plugins disable rabbitmq_tracing //关闭trace插件
9$ rabbitmq-plugins enable rabbitmq_delayed_message_exchange //安装延时队列
10
11

7.rabbitmq停止


1
2
3
1$ systemctl stop rabbitmq-server.service
2
3

8.rabbitmq关闭


1
2
3
1$ rabbitmqctl shutdown
2
3

9.rabbitmq重启


1
2
3
4
1$ systemctl restart rabbitmq-server.service
2$ rabbitmqctl reset
3
4

10.配置文件位于/etc/rabbitmq


1
2
3
1$ rabbitmqctl environment //查看运行参数
2
3

11.用户管理


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1$ rabbitmqctl list_users //列出所有用户
2$ rabbitmqctl list_user_permissions user //查看用户权限
3$ 例子:rabbitmqctl list_user_permissions hyp
4$ rabbitmqctl add_user user password //添加用户
5$ 例子:rabbitmqctl add_user hyp hyp
6$ rabbitmqctl delete_user user //删除用户
7$ 例子:rabbitmqctl delete_user hyp
8$ rabbitmqctl set_user_tags user <administrator | monitoring | management | policymaker> //授予角色
9例子:$ rabbitmqctl set_user_tags hyp administrator
10administrator角色,可查看所有信息,并且可以对用户,策略(policy)进行操作.
11monitoring角色,可查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
12policymaker角色,可以对策略(policy)进行管理,但无法查看节点的相关信息.
13management角色,只能查看队列和交换机等,无法看到节点信息,也无法对策略进行管理.
14rabbitmqctl change_password user newpassword //修改密码
15例子:$ rabbitmqctl change_password hyp hyppassword
16
17

12.集群


1
2
3
4
5
6
7
8
9
10
11
12
13
1$ rabbitmqctl stop_app
2$ rabbitmqctl reset
3$ rabbitmqctl join_cluster --ram rabbit@v01-app-rabbitmq01 //加入集群,该节点为内存节点类型
4$ rabbitmqctl join_cluster --disc rabbit@v01-app-rabbitmq01 //加入集群,该节点为磁盘节点类型
5$ rabbitmqctl change_cluster_node_type <ram | disc> //修改节点类型
6$ rabbitmqctl start_app
7/*到目前为止,集群虽然搭建成功,但只是默认的普通集群,exchange,binding等数据可以复制到集群各节点
8但对于队列来说,各节点只有相同的元数据,即队列结构,但队列实体只存在于创建改队列的节点,即队列内容不会复制
9(从其余节点读取,可以建立临时的通信传输)
10$ rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' //将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一直
11$ rabbitmqctl list_policies //查看策略
12
13

Client Demo-HelloWorld (JAVA)

1.Create Maven Project And Import Rabbit MQ amqp-client


1
2
3
4
5
6
7
8
1<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
2<dependency>
3    <groupId>com.rabbitmq</groupId>
4    <artifactId>amqp-client</artifactId>
5    <version>5.4.1</version>
6</dependency>
7
8

2.Create Send Class


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1import com.rabbitmq.client.ConnectionFactory;
2import com.rabbitmq.client.Connection;
3import com.rabbitmq.client.Channel;
4
5import java.io.IOException;
6import java.nio.charset.StandardCharsets;
7import java.util.concurrent.TimeoutException;
8
9public class Send {
10    private final static String QUEUE_NAME = "hello";
11
12    public static void main(String[] args) throws IOException, TimeoutException {
13        ConnectionFactory connectionFactory = new ConnectionFactory();
14        connectionFactory.setHost("IP or Host");
15        connectionFactory.setUsername("your UserName");
16        connectionFactory.setPassword("your Password");
17        Connection connection = connectionFactory.newConnection();
18        Channel channel = connection.createChannel();
19
20        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
21        String message = "Hello world!!";
22        channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
23        System.out.println(" [x] Sent '" + message + "'");
24
25        channel.close();
26        connection.close();
27    }
28}
29
30

3.Create Receive Class


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1import com.rabbitmq.client.*;
2import java.nio.charset.StandardCharsets;
3
4public class Received {
5
6    private final static String QUEUE_NAME = "hello";
7
8    public static void main(String[] argv) throws Exception {
9        ConnectionFactory connectionFactory = new ConnectionFactory();
10        connectionFactory.setHost("IP or Host");
11        connectionFactory.setUsername("your UserName");
12        connectionFactory.setPassword("your Password");
13        Connection connection = factory.newConnection();
14        Channel channel = connectionFactory.createChannel();
15
16        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
17        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
18
19        Consumer consumer = new DefaultConsumer(channel) {
20            @Override
21            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
22                String message = new String(body, StandardCharsets.UTF_8);
23                System.out.println(" [x] Received '" + message + "'");
24            }
25        };
26        channel.basicConsume(QUEUE_NAME, true, consumer);
27    }
28}
29
30

Client Demo-HelloWorld(SpringBoot)

1.Use Spring Initializr And Add RabbitMQ Dependency

2.配置 application.yml


1
2
3
4
5
6
7
8
9
1spring:
2  rabbitmq:
3    host: xxxx
4    port: 5672
5    username: xxxx
6    password: xxxx
7    virtual-host: /
8
9

3.编写启动类


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1import org.springframework.boot.SpringApplication;
2import org.springframework.boot.autoconfigure.SpringBootApplication;
3import org.springframework.scheduling.annotation.EnableScheduling;
4
5@SpringBootApplication
6@EnableScheduling
7public class RabbitmqTutorialsApplication {
8
9    public static void main(String[] args) {
10        SpringApplication.run(RabbitmqTutorialsApplication.class, args);
11    }
12}
13
14

4.编写Sender


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1import org.springframework.amqp.core.Queue;
2import org.springframework.amqp.rabbit.core.RabbitTemplate;
3import org.springframework.beans.factory.annotation.Autowired;
4import org.springframework.scheduling.annotation.Scheduled;
5import org.springframework.stereotype.Component;
6
7@Component
8public class TutorialSender {
9
10    @Autowired
11    private RabbitTemplate template;
12
13    @Autowired
14    private Queue queue;
15
16    @Scheduled(fixedDelay = 1000, initialDelay = 500)
17    public void send() {
18        String message = "Hello World!";
19        this.template.convertAndSend(queue.getName(), message);
20        System.out.println(" [x] Sent '" + message + "'");
21    }
22}
23
24

5.编写Receiver


1
2
3
4
5
6
7
8
9
10
11
12
13
1import org.springframework.amqp.rabbit.annotation.RabbitHandler;
2import org.springframework.amqp.rabbit.annotation.RabbitListener;
3
4@RabbitListener(queues = "hello")
5public class TutorialReceiver {
6
7    @RabbitHandler
8    public void receive(String msg) {
9        System.out.println(" [x] Received '" + msg + "'");
10    }
11}
12
13

6.编写Amqp配置类


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1import com.rrc.rabbitmqtutorials.Service.TutorialReceiver;
2import com.rrc.rabbitmqtutorials.Service.TutorialSender;
3import org.springframework.amqp.core.*;
4import org.springframework.context.annotation.Bean;
5import org.springframework.context.annotation.Configuration;
6
7@Configuration
8public class AmqpConfig {
9
10    private static final String QUEUE_NAME = "hello";
11    @Bean
12    public Queue hello() {
13        return new Queue(QUEUE_NAME);
14    }
15
16    @Bean
17    public TutorialReceiver receiver() {
18        return new TutorialReceiver();
19    }
20
21    @Bean
22    public TutorialSender sender() {
23        return new TutorialSender();
24    }
25}
26
27

Exchanges

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn’t even know if a message will be delivered to any queue at all.

在RabbitMQ的消息模型中最核心的想法是生产者绝不直接将消息发送到队列。实际上生产者往往不知道消息会交给哪个队列。

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

因此,生产者只能将消息交给一个交换机,这个交换机是一个非常简单的东西,它一边接受生产者的消息另一边将消息推给队列。这个交换机必须清楚的知道对于接收到的消息如何处理。是该交给一个特定的队列,还是发送给很多队列,还是应该丢弃掉这个消息。这些规则通过交换机的类型进行定义。

交换机类型

exchange在和queue进行binding时会设置routingkey

direct

在direct类型的exchange中,只有这两个routing key完全相同,exchange才会选择对应的binging进行消息路由。

topic

此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:’*’,’#’.

其中’*'表示匹配一个单词, '#'则表示匹配没有或者多个单词

Topic exchange is powerful and can behave like other exchanges.

When a queue is bound with “#” (hash) binding key – it will receive all the messages, regardless of the routing key – like in fanout exchange.

When special characters “*” (star) and “#” (hash) aren’t used in bindings, the topic exchange will behave just like a direct one.

topic很强。

当一个队列用#绑定时,不管routing key是啥它都能匹配,所以它会接收所有信息,就是一个广播交换机。

当* 和#都没有用在绑定中,那topic交换机就是一个direct交换机,没有区别。

headers

其路由的规则是根据header来判断,其中的header就是以下方法的arguments参数。headers并不常用。


1
2
3
4
5
6
7
1//eg
2Map<String, Object> map = new HashMap<String, Object>();
3map.put("h1", "v1");
4map.put("h2", "v2");
5BindingBuilder.bind(autoDeleteQueue1).to(headersExchange).whereAll(map).match();
6
7

fanout

The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that’s exactly what we need for fanning out our messages.

广播交换机非常简单,可以从名字猜测出它是广播所有接收到的消息给它已知的队列。广播信息就用这个。

匿名队列

We’re also interested only in currently flowing messages not in the old ones. To solve that we need two things.

Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better – let the server choose a random queue name for us.

Secondly, once we disconnect the consumer the queue should be automatically deleted. To do this with the spring-amqp client, we defined and AnonymousQueue, which creates a non-durable, exclusive, autodelete queue with a generated name:

我们经常只对最近的信息感兴趣,对旧数据没有,为了解决这个我们需要两件事:

1.无论何时我们连接到Rabbit,我们需要一个新的、空的队列。做到这个我们需要一个随机名字或者更好的方法——让服务器给我们选一个随机队列名。

2.一旦我们从消费者断开连接,队列应该自动删除。用sping-amqp做到这个事,我们定义了AnonymousQueue,作用就是用一个生成的名字创建一个非持久的、专用的自动删除的队列。

绑定

We’ve already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding. In the above Tut3Config you can see that we have two bindings, one for each AnonymousQueue.

我们已经创建了一个广播交换机和一个队列,现在我们需要告诉这个交换机发送信息给我们的队列。交换机和队列直接的关系我们叫做绑定。下面的代码中可以看到两个绑定,每一个都对应一个匿名队列。

Code


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
1//代码结构类比SpringBoot版Demo,这里只贴改动代码
2//更多的详细代码见
3//config
4   //定义一个广播交换机
5   @Bean
6    public FanoutExchange fanout() {
7        return new FanoutExchange("tut.fanout");
8    }
9    
10    private static class ReceiverConfig {
11        
12        //定义一个队列
13        @Bean(name = "autoDeleteQueue1")
14        public Queue autoDeleteQueue1() {
15            //匿名队列
16            return new AnonymousQueue();
17        }
18
19        @Bean(name = "autoDeleteQueue2")
20        public Queue autoDeleteQueue2() {
21            return new AnonymousQueue();
22        }
23
24        //绑定队列到交换机
25        @Bean
26        public Binding binding1(FanoutExchange fanout,
27            @Qualifier("autoDeleteQueue1") Queue autoDeleteQueue1) {
28            return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
29        }
30
31        @Bean
32        public Binding binding2(FanoutExchange fanout,
33            @Qualifier("autoDeleteQueue2") Queue autoDeleteQueue2) {
34            return BindingBuilder.bind(autoDeleteQueue2).to(fanout);
35        }
36
37        @Bean
38        public Tut3Receiver receiver() {
39            return new Tut3Receiver();
40        }
41    }
42
43

1
2
3
4
5
6
7
1//Sender,按之前模板添加或替换,将消息发送到广播交换机
2   @Autowired
3    private FanoutExchange fanout;
4  
5   template.convertAndSend(fanout.getName(), "", message);
6
7

1
2
3
4
5
6
7
8
9
10
11
12
1//Receiver,receive函数已经写好了,接收指定队列的消息
2   @RabbitListener(queues = "#{autoDeleteQueue1.name}")
3    public void receive1(String in) throws InterruptedException {
4        receive(in, 1);
5    }
6
7    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
8    public void receive2(String in) throws InterruptedException {
9        receive(in, 2);
10    }
11
12

Notice

报错 connection Refuse

确认用户名密码,账号访问权限问题。可以在网页端设置host:15672/#users设置。

报错reply-code=406, reply-text=PRECONDITION_FAILED

Exchange或Queue使用不同参数重复创建同名的会出错。换个名试下。

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全经验

RMI远程调用

2021-11-28 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索