0°

Rabbitmq 延迟队列

延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费,延迟队列的使用场景有很多,比如:

1、在订单系统中,某个用户下单之后通常有30分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行取消,这时就可以使用延迟队列来处理这些订单了

2、用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。

实现延迟队列可以采用死信队列+过期时间的方式,比如在死信队列的那篇文章中,我们向hello_dlx_normal队列中发送消息,hello_dlx_normal设置了一个过期时间30s,30s后这个消息转入hello_dlx这个死信队列。消费者订阅hello_dlx这个队列,而不是hello_dlx_normal。这样就可以实现一个延迟队列的效果。

实现延迟队列,还可以采用rabbitmq提供的延迟队列插件。

1、访问http://www.rabbitmq.com/community-plugins.html 下载对应版本的延迟插件

90a6b8d94bb6e407e88b1b92a688a92bef3.jpg

2、将下载的插件重命名 rabbitmq_delayed_message_exchange-0.0.1.ez

将插件上传到/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/plugins 这个目录下

d206fc5734460c214874b0c924279027e8c.jpg

3、执行命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4、查看交换机的类型

dc38a05daf5814b2e41315eb96c5cc8e0d8.jpg

 

5、声明交换机和队列并绑定,这里的交换机类型x-delayed-message,并设置一个参数x-delayed-type,表明该交换机的是topic、direct、fanout哪一种特性的交换机。x-delayed-message是插件提供的类型,并不是rabbitmq本身的。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1    /**
2     * 1、声明交换机和队列并绑定
3     */
4    @Test
5    public void decalreExchangeAndQueue() throws Exception {
6        String exchange = "hello_delay";
7        // 获取到连接
8        Connection connection = ConnectionUtil.getConnection();
9        // 获取通道
10        Channel channel = connection.createChannel();
11        //声明一个延迟队列,该队列的类型为direct这种类型
12        Map<String, Object> args = new HashMap<String, Object>();
13        args.put("x-delayed-type", "direct");
14        channel.exchangeDeclare(exchange, "x-delayed-message", true, false, args);
15
16        String queueName = "hello_delay_c";
17        // 声明队列
18        channel.queueDeclare(queueName, true, false, false, null);
19        // 绑定队列到交换机
20        String routingKey = "aaa";
21        channel.queueBind(queueName, exchange, routingKey,null);
22    }
23

3848b063fcf11b9d3fd8ab98dd07e4424c5.jpg

6、发送消息的时候通过在header添加”x-delay”参数来控制消息的延时时间


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1    /**
2     * 2、生产者发送消息
3     * @throws Exception
4     */
5    @Test
6    public void sendMessage() throws Exception {
7        String exchange = "hello_delay";
8        // 获取到连接
9        Connection connection = ConnectionUtil.getConnection();
10        // 获取通道
11        Channel channel = connection.createChannel();
12        // 消息内容
13        String message = "Less is more direct delay";
14        Map<String, Object> headers = new HashMap<String, Object>();
15        //发送消息的时候通过在header添加”x-delay”参数来控制消息的延时时间
16        headers.put("x-delay", 60000);//延迟60s
17        AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
18        // 发布消息到Exchange 指定路由键aaa
19        channel.basicPublish(exchange, "aaa", props.build(), message.getBytes());
20        channel.close();
21        connection.close();
22    }
23

运行后,60s后我们可以在**hello_delay_c **这个队列里看到消息。

50fcd124ec106bd85ce8067e589fbb0777f.jpg

6d8f55652c21f5441e0b056a1f225474d4a.jpg

详细源码地址

http://​https://github.com/suzhe2018/rabbitmq-item

165c46d43e58f951d596827201e58f406f0.jpg

 

 

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!