简介
MQ 是开发中很常见的中间件。本文讲述的是如何在一个 Spring Boot 项目中配置多个 RabbitMQ 源,不会过多地讲解 RabbitMQ 本身的相关知识点。如果你也遇到了需要向多个 RabbitMQ 发送消息的需求,希望本文可以帮助到你。
环境
- rabbitmq 3.7.12
- spring boot 2.1.6.RELEASE
当然软件的版本不是硬性要求,只是我使用的环境而已,唯一的要求是需要启动两个RabbitMQ,我这边是在kubernetes集群中使用helm官方提供的charts包快速启动的两个rabbitmq-ha高可用rabbitmq集群。
想要了解 kubernetes或者helm,可以参看以下 github仓库:
- kubernetes : https://github.com/kubernetes/kubernetes
- helm: https://github.com/helm/helm
- charts: https://github.com/helm/charts
SpringBoot中配置两个RabbitMQ源
在 Spring Boot 中配置单个 RabbitMQ 是极其简单的,我们只需要使用 Spring Boot 为我们自动装配的 RabbitMQ 相关配置就可以了。但是需要配置多个源时,第二个及以上的源就需要单独配置了。这里两个源我都采用了单独配置的方式,公共的连接参数抽取到了一个抽象父类中。
代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1/**
2 * @author innerpeacez
3 * @since 2019/3/11
4 */
5@Data
6public abstract class AbstractRabbitConfiguration {
7
8 protected String host;
9 protected int port;
10 protected String username;
11 protected String password;
12
13 protected ConnectionFactory connectionFactory() {
14 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
15 connectionFactory.setHost(host);
16 connectionFactory.setPort(port);
17 connectionFactory.setUsername(username);
18 connectionFactory.setPassword(password);
19 return connectionFactory;
20 }
21}
22
第一个源的配置代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 1package com.zhw.study.springbootmultirabbitmq.config;
2
3import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
4import org.springframework.amqp.rabbit.connection.ConnectionFactory;
5import org.springframework.amqp.rabbit.core.RabbitAdmin;
6import org.springframework.amqp.rabbit.core.RabbitTemplate;
7import org.springframework.beans.factory.annotation.Qualifier;
8import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
9import org.springframework.boot.context.properties.ConfigurationProperties;
10import org.springframework.context.annotation.Bean;
11import org.springframework.context.annotation.Configuration;
12import org.springframework.context.annotation.Primary;
13
14/**
15 * @author innerpeacez
16 * @since 2019/3/8
17 */
18
19@Configuration
20@ConfigurationProperties("spring.rabbitmq.first")
21public class FirstRabbitConfiguration extends AbstractRabbitConfiguration {
22
23 @Bean(name = "firstConnectionFactory")
24 @Primary
25 public ConnectionFactory firstConnectionFactory() {
26 return super.connectionFactory();
27 }
28
29 @Bean(name = "firstRabbitTemplate")
30 @Primary
31 public RabbitTemplate firstRabbitTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
32 return new RabbitTemplate(connectionFactory);
33 }
34
35 @Bean(name = "firstFactory")
36 public SimpleRabbitListenerContainerFactory firstFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
37 @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
38 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
39 configurer.configure(factory, connectionFactory);
40 return factory;
41 }
42
43 @Bean(value = "firstRabbitAdmin")
44 public RabbitAdmin firstRabbitAdmin(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
45 return new RabbitAdmin(connectionFactory);
46 }
47}
48
第二个源的配置代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 1package com.zhw.study.springbootmultirabbitmq.config;
2
3import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
4import org.springframework.amqp.rabbit.connection.ConnectionFactory;
5import org.springframework.amqp.rabbit.core.RabbitAdmin;
6import org.springframework.amqp.rabbit.core.RabbitTemplate;
7import org.springframework.beans.factory.annotation.Qualifier;
8import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
9import org.springframework.boot.context.properties.ConfigurationProperties;
10import org.springframework.context.annotation.Bean;
11import org.springframework.context.annotation.Configuration;
12
13/**
14 * @author innerpeacez
15 * @since 2019/3/8
16 */
17
18@Configuration
19@ConfigurationProperties("spring.rabbitmq.second")
20public class SecondRabbitConfiguration extends AbstractRabbitConfiguration {
21
22 @Bean(name = "secondConnectionFactory")
23 public ConnectionFactory secondConnectionFactory() {
24 return super.connectionFactory();
25 }
26
27 @Bean(name = "secondRabbitTemplate")
28 public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
29 return new RabbitTemplate(connectionFactory);
30 }
31
32 @Bean(name = "secondFactory")
33 public SimpleRabbitListenerContainerFactory secondFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
34 @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
35 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
36 configurer.configure(factory, connectionFactory);
37 return factory;
38 }
39
40 @Bean(value = "secondRabbitAdmin")
41 public RabbitAdmin secondRabbitAdmin(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
42 return new RabbitAdmin(connectionFactory);
43 }
44}
45
配置信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 1spring:
2 application:
3 name: multi-rabbitmq
4 rabbitmq:
5 first:
6 host: 192.168.10.76
7 port: 30509
8 username: admin
9 password: 123456
10 second:
11 host: 192.168.10.76
12 port: 31938
13 username: admin
14 password: 123456
15
测试
这样我们的两个 RabbitMQ 源就配置好了,接下来进行测试。为了方便在代码中区分并使用不同的源,我写了一个抽象类 MultiRabbitTemplate,在其中按名称分别注入两个源对应的 RabbitTemplate。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1/**
2 * @author innerpeacez
3 * @since 2019/3/8
4 */
5@Component
6public abstract class MultiRabbitTemplate {
7
8 @Autowired
9 @Qualifier(value = "firstRabbitTemplate")
10 public AmqpTemplate firstRabbitTemplate;
11
12 @Autowired
13 @Qualifier(value = "secondRabbitTemplate")
14 public AmqpTemplate secondRabbitTemplate;
15}
16
第一个消息发送者类 TestFirstSender.class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1/**
2 * @author innerpeacez
3 * @since 2019/3/11
4 */
5@Component
6@Slf4j
7public class TestFirstSender extends MultiRabbitTemplate implements MessageSender {
8
9 @Override
10 public void send(Object msg) {
11 log.info("rabbitmq1 , msg: {}", msg);
12 firstRabbitTemplate.convertAndSend("rabbitmq1", msg);
13 }
14
15 public void rabbitmq1sender() {
16 this.send("innerpeacez1");
17 }
18}
19
第二个消息发送者类 TestSecondSender.class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1/**
2 * @author innerpeacez
3 * @since 2019/3/11
4 */
5@Component
6@Slf4j
7public class TestSecondSender extends MultiRabbitTemplate implements MessageSender {
8
9 @Override
10 public void send(Object msg) {
11 log.info("rabbitmq2 , msg: {}", msg);
12 secondRabbitTemplate.convertAndSend("rabbitmq2", msg);
13 }
14
15 public void rabbitmq2sender() {
16 this.send("innerpeacez2");
17 }
18}
19
动态创建Queue的消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1/**
2 * @author innerpeacez
3 * @since 2019/3/11
4 */
5
6@Slf4j
7@Component
8public class TestFirstConsumer implements MessageConsumer {
9
10 @Override
11 @RabbitListener(bindings = @QueueBinding(value = @Queue("rabbitmq1")
12 , exchange = @Exchange("rabbitmq1")
13 , key = "rabbitmq1")
14 , containerFactory = "firstFactory")
15 public void receive(Object obj) {
16 log.info("rabbitmq1 , {}", obj);
17 }
18
19}
20
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1/**
2 * @author innerpeacez
3 * @since 2019/3/11
4 */
5
6@Slf4j
7@Component
8public class TestSecondConsumer implements MessageConsumer {
9
10 @Override
11 @RabbitListener(bindings = @QueueBinding(value = @Queue("rabbitmq2")
12 , exchange = @Exchange("rabbitmq2")
13 , key = "rabbitmq2")
14 , containerFactory = "secondFactory")
15 public void receive(Object obj) {
16 log.info("rabbitmq2 , {}", obj);
17 }
18
19}
20
测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 1@RunWith(SpringRunner.class)
2@SpringBootTest
3@Slf4j
4public class SpringBootMultiRabbitmqApplicationTests extends MultiRabbitTemplate {
5
6 @Autowired
7 private TestFirstSender firstSender;
8 @Autowired
9 private TestSecondSender secondSender;
10
11 /**
12 * 一百个线程向 First Rabbitmq 的 rabbitmq1 queue中发送一百条消息
13 */
14 @Test
15 public void testFirstSender() {
16 for (int i = 0; i < 100; i++) {
17 new Thread(() ->
18 firstSender.rabbitmq1sender()
19 ).start();
20 }
21 try {
22 Thread.sleep(1000 * 10);
23 } catch (InterruptedException e) {
24 e.printStackTrace();
25 }
26 }
27
28 /**
29 * 一百个线程向 Second Rabbitmq 的 rabbitmq2 queue中发送一百条消息
30 */
31 @Test
32 public void testSecondSender() {
33 for (int i = 0; i < 100; i++) {
34 new Thread(() ->
35 secondSender.rabbitmq2sender()
36 ).start();
37 }
38 try {
39 Thread.sleep(1000 * 10);
40 } catch (InterruptedException e) {
41 e.printStackTrace();
42 }
43
44 }
45}
46
测试结果:
总结
这样配置好之后,我们就可以向两个 RabbitMQ 中发送消息了。这里只配置了两个源,如果你需要更多的源,只需要仿照上面的写法再增加对应的 *RabbitConfiguration 配置类(以及相应的配置项)即可。本文没有过多介绍 RabbitMQ 本身的相关知识,如果之前没有使用过,需要自行了解一下。