在后续一段时间里, 我会写一系列文章来讲述如何实现一个RPC框架(我已经实现了一个示例框架, 代码在我的github上)。 这是系列第五篇文章, 主要讲述了服务器端的实现。
在前面的几篇文章里, 我们已经实现了客户端创建proxy bean, 并利用它来发送请求、处理返回的全部流程:
- 扫描package找出需要代理的service
- 通过服务注册中心和Load Balancer获取service地址
- 利用Netty与service建立连接, 并且复用所创建的channel
- 创建request, 用唯一的requestId来标识它, 发送这个请求, 调用future.get()
- 收到response,利用response中附带的requestId找到对应future,让future变成done的状态
这篇文章, 我们会介绍server端的实现。
1.获取server端所实现的接口
1
2
3
4
5
6
7
8
9
10
11 1private List<Class<?>> getServiceInterfaces(ApplicationContext ctx) {
2 Class<? extends Annotation> clazz = RPCService.class;
3 return ctx.getBeansWithAnnotation(clazz)
4 .values().stream()
5 .map(AopUtils::getTargetClass)
6 .map(cls -> Arrays.asList(cls.getInterfaces()))
7 .flatMap(List::stream)
8 .filter(cls -> Objects.nonNull(cls.getAnnotation(clazz)))
9 .collect(Collectors.toList());
10 }
11
利用Spring的ApplicationContext, 获取bean容器中带有RPCService注解的bean。
2.bean与对应的接口名一一对应并保存在map中
1
2
3
4
5
6
7
8
9
10 1private Map<String, Object> handlerMap = new HashMap<>();
2getServiceInterfaces(ctx)
3 .stream()
4 .forEach(interfaceClazz -> {
5 String serviceName = interfaceClazz.getAnnotation(RPCService.class).value().getName();
6 Object serviceBean = ctx.getBean(interfaceClazz);
7 handlerMap.put(serviceName, serviceBean);
8 log.debug("Put handler: {}, {}", serviceName, serviceBean);
9 });
10
handlerMap的作用是, 收到请求时, 可以通过这个map找到该请求所对应的处理对象。
3.启动服务器并注册所实现的service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 1private void startServer() {
2 // Get ip and port
3 String[] addressArray = StringUtils.split(serviceAddress, ":");
4 String ip = addressArray[0];
5 int port = Integer.parseInt(addressArray[1]);
6
7 log.debug("Starting server on port: {}", port);
8 EventLoopGroup bossGroup = new NioEventLoopGroup();
9 EventLoopGroup workerGroup = new NioEventLoopGroup();
10 try {
11 ServerBootstrap bootstrap = new ServerBootstrap();
12 bootstrap.group(bossGroup, workerGroup)
13 .channel(NioServerSocketChannel.class)
14 .childHandler(new ChannelInitializer<SocketChannel>() {
15 @Override
16 public void initChannel(SocketChannel channel) throws Exception {
17 ChannelPipeline pipeline = channel.pipeline();
18 pipeline.addLast(new RPCDecoder(RPCRequest.class, new ProtobufSerializer()));
19 pipeline.addLast(new RPCEncoder(RPCResponse.class, new ProtobufSerializer()));
20 pipeline.addLast(new RPCServerHandler(handlerMap));
21 }
22 });
23 bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
24 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
25
26 ChannelFuture future = bootstrap.bind(ip, port).sync();
27 log.info("Server started");
28
29 registerServices();
30
31 future.channel().closeFuture().sync();
32 } catch (InterruptedException e) {
33 throw new RuntimeException("Server shutdown!", e);
34 } finally {
35 workerGroup.shutdownGracefully();
36 bossGroup.shutdownGracefully();
37 }
38 }
39
40private void registerServices() {
41 if (serviceRegistry != null && serviceAddress != null) {
42 for (String interfaceName : handlerMap.keySet()) {
43 serviceRegistry.register(interfaceName, serviceAddress.toString());
44 log.info("Registering service: {} with address: {}", interfaceName, serviceAddress);
45 }
46 }
47 }
48
这里几个地方需要说明一下:
- 这里的ip和port我直接用了配置文件中传入的, 优化的方案应该是获取本地ip以及找到一个可用端口
- 利用Netty创建server, 在pipeline中加入RPCServerHandler, 这个handler将在下文给出
- 向服务注册中心注册实现的所有服务
4.RPCServerHandler的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 1@AllArgsConstructor
2public class RPCServerHandler extends SimpleChannelInboundHandler<RPCRequest> {
3
4 private Map<String, Object> handlerMap;
5
6 @Override
7 public void channelRead0(final ChannelHandlerContext ctx, RPCRequest request) throws Exception {
8 log.debug("Get request: {}", request);
9 RPCResponse response = new RPCResponse();
10 response.setRequestId(request.getRequestId());
11 try {
12 Object result = handleRequest(request);
13 response.setResult(result);
14 } catch (Exception e) {
15 log.warn("Get exception when hanlding request, exception: {}", e);
16 response.setException(e);
17 }
18 ctx.writeAndFlush(response).addListener(
19 (ChannelFutureListener) channelFuture -> {
20 log.debug("Sent response for request: {}", request.getRequestId());
21 });
22 }
23
24 private Object handleRequest(RPCRequest request) throws Exception {
25 // Get service bean
26 String serviceName = request.getInterfaceName();
27 Object serviceBean = handlerMap.get(serviceName);
28 if (serviceBean == null) {
29 throw new RuntimeException(String.format("No service bean available: %s", serviceName));
30 }
31
32 // Invoke by reflect
33 Class<?> serviceClass = serviceBean.getClass();
34 String methodName = request.getMethodName();
35 Class<?>[] parameterTypes = request.getParameterTypes();
36 Object[] parameters = request.getParameters();
37 Method method = serviceClass.getMethod(methodName, parameterTypes);
38 method.setAccessible(true);
39 return method.invoke(serviceBean, parameters);
40 }
41
42 @Override
43 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
44 log.error("server caught exception", cause);
45 ctx.close();
46 }
47}
48
这个实现相当简单,收到请求之后, 根据servicename,找到对应的handler,再利用反射进行方法调用。
就这样, 一个简单的RPCServer就实现了。 完整代码请看我的github。