RabbitMQ-整合Spring AMQP实战

释放双眼,带上耳机,听听看~!

文章目录

    1. RabbitAdmin
        1. RabbitTemplate
        1. SimpleMessageListenerContainer(消息监听容器;可以运行中, 动态修改)
        1. MessageListenerAdapter(消息监听适配器)
        1. MessageConverter(消息转换器)

1. RabbitAdmin

  1. 概述

  2. 可以把RabbitAdmin作为Spring的一个Bean,然后细粒度的操作RabbitMQ。

    1. 底层调用的是RabbitTemplate的execute方法,然后execute调用RabbitMQ的其它基础操作。
  3. pom文件配置


1
2
3
4
5
6
7
8
9
10
11
12
1<!--RabbitMQ-->
2<dependency>
3    <groupId>com.rabbitmq</groupId>
4    <artifactId>amqp-client</artifactId>
5    <version>3.6.5</version>
6</dependency>
7<dependency>
8    <groupId>org.springframework.boot</groupId>
9    <artifactId>spring-boot-starter-amqp</artifactId>
10</dependency>
11
12
  1. Rabbit和AMQP的配置:

RabbitAdmin 's AutoStartup必须设置为true, 否则Spring启动时, 不会把RabbitAdmin注入到Spring容器中. 通过@Bean的方式, 当Spring启动的时候, 会自动创建这些Bean的Exchange、queue


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
1@Configuration
2@ComponentScan({"com.boot.rabbitmq.spring.*"})public class RabbitMQConfig {
3
4    @Bean
5    public ConnectionFactory connectionFactory() {        
6       CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
7        connectionFactory.setAddresses("127.0.0.1:5672");
8        connectionFactory.setUsername("guest");
9        connectionFactory.setPassword("guest");
10        connectionFactory.setVirtualHost("/");
11
12        return connectionFactory;
13    }
14    
15    @Bean
16    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {        
17      RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
18        // 必须设置为true, 否则Spring启动时, 不会把RabbitAdmin注入到Spring容器中.
19        rabbitAdmin.setAutoStartup(true);
20
21        return rabbitAdmin;
22    }
23    
24    @Bean
25  public TopicExchange exchange001() {    
26      return new TopicExchange("exchange001", true, false);
27  }
28 
29  @Bean
30  public Queue queue001() {    
31      return new Queue("queue001", true);
32  }
33 
34  @Bean
35  public Binding binding001() {    
36      return BindingBuilder
37              .bind(queue001()) // Queue
38              .to(exchange001()) // Exchange
39              .with("spring.*"); // Routing key
40  }
41 }
42
43

2. RabbitTemplate

  1. 概述:

  2. 是提供与SpringAMQP整合的时候进行发送消息的关键类。

    1. 该类包含了:可靠性投递消息方法,回调监听消息接口(ConfirmCallback)、返回值确认接口(ReturnCallback)等等。
  3. Config配置


1
2
3
4
5
6
7
8
9
10
1    @Bean
2    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {        
3       RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
4        // 可以设置很多回到方法.
5       // rabbitTemplate.setConfirmCallback();
6       // rabbitTemplate.setReturnCallback();
7        return rabbitTemplate;
8    }
9
10
  1. 基础API使用


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
1public class RabbitTemplateTest {
2
3    @Autowired
4    private RabbitTemplate rabbitTemplate;
5
6    public void sendMessage() {        
7       // 1.设置消息头.
8        MessageProperties messageProperties = new MessageProperties();
9        messageProperties.getHeaders().put("header1", "HEADER!");
10
11        String messageBody = "Hello RabbitTemplate!";
12        // 2.设置休息体.
13        Message message = new Message(messageBody.getBytes(), messageProperties);
14
15        // 3.发送消息.
16        rabbitTemplate.convertAndSend("exchange001", "spring.amqp", message, message1 -> {            
17          // 对Message的转换.
18            message1.getMessageProperties().getHeaders().put("header1", "NEW HEADER!");
19            message1.getMessageProperties().getHeaders().put("header2", "HEADER2!");
20            return message1;
21        });
22    }
23    
24    /**
25     * 多种发送消息形式.
26     */
27    public void sendMessage2() {        // 1.设置消息头.
28        MessageProperties messageProperties = new MessageProperties();
29        messageProperties.getHeaders().put("header1", "HEADER!");
30
31        String messageBody = "Hello RabbitTemplate!";
32        // 2.设置休息体.
33        Message message = new Message(messageBody.getBytes(), messageProperties);
34
35        // 3.1 发送的是封装好的Message对象.
36        rabbitTemplate.send("exchange001", "spring.amqp", message);
37
38        // 3.2 直接发送字符串.
39        rabbitTemplate.convertAndSend("exchange001", "spring.amqp", "Hello world!");
40    }
41 }
42
43

3. SimpleMessageListenerContainer(

消息监听容器;可以运行中, 动态修改)

概述:

  1. 可以让消费者监听队列(多个),自动启动,自动声明功能。
    1. 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等。
    2. 设置消费者数量、最小最大数量、批量消费。
    3. 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数。
    4. 设置消费者标签生成策略、是否独占模式、消费者属性等。
    5. 设置具体的监听器、消息转换器等。

配置


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1@Bean
2public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
3    container.setQueues(queue001(), queue002()); // 设置监听多个queue
4    container.setConcurrentConsumers(1); // 设置消费端数量
5    container.setMaxConcurrentConsumers(5); // 设置最大的消费者数量
6    container.setDefaultRequeueRejected(false); // 是否重回队列
7    container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 设置签收模式
8    container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID()); // 设置消费端标签策略
9
10    // 设置消息的监听
11    container.setMessageListener(message -> System.out.println("消费端消息: " + new String(message.getBody())));
12    return container;
13}
14
15

4. MessageListenerAdapter(消息监听适配器)

  1. 案例:自定义MessageListenerAdapter以及MessageConverter 。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1// 通过自定义listenerAdapter和转换DefaultListenerMethod参数类型.
2MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MyMessageListenerAdapter());
3listenerAdapter.setDefaultListenerMethod("defaultListener");
4listenerAdapter.setMessageConverter(new MyMessageConverter());
5container.setMessageListener(listenerAdapter);
6
7
8/**
9 * {@link org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter#setMessageConverter(MessageConverter)}
10 */
11public class MyMessageConverter implements MessageConverter {
12    @Override
13    public Message toMessage(Object message, MessageProperties messageProperties) throws MessageConversionException {        
14      return new Message(message.toString().getBytes(), messageProperties);
15    }
16    
17    @Override
18    public Object fromMessage(Message message) throws MessageConversionException {        
19      String contentType = message.getMessageProperties().getContentType();
20        if (contentType != null && contentType.contains("text")) {            
21          return new String(message.getBody());
22        }
23        
24        return message.getBody();
25    }
26}
27
28
  1. 案例:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1/**
2 * 队列名称和方法名称一一对应
3 * 自定义的listenerAdapter, 需要有对应的方法名称.
4 * 这样就是队列触发的是, 对应的方法。
5 */
6MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MyMessageListenerAdapter());
7Map<String, String> queueOrTagToMethodName =new HashMap<>();
8queueOrTagToMethodName.put("queue001", "method1");
9queueOrTagToMethodName.put("queue002", "method2");
10queueOrTagToMethodName.put("queue003", "method3");
11
12listenerAdapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
13
14

5. MessageConverter(消息转换器)

  1. 概述:需要实现MessageConverter接口,然后,重写fromMessage和toMessage方法。

  2. 转换器:

  3. JSON转换器:Jackson2JsonMessageConverter:进行java对象的转换功能。

    1. DefaultJackson2JavaTypeMapper映射器:进行java对象的映射关系。
    2. 自定义二进制转换器:比如图片、PDF、PPT、流媒体。
  4. TextConverter:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1/**
2 * {@link org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter#setMessageConverter(MessageConverter)}
3 */
4public class MyMessageConverter implements MessageConverter {
5    @Override
6    public Message toMessage(Object message, MessageProperties messageProperties) throws MessageConversionException {        
7       return new Message(message.toString().getBytes(), messageProperties);
8    }
9    
10    @Override
11    public Object fromMessage(Message message) throws MessageConversionException {        
12      String contentType = message.getMessageProperties().getContentType();
13        if (contentType != null && contentType.contains("text")) {            
14          return new String(message.getBody());
15        }
16        
17        return message.getBody();
18    }
19}
20
21
  1. JSON格式的转换器


1
2
3
4
5
6
7
1// 通过自定义listenerAdapter和转换DefaultListenerMethod参数类型.
2MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MyMessageListenerAdapter());
3listenerAdapter.setDefaultListenerMethod("defaultListener");
4listenerAdapter.setMessageConverter(new MyMessageConverter());
5container.setMessageListener(listenerAdapter);
6
7

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

病毒疫情

福建省新增新型冠状病毒感染的肺炎疫情情况

2020-2-4 11:00:00

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索