目录
jar包
消息类型
简单模型Hello Word
工作队列模式Work Queue
Round-robin (轮询分发)
Fair dispatch (公平分发)
订阅 模式 Publish/Subscribe
路由模式
通配符模式Topic
RabbitMQ 之消息确认机制(事务+Confirm )
概述
事务机制
Confirm 模式
jar包
1
2
3
4
5
6 1<dependency>
2 <groupId>com.rabbitmq</groupId>
3 <artifactId>amqp-client</artifactId>
4 <version>4.0.2</version>
5</dependency>
6
消息类型
简单模型Hello Word
简单队列 生产消费一一对应,耦合性高.
生产消息快,消费消息慢.如果很多容易堵
功能:一个生产者
P
发送消息到队列
Q,
一个消费者
C
接收
** 生产者实现思路:**
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工 厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发 送 消息,关闭通道和连接。
** 创建工厂**
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 1package rabbitMQ.com.rabbitMQ.util;
2
3import java.io.IOException;
4import java.util.concurrent.TimeoutException;
5import com.rabbitmq.client.ConnectionFactory;
6import com.rabbitmq.client.Connection;
7
8public class ConnectionUtils {
9 public static Connection getConnection() throws IOException, TimeoutException {
10 // 定义连接工厂
11 ConnectionFactory factory = new ConnectionFactory();
12 // 设置服务地址
13 factory.setHost("127.0.0.1");
14 // 端口
15 factory.setPort(5672);// amqp协议 端口 类似与mysql的3306
16 // 设置账号信息,用户名、密码、vhost
17 factory.setVirtualHost("/user");
18 factory.setUsername("user");
19 factory.setPassword("123");
20 // 通过工程获取连接
21 Connection connection = factory.newConnection();
22 return connection;
23 }
24}
25
26
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 1package rabbitMQ.com.rabbitMQ.simple;
2
3import java.io.IOException;
4import java.util.concurrent.TimeoutException;
5
6import com.rabbitmq.client.Channel;
7import com.rabbitmq.client.Connection;
8
9import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
10
11public class SendMQ {
12 private static final String QUEUE_NAME = "QUEUE_simple";
13
14 /*
15 * P----->|QUEUE |
16 */
17 public static void main(String[] args) throws IOException, TimeoutException {
18
19 // 获取一个连接
20 Connection connection = ConnectionUtils.getConnection();
21 //从连接中创建通道
22 Channel channel = connection.createChannel();
23 // 创建队列 (声明) 因为我们要往队列里面发送消息
24 boolean durable = false;//是否持久化
25 boolean exclusive = false;//独占的queue
26 boolean autoDelete = false;//不使用时是否自动删除
27 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);// 如果这个队列不存在,其实这句话是不需要的
28 String msg = "Hello Simple QUEUE !";
29 // 第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
30 // 因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
31 // 我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
32 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
33 System.out.println("---------send ms :" + msg);
34 channel.close();
35 connection.close();
36 }
37}
38
39
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 1package rabbitMQ.com.rabbitMQ.simple;
2
3import java.io.IOException;
4
5import com.rabbitmq.client.AMQP.BasicProperties;
6import com.rabbitmq.client.Channel;
7import com.rabbitmq.client.Connection;
8import com.rabbitmq.client.DefaultConsumer;
9import com.rabbitmq.client.Envelope;
10
11import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
12
13public class Customer {
14 private static final String QUEUE_NAME = "QUEUE_simple";
15
16 public static void main(String[] args) throws Exception {
17 /* 获取一个连接 */
18 Connection connection = ConnectionUtils.getConnection();
19 Channel channel = connection.createChannel();
20 // 声明队列 如果能确定是哪一个队列 这边可以删掉,不去掉 这里会忽略创建
21 // channel.queueDeclare(QUEUE_NAME, false, false, false, null);
22 DefaultConsumer consumer = new DefaultConsumer(channel) {
23 // 获取到达的消息
24 @Override
25 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
26 throws IOException {
27 String message = new String(body, "UTF-8");
28 System.out.println(" [x] Received '" + message + "'");
29 }
30 };
31 // 监听队列
32 channel.basicConsume(QUEUE_NAME, true, consumer);
33 }
34}
35
36
工作队列模式Work Queue
为什么 会出现 work queues?
前提: 使用 simple 队列 的时候
我们应用程序在是使用消息系统的时候,一般生产者 P 生产消息是毫不费力的(发送消息即可),而消费者接收完消息
后的需要处理,会耗费一定的时间,这时候,就有可能导致很多消息堆积在队列里面,一个消费者有可能不够用
那么怎么让消费者同事处理多个消息呢?
在同一个队列上创建多个消费者,让他们相互竞争,这样消费者就可以同时处理多条消息了
使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)
来解决这一问题,使得系统的伸缩性更加容易。
Round-robin (轮询分发)
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 1package rabbitMQ.com.rabbitMQ.work;
2
3import com.rabbitmq.client.Channel;
4import com.rabbitmq.client.Connection;
5
6import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
7
8public class SendMQ {
9 private final static String QUEUE_NAME = "test_queue_work";
10
11 public static void main(String[] argv) throws Exception {
12 // 获取到连接以及mq通道
13 Connection connection = ConnectionUtils.getConnection();
14 Channel channel = connection.createChannel();
15 // 声明队列
16 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
17 for (int i = 0; i < 50; i++) {
18 // 消息内容
19 String message = "." + i;
20 channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
21 System.out.println(" [x] Sent '" + message + "'");
22 Thread.sleep(i * 10);
23 }
24 channel.close();
25 connection.close();
26 }
27}
28
29
消费者1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 1package rabbitMQ.com.rabbitMQ.work;
2
3import java.io.IOException;
4
5import com.rabbitmq.client.AMQP.BasicProperties;
6import com.rabbitmq.client.Channel;
7import com.rabbitmq.client.Connection;
8import com.rabbitmq.client.Consumer;
9import com.rabbitmq.client.DefaultConsumer;
10import com.rabbitmq.client.Envelope;
11
12import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
13
14public class Recv1 {
15 private final static String QUEUE_NAME = "test_queue_work";
16
17 public static void main(String[] args) throws Exception {
18 // 获取到连接以及mq通道
19 Connection connection = ConnectionUtils.getConnection();
20 final Channel channel = connection.createChannel();
21 // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
22 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
23 // 定义一个消息的消费者
24 final Consumer consumer = new DefaultConsumer(channel) {
25 @Override
26 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
27 throws IOException {
28 String message = new String(body, "UTF-8");
29 System.out.println(" [1] Received '" + message + "'");
30 try {
31 doWork(message);
32 } catch (Exception e) {
33 e.printStackTrace();
34 } finally {
35 System.out.println(" [x] Done");
36 }
37 }
38 };
39 boolean autoAck = true; // 消息的确认模式自动应答
40 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
41 }
42
43 private static void doWork(String task) throws InterruptedException {
44 Thread.sleep(1000);
45 }
46}
47
48
消费者2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 1package rabbitMQ.com.rabbitMQ.work;
2
3import java.io.IOException;
4
5import com.rabbitmq.client.AMQP.BasicProperties;
6import com.rabbitmq.client.Channel;
7import com.rabbitmq.client.Connection;
8import com.rabbitmq.client.Consumer;
9import com.rabbitmq.client.DefaultConsumer;
10import com.rabbitmq.client.Envelope;
11
12import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
13
14public class Recv2 {
15 private final static String QUEUE_NAME = "test_queue_work";
16
17 public static void main(String[] args) throws Exception {
18 // 获取到连接以及mq通道
19 Connection connection = ConnectionUtils.getConnection();
20 final Channel channel = connection.createChannel();
21 // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
22 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
23 // 定义一个消息的消费者
24 final Consumer consumer = new DefaultConsumer(channel) {
25 @Override
26 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
27 throws IOException {
28 String message = new String(body, "UTF-8");
29 System.out.println(" [2] Received '" + message + "'");
30 try {
31 doWork(message);
32 } catch (Exception e) {
33 e.printStackTrace();
34 } finally {
35 System.out.println(" [x] Done");
36 }
37 }
38 };
39 boolean autoAck = true; // 消息的确认模式自动应答
40 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
41 }
42
43 private static void doWork(String task) throws InterruptedException {
44 Thread.sleep(2000);
45 }
46}
47
48
测试结果
备注:消费者 1 我们处理时间是 1s ;而消费者 2 中处理时间是 2s;
但是我们看到的现象并不是 1 处理的多 消费者 2 处理的少,
消费者 1 中将偶数部分处理掉了
[1] Received '.0' [x] Done [1] Received '.2' [x] Done [1] Received '.4' [x] Done [1] Received '.6'
消费者 2 中将偶数部分处理掉了
[2] Received '.1' [x] Done [2] Received '.3' [x] Done [2] Received '.5' [x] Done …… .. . . .
1.消费者 1 和消费者 2 获取到的消息内容是不同的,同一个消息只能被一个消费者获取
2.消费者 1 和消费者 2 货到的消息数量是一样的 一个奇数一个偶数
按道理消费者 1 获取的比消费者 2 要多
这种方式叫做 轮询分发 结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务,任务总是你一个我一个
的分
Fair dispatch (公平分发)
虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有 2 个消费者,所有的偶数的消息都是繁忙的,而奇数则是轻松的。按照轮询的方式,偶数的任务交给了第一个消费者,所以一直在忙个不停。奇数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而 RabbitMQ 则是不了解这些的。他是不知道你消费者的消费能力的,这是因为当消息进入队列RabbitMQ 就会分派消息。而 rabbitmq 只是盲目的将消息轮询的发给消费者。你一个我一个的这样发送.
为了解决这个问题,我们使用basicQos( prefetchCount = 1)
方法,来限制
RabbitMQ
只发不超过
1
条的消息给同一个消费者。当消息处理完毕后,有了反馈 ack
才会进行第二次发送。
(
也就是说需要手动反馈给
Rabbitmq )
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 1package rabbitMQ.com.rabbitMQ.work.fair;
2
3import com.rabbitmq.client.Channel;
4import com.rabbitmq.client.Connection;
5
6import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
7
8public class SendMQ {
9 private final static String QUEUE_NAME = "test_queue_work";
10
11 public static void main(String[] argv) throws Exception {
12 // 获取到连接以及mq通道
13 Connection connection = ConnectionUtils.getConnection();
14 // 创建一个频道
15 Channel channel = connection.createChannel();
16 // 指定一个队列
17 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
18 int prefetchCount = 1;
19 // 每个消费者发送确认信号之前,消息队列不发送下一个消息过来,一次只处理一个消息
20 // 限制发给同一个消费者不得超过1条消息
21 channel.basicQos(prefetchCount);
22 // 发送的消息
23 for (int i = 0; i < 50; i++) {
24 String message = "." + i;
25 // 往队列中发出一条消息
26 channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
27 System.out.println(" [x] Sent '" + message + "'");
28 Thread.sleep(i * 10);
29 }
30 // 关闭频道和连接
31 channel.close();
32 connection.close();
33 }
34}
35
36
消费者1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 1package rabbitMQ.com.rabbitMQ.work.fair;
2
3import java.io.IOException;
4
5import com.rabbitmq.client.AMQP.BasicProperties;
6import com.rabbitmq.client.Channel;
7import com.rabbitmq.client.Connection;
8import com.rabbitmq.client.Consumer;
9import com.rabbitmq.client.DefaultConsumer;
10import com.rabbitmq.client.Envelope;
11
12import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
13
14public class Recv1 {
15 private final static String QUEUE_NAME = "test_queue_work";
16
17 public static void main(String[] args) throws Exception {
18 // 获取到连接以及mq通道
19 Connection connection = ConnectionUtils.getConnection();
20 final Channel channel = connection.createChannel();
21 // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
22 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
23 channel.basicQos(1);// 保证一次只分发一个
24 // 定义一个消息的消费者
25 final Consumer consumer = new DefaultConsumer(channel) {
26 @Override
27 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
28 throws IOException {
29 String message = new String(body, "UTF-8");
30 System.out.println(" [1] Received '" + message + "'");
31 try {
32 doWork(message);
33 } catch (Exception e) {
34 e.printStackTrace();
35 } finally {
36 System.out.println(" [x] Done");
37 channel.basicAck(envelope.getDeliveryTag(), false);
38 }
39 }
40 };
41 boolean autoAck = false; // 手动确认消息
42 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
43 }
44
45 private static void doWork(String task) throws InterruptedException {
46 Thread.sleep(1000);
47 }
48}
49
50
消费者2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 1package rabbitMQ.com.rabbitMQ.work.fair;
2
3import java.io.IOException;
4
5import com.rabbitmq.client.AMQP.BasicProperties;
6import com.rabbitmq.client.Channel;
7import com.rabbitmq.client.Connection;
8import com.rabbitmq.client.Consumer;
9import com.rabbitmq.client.DefaultConsumer;
10import com.rabbitmq.client.Envelope;
11
12import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
13
14public class Recv2 {
15 private final static String QUEUE_NAME = "test_queue_work";
16
17 public static void main(String[] args) throws Exception {
18 // 获取到连接以及mq通道
19 Connection connection = ConnectionUtils.getConnection();
20 final Channel channel = connection.createChannel();
21 // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
22 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
23 channel.basicQos(1);// 保证一次只分发一个
24 // 定义一个消息的消费者
25 final Consumer consumer = new DefaultConsumer(channel) {
26 @Override
27 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
28 throws IOException {
29 String message = new String(body, "UTF-8");
30 System.out.println(" [2] Received '" + message + "'");
31 try {
32 doWork(message);
33 } catch (Exception e) {
34 e.printStackTrace();
35 } finally {
36 System.out.println(" [x] Done");
37 channel.basicAck(envelope.getDeliveryTag(), false);
38 }
39 }
40 };
41 boolean autoAck = false; // 手动确认消息
42 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
43 }
44
45 private static void doWork(String task) throws InterruptedException {
46 Thread.sleep(2000);
47 }
48}
49
50
测试结果
这时候 现象就是 消费者 者 1 速度 大于消费者 2
boolean autoAck = true ;( 自动 确认 模式) 一旦 RabbitMQ 将消息分发给了消费者,就会从内存中删除。在这种情况下,如果杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消费者但尚未处理的消息。
boolean autoAck = false ; (手动确认模式) 我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。 为了确保消息不会丢失,RabbitMQ 支持消息应答。消费者发送一个消息应答,告诉 RabbitMQ 这个消息已经接收并且处理完毕了。RabbitMQ 可以删除它了。
消息应答是默认打开的。也就是 boolean autoAck = false
订阅 模式 Publish/Subscribe
解读:
1、1 个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
6、
使用通道****channel
创建交换机并指定交换机类型为
fanout
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 1package rabbitMQ.com.rabbitMQ.publish;
2
3import com.rabbitmq.client.Channel;
4import com.rabbitmq.client.Connection;
5
6import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
7
8public class SendMQ {
9 private final static String EXCHANGE_NAME = "test_exchange_fanout";
10
11 public static void main(String[] argv) throws Exception {
12 // 获取到连接以及mq通道
13 Connection connection = ConnectionUtils.getConnection();
14 Channel channel = connection.createChannel();
15 // 声明exchange 交换机 转发器
16 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // fanout 分裂
17 // 消息内容
18 String message = "Hello PB";
19 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
20 System.out.println(" [x] Sent '" + message + "'");
21 channel.close();
22 connection.close();
23 }
24}
25
26
消费者1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 1package rabbitMQ.com.rabbitMQ.publish;
2
3import java.io.IOException;
4
5import com.rabbitmq.client.AMQP.BasicProperties;
6import com.rabbitmq.client.Channel;
7import com.rabbitmq.client.Connection;
8import com.rabbitmq.client.Consumer;
9import com.rabbitmq.client.DefaultConsumer;
10import com.rabbitmq.client.Envelope;
11
12import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
13
14public class Recv1 {
15 private final static String QUEUE_NAME = "test_queue_fanout_1";
16 private final static String EXCHANGE_NAME = "test_exchange_fanout";
17
18 public static void main(String[] argv) throws Exception {
19 // 获取到连接以及mq通道
20 Connection connection = ConnectionUtils.getConnection();
21 final Channel channel = connection.createChannel();
22 // 声明队列
23 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
24 // 绑定队列到交换机
25 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
26 // ------------下面逻辑和work模式一样-----
27 // 同一时刻服务器只会发一条消息给消费者
28 channel.basicQos(1);
29 // 定义一个消费者
30 Consumer consumer = new DefaultConsumer(channel) {
31 // 消息到达 触发这个方法
32 @Override
33 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
34 throws IOException {
35 String msg = new String(body, "utf-8");
36 System.out.println("[1] Recv msg:" + msg);
37 try {
38 Thread.sleep(1000);
39 } catch (InterruptedException e) {
40 e.printStackTrace();
41 } finally {
42 System.out.println("[1] done ");
43 // 手动回执
44 channel.basicAck(envelope.getDeliveryTag(), false);
45 }
46 }
47 };
48 boolean autoAck = false;
49 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
50 }
51}
52
53
消费者2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 1package rabbitMQ.com.rabbitMQ.publish;
2
3import java.io.IOException;
4
5import com.rabbitmq.client.AMQP.BasicProperties;
6import com.rabbitmq.client.Channel;
7import com.rabbitmq.client.Connection;
8import com.rabbitmq.client.Consumer;
9import com.rabbitmq.client.DefaultConsumer;
10import com.rabbitmq.client.Envelope;
11
12import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
13
14public class Recv2 {
15 private final static String QUEUE_NAME = "test_queue_fanout_2";
16 private final static String EXCHANGE_NAME = "test_exchange_fanout";
17
18 public static void main(String[] argv) throws Exception {
19 // 获取到连接以及mq通道
20 Connection connection = ConnectionUtils.getConnection();
21 final Channel channel = connection.createChannel();
22 // 声明队列
23 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
24 // 绑定队列到交换机
25 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
26 // ------------下面逻辑和work模式一样-----
27 // 同一时刻服务器只会发一条消息给消费者
28 channel.basicQos(1);
29 // 定义一个消费者
30 Consumer consumer = new DefaultConsumer(channel) {
31 // 消息到达 触发这个方法
32 @Override
33 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
34 throws IOException {
35 String msg = new String(body, "utf-8");
36 System.out.println("[2] Recv msg:" + msg);
37 try {
38 Thread.sleep(2000);
39 } catch (InterruptedException e) {
40 e.printStackTrace();
41 } finally {
42 System.out.println("[2] done ");
43 // 手动回执
44 channel.basicAck(envelope.getDeliveryTag(), false);
45 }
46 }
47 };
48 boolean autoAck = false;
49 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
50 }
51}
52
53
测试结果
一个消息 可以被多个消费者获取
路由模式
说明:生产者发送消息到交换机并且要指定路由
key
,消费者将队列绑定到交换机时需要指定路由
key使用通道****channel
创建交换机并指定交换机类型为
direct
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 1package rabbitMQ.com.rabbitMQ.route;
2
3import com.rabbitmq.client.Channel;
4import com.rabbitmq.client.Connection;
5
6import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
7
8public class SendMQ {
9 private final static String EXCHANGE_NAME = "test_exchange_direct";
10
11 public static void main(String[] argv) throws Exception {
12 // 获取到连接以及mq通道
13 Connection connection = ConnectionUtils.getConnection();
14 Channel channel = connection.createChannel();
15 // 声明exchange
16 channel.exchangeDeclare(EXCHANGE_NAME, "direct");
17 // 消息内容
18 String message = "id=1001的商品删除了";
19 channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
20 System.out.println(" [x] Sent '" + message + "'");
21 channel.close();
22 connection.close();
23 }
24}
25
26
消费者1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 1package rabbitMQ.com.rabbitMQ.route;
2
3import java.io.IOException;
4
5import com.rabbitmq.client.AMQP.BasicProperties;
6import com.rabbitmq.client.Channel;
7import com.rabbitmq.client.Connection;
8import com.rabbitmq.client.Consumer;
9import com.rabbitmq.client.DefaultConsumer;
10import com.rabbitmq.client.Envelope;
11
12import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
13
14public class Recv1 {
15 private final static String QUEUE_NAME = "test_queue_direct_1";
16 private final static String EXCHANGE_NAME = "test_exchange_direct";
17
18 public static void main(String[] argv) throws Exception {
19 // 获取到连接以及mq通道
20 Connection connection = ConnectionUtils.getConnection();
21 final Channel channel = connection.createChannel();
22 // 声明队列
23 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
24 // 绑定队列到交换机
25 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
26 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
27 // 同一时刻服务器只会发一条消息给消费者
28 channel.basicQos(1);
29 Consumer consumer = new DefaultConsumer(channel) {
30 // 消息到达 触发这个方法
31 @Override
32 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
33 throws IOException {
34 String msg = new String(body, "utf-8");
35 System.out.println("[1] Recv msg:" + msg);
36 try {
37 Thread.sleep(1000);
38 } catch (InterruptedException e) {
39 e.printStackTrace();
40 } finally {
41 System.out.println("[1] done ");
42 // 手动回执
43 channel.basicAck(envelope.getDeliveryTag(), false);
44 }
45 }
46 };
47 boolean autoAck = false;
48 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
49 }
50}
51
52
消费者2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 1package rabbitMQ.com.rabbitMQ.route;
2
3import java.io.IOException;
4
5import com.rabbitmq.client.AMQP.BasicProperties;
6import com.rabbitmq.client.Channel;
7import com.rabbitmq.client.Connection;
8import com.rabbitmq.client.Consumer;
9import com.rabbitmq.client.DefaultConsumer;
10import com.rabbitmq.client.Envelope;
11
12import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
13
14public class Recv2 {
15 private final static String QUEUE_NAME = "test_queue_direct_2";
16 private final static String EXCHANGE_NAME = "order-exchanges";
17
18 public static void main(String[] argv) throws Exception {
19 // 获取到连接以及mq通道
20 Connection connection = ConnectionUtils.getConnection();
21 final Channel channel = connection.createChannel();
22 // 声明队列
23 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
24 // 绑定队列到交换机
25 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "add");
26 // 同一时刻服务器只会发一条消息给消费者
27 channel.basicQos(1);
28 Consumer consumer = new DefaultConsumer(channel) {
29 // 消息到达 触发这个方法
30 @Override
31 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
32 throws IOException {
33 String msg = new String(body, "utf-8");
34 System.out.println("[2] Recv msg:" + msg);
35 try {
36 Thread.sleep(1000);
37 } catch (InterruptedException e) {
38 e.printStackTrace();
39 } finally {
40 System.out.println("[2] done ");
41 // 手动回执
42 channel.basicAck(envelope.getDeliveryTag(), false);
43 }
44 }
45 };
46 boolean autoAck = false;
47 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
48 }
49}
50
51
通配符模式Topic
说明:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;
符号#:匹配一个或者多个词 lazy.# 可以匹配 lazy.irs或者lazy.irs.cor
符号*:只能匹配一个词 lazy.* 可以匹配 lazy.irs或者lazy.cor使用通道****channel
创建交换机并指定交换机类型为topic
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 1package rabbitMQ.com.rabbitMQ.topic;
2
3import com.rabbitmq.client.Channel;
4import com.rabbitmq.client.Connection;
5
6import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
7
8public class SendMQ {
9 private final static String EXCHANGE_NAME = "test_exchange_topic";
10
11 public static void main(String[] argv) throws Exception {
12 // 获取到连接以及mq通道
13 Connection connection = ConnectionUtils.getConnection();
14 Channel channel = connection.createChannel();
15 // 声明exchange
16 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
17 // 消息内容
18 String message = "id=1001";
19 channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
20 System.out.println(" [x] Sent '" + message + "'");
21 channel.close();
22 connection.close();
23 }
24}
25
26
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 1package rabbitMQ.com.rabbitMQ.topic;
2
3import java.io.IOException;
4
5import com.rabbitmq.client.AMQP.BasicProperties;
6import com.rabbitmq.client.Channel;
7import com.rabbitmq.client.Connection;
8import com.rabbitmq.client.Consumer;
9import com.rabbitmq.client.DefaultConsumer;
10import com.rabbitmq.client.Envelope;
11
12import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
13
14public class Recv1 {
15 private final static String QUEUE_NAME = "test_queue_topic_1";
16 private final static String EXCHANGE_NAME = "test_exchange_topic";
17
18 public static void main(String[] argv) throws Exception {
19 // 获取到连接以及mq通道
20 Connection connection = ConnectionUtils.getConnection();
21 final Channel channel = connection.createChannel();
22 // 声明队列
23 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
24 // 绑定队列到交换机
25 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
26
27 // 同一时刻服务器只会发一条消息给消费者
28 channel.basicQos(1);
29 // 定义队列的消费者
30 Consumer consumer = new DefaultConsumer(channel) {
31 // 消息到达 触发这个方法
32 @Override
33 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
34 throws IOException {
35 String msg = new String(body, "utf-8");
36 System.out.println("[1] Recv msg:" + msg);
37 try {
38 Thread.sleep(1000);
39 } catch (InterruptedException e) {
40 e.printStackTrace();
41 } finally {
42 System.out.println("[1] done ");
43 // 手动回执
44 channel.basicAck(envelope.getDeliveryTag(), false);
45 }
46 }
47 };
48 boolean autoAck = false;
49 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
50 }
51}
52
53
54
55
RabbitMQ 之消息确认机制(事务+Confirm )
概述
在 Rabbitmq 中我们可以通过持久化来解决因为服务器异常而导致丢失的问题,除此之外我们还会遇到一个问题:
生产者将消息发送出去之后
,
消息到底有没有正确到达
Rabbit
服务器呢?如果不错得数处理,我们是不知道的,(即 Rabbit 服务器不会反馈任何消息给生产者),也就是默认的情况下是不知道消息有没有正确到达;导致 的问题:消息到达服务器之前丢失,那么持久化也不能解决此问题,因为消息根本就没有到达 Rabbit 服务器!
RabbitMQ 为我们 提供了两种方式:
通过 AMQP 事务机制实现,这也是 AMQP 协议层面提供的解决方案;
通过将 channel 设置成 confirm 模式来实现;
事务机制
RabbitMQ 中与事务机制有关的方法有三个:
txSelect(), txCommit()
以及
txRollback(), txSelect 用于将当前 channel 设置成 transaction 模式,txCommit 用于提交事务,txRollback 用于回滚事务,在通过 txSelect 开启事务之后,我们便可以发布消息给 broker 代理服务器了,如果 txCommit 提交成功了,则消息一定到达了 broker 了,如果在 txCommit执行之前 broker 异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过 txRollback 回滚事务了。关键代码
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.txCommit();
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 1package rabbitMQ.com.rabbitMQ.transaction;
2
3import java.io.IOException;
4import java.util.concurrent.TimeoutException;
5
6import org.junit.Test;
7
8import com.rabbitmq.client.Channel;
9import com.rabbitmq.client.Connection;
10
11import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
12
13public class SendMQ {
14 private static final String QUEUE_NAME = "QUEUE_simple";
15
16 public static void main(String[] args) throws IOException, TimeoutException {
17
18 /* 获取一个连接 */
19 Connection connection = ConnectionUtils.getConnection();
20 /* 从连接中创建通道 */
21 Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
23 String msg = "Hello Simple QUEUE !";
24 try {
25 channel.txSelect();
26 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
27 channel.txCommit();
28 } catch (Exception e) {
29 channel.txRollback();
30 System.out.println("----msg rollabck ");
31 } finally {
32 System.out.println("---------send msg over:" + msg);
33 }
34 channel.close();
35 connection.close();
36 }
37}
38
39
此种模式还是很耗时的,采用这种方式 降低了 Rabbitmq 的消息吞吐量
Confirm 模式
概述
上面我们介绍了 RabbitMQ 可能会遇到的一个问题,即生成者不知道消息是否真正到达 broker,随后通过 AMQP 协议层面为我们提供了事务机制解决了这个问题, 但是采用事务机制实现会降低RabbitMQ 的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用 Confirm 模式。
**producer ****端 confirm **模式的实现原理
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,
broker 回传给生产者的确认消息中 deliver-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.
ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。开启 confirm 模式的方法
已经在 transaction 事务模式的 channel 是不能再设置成 confirm 模式的,即这两种模式是不能共存的。
生产者通过调用
channel
的
confirmSelect
方法将
channel
设置为
confirm
模式核心代码:
//生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
channel.confirmSelect();
**编程 **模式
普通 confirm 模式:每发送一条消息后,调用 waitForConfirms()方法,等待服务器端confirm。实际上是一种串行 confirm 了。
批量 confirm 模式:每发送一批消息后,调用 waitForConfirms()方法,等待服务器端confirm。
异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后Client 端会回调这个方法。
普通 confirm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 1package rabbitMQ.com.rabbitMQ.confirm;
2
3import java.io.IOException;
4import java.util.concurrent.TimeoutException;
5
6import org.junit.Test;
7
8import com.rabbitmq.client.Channel;
9import com.rabbitmq.client.Connection;
10
11import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
12
13public class SendConfirm {
14 private static final String QUEUE_NAME = "QUEUE_simple_confirm";
15
16 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
17 /* 获取一个连接 */
18 Connection connection = ConnectionUtils.getConnection();
19 /* 从连接中创建通道 */
20 Channel channel = connection.createChannel();
21 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
22 // 生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
23 channel.confirmSelect();
24 String msg = "Hello QUEUE !";
25 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
26 if (!channel.waitForConfirms()) {
27 System.out.println("send message failed.");
28 } else {
29 System.out.println(" send messgae ok ...");
30 }
31 channel.close();
32 connection.close();
33 }
34}
35
36
** 批量 confirm 模式**
批量 confirm 模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish 消息,然后等待服务器端 confirm, 相比普通 confirm 模式,批量极大提升 confirm 效率,但是问题在于一旦出现****confirm 返回 false 或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量****confirm 性能应该是不升反降的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 1package rabbitMQ.com.rabbitMQ.confirm;
2
3import java.io.IOException;
4import java.util.concurrent.TimeoutException;
5
6import org.junit.Test;
7
8import com.rabbitmq.client.Channel;
9import com.rabbitmq.client.Connection;
10
11import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
12
13public class SendbatchConfirm {
14 private static final String QUEUE_NAME = "QUEUE_simple_confirm";
15
16 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
17
18 /* 获取一个连接 */
19 Connection connection = ConnectionUtils.getConnection();
20 /* 从连接中创建通道 */
21 Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
23 // 生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
24 channel.confirmSelect();
25 String msg = "Hello QUEUE !";
26 for (int i = 0; i < 10; i++) {
27 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
28 }
29 if (!channel.waitForConfirms()) {
30 System.out.println("send message failed.");
31 } else {
32 System.out.println(" send messgae ok ...");
33 }
34 channel.close();
35 connection.close();
36 }
37}
38
39
异步 confirm 模式
Channel 对象提供的 ConfirmListener()回调方法只包含 deliveryTag(当前 Chanel 发出的消息序号),我们需要自己为每一个 Channel 维护一个 unconfirm 的消息序号集合,每 publish 一条数据,集合中元素加 1,每回调一次 handleAck方法,unconfirm 集合删掉相应的一条(multiple=false**)或多条(multiple=true****)记录。从程序运行效**率上看,这个unconfirm 集合最好采用有序集合 SortedSet 存储结构。实际上,SDK 中的 waitForConfirms()方法也是通过 SortedSet维护消息序号的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 1package rabbitMQ.com.rabbitMQ.confirm;
2
3import java.io.IOException;
4import java.util.Collections;
5import java.util.SortedSet;
6import java.util.TreeSet;
7import java.util.concurrent.TimeoutException;
8
9import com.rabbitmq.client.Channel;
10import com.rabbitmq.client.ConfirmListener;
11import com.rabbitmq.client.Connection;
12
13import rabbitMQ.com.rabbitMQ.util.ConnectionUtils;
14
15public class SendAync {
16 private static final String QUEUE_NAME = "QUEUE_simple_confirm_aync";
17
18 public static void main(String[] args) throws IOException, TimeoutException {
19 /* 获取一个连接 */
20 Connection connection = ConnectionUtils.getConnection();
21 /* 从连接中创建通道 */
22 Channel channel = connection.createChannel();
23 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
24 // 生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
25 channel.confirmSelect();
26 final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
27 channel.addConfirmListener(new ConfirmListener() {
28 // 每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。
29 public void handleAck(long deliveryTag, boolean multiple) throws IOException {
30 if (multiple) {
31 System.out.println("--multiple--");
32 confirmSet.headSet(deliveryTag + 1).clear();// 用一个SortedSet, 返回此有序集合中小于end的所有元素。
33 } else {
34 System.out.println("--multiple false--");
35 confirmSet.remove(deliveryTag);
36 }
37 }
38
39 public void handleNack(long deliveryTag, boolean multiple) throws IOException {
40 System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple:" + multiple);
41 if (multiple) {
42 confirmSet.headSet(deliveryTag + 1).clear();
43 } else {
44 confirmSet.remove(deliveryTag);
45 }
46 }
47 });
48 String msg = "Hello QUEUE !";
49 while (true) {
50 long nextSeqNo = channel.getNextPublishSeqNo();
51 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
52 confirmSet.add(nextSeqNo);
53 }
54 }
55}
56
57