Java使用RabbitMQ(六)–订阅发布

释放双眼,带上耳机,听听看~!

发布订阅模式
这一篇主要是java 客户端使用rabbitMQ发布和订阅消息。
前面几篇讲的是将一个消息只发给一个worker,这里讲的是将一个消息同时发给多个订阅者,也就是发布/订阅模式。

为了阐明这种模式,这里将创建一个简单的日志系统,这个系统由2部分组成:第一部分,发送日志消息;第二部分接收和打印日志消息。

在这个日志系统中,复制了一份receiver,一个接收者用来把收到的消息存储到硬盘中,另一个用来把消息展示到屏幕上。

在前面的文章中,我们通过队列发送和接收消息,现在将介绍一种新的消息模式。

Exchanges (交换机)

这种模式的核心思想是:生产者从不直接发送消息到任意一个队列中,也不知道消息将被发送到哪个队列中,而是把消息发送到交换机中,

交换机 是一个简单的东西,它一方面从生产者这里接收消息,另一方面将收到的消息发送到队列中。但交换机必须知道怎么处理它收到的消息,是放到指定的队列中?放到很多个队列中?还是应该丢弃?这个规则,由exchange type定义。

一些可用的交换机模型:direct, topic, match,trace,headers 和fanout。
默认使用的direct。
在前面的文章中,我们不知道交换机的存在,仍然能给发送消息到队列中,是因为使用了默认的交换机


1
2
1channel.basicPublish("", "hello", null, message.getBytes());
2

第一个参数“”就是交换机的名字


1
2
1channel.exchangeDeclare("logs", "fanout");
2

上面声明了交换机的名字和类型。
fanout是把它收到的消息广播给所有的它知道的队列(绑定到这个交换机的队列)。这也是这个日志系统所用到的。


1
2
1String queueName = channel.queueDeclare().getQueue();
2

这会创建一个非持久的,唯一的,自动删除的队列。

如果没有消费者接收消息,消息会消失。
生产者代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1public static void main(String[] args) {
2        try(Connection connection=getConnection("guest","guest","localhost",5672,"/");
3            Channel channel=connection.createChannel()) {
4
5            channel.exchangeDeclare(args[0],"fanout");
6            String queueName= channel.queueDeclare().getQueue();
7            System.out.println(queueName);
8            String message=getMessage(new String[]{"a","b","c"});
9//            channel.queueBind(queueName,args[0],"");
10            int i=0;
11            while (true){
12                Thread.sleep(1000);
13                channel.basicPublish(args[0],queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,(i+++message).getBytes("utf-8"));
14            }
15
16//            Thread.sleep(2000);
17        } catch (Exception e) {
18            throw new RuntimeException(e);
19        }
20    }
21

消费者代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1 public static void main(String[] args) {
2        try(Connection connection=BaseTest2.getConnection("guest","guest","localhost",5672,"/");
3            Channel channel=connection.createChannel()) {
4
5            channel.exchangeDeclare(args[0],"fanout");
6//            channel.queueDeclare(BaseTest2.queueName,true,false,false,null);
7            String queueName= channel.queueDeclare().getQueue();
8            channel.queueBind(queueName,args[0],"");
9            System.out.println(queueName);
10            Consumer consumer=new DefaultConsumer(channel) {
11                @Override
12                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
13                    String message = new String(body, "UTF-8");
14                    try {
15                        doWork(message);
16                    } catch (InterruptedException e) {
17                        e.printStackTrace();
18                    } finally {
19                        System.out.println(message+" [x] Done");
20                    }
21                    channel.basicAck(envelope.getDeliveryTag(),false);
22                }
23            };
24            channel.basicConsume(queueName,false, consumer);
25            Thread.sleep(200000);
26        } catch (Exception e) {
27            throw new RuntimeException(e);
28        }
29
30    }
31

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全运维

《解读NoSQL》——2.3 策略地使用RAM、SSD和磁盘提升性能

2021-12-11 11:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索