前提
前置文章:
- Github Page:《基于Netty和SpringBoot实现一个轻量级RPC框架-协议篇》
- Coding Page:《基于Netty和SpringBoot实现一个轻量级RPC框架-协议篇》
在前置的《基于Netty和SpringBoot实现一个轻量级RPC框架-协议篇》一文中已经定义了一个相对简单的RPC私有协议,并且实现了对应的编码和解码模块。这篇文章基于协议篇,完成Server端代码调用的编写。考虑到目前相对主流的IOC容器是Spring,这里选用了spring-boot-starter(非MVC容器,只是单纯管理Bean),依赖JDK1.8+。
思路
首先RPC私有协议定义了Client端会传过来四个和服务调用息息相关的字符:接口全类名interfaceName、方法名methodName、方法参数签名字符串数组methodArgumentSignatures(可选,这个参数不是必须传入的)以及方法参数数组methodArguments(可选,空方法列表的时候不需要传入参数)。主要流程如下:
- 把Server端的所有服务端(实现)类交由IOC容器托管。
- Client端发起RPC请求。
- 通过前面提到的最多四个参数,从Server服务实例的IOC容器中匹配出吻合度最高的一个方法java.lang.reflect.Method实例、该方法实例的宿主类以及宿主类对应的Bean实例,如果这一步匹配的目标方法超过1个或者为0个,可以直接返回异常信息。
- 把前一步得到的Method实例、宿主类Bean实例,结合方法参数数组methodArguments进行反射调用,得到调用结果。
- Server端把响应结果封装到payload通过私有协议发送回Client端。
Server端代码实现
为了暂时方便起见,部分数组入参被重新封装为ArrayList,实际上编写RPC框架的时候应该优先考虑性能问题,像JDK提供的集合类库等等应该尽可能少用(以ArrayList为例,扩容的时候存在底层Object[]拷贝,造成性能损失和额外的内存消耗),极尽可能使用基本类型和数组。
先定义方法匹配器MethodMatcher相关的类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 1public interface MethodMatcher {
2
3 /**
4 * 查找一个匹配度最高的方法信息
5 *
6 * @param input input
7 * @return output
8 */
9 MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input);
10}
11
12// 输入值
13@EqualsAndHashCode
14@Data
15public class MethodMatchInput {
16
17 private String interfaceName;
18
19 private String methodName;
20
21 private List<String> methodArgumentSignatures;
22
23 private int methodArgumentArraySize;
24}
25
26// 输出值
27@Data
28public class MethodMatchOutput {
29
30 /**
31 * 目标方法实例
32 */
33 private Method targetMethod;
34
35 /**
36 * 目标实现类 - 这个有可能是被Cglib增强过的类型,是宿主类的子类,如果没有被Cglib增强过,那么它就是宿主类
37 */
38 private Class<?> targetClass;
39
40 /**
41 * 宿主类
42 */
43 private Class<?> targetUserClass;
44
45 /**
46 * 宿主类Bean实例
47 */
48 private Object target;
49
50 /**
51 * 方法参数类型列表
52 */
53 private List<Class<?>> parameterTypes;
54}
55
56
目标方法匹配的逻辑大致如下:
- 方法名称和方法实例的宿主类型一定作为匹配条件的一部分。
- 如果传入了参数签名列表,优先使用参数签名列表类型进行匹配。
- 如果没有传入参数签名列表,那么使用参数的数量进行匹配。
- 如果参数签名列表和参数列表都没有传入,那么只能通过方法名称和方法实例的宿主类型匹配。
- 考虑到方法匹配解析的过程相对耗时,需要把结果缓存起来。
分析至此,可以基于反射,编写一个抽象的方法匹配器BaseMethodMatcher,然后把获取宿主类信息的功能委托到子类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 1public class MethodMatchException extends RuntimeException {
2
3 public MethodMatchException(String message) {
4 super(message);
5 }
6
7 public MethodMatchException(String message, Throwable cause) {
8 super(message, cause);
9 }
10
11 public MethodMatchException(Throwable cause) {
12 super(cause);
13 }
14}
15
16@Data
17public class HostClassMethodInfo {
18
19 private Class<?> hostClass;
20 private Class<?> hostUserClass;
21 private Object hostTarget;
22}
23
24@Slf4j
25abstract class BaseMethodMatcher implements MethodMatcher {
26
27 private final ConcurrentMap<MethodMatchInput, MethodMatchOutput> cache = Maps.newConcurrentMap();
28
29 @Override
30 public MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input) {
31 return cache.computeIfAbsent(input, in -> {
32 try {
33 MethodMatchOutput output = new MethodMatchOutput();
34 Class<?> interfaceClass = Class.forName(in.getInterfaceName());
35 // 获取宿主类信息
36 HostClassMethodInfo info = findHostClassMethodInfo(interfaceClass);
37 List<Method> targetMethods = Lists.newArrayList();
38 ReflectionUtils.doWithMethods(info.getHostUserClass(), targetMethods::add, method -> {
39 String methodName = method.getName();
40 Class<?> declaringClass = method.getDeclaringClass();
41 List<Class<?>> inputParameterTypes = Optional.ofNullable(in.getMethodArgumentSignatures())
42 .map(mas -> {
43 List<Class<?>> list = Lists.newArrayList();
44 mas.forEach(ma -> list.add(ClassUtils.resolveClassName(ma, null)));
45 return list;
46 }).orElse(Lists.newArrayList());
47 output.setParameterTypes(inputParameterTypes);
48 // 如果传入了参数签名列表,优先使用参数签名列表类型进行匹配
49 if (!inputParameterTypes.isEmpty()) {
50 List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes());
51 return Objects.equals(methodName, in.getMethodName()) &&
52 Objects.equals(info.getHostUserClass(), declaringClass) &&
53 Objects.equals(parameterTypes, inputParameterTypes);
54 }
55 // 如果没有传入参数签名列表,那么使用参数的数量进行匹配
56 if (in.getMethodArgumentArraySize() > 0) {
57 List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes());
58 return Objects.equals(methodName, in.getMethodName()) &&
59 Objects.equals(info.getHostUserClass(), declaringClass) &&
60 in.getMethodArgumentArraySize() == parameterTypes.size();
61
62 }
63 // 如果参数签名列表和参数列表都没有传入,那么只能通过方法名称和方法实例的宿主类型匹配
64 return Objects.equals(methodName, in.getMethodName()) &&
65 Objects.equals(info.getHostUserClass(), declaringClass);
66
67 });
68 if (targetMethods.size() != 1) {
69 throw new MethodMatchException(String.format("查找到目标方法数量不等于1,interface:%s,method:%s",
70 in.getInterfaceName(), in.getMethodName()));
71 }
72 Method targetMethod = targetMethods.get(0);
73 output.setTargetClass(info.getHostClass());
74 output.setTargetMethod(targetMethod);
75 output.setTargetUserClass(info.getHostUserClass());
76 output.setTarget(info.getHostTarget());
77 return output;
78 } catch (Exception e) {
79 log.error("查找匹配度最高的方法失败,输入参数:{}", JSON.toJSONString(in), e);
80 if (e instanceof MethodMatchException) {
81 throw (MethodMatchException) e;
82 } else {
83 throw new MethodMatchException(e);
84 }
85 }
86 });
87 }
88
89 /**
90 * 获取宿主类的信息
91 *
92 * @param interfaceClass interfaceClass
93 * @return HostClassMethodInfo
94 */
95 abstract HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass);
96}
97
98
接着,通过接口类型获取宿主类的功能就委托给Spring实现,从IOC容器中获取,定义SpringMethodMatcher:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 1@Component
2public class SpringMethodMatcher extends BaseMethodMatcher implements BeanFactoryAware {
3
4 private DefaultListableBeanFactory beanFactory;
5
6 @Override
7 public void setBeanFactory(@NonNull BeanFactory beanFactory) throws BeansException {
8 this.beanFactory = (DefaultListableBeanFactory) beanFactory;
9 }
10
11 @Override
12 HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass) {
13 HostClassMethodInfo info = new HostClassMethodInfo();
14 // 从容器中通过接口类型获取对应的实现,实现必须只有一个
15 Object bean = beanFactory.getBean(interfaceClass);
16 info.setHostTarget(bean);
17 info.setHostClass(bean.getClass());
18 info.setHostUserClass(ClassUtils.getUserClass(bean.getClass()));
19 return info;
20 }
21}
22
23
至此,目标方法匹配的模块已经编写完毕,接下来需要处理方法参数列表的反序列化。编写协议的时候,笔者把方法参数列表methodArguments存放在Object数组中,传输的时候序列化为byte数组,经过协议解析之后,方法参数列表的实际类型为ByteBuf数组(这是因为Netty中的字节容器就是ByteBuf),那么需要考虑把ByteBuf数组转换为目标方法的参数类型实例。主要步骤如下:
- 如果方法参数列表为空,那么什么都不用做,也就是调用了无参数的方法。
- 如果方法参数列表不为空同时方法参数类型列表不为空,优先选用方法参数类型列表进行转换。
- 如果方法参数列表不为空同时方法参数类型列表为空,则使用Method#getParameterTypes()得到的方法参数列表类型进行转换。
定义一个方法参数转换器接口MethodArgumentConverter:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 1public interface MethodArgumentConverter {
2
3 ArgumentConvertOutput convert(ArgumentConvertInput input);
4}
5
6@Data
7public class ArgumentConvertInput {
8
9 /**
10 * 目标方法
11 */
12 private Method method;
13
14 /**
15 * 方法参数类型列表
16 */
17 private List<Class<?>> parameterTypes;
18
19 /**
20 * 方法参数列表
21 */
22 private List<Object> arguments;
23}
24
25@Data
26public class ArgumentConvertOutput {
27
28
29 private Object[] arguments;
30}
31
32
方法参数转换器的默认实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 1@Slf4j
2@Component
3public class DefaultMethodArgumentConverter implements MethodArgumentConverter {
4
5 private final Serializer serializer = FastJsonSerializer.X;
6
7 @Override
8 public ArgumentConvertOutput convert(ArgumentConvertInput input) {
9 ArgumentConvertOutput output = new ArgumentConvertOutput();
10 try {
11 if (null == input.getArguments() || input.getArguments().isEmpty()) {
12 output.setArguments(new Object[0]);
13 return output;
14 }
15 List<Class<?>> inputParameterTypes = input.getParameterTypes();
16 int size = inputParameterTypes.size();
17 if (size > 0) {
18 Object[] arguments = new Object[size];
19 for (int i = 0; i < size; i++) {
20 ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i);
21 int readableBytes = byteBuf.readableBytes();
22 byte[] bytes = new byte[readableBytes];
23 byteBuf.readBytes(bytes);
24 arguments[i] = serializer.decode(bytes, inputParameterTypes.get(i));
25 byteBuf.release();
26 }
27 output.setArguments(arguments);
28 return output;
29 }
30 Class<?>[] parameterTypes = input.getMethod().getParameterTypes();
31 int len = parameterTypes.length;
32 Object[] arguments = new Object[len];
33 for (int i = 0; i < len; i++) {
34 ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i);
35 int readableBytes = byteBuf.readableBytes();
36 byte[] bytes = new byte[readableBytes];
37 byteBuf.readBytes(bytes);
38 arguments[i] = serializer.decode(bytes, parameterTypes[i]);
39 byteBuf.release();
40 }
41 output.setArguments(arguments);
42 return output;
43 } catch (Exception e) {
44 throw new ArgumentConvertException(e);
45 }
46 }
47}
48
49
所有前置工作都完成了,现在编写一个Server端的入站处理器ServerHandler,暂时不做代码逻辑优化,只做实现,把反射调用的模块直接在此类中编写:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 1@Component
2@Slf4j
3public class ServerHandler extends SimpleChannelInboundHandler<RequestMessagePacket> {
4
5 @Autowired
6 private MethodMatcher methodMatcher;
7
8 @Autowired
9 private MethodArgumentConverter methodArgumentConverter;
10
11 @Override
12 protected void channelRead0(ChannelHandlerContext ctx, RequestMessagePacket packet) throws Exception {
13 log.info("服务端接收到:{}", packet);
14 MethodMatchInput input = new MethodMatchInput();
15 input.setInterfaceName(packet.getInterfaceName());
16 input.setMethodArgumentSignatures(Optional.ofNullable(packet.getMethodArgumentSignatures())
17 .map(Lists::newArrayList).orElse(Lists.newArrayList()));
18 input.setMethodName(packet.getMethodName());
19 Object[] methodArguments = packet.getMethodArguments();
20 input.setMethodArgumentArraySize(null != methodArguments ? methodArguments.length : 0);
21 MethodMatchOutput output = methodMatcher.selectOneBestMatchMethod(input);
22 log.info("查找目标实现方法成功,目标类:{},宿主类:{},宿主方法:{}",
23 output.getTargetClass().getCanonicalName(),
24 output.getTargetUserClass().getCanonicalName(),
25 output.getTargetMethod().getName()
26 );
27 Method targetMethod = output.getTargetMethod();
28 ArgumentConvertInput convertInput = new ArgumentConvertInput();
29 convertInput.setArguments(input.getMethodArgumentArraySize() > 0 ? Lists.newArrayList(methodArguments) : Lists.newArrayList());
30 convertInput.setMethod(output.getTargetMethod());
31 convertInput.setParameterTypes(output.getParameterTypes());
32 ArgumentConvertOutput convertOutput = methodArgumentConverter.convert(convertInput);
33 ReflectionUtils.makeAccessible(targetMethod);
34 // 反射调用
35 Object result = targetMethod.invoke(output.getTarget(), convertOutput.getArguments());
36 ResponseMessagePacket response = new ResponseMessagePacket();
37 response.setMagicNumber(packet.getMagicNumber());
38 response.setVersion(packet.getVersion());
39 response.setSerialNumber(packet.getSerialNumber());
40 response.setAttachments(packet.getAttachments());
41 response.setMessageType(MessageType.RESPONSE);
42 response.setErrorCode(200L);
43 response.setMessage("Success");
44 response.setPayload(JSON.toJSONString(result));
45 log.info("服务端输出:{}", JSON.toJSONString(response));
46 ctx.writeAndFlush(response);
47 }
48}
49
50
编写一个Server的启动类ServerApplication,在Spring容器启动之后,启动Netty服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 1@SpringBootApplication(scanBasePackages = "club.throwable.server")
2@Slf4j
3public class ServerApplication implements CommandLineRunner {
4
5 @Value("${netty.port:9092}")
6 private Integer nettyPort;
7
8 @Autowired
9 private ServerHandler serverHandler;
10
11 public static void main(String[] args) throws Exception {
12 SpringApplication.run(ServerApplication.class, args);
13 }
14
15 @Override
16 public void run(String... args) throws Exception {
17 int port = nettyPort;
18 ServerBootstrap bootstrap = new ServerBootstrap();
19 EventLoopGroup bossGroup = new NioEventLoopGroup();
20 EventLoopGroup workerGroup = new NioEventLoopGroup();
21 try {
22 bootstrap.group(bossGroup, workerGroup)
23 .channel(NioServerSocketChannel.class)
24 .childHandler(new ChannelInitializer<SocketChannel>() {
25
26 @Override
27 protected void initChannel(SocketChannel ch) throws Exception {
28 ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
29 ch.pipeline().addLast(new LengthFieldPrepender(4));
30 ch.pipeline().addLast(new RequestMessagePacketDecoder());
31 ch.pipeline().addLast(new ResponseMessagePacketEncoder(FastJsonSerializer.X));
32 ch.pipeline().addLast(serverHandler);
33 }
34 });
35 ChannelFuture future = bootstrap.bind(port).sync();
36 log.info("启动NettyServer[{}]成功...", port);
37 future.channel().closeFuture().sync();
38 } finally {
39 workerGroup.shutdownGracefully();
40 bossGroup.shutdownGracefully();
41 }
42 }
43}
44
45
最后,编写契约包和契约实现:
1
2
3
4
5
6
7
8
9
10
11
12 1- ch0-custom-rpc-protocol 项目根目录
2 - club.throwable
3 - utils 工具类
4 - protocol 协议
5 - exception 异常
6 - contract 契约
7 - HelloService 契约接口
8 - server 服务端
9 - contract
10 - DefaultHelloService 契约接口实现
11
12
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1public interface HelloService {
2
3 String sayHello(String name);
4}
5
6// 实现
7@Service
8public class DefaultHelloService implements HelloService {
9
10 @Override
11 public String sayHello(String name) {
12 return String.format("%s say hello!", name);
13 }
14}
15
16
先启动服务端ServerApplication,再启动上一节提到的TestProtocolClient,输出结果:
1
2
3
4
5
6
7
8
9
10
11
12 1// 服务端日志
22020-01-15 00:05:57.898 INFO 14420 --- [ main] club.throwable.server.ServerApplication : 启动NettyServer[9092]成功...
32020-01-15 00:06:05.980 INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler : 服务端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 6/139)])
42020-01-15 00:06:07.448 INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler : 查找目标实现方法成功,目标类:club.throwable.server.contract.DefaultHelloService,宿主类:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
52020-01-15 00:06:07.521 INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler : 服务端输出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"doge say hello!\"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1}
6
7// 客户端日志
800:06:05.891 [main] INFO club.throwable.protocol.TestProtocolClient - 启动NettyClient[9092]成功...
9...省略...
1000:06:13.197 [nioEventLoopGroup-2-1] INFO club.throwable.protocol.TestProtocolClient - 接收到来自服务端的响应消息,消息内容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"doge say hello!\"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1}
11
12
可见RPC调用成功。
小结
编写RPC的Server端技巧在于处理目标方法和宿主类的查找,在转换方法参数的时候,需要考虑简化处理和提高效率,剩下的就是做好异常处理和模块封装。限于篇幅,后面会先分析Client端的处理,再分析心跳处理、服务端优化、甚至是对接注册中心等等,在Netty、SpringBoot等优秀框架的加持下编写一个RPC框架其实并不困难,困难的是性能优化和生态圈的支持。
Demo项目地址:
- ch0-custom-rpc-protocol
(本文完 c-1-d e-a-20200115)