上一篇实现了服务注册发现和基本的字符串通信功能,这一篇则是实现我们平常使用RPC框架的使用类来调用的功能。
实现consumer端通过接口类来调用远程服务,主要核心在于使用动态代理和反射,这里就一步一步来实现。
这里贴出github代码地址,想直接看代码的可以直接下载运行:https://github.com/whiteBX/wrpc
首先来看consumer端代码,RPCConsumer完整代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 1public class RPCConsumer {
2
3 /**
4 * url处理器
5 */
6 private UrlHolder urlHolder = new UrlHolder();
7 /**
8 * netty客户端
9 */
10 private NettyClient nettyClient = new NettyClient();
11
12 /**
13 * 远程调用
14 *
15 * @param appCode
16 * @param param
17 * @return
18 */
19 public String call(String appCode, String param) {
20 try {
21 // 从zookeeper获取服务地址
22 String serverIp = urlHolder.getUrl(appCode);
23 if (serverIp == null) {
24 System.out.println("远程调用错误:当前无服务提供者");
25 return "connect error";
26 }
27 // 连接netty,请求并接收响应
28 RpcClientNettyHandler clientHandler = new RpcClientNettyHandler();
29 clientHandler.setParam(param);
30 nettyClient.initClient(serverIp, clientHandler);
31 String result = clientHandler.process();
32 System.out.println(MessageFormat.format("调用服务器:{0},请求参数:{1},响应参数:{2}", serverIp, param, result));
33 return result;
34 } catch (Exception e) {
35 System.out.println("远程服务调用失败:" + e);
36 return "error";
37 }
38 }
39
40 /**
41 * 获取代理类
42 * @param clazz
43 * @param appCode
44 * @return
45 */
46 public <T> T getBean(final Class<T> clazz, final String appCode) {
47 return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] { clazz }, new InvocationHandler() {
48 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
49 String param = JSON.toJSONString(args[0]);
50 String beanMessage = MessageFormat.format(ConsumerConstant.BEAN_STRING, clazz.getName(),
51 method.getName(), method.getParameterTypes()[0].getName());
52 return JSON.parseObject(call(appCode, beanMessage.concat(param)), method.getReturnType());
53 }
54 });
55 }
56}
57
58
这里getBean方法主要是通过动态代理来获取代理类,传入的就是我们自己的Service的接口类,其中绑定了InvocationHandler来在调用接口类的方法时执行对应的远程调用操作,包括远程调用包装和返回值解析.
接下来看provider相关类,在RPCProvider类中新增服务注册方法,将我们自己的Service注册进系统缓存:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 1public class RPCProvider {
2 /**
3 * netty客户端
4 */
5 private static NettyClient nettyClient = new NettyClient();
6 /**
7 * zookeeper客户端
8 */
9 private static ZKClient zkClient = new ZKClient();
10
11 public void registry(String server, int port) {
12 // 开启netty监听客户端连接
13 nettyClient.startServer(port);
14 // 创建zk连接并创建临时节点
15 ZooKeeper zooKeeper = zkClient.newConnection(ProviderConstant.ZK_CONNECTION_STRING,
16 ProviderConstant.ZK_SESSION_TIME_OUT);
17 String serverIp = server + CommonConstant.COMMOA + port;
18 zkClient.createEphemeralNode(zooKeeper, ProviderConstant.APP_CODE, serverIp.getBytes());
19 }
20
21 /**
22 * 注册服务提供者
23 * @param clazz
24 * @param obj
25 */
26 public void provide(Class<?> clazz, Object obj) {
27 ProviderBeanHolder.regist(clazz.getName(), obj);
28 }
29}
30
31
其中的ProviderBeanHolder类代码如下,主要负责缓存服务注册信息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 1 public class ProviderBeanHolder {
2
3 /**
4 * bean注册缓存
5 */
6 private static Map<String, Object> providerList = new HashMap<String, Object>();
7
8 /**
9 * 注册
10 * @param clazzName
11 * @param obj
12 */
13 public static void regist(String clazzName, Object obj) {
14 providerList.put(clazzName, obj);
15 System.out.println("注册provider:" + clazzName);
16 }
17
18 /**
19 * 获取
20 * @param clazzName
21 * @return
22 */
23 public static Object getBean(String clazzName) {
24 return providerList.get(clazzName);
25 }
26}
27
28
接下来来看RpcServerNettyHandler类,这个类负责接收客户端请求并处理,这里通过反射来调用服务注册的方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 1public class RpcServerNettyHandler extends ChannelInboundHandlerAdapter {
2
3 @Override
4 public void channelRead(ChannelHandlerContext ctx, Object msg) {
5 System.out.println("服务端收到请求:" + msg);
6 try {
7 // 解析出 类名+方法名+请求参数类型(方法签名)
8 String[] splitParam = msg.toString().split(ProviderConstant.DOLLAR_SPLIT);
9 String[] beanMessage = splitParam[0].split(ProviderConstant.SHARP_SPLIT);
10 // 获取注册的服务
11 Object object = ProviderBeanHolder.getBean(beanMessage[0]);
12 if (object == null) {
13 System.out.println("服务类未注册:" + beanMessage[0]);
14 }
15 // 通过反射调用服务
16 Class paramType = Class.forName(beanMessage[2]);
17 Method method = object.getClass().getDeclaredMethod(beanMessage[1], paramType);
18 Object response = method.invoke(object, JSON.parseObject(splitParam[1], paramType));
19 // 请求响应
20 ctx.writeAndFlush(JSON.toJSONString(response));
21 } catch (Exception e) {
22 System.out.println("服务异常");
23 }
24 }
25}
26
27
这里主要就是用到了反射相关的知识,实现了服务的调用和响应,到这里相关的代码就实现了,下面来写个service测试一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 1public class HelloRequest {
2
3 private int seq;
4
5 private String content;
6
7 // 省略getter setter
8}
9public class HelloResponse {
10
11 private int code;
12
13 private String message;
14
15 // 省略getter setter
16}
17public interface HelloService {
18
19 HelloResponse hello(HelloRequest request);
20}
21public class HelloServiceImpl implements HelloService {
22
23 public HelloResponse hello(HelloRequest request) {
24 System.out.println("HelloService收到请求,序列号:" + request.getSeq());
25 HelloResponse response = new HelloResponse();
26 response.setCode(200);
27 response.setMessage("success:" + request.getSeq());
28 return response;
29 }
30}
31
32
测试代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 1 // ProviderTest相关代码
2 public static void main(String[] args) throws InterruptedException {
3 RPCProvider provider = new RPCProvider();
4 provider.registry("127.0.0.1", 8091);
5 provider.registry("127.0.0.1", 8092);
6 provider.registry("127.0.0.1", 8093);
7 provider.registry("127.0.0.1", 8094);
8 provider.registry("127.0.0.1", 8095);
9 provider.provide(HelloService.class, new HelloServiceImpl());
10
11 Thread.sleep(Long.MAX_VALUE);
12 }
13 // ConsumerTest相关代码
14 public static void main(String[] args) {
15 RPCConsumer consumer = new RPCConsumer();
16 HelloService helloService = consumer.getBean(HelloService.class, ConsumerConstant.APP_CODE);
17 int i = 0;
18 while (true) {
19 HelloRequest request = new HelloRequest();
20 request.setSeq(i++);
21 HelloResponse helloResponse = helloService.hello(request);
22 System.out.println("客户端收到响应:" + JSON.toJSONString(helloResponse));
23 Thread.sleep(Long.MAX_VALUE);
24 }
25 }
26
27
执行结果如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 1// 服务端日志
2zookeeper连接成功
3临时节点创建成功:/registry/100000/0000000025
4zookeeper连接成功
5临时节点创建成功:/registry/100000/0000000026
6zookeeper连接成功
7临时节点创建成功:/registry/100000/0000000027
8zookeeper连接成功
9临时节点创建成功:/registry/100000/0000000028
10zookeeper连接成功
11临时节点创建成功:/registry/100000/0000000029
12注册provider:org.white.wrpc.hello.service.HelloService
13服务端收到请求:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":0}
14服务端收到请求,序列号:0
15服务端收到请求:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":1}
16服务端收到请求,序列号:1
17服务端收到请求:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":2}
18服务端收到请求,序列号:2
19
20// 客户端日志
21zookeeper连接成功
22调用服务器:127.0.0.1,8094,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":0},响应参数:{"code":200,"message":"success:0"}
23客户端收到响应:{"code":200,"message":"success:0"}
24调用服务器:127.0.0.1,8094,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":1},响应参数:{"code":200,"message":"success:1"}
25客户端收到响应:{"code":200,"message":"success:1"}
26调用服务器:127.0.0.1,8092,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":2},响应参数:{"code":200,"message":"success:2"}
27客户端收到响应:{"code":200,"message":"success:2"}
28
29
此时修改Provider类里的端口,重新启动4个服务,可以看到客户端有如下日志:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1zookeeper连接成功
2zookeeper连接成功
3zookeeper连接成功
4zookeeper连接成功
5调用服务器:127.0.0.1,8095,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":29},响应参数:{"code":200,"message":"success:29"}
6客户端收到响应:{"code":200,"message":"success:29"}
7调用服务器:127.0.0.1,8091,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":30},响应参数:{"code":200,"message":"success:30"}
8客户端收到响应:{"code":200,"message":"success:30"}
9调用服务器:127.0.0.1,8097,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":31},响应参数:{"code":200,"message":"success:31"}
10客户端收到响应:{"code":200,"message":"success:31"}
11调用服务器:127.0.0.1,8092,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":32},响应参数:{"code":200,"message":"success:32"}
12客户端收到响应:{"code":200,"message":"success:32"}
13调用服务器:127.0.0.1,8099,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":33},响应参数:{"code":200,"message":"success:33"}
14客户端收到响应:{"code":200,"message":"success:33"}
15调用服务器:127.0.0.1,8096,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${"seq":34},响应参数:{"code":200,"message":"success:34"}
16
17
可以看到,新增了服务,客户端什么都不用改,就能连接到新的服务,此时关闭新启的服务,会发现客户端会访问剩余的服务,不会出现任何问题。
到这里就实现了RPC框架的自己注册bean并通过接口调用.
后续将一步一步实现负载均衡/调用链路Trace记录/限流等功能,欢迎持续关注!