文章目录
-
- RabbitAdmin
-
- RabbitTemplate
-
- SimpleMessageListenerContainer(消息监听容器;可以运行中, 动态修改)
-
- MessageListenerAdapter(消息监听适配器)
-
- MessageConverter(消息转换器)
-
- RabbitAdmin
1. RabbitAdmin
-
概述
-
可以把RabbitAdmin作为Spring的一个Bean,然后细粒度的操作RabbitMQ。
- 底层调用的是RabbitTemplate的execute方法,然后execute调用RabbitMQ的其它基础操作。
-
pom文件配置
1
2
3
4
5
6
7
8
9
10
11
12 1<!--RabbitMQ-->
2<dependency>
3 <groupId>com.rabbitmq</groupId>
4 <artifactId>amqp-client</artifactId>
5 <version>3.6.5</version>
6</dependency>
7<dependency>
8 <groupId>org.springframework.boot</groupId>
9 <artifactId>spring-boot-starter-amqp</artifactId>
10</dependency>
11
12
- Rabbit和AMQP的配置:
RabbitAdmin 's AutoStartup必须设置为true, 否则Spring启动时, 不会把RabbitAdmin注入到Spring容器中. 通过@Bean的方式, 当Spring启动的时候, 会自动创建这些Bean的Exchange、queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 1@Configuration
2@ComponentScan({"com.boot.rabbitmq.spring.*"})public class RabbitMQConfig {
3
4 @Bean
5 public ConnectionFactory connectionFactory() {
6 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
7 connectionFactory.setAddresses("127.0.0.1:5672");
8 connectionFactory.setUsername("guest");
9 connectionFactory.setPassword("guest");
10 connectionFactory.setVirtualHost("/");
11
12 return connectionFactory;
13 }
14
15 @Bean
16 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
17 RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
18 // 必须设置为true, 否则Spring启动时, 不会把RabbitAdmin注入到Spring容器中.
19 rabbitAdmin.setAutoStartup(true);
20
21 return rabbitAdmin;
22 }
23
24 @Bean
25 public TopicExchange exchange001() {
26 return new TopicExchange("exchange001", true, false);
27 }
28
29 @Bean
30 public Queue queue001() {
31 return new Queue("queue001", true);
32 }
33
34 @Bean
35 public Binding binding001() {
36 return BindingBuilder
37 .bind(queue001()) // Queue
38 .to(exchange001()) // Exchange
39 .with("spring.*"); // Routing key
40 }
41 }
42
43
2. RabbitTemplate
-
概述:
-
是提供与SpringAMQP整合的时候进行发送消息的关键类。
- 该类包含了:可靠性投递消息方法,回调监听消息接口(ConfirmCallback)、返回值确认接口(ReturnCallback)等等。
-
Config配置
1
2
3
4
5
6
7
8
9
10 1 @Bean
2 public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
3 RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
4 // 可以设置很多回到方法.
5 // rabbitTemplate.setConfirmCallback();
6 // rabbitTemplate.setReturnCallback();
7 return rabbitTemplate;
8 }
9
10
-
基础API使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 1public class RabbitTemplateTest {
2
3 @Autowired
4 private RabbitTemplate rabbitTemplate;
5
6 public void sendMessage() {
7 // 1.设置消息头.
8 MessageProperties messageProperties = new MessageProperties();
9 messageProperties.getHeaders().put("header1", "HEADER!");
10
11 String messageBody = "Hello RabbitTemplate!";
12 // 2.设置休息体.
13 Message message = new Message(messageBody.getBytes(), messageProperties);
14
15 // 3.发送消息.
16 rabbitTemplate.convertAndSend("exchange001", "spring.amqp", message, message1 -> {
17 // 对Message的转换.
18 message1.getMessageProperties().getHeaders().put("header1", "NEW HEADER!");
19 message1.getMessageProperties().getHeaders().put("header2", "HEADER2!");
20 return message1;
21 });
22 }
23
24 /**
25 * 多种发送消息形式.
26 */
27 public void sendMessage2() { // 1.设置消息头.
28 MessageProperties messageProperties = new MessageProperties();
29 messageProperties.getHeaders().put("header1", "HEADER!");
30
31 String messageBody = "Hello RabbitTemplate!";
32 // 2.设置休息体.
33 Message message = new Message(messageBody.getBytes(), messageProperties);
34
35 // 3.1 发送的是封装好的Message对象.
36 rabbitTemplate.send("exchange001", "spring.amqp", message);
37
38 // 3.2 直接发送字符串.
39 rabbitTemplate.convertAndSend("exchange001", "spring.amqp", "Hello world!");
40 }
41 }
42
43
3. SimpleMessageListenerContainer(
消息监听容器;可以运行中, 动态修改)
概述:
- 可以让消费者监听队列(多个),自动启动,自动声明功能。
- 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等。
- 设置消费者数量、最小最大数量、批量消费。
- 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数。
- 设置消费者标签生成策略、是否独占模式、消费者属性等。
- 设置具体的监听器、消息转换器等。
配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 1@Bean
2public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
3 container.setQueues(queue001(), queue002()); // 设置监听多个queue
4 container.setConcurrentConsumers(1); // 设置消费端数量
5 container.setMaxConcurrentConsumers(5); // 设置最大的消费者数量
6 container.setDefaultRequeueRejected(false); // 是否重回队列
7 container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 设置签收模式
8 container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID()); // 设置消费端标签策略
9
10 // 设置消息的监听
11 container.setMessageListener(message -> System.out.println("消费端消息: " + new String(message.getBody())));
12 return container;
13}
14
15
4. MessageListenerAdapter(消息监听适配器)
-
案例:自定义MessageListenerAdapter以及MessageConverter 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 1// 通过自定义listenerAdapter和转换DefaultListenerMethod参数类型.
2MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MyMessageListenerAdapter());
3listenerAdapter.setDefaultListenerMethod("defaultListener");
4listenerAdapter.setMessageConverter(new MyMessageConverter());
5container.setMessageListener(listenerAdapter);
6
7
8/**
9 * {@link org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter#setMessageConverter(MessageConverter)}
10 */
11public class MyMessageConverter implements MessageConverter {
12 @Override
13 public Message toMessage(Object message, MessageProperties messageProperties) throws MessageConversionException {
14 return new Message(message.toString().getBytes(), messageProperties);
15 }
16
17 @Override
18 public Object fromMessage(Message message) throws MessageConversionException {
19 String contentType = message.getMessageProperties().getContentType();
20 if (contentType != null && contentType.contains("text")) {
21 return new String(message.getBody());
22 }
23
24 return message.getBody();
25 }
26}
27
28
-
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1/**
2 * 队列名称和方法名称一一对应
3 * 自定义的listenerAdapter, 需要有对应的方法名称.
4 * 这样就是队列触发的是, 对应的方法。
5 */
6MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MyMessageListenerAdapter());
7Map<String, String> queueOrTagToMethodName =new HashMap<>();
8queueOrTagToMethodName.put("queue001", "method1");
9queueOrTagToMethodName.put("queue002", "method2");
10queueOrTagToMethodName.put("queue003", "method3");
11
12listenerAdapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
13
14
5. MessageConverter(消息转换器)
-
概述:需要实现MessageConverter接口,然后,重写fromMessage和toMessage方法。
-
转换器:
-
JSON转换器:Jackson2JsonMessageConverter:进行java对象的转换功能。
- DefaultJackson2JavaTypeMapper映射器:进行java对象的映射关系。
- 自定义二进制转换器:比如图片、PDF、PPT、流媒体。
-
TextConverter:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1/**
2 * {@link org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter#setMessageConverter(MessageConverter)}
3 */
4public class MyMessageConverter implements MessageConverter {
5 @Override
6 public Message toMessage(Object message, MessageProperties messageProperties) throws MessageConversionException {
7 return new Message(message.toString().getBytes(), messageProperties);
8 }
9
10 @Override
11 public Object fromMessage(Message message) throws MessageConversionException {
12 String contentType = message.getMessageProperties().getContentType();
13 if (contentType != null && contentType.contains("text")) {
14 return new String(message.getBody());
15 }
16
17 return message.getBody();
18 }
19}
20
21
-
JSON格式的转换器
1
2
3
4
5
6
7 1// 通过自定义listenerAdapter和转换DefaultListenerMethod参数类型.
2MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MyMessageListenerAdapter());
3listenerAdapter.setDefaultListenerMethod("defaultListener");
4listenerAdapter.setMessageConverter(new MyMessageConverter());
5container.setMessageListener(listenerAdapter);
6
7