rabbitMq实现延时队列

释放双眼,带上耳机,听听看~!

    rabbitMq是受欢迎的消息中间件之一,相比其他的消息中间件,具有高并发的特性(天生具备高并发高可用的erlang语言编写),除此之外,还可以持久化,保证消息不易丢失,高可用,实现集群部署,提供灵活的路由和可靠性,可视化管理等等的优点。

    相比于其他的消息队列,rabbitmq最大的特色就是加入了exchange(交换器)这个东西,AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

      RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

    **  fanout**:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

     ** direct:**把消息投递到那些binding key与routing key完全匹配的队列中。

**      topic**:将消息路由到binding key与routing key模式匹配的队列中。

言归正传,延时队列如何通过rabbitmq来实现呢?

分析:首先rabbitmq自己是不具备延时的功能的,除了使用官方提供的插件之外,我们还可以通过ttl(设置超时时间的方式)+ DLX(一个死信队列)的方式来实现 + Router(转发队列)

其中,ttl可以设置在消息上,也可以设置在队列上,设置在消息上可以提供更大的灵活性,但是如果同时设置超时时间的话,就取最小的超时时间为准。

此外,死信队列是一个普通的队列,它没有消费者,用来存储有超时时间信息的消息,并且可以设置当消息超时(ttl),转发到另一个指定队列(此处设置转发到router, 当发送消息之后(发送时,带上要延时的队列名称),等待消息超时,将消息转发到指定的Router队列。

最后,转发队列,用来接收死信队列超时消息,在接收到之后,消费者将消息解析,获取queueName,body,再向所获取的queueName队列发送一条消息,内容为body.

下面是代码:

生产者:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1package cn.chinotan.service.delayQueueRabbitMQ;
2
3import org.apache.commons.lang3.StringUtils;
4import org.slf4j.Logger;
5import org.slf4j.LoggerFactory;
6import org.springframework.amqp.AmqpException;
7import org.springframework.amqp.core.AmqpTemplate;
8import org.springframework.amqp.core.Message;
9import org.springframework.amqp.core.MessagePostProcessor;
10import org.springframework.beans.factory.annotation.Autowired;
11import org.springframework.stereotype.Service;
12
13/**
14 * @program: test
15 * @description: 生产者
16 * @author: xingcheng
17 * @create: 2018-08-12 12:33
18 **/
19@Service
20public class Producr {
21    private static final Logger LOGGER = LoggerFactory.getLogger(Producr.class);
22
23    @Autowired
24    private AmqpTemplate amqpTemplate;
25
26    public void send(String msg, long time, String delayQueueName) {
27        //rabbit默认为毫秒级
28        long times = time * 1000;
29        MessagePostProcessor processor = new MessagePostProcessor() {
30
31            @Override
32            public Message postProcessMessage(Message message) throws AmqpException {
33                message.getMessageProperties().setExpiration(String.valueOf(times));
34                return message;
35            }
36        };
37        // 拼装msg
38        msg = StringUtils.join(msg, ":", delayQueueName);
39        amqpTemplate.convertAndSend(MqConstant.MY_EXCHANGE, MqConstant.DEAD_LETTER_QUEUE, msg, processor);
40    }
41}
42
43
44

消费队列1:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1package cn.chinotan.service.delayQueueRabbitMQ;
2
3import org.slf4j.Logger;
4import org.slf4j.LoggerFactory;
5import org.springframework.amqp.rabbit.annotation.RabbitHandler;
6import org.springframework.amqp.rabbit.annotation.RabbitListener;
7import org.springframework.stereotype.Service;
8
9import java.text.SimpleDateFormat;
10import java.util.Date;
11
12/**
13 * @program: test
14 * @description: queueOne消费者
15 * @author: xingcheng
16 * @create: 2018-08-12 12:35
17 **/
18@Service
19public class MyQueueOneConsumer {
20    private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueOneConsumer.class);
21
22    @RabbitListener(queues=MqConstant.MY_QUEUE_ONE)
23    @RabbitHandler
24    public void process(String content) {
25        LOGGER.info("延迟时间到,queueOne开始执行 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
26    }
27}
28
29

消费队列2:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1package cn.chinotan.service.delayQueueRabbitMQ;
2
3import org.slf4j.Logger;
4import org.slf4j.LoggerFactory;
5import org.springframework.amqp.rabbit.annotation.RabbitHandler;
6import org.springframework.amqp.rabbit.annotation.RabbitListener;
7import org.springframework.beans.factory.annotation.Autowired;
8import org.springframework.stereotype.Service;
9
10import java.text.SimpleDateFormat;
11import java.util.Date;
12
13/**
14 * @program: test
15 * @description: queueTwo队列
16 * @author: xingcheng
17 * @create: 2018-08-12 12:35
18 **/
19@Service
20public class MyQueueTwoConsumer {
21    private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueTwoConsumer.class);
22    @Autowired
23    private Producr producr;
24
25    @RabbitListener(queues=MqConstant.MY_QUEUE_TWO)
26    @RabbitHandler
27    public void process(String content) {
28        LOGGER.info("延迟时间到,queueTwo开始执行 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
29    }
30}
31
32

转发队列:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
1package cn.chinotan.service.delayQueueRabbitMQ;
2
3import org.slf4j.Logger;
4import org.slf4j.LoggerFactory;
5import org.springframework.amqp.core.AmqpTemplate;
6import org.springframework.amqp.rabbit.annotation.RabbitHandler;
7import org.springframework.amqp.rabbit.annotation.RabbitListener;
8import org.springframework.beans.factory.annotation.Autowired;
9import org.springframework.stereotype.Service;
10
11import java.text.SimpleDateFormat;
12import java.util.Date;
13
14/**
15 * @program: test
16 * @description: 转发队列
17 * @author: xingcheng
18 * @create: 2018-08-12 12:35
19 **/
20@Service
21public class TradeProcess {
22    private static final Logger LOGGER = LoggerFactory.getLogger(TradeProcess.class);
23
24    @Autowired
25    private AmqpTemplate amqpTemplate;
26    
27    @RabbitListener(queues=MqConstant.MY_TRANS_QUEUE)
28    @RabbitHandler
29    public void process(String content) {
30        String msg = content.split(":")[0];
31        String delayQueueName = content.split(":")[1];
32        amqpTemplate.convertAndSend(MqConstant.MY_EXCHANGE, delayQueueName, msg);
33        LOGGER.info("进行转发 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
34    }
35}
36
37

队列配置:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
1package cn.chinotan.service.delayQueueRabbitMQ;
2
3import org.springframework.amqp.core.Binding;
4import org.springframework.amqp.core.BindingBuilder;
5import org.springframework.amqp.core.DirectExchange;
6import org.springframework.amqp.core.Queue;
7import org.springframework.context.annotation.Bean;
8import org.springframework.context.annotation.Configuration;
9
10import java.util.HashMap;
11import java.util.Map;
12
13/**
14 * @program: test
15 * @description: 延时队列rabbitMQ配置
16 * @author: xingcheng
17 * @create: 2018-08-12 12:27
18 **/
19@Configuration
20public class RabbitConfig {
21
22    @Bean
23    public DirectExchange myExchange() {
24        return new DirectExchange(MqConstant.MY_EXCHANGE, true, false);
25    }
26
27    @Bean
28    public Queue myQueueOne() {
29        return new Queue(MqConstant.MY_QUEUE_ONE, true, false, false);
30    }
31
32    @Bean
33    public Queue myQueueTwo() {
34        return new Queue(MqConstant.MY_QUEUE_TWO, true, false, false);
35    }
36    @Bean
37    public Queue myTransQueue() {
38        return new Queue(MqConstant.MY_TRANS_QUEUE, true, false, false);
39    }
40
41    @Bean
42    public Queue deadLetterQueue() {
43        Map<String, Object> map = new HashMap<>();
44        map.put("x-dead-letter-exchange", MqConstant.MY_EXCHANGE);
45        map.put("x-dead-letter-routing-key", MqConstant.MY_TRANS_QUEUE);
46        Queue queue = new Queue(MqConstant.DEAD_LETTER_QUEUE, true, false, false, map);
47        System.out.println("arguments :" + queue.getArguments());
48        return queue;
49    }
50
51    @Bean
52    public Binding queueOneBinding() {
53        return BindingBuilder.bind(myQueueOne()).to(myExchange()).with(MqConstant.MY_QUEUE_ONE);
54    }
55
56    @Bean
57    public Binding queueTwoBinding() {
58        return BindingBuilder.bind(myQueueTwo()).to(myExchange()).with(MqConstant.MY_QUEUE_TWO);
59    }
60
61    @Bean
62    public Binding queueDeadBinding() {
63        return BindingBuilder.bind(deadLetterQueue()).to(myExchange()).with(MqConstant.DEAD_LETTER_QUEUE);
64    }
65
66    @Bean
67    public Binding queueTransBinding() {
68        return BindingBuilder.bind(myTransQueue()).to(myExchange()).with(MqConstant.MY_TRANS_QUEUE);
69    }
70}
71
72
73

队列常量配置:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1package cn.chinotan.service.delayQueueRabbitMQ;
2
3/**
4 * @program: test
5 * @description: rabbitMq常量
6 * @author: xingcheng
7 * @create: 2018-08-12 12:30
8 **/
9public class MqConstant {
10    
11    public static final String MY_EXCHANGE = "my_exchange";
12    
13    public static final String MY_QUEUE_ONE = "my_queue_one";
14
15    public static final String MY_QUEUE_TWO = "my_queue_two";
16    
17    public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
18    
19    public static final String MY_TRANS_QUEUE = "my_trans_queue";
20    
21}
22
23

测试延时controller:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1package cn.chinotan.controller;
2
3import cn.chinotan.service.delayQueueRabbitMQ.MqConstant;
4import cn.chinotan.service.delayQueueRabbitMQ.Producr;
5import org.slf4j.Logger;
6import org.slf4j.LoggerFactory;
7import org.springframework.beans.factory.annotation.Autowired;
8import org.springframework.web.bind.annotation.GetMapping;
9import org.springframework.web.bind.annotation.PathVariable;
10import org.springframework.web.bind.annotation.RequestMapping;
11import org.springframework.web.bind.annotation.RestController;
12
13import java.text.SimpleDateFormat;
14import java.util.Date;
15
16/**
17 * @program: test
18 * @description: 延时队列启动类
19 * @author: xingcheng
20 * @create: 2018-08-12 15:41
21 **/
22@RestController
23@RequestMapping("/delayQueue")
24public class DelayQueueController {
25
26    private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueController.class);
27
28    @Autowired
29    private Producr producr;
30
31    @GetMapping("/send/{time}")
32    public String send(@PathVariable("time") int time){
33        LOGGER.info("{}秒后, 发送延迟消息,当前时间{}", time, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
34        producr.send("我是延时消息...", time, MqConstant.MY_QUEUE_TWO);
35        return "ok";
36    }
37    
38}
39
40

测试结果:

rabbitMq实现延时队列

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全技术

具备这3种特质 才能称的上是一个好的 Exploit Kit

2016-12-26 12:12:42

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索