远程方法调用(RPC)的结构如图所示,一般我们还会有一个注册中心如zk、dubbo、eureka等,服务提供方将自己的向外提供的服务等信息注册在上面并暴露IP、端口等信息,而消费方通过在注册中心注册获取可以使用的服务列表,获取想要使用服务的各种信息,从而调用服务方提供的各种方法。
简单来说,服务提供者将服务注册,并监听调用该服务的socket请求,当请求到来时,创建一个子线程获取想要调用的方法、参数等信息,执行后将结果通过socket返回。
而消费者,通过获得一个服务提供方暴露服务的代理对象(即动态代理),通过获取的Ip和Port建立socket链接,发生请求的方法和参数,并接受返回值。
下面说一下动态代理,这里面主要涉及了Proxy代理类和InvocationHandler接口,前者用于获取代理对象,而后者是为了对通过代理对象调用原对象的方法做相应的处理,在RPC中代理主要作用于服务消费方,客户通过服务方暴露的服务接口生成代理对象,在InvocationHandler中重写invoke方法,在其中与服务提供方建立socket连接,发生请求方法和参数并获取返回值。具体逻辑可以这样写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92 1注意,本部分代码未经验证,只是写出了大体的逻辑,异常也未捕获
2//客户端代码
3//获取代理对象
4public class RpcProxyFactory {
5 public static <T>T getProxy(T interFace) {
6 return (T)Proxy.newProxyInstance(interFace.getClass().getClassLoader(), interFace,
7 new myHandler());
8 }
9}
10
11//重写了invoke方法,就是代理对象的方法执行的时候真正与服务提供方建立连接并获得返回结果的地方。
12public class myHandler extends InvocationHandler{
13 String ip="127.0.0.1";
14 String port = "111";
15
16@override
17public object invock(Method method, object[] args, Proxy proxy) {
18
19 Socket socket = new socket(ip,port);
20 ObjectOutPutStream out= new ObjectOutputStream(socket.getOutputStream());
21 out.writeUTF(method.getName());//请求方法名
22 out.writeObject(method.getParameterTypes());//参数类型
23 output.writeObject(args);//参数值
24 ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
25 Object result = input.readObject();//获取返回值
26 return result;
27
28 //socket 、input、output记得要close
29 }
30}
31
32
33//服务端代码
34接受客户端代理对象socket连接,每到来一个请求建立一个线程,处理请求方法,返回处理结果。
35public class ServiceServer{
36 String port = "111";
37
38 /*
39 * service 为向外暴露的服务接口
40 */
41 public static void provide(Obejct service) {
42 SockerServer server = new SockerServer(port);
43 while(true) {
44 socket.accept();
45 new Thread(new Runable() {
46 @override
47 public void run() {
48 ObejctInputStream input = new ObejctInputStream(server.getInputStream());
49 String methodName = input.readUTF();
50 Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
51 Object[] arguments = (Object[]) input.readObject();
52
53 ObjectOutputStream output = new ObejctInputStream(server.getOutputStream());
54 Method method = service.getClass().getMethod(methodName, parameterTypes);
55 Object result = method.invoke(service, arguments);
56 output.writeObject(result);
57
58 //socketserver 、input、output记得要close
59 }
60 }).start();
61 }
62 }
63}
64
65//服务示例
66public interface HelloService {
67 public String sayHello(String name);
68}
69服务实现
70public class HelloServiceImpl implements HelloService{
71 @Override
72 public String sayHello(String name) {
73 return "hello "+name;
74 }
75}
76//服务端启动
77public class RpcProvider {
78 public static void main(String[] args) throws Exception {
79 HelloService service = new HelloServiceImpl();
80 ServiceServer.provide(service);
81 }
82}
83//客户端调用
84public class RpcConsumer {
85 public static void main(String[] args) throws Exception {
86 HelloService proxy = RpcProxyFactory.getProxy(HelloService.class);
87 String result = proxy.sayHello("world");
88 System.out.println(result);
89 }
90}
91
92
这里有一篇文章写的很详细https://www.jianshu.com/p/d3c7a5bbca09