1. Netty
Netty is an open-source Java framework provided by JBoss. Drawing on implementation experience with FTP, SMTP, HTTP, and various binary and text protocols, and after very careful design, Netty arrived at an approach that keeps development easy while preserving the performance, stability, and scalability of the resulting applications. It provides an asynchronous, event-driven network application framework and tooling for rapidly developing high-performance, high-reliability network servers and clients. As an NIO-based client/server programming framework, Netty lets you build a network application quickly and simply.
For further reading, see 《Netty权威指南》 or "Netty in Action"; the latter is also available in Chinese translation.
2. Dependency Configuration
This example uses Netty 4.0.23.Final for network communication.
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.0.23.Final</version>
    <scope>compile</scope>
</dependency>
Add the dependency above to pom.xml and Maven takes care of the rest.
3. ChannelHandler Design
The scenario in this example is fairly simple: the client and the server each have only three ChannelHandlers. Both sides have a NettyKryoEncoder and a NettyKryoDecoder. NettyKryoEncoder serializes outgoing objects into byte sequences and is a ChannelOutboundHandler, while NettyKryoDecoder reconstructs objects from received byte sequences and is a ChannelInboundHandler.
The client additionally has an RpcClientDispatchHandler, which dispatches events whenever an object is received from the server (in this application it can only be an RpcResponse). Likewise, the server has an RpcServerDispatchHandler, which dispatches events whenever an object is received from the client (in this application it can only be an RpcRequest).
(1).NettyKryoEncoder
package com.maigo.rpc.netty;

import com.maigo.rpc.serializer.KryoSerializer;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class NettyKryoEncoder extends MessageToByteEncoder<Object>
{
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)
            throws Exception
    {
        KryoSerializer.serialize(msg, out);
    }
}
It simply calls the KryoSerializer implemented in the previous installment to serialize the object.
(2).NettyKryoDecoder
package com.maigo.rpc.netty;

import com.maigo.rpc.serializer.KryoSerializer;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class NettyKryoDecoder extends LengthFieldBasedFrameDecoder
{
    public NettyKryoDecoder()
    {
        super(1048576, 0, 4, 0, 4);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
    {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null)
            return null;

        return KryoSerializer.deserialize(frame);
    }

    @Override
    protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length)
    {
        return buffer.slice(index, length);
    }
}
Decoding is based on LengthFieldBasedFrameDecoder and relies on the length information stored in the first 4 bytes of the byte sequence (KryoSerializer.serialize() writes this length header; see the previous installment). Using that length, the parent class's decode() method cuts out the actual frame, and KryoSerializer.deserialize() is called on it to reconstruct the object.
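For reference, the serializer the codec pair depends on is not repeated in this installment. The following is only a minimal sketch, assuming a straightforward Kryo implementation consistent with the 4-byte length header described above; the actual KryoSerializer from the previous installment may differ in detail (a thread-local Kryo instance is used here because Kryo objects are not thread-safe).

package com.maigo.rpc.serializer;

import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import io.netty.buffer.ByteBuf;

public class KryoSerializer
{
    // One Kryo per thread, since Kryo instances are not thread-safe.
    private static final ThreadLocal<Kryo> kryoLocal = new ThreadLocal<Kryo>()
    {
        @Override
        protected Kryo initialValue()
        {
            return new Kryo();
        }
    };

    /** Writes [4-byte length][kryo bytes] into the outgoing ByteBuf. */
    public static void serialize(Object object, ByteBuf out)
    {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Output output = new Output(byteArrayOutputStream);
        kryoLocal.get().writeClassAndObject(output, object);
        output.close();

        byte[] bytes = byteArrayOutputStream.toByteArray();
        out.writeInt(bytes.length);   // the length header read by NettyKryoDecoder
        out.writeBytes(bytes);
    }

    /** Reads the kryo bytes of one frame (the length header was already stripped by the decoder). */
    public static Object deserialize(ByteBuf frame)
    {
        byte[] bytes = new byte[frame.readableBytes()];
        frame.readBytes(bytes);

        Input input = new Input(bytes);
        return kryoLocal.get().readClassAndObject(input);
    }
}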
(3).RpcClientDispatchHandler
package com.maigo.rpc.client;

import com.maigo.rpc.context.RpcResponse;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class RpcClientDispatchHandler extends ChannelInboundHandlerAdapter
{
    private RpcClientResponseHandler rpcClientResponseHandler;
    private RpcClientChannelInactiveListener rpcClientChannelInactiveListener = null;

    public RpcClientDispatchHandler(
            RpcClientResponseHandler rpcClientResponseHandler,
            RpcClientChannelInactiveListener rpcClientChannelInactiveListener)
    {
        this.rpcClientResponseHandler = rpcClientResponseHandler;
        this.rpcClientChannelInactiveListener = rpcClientChannelInactiveListener;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception
    {
        RpcResponse rpcResponse = (RpcResponse)msg;
        rpcClientResponseHandler.addResponse(rpcResponse);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception
    {

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception
    {
        if(rpcClientChannelInactiveListener != null)
            rpcClientChannelInactiveListener.onInactive();
    }
}
channelRead() is called when an object arrives (already decoded by NettyKryoDecoder); it casts the object to RpcResponse and passes it to RpcClientResponseHandler.addResponse(). The RpcClientResponseHandler has to be supplied when the RpcClientDispatchHandler is created; each RpcClient should have exactly one RpcClientResponseHandler, which processes every invocation result (RpcResponse) received from the server. RpcClientChannelInactiveListener is a callback invoked when the channel becomes inactive (usually because the connection to the server dropped unexpectedly); it is used to implement automatic reconnection.
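RpcClientChannelInactiveListener itself is not listed in this installment; judging from how it is used here and in the reconnection code below, it is simply a one-method callback interface, roughly:

package com.maigo.rpc.client;

public interface RpcClientChannelInactiveListener
{
    /** Called when the channel to the server becomes inactive. */
    void onInactive();
}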
(4).RpcServerDispatchHandler
package com.maigo.rpc.server;

import com.maigo.rpc.context.RpcRequest;
import com.maigo.rpc.context.RpcRequestWrapper;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class RpcServerDispatchHandler extends ChannelInboundHandlerAdapter
{
    private RpcServerRequestHandler rpcServerRequestHandler;

    public RpcServerDispatchHandler(
            RpcServerRequestHandler rpcServerRequestHandler)
    {
        this.rpcServerRequestHandler = rpcServerRequestHandler;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception
    {
        RpcRequest rpcRequest = (RpcRequest)msg;
        RpcRequestWrapper rpcRequestWrapper = new RpcRequestWrapper(rpcRequest, ctx.channel());

        rpcServerRequestHandler.addRequest(rpcRequestWrapper);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception
    {

    }
}
Just like RpcClientDispatchHandler, this handler passes each received RpcRequest to RpcServerRequestHandler for processing. The difference is that here the RpcRequest and the Channel are wrapped together into an RpcRequestWrapper. After RpcServerRequestHandler takes an RpcRequestWrapper and performs the requested invocation, it writes the result back through the Channel held in the wrapper, so the result is always returned over the same Channel the request arrived on.
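RpcRequestWrapper is not listed in this installment either. Based on how the dispatch handler and the worker threads use it, it is essentially a small value object pairing the request with its source Channel; a sketch under that assumption:

package com.maigo.rpc.context;

import io.netty.channel.Channel;

public class RpcRequestWrapper
{
    private RpcRequest rpcRequest;
    private Channel channel;

    public RpcRequestWrapper(RpcRequest rpcRequest, Channel channel)
    {
        this.rpcRequest = rpcRequest;
        this.channel = channel;
    }

    /** The Channel the request arrived on, used to send the RpcResponse back. */
    public Channel getChannel()
    {
        return channel;
    }

    // Delegate the request fields so the workers never touch RpcRequest directly.
    public int getId()
    {
        return rpcRequest.getId();
    }

    public String getMethodName()
    {
        return rpcRequest.getMethodName();
    }

    public Object[] getArgs()
    {
        return rpcRequest.getArgs();
    }
}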
4. Client Startup Configuration
In the earlier installments the RPC client's connection logic was only a simulated one-line print statement; it is now replaced with real connection code (only the changed parts are shown).
public void connect()
{
    bootstrap = new Bootstrap();
    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    try
    {
        bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer<Channel>()
                 {
                     @Override
                     protected void initChannel(Channel ch) throws Exception
                     {
                         ch.pipeline().addLast(new NettyKryoDecoder(),
                                 new RpcClientDispatchHandler(rpcClientResponseHandler, rpcClientChannelInactiveListener),
                                 new NettyKryoEncoder());
                     }
                 });
        bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);

        do
        {
            channel = tryConnect();
        }
        while(channel == null);
    }
    catch(Exception e)
    {
        e.printStackTrace();
    }
}

private Channel tryConnect()
{
    try
    {
        InfoPrinter.println("Try to connect to [" + host + ":" + port + "].");
        ChannelFuture future = bootstrap.connect(host, port).sync();
        if(future.isSuccess())
        {
            InfoPrinter.println("Connect to [" + host + ":" + port + "] successed.");
            return future.channel();
        }
        else
        {
            InfoPrinter.println("Connect to [" + host + ":" + port + "] failed.");
            InfoPrinter.println("Try to reconnect in 10s.");
            Thread.sleep(10000);
            return null;
        }
    }
    catch (Exception exception)
    {
        InfoPrinter.println("Connect to [" + host + ":" + port + "] failed.");
        InfoPrinter.println("Try to reconnect in 10 seconds.");
        try
        {
            Thread.sleep(10000);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        return null;
    }
}
The connect() method configures the bootstrap, while tryConnect() attempts to connect to the server and returns the connected Channel, or null if the connection fails. connect() keeps trying: after each failure it waits 10 seconds and retries until it succeeds. Note that an rpcClientChannelInactiveListener is registered with the RpcClientDispatchHandler; this callback interface implements automatic reconnection after the connection is lost.
rpcClientChannelInactiveListener = new RpcClientChannelInactiveListener()
{
    public void onInactive()
    {
        InfoPrinter.println("connection with server is closed.");
        InfoPrinter.println("try to reconnect to the server.");
        channel = null;
        do
        {
            channel = tryConnect();
        }
        while(channel == null);
    }
};
As with the initial connection, it keeps retrying: after each failure it waits 10 seconds and tries again until the connection succeeds.
5. Server Startup Configuration
public void start()
{
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try
    {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializer<SocketChannel>()
                 {
                     @Override
                     protected void initChannel(SocketChannel ch) throws Exception
                     {
                         ch.pipeline().addLast(new NettyKryoDecoder(),
                                 new RpcServerDispatchHandler(rpcServerRequestHandler),
                                 new NettyKryoEncoder());
                     }
                 });
        bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

        ChannelFuture channelFuture = bootstrap.bind(port);
        channelFuture.sync();
        Channel channel = channelFuture.channel();
        InfoPrinter.println("RpcServer started.");
        InfoPrinter.println(interfaceClass.getSimpleName() + " in service.");
        channel.closeFuture().sync();
    }
    catch(Exception e)
    {
        e.printStackTrace();
    }
    finally
    {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
This mirrors the client, except that the server uses two separate EventLoopGroups: one for accepting connections on the listening port and one for I/O on the accepted Channels. Since only a single port is listened on, bossGroup is given a size of 1, while workerGroup keeps the default size (twice the number of CPU cores).
6. Client-Side Handling of Returned Results
When several threads use the same RpcClient concurrently (whether through the synchronous dynamic proxy or the asynchronous RpcClientAsyncProxy), a later request can easily get its result back before an earlier one, because network conditions and server-side processing times vary. Every invocation therefore has to be tagged with a unique id. Since the result of an invocation is only ever returned over the Channel that carried the request, the server does not need the id to tell different clients' requests apart, so the id need not be globally unique; it only has to be unique within one client, which a self-incrementing AtomicInteger provides.
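The message classes themselves are not repeated in this installment. Judging from how they are used below, RpcRequest and RpcResponse are plain beans keyed by this invocation id; the following is a sketch of RpcRequest under that assumption (RpcResponse, in its own file, mirrors it with id, result, throwable, and invokeSuccess fields plus their getters and setters).

package com.maigo.rpc.context;

public class RpcRequest
{
    private int id;               // client-local invocation id
    private String methodName;
    private Object[] args;

    public RpcRequest()
    {
        // no-arg constructor kept for the serializer
    }

    public RpcRequest(int id, String methodName, Object[] args)
    {
        this.id = id;
        this.methodName = methodName;
        this.args = args;
    }

    public int getId()
    {
        return id;
    }

    public String getMethodName()
    {
        return methodName;
    }

    public Object[] getArgs()
    {
        return args;
    }
}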
(1).RpcClientResponseHandler
package com.maigo.rpc.client;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import com.maigo.rpc.context.RpcResponse;
import com.maigo.rpc.future.RpcFuture;

public class RpcClientResponseHandler
{
    private ConcurrentMap<Integer, RpcFuture> invokeIdRpcFutureMap = new ConcurrentHashMap<Integer, RpcFuture>();

    private ExecutorService threadPool;
    private BlockingQueue<RpcResponse> responseQueue = new LinkedBlockingQueue<RpcResponse>();

    public RpcClientResponseHandler(int threads)
    {
        threadPool = Executors.newFixedThreadPool(threads);
        for(int i=0; i<threads; i++)
        {
            threadPool.execute(new RpcClientResponseHandleRunnable(invokeIdRpcFutureMap, responseQueue));
        }
    }

    public void register(int id, RpcFuture rpcFuture)
    {
        invokeIdRpcFutureMap.put(id, rpcFuture);
    }

    public void addResponse(RpcResponse rpcResponse)
    {
        responseQueue.add(rpcResponse);
    }
}
RpcClientResponseHandler maintains a map from each invocation's id to the RpcFuture produced by that invocation (in this framework synchronous calls are built on top of asynchronous ones, so every invocation produces an RpcFuture; the only difference is whether that RpcFuture is handed back to the user). When an RpcClientResponseHandler is created it starts a thread pool and, according to the threads parameter, creates and runs the given number of RpcClientResponseHandleRunnable instances; the actual processing is implemented in RpcClientResponseHandleRunnable.
(2).RpcClientResponseHandleRunnable
package com.maigo.rpc.client;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;

import com.maigo.rpc.context.RpcResponse;
import com.maigo.rpc.future.RpcFuture;

public class RpcClientResponseHandleRunnable implements Runnable
{
    private ConcurrentMap<Integer, RpcFuture> invokeIdRpcFutureMap;
    private BlockingQueue<RpcResponse> responseQueue;

    public RpcClientResponseHandleRunnable(
            ConcurrentMap<Integer, RpcFuture> invokeIdRpcFutureMap,
            BlockingQueue<RpcResponse> responseQueue)
    {
        this.invokeIdRpcFutureMap = invokeIdRpcFutureMap;
        this.responseQueue = responseQueue;
    }

    public void run()
    {
        while(true)
        {
            try
            {
                RpcResponse rpcResponse = responseQueue.take();

                int id = rpcResponse.getId();
                RpcFuture rpcFuture = invokeIdRpcFutureMap.remove(id);

                if(rpcResponse.isInvokeSuccess())
                    rpcFuture.setResult(rpcResponse.getResult());
                else
                    rpcFuture.setThrowable(rpcResponse.getThrowable());
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }
}
It keeps taking RpcResponse objects from the responseQueue, uses the RpcResponse's id to look up the RpcFuture of that invocation (within one invocation the RpcResponse carries the same id as the RpcRequest), and checks the result status: on success it calls setResult() on the RpcFuture, otherwise setThrowable().
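RpcFuture comes from an earlier installment and is not repeated here. A minimal sketch of the behaviour relied on in this section, assuming a simple latch-based implementation in which setResult()/setThrowable() release a blocked get():

package com.maigo.rpc.future;

import java.util.concurrent.CountDownLatch;

public class RpcFuture
{
    private CountDownLatch latch = new CountDownLatch(1);
    private Object result;
    private Throwable throwable;

    /** Called by RpcClientResponseHandleRunnable on success. */
    public void setResult(Object result)
    {
        this.result = result;
        latch.countDown();
    }

    /** Called by RpcClientResponseHandleRunnable on failure. */
    public void setThrowable(Throwable throwable)
    {
        this.throwable = throwable;
        latch.countDown();
    }

    /** Blocks until the matching RpcResponse has been dispatched (used by the synchronous proxy). */
    public Object get() throws Throwable
    {
        latch.await();
        if (throwable != null)
            throw throwable;
        return result;
    }
}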
(3).Generating the invocation id in RpcClient
The id is produced by an AtomicInteger that is incremented on every call. The call() method in RpcClient becomes the following (previously it did not actually send the request and only simulated the call with a print statement):
public RpcFuture call(String methodName, Object ... args)
{
    if(rpcInvokeHook != null)
        rpcInvokeHook.beforeInvoke(methodName, args);

    RpcFuture rpcFuture = new RpcFuture();
    int id = invokeIdGenerator.addAndGet(1);
    rpcClientResponseHandler.register(id, rpcFuture);

    RpcRequest rpcRequest = new RpcRequest(id, methodName, args);
    if(channel != null)
        channel.writeAndFlush(rpcRequest);
    else
        return null;

    return rpcFuture;
}
Each time a method is called, an RpcRequest is created with the method name and arguments, plus a freshly generated id that uniquely identifies this invocation. An RpcFuture is created and registered with the rpcClientResponseHandler under that id, the RpcRequest is sent to the server through the channel, and finally the RpcFuture is returned. Whether the caller then blocks on the RpcFuture until the result arrives (synchronous mode) or hands the RpcFuture directly to the user (asynchronous mode) depends on the invocation style; the registration and handling of the returned result is the same either way.
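As an illustration of the synchronous path, the dynamic proxy can be built directly on top of call(). The helper below is hypothetical (SyncProxySketch is not part of the framework, and it assumes the blocking get() sketched earlier); it only shows how the proxy would block on the returned RpcFuture, whereas the asynchronous RpcClientAsyncProxy would hand the same future back to the caller:

package com.maigo.rpc.client;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import com.maigo.rpc.future.RpcFuture;

public class SyncProxySketch
{
    @SuppressWarnings("unchecked")
    public static <T> T create(final Class<T> interfaceClass, final RpcClient rpcClient)
    {
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{ interfaceClass },
                new InvocationHandler()
                {
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
                    {
                        // Send the request and register the future under its id ...
                        RpcFuture rpcFuture = rpcClient.call(method.getName(), args);
                        // ... then block until the matching RpcResponse is dispatched back.
                        return rpcFuture.get();
                    }
                });
    }
}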
7. Server-Side Handling of Invocation Requests
(1).RpcServerRequestHandler
package com.maigo.rpc.server;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.context.RpcRequestWrapper;

public class RpcServerRequestHandler
{
    private Class<?> interfaceClass;
    private Object serviceProvider;
    private RpcInvokeHook rpcInvokeHook;

    private int threads;
    private ExecutorService threadPool;
    private BlockingQueue<RpcRequestWrapper> requestQueue = new LinkedBlockingQueue<RpcRequestWrapper>();

    public RpcServerRequestHandler(Class<?> interfaceClass, Object serviceProvider, int threads,
            RpcInvokeHook rpcInvokeHook)
    {
        this.interfaceClass = interfaceClass;
        this.serviceProvider = serviceProvider;
        this.threads = threads;
        this.rpcInvokeHook = rpcInvokeHook;
    }

    public void start()
    {
        threadPool = Executors.newFixedThreadPool(threads);
        for(int i=0; i<threads; i++)
        {
            threadPool.execute(new RpcServerRequestHandleRunnable(interfaceClass,
                    serviceProvider, rpcInvokeHook, requestQueue));
        }
    }

    public void addRequest(RpcRequestWrapper rpcRequestWrapper)
    {
        try
        {
            requestQueue.put(rpcRequestWrapper);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}
This class was already shown in Part 2; only a small change was made: addRequest() now accepts an RpcRequestWrapper instead of an RpcRequest.
(2).RpcServerRequestHandleRunnable
This is where the requested method is actually invoked. Compared with the code in Part 2 there is one change: JDK reflection is replaced with ReflectASM.
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>reflectasm</artifactId>
    <version>1.11.0</version>
</dependency>
Add this dependency inside the <dependencies> element of pom.xml; the latest version at the time of writing, 1.11.0, is used here.
package com.maigo.rpc.server;

import io.netty.channel.Channel;

import java.util.concurrent.BlockingQueue;

import com.esotericsoftware.reflectasm.MethodAccess;
import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.context.RpcRequestWrapper;
import com.maigo.rpc.context.RpcResponse;

public class RpcServerRequestHandleRunnable implements Runnable
{
    private Class<?> interfaceClass;
    private Object serviceProvider;
    private RpcInvokeHook rpcInvokeHook;
    private BlockingQueue<RpcRequestWrapper> requestQueue;
    private RpcRequestWrapper rpcRequestWrapper;

    private MethodAccess methodAccess;
    private String lastMethodName = "";
    private int lastMethodIndex;

    public RpcServerRequestHandleRunnable(Class<?> interfaceClass,
            Object serviceProvider, RpcInvokeHook rpcInvokeHook,
            BlockingQueue<RpcRequestWrapper> requestQueue)
    {
        this.interfaceClass = interfaceClass;
        this.serviceProvider = serviceProvider;
        this.rpcInvokeHook = rpcInvokeHook;
        this.requestQueue = requestQueue;

        methodAccess = MethodAccess.get(interfaceClass);
    }

    public void run()
    {
        while(true)
        {
            try
            {
                rpcRequestWrapper = requestQueue.take();

                String methodName = rpcRequestWrapper.getMethodName();
                Object[] args = rpcRequestWrapper.getArgs();

                if(rpcInvokeHook != null)
                    rpcInvokeHook.beforeInvoke(methodName, args);

                Object result = null;
                if(!methodName.equals(lastMethodName))
                {
                    lastMethodIndex = methodAccess.getIndex(methodName);
                    lastMethodName = methodName;
                }

                result = methodAccess.invoke(serviceProvider, lastMethodIndex, args);

                Channel channel = rpcRequestWrapper.getChannel();
                RpcResponse rpcResponse = new RpcResponse();
                rpcResponse.setId(rpcRequestWrapper.getId());
                rpcResponse.setResult(result);
                rpcResponse.setInvokeSuccess(true);
                channel.writeAndFlush(rpcResponse);

                if(rpcInvokeHook != null)
                    rpcInvokeHook.afterInvoke(methodName, args);
            }
            catch (Exception e)
            {
                Channel channel = rpcRequestWrapper.getChannel();
                RpcResponse rpcResponse = new RpcResponse();
                rpcResponse.setId(rpcRequestWrapper.getId());
                rpcResponse.setThrowable(e);
                rpcResponse.setInvokeSuccess(false);
                channel.writeAndFlush(rpcResponse);
            }
        }
    }
}
Although obtaining a MethodAccess takes comparatively long, the subsequent methodAccess.invoke() calls are faster than plain JDK reflection. For an RPC server the MethodAccess only has to be obtained once after startup (once per worker thread), after which the same methodAccess object is reused for every invocation, which yields a measurable performance gain.
The hook callbacks are invoked before and after the method call, and any exception thrown during the invocation is caught, wrapped into an RpcResponse, and sent back to the client.
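The three ReflectASM calls used above (MethodAccess.get(), getIndex(), and invoke()) can also be tried in isolation; the small standalone demo below reuses TestInterface from the test classes in the next section:

package com.maigo.rpc.test;

import com.esotericsoftware.reflectasm.MethodAccess;

public class MethodAccessDemo
{
    public static void main(String[] args)
    {
        TestInterface service = new TestInterface()
        {
            public String testMethod01()
            {
                return "return from server";
            }
        };

        // Built once per worker thread; this is the relatively expensive step.
        MethodAccess methodAccess = MethodAccess.get(TestInterface.class);

        // Resolving the index can be cached, as the worker does with lastMethodIndex.
        int index = methodAccess.getIndex("testMethod01");

        // Bytecode-based dispatch, cheaper than java.lang.reflect.Method.invoke().
        Object result = methodAccess.invoke(service, index);
        System.out.println(result);   // prints "return from server"
    }
}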
8. Testing
(1).TestClientBuildAndCall
package com.maigo.rpc.test;

import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.client.RpcClientProxyBuilder;

public class TestClientBuildAndCall
{
    public static void main(String[] args)
    {
        RpcInvokeHook hook = new RpcInvokeHook()
        {
            public void beforeInvoke(String method, Object[] args)
            {
                System.out.println("before invoke in client" + method);
            }

            public void afterInvoke(String method, Object[] args)
            {
                System.out.println("after invoke in client" + method);
            }
        };

        final TestInterface testInterface
                = RpcClientProxyBuilder.create(TestInterface.class)
                                       .timeout(0)
                                       .threads(4)
                                       .hook(hook)
                                       .connect("127.0.0.1", 3721)
                                       .build();

        for(int i=0; i<10; i++)
        {
            System.out.println("invoke result = " + testInterface.testMethod01());
        }
    }
}
Creates an RPC client (synchronous mode) and performs ten method calls.
(2).TestServerBuildAndStart
package com.maigo.rpc.test;

import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.server.RpcServer;
import com.maigo.rpc.server.RpcServerBuilder;

public class TestServerBuildAndStart
{
    public static void main(String[] args)
    {
        TestInterface testInterface = new TestInterface()
        {
            public String testMethod01()
            {
                return "return from server";
            }
        };

        RpcInvokeHook hook = new RpcInvokeHook()
        {
            public void beforeInvoke(String methodName, Object[] args)
            {
                System.out.println("beforeInvoke in server" + methodName);
            }

            public void afterInvoke(String methodName, Object[] args)
            {
                System.out.println("afterInvoke in server" + methodName);
            }
        };

        RpcServer rpcServer = RpcServerBuilder.create()
                                              .serviceInterface(TestInterface.class)
                                              .serviceProvider(testInterface)
                                              .threads(4)
                                              .hook(hook)
                                              .bind(3721)
                                              .build();
        rpcServer.start();
    }
}
Starts the RPC server.
Run TestServerBuildAndStart.main() first, then TestClientBuildAndCall.main().
Output:
(1).TestServerBuildAndStart
[2015-09-20 18:37:46]:RpcServer started.
[2015-09-20 18:37:46]:TestInterface in service.
beforeInvoke in servertestMethod01
afterInvoke in servertestMethod01
beforeInvoke in servertestMethod01
afterInvoke in servertestMethod01
beforeInvoke in servertestMethod01
afterInvoke in servertestMethod01
beforeInvoke in servertestMethod01
afterInvoke in servertestMethod01
beforeInvoke in servertestMethod01
afterInvoke in servertestMethod01
beforeInvoke in servertestMethod01
afterInvoke in servertestMethod01
beforeInvoke in servertestMethod01
afterInvoke in servertestMethod01
beforeInvoke in servertestMethod01
afterInvoke in servertestMethod01
beforeInvoke in servertestMethod01
afterInvoke in servertestMethod01
beforeInvoke in servertestMethod01
afterInvoke in servertestMethod01
(2).TestClientBuildAndCall
[2015-09-20 18:37:51]:Try to connect to [127.0.0.1:3721].
[2015-09-20 18:37:51]:Connect to [127.0.0.1:3721] successed.
before invoke in clienttestMethod01
after invoke in clienttestMethod01
invoke result = return from server
before invoke in clienttestMethod01
after invoke in clienttestMethod01
invoke result = return from server
before invoke in clienttestMethod01
after invoke in clienttestMethod01
invoke result = return from server
before invoke in clienttestMethod01
after invoke in clienttestMethod01
invoke result = return from server
before invoke in clienttestMethod01
after invoke in clienttestMethod01
invoke result = return from server
before invoke in clienttestMethod01
after invoke in clienttestMethod01
invoke result = return from server
before invoke in clienttestMethod01
after invoke in clienttestMethod01
invoke result = return from server
before invoke in clienttestMethod01
after invoke in clienttestMethod01
invoke result = return from server
before invoke in clienttestMethod01
after invoke in clienttestMethod01
invoke result = return from server
before invoke in clienttestMethod01
after invoke in clienttestMethod01
invoke result = return from server
The client's invocation requests were executed correctly by the server and the correct results were returned; all hooks were executed as well, so the RPC framework is working as expected.