释放双眼,带上耳机,听听看~!
发送和接收信息
发送方:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 1public class BaseTest2 {
2 public static String queueName="q1";
3
4 public static ConnectionFactory factory;
5 public static Connection getConnection(String username,String password,String host,int port,String virtualhost) {
6 if (factory==null){
7 factory=getFactory(username,password,host,port,virtualhost);
8 }
9 try {
10 return factory.newConnection();
11 } catch (Exception e) {
12 throw new RuntimeException("创建rabbit client 连接失败"+e);
13 }
14 }
15
16 private synchronized static ConnectionFactory getFactory(String username, String password, String host, int port, String virtualhost) {
17 if (factory!=null)
18 return factory;
19 ConnectionFactory factory=new ConnectionFactory();
20 factory.setUsername(username);
21 factory.setPassword(password);
22 factory.setVirtualHost(virtualhost);
23 factory.setHost(host);
24 factory.setPort(port);
25
26 return factory;
27 }
28 private static String getMessage(String[] strings){
29 if (strings.length < 1)
30 return "Hello World!";
31 return joinStrings(strings, " ");
32 }
33
34 private static String joinStrings(String[] strings, String delimiter) {
35 int length = strings.length;
36 if (length == 0) return "";
37 StringBuilder words = new StringBuilder(strings[0]);
38 for (int i = 1; i < length; i++) {
39 words.append(delimiter).append(strings[i]);
40 }
41 return words.toString();
42 }
43 private static void doWork(String task) throws InterruptedException {
44 for (char ch: task.toCharArray()) {
45 if (ch == '.') Thread.sleep(1000);
46 }
47 }
48 public static void main(String[] args) {
49 try(Connection connection=getConnection("guest","guest","localhost",5672,"/");
50 Channel channel=connection.createChannel()) {
51
52// channel.exchangeDeclare("交换机","direct",true);
53 channel.queueDeclare(queueName,false,false,false,null);
54
55 String message=getMessage(new String[]{"a","b","c"});
56 channel.basicPublish("",queueName,null,message.getBytes("utf-8"));
57
58// Thread.sleep(2000);
59 } catch (IOException e) {
60 throw new RuntimeException(e);
61 } catch (TimeoutException e) {
62 throw new RuntimeException("连接超时"+e);
63 }
64 }
65}
66
接收方:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 1public class BasicTest3 {
2 public static void main(String[] args) {
3 try(Connection connection=BaseTest2.getConnection("guest","guest","localhost",5672,"/");
4 Channel channel=connection.createChannel()) {
5
6// channel.exchangeDeclare("交换机","direct",true);
7 channel.queueDeclare(BaseTest2.queueName,false,false,false,null);
8
9 Consumer consumer=new DefaultConsumer(channel) {
10 @Override
11 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
12 String message = new String(body, "UTF-8");
13 try {
14 doWork(message);
15 } catch (InterruptedException e) {
16 e.printStackTrace();
17 } finally {
18 System.out.println(message+" [x] Done");
19 }
20// channel.basicAck(envelope.getDeliveryTag(),false);
21 }
22 };
23 channel.basicConsume(BaseTest2.queueName,false, consumer);
24 Thread.sleep(2000);
25 } catch (Exception e) {
26 throw new RuntimeException(e);
27 }
28
29 }
30 private static String getMessage(String[] strings){
31 if (strings.length < 1)
32 return "Hello World!";
33 return joinStrings(strings, " ");
34 }
35
36 private static String joinStrings(String[] strings, String delimiter) {
37 int length = strings.length;
38 if (length == 0) return "";
39 StringBuilder words = new StringBuilder(strings[0]);
40 for (int i = 1; i < length; i++) {
41 words.append(delimiter).append(strings[i]);
42 }
43 return words.toString();
44 }
45 private static void doWork(String task) throws InterruptedException {
46 for (char ch: task.toCharArray()) {
47 if (ch == '.') Thread.sleep(1000);
48 }
49 }
50}
51
52在消费方,channel.basicConsume(BaseTest2.queueName,false, consumer);
53false是指不自动确认(自动确认的话,rabbit中存的消息会被消费掉。)
54为true时,会被自动消费,然后别人就收不到这条消息了。
55
56
修改下 生产者的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 1 public static void main(String[] args) {
2 try(Connection connection=getConnection("guest","guest","localhost",5672,"/");
3 Channel channel=connection.createChannel()) {
4
5// channel.exchangeDeclare("交换机","direct",true);
6 channel.queueDeclare(queueName,false,false,false,null);
7 String message=getMessage(new String[]{"a","b","c"});
8 int i=0;
9 while (true){
10 Thread.sleep(1000);
11 channel.basicPublish("",queueName,null,(i+++message).getBytes("utf-8"));
12 }
13
14// Thread.sleep(2000);
15 } catch (IOException e) {
16 throw new RuntimeException(e);
17 } catch (TimeoutException e) {
18 throw new RuntimeException("连接超时"+e);
19 } catch (InterruptedException e) {
20 e.printStackTrace();
21 }
22 }
23
生产者循环生产新的消息
然后开2个消费者,运行结果如下:
1
2
3
4
5
6 110a b c [x] Done
212a b c [x] Done
314a b c [x] Done
416a b c [x] Done
518a b c [x] Done
6
另一个消费者:
1
2
3
4
5 111a b c [x] Done
213a b c [x] Done
315a b c [x] Done
417a b c [x] Done
5
可以看出,是rabbit交替发送的。每个消费者获取的消息是平均的 ,这也叫循环。