Netty 实现简单RPC调用

释放双眼,带上耳机,听听看~!

RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。

RPC 可基于 HTTP 或 TCP 协议,Web Service 就是基于 HTTP 协议的 RPC,它具有良好的跨平台性,但其性能却不如基于 TCP 协议的 RPC。会两方面会直接影响 RPC 的性能,一是传输方式,二是序列化。

众所周知,TCP 是传输层协议,HTTP 是应用层协议,而传输层较应用层更加底层,在数据传输方面,越底层越快,因此,在一般情况下,TCP 一定比 HTTP 快。就序列化而言,Java 提供了默认的序列化方式,但在高并发的情况下,这种方式将会带来一些性能上的瓶颈,于是市面上出现了一系列优秀的序列化框架,比如:Protobuf、Kryo、Hessian、Jackson 等,它们可以取代 Java 默认的序列化,从而提供更高效的性能。
下面是简单实现的基于netty的RPC调用。

一、首先定义消息传递的实体类


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1span style="font-size:14px;">public class ClassInfo implements Serializable {  
2  
3    private static final long serialVersionUID = -8970942815543515064L;  
4      
5    private String className;//类名  
6    private String methodName;//函数名称  
7    private Class<?>[] types;//参数类型    
8    private Object[] objects;//参数列表    
9    public String getClassName() {  
10        return className;  
11    }  
12    public void setClassName(String className) {  
13        this.className = className;  
14    }  
15    public String getMethodName() {  
16        return methodName;  
17    }  
18    public void setMethodName(String methodName) {  
19        this.methodName = methodName;  
20    }  
21    public Class<?>[] getTypes() {  
22        return types;  
23    }  
24    public void setTypes(Class<?>[] types) {  
25        this.types = types;  
26    }  
27    public Object[] getObjects() {  
28        return objects;  
29    }  
30    public void setObjects(Object[] objects) {  
31        this.objects = objects;  
32    }  
33}
34
35

二、创建Netty操作的服务端,以及具体操作

  1. 服务端


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1public class RPCServer {  
2    private int port;  
3    public RPCServer(int port){  
4        this.port = port;  
5    }  
6    public void start(){  
7        EventLoopGroup bossGroup = new NioEventLoopGroup();  
8        EventLoopGroup workerGroup = new NioEventLoopGroup();  
9          
10        try {  
11            ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)  
12                    .localAddress(port).childHandler(new ChannelInitializer<SocketChannel>() {  
13  
14                        @Override  
15                        protected void initChannel(SocketChannel ch) throws Exception {  
16                            ChannelPipeline pipeline = ch.pipeline();    
17                             pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));    
18                                pipeline.addLast(new LengthFieldPrepender(4));    
19                                pipeline.addLast("encoder", new ObjectEncoder());      
20                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));    
21                                pipeline.addLast(new InvokerHandler());  
22                        }  
23                    }).option(ChannelOption.SO_BACKLOG, 128)      
24                    .childOption(ChannelOption.SO_KEEPALIVE, true);  
25            ChannelFuture future = serverBootstrap.bind(port).sync();      
26            System.out.println("Server start listen at " + port );    
27            future.channel().closeFuture().sync();    
28        } catch (Exception e) {  
29             bossGroup.shutdownGracefully();    
30             workerGroup.shutdownGracefully();  
31        }  
32    }  
33    public static void main(String[] args) throws Exception {    
34        int port;    
35        if (args.length > 0) {    
36            port = Integer.parseInt(args[0]);    
37        } else {    
38            port = 8080;    
39        }    
40        new RPCServer(port).start();    
41    }    
42}  
43
44
  1. 服务端操作,由服务端我们看到具体的数据传输操作是进行序列化的,具体的操作还是比较简单的,就是获取发送过来的信息,这样就可以通过反射获得类名,根据函数名和参数值,执行具体的操作,将执行结果发送给客户端


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1public class InvokerHandler extends ChannelInboundHandlerAdapter {  
2    public static ConcurrentHashMap<String, Object> classMap = new ConcurrentHashMap<String,Object>();  
3    @Override    
4    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    
5        ClassInfo classInfo = (ClassInfo)msg;  
6        Object claszz = null;  
7        if(!classMap.containsKey(classInfo.getClassName())){  
8            try {  
9                claszz = Class.forName(classInfo.getClassName()).newInstance();  
10                classMap.put(classInfo.getClassName(), claszz);  
11            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {  
12                e.printStackTrace();  
13            }  
14        }else {  
15            claszz = classMap.get(classInfo.getClassName());  
16        }  
17        Method method = claszz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());    
18        Object result = method.invoke(claszz, classInfo.getObjects());  
19        ctx.write(result);  
20        ctx.flush();    
21        ctx.close();  
22    }    
23    @Override    
24    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {    
25         cause.printStackTrace();    
26         ctx.close();    
27    }    
28  
29}  
30
31
32

三、客户端,通过代理机制来触发远程调用
(1)客户端,当执行具体的函数时会调用远程操作,将具体操作的类、函数及参数信息发送到服务端


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
1public class RPCProxy {  
2          
3    @SuppressWarnings("unchecked")  
4    public static <T> T create(Object target){  
5              
6        return (T) Proxy.newProxyInstance(target.getClass().getClassLoader(),target.getClass().getInterfaces(), new InvocationHandler(){  
7  
8            @Override  
9            public Object invoke(Object proxy, Method method, Object[] args)  
10                        throws Throwable {  
11                  
12                ClassInfo classInfo = new ClassInfo();  
13                classInfo.setClassName(target.getClass().getName());  
14                classInfo.setMethodName(method.getName());  
15                classInfo.setObjects(args);  
16                classInfo.setTypes(method.getParameterTypes());  
17                  
18                ResultHandler resultHandler = new ResultHandler();  
19                EventLoopGroup group = new NioEventLoopGroup();    
20                try {    
21                    Bootstrap b = new Bootstrap();    
22                    b.group(group)    
23                     .channel(NioSocketChannel.class)    
24                     .option(ChannelOption.TCP_NODELAY, true)    
25                     .handler(new ChannelInitializer<SocketChannel>() {    
26                         @Override    
27                         public void initChannel(SocketChannel ch) throws Exception {    
28                             ChannelPipeline pipeline = ch.pipeline();    
29                             pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));    
30                             pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));    
31                             pipeline.addLast("encoder", new ObjectEncoder());      
32                             pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));    
33                             pipeline.addLast("handler",resultHandler);  
34                         }    
35                     });    
36            
37                    ChannelFuture future = b.connect("localhost", 8080).sync();    
38                    future.channel().writeAndFlush(classInfo).sync();  
39                    future.channel().closeFuture().sync();    
40                } finally {    
41                    group.shutdownGracefully();    
42                }  
43                return resultHandler.getResponse();  
44            }  
45        });  
46    }  
47}  
48
49
  1. 获取远程调用返回的结果值


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1public class ResultHandler extends ChannelInboundHandlerAdapter {  
2  
3    private Object response;    
4      
5    public Object getResponse() {    
6    return response;    
7}    
8  
9    @Override    
10    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    
11        response=msg;    
12        System.out.println("client接收到服务器返回的消息:" + msg);    
13    }    
14        
15    @Override    
16    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {    
17        System.out.println("client exception is general");    
18    }    
19}  
20
21

四、接口、实现类及Main操作


1
2
3
4
5
1public interface HelloRpc {  
2    String hello(String name);  
3}  
4
5

1
2
3
4
5
6
7
8
9
10
1public class HelloRpcImpl implements HelloRpc {  
2  
3    @Override  
4    public String hello(String name) {  
5        return "hello "+name;  
6    }  
7  
8}  
9
10

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense(Google网站联盟)广告申请指南

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索