如何写一个RPC框架(五):服务器端实现

释放双眼,带上耳机,听听看~!

在后续一段时间里, 我会写一系列文章来讲述如何实现一个RPC框架(我已经实现了一个示例框架, 代码在我的github上)。 这是系列第五篇文章, 主要讲述了服务器端的实现。

在前面的几篇文章里, 我们已经实现了客户端创建proxy bean, 并利用它来发送请求、处理返回的全部流程:

  1. 扫描package找出需要代理的service
  2. 通过服务注册中心和Load Balancer获取service地址
  3. 利用Netty与service建立连接, 并且复用所创建的channel
  4. 创建request, 用唯一的requestId来标识它, 发送这个请求, 调用future.get()
  5. 收到response,利用response中附带的requestId找到对应future,让future变成done的状态

这篇文章, 我们会介绍server端的实现。

1.获取server端所实现的接口


1
2
3
4
5
6
7
8
9
10
11
1private List<Class<?>> getServiceInterfaces(ApplicationContext ctx) {
2        Class<? extends Annotation> clazz = RPCService.class;
3        return ctx.getBeansWithAnnotation(clazz)
4                .values().stream()
5                .map(AopUtils::getTargetClass)
6                .map(cls -> Arrays.asList(cls.getInterfaces()))
7                .flatMap(List::stream)
8                .filter(cls -> Objects.nonNull(cls.getAnnotation(clazz)))
9                .collect(Collectors.toList());
10    }
11

利用Spring的ApplicationContext, 获取bean容器中带有RPCService注解的bean。

2.bean与对应的接口名一一对应并保存在map中


1
2
3
4
5
6
7
8
9
10
1private Map<String, Object> handlerMap = new HashMap<>();
2getServiceInterfaces(ctx)
3                .stream()
4                .forEach(interfaceClazz -> {
5                    String serviceName = interfaceClazz.getAnnotation(RPCService.class).value().getName();
6                    Object serviceBean = ctx.getBean(interfaceClazz);
7                    handlerMap.put(serviceName, serviceBean);
8                    log.debug("Put handler: {}, {}", serviceName, serviceBean);
9                });
10

handlerMap的作用是, 收到请求时, 可以通过这个map找到该请求所对应的处理对象。

3.启动服务器并注册所实现的service


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
1private void startServer() {
2        // Get ip and port
3        String[] addressArray = StringUtils.split(serviceAddress, ":");
4        String ip = addressArray[0];
5        int port = Integer.parseInt(addressArray[1]);
6
7        log.debug("Starting server on port: {}", port);
8        EventLoopGroup bossGroup = new NioEventLoopGroup();
9        EventLoopGroup workerGroup = new NioEventLoopGroup();
10        try {
11            ServerBootstrap bootstrap = new ServerBootstrap();
12            bootstrap.group(bossGroup, workerGroup)
13                    .channel(NioServerSocketChannel.class)
14                    .childHandler(new ChannelInitializer<SocketChannel>() {
15                        @Override
16                        public void initChannel(SocketChannel channel) throws Exception {
17                            ChannelPipeline pipeline = channel.pipeline();
18                            pipeline.addLast(new RPCDecoder(RPCRequest.class, new ProtobufSerializer()));
19                            pipeline.addLast(new RPCEncoder(RPCResponse.class, new ProtobufSerializer()));
20                            pipeline.addLast(new RPCServerHandler(handlerMap));
21                        }
22                    });
23            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
24            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
25
26            ChannelFuture future = bootstrap.bind(ip, port).sync();
27            log.info("Server started");
28
29            registerServices();
30
31            future.channel().closeFuture().sync();
32        } catch (InterruptedException e) {
33            throw new RuntimeException("Server shutdown!", e);
34        } finally {
35            workerGroup.shutdownGracefully();
36            bossGroup.shutdownGracefully();
37        }
38    }
39
40private void registerServices() {
41        if (serviceRegistry != null && serviceAddress != null) {
42            for (String interfaceName : handlerMap.keySet()) {
43                serviceRegistry.register(interfaceName, serviceAddress.toString());
44                log.info("Registering service: {} with address: {}", interfaceName, serviceAddress);
45            }
46        }
47    }
48

这里几个地方需要说明一下:

  1. 这里的ip和port我直接用了配置文件中传入的, 优化的方案应该是获取本地ip以及找到一个可用端口
  2. 利用Netty创建server, 在pipeline中加入RPCServerHandler, 这个handler将在下文给出
  3. 向服务注册中心注册实现的所有服务

4.RPCServerHandler的实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
1@AllArgsConstructor
2public class RPCServerHandler extends SimpleChannelInboundHandler<RPCRequest> {
3
4    private Map<String, Object> handlerMap;
5
6    @Override
7    public void channelRead0(final ChannelHandlerContext ctx, RPCRequest request) throws Exception {
8        log.debug("Get request: {}", request);
9        RPCResponse response = new RPCResponse();
10        response.setRequestId(request.getRequestId());
11        try {
12            Object result = handleRequest(request);
13            response.setResult(result);
14        } catch (Exception e) {
15            log.warn("Get exception when hanlding request, exception: {}", e);
16            response.setException(e);
17        }
18        ctx.writeAndFlush(response).addListener(
19                (ChannelFutureListener) channelFuture -> {
20                    log.debug("Sent response for request: {}", request.getRequestId());
21                });
22    }
23
24    private Object handleRequest(RPCRequest request) throws Exception {
25        // Get service bean
26        String serviceName = request.getInterfaceName();
27        Object serviceBean = handlerMap.get(serviceName);
28        if (serviceBean == null) {
29            throw new RuntimeException(String.format("No service bean available: %s", serviceName));
30        }
31
32        // Invoke by reflect
33        Class<?> serviceClass = serviceBean.getClass();
34        String methodName = request.getMethodName();
35        Class<?>[] parameterTypes = request.getParameterTypes();
36        Object[] parameters = request.getParameters();
37        Method method = serviceClass.getMethod(methodName, parameterTypes);
38        method.setAccessible(true);
39        return method.invoke(serviceBean, parameters);
40    }
41
42    @Override
43    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
44        log.error("server caught exception", cause);
45        ctx.close();
46    }
47}
48

这个实现相当简单,收到请求之后, 根据servicename,找到对应的handler,再利用反射进行方法调用。

就这样, 一个简单的RPCServer就实现了。 完整代码请看我的github。

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense优化心得

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索