Java使用RabbitMQ(二)–发送和接收信息

释放双眼,戴上耳机,听听看~!

发送和接收信息

发送方:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
1public class BaseTest2 {
2    public static String queueName="q1";
3
4    public static ConnectionFactory factory;
5    public static Connection getConnection(String username,String password,String host,int port,String virtualhost) {
6        if (factory==null){
7            factory=getFactory(username,password,host,port,virtualhost);
8        }
9        try {
10            return factory.newConnection();
11        } catch (Exception e) {
12            throw new RuntimeException("创建rabbit client 连接失败"+e);
13        }
14    }
15
16    private synchronized static ConnectionFactory getFactory(String username, String password, String host, int port, String virtualhost) {
17        if (factory!=null)
18            return factory;
19        ConnectionFactory factory=new ConnectionFactory();
20        factory.setUsername(username);
21        factory.setPassword(password);
22        factory.setVirtualHost(virtualhost);
23        factory.setHost(host);
24        factory.setPort(port);
25
26        return factory;
27    }
28    private static String getMessage(String[] strings){
29        if (strings.length < 1)
30            return "Hello World!";
31        return joinStrings(strings, " ");
32    }
33
34    private static String joinStrings(String[] strings, String delimiter) {
35        int length = strings.length;
36        if (length == 0) return "";
37        StringBuilder words = new StringBuilder(strings[0]);
38        for (int i = 1; i < length; i++) {
39            words.append(delimiter).append(strings[i]);
40        }
41        return words.toString();
42    }
43    private static void doWork(String task) throws InterruptedException {
44        for (char ch: task.toCharArray()) {
45            if (ch == '.') Thread.sleep(1000);
46        }
47    }
48    public static void main(String[] args) {
49        try(Connection connection=getConnection("guest","guest","localhost",5672,"/");
50            Channel channel=connection.createChannel()) {
51
52//            channel.exchangeDeclare("交换机","direct",true);
53             channel.queueDeclare(queueName,false,false,false,null);
54
55            String message=getMessage(new String[]{"a","b","c"});
56            channel.basicPublish("",queueName,null,message.getBytes("utf-8"));
57
58//            Thread.sleep(2000);
59        } catch (IOException e) {
60            throw new RuntimeException(e);
61        } catch (TimeoutException e) {
62            throw new RuntimeException("连接超时"+e);
63        }
64    }
65}
66

接收方:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
1public class BasicTest3 {
2    public static void main(String[] args) {
3        try(Connection connection=BaseTest2.getConnection("guest","guest","localhost",5672,"/");
4            Channel channel=connection.createChannel()) {
5
6//            channel.exchangeDeclare("交换机","direct",true);
7            channel.queueDeclare(BaseTest2.queueName,false,false,false,null);
8
9            Consumer consumer=new DefaultConsumer(channel) {
10                @Override
11                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
12                    String message = new String(body, "UTF-8");
13                    try {
14                        doWork(message);
15                    } catch (InterruptedException e) {
16                        e.printStackTrace();
17                    } finally {
18                        System.out.println(message+" [x] Done");
19                    }
20//                    channel.basicAck(envelope.getDeliveryTag(),false);
21                }
22            };
23            channel.basicConsume(BaseTest2.queueName,false, consumer);
24            Thread.sleep(2000);
25        } catch (Exception e) {
26            throw new RuntimeException(e);
27        }
28
29    }
30    private static String getMessage(String[] strings){
31        if (strings.length < 1)
32            return "Hello World!";
33        return joinStrings(strings, " ");
34    }
35
36    private static String joinStrings(String[] strings, String delimiter) {
37        int length = strings.length;
38        if (length == 0) return "";
39        StringBuilder words = new StringBuilder(strings[0]);
40        for (int i = 1; i < length; i++) {
41            words.append(delimiter).append(strings[i]);
42        }
43        return words.toString();
44    }
45    private static void doWork(String task) throws InterruptedException {
46        for (char ch: task.toCharArray()) {
47            if (ch == '.') Thread.sleep(1000);
48        }
49    }
50}
51
在消费方,channel.basicConsume(BaseTest2.queueName,false, consumer); 的第二个参数是 autoAck(是否自动确认)。
false 表示不自动确认:消费者需要调用 channel.basicAck 手动确认;在确认之前,消息仍保留在 RabbitMQ 中,消费者断开后会被重新投递。
为 true 时,消息一旦投递给消费者就被视为已确认并从队列中删除,之后别的消费者就收不到这条消息了。
55
56

修改下 生产者的代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1 public static void main(String[] args) {
2        try(Connection connection=getConnection("guest","guest","localhost",5672,"/");
3            Channel channel=connection.createChannel()) {
4
5//            channel.exchangeDeclare("交换机","direct",true);
6             channel.queueDeclare(queueName,false,false,false,null);
7            String message=getMessage(new String[]{"a","b","c"});
8            int i=0;
9            while (true){
10                Thread.sleep(1000);
11                channel.basicPublish("",queueName,null,(i+++message).getBytes("utf-8"));
12            }
13
14//            Thread.sleep(2000);
15        } catch (IOException e) {
16            throw new RuntimeException(e);
17        } catch (TimeoutException e) {
18            throw new RuntimeException("连接超时"+e);
19        } catch (InterruptedException e) {
20            e.printStackTrace();
21        }
22    }
23

生产者循环生产新的消息

然后开2个消费者,运行结果如下:


1
2
3
4
5
6
10a b c [x] Done
12a b c [x] Done
14a b c [x] Done
16a b c [x] Done
18a b c [x] Done
6

另一个消费者:


1
2
3
4
5
11a b c [x] Done
13a b c [x] Done
15a b c [x] Done
17a b c [x] Done
5

可以看出,RabbitMQ 是把消息交替(轮流)分发给两个消费者的:每个消费者拿到的消息数量大致相同,这种分发方式叫做轮询(round-robin)分发。

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全技术

Netty游戏服务器开发实战(14):游戏推送的设计

2022-1-11 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索