使用RabbitMQ进行RPC的必要
常见的RPC方法/协议包括CORBA,Java RMI,Web Service,Hessian,Thrift及Rest API,相对于前面提到的RPC方式,使用RabbbitMQ(JMS也是一样)方式需要在client-service provider中间增加MQ组件,这样做增加了部署的复杂性,但同时带来额外的好处。
- 可以对service provider进行保护,MQ对请求进行缓冲,处理不了的请求可以被MQ抛弃而不会压垮service provider。
- 可以隔离低安全区对高安全区的访问,此优点是其他rpc没有的。
一个典型的互联网访问方式如下(FW表示防火墙)
client—-FW1–>frontend server(DMZ)—–FW2—->service provider(高安全区)
通过MQ进行RPC则变成
client—-FW1–>frontend server(DMZ)—>MQ<–FW2—-service provider(高安全区)
这里一个重要的区别是service provider(高安全区)是主动访问位于DMZ区域的MQ,由此可以设定FW2只允许高安全区访问DMZ,而禁止DMZ访问高安全区。此方式对于安全性要求高的如银行,金融,政府系统尤为重要。
实现spring remoting实现使用MQ的RPC
原理
一个RPC交互大致分为以下几个步骤:
- 服务端监听MQ队列
- 客户端将调用请求(调用请求包括结果返回队列)发送到队列中。
- 客户端在结果返回队列监听。
- 服务端处理业务,将结果发送到结果返回队列。
公共内容
定义RabbitMQ的连接,定义service接口,进行通讯的queue。
1
2
3
4 1public interface Service {
2 String sayHello(String name);
3}
4
服务端实现
配置文件内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 1<?xml version="1.0" encoding="UTF-8"?>
2<beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:lang="http://www.springframework.org/schema/lang" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
5 xmlns:util="http://www.springframework.org/schema/util"
6 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
7 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
8 http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
9 http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
10 http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">
11 <!--MQ连接-->
12 <rabbit:connection-factory id="rabbitConnectionFactory" />
13
14 <!--创建必要的exchange及queue-->
15 <rabbit:admin connection-factory="rabbitConnectionFactory" />
16 <rabbit:direct-exchange name="exchange">
17 <rabbit:bindings>
18 <rabbit:binding queue="queue.appgw" key="queue.appgw">
19 </rabbit:binding>
20 </rabbit:bindings>
21 </rabbit:direct-exchange>
22 <rabbit:queue name="queue.appgw">
23 </rabbit:queue>
24
25 <!--返回结果的template-->
26 <rabbit:template id="amqpTemplateInternetProxy"
27 connection-factory="rabbitConnectionFactory">
28 </rabbit:template>
29 <!--服务监听-->
30 <rabbit:listener-container acknowledge="none"
31 max-concurrency="128" prefetch="10">
32 <rabbit:listener ref="service" queue-names="queue.appgw" />
33 </rabbit:listener-container>
34 <!--服务实现-->
35 <bean id="serviceImpl" class="net.nxmax.atp.exporter.ServiceImpl"></bean>
36 <!--服务发布-->
37 <bean id="service"
38 class="org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter">
39 <property name="serviceInterface" value="net.nxmax.atp.exporter.Service" />
40 <property name="service" ref="serviceImpl" />
41 <property name="amqpTemplate" ref="amqpTemplateInternetProxy" />
42 </bean>
43</beans>
44
发布服务代码:
1
2
3
4
5
6 1ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
2BufferedReader r = new BufferedReader(new InputStreamReader(System.in));
3while (!r.readLine().equalsIgnoreCase("exit")) {
4}
5ctx.destroy();
6
客户端实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1<?xml version="1.0" encoding="UTF-8"?>
2<beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:lang="http://www.springframework.org/schema/lang" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
5 xmlns:util="http://www.springframework.org/schema/util" xmlns:p="http://www.springframework.org/schema/p"
6 xsi:schemaLocation="http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
7 http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
8 http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
9 http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
10 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
11 <!--MQ连接-->
12 <rabbit:connection-factory id="rabbitConnectionFactory"/>
13 <!--发送请求的template-->
14 <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"
15 queue="queue.appgw" exchange="exchange" reply-timeout="60000" />
16 <bean class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean"
17 p:serviceInterface="net.nxmax.atp.exporter.Service" p:routingKey="queue.appgw">
18 <property name="amqpTemplate" ref="amqpTemplate" />
19 </bean>
20</beans>
21
22
客户端调用代码:
1
2
3
4
5 1ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("ctx-client.xml");
2Service service = ctx.getBean(Service.class);
3service.sayHello("name");
4ctx.destroy();
5
优化
问题
使用默认的spring实现存在以下的限制:
- 消息使用持久化,对于RPC来说消息是不需要持久化的。
- 消息没有过期时间,意味着后端可能需要处理很久以前的请求,对于RPC来说很久以前的请求应该要抛弃。
- 默认使用临时队列作为结果返回队列,意味着每次调用都需要创建队列,性能极差。
- 可以配置使用固定队列返回结果,但是如果多个节点使用一个配置,同时监听固定队列,可能造成节点收不到结果。
- 使用默认的java作为序列化实现,性能不如Kryo。
方案
针对默认实现存在的问题,可以使用以下优化方案:
- 每一个客户端使用独立规定的结果返回队列,避免创建临时队列。
- 设置消息为非持久化。
- 设置消息的超时时间,使用Kryo作为序列化库。
实现
需要做的关键修改有以下3个:
创建自定义的MessageConverter,将消息设置为非持久化,设定过期时间,以及使用kryo序列化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 1public class MessageConverterWithExpire extends SimpleMessageConverter {
2
3 /** Logger */
4 protected static final Logger log = LoggerFactory
5 .getLogger(MessageConverterWithExpire.class);
6
7 // Setup ThreadLocal of Kryo instances
8 private ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
9 protected Kryo initialValue() {
10 Kryo kryo = new Kryo();
11 // configure kryo instance, customize settings
12 kryo.register(RemoteInvocation.class);
13 Registration reg = kryo.register(RemoteInvocationResult.class);
14 reg.setInstantiator(new ObjectInstantiator() {
15 @Override
16 public Object newInstance() {
17 return new RemoteInvocationResult(null);
18 }
19
20 });
21 return kryo;
22 };
23 };
24
25 @Override
26 protected Message createMessage(Object object,
27 MessageProperties messageProperties)
28 throws MessageConversionException {
29 // expire in ms.
30 messageProperties.setExpiration("45000");
31 messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
32
33 Kryo k = kryos.get();
34 Output output = new Output(4096, 4194304);
35 k.writeClassAndObject(output, object);
36 return new Message(output.toBytes(), messageProperties);
37 // return super.createMessage(object, messageProperties);
38 }
39
40 @Override
41 public Object fromMessage(Message message)
42 throws MessageConversionException {
43 Kryo k = kryos.get();
44 return k.readClassAndObject(new Input(message.getBody()));
45 // return super.fromMessage(message);
46 }
47}
48
使用自定义的listenerContainer,在监听前创建规定的结果返回队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 1public class DynamicReplyMessageListenerContainer extends
2 SimpleMessageListenerContainer {
3 @Override
4 protected void doInitialize() throws Exception {
5 super.doInitialize();
6 Object listener = getMessageListener();
7 if (listener instanceof RabbitTemplate) {
8 RabbitTemplate template = (RabbitTemplate) listener;
9 Queue queue = getRabbitAdmin().declareQueue();
10 template.setReplyQueue(queue);
11 setQueues(queue);
12 }
13 }
14}
15
重写AmqpInvokerServiceExporter ,提供通过固定队列返回结果的支持。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 1public class AmqpInvokerServiceExporterCorrelate extends
2 AmqpInvokerServiceExporter {
3 @Override
4 public void onMessage(Message message) {
5 Address replyToAddress = message.getMessageProperties()
6 .getReplyToAddress();
7 if (replyToAddress == null) {
8 throw new AmqpRejectAndDontRequeueException(
9 "No replyToAddress in inbound AMQP Message");
10 }
11
12 Object invocationRaw = getMessageConverter().fromMessage(message);
13
14 RemoteInvocationResult remoteInvocationResult;
15 if (invocationRaw == null
16 || !(invocationRaw instanceof RemoteInvocation)) {
17 remoteInvocationResult = new RemoteInvocationResult(
18 new IllegalArgumentException(
19 "The message does not contain a RemoteInvocation payload"));
20 } else {
21 RemoteInvocation invocation = (RemoteInvocation) invocationRaw;
22 remoteInvocationResult = invokeAndCreateResult(invocation,
23 getService());
24 }
25 send(remoteInvocationResult, replyToAddress, message);
26 }
27
28 private void send(Object object, Address replyToAddress,
29 Message sourceMessage) {
30 MessageProperties mp = new MessageProperties();
31 mp.setCorrelationId(sourceMessage.getMessageProperties()
32 .getCorrelationId());
33 Message message = getMessageConverter().toMessage(object, mp);
34
35 getAmqpTemplate().send(replyToAddress.getExchangeName(),
36 replyToAddress.getRoutingKey(), message);
37 }
38
39}
40
测试结果
默认实现的tps在640左右。经过优化的TPS在4000左右。更多内容请参见测试代码。
测试代码下载