RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。
RPC 可基于 HTTP 或 TCP 协议,Web Service 就是基于 HTTP 协议的 RPC,它具有良好的跨平台性,但其性能却不如基于 TCP 协议的 RPC。会两方面会直接影响 RPC 的性能,一是传输方式,二是序列化。
众所周知,TCP 是传输层协议,HTTP 是应用层协议,而传输层较应用层更加底层,在数据传输方面,越底层越快,因此,在一般情况下,TCP 一定比 HTTP 快。就序列化而言,Java 提供了默认的序列化方式,但在高并发的情况下,这种方式将会带来一些性能上的瓶颈,于是市面上出现了一系列优秀的序列化框架,比如:Protobuf、Kryo、Hessian、Jackson 等,它们可以取代 Java 默认的序列化,从而提供更高效的性能。
下面是简单实现的基于netty的RPC调用。
一、首先定义消息传递的实体类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 1span style="font-size:14px;">public class ClassInfo implements Serializable {
2
3 private static final long serialVersionUID = -8970942815543515064L;
4
5 private String className;//类名
6 private String methodName;//函数名称
7 private Class<?>[] types;//参数类型
8 private Object[] objects;//参数列表
9 public String getClassName() {
10 return className;
11 }
12 public void setClassName(String className) {
13 this.className = className;
14 }
15 public String getMethodName() {
16 return methodName;
17 }
18 public void setMethodName(String methodName) {
19 this.methodName = methodName;
20 }
21 public Class<?>[] getTypes() {
22 return types;
23 }
24 public void setTypes(Class<?>[] types) {
25 this.types = types;
26 }
27 public Object[] getObjects() {
28 return objects;
29 }
30 public void setObjects(Object[] objects) {
31 this.objects = objects;
32 }
33}
34
35
二、创建Netty操作的服务端,以及具体操作
-
服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 1public class RPCServer {
2 private int port;
3 public RPCServer(int port){
4 this.port = port;
5 }
6 public void start(){
7 EventLoopGroup bossGroup = new NioEventLoopGroup();
8 EventLoopGroup workerGroup = new NioEventLoopGroup();
9
10 try {
11 ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
12 .localAddress(port).childHandler(new ChannelInitializer<SocketChannel>() {
13
14 @Override
15 protected void initChannel(SocketChannel ch) throws Exception {
16 ChannelPipeline pipeline = ch.pipeline();
17 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
18 pipeline.addLast(new LengthFieldPrepender(4));
19 pipeline.addLast("encoder", new ObjectEncoder());
20 pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
21 pipeline.addLast(new InvokerHandler());
22 }
23 }).option(ChannelOption.SO_BACKLOG, 128)
24 .childOption(ChannelOption.SO_KEEPALIVE, true);
25 ChannelFuture future = serverBootstrap.bind(port).sync();
26 System.out.println("Server start listen at " + port );
27 future.channel().closeFuture().sync();
28 } catch (Exception e) {
29 bossGroup.shutdownGracefully();
30 workerGroup.shutdownGracefully();
31 }
32 }
33 public static void main(String[] args) throws Exception {
34 int port;
35 if (args.length > 0) {
36 port = Integer.parseInt(args[0]);
37 } else {
38 port = 8080;
39 }
40 new RPCServer(port).start();
41 }
42}
43
44
-
服务端操作,由服务端我们看到具体的数据传输操作是进行序列化的,具体的操作还是比较简单的,就是获取发送过来的信息,这样就可以通过反射获得类名,根据函数名和参数值,执行具体的操作,将执行结果发送给客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 1public class InvokerHandler extends ChannelInboundHandlerAdapter {
2 public static ConcurrentHashMap<String, Object> classMap = new ConcurrentHashMap<String,Object>();
3 @Override
4 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
5 ClassInfo classInfo = (ClassInfo)msg;
6 Object claszz = null;
7 if(!classMap.containsKey(classInfo.getClassName())){
8 try {
9 claszz = Class.forName(classInfo.getClassName()).newInstance();
10 classMap.put(classInfo.getClassName(), claszz);
11 } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
12 e.printStackTrace();
13 }
14 }else {
15 claszz = classMap.get(classInfo.getClassName());
16 }
17 Method method = claszz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
18 Object result = method.invoke(claszz, classInfo.getObjects());
19 ctx.write(result);
20 ctx.flush();
21 ctx.close();
22 }
23 @Override
24 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
25 cause.printStackTrace();
26 ctx.close();
27 }
28
29}
30
31
32
三、客户端,通过代理机制来触发远程调用
(1)客户端,当执行具体的函数时会调用远程操作,将具体操作的类、函数及参数信息发送到服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 1public class RPCProxy {
2
3 @SuppressWarnings("unchecked")
4 public static <T> T create(Object target){
5
6 return (T) Proxy.newProxyInstance(target.getClass().getClassLoader(),target.getClass().getInterfaces(), new InvocationHandler(){
7
8 @Override
9 public Object invoke(Object proxy, Method method, Object[] args)
10 throws Throwable {
11
12 ClassInfo classInfo = new ClassInfo();
13 classInfo.setClassName(target.getClass().getName());
14 classInfo.setMethodName(method.getName());
15 classInfo.setObjects(args);
16 classInfo.setTypes(method.getParameterTypes());
17
18 ResultHandler resultHandler = new ResultHandler();
19 EventLoopGroup group = new NioEventLoopGroup();
20 try {
21 Bootstrap b = new Bootstrap();
22 b.group(group)
23 .channel(NioSocketChannel.class)
24 .option(ChannelOption.TCP_NODELAY, true)
25 .handler(new ChannelInitializer<SocketChannel>() {
26 @Override
27 public void initChannel(SocketChannel ch) throws Exception {
28 ChannelPipeline pipeline = ch.pipeline();
29 pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
30 pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
31 pipeline.addLast("encoder", new ObjectEncoder());
32 pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
33 pipeline.addLast("handler",resultHandler);
34 }
35 });
36
37 ChannelFuture future = b.connect("localhost", 8080).sync();
38 future.channel().writeAndFlush(classInfo).sync();
39 future.channel().closeFuture().sync();
40 } finally {
41 group.shutdownGracefully();
42 }
43 return resultHandler.getResponse();
44 }
45 });
46 }
47}
48
49
-
获取远程调用返回的结果值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1public class ResultHandler extends ChannelInboundHandlerAdapter {
2
3 private Object response;
4
5 public Object getResponse() {
6 return response;
7}
8
9 @Override
10 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
11 response=msg;
12 System.out.println("client接收到服务器返回的消息:" + msg);
13 }
14
15 @Override
16 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
17 System.out.println("client exception is general");
18 }
19}
20
21
四、接口、实现类及Main操作
1
2
3
4
5 1public interface HelloRpc {
2 String hello(String name);
3}
4
5
1
2
3
4
5
6
7
8
9
10 1public class HelloRpcImpl implements HelloRpc {
2
3 @Override
4 public String hello(String name) {
5 return "hello "+name;
6 }
7
8}
9
10