使用Rabbitmq/spring进行RPC

释放双眼,带上耳机,听听看~!

使用RabbitMQ进行RPC的必要

常见的RPC方法/协议包括CORBA,Java RMI,Web Service,Hessian,Thrift及Rest API,相对于前面提到的RPC方式,使用RabbbitMQ(JMS也是一样)方式需要在client-service provider中间增加MQ组件,这样做增加了部署的复杂性,但同时带来额外的好处。

  1. 可以对service provider进行保护,MQ对请求进行缓冲,处理不了的请求可以被MQ抛弃而不会压垮service provider。
  2. 可以隔离低安全区对高安全区的访问,此优点是其他rpc没有的。

一个典型的互联网访问方式如下(FW表示防火墙)
client—-FW1–>frontend server(DMZ)—–FW2—->service provider(高安全区)
通过MQ进行RPC则变成
client—-FW1–>frontend server(DMZ)—>MQ<–FW2—-service provider(高安全区)
这里一个重要的区别是service provider(高安全区)是主动访问位于DMZ区域的MQ,由此可以设定FW2只允许高安全区访问DMZ,而禁止DMZ访问高安全区。此方式对于安全性要求高的如银行,金融,政府系统尤为重要。

实现spring remoting实现使用MQ的RPC


原理

一个RPC交互大致分为以下几个步骤:

  1. 服务端监听MQ队列
  2. 客户端将调用请求(调用请求包括结果返回队列)发送到队列中。
  3. 客户端在结果返回队列监听。
  4. 服务端处理业务,将结果发送到结果返回队列。

公共内容

定义RabbitMQ的连接,定义service接口,进行通讯的queue。


1
2
3
4
1public interface Service {
2    String sayHello(String name);
3}
4

服务端实现

配置文件内容


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot;?&gt;
2&lt;beans xmlns=&quot;http://www.springframework.org/schema/beans&quot;
3    xmlns:xsi=&quot;http://www.w3.org/2001/XMLSchema-instance&quot; xmlns:context=&quot;http://www.springframework.org/schema/context&quot;
4    xmlns:lang=&quot;http://www.springframework.org/schema/lang&quot; xmlns:rabbit=&quot;http://www.springframework.org/schema/rabbit&quot;
5    xmlns:util=&quot;http://www.springframework.org/schema/util&quot;
6    xsi:schemaLocation=&quot;http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
7        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
8        http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
9        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
10        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd&quot;&gt;
11    &lt;!--MQ连接--&gt;
12    &lt;rabbit:connection-factory id=&quot;rabbitConnectionFactory&quot; /&gt;
13
14    &lt;!--创建必要的exchange及queue--&gt;
15    &lt;rabbit:admin connection-factory=&quot;rabbitConnectionFactory&quot; /&gt;
16    &lt;rabbit:direct-exchange name=&quot;exchange&quot;&gt;
17        &lt;rabbit:bindings&gt;
18            &lt;rabbit:binding queue=&quot;queue.appgw&quot; key=&quot;queue.appgw&quot;&gt;
19            &lt;/rabbit:binding&gt;
20        &lt;/rabbit:bindings&gt;
21    &lt;/rabbit:direct-exchange&gt;
22    &lt;rabbit:queue name=&quot;queue.appgw&quot;&gt;
23    &lt;/rabbit:queue&gt;
24
25    &lt;!--返回结果的template--&gt;
26    &lt;rabbit:template id=&quot;amqpTemplateInternetProxy&quot;
27        connection-factory=&quot;rabbitConnectionFactory&quot;&gt;
28    &lt;/rabbit:template&gt;
29    &lt;!--服务监听--&gt;
30    &lt;rabbit:listener-container acknowledge=&quot;none&quot;
31        max-concurrency=&quot;128&quot; prefetch=&quot;10&quot;&gt;
32        &lt;rabbit:listener ref=&quot;service&quot; queue-names=&quot;queue.appgw&quot; /&gt;
33    &lt;/rabbit:listener-container&gt;
34    &lt;!--服务实现--&gt;
35    &lt;bean id=&quot;serviceImpl&quot; class=&quot;net.nxmax.atp.exporter.ServiceImpl&quot;&gt;&lt;/bean&gt;
36    &lt;!--服务发布--&gt;
37    &lt;bean id=&quot;service&quot;
38        class=&quot;org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter&quot;&gt;
39        &lt;property name=&quot;serviceInterface&quot; value=&quot;net.nxmax.atp.exporter.Service&quot; /&gt;
40        &lt;property name=&quot;service&quot; ref=&quot;serviceImpl&quot; /&gt;
41        &lt;property name=&quot;amqpTemplate&quot; ref=&quot;amqpTemplateInternetProxy&quot; /&gt;
42    &lt;/bean&gt;
43&lt;/beans&gt;
44

发布服务代码:


1
2
3
4
5
6
1ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(&quot;applicationContext.xml&quot;);
2BufferedReader r = new BufferedReader(new InputStreamReader(System.in));
3while (!r.readLine().equalsIgnoreCase(&quot;exit&quot;)) {
4}
5ctx.destroy();
6

客户端实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot;?&gt;
2&lt;beans xmlns=&quot;http://www.springframework.org/schema/beans&quot;
3    xmlns:xsi=&quot;http://www.w3.org/2001/XMLSchema-instance&quot; xmlns:context=&quot;http://www.springframework.org/schema/context&quot;
4    xmlns:lang=&quot;http://www.springframework.org/schema/lang&quot; xmlns:rabbit=&quot;http://www.springframework.org/schema/rabbit&quot;
5    xmlns:util=&quot;http://www.springframework.org/schema/util&quot; xmlns:p=&quot;http://www.springframework.org/schema/p&quot;
6    xsi:schemaLocation=&quot;http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
7        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
8        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
9        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
10        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd&quot;&gt;
11    &lt;!--MQ连接--&gt;
12    &lt;rabbit:connection-factory id=&quot;rabbitConnectionFactory&quot;/&gt;
13    &lt;!--发送请求的template--&gt;
14    &lt;rabbit:template id=&quot;amqpTemplate&quot; connection-factory=&quot;rabbitConnectionFactory&quot;
15        queue=&quot;queue.appgw&quot; exchange=&quot;exchange&quot; reply-timeout=&quot;60000&quot; /&gt;
16    &lt;bean class=&quot;org.springframework.amqp.remoting.client.AmqpProxyFactoryBean&quot;
17        p:serviceInterface=&quot;net.nxmax.atp.exporter.Service&quot; p:routingKey=&quot;queue.appgw&quot;&gt;
18        &lt;property name=&quot;amqpTemplate&quot; ref=&quot;amqpTemplate&quot; /&gt;
19    &lt;/bean&gt;
20&lt;/beans&gt;
21
22

客户端调用代码:


1
2
3
4
5
1ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(&quot;ctx-client.xml&quot;);
2Service service = ctx.getBean(Service.class);
3service.sayHello(&quot;name&quot;);
4ctx.destroy();
5

优化

问题

使用默认的spring实现存在以下的限制:

  • 消息使用持久化,对于RPC来说消息是不需要持久化的。
  • 消息没有过期时间,意味着后端可能需要处理很久以前的请求,对于RPC来说很久以前的请求应该要抛弃。
  • 默认使用临时队列作为结果返回队列,意味着每次调用都需要创建队列,性能极差。
  • 可以配置使用固定队列返回结果,但是如果多个节点使用一个配置,同时监听固定队列,可能造成节点收不到结果。
  • 使用默认的java作为序列化实现,性能不如Kryo。

方案

针对默认实现存在的问题,可以使用以下优化方案:

  • 每一个客户端使用独立规定的结果返回队列,避免创建临时队列。
  • 设置消息为非持久化。
  • 设置消息的超时时间,使用Kryo作为序列化库。

实现

需要做的关键修改有以下3个:
创建自定义的MessageConverter,将消息设置为非持久化,设定过期时间,以及使用kryo序列化。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
1public class MessageConverterWithExpire extends SimpleMessageConverter {
2
3    /** Logger */
4    protected static final Logger log = LoggerFactory
5            .getLogger(MessageConverterWithExpire.class);
6
7    // Setup ThreadLocal of Kryo instances
8    private ThreadLocal&lt;Kryo&gt; kryos = new ThreadLocal&lt;Kryo&gt;() {
9        protected Kryo initialValue() {
10            Kryo kryo = new Kryo();
11            // configure kryo instance, customize settings
12            kryo.register(RemoteInvocation.class);
13            Registration reg = kryo.register(RemoteInvocationResult.class);
14            reg.setInstantiator(new ObjectInstantiator() {
15                @Override
16                public Object newInstance() {
17                    return new RemoteInvocationResult(null);
18                }
19
20            });
21            return kryo;
22        };
23    };
24
25    @Override
26    protected Message createMessage(Object object,
27            MessageProperties messageProperties)
28            throws MessageConversionException {
29        // expire in ms.
30        messageProperties.setExpiration(&quot;45000&quot;);
31        messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
32
33        Kryo k = kryos.get();
34        Output output = new Output(4096, 4194304);
35        k.writeClassAndObject(output, object);
36        return new Message(output.toBytes(), messageProperties);
37        // return super.createMessage(object, messageProperties);
38    }
39
40    @Override
41    public Object fromMessage(Message message)
42            throws MessageConversionException {
43        Kryo k = kryos.get();
44        return k.readClassAndObject(new Input(message.getBody()));
45        // return super.fromMessage(message);
46    }
47}
48

使用自定义的listenerContainer,在监听前创建规定的结果返回队列。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1public class DynamicReplyMessageListenerContainer extends
2        SimpleMessageListenerContainer {
3    @Override
4    protected void doInitialize() throws Exception {
5        super.doInitialize();
6        Object listener = getMessageListener();
7        if (listener instanceof RabbitTemplate) {
8            RabbitTemplate template = (RabbitTemplate) listener;
9            Queue queue = getRabbitAdmin().declareQueue();
10            template.setReplyQueue(queue);
11            setQueues(queue);
12        }
13    }
14}
15

重写AmqpInvokerServiceExporter ,提供通过固定队列返回结果的支持。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1public class AmqpInvokerServiceExporterCorrelate extends
2        AmqpInvokerServiceExporter {
3    @Override
4    public void onMessage(Message message) {
5        Address replyToAddress = message.getMessageProperties()
6                .getReplyToAddress();
7        if (replyToAddress == null) {
8            throw new AmqpRejectAndDontRequeueException(
9                    &quot;No replyToAddress in inbound AMQP Message&quot;);
10        }
11
12        Object invocationRaw = getMessageConverter().fromMessage(message);
13
14        RemoteInvocationResult remoteInvocationResult;
15        if (invocationRaw == null
16                || !(invocationRaw instanceof RemoteInvocation)) {
17            remoteInvocationResult = new RemoteInvocationResult(
18                    new IllegalArgumentException(
19                            &quot;The message does not contain a RemoteInvocation payload&quot;));
20        } else {
21            RemoteInvocation invocation = (RemoteInvocation) invocationRaw;
22            remoteInvocationResult = invokeAndCreateResult(invocation,
23                    getService());
24        }
25        send(remoteInvocationResult, replyToAddress, message);
26    }
27
28    private void send(Object object, Address replyToAddress,
29            Message sourceMessage) {
30        MessageProperties mp = new MessageProperties();
31        mp.setCorrelationId(sourceMessage.getMessageProperties()
32                .getCorrelationId());
33        Message message = getMessageConverter().toMessage(object, mp);
34
35        getAmqpTemplate().send(replyToAddress.getExchangeName(),
36                replyToAddress.getRoutingKey(), message);
37    }
38
39}
40

测试结果

默认实现的tps在640左右。经过优化的TPS在4000左右。更多内容请参见测试代码。
测试代码下载

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全运维

WOT2016崔灿:带你了解不一样的百度开放云存储

2016-12-24 20:38:06

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索