Routing 路由选择
前面的文章 介绍了,怎么广播消息到多个接收者,这一篇将怎么订阅消息的子集。
比如指定一个消费者只把 错误日志写入磁盘,另一个消费者仍然能够将所有的日志消息打印到屏幕上。
Binding
一个Binding描述的是交换机和队列之间的关系,比如描述 某个队列对这个交换机中的哪些消息感兴趣。
1
2 1channel.queueBind(queueName, EXCHANGE_NAME, "");
2
第三个参数就是Binding的描述参数,也叫binding key(与basic_publish区分)。
创建一个简单的Binding,如下:
1
2 1channel.queueBind(queueName, EXCHANGE_NAME, "black");
2
Binding key的指定和 交换机(exchange)的类型有关。
fanout不能提供太大的灵活性,所以为了筛选信息,我们使用direct交换机。
生产方:
生产方不能创建队列,发布时原本填队列名的位置填写 routkey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1 try(Connection connection=getConnection("guest","guest","localhost",5672,"/");
2 Channel channel=connection.createChannel()) {
3
4 channel.exchangeDeclare(args[0],"direct");
5
6 String message=getMessage(new String[]{"a","b","c"});
7 int i=0;
8 while (true){
9 Thread.sleep(1000);
10 channel.basicPublish(args[0],args[1], null,(i+++message).getBytes("utf-8"));
11 }
12
13// Thread.sleep(2000);
14 } catch (Exception e) {
15 throw new RuntimeException(e);
16 }
17 }
18
消费方:
主要是注意绑定routkey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 1 public static void main(String[] args) {
2 try(Connection connection=BaseTest2.getConnection("guest","guest","localhost",5672,"/");
3 Channel channel=connection.createChannel()) {
4 channel.exchangeDeclare(args[0],"direct");
5// channel.queueDeclare("red",false,false,false,null);
6 String queueName= channel.queueDeclare().getQueue();
7 channel.queueBind(queueName,args[0],args[1]);
8// System.out.println(queueName);
9 Consumer consumer=new DefaultConsumer(channel) {
10 @Override
11 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
12 String message = new String(body, "UTF-8");
13 try {
14 doWork(message);
15 } catch (InterruptedException e) {
16 e.printStackTrace();
17 } finally {
18 System.out.println(message+" [x] Done");
19 }
20// channel.basicAck(envelope.getDeliveryTag(),false);
21 }
22 };
23 channel.basicConsume(queueName,true, consumer);
24 Thread.sleep(200000);
25 } catch (Exception e) {
26 throw new RuntimeException(e);
27 }
28
29 }
30