0°

SpringBoot集成RabbitMQ

AmqpTemplate,RabbitTemplate

Spring AMQP提供了一个发送和接收消息的操作模板类AmqpTemplate。 AmqpTemplate它定义包含了发送和接收消息等的一些基本的操作功能。RabbitTemplate是AmqpTemplate的一个实现。

RabbitTemplate支持消息的确认与返回,为了返回消息,RabbitTemplate 需要设置mandatory 属性为true,并且CachingConnectionFactory 的publisherReturns属性也需要设置为true。返回的消息会根据它注册的RabbitTemplate.ReturnCallback setReturnCallback 回调发送到给客户端,

一个RabbitTemplate仅能支持一个ReturnCallback 。

为了确认Confirms消息, CachingConnectionFactory 的publisherConfirms 属性也需要设置为true,确认的消息会根据它注册的RabbitTemplate.ConfirmCallback setConfirmCallback回调发送到给客户端。一个RabbitTemplate也仅能支持一个ConfirmCallback.


1
2
3
4
5
6
1<dependency>
2  <groupId>org.springframework.boot</groupId>
3  <artifactId>spring-boot-starter-amqp</artifactId>
4</dependency>
5
6

SpringBoot集成RabbitMQ


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
1server.port=8083
2#服务器配置
3spring.application.name=rabbitmq-hello-sending
4#rabbitmq连接参数
5spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3 #集群或单机 都可配置
6spring.rabbitmq.username=linpeng
7spring.rabbitmq.password=123456
8
9# rabbitmq服务器的虚拟主机名,可以在后台管理系统上查看和新建
10spring.rabbitmq.virtual-host=/test
11# 连接超时
12spring.rabbitmq.connection-timeout=5s
13
14# 发送方
15# 开启发送确认(未到达MQ服务器)
16spring.rabbitmq.publisher-confirms=true
17# 开启发送失败退回(未找到对应queue)
18spring.rabbitmq.publisher-returns=true
19
20# 消费方 开启手动ACK(坑:当序列化为JSON时,此配置会失效,见下文)
21spring.rabbitmq.listener.direct.acknowledge-mode=manual
22spring.rabbitmq.listener.simple.acknowledge-mode=manual
23
24# 消费方
25spring.rabbitmq.listener.concurrency=2  //最小消息监听线程数
26spring.rabbitmq.listener.max-concurrency=2 //最大消息监听线程数
27
28#消费者每次从队列获取的消息数量 (默认一次250个)
29#通过查看后台管理器中queue的unacked数量
30spring.rabbitmq.listener.simple.prefetch= 5
31
32#消费者自动启动
33spring.rabbitmq.listener.simple.auto-startup=true
34
35#消费失败,自动重新入队
36spring.rabbitmq.listener.simple.default-requeue-rejected= true
37
38#启用发送重试
39spring.rabbitmq.template.retry.enabled=true
40spring.rabbitmq.template.retry.initial-interval=1000
41spring.rabbitmq.template.retry.max-attempts=3
42spring.rabbitmq.template.retry.max-interval=10000
43spring.rabbitmq.template.retry.multiplier=1.0
44
45

RabbitTemplate

默认一个
RabbitTemplate在RabbitMQ中相当于一个connection,每发送一次消息相当于channel,MQ接收消息后释放channel。每个connection最多支持2048个channel,假如从一个connection同时超过2048个线程并发发送,channel超过2048,会报错
org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。

测试启动publisher-confirms后,400个线程通过一个
RabbitTemplate并发发送10000消息,同时就可能产生1000左右的channel。因为发送端channel等待confirm回调所以没有释放。10000消息全部发送在几秒内完成,10000消息全部confirm回调完成用时22秒。(测试机/测试服务器配置都很低)

后台管理页面查看connection+channel

此connection中有10个线程并发发送消息,监控到10个channel生成,MQ完成接收后释放channel。如果是publisher-confirms模式,channel会保持到confirm回调完成再释放,
影响并发性能。每个connection最多支持2048个channel。

测试启动publisher-confirms后,500个线程并发发送,部分消息报
AmqpResourceNotAvailableException。400个线程通过一个
RabbitTemplate并发发送10000消息,最高同时就可能产生1000多的channel。因为channel在等待执行confirm回调。10000消息全部发送在几秒内完成,10000消息全部confirm回调完成用时22秒,此时所有channel全部释放。

 

绑定队列

若在rabbitmq的管理页面手动创建队列和交换机,则可以不再代码中声明 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
1package com.example.demo;
2
3import com.rabbitmq.client.Channel;
4import org.springframework.amqp.core.*;
5import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
6import org.springframework.amqp.rabbit.connection.ConnectionFactory;
7import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
8import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
9import org.springframework.context.annotation.Bean;
10import org.springframework.context.annotation.Configuration;
11
12@Configuration
13public class RabbitConfig {
14
15    @Bean
16    public Queue QueueA() {
17        return new Queue("hello");
18    }
19
20    @Bean
21    public Queue QueueB() {
22        return new Queue("helloObj");
23    }
24
25    /**
26     * Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
27     * @return
28     */
29    @Bean
30    FanoutExchange fanoutExchange() {
31        return new FanoutExchange("ABExchange");
32    }
33
34    @Bean
35    DirectExchange Exchange() {
36        return new DirectExchange("DExchange");
37    }
38  
39    @Bean
40    Binding bindingExchangeA(Queue QueueA, FanoutExchange fanoutExchange) {
41        return BindingBuilder.bind(QueueA).to(fanoutExchange);
42    }
43
44    @Bean
45    Binding bindingExchangeB(Queue QueueB, FanoutExchange fanoutExchange) {
46        return BindingBuilder.bind(QueueB).to(fanoutExchange);
47    }
48
49    @Bean
50    Binding bindingExchange() {
51        return BindingBuilder.bind(QueueA()).to(Exchange()).with("TEST");//routingKey
52    }
53}
54
55

 

**消息发送者 **

ConfirmCallback :ACK=true仅仅标示消息已被Broker接收到,并不表示已成功投放至消息队列中, ACK=false标示消息由于Broker处理错误,消息并未处理成功。如未找到对应交换机返回ACK=false。

ReturnCallback:当消息发送出去找不到对应路由队列时,将会把消息退回 。如果有任何一个路由队列接收投递消息成功,则不会退回消息。MQ成功接收,但是未找到对应队列触发

通过以上异步确认机制,增加降级、补偿处理。比如发送时保存信息和消息ID,ConfirmCallback 通过ID找到对应信息重发,注意要保证
幂等性。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
1package com.example.demo;
2
3import org.springframework.amqp.core.AmqpTemplate;
4import org.springframework.amqp.core.Message;
5import org.springframework.amqp.rabbit.core.RabbitTemplate;
6import org.springframework.amqp.rabbit.support.CorrelationData;
7import org.springframework.beans.factory.annotation.Autowired;
8import org.springframework.stereotype.Component;
9import org.springframework.stereotype.Service;
10
11import java.util.Date;
12//RabbitTemplate.ConfirmCallback
13@Service
14public class HelloSender implements RabbitTemplate.ReturnCallback {
15
16    @Autowired
17    private RabbitTemplate rabbitTemplate;
18
19    public void send() {
20        String context = "你好现在是 " + new Date() +"";
21        System.out.println("HelloSender发送内容 : " + context);
22
23        //消息序列化设置
24        //rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
25        
26        //自身实现ReturnCallback接口 设置异步回调对象为this
27        this.rabbitTemplate.setReturnCallback(this);
28
29        
30        //若是当前类实现RabbitTemplate.ConfirmCallback接口,则可以设置为this
31        //发送前给RabbitTemplate设置一个异步回调对象  RabbitTemplate.ConfirmCallback接口的匿名类
32        this.rabbitTemplate.setConfirmCallback((correlationData, confirm, cause) -> {
33        //若发送时没有CorrelationData,则这里correlationData==null
34            if (!confirm) {
35                System.out.println("HelloSender消息发送失败" + cause + correlationData.getId() );
36               //correlationData.getReturnedMessage(); Message
37               //correlationData.toString();
38            } else {
39                System.out.println("HelloSender 消息发送成功 ");
40            }
41        });
42        //this.rabbitTemplate.setConfirmCallback(this);
43        //rabbitTemplate.convertAndSend("hello", context);
44
45        //这里指定路由键,注意不是队列名
46        //发送时 可以指定消息ID,方便在ConfirmCallback时候二次处理消息
47        rabbitTemplate.convertAndSend("DExchange","QueueRoutingKey", context, new CorrelationData("自定义消息ID"));
48    }
49
50    public void sendObj() {
51       MessageObj obj = new MessageObj();
52       obj.setACK(false);
53       obj.setId(123);
54       obj.setName("zhangsan");
55       obj.setValue("data");
56       System.out.println("发送 : " + obj);
57       this.rabbitTemplate.convertAndSend("helloObj", obj);
58    }
59
60    @Override
61    public void returnedMessage(Message message, int i, String cause, String exchange, String queue) {
62        //没有找到queue
63        //Message中的成员,Body为消息内容  
64       //(Body:'hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
65    }
66
67//    @Override
68//    public void confirm(CorrelationData correlationData, boolean confirm, String cause) {
69//        System.out.println("sender success");
70//    }
71
72}
73
74

测试发送:

使用Spring默认的rabbitTemplate发送消息,CorrelationData可以重复。

交换机+路由键+消息Object+CorrelationData


1
2
1rabbitTemplate.convertAndSend("TEST.EX","TEST","String:message",new CorrelationData("111"));
2

在rabbitmq控制台上getmessage查看 ,rabbitTemplate默认发送deliverymode=2消息,已经设置了消息持久化。

测试速度:

测试100个线程同时并发向同一队列发送简单消息(15左右长度的字符串)。从发送到100个消息全部完成ConfirmCallback,用时为600ms左右。此过程不计入消费速度。

400个线程通过一个
RabbitTemplate并发发送10000消息,同时就可能产生1000左右的channel。因为channel等在confirm。10000消息全部发送在几秒内完成,10000消息全部confirm回调完成用时22秒。

 

测试ConfirmCallback回调:

public void confirm(CorrelationData correlationData, boolean confirm, String cause) ;

confirm==true仅仅标示消息已被Broker接收到,并不表示已成功投放至消息队列中, confirm==false标示消息由于Broker处理错误,消息并未处理成功。如未找到对应交换机返回confirm==false。

在此方法中针对confirm==false的消息实现
降级/补偿处理:重发、本地缓存、计入数据库/Redis等、更新状态…..

测试环境:实例化一个ConfirmCallback接口对象,作为rabbitTemplate共用回调处理对象。

回调测试结果:

      1 先发送到MQ的消息,先完成confirm回调。

      2 ConfirmCallback默认是由同一个线程执行回调,打印线程名可以看到线程名为【AMQP Connection rabbitmqIp:port】

        为了提高效率ConfirmCallback可以把任务提交到线程池,避免阻塞后边的Confirm任务。

      3 若发送时没有携带CorrelationData,回调时这里correlationData==null

      4.设置消息确认会影响并发性能,每个线程发送生成一个channel,channel会保持到confirm回调完成再释放。因为每个connection最多支持2048个channel,当channel达到2048时,会报错
org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。

 

测试ReturnCallback 回调:

 public void returnedMessage(Message message, int i, String cause, String exchange, String queue) ;

MQ成功接收消息,但是未找到对应路由键的队列后回调。实现
降级/补偿处理。

测试环境:实例化一个ReturnCallback接口对象,作为rabbitTemplate共用回调处理对象。

回调测试结果:

默认是由同一个线程执行回调,打印线程名可以看到线程名为【AMQP Connection rabbitmqIp:port】

**message=**返回的Message对象中的成员,Body为发送时的消息内容   ,receivedDeliveryMode=PERSISTENT=2 为持久化消息。spring_returned_message_correlation=发送时的CorrelationData
(Body:'String:message' MessageProperties [headers={spring_returned_message_correlation=111}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])

**cause=**NO_ROUTE

**exchange、queue **为发送时的配置

 

消息消费者

设置QOS,避免触发流控机制

#消费者每次从队列获取的消息数量 (默认一次250个)
spring.rabbitmq.listener.simple.prefetch= 5

当QUEUE达到5条Unacked消息时,不会再推送消息给Consumer。查看后台管理器中queue的unacked数量


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1package com.example.demo;
2
3import com.rabbitmq.client.Channel;
4import org.springframework.amqp.core.Message;
5import org.springframework.amqp.core.Queue;
6import org.springframework.amqp.rabbit.annotation.RabbitHandler;
7import org.springframework.amqp.rabbit.annotation.RabbitListener;
8import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
9import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
10import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
11import org.springframework.amqp.support.AmqpHeaders;
12import org.springframework.beans.factory.annotation.Configurable;
13import org.springframework.context.annotation.Bean;
14import org.springframework.messaging.handler.annotation.Headers;
15import org.springframework.stereotype.Component;
16
17import java.io.IOException;
18import java.util.Date;
19import java.util.Map;
20
21@Component
22public class HelloReceiver {
23
24    @RabbitListener(queues = "hello") //这里是队列名,不是路由键
25    public void process(String msg,Channel channel, Message message) throws IOException {
26        System.out.println("HelloReceiver收到  : " + msg +"收到时间"+new Date());
27        try {
28            //告诉MQ服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
29            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
30            System.out.println("receiver success");
31        } catch (IOException e) {
32            e.printStackTrace();
33            //丢弃这条消息
34            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
35            System.out.println("receiver fail");
36        }
37
38    }
39}
40
41

msg是消息内容,相当于
Message对象中的body。

Message对象的成员:

可以看到有消息信息BODY,发送方生成的消息CorrelationData,还有执行的Method对象(@RabbitListener标注的方法),目标BEAN

 

备注:我们用注解的方式来接受消息 就不要用 自己创建对象实现ChannelAwareMessageListener的方式来接受消息 这种方式还要去全局里面配置麻烦,直接用@RabbitListener(queues = "hello")最简单

消息确认  因为我在属性配置文件里面开启了ACK确认 所以如果代码没有执行ACK确认 你在RabbitMQ的后台会看到消息会一直留在队列里面未消费掉 只要程序一启动开始接受该队列消息的时候 又会收到

方法参数详解:https://www.cnblogs.com/piaolingzxh/p/5448927.html


1
2
3
4
5
1channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
2
3deliveryTag:该消息的index,由发送方生成
4multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
5

 


1
2
3
4
5
6
1channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
2
3deliveryTag:该消息的index
4multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
5requeue:被拒绝的是否重新入队列,true 放在队首,false 消息进入绑定的DLX。一定注意:若此消息一直Nack重入队会导致的死循环
6

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息


1
2
3
4
5
1channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
2
3deliveryTag:该消息的index
4requeue:被拒绝的是否重新入队列。false 消息进入绑定的DLX
5

ShutdownSignalException

1 队列名找不到

2 代码中有ack,但是没有配置手动ACK

 

消费超时

消费超时,queue中unacked的消息会退回到queue中,且消费者ACK时会失败。

 

使用@Payload和@Headers注解


1
2
3
4
5
6
7
8
9
10
11
12
13
1@Component
2public class MessageHandler {
3
4    //获取消息的头属性和body属性
5    @RabbitListener(queues = "zhihao.miao.order")
6    public void handleMessage(@Payload String body, @Headers Map<String,Object> headers){
7        System.out.println("====消费消息===handleMessage");
8        System.out.println(headers);
9        System.out.println(body);
10    }
11}
12
13

 

@RabbitListener 和 @RabbitHandler 搭配使用

  • @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用

  • @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1@Component
2@RabbitListener(queues = "consumer_queue")
3public class Receiver {
4
5    @RabbitHandler
6    public void processMessage1(String message) {
7        System.out.println(message);
8    }
9
10    @RabbitHandler
11    public void processMessage2(byte[] message) {
12        System.out.println(new String(message));
13    }
14    
15}
16

 

 

序列化

当中默认的序列化类为SimpleMessageConverter。

仅仅有调用了convertAndSend方法才会使用对应的MessageConvert进行消息的序列化与反序列化。

SimpleMessageConverter对于要发送的消息体body为字节数组时。不进行处理。

对于假设是String。则将String转成字节数组。

对于假设是Java对象,则使用jdk序列化Serializable将消息转成字节数组。转出来的结果较大,含class类名。类对应方法等信息。因此性能较差。

当使用RabbitMq作为中间件时,数据量比較大,此时就要考虑使用类似Jackson2JsonMessageConverter。hessian等序列化形式。以此提高性能。

 

使用 JSON 序列化与反序列化

https://www.daimajiaoliu.com/daima/56a7c0bc754cc04

发送


1
2
3
4
5
6
7
1@Bean
2    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
3        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
4        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());        
5        return rabbitTemplate;
6    }
7

1
2
3
1User user = new User("linyuan");
2rabbitTemplate.convertAndSend("topic.exchange","queue1",user);
3

接收


1
2
3
4
5
6
7
8
9
1@Bean
2    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
3        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
4        factory.setConnectionFactory(connectionFactory);
5        factory.setMessageConverter(new Jackson2JsonMessageConverter());
6        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//json序列化时,若想手动ACK,则必须配置
7        return factory;
8    }
9

1
2
3
4
5
6
7
8
9
10
1@Component
2@RabbitListener(queues = "queue1")
3public class Receiver {
4
5    @RabbitHandler
6    public void processMessage1(@Payload User user) {
7        System.out.println(user.getName());
8    }
9}
10

消费者+json反序列化 造成手动ACK配置失效

解决方案: https://www.daimajiaoliu.com/daima/4796ad98110041c

这是springboot集成RabbitMQ的一个大坑。当消费者配置JSON反序列化时,配置文件中的手动ACK会失效,消费者会变成自动ACK模式。
spring.rabbitmq.listener.direct.acknowledge-mode=
manual
,spring.rabbitmq.listener.simple.acknowledge-mode=
manual  配置失效。

解决方法是消费者配置RabbitListenerContainerFactory这个Bean时(见上),设置factory.setAcknowledgeMode(AcknowledgeMode.MANUAL)。把消费者强制转换为手动ACK。

如果配置失效切换为自动ACK,但是代码中又使用channel.basicAck手动ACK。这样会造成双ACK的ERROR,接着信道会重启重连。如下:


1
2
3
1o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2
3

unknown delivery tag 1表示当前Channel中找不到delivery-tag=1的消息,其实是这个消息已经自动ACK了,basicAck时就会出错。测试显示,消息并不会丢失而是在出现ERROR异常后走向Nack后重新入队,再多次重复消费后最终ACK成功,严重降低消费者的执行效率。

 

Delivery Tags投递的标识

当一个消费者向RabbitMQ注册后,RabbitMQ会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它在一个channel中唯一代表了一次投递。delivery tag的唯一标识范围限于channel. delivery tag是单调递增的正整数,客户端获取投递的方法用用dellivery tag作为一个参数。

 

 

TestController测试


1
2
3
4
5
6
7
8
9
10
11
12
1@Autowired
2    private HelloSender helloSender;
3
4    /**
5     * 单生产者-单个消费者
6     */
7    @RequestMapping("/test")
8    public void hello() throws Exception {
9        helloSender.send();
10    }
11
12

 发送消息

ACK场景测试

我们把HelloReceiver的ACK确认代码注释掉 ,那消息就算程序收到了, 但是未确认ACK导致消息服务器以为他是未成功消费的,若此时消费者断开则消息返回队列,后续还会再发。

 

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!