Java使用RabbitMQ(七)–Routing

释放双眼,带上耳机,听听看~!

Routing 路由选择
前面的文章 介绍了,怎么广播消息到多个接收者,这一篇将怎么订阅消息的子集。

比如指定一个消费者只把 错误日志写入磁盘,另一个消费者仍然能够将所有的日志消息打印到屏幕上。

Binding
一个Binding描述的是交换机和队列之间的关系,比如描述 某个队列对这个交换机中的哪些消息感兴趣。


1
2
1channel.queueBind(queueName, EXCHANGE_NAME, "");
2

第三个参数就是Binding的描述参数,也叫binding key(与basic_publish区分)。
创建一个简单的Binding,如下:


1
2
1channel.queueBind(queueName, EXCHANGE_NAME, "black");
2

Binding key的指定和 交换机(exchange)的类型有关。

fanout不能提供太大的灵活性,所以为了筛选信息,我们使用direct交换机。

生产方:
生产方不能创建队列,发布时原本填队列名的位置填写 routkey


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1 try(Connection connection=getConnection("guest","guest","localhost",5672,"/");
2            Channel channel=connection.createChannel()) {
3
4            channel.exchangeDeclare(args[0],"direct");
5
6            String message=getMessage(new String[]{"a","b","c"});
7            int i=0;
8            while (true){
9                Thread.sleep(1000);
10                channel.basicPublish(args[0],args[1], null,(i+++message).getBytes("utf-8"));
11            }
12
13//            Thread.sleep(2000);
14        } catch (Exception e) {
15            throw new RuntimeException(e);
16        }
17    }
18

消费方:
主要是注意绑定routkey


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1 public static void main(String[] args) {
2        try(Connection connection=BaseTest2.getConnection("guest","guest","localhost",5672,"/");
3            Channel channel=connection.createChannel()) {
4            channel.exchangeDeclare(args[0],"direct");
5//            channel.queueDeclare("red",false,false,false,null);
6            String queueName= channel.queueDeclare().getQueue();
7            channel.queueBind(queueName,args[0],args[1]);
8//            System.out.println(queueName);
9            Consumer consumer=new DefaultConsumer(channel) {
10                @Override
11                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
12                    String message = new String(body, "UTF-8");
13                    try {
14                        doWork(message);
15                    } catch (InterruptedException e) {
16                        e.printStackTrace();
17                    } finally {
18                        System.out.println(message+" [x] Done");
19                    }
20//                    channel.basicAck(envelope.getDeliveryTag(),false);
21                }
22            };
23            channel.basicConsume(queueName,true, consumer);
24            Thread.sleep(200000);
25        } catch (Exception e) {
26            throw new RuntimeException(e);
27        }
28
29    }
30

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全技术

python datetime模块

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索