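1. Design of the RPC server
Compared with the client, the server is considerably simpler. The basic idea is this: when the RPC server is created, it creates an RPC request queue and a fixed number of Handler threads. Every Handler thread holds the Class object of the service interface the server exposes, plus the actual object on which methods are invoked (an implementation of that interface). Each thread simply keeps taking requests from the RPC request queue, invokes the requested method on that object, and finally sends the result back to the client through Netty.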
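The idea above can be sketched in a few lines. This is only a minimal illustration under stated assumptions, not the implementation developed in this article: EchoService, HandlerSketch, serve and handleAll are hypothetical names, requests are plain strings rather than RpcRequest objects, and collecting results in a list stands in for sending them back through Netty.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical service interface, used only for this illustration.
interface EchoService {
    String echo(String text);
}

class HandlerSketch {
    // Starts `threads` daemon handlers that drain `queue`, call the service and
    // record each result (a stand-in for sending the response to the client).
    static List<String> serve(EchoService service, BlockingQueue<String> queue,
                              int threads, CountDownLatch done) {
        List<String> responses = new CopyOnWriteArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread handler = new Thread(() -> {
                try {
                    while (true) {
                        String arg = queue.take();        // blocks until a request arrives
                        responses.add(service.echo(arg)); // invoke, then "send back"
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();   // leave the loop when interrupted
                }
            });
            handler.setDaemon(true);
            handler.start();
        }
        return responses;
    }

    // Handles all requests with an upper-casing service and waits for completion.
    static List<String> handleAll(List<String> requests, int threads) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(requests);
        CountDownLatch done = new CountDownLatch(requests.size());
        List<String> responses = serve(String::toUpperCase, queue, threads, done);
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handlers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return responses;
    }
}
```

The responses come back in whatever order the handlers finish, which is why a real request needs an id that the client can use to match a response to its call.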
2. Implementation of the RPC server
(1).RpcRequest
Before implementing the RPC server itself, first define the RpcRequest class.
package com.maigo.rpc.context;

public class RpcRequest
{
    int id;
    String methodName;
    Object[] args;

    public RpcRequest(int id, String methodName, Object[] args)
    {
        this.id = id;
        this.methodName = methodName;
        this.args = args;
    }

    public int getId()
    {
        return id;
    }

    public String getMethodName()
    {
        return methodName;
    }

    public Object[] getArgs()
    {
        return args;
    }
}
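RpcRequest represents a single RPC call request. id distinguishes individual calls from one another, methodName is the name of the method to invoke, and args holds its arguments.
(2).RpcServerBuilder
RpcServerBuilder is the factory class that creates an RpcServer.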
package com.maigo.rpc.server;

import com.maigo.rpc.aop.RpcInvokeHook;

public class RpcServerBuilder
{
    private Class<?> interfaceClass;
    private Object serviceProvider;

    private int port;
    private int threads;
    private RpcInvokeHook rpcInvokeHook;

    public static RpcServerBuilder create()
    {
        return new RpcServerBuilder();
    }

    /**
     * set the interface to provide service
     * @param interfaceClass
     */
    public RpcServerBuilder serviceInterface(Class<?> interfaceClass)
    {
        this.interfaceClass = interfaceClass;
        return this;
    }

    /**
     * set the real object to provide service
     */
    public RpcServerBuilder serviceProvider(Object serviceProvider)
    {
        this.serviceProvider = serviceProvider;
        return this;
    }

    /**
     * set the port to bind
     */
    public RpcServerBuilder bind(int port)
    {
        this.port = port;
        return this;
    }

    /**
     * set the count of threads to handle request from client. (default availableProcessors)
     */
    public RpcServerBuilder threads(int threadCount)
    {
        this.threads = threadCount;
        return this;
    }

    /**
     * set the hook of the method invoke in server
     */
    public RpcServerBuilder hook(RpcInvokeHook rpcInvokeHook)
    {
        this.rpcInvokeHook = rpcInvokeHook;
        return this;
    }

    public RpcServer build()
    {
        if(threads <= 0)
            threads = Runtime.getRuntime().availableProcessors();

        RpcServer rpcServer = new RpcServer(interfaceClass, serviceProvider, port,
                threads, rpcInvokeHook);

        return rpcServer;
    }
}
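The API is straightforward: create() creates the builder, serviceInterface() sets the service interface, serviceProvider() sets the actual object on which methods are invoked, bind() sets the port to bind, threads() sets the number of Handler threads (defaulting to the number of processors available to the JVM), hook() sets the hook called around each method invocation on the server, and build() creates the RpcServer object.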
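build() as written passes its fields on without checking them, so a missing interface or a provider that does not implement it would only surface later, when a Handler thread tries to invoke a method. A possible pre-flight check is sketched below. BuilderChecks and validate are hypothetical names that are not part of the article's code, and the accepted port range of 1 to 65535 is an assumption.

```java
class BuilderChecks {
    // Hypothetical helper: returns normally only if the configuration looks usable.
    static void validate(Class<?> interfaceClass, Object serviceProvider, int port) {
        if (interfaceClass == null || !interfaceClass.isInterface()) {
            throw new IllegalArgumentException("serviceInterface must be an interface");
        }
        // isInstance(null) is false, so a missing provider is rejected here as well
        if (!interfaceClass.isInstance(serviceProvider)) {
            throw new IllegalArgumentException(
                    "serviceProvider must implement " + interfaceClass.getName());
        }
        // assumed range; adjust if binding to port 0 (ephemeral) should be allowed
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
    }
}
```

Such a check could be called at the top of build(), before the RpcServer is constructed.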
(3).RpcServer
package com.maigo.rpc.server;

import java.util.concurrent.atomic.AtomicInteger;

import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.context.RpcRequest;

public class RpcServer
{
    private Class<?> interfaceClass;
    private Object serviceProvider;

    private int port;
    private int threads;
    private RpcInvokeHook rpcInvokeHook;

    private RpcServerRequestHandler rpcServerRequestHandler;

    protected RpcServer(Class<?> interfaceClass, Object serviceProvider, int port, int threads,
            RpcInvokeHook rpcInvokeHook)
    {
        this.interfaceClass = interfaceClass;
        this.serviceProvider = serviceProvider;
        this.port = port;
        this.threads = threads;
        this.rpcInvokeHook = rpcInvokeHook;

        rpcServerRequestHandler = new RpcServerRequestHandler(interfaceClass,
                serviceProvider, threads, rpcInvokeHook);
        rpcServerRequestHandler.start();
    }

    public void start()
    {
        System.out.println("bind port:"+port + " success!");

        // simulate receiving RpcRequests
        AtomicInteger idGenerator = new AtomicInteger(0);
        for(int i=0; i<10; i++)
        {
            rpcServerRequestHandler.addRequest(new RpcRequest(idGenerator.addAndGet(1),
                    "testMethod01", new Object[]{"qwerty"}));
        }
    }

    public void stop()
    {
        //TODO add stop codes here
        System.out.println("server stop success!");
    }
}
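RpcServer only provides start() and stop(), which start and stop the RPC service. Since starting and stopping involve the networking layer, both are replaced by print statements for now. start() also simulates receiving RpcRequests, so the server can be tested without a network connection.
The RpcServer constructor creates and starts an RpcServerRequestHandler, which is dedicated to processing RpcRequests.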
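stop() is still a TODO at this point. One possible way to shut down the Handler threads is sketched below, on the assumption that every worker loop exits once its thread is interrupted. StoppablePool is a hypothetical class, and its jobs are plain Runnables rather than RpcRequests, so it is an illustration of the shutdown sequence rather than the article's final code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class StoppablePool {
    private final ExecutorService threadPool;
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    StoppablePool(int threads) {
        threadPool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            threadPool.execute(() -> {
                try {
                    // keep handling jobs until this thread is interrupted
                    while (!Thread.currentThread().isInterrupted()) {
                        queue.take().run();
                    }
                } catch (InterruptedException e) {
                    // interrupted while waiting in take(): restore the flag and exit
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    void add(Runnable job) {
        queue.add(job);
    }

    // Interrupts all workers; returns true if they finished within the timeout.
    boolean stop(long timeoutMillis) {
        threadPool.shutdownNow();
        try {
            return threadPool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

shutdownNow() is used instead of shutdown() because the workers never finish on their own: they block in take(), and only an interrupt gets them out of it. Jobs still waiting in the queue are dropped in this sketch.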
(4).RpcServerRequestHandler
The class dedicated to processing RpcRequests.
package com.maigo.rpc.server;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.context.RpcRequest;

public class RpcServerRequestHandler
{
    private Class<?> interfaceClass;
    private Object serviceProvider;
    private RpcInvokeHook rpcInvokeHook;

    private int threads;
    private ExecutorService threadPool;
    private BlockingQueue<RpcRequest> requestQueue = new LinkedBlockingQueue<RpcRequest>();

    public RpcServerRequestHandler(Class<?> interfaceClass, Object serviceProvider, int threads,
            RpcInvokeHook rpcInvokeHook)
    {
        this.interfaceClass = interfaceClass;
        this.serviceProvider = serviceProvider;
        this.threads = threads;
        this.rpcInvokeHook = rpcInvokeHook;
    }

    public void start()
    {
        // one long-running worker per pool thread, all sharing the same queue
        threadPool = Executors.newFixedThreadPool(threads);
        for(int i=0; i<threads; i++)
        {
            threadPool.execute(new RpcServerRequestHandleRunnable(interfaceClass,
                    serviceProvider, rpcInvokeHook, requestQueue));
        }
    }

    public void addRequest(RpcRequest rpcRequest)
    {
        try
        {
            requestQueue.put(rpcRequest);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
            // restore the interrupt flag so the caller can still see it
            Thread.currentThread().interrupt();
        }
    }
}
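The start() method of RpcServerRequestHandler creates a thread pool of size threads and has it run threads instances of RpcServerRequestHandleRunnable. Every RpcServerRequestHandleRunnable holds the same service interface interfaceClass, which defines the services the server offers; the same service provider object serviceProvider, on which the methods are actually invoked; and the same request queue requestQueue, from which incoming method call requests are taken.
(5).RpcServerRequestHandleRunnable
The actual executor of method call requests.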
package com.maigo.rpc.server;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.BlockingQueue;

import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.context.RpcRequest;

public class RpcServerRequestHandleRunnable implements Runnable
{
    private Class<?> interfaceClass;
    private Object serviceProvider;
    private RpcInvokeHook rpcInvokeHook;
    private BlockingQueue<RpcRequest> requestQueue;

    public RpcServerRequestHandleRunnable(Class<?> interfaceClass,
            Object serviceProvider, RpcInvokeHook rpcInvokeHook,
            BlockingQueue<RpcRequest> requestQueue)
    {
        this.interfaceClass = interfaceClass;
        this.serviceProvider = serviceProvider;
        this.rpcInvokeHook = rpcInvokeHook;
        this.requestQueue = requestQueue;
    }

    public void run()
    {
        while(true)
        {
            try
            {
                // block until a request is available
                RpcRequest rpcRequest = requestQueue.take();

                String methodName = rpcRequest.getMethodName();
                Object[] args = rpcRequest.getArgs();

                // treat null args as a call without parameters
                int parameterCount = (args == null) ? 0 : args.length;

                // look the method up by the runtime classes of the arguments;
                // an empty array selects the no-arg method
                Class<?>[] parameterTypes = new Class<?>[parameterCount];
                for(int i=0; i<parameterCount; i++)
                {
                    parameterTypes[i] = args[i].getClass();
                }
                Method method = interfaceClass.getMethod(methodName, parameterTypes);

                if(rpcInvokeHook != null)
                    rpcInvokeHook.beforeInvoke(methodName, args);

                Object result = method.invoke(serviceProvider, args);
                System.out.println("Send response id = " + rpcRequest.getId() + " result = " + result
                        + " back to client. " + Thread.currentThread());

                if(rpcInvokeHook != null)
                    rpcInvokeHook.afterInvoke(methodName, args);
            }
            catch (InterruptedException e)
            {
                // restore the interrupt flag and let this handler thread exit
                Thread.currentThread().interrupt();
                return;
            }
            catch (NoSuchMethodException e)
            {
                // TODO return NoSuchMethodException to client
                e.printStackTrace();
            }
            catch (SecurityException e)
            {
                e.printStackTrace();
            }
            catch (IllegalAccessException e)
            {
                // TODO return IllegalAccessException to client
                e.printStackTrace();
            }
            catch (IllegalArgumentException e)
            {
                // TODO return IllegalArgumentException to client
                e.printStackTrace();
            }
            catch (InvocationTargetException e)
            {
                // TODO return Exception to client
                e.printStackTrace();
            }
        }
    }
}
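RpcServerRequestHandleRunnable keeps taking method call requests (RpcRequest) from the request queue requestQueue, invokes the requested method on serviceProvider, and returns the result to the client. Since the networking layer has not been added yet, returning the result to the client is replaced by a print statement for now. The hook callbacks run before and after the actual method invocation; note that afterInvoke is skipped if the invocation throws.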
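Looking a method up by the runtime classes of its arguments works for testMethod01(String), but Class.getMethod requires exact parameter types, so it fails for primitive parameters (an int parameter receives an Integer argument) and for parameters declared as a supertype of the argument. One possible alternative is to scan the interface's methods and test each parameter for compatibility, as sketched below. MethodResolver and Calculator are hypothetical names introduced for this illustration, and the sketch ignores varargs and simply returns the first compatible overload.

```java
import java.lang.reflect.Method;
import java.util.Optional;

// Hypothetical interface used only to demonstrate the lookup.
interface Calculator {
    int add(int a, int b);
    String describe(CharSequence text);
}

class MethodResolver {
    // Finds a public method with the given name whose parameters accept the arguments.
    static Optional<Method> resolve(Class<?> type, String name, Object[] args) {
        Object[] actual = (args == null) ? new Object[0] : args;
        for (Method m : type.getMethods()) {
            if (m.getName().equals(name) && accepts(m.getParameterTypes(), actual)) {
                return Optional.of(m); // first match wins; overloads are not ranked
            }
        }
        return Optional.empty();
    }

    private static boolean accepts(Class<?>[] params, Object[] args) {
        if (params.length != args.length) return false;
        for (int i = 0; i < params.length; i++) {
            if (args[i] == null) {
                // null fits any reference type but never a primitive
                if (params[i].isPrimitive()) return false;
            } else if (!box(params[i]).isInstance(args[i])) {
                return false;
            }
        }
        return true;
    }

    // Maps a primitive type to its wrapper so isInstance can be applied.
    private static Class<?> box(Class<?> c) {
        if (!c.isPrimitive()) return c;
        if (c == int.class) return Integer.class;
        if (c == long.class) return Long.class;
        if (c == double.class) return Double.class;
        if (c == float.class) return Float.class;
        if (c == boolean.class) return Boolean.class;
        if (c == char.class) return Character.class;
        if (c == byte.class) return Byte.class;
        if (c == short.class) return Short.class;
        return Void.class;
    }
}
```

With this resolver, MethodResolver.resolve(Calculator.class, "add", new Object[]{1, 2}) finds add(int, int), whereas Calculator.class.getMethod("add", Integer.class, Integer.class) throws NoSuchMethodException.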
3. Testing
The start() method of RpcServer simulates receiving 10 call requests, so the test only needs to build and start a server:
TestInterface testInterface = new TestInterface()
{
    public String testMethod01(String string)
    {
        return string.toUpperCase();
    }
};

RpcInvokeHook hook = new RpcInvokeHook()
{
    public void beforeInvoke(String methodName, Object[] args)
    {
        System.out.println("beforeInvoke " + methodName);
    }

    public void afterInvoke(String methodName, Object[] args)
    {
        System.out.println("afterInvoke " + methodName);
    }
};

RpcServer rpcServer = RpcServerBuilder.create()
        .serviceInterface(TestInterface.class)
        .serviceProvider(testInterface)
        .threads(4)
        .hook(hook)
        .bind(3721)
        .build();
rpcServer.start();
The output is:
bind port:3721 success!
beforeInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
Send response id = 2 result = QWERTY back to client. Thread[pool-1-thread-2,5,main]
Send response id = 4 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
Send response id = 1 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
Send response id = 3 result = QWERTY back to client. Thread[pool-1-thread-3,5,main]
afterInvoke testMethod01
afterInvoke testMethod01
afterInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
afterInvoke testMethod01
Send response id = 5 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
Send response id = 6 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
beforeInvoke testMethod01
afterInvoke testMethod01
beforeInvoke testMethod01
afterInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
Send response id = 10 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
afterInvoke testMethod01
Send response id = 9 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
afterInvoke testMethod01
Send response id = 7 result = QWERTY back to client. Thread[pool-1-thread-2,5,main]
afterInvoke testMethod01
Send response id = 8 result = QWERTY back to client. Thread[pool-1-thread-3,5,main]
afterInvoke testMethod01
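As the output shows, four Handler threads are at work, every one of them invoked the requested method correctly, and the configured hook received the expected callbacks.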
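Reading interleaved console output by eye gets harder as the number of requests grows. A counting hook is one possible way to check the callbacks programmatically. InvokeHook below is a stand-in that is redeclared only so the snippet compiles on its own; it assumes the same two callbacks as the RpcInvokeHook used in the test, and CountingHook is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in with the two callbacks used in the test above (an assumption).
interface InvokeHook {
    void beforeInvoke(String methodName, Object[] args);
    void afterInvoke(String methodName, Object[] args);
}

// Thread-safe counters, since the callbacks run on several Handler threads.
class CountingHook implements InvokeHook {
    final AtomicInteger before = new AtomicInteger();
    final AtomicInteger after = new AtomicInteger();

    @Override
    public void beforeInvoke(String methodName, Object[] args) {
        before.incrementAndGet();
    }

    @Override
    public void afterInvoke(String methodName, Object[] args) {
        after.incrementAndGet();
    }
}
```

Once all simulated requests have been handled, both counters should equal the number of requests (10 here), provided that no invocation threw an exception.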