基于Netty的RPC简单框架实现(二):RPC服务端

释放双眼,带上耳机,听听看~!

1.RPC服务端的实现思路

相对于客户端而言,服务端要简单不少。基本思想就是,创建RPC服务端的时候,创建一个RPC请求队列和一定数量的Handler线程。Handler线程都持有服务端提供服务的Interface的类类型和实际供方法调用的对象(实现了提供服务的Interface),各线程只需要不断从RPC请求队列中取出请求,然后用供方法调用的对象来调用所请求的方法,最后将调用的结果通过Netty发送回客户端即可。

2.RPC服务端的具体实现

(1).RpcRequest

在具体实现RPC服务端之前,先定义RpcRequest类。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1package com.maigo.rpc.context;
2
3public class RpcRequest
4{
5   int id;
6   String methodName;
7   Object[] args;
8  
9   public RpcRequest(int id, String methodName, Object[] args)
10  {
11      this.id = id;
12      this.methodName = methodName;
13      this.args = args;
14  }
15
16  public int getId()
17  {
18      return id;
19  }
20
21  public String getMethodName()
22  {
23      return methodName;
24  }
25
26  public Object[] getArgs()
27  {
28      return args;
29  }
30}
31

RpcRequest表示了一个RPC调用请求。id用于区分多次不同的调用,methodName为请求调用的方法名,args为参数。

(2).RpcServerBuilder

RpcServerBuilder是创建RpcServer的工厂类


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
1package com.maigo.rpc.server;
2
3import com.maigo.rpc.aop.RpcInvokeHook;
4
5public class RpcServerBuilder
6{
7   private Class<?> interfaceClass;
8   private Object serviceProvider;
9  
10  private int port;  
11  private int threads;
12  private RpcInvokeHook rpcInvokeHook;
13 
14  public static RpcServerBuilder create()
15  {
16      return new RpcServerBuilder();
17  }
18 
19  /**
20   * set the interface to provide service
21   * @param interfaceClass
22   */
23  public RpcServerBuilder serviceInterface(Class<?> interfaceClass)
24  {
25      this.interfaceClass = interfaceClass;
26      return this;
27  }
28 
29  /**
30   * set the real object to provide service
31   */
32  public RpcServerBuilder serviceProvider(Object serviceProvider)
33  {
34      this.serviceProvider = serviceProvider;
35      return this;
36  }
37 
38  /**
39   * set the port to bind
40   */
41  public RpcServerBuilder bind(int port)
42  {
43      this.port = port;
44      return this;
45  }
46 
47  /**
48   * set the count of threads to handle request from client. (default availableProcessors)
49   */
50  public RpcServerBuilder threads(int threadCount)
51  {
52      this.threads = threadCount;
53      return this;
54  }
55 
56  /**
57   * set the hook of the method invoke in server
58   */
59  public RpcServerBuilder hook(RpcInvokeHook rpcInvokeHook)
60  {
61      this.rpcInvokeHook = rpcInvokeHook;
62      return this;
63  }
64 
65  public RpcServer build()
66  {
67      if(threads <= 0)
68          threads = Runtime.getRuntime().availableProcessors();
69             
70      RpcServer rpcServer = new RpcServer(interfaceClass, serviceProvider, port,
71              threads, rpcInvokeHook);
72     
73      return rpcServer;
74  }
75}
76

API都很简单,create()创建工场,
serviceInterface()设置服务接口,
serviceProvider()设置供方法调用的实际对象,
bind()设置绑定的端口号,
threads()设置Handler线程的个数(默认为CPU核数),build()创建出RpcServer对象。

(3).RpcServer



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
1package com.maigo.rpc.server;
2
3import java.util.concurrent.atomic.AtomicInteger;
4
5import com.maigo.rpc.aop.RpcInvokeHook;
6import com.maigo.rpc.context.RpcRequest;
7
8public class RpcServer
9{
10  private Class<?> interfaceClass;
11  private Object serviceProvider;
12 
13  private int port;  
14  private int threads;
15  private RpcInvokeHook rpcInvokeHook;
16
17  private RpcServerRequestHandler rpcServerRequestHandler;
18 
19  protected RpcServer(Class<?> interfaceClass, Object serviceProvider, int port, int threads,
20          RpcInvokeHook rpcInvokeHook)
21  {  
22      this.interfaceClass = interfaceClass;
23      this.serviceProvider = serviceProvider;
24      this.port = port;
25      this.threads = threads;
26      this.rpcInvokeHook = rpcInvokeHook;
27     
28      rpcServerRequestHandler = new RpcServerRequestHandler(interfaceClass,
29              serviceProvider, threads, rpcInvokeHook);
30      rpcServerRequestHandler.start();
31  }  
32 
33  public void start()
34  {
35      System.out.println("bind port:"+port + " success!");
36     
37      //simulation for receive RpcRequest
38      AtomicInteger idGenerator = new AtomicInteger(0);
39      for(int i=0; i<10; i++)
40      {
41          rpcServerRequestHandler.addRequest(new RpcRequest(idGenerator.addAndGet(1),
42                  "testMethod01", new Object[]{"qwerty"}));
43      }
44  }
45 
46  public void stop()
47  {
48      //TODO add stop codes here
49      System.out.println("server stop success!");
50  }
51}
52

RpcServer只提供了start()和stop()方法用于启动和停止RPC服务。由于启动和停止要涉及网络部分,现在先用打印输出代替。start()方法中还模拟了收到RpcRequest的情况,用于当前无网络连接的情况下测试。
RpcServer的构造方法中创建了一个
RpcServerRequestHandler,专门用于处理
RpcRequest。

(4).RpcServerRequestHandler

专门用于处理
RpcRequest的类


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
1package com.maigo.rpc.server;
2
3import java.util.concurrent.BlockingQueue;
4import java.util.concurrent.ExecutorService;
5import java.util.concurrent.Executors;
6import java.util.concurrent.LinkedBlockingQueue;
7
8import com.maigo.rpc.aop.RpcInvokeHook;
9import com.maigo.rpc.context.RpcRequest;
10
11public class RpcServerRequestHandler
12{
13  private Class<?> interfaceClass;
14  private Object serviceProvider;
15  private RpcInvokeHook rpcInvokeHook;
16 
17  private int threads;
18  private ExecutorService threadPool;
19  private BlockingQueue<RpcRequest> requestQueue = new LinkedBlockingQueue<RpcRequest>();
20 
21  public RpcServerRequestHandler(Class<?> interfaceClass,   Object serviceProvider, int threads,
22          RpcInvokeHook rpcInvokeHook)
23  {
24      this.interfaceClass = interfaceClass;
25      this.serviceProvider = serviceProvider;
26      this.threads = threads;
27      this.rpcInvokeHook = rpcInvokeHook;
28  }
29
30  public void start()
31  {
32      threadPool = Executors.newFixedThreadPool(threads);
33      for(int i=0; i<threads; i++)
34      {
35          threadPool.execute(new RpcServerRequestHandleRunnable(interfaceClass,
36                  serviceProvider, rpcInvokeHook, requestQueue));
37      }
38  }
39 
40  public void addRequest(RpcRequest rpcRequest)
41  {
42      try
43      {
44          requestQueue.put(rpcRequest);
45      }
46      catch (InterruptedException e)
47      {
48          e.printStackTrace();
49      }
50  }
51}
52


RpcServerRequestHandler的构造方法中,创建了1个大小为threads的线程池,并让其运行了
threads个RpcServerRequestHandleRunnable。每个RpcServerRequestHandleRunnable持有相同的服务接口interfaceClass表示服务端提供哪些服务,相同的服务提供对象serviceProvider供实际方法调用,相同的请求队列requestQueue用于取出收到的方法调用请求。

(5).RpcServerRequestHandleRunnable

方法调用请求的实际执行者


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
1package com.maigo.rpc.server;
2
3import java.lang.reflect.InvocationTargetException;
4import java.lang.reflect.Method;
5import java.util.concurrent.BlockingQueue;
6
7import com.maigo.rpc.aop.RpcInvokeHook;
8import com.maigo.rpc.context.RpcRequest;
9
10public class RpcServerRequestHandleRunnable implements Runnable
11{
12  private Class<?> interfaceClass;
13  private Object serviceProvider;
14  private RpcInvokeHook rpcInvokeHook;
15  private BlockingQueue<RpcRequest> requestQueue;
16     
17  public RpcServerRequestHandleRunnable(Class<?> interfaceClass,
18          Object serviceProvider, RpcInvokeHook rpcInvokeHook,
19          BlockingQueue<RpcRequest> requestQueue)
20  {
21      this.interfaceClass = interfaceClass;
22      this.serviceProvider = serviceProvider;
23      this.rpcInvokeHook = rpcInvokeHook;
24      this.requestQueue = requestQueue;
25  }
26
27  public void run()
28  {  
29      while(true)
30      {
31          try
32          {
33              RpcRequest rpcRequest = requestQueue.take();
34             
35              String methodName = rpcRequest.getMethodName();
36              Object[] args = rpcRequest.getArgs();
37                             
38              int parameterCount = args.length;
39              Method method = null;
40              if(parameterCount > 0)
41              {
42                  Class<?>[] parameterTypes = new Class[args.length];
43                  for(int i=0; i<parameterCount; i++)
44                  {
45                      parameterTypes[i] = args[i].getClass();
46                  }
47                  method = interfaceClass.getMethod(methodName, parameterTypes);
48              }
49              else
50              {
51                  method = interfaceClass.getMethod(methodName);
52              }
53                 
54              if(rpcInvokeHook != null)
55                  rpcInvokeHook.beforeInvoke(methodName, args);
56             
57              Object result = method.invoke(serviceProvider, args);
58              System.out.println("Send response id = " + rpcRequest.getId() + " result = " + result
59                      + " back to client. " + Thread.currentThread());
60             
61              if(rpcInvokeHook != null)
62                  rpcInvokeHook.afterInvoke(methodName, args);
63          }
64          catch (InterruptedException e)
65          {
66              e.printStackTrace();
67          }
68          catch (NoSuchMethodException e)
69          {
70              // TODO return NoSuchMethodException to client 
71              e.printStackTrace();
72          }
73          catch (SecurityException e)
74          {
75              e.printStackTrace();
76          }
77          catch (IllegalAccessException e)
78          {
79              // TODO return IllegalAccessException to client
80              e.printStackTrace();
81          }
82          catch (IllegalArgumentException e)
83          {
84              // TODO return IllegalArgumentException to client  
85              e.printStackTrace();
86          }
87          catch (InvocationTargetException e)
88          {
89              // TODO return Exception to client 
90              e.printStackTrace();
91          }
92      }
93  }
94}
95

1
2
1RpcServerRequestHandleRunnable不断地从请求队列requestQueue中取出方法调用请求RpcRequest,用serviceProvider调用请求的方法并向客户端返回调用结果。由于现在还未加入网络部分,
2

向客户端返回结果暂时先用打印输出代替。在方法实际调用的前后,钩子Hook的回调得到了执行。

3.测试

RpcServer中的start()方法模拟了收到10个调用请求的情况


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1TestInterface testInterface = new TestInterface()
2{         
3   public String testMethod01(String string)
4   {
5       return string.toUpperCase();
6   }
7};
8  
9RpcInvokeHook hook = new RpcInvokeHook()
10{        
11  public void beforeInvoke(String methodName, Object[] args)
12  {
13      System.out.println("beforeInvoke " + methodName);
14  }
15 
16  public void afterInvoke(String methodName, Object[] args)
17  {
18      System.out.println("afterInvoke " + methodName);
19  }
20};
21     
22RpcServer rpcServer = RpcServerBuilder.create()
23                                      .serviceInterface(TestInterface.class)
24                                      .serviceProvider(testInterface)
25                                      .threads(4)
26                                      .hook(hook)
27                                      .bind(3721)
28                                      .build();
29rpcServer.start();
30

输出结果为:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1bind port:3721 success!
2beforeInvoke testMethod01
3beforeInvoke testMethod01
4beforeInvoke testMethod01
5beforeInvoke testMethod01
6Send response id = 2 result = QWERTY back to client. Thread[pool-1-thread-2,5,main]
7Send response id = 4 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
8Send response id = 1 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
9Send response id = 3 result = QWERTY back to client. Thread[pool-1-thread-3,5,main]
10afterInvoke testMethod01
11afterInvoke testMethod01
12afterInvoke testMethod01
13beforeInvoke testMethod01
14beforeInvoke testMethod01
15afterInvoke testMethod01
16Send response id = 5 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
17Send response id = 6 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
18beforeInvoke testMethod01
19afterInvoke testMethod01
20beforeInvoke testMethod01
21afterInvoke testMethod01
22beforeInvoke testMethod01
23beforeInvoke testMethod01
24Send response id = 10 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
25afterInvoke testMethod01
26Send response id = 9 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
27afterInvoke testMethod01
28Send response id = 7 result = QWERTY back to client. Thread[pool-1-thread-2,5,main]
29afterInvoke testMethod01
30Send response id = 8 result = QWERTY back to client. Thread[pool-1-thread-3,5,main]
31afterInvoke testMethod01
32

1
2
1可见,共有4个Handler线程在工作,并且都正确的调用了被请求的方法,设置的Hook也受到了正确的回调。  
2

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Docker安装gitlab

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索