【RPC】一步一步实现基于netty+zookeeper的RPC框架(二)

释放双眼,带上耳机,听听看~!

上一篇实现了服务注册发现和基本的字符串通信功能,这一篇则是实现我们平常使用RPC框架的使用类来调用的功能。

实现consumer端通过接口类来调用远程服务,主要核心在于使用动态代理和反射,这里就一步一步来实现。

这里贴出github代码地址,想直接看代码的可以直接下载运行:https://github.com/whiteBX/wrpc

首先来看consumer端代码,RPCConsumer完整代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
1public class RPCConsumer {
2
3    /**
4     * url处理器
5     */
6    private UrlHolder   urlHolder   = new UrlHolder();
7    /**
8     * netty客户端
9     */
10    private NettyClient nettyClient = new NettyClient();
11
12    /**
13     * 远程调用
14     *
15     * @param appCode
16     * @param param
17     * @return
18     */
19    public String call(String appCode, String param) {
20        try {
21            // 从zookeeper获取服务地址
22            String serverIp = urlHolder.getUrl(appCode);
23            if (serverIp == null) {
24                System.out.println("远程调用错误:当前无服务提供者");
25                return "connect error";
26            }
27            // 连接netty,请求并接收响应
28            RpcClientNettyHandler clientHandler = new RpcClientNettyHandler();
29            clientHandler.setParam(param);
30            nettyClient.initClient(serverIp, clientHandler);
31            String result = clientHandler.process();
32            System.out.println(MessageFormat.format("调用服务器:{0},请求参数:{1},响应参数:{2}", serverIp, param, result));
33            return result;
34        } catch (Exception e) {
35            System.out.println("远程服务调用失败:" + e);
36            return "error";
37        }
38    }
39
40    /**
41     * 获取代理类
42     * @param clazz
43     * @param appCode
44     * @return
45     */
46    public <T> T getBean(final Class<T> clazz, final String appCode) {
47        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] { clazz }, new InvocationHandler() {
48            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
49                String param = JSON.toJSONString(args[0]);
50                String beanMessage = MessageFormat.format(ConsumerConstant.BEAN_STRING, clazz.getName(),
51                    method.getName(), method.getParameterTypes()[0].getName());
52                return JSON.parseObject(call(appCode, beanMessage.concat(param)), method.getReturnType());
53            }
54        });
55    }
56}
57
58

这里getBean方法主要是通过动态代理来获取代理类,传入的就是我们自己的Service的接口类,其中绑定了InvocationHandler来在调用接口类的方法时执行对应的远程调用操作,包括远程调用包装和返回值解析.

接下来看provider相关类,在RPCProvider类中新增服务注册方法,将我们自己的Service注册进系统缓存:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1public class RPCProvider {
2    /**
3     * netty客户端
4     */
5    private static NettyClient nettyClient = new NettyClient();
6    /**
7     * zookeeper客户端
8     */
9    private static ZKClient    zkClient    = new ZKClient();
10
11    public void registry(String server, int port) {
12        // 开启netty监听客户端连接
13        nettyClient.startServer(port);
14        // 创建zk连接并创建临时节点
15        ZooKeeper zooKeeper = zkClient.newConnection(ProviderConstant.ZK_CONNECTION_STRING,
16            ProviderConstant.ZK_SESSION_TIME_OUT);
17        String serverIp = server + CommonConstant.COMMOA + port;
18        zkClient.createEphemeralNode(zooKeeper, ProviderConstant.APP_CODE, serverIp.getBytes());
19    }
20
21    /**
22     * 注册服务提供者
23     * @param clazz
24     * @param obj
25     */
26    public void provide(Class<?> clazz, Object obj) {
27        ProviderBeanHolder.regist(clazz.getName(), obj);
28    }
29}
30
31

其中的ProviderBeanHolder类代码如下,主要负责缓存服务注册信息:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1 public class ProviderBeanHolder {
2
3    /**
4     * bean注册缓存
5     */
6    private static Map<String, Object> providerList = new HashMap<String, Object>();
7
8    /**
9     * 注册
10     * @param clazzName
11     * @param obj
12     */
13    public static void regist(String clazzName, Object obj) {
14        providerList.put(clazzName, obj);
15        System.out.println("注册provider:" + clazzName);
16    }
17
18    /**
19     * 获取
20     * @param clazzName
21     * @return
22     */
23    public static Object getBean(String clazzName) {
24        return providerList.get(clazzName);
25    }
26}
27
28

接下来来看RpcServerNettyHandler类,这个类负责接收客户端请求并处理,这里通过反射来调用服务注册的方法。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1public class RpcServerNettyHandler extends ChannelInboundHandlerAdapter {
2
3    @Override
4    public void channelRead(ChannelHandlerContext ctx, Object msg) {
5        System.out.println("服务端收到请求:" + msg);
6        try {
7            // 解析出 类名+方法名+请求参数类型(方法签名)
8            String[] splitParam = msg.toString().split(ProviderConstant.DOLLAR_SPLIT);
9            String[] beanMessage = splitParam[0].split(ProviderConstant.SHARP_SPLIT);
10            // 获取注册的服务
11            Object object = ProviderBeanHolder.getBean(beanMessage[0]);
12            if (object == null) {
13                System.out.println("服务类未注册:" + beanMessage[0]);
14            }
15            // 通过反射调用服务
16            Class paramType = Class.forName(beanMessage[2]);
17            Method method = object.getClass().getDeclaredMethod(beanMessage[1], paramType);
18            Object response = method.invoke(object, JSON.parseObject(splitParam[1], paramType));
19            // 请求响应
20            ctx.writeAndFlush(JSON.toJSONString(response));
21        } catch (Exception e) {
22            System.out.println("服务异常");
23        }
24    }
25}
26
27

这里主要就是用到了反射相关的知识,实现了服务的调用和响应,到这里相关的代码就实现了,下面来写个service测试一下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1public class HelloRequest {
2
3    private int    seq;
4
5    private String content;
6
7    // 省略getter setter
8}
9public class HelloResponse {
10
11    private int code;
12
13    private String message;
14
15    // 省略getter setter
16}
17public interface HelloService {
18
19    HelloResponse hello(HelloRequest request);
20}
21public class HelloServiceImpl implements HelloService {
22
23    public HelloResponse hello(HelloRequest request) {
24        System.out.println("HelloService收到请求,序列号:" + request.getSeq());
25        HelloResponse response = new HelloResponse();
26        response.setCode(200);
27        response.setMessage("success:" + request.getSeq());
28        return response;
29    }
30}
31
32

测试代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1    // ProviderTest相关代码
2    public static void main(String[] args) throws InterruptedException {
3        RPCProvider provider = new RPCProvider();
4        provider.registry("127.0.0.1", 8091);
5        provider.registry("127.0.0.1", 8092);
6        provider.registry("127.0.0.1", 8093);
7        provider.registry("127.0.0.1", 8094);
8        provider.registry("127.0.0.1", 8095);
9        provider.provide(HelloService.class, new HelloServiceImpl());
10
11        Thread.sleep(Long.MAX_VALUE);
12    }
13    // ConsumerTest相关代码
14    public static void main(String[] args) {
15        RPCConsumer consumer = new RPCConsumer();
16        HelloService helloService = consumer.getBean(HelloService.class, ConsumerConstant.APP_CODE);
17        int i = 0;
18        while (true) {
19            HelloRequest request = new HelloRequest();
20            request.setSeq(i++);
21            HelloResponse helloResponse = helloService.hello(request);
22            System.out.println("客户端收到响应:" + JSON.toJSONString(helloResponse));
23            Thread.sleep(Long.MAX_VALUE);
24        }
25    }
26
27

执行结果如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1// 服务端日志
2zookeeper连接成功
3临时节点创建成功:/registry/100000/0000000025
4zookeeper连接成功
5临时节点创建成功:/registry/100000/0000000026
6zookeeper连接成功
7临时节点创建成功:/registry/100000/0000000027
8zookeeper连接成功
9临时节点创建成功:/registry/100000/0000000028
10zookeeper连接成功
11临时节点创建成功:/registry/100000/0000000029
12注册provider:org.white.wrpc.hello.service.HelloService
13服务端收到请求:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":0}
14服务端收到请求,序列号:0
15服务端收到请求:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":1}
16服务端收到请求,序列号:1
17服务端收到请求:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":2}
18服务端收到请求,序列号:2
19
20// 客户端日志
21zookeeper连接成功
22调用服务器:127.0.0.1,8094,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":0},响应参数:{"code":200,"message":"success:0"}
23客户端收到响应:{"code":200,"message":"success:0"}
24调用服务器:127.0.0.1,8094,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":1},响应参数:{"code":200,"message":"success:1"}
25客户端收到响应:{"code":200,"message":"success:1"}
26调用服务器:127.0.0.1,8092,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":2},响应参数:{"code":200,"message":"success:2"}
27客户端收到响应:{"code":200,"message":"success:2"}
28
29

此时修改Provider类里的端口,重新启动4个服务,可以看到客户端有如下日志:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1zookeeper连接成功
2zookeeper连接成功
3zookeeper连接成功
4zookeeper连接成功
5调用服务器:127.0.0.1,8095,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":29},响应参数:{"code":200,"message":"success:29"}
6客户端收到响应:{"code":200,"message":"success:29"}
7调用服务器:127.0.0.1,8091,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":30},响应参数:{"code":200,"message":"success:30"}
8客户端收到响应:{"code":200,"message":"success:30"}
9调用服务器:127.0.0.1,8097,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":31},响应参数:{"code":200,"message":"success:31"}
10客户端收到响应:{"code":200,"message":"success:31"}
11调用服务器:127.0.0.1,8092,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":32},响应参数:{"code":200,"message":"success:32"}
12客户端收到响应:{"code":200,"message":"success:32"}
13调用服务器:127.0.0.1,8099,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":33},响应参数:{"code":200,"message":"success:33"}
14客户端收到响应:{"code":200,"message":"success:33"}
15调用服务器:127.0.0.1,8096,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":34},响应参数:{"code":200,"message":"success:34"}
16
17

可以看到,新增了服务,客户端什么都不用改,就能连接到新的服务,此时关闭新启的服务,会发现客户端会访问剩余的服务,不会出现任何问题。

到这里就实现了RPC框架的自己注册bean并通过接口调用.

后续将一步一步实现负载均衡/调用链路Trace记录/限流等功能,欢迎持续关注!

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google AdSense 全面解析(申请+操作+作弊+忠告)

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索