Configuring Multiple RabbitMQ Sources in Spring Boot


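Introduction

MQ is a commonplace piece of middleware in development. This article describes how to configure multiple RabbitMQ sources in a single Spring Boot project; it does not go into RabbitMQ fundamentals. If you also need to send messages to more than one RabbitMQ, I hope this article helps.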

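Environment

  • RabbitMQ 3.7.12
  • Spring Boot 2.1.6.RELEASE

These exact versions are not a hard requirement; they are simply what I used. The only requirement is that two RabbitMQ instances are running. In my case I started two rabbitmq-ha high-availability RabbitMQ clusters in a Kubernetes cluster, using the chart packages from the official Helm charts.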


To learn more about Kubernetes or Helm, see their GitHub repositories.

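Configuring Two RabbitMQ Sources in Spring Boot

Configuring a single RabbitMQ in Spring Boot is extremely simple: we only need the RabbitMQ configuration that Spring Boot auto-configures for us. When several sources are needed, however, the second and any further ones must be configured separately. Here I configure both sources separately.

Code: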


import lombok.Data;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;

/**
 * Shared connection settings for one RabbitMQ source.
 * Subclasses bind these fields through @ConfigurationProperties.
 *
 * @author innerpeacez
 * @since 2019/3/11
 */
@Data
public abstract class AbstractRabbitConfiguration {

    protected String host;
    protected int port;
    protected String username;
    protected String password;

    /** Builds a connection factory from the bound host, port and credentials. */
    protected ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }
}

Configuration code for the first source

package com.zhw.study.springbootmultirabbitmq.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * Beans for the first RabbitMQ source, bound to spring.rabbitmq.first.*.
 *
 * @author innerpeacez
 * @since 2019/3/8
 */
@Configuration
@ConfigurationProperties("spring.rabbitmq.first")
public class FirstRabbitConfiguration extends AbstractRabbitConfiguration {

    // @Primary makes this the default when a ConnectionFactory is injected by type only.
    @Bean(name = "firstConnectionFactory")
    @Primary
    public ConnectionFactory firstConnectionFactory() {
        return super.connectionFactory();
    }

    @Bean(name = "firstRabbitTemplate")
    @Primary
    public RabbitTemplate firstRabbitTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    // Listener container factory; referenced by name from @RabbitListener(containerFactory = ...).
    @Bean(name = "firstFactory")
    public SimpleRabbitListenerContainerFactory firstFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                             @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name = "firstRabbitAdmin")
    public RabbitAdmin firstRabbitAdmin(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}

Configuration code for the second source

package com.zhw.study.springbootmultirabbitmq.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Beans for the second RabbitMQ source, bound to spring.rabbitmq.second.*.
 * None of them is @Primary, so they are always selected by name.
 *
 * @author innerpeacez
 * @since 2019/3/8
 */
@Configuration
@ConfigurationProperties("spring.rabbitmq.second")
public class SecondRabbitConfiguration extends AbstractRabbitConfiguration {

    @Bean(name = "secondConnectionFactory")
    public ConnectionFactory secondConnectionFactory() {
        return super.connectionFactory();
    }

    @Bean(name = "secondRabbitTemplate")
    public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    // Listener container factory; referenced by name from @RabbitListener(containerFactory = ...).
    @Bean(name = "secondFactory")
    public SimpleRabbitListenerContainerFactory secondFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                              @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name = "secondRabbitAdmin")
    public RabbitAdmin secondRabbitAdmin(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}
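Both configuration classes register beans of the same types, so injecting by type alone would be ambiguous. The first source's connection factory and template are marked @Primary, and everything else is picked by name with @Qualifier. The following plain-Java sketch is a toy model of that lookup rule, not Spring's actual container; PrimaryLookupSketch and Registry are hypothetical names used only for illustration, and Spring applies further tie-breakers that this sketch leaves out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of bean lookup by name and by type; not Spring's real container. */
class PrimaryLookupSketch {

    static final class Registry<T> {
        private final Map<String, T> beans = new LinkedHashMap<>();
        private String primaryName; // plays the role of @Primary

        void register(String name, T bean, boolean primary) {
            if (primary && primaryName != null) {
                throw new IllegalStateException("more than one primary bean");
            }
            beans.put(name, bean);
            if (primary) {
                primaryName = name;
            }
        }

        /** Lookup by name, similar in spirit to an injection point with @Qualifier. */
        T byName(String name) {
            T bean = beans.get(name);
            if (bean == null) {
                throw new IllegalArgumentException("no bean named " + name);
            }
            return bean;
        }

        /** Lookup by type only: fine with one candidate, otherwise a primary is needed. */
        T byType() {
            if (beans.isEmpty()) {
                throw new IllegalStateException("no candidate bean");
            }
            if (beans.size() == 1) {
                return beans.values().iterator().next();
            }
            if (primaryName == null) {
                throw new IllegalStateException("ambiguous: " + beans.size() + " candidates and no primary");
            }
            return beans.get(primaryName);
        }
    }

    public static void main(String[] args) {
        Registry<String> templates = new Registry<>();
        templates.register("firstRabbitTemplate", "template for the first broker", true);
        templates.register("secondRabbitTemplate", "template for the second broker", false);

        System.out.println(templates.byType());                       // resolved through the primary
        System.out.println(templates.byName("secondRabbitTemplate")); // resolved by name
    }
}
```

In the article's configuration, the same idea is why only the first source carries @Primary while every injection point for the second source names its bean explicitly.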

Configuration properties

spring:
  application:
    name: multi-rabbitmq
  rabbitmq:
    first:
      host: 192.168.10.76
      port: 30509
      username: admin
      password: 123456
    second:
      host: 192.168.10.76
      port: 31938
      username: admin
      password: 123456
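Each configuration class reads only the keys under its own prefix, which is what keeps the two sources independent. The plain-Java sketch below illustrates that idea under simplifying assumptions; it is not Spring Boot's binder, and PrefixBindingSketch, RabbitSettings and bind are hypothetical names. The hosts and ports are the sample values from the configuration above.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrates prefix-based binding; a simplification, not Spring Boot's binder. */
class PrefixBindingSketch {

    /** Hypothetical holder mirroring the four fields of AbstractRabbitConfiguration. */
    static final class RabbitSettings {
        final String host;
        final int port;
        final String username;
        final String password;

        RabbitSettings(String host, int port, String username, String password) {
            this.host = host;
            this.port = port;
            this.username = username;
            this.password = password;
        }
    }

    /** Reads the keys under the given prefix and converts them to typed fields. */
    static RabbitSettings bind(Map<String, String> props, String prefix) {
        return new RabbitSettings(
                required(props, prefix + ".host"),
                Integer.parseInt(required(props, prefix + ".port")),
                required(props, prefix + ".username"),
                required(props, prefix + ".password"));
    }

    private static String required(Map<String, String> props, String key) {
        String value = props.get(key);
        if (value == null) {
            throw new IllegalArgumentException("missing property: " + key);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("spring.rabbitmq.first.host", "192.168.10.76");
        props.put("spring.rabbitmq.first.port", "30509");
        props.put("spring.rabbitmq.first.username", "admin");
        props.put("spring.rabbitmq.first.password", "123456");
        props.put("spring.rabbitmq.second.host", "192.168.10.76");
        props.put("spring.rabbitmq.second.port", "31938");
        props.put("spring.rabbitmq.second.username", "admin");
        props.put("spring.rabbitmq.second.password", "123456");

        RabbitSettings first = bind(props, "spring.rabbitmq.first");
        RabbitSettings second = bind(props, "spring.rabbitmq.second");
        System.out.println("first  -> " + first.host + ":" + first.port);
        System.out.println("second -> " + second.host + ":" + second.port);
    }
}
```

Adding a further source under this scheme would mean adding another block of keys under a new prefix and binding it the same way.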

Testing

With that, both RabbitMQ sources are configured. Next we test them. For convenience I wrote a MultiRabbitTemplate class that makes it easy to use the different sources.


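import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
 * Base class that exposes one template per RabbitMQ source to its subclasses.
 *
 * @author innerpeacez
 * @since 2019/3/8
 */
@Component
public abstract class MultiRabbitTemplate {

    @Autowired
    @Qualifier(value = "firstRabbitTemplate")
    public AmqpTemplate firstRabbitTemplate;

    @Autowired
    @Qualifier(value = "secondRabbitTemplate")
    public AmqpTemplate secondRabbitTemplate;
}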

First message sender class: TestFirstSender

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Sends messages to the first RabbitMQ source.
 *
 * @author innerpeacez
 * @since 2019/3/11
 */
@Component
@Slf4j
public class TestFirstSender extends MultiRabbitTemplate implements MessageSender {

    @Override
    public void send(Object msg) {
        log.info("rabbitmq1 , msg: {}", msg);
        // The two-argument form takes (routingKey, message) and publishes to the
        // template's default exchange, so "rabbitmq1" here is the routing key.
        firstRabbitTemplate.convertAndSend("rabbitmq1", msg);
    }

    public void rabbitmq1sender() {
        this.send("innerpeacez1");
    }
}

Second message sender class: TestSecondSender

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Sends messages to the second RabbitMQ source.
 *
 * @author innerpeacez
 * @since 2019/3/11
 */
@Component
@Slf4j
public class TestSecondSender extends MultiRabbitTemplate implements MessageSender {

    @Override
    public void send(Object msg) {
        log.info("rabbitmq2 , msg: {}", msg);
        // The two-argument form takes (routingKey, message) and publishes to the
        // template's default exchange, so "rabbitmq2" here is the routing key.
        secondRabbitTemplate.convertAndSend("rabbitmq2", msg);
    }

    public void rabbitmq2sender() {
        this.send("innerpeacez2");
    }
}

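Consumers that create their queues dynamically

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumes from the rabbitmq1 queue on the first source.
 *
 * @author innerpeacez
 * @since 2019/3/11
 */
@Slf4j
@Component
public class TestFirstConsumer implements MessageConsumer {

    // containerFactory selects the listener container factory bound to the first source.
    @Override
    @RabbitListener(bindings = @QueueBinding(value = @Queue("rabbitmq1")
            , exchange = @Exchange("rabbitmq1")
            , key = "rabbitmq1")
            , containerFactory = "firstFactory")
    public void receive(Object obj) {
        log.info("rabbitmq1 , {}", obj);
    }

}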

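import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumes from the rabbitmq2 queue on the second source.
 *
 * @author innerpeacez
 * @since 2019/3/11
 */
@Slf4j
@Component
public class TestSecondConsumer implements MessageConsumer {

    // containerFactory selects the listener container factory bound to the second source.
    @Override
    @RabbitListener(bindings = @QueueBinding(value = @Queue("rabbitmq2")
            , exchange = @Exchange("rabbitmq2")
            , key = "rabbitmq2")
            , containerFactory = "secondFactory")
    public void receive(Object obj) {
        log.info("rabbitmq2 , {}", obj);
    }

}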

Test class

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringBootMultiRabbitmqApplicationTests extends MultiRabbitTemplate {

    @Autowired
    private TestFirstSender firstSender;
    @Autowired
    private TestSecondSender secondSender;

    /**
     * One hundred threads each send one message to the rabbitmq1 queue
     * on the first RabbitMQ, one hundred messages in total.
     */
    @Test
    public void testFirstSender() {
        for (int i = 0; i < 100; i++) {
            new Thread(() ->
                    firstSender.rabbitmq1sender()
            ).start();
        }
        // Keep the test alive long enough for the messages to be sent and consumed.
        try {
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * One hundred threads each send one message to the rabbitmq2 queue
     * on the second RabbitMQ, one hundred messages in total.
     */
    @Test
    public void testSecondSender() {
        for (int i = 0; i < 100; i++) {
            new Thread(() ->
                    secondSender.rabbitmq2sender()
            ).start();
        }
        // Keep the test alive long enough for the messages to be sent and consumed.
        try {
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
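The tests start the sender threads and then sleep for ten seconds so the test stays alive while the messages go out. One possible alternative for the sending side, sketched here in plain Java, waits on a CountDownLatch until every send task has finished. LatchSendSketch and sendAll are hypothetical names, the Runnable stands in for a call such as firstSender.rabbitmq1sender(), and a bounded thread pool replaces the hundred raw threads. The sketch only waits for the sends; it does not wait for the consumers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Waits for concurrent send tasks with a latch instead of a fixed sleep. */
class LatchSendSketch {

    /**
     * Runs sendTask count times on a thread pool and waits for all runs to finish.
     * Returns true if every run completed before the timeout, false otherwise.
     */
    static boolean sendAll(Runnable sendTask, int count, long timeoutSeconds) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, Math.min(count, 16)));
        CountDownLatch done = new CountDownLatch(count);
        try {
            for (int i = 0; i < count; i++) {
                pool.execute(() -> {
                    try {
                        sendTask.run();
                    } finally {
                        done.countDown(); // count down even if the send throws
                    }
                });
            }
            return done.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a real sender: it only counts how many sends were attempted.
        AtomicInteger sent = new AtomicInteger();
        boolean finished = sendAll(sent::incrementAndGet, 100, 10);
        System.out.println("finished=" + finished + ", sent=" + sent.get());
    }
}
```

In a test, the returned flag could be asserted directly, which fails fast on a timeout instead of always waiting the full ten seconds.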

Test results:

[Two screenshots in the original post; images not available]

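Summary

With this configuration in place we can send messages to two RabbitMQ instances. Only two sources are configured here; if you need more, you only need to add another *RabbitConfiguration class for each one. This article does not cover RabbitMQ itself, so if you have not used it before you will need to read up on it separately.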
