Netty Game Server Development in Practice (5): Building a Lightweight RPC Component with Netty


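Introduction: what is RPC?
RPC stands for Remote Procedure Call. Put simply, it lets you call a service on a remote machine as if it were a local one.

RPC can run over HTTP or TCP. Web Service is an RPC style built on HTTP: it is highly portable across platforms, but it generally performs worse than RPC over TCP. Two factors directly affect RPC performance: the transport and the serialization format.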

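How does Netty implement lightweight RPC?

This article walks through building a lightweight distributed RPC framework. The framework runs over TCP, uses NIO, serializes efficiently, and supports service registration and discovery.

The technology stack we need:

  1. Spring: a widely used dependency-injection framework and the de facto standard in the Java ecosystem.
  2. Netty: makes NIO programming easier by hiding the low-level details of Java NIO.
  3. Protostuff: a serialization library built on Protobuf that works directly with POJOs, so no .proto files are needed.
  4. ZooKeeper: provides service registration and discovery, a common choice for distributed systems, and is itself designed to run as a cluster.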

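1. Write the RPC service interface

An RPC service interface is no different from an ordinary interface: a simple service interface that takes parameters and returns data.
For example: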


/**
 * @author twjitm- [Created on 2018-08-20 10:24]
 * @jdk java version "1.8.0_77"
 */
public interface IHelloWorld {
    String getHelloWorld(int number);
}

The implementation class:


/**
 * @author twjitm - [Created on 2018-08-20 10:26]
 * @jdk java version "1.8.0_77"
 * The annotation value must be the interface of the business object,
 * not the implementation class.
 */
@NettyRpcServiceAnnotation(IHelloWorld.class)
@Repository
public class HelloWorldImpl implements IHelloWorld {
    private Logger logger = LoggerUtils.getLogger(HelloWorldImpl.class);

    @Override
    public String getHelloWorld(int number) {
        StringBuilder builder = new StringBuilder();
        // Appends helloworld<number> ... helloworld1, counting down.
        for (int i = number; i > 0; i--) {
            builder.append("helloworld");
            builder.append(i);
        }
        logger.info("RPC method invoked successfully; returning the result to the remote client");
        return builder.toString();
    }
}

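The NettyRpcServiceAnnotation annotation marks a class as the implementation of a remotely callable service; its value is the Class object of the service interface. NettyRpcServiceAnnotation.java is shown below: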


/**
 * Created by IntelliJ IDEA.
 * User: twjitm Date: 2018/8/19  Time: 13:19
 * https://blog.csdn.net/baidu_23086307
 * rpc service annotation
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface NettyRpcServiceAnnotation {
    Class<?> value();
}
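To make the role of the annotation more concrete, here is a minimal sketch of how a registry could read it at runtime and index each implementation by its service interface. It deliberately avoids Spring; `RpcServiceSketch`, `GreetingSketch`, `GreetingSketchImpl`, and `RpcRegistrySketch` are hypothetical names invented for this illustration and are not part of the original project.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry: maps a service interface name to its implementation.
final class RpcRegistrySketch {
    private final Map<String, Object> services = new HashMap<>();

    // Reads the annotation reflectively and indexes the bean by interface name.
    void register(Object bean) {
        RpcServiceSketch meta = bean.getClass().getAnnotation(RpcServiceSketch.class);
        if (meta == null) {
            throw new IllegalArgumentException(bean.getClass() + " is not an RPC service");
        }
        services.put(meta.value().getName(), bean);
    }

    // Returns the implementation registered for the interface, or null.
    Object lookup(String interfaceName) {
        return services.get(interfaceName);
    }

    public static void main(String[] args) {
        RpcRegistrySketch registry = new RpcRegistrySketch();
        registry.register(new GreetingSketchImpl());
        GreetingSketch service = (GreetingSketch) registry.lookup(GreetingSketch.class.getName());
        System.out.println(service.greet("rpc"));
    }
}

// Hypothetical stand-in for NettyRpcServiceAnnotation, without @Component.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface RpcServiceSketch {
    Class<?> value();
}

interface GreetingSketch {
    String greet(String name);
}

@RpcServiceSketch(GreetingSketch.class)
class GreetingSketchImpl implements GreetingSketch {
    @Override
    public String greet(String name) {
        return "hello " + name;
    }
}
```

Indexing by the interface name rather than the implementation name is what allows a client that only knows the interface to address the service.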

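2. Write the RPC protocol codec
Netty ships with many good codecs that let us build the network layer quickly and concentrate on business logic. In the same spirit, we write a codec tailored to our own RPC protocol.

  • 2.1 Serialization and the decoder

As mentioned above, we use Protostuff, which is built on Protobuf, works directly with POJOs, and needs no .proto files.
So we first write a serialization utility.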


/**
 * Created by IntelliJ IDEA.
 * User: 文江 Date: 2018/8/19  Time: 9:23
 * https://blog.csdn.net/baidu_23086307
 * netty net rpc serialize ,use rpc message
 */
@Service
public class NettyProtoBufRpcSerialize implements INettyRpcSerialize {
    Logger logger = LoggerUtils.getLogger(NettyProtoBufRpcSerialize.class);

    // Schemas are expensive to build, so each class's schema is cached.
    private Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    // Objenesis creates instances without calling a constructor.
    private Objenesis objenesis = new ObjenesisStd(true);

    @SuppressWarnings("unchecked")
    private <T> Schema<T> getSchema(Class<T> cls) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(cls);
            if (schema != null) {
                cachedSchema.put(cls, schema);
            }
        }
        return schema;
    }

    /**
     * Serialization (object -> byte array)
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> byte[] serialize(T obj) {
        Class<T> cls = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(cls);
            byte[] data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
            // Log only after the bytes have actually been produced.
            logger.info("SERIALIZE RPC MESSAGE SUCCESSFUL CLASS IS " + cls);
            return data;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * Deserialization (byte array -> object)
     */
    @Override
    public <T> T deserialize(byte[] data, Class<T> cls) {
        try {
            T message = objenesis.newInstance(cls);
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema);
            // Log the target class itself; cls.getClass() would always be java.lang.Class.
            logger.info("DESERIALIZE RPC MESSAGE SUCCESSFUL CLASS IS " + cls);
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Creates an instance of the given class
     */
    public <T> T newInstance(Class<T> cls) {
        try {
            return objenesis.newInstance(cls);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}

The RPC protocol decoder, NettyNetMessageRPCDecoder:


/**
 * @author twjitm- [Created on 2018-08-17 18:37]
 * @jdk java version "1.8.0_77"
 */
public class NettyNetMessageRPCDecoder extends ByteToMessageDecoder {
    private Class<?> genericClass;

    public NettyNetMessageRPCDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Wait until the 4-byte length field has arrived.
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        // A negative length can only come from a corrupt frame; allocating
        // new byte[dataLength] would throw, so close the connection instead.
        if (dataLength < 0) {
            ctx.close();
            return;
        }
        // Incomplete body: rewind to the length field and wait for more bytes.
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);

        NettyProtoBufRpcSerialize serialize = SpringServiceManager.getSpringLoadService().getNettyProtoBufRpcSerialize();
        Object obj = serialize.deserialize(data, genericClass);
        out.add(obj);
    }
}

2.2 RPC encoder
The encoder is the counterpart of the decoder. A complete protocol needs both; neither can be omitted.


/**
 * Created by IntelliJ IDEA.
 * User: 文江 Date: 2018/8/19  Time: 9:55
 * https://blog.csdn.net/baidu_23086307
 */
public class NettyNetMessageRPCEncoder extends MessageToByteEncoder<Object> {

    private Class<?> genericClass;

    public NettyNetMessageRPCEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        // Only messages of the configured type are encoded.
        if (genericClass.isInstance(in)) {
            NettyProtoBufRpcSerialize serialize = SpringServiceManager.getSpringLoadService().getNettyProtoBufRpcSerialize();
            byte[] data = serialize.serialize(in);
            // Frame layout: 4-byte length followed by the serialized body.
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}
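The encoder and decoder above agree on one wire format: a 4-byte length followed by the serialized body. The sketch below shows the same framing idea with the JDK's `java.nio.ByteBuffer` instead of Netty's `ByteBuf`, so it runs without any dependency. `LengthPrefixFramingSketch` is a hypothetical name used only for this illustration; the mark/reset logic mirrors what the decoder does with `markReaderIndex` and `resetReaderIndex`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class LengthPrefixFramingSketch {

    // Writes a 4-byte big-endian length followed by the payload.
    static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Reads one frame, or returns null and leaves the position untouched
    // when the buffer does not yet hold a complete frame.
    static byte[] decode(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null;
        }
        in.mark();
        int length = in.getInt();
        if (length < 0) {
            throw new IllegalStateException("negative frame length: " + length);
        }
        if (in.remaining() < length) {
            in.reset(); // rewind to the length field and wait for more bytes
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = encode("ping".getBytes(StandardCharsets.UTF_8));
        byte[] body = decode(ByteBuffer.wrap(frame));
        System.out.println(new String(body, StandardCharsets.UTF_8));
    }
}
```

Returning without consuming anything when the frame is incomplete is what lets a stream decoder be called again once more bytes have arrived.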

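That completes the preparation. Next we write the RPC server. It differs little from an ordinary Netty server; only the encoder and decoder in the pipeline are different.

3. RPC server

The RPC server exposes the remotely callable interfaces. For a client to issue an RPC call, the server must be listening on the corresponding port; otherwise the RPC service is unreachable. We therefore build the RPC server on Netty, whose networking support keeps the code short and simple.

The RPC server is shown below.
To improve code reuse, we extract the server into an abstract class; passing in basic information such as the IP and port constructs a server.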


package com.twjitm.core.bootstrap.tcp;

import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.core.bootstrap.AbstractNettyGameBootstrapService;
import com.twjitm.core.utils.logs.LoggerUtils;
import com.twjitm.threads.thread.NettyThreadNameFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.log4j.Logger;

import java.net.InetSocketAddress;

/**
 * Abstract bootstrap class for TCP services.
 * It implements the server startup sequence; subclasses supply the basic
 * information (IP, port, thread names, channel initializer) needed to start.
 * <p>
 * Do not call this class and
 * {@link com.twjitm.core.bootstrap.udp.AbstractNettyGameBootstrapUdpService} or
 * {@link com.twjitm.core.bootstrap.http.AbstractNettyGameBootstrapHttpService}
 * from one another.
 * <p>
 * The startup sequence itself is implemented in {@link #startServer()}.
 *
 * @author 文江
 * @date 2018/4/16
 */
public abstract class AbstractNettyGameBootstrapTcpService extends AbstractNettyGameBootstrapService {
    private static Logger logger = LoggerUtils.getLogger(AbstractNettyGameBootstrapTcpService.class);
    private int serverPort;
    private String serverIp;

    private String serverName;

    private NettyThreadNameFactory bossNettyThreadNameFactory;
    private NettyThreadNameFactory workerNettyThreadNameFactory;
    private ChannelInitializer channelInitializer;

    private EventLoopGroup listenIntoGroup;
    private EventLoopGroup progressGroup;

    public AbstractNettyGameBootstrapTcpService(int serverPort,
                                                String serverIp,
                                                String bossTreadName,
                                                String workerTreadName,
                                                ChannelInitializer channelInitializer,
                                                String serverName) {
        super(serverPort, new InetSocketAddress(serverIp, serverPort));
        this.serverIp = serverIp;
        this.serverPort = serverPort;
        this.bossNettyThreadNameFactory = new NettyThreadNameFactory(bossTreadName);
        this.workerNettyThreadNameFactory = new NettyThreadNameFactory(workerTreadName);
        this.channelInitializer = channelInitializer;
        this.serverName = serverName;
    }

    @Override
    public void startServer() {
        // One boss thread accepts connections; 0 lets Netty pick the worker count.
        listenIntoGroup = new NioEventLoopGroup(1, bossNettyThreadNameFactory);
        progressGroup = new NioEventLoopGroup(0, workerNettyThreadNameFactory);
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(listenIntoGroup, progressGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(channelInitializer)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        ChannelFuture channelFuture;
        try {
            channelFuture = bootstrap.bind(this.serverIp, this.serverPort).sync();
            logger.info("[---------------------" + serverName + " SERVICE START IS SUCCESSFUL IP=[" + serverIp + "]LISTENER PORT NUMBER IS :[" + serverPort + "]------------]");
            // Blocks this thread until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            logger.error(serverName + " START HAVE ERROR, WILL STOP", e);
            SpringServiceManager.shutdown();
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
        } finally {
            listenIntoGroup.shutdownGracefully();
            progressGroup.shutdownGracefully();
            logger.info(serverName + " SERVER WORLD STOP");
        }
    }

    @Override
    public void stopServer() throws Throwable {
        // The groups are null if startServer() was never called.
        if (listenIntoGroup != null) {
            listenIntoGroup.shutdownGracefully();
        }
        if (progressGroup != null) {
            progressGroup.shutdownGracefully();
        }
    }

}

RPC server bootstrap:


package com.twjitm.core.bootstrap.rpc;

import com.twjitm.core.bootstrap.tcp.AbstractNettyGameBootstrapTcpService;
import io.netty.channel.ChannelInitializer;

/**
 * Created by IntelliJ IDEA.
 * User: 文江 Date: 2018/8/19  Time: 10:03
 * https://blog.csdn.net/baidu_23086307
 */
public class NettyGameBootstrapRpcService extends AbstractNettyGameBootstrapTcpService {

    public NettyGameBootstrapRpcService(int serverPort, String serverIp, String bossTreadName, String workerTreadName, ChannelInitializer channelInitializer, String serverName) {
        super(serverPort, serverIp, bossTreadName, workerTreadName, channelInitializer, serverName);
    }

    @Override
    public void startServer() {
        super.startServer();
    }

    @Override
    public void stopServer() throws Throwable {
        super.stopServer();
    }
}

Every server has its own initializer that sets up its pipeline components. We write one for the RPC server as well.


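import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
// Project-specific imports (GlobalConstants, the codec, message and handler
// classes) are omitted here, as in the original listing.

/**
 * Created by IntelliJ IDEA.
 * User: 文江 Date: 2018/8/19  Time: 10:19
 * https://blog.csdn.net/baidu_23086307
 */
public class NettyRpcMessageServerInitializer extends ChannelInitializer<NioSocketChannel> {

    @Override
    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
        ChannelPipeline pipeline = nioSocketChannel.pipeline();
        int maxLength = Integer.MAX_VALUE;
        // The frame decoder does not strip the 4-byte length field
        // (initialBytesToStrip = 0), so the RPC decoder reads it again.
        pipeline.addLast("frame", new LengthFieldBasedFrameDecoder(maxLength, 0, 4, 0, 0));
        pipeline.addLast("decoder", new NettyNetMessageRPCDecoder(NettyRpcRequestMessage.class));
        pipeline.addLast("encoder", new NettyNetMessageRPCEncoder(NettyRpcResponseMessage.class));
        // 0 disables the reader-only and writer-only idle checks.
        int readerIdleTimeSeconds = 0;
        int writerIdleTimeSeconds = 0;
        int allIdleTimeSeconds = GlobalConstants.NettyNet.SESSION_HEART_ALL_TIMEOUT;
        pipeline.addLast("idleStateHandler", new IdleStateHandler(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds));
        pipeline.addLast("logger", new LoggingHandler(LogLevel.DEBUG));
        pipeline.addLast("handler", new NettyNetRPCServerHandler());
    }
}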

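The corresponding handler is NettyNetRPCServerHandler. Its main job is to dispatch each incoming RPC request to the matching service: using reflection and the annotation, it maps every request to its business logic, and finally returns the result in a NettyRpcResponseMessage.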


package com.twjitm.core.common.handler.rpc;

import com.twjitm.core.common.netstack.entity.rpc.NettyRpcRequestMessage;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcResponseMessage;
import com.twjitm.core.common.service.rpc.service.NettyRemoteRpcHandlerService;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.core.utils.logs.LoggerUtils;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.log4j.Logger;

/**
 * Handles RPC messages once a remote client has connected.
 * Each client request is dispatched to the concrete RPC service class.
 * <p>
 * Created by IntelliJ IDEA.
 * User: 文江 Date: 2018/8/19  Time: 10:22
 * https://blog.csdn.net/baidu_23086307
 * <p>
 * handler rpc message
 */
public class NettyNetRPCServerHandler extends SimpleChannelInboundHandler<NettyRpcRequestMessage> {
    Logger logger = LoggerUtils.getLogger(NettyNetRPCServerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    @Override
    public void channelRead0(final ChannelHandlerContext ctx, final NettyRpcRequestMessage request) throws Exception {
        NettyRemoteRpcHandlerService remoteRpcHandlerService = SpringServiceManager.getSpringLoadService().getNettyRemoteRpcHandlerService();
        // Run the business logic off the event loop so I/O threads are not blocked.
        remoteRpcHandlerService.submit(() -> {
            if (logger.isDebugEnabled()) {
                logger.debug("RECEIVE REQUEST " + request.getRequestId());
            }
            NettyRpcResponseMessage response = new NettyRpcResponseMessage();
            // The request id lets the client match this response to its request.
            response.setRequestId(request.getRequestId());
            try {
                Object result = SpringServiceManager.getSpringLoadService().getDispatcherService().dispatcher(request);
                response.setResult(result);
            } catch (Throwable t) {
                response.setError(t.toString());
                logger.error("RPC SERVER HANDLE REQUEST ERROR", t);
            }
            ctx.writeAndFlush(response).addListener((ChannelFutureListener) channelFuture -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("SEND RESPONSE FOR REQUEST " + request.getRequestId());
                }
            });
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("SERVER CAUGHT EXCEPTION", cause);
        ctx.close();
    }
}
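The handler delegates the actual lookup to a dispatcher service whose source is not shown here. As a rough idea of what reflection-based dispatch can look like, the sketch below resolves a method from a class name, method name, and parameter types (the same fields carried by the request message) and invokes it. `ReflectiveDispatchSketch`, `EchoSketch`, and `EchoSketchImpl` are hypothetical names for this illustration, and the real dispatcher may work differently.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

final class ReflectiveDispatchSketch {
    private final Map<String, Class<?>> interfaces = new HashMap<>();
    private final Map<String, Object> beans = new HashMap<>();

    // Registers an implementation under the name of its service interface.
    void register(Class<?> serviceInterface, Object impl) {
        interfaces.put(serviceInterface.getName(), serviceInterface);
        beans.put(serviceInterface.getName(), impl);
    }

    // Finds the target method by name and parameter types, then invokes it.
    Object dispatch(String className, String methodName,
                    Class<?>[] parameterTypes, Object[] parameters) {
        Object bean = beans.get(className);
        if (bean == null) {
            throw new IllegalArgumentException("no service registered for " + className);
        }
        try {
            Method method = interfaces.get(className).getMethod(methodName, parameterTypes);
            return method.invoke(bean, parameters);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("dispatch failed for " + methodName, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveDispatchSketch dispatcher = new ReflectiveDispatchSketch();
        dispatcher.register(EchoSketch.class, new EchoSketchImpl());
        Object result = dispatcher.dispatch(EchoSketch.class.getName(), "repeat",
                new Class<?>[]{String.class, int.class}, new Object[]{"ab", 3});
        System.out.println(result);
    }
}

interface EchoSketch {
    String repeat(String text, int times);
}

class EchoSketchImpl implements EchoSketch {
    @Override
    public String repeat(String text, int times) {
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < times; i++) {
            builder.append(text);
        }
        return builder.toString();
    }
}
```

Resolving the method on the interface rather than on the implementation class keeps the lookup tied to the published contract.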

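4. RPC client
The RPC client is whichever server initiates an RPC request; any server that issues an RPC request acts as an RPC client for the duration of that call. A client can usually call in one of two ways:
1. Synchronous call: the calling thread blocks until the remote result comes back, then continues.
2. Asynchronous call: the calling thread does not block; the result is delivered through a callback interface and is finally handed back to the caller.

Choose between them according to the business scenario. If the calling thread can afford to wait, a synchronous call keeps the program simpler. If latency matters, use an asynchronous call so the calling thread is never blocked.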
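The difference between the two calling styles can be illustrated with the JDK's `CompletableFuture`, used here as a stand-in for the project's own future and callback types. `SyncVsAsyncCallSketch` and its `remoteCall` method are hypothetical; no network is involved, and the "remote" work simply runs on another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SyncVsAsyncCallSketch {

    // Stand-in for a remote call: the result is produced on another thread.
    static CompletableFuture<String> remoteCall(ExecutorService pool, int number) {
        return CompletableFuture.supplyAsync(() -> "helloworld" + number, pool);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Synchronous style: the caller blocks (with a timeout) for the result.
            String result = remoteCall(pool, 1).get(1, TimeUnit.SECONDS);
            System.out.println("sync result: " + result);

            // Asynchronous style: the caller registers a callback and moves on.
            CompletableFuture<Void> done = remoteCall(pool, 2)
                    .thenAccept(r -> System.out.println("async result: " + r));

            // Waiting here only keeps the demo from exiting before the callback runs.
            done.get(1, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }
}
```

In the synchronous case a timeout is worth setting, since an unanswered request would otherwise block the caller indefinitely.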

For the RPC client we first define a request/response convention: a request header and a response header.

4.1.1 RPC request header


package com.twjitm.core.common.netstack.entity.rpc;

/**
 * Created by IntelliJ IDEA.
 * User: 文江 Date: 2018/8/19  Time: 10:10
 * https://blog.csdn.net/baidu_23086307
 */
public class NettyRpcRequestMessage {
    /**
     * Request id
     */
    private String requestId;
    /**
     * Class name
     */
    private String className;
    /**
     * Method name
     */
    private String methodName;
    /**
     * Parameter types of the request
     */
    private Class<?>[] parameterTypes;
    /**
     * Parameter values of the request
     */
    private Object[] parameters;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public Object[] getParameters() {
        return parameters;
    }

    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }

}

4.1.2 RPC response header


package com.twjitm.core.common.netstack.entity.rpc;

/**
 * Created by IntelliJ IDEA.
 * User: 文江 Date: 2018/8/19  Time: 10:15
 * https://blog.csdn.net/baidu_23086307
 */
public class NettyRpcResponseMessage {
    /**
     * Request id
     */
    private String requestId;
    /**
     * Error description; null when the call succeeded
     */
    private String error;
    /**
     * Return value
     */
    private Object result;

    public String getRequestId() {
        return requestId;
    }

    public String getError() {
        return error;
    }

    public Object getResult() {
        return result;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public void setError(String error) {
        this.error = error;
    }

    public void setResult(Object result) {
        this.result = result;
    }

    public boolean isError() {
        return error != null;
    }
}

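4.2 RPC client implementation

4.2.1 Overview

The RPC client has somewhat more code, so the package layout is given first to make the rest easier to follow.

Going through it from top to bottom:
AbstractNettyRpcConnectManager is the connection manager. Remote server information is obtained through ZooKeeper-based service discovery.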


package com.twjitm.core.common.service.rpc.client;

import com.twjitm.core.common.config.global.NettyGameServiceConfig;
import com.twjitm.core.common.config.global.NettyGameServiceConfigService;
import com.twjitm.core.common.service.rpc.network.NettyRpcClient;
import com.twjitm.core.common.service.rpc.server.NettyRpcNodeInfo;
import com.twjitm.core.common.service.rpc.server.NettySdServer;
import com.twjitm.core.common.zookeeper.NettyZookeeperNodeInfo;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.core.utils.logs.LoggerUtils;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author twjitm - [Created on 2018-08-20 10:57]
 * @jdk java version "1.8.0_77"
 * Abstract RPC connection manager
 */
public abstract class AbstractNettyRpcConnectManager {
    private Logger logger = LoggerUtils.getLogger(AbstractNettyRpcConnectManager.class);

    private ThreadPoolExecutor threadPoolExecutor;

    /** Guards every access to serverNodes, which is a plain HashMap. */
    private ReentrantLock lock = new ReentrantLock();
    private Map<Integer, NettyRpcClient> serverNodes = new HashMap<>();

    private AtomicInteger roundRobin = new AtomicInteger();

    public void initManager() {
        NettyGameServiceConfigService config = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService();
        NettyGameServiceConfig serviceConfig = config.getNettyGameServiceConfig();
        threadPoolExecutor = new ThreadPoolExecutor(serviceConfig.getRpcConnectThreadSize(),
                serviceConfig.getRpcConnectThreadSize(),
                600L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(65536));
    }

    /**
     * RPC servers configured locally on this server
     *
     * @param allServerAddress
     * @throws InterruptedException
     */
    public void initServers(List<NettySdServer> allServerAddress) throws InterruptedException {
        lock.lock();
        try {
            if (allServerAddress != null) {
                for (NettySdServer sdServer : allServerAddress) {
                    if (serverNodes.containsKey(sdServer.getServerId())) {
                        continue;
                    }
                    NettyRpcNodeInfo rpcNodeInfo = new NettyRpcNodeInfo();
                    rpcNodeInfo.setServerId(String.valueOf(sdServer.getServerId()));
                    rpcNodeInfo.setHost(sdServer.getIp());
                    rpcNodeInfo.setPort(String.valueOf(sdServer.getRpcPort()));
                    NettyRpcClient rpcClient = new NettyRpcClient(rpcNodeInfo, threadPoolExecutor);
                    serverNodes.put(sdServer.getServerId(), rpcClient);
                }
            }
        } finally {
            // Always release the lock, even if creating a client throws.
            lock.unlock();
        }
    }

    /**
     * Selects a remote RPC client. A serverId of 0 means "any server",
     * chosen round-robin.
     *
     * @param serverId
     * @return
     */
    public NettyRpcClient getNettyRpcClientByServerId(int serverId) {
        lock.lock();
        try {
            if (serverId == 0) {
                List<NettyRpcClient> handlers = new ArrayList<>(this.serverNodes.values());
                if (handlers.isEmpty()) {
                    logger.error("NO RPC SERVER NODE IS AVAILABLE");
                    throw new RuntimeException("CAN'T CONNECT ANY SERVERS!");
                }
                // floorMod keeps the index non-negative after the counter overflows.
                int index = Math.floorMod(roundRobin.getAndIncrement(), handlers.size());
                return handlers.get(index);
            }
            return this.serverNodes.get(serverId);
        } finally {
            lock.unlock();
        }
    }

    public void stop() {
        lock.lock();
        try {
            for (NettyRpcClient rpcClient : serverNodes.values()) {
                rpcClient.close();
            }
        } finally {
            lock.unlock();
        }
        if (threadPoolExecutor != null) {
            threadPoolExecutor.shutdown();
        }
    }

    /**
     * RPC servers discovered through ZooKeeper
     *
     * @param nettyZookeeperNodeInfoList
     */
    public void initNettyZookeeperRpcServers(List<NettyZookeeperNodeInfo> nettyZookeeperNodeInfoList) {
        // Use the same lock as initServers so both paths guard the same map.
        lock.lock();
        try {
            if (nettyZookeeperNodeInfoList != null) {
                // Servers that are already registered (including this one) are skipped;
                // only other servers' information is added.
                for (NettyZookeeperNodeInfo zooKeeperNodeInfo : nettyZookeeperNodeInfoList) {
                    // The map is keyed by Integer, so parse the String id before the lookup.
                    int serverId = Integer.parseInt(zooKeeperNodeInfo.getServerId());
                    if (serverNodes.containsKey(serverId)) {
                        continue;
                    }
                    NettyRpcNodeInfo rpcNodeInfo = new NettyRpcNodeInfo();
                    rpcNodeInfo.setServerId(zooKeeperNodeInfo.getServerId());
                    rpcNodeInfo.setHost(zooKeeperNodeInfo.getHost());
                    rpcNodeInfo.setPort(zooKeeperNodeInfo.getPort());
                    NettyRpcClient rpcClient = new NettyRpcClient(rpcNodeInfo, threadPoolExecutor);
                    serverNodes.put(serverId, rpcClient);
                }
            }
        } finally {
            lock.unlock();
        }
    }
}
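When no specific server is requested, the connection manager picks a node round-robin from a shared counter. The sketch below isolates that selection step. `RoundRobinSketch` is a hypothetical class for illustration; it uses `Math.floorMod` so the index stays valid even after the `int` counter overflows and becomes negative, a case in which the plain `%` operator would yield a negative index.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch<T> {
    private final AtomicInteger counter;

    // The start value is configurable only so the overflow case can be shown.
    RoundRobinSketch(int start) {
        this.counter = new AtomicInteger(start);
    }

    // Returns the next node in rotation; fails fast when there is none.
    T next(List<T> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalStateException("no RPC node available");
        }
        int index = Math.floorMod(counter.getAndIncrement(), nodes.size());
        return nodes.get(index);
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("a", "b", "c");
        RoundRobinSketch<String> selector = new RoundRobinSketch<>(0);
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.next(nodes));
        }
    }
}
```

Checking for an empty node list first also avoids the division by zero that a modulo on size 0 would cause.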

The asynchronous callback interface:


package com.twjitm.core.common.service.rpc.client;

/**
 * Callback interface for remote RPC calls
 *
 * @author twjitm - [Created on 2018-08-20 11:46]
 * @jdk java version "1.8.0_77"
 */
public interface NettyAsyncRPCCallback {
    void success(Object result);

    void fail(Exception e);
}

The RPC context holder:


package com.twjitm.core.common.service.rpc.client;

/**
 * Holder of the RPC context information
 *
 * @author twjitm - [Created on 2018-08-20 12:28]
 * @jdk java version "1.8.0_77"
 */
public class NettyRpcContextHolder {
    /**
     * A ThreadLocal gives each thread its own context, which makes
     * switching contexts thread-safe.
     */
    private static final ThreadLocal<NettyRpcContextHolderObject> contextHolder = new ThreadLocal<>();

    public static NettyRpcContextHolderObject getContext() {
        return contextHolder.get();
    }

    /**
     * Sets the RPC context for the current thread.
     *
     * @param rpcContextHolderObject the routing information for the next RPC call
     */
    public static void setContextHolder(NettyRpcContextHolderObject rpcContextHolderObject) {
        contextHolder.set(rpcContextHolderObject);
    }
}

The RPC context holder object:


package com.twjitm.core.common.service.rpc.client;

import com.twjitm.core.common.enums.NettyGameTypeEnum;

/**
 * RPC context holder object. It mainly carries the request information:
 * the server type and the target server id.
 * Before an RPC request is issued, this basic information must be stored in
 * this object; the lower layer reads it to route the request to the concrete
 * remote server.
 *
 * @author twjitm - [Created on 2018-08-20 12:31]
 * @jdk java version "1.8.0_77"
 */
public class NettyRpcContextHolderObject {
    private NettyGameTypeEnum nettyGameTypeEnum;
    private int serviceId;

    public NettyRpcContextHolderObject(NettyGameTypeEnum nettyGameTypeEnum, int serviceId) {
        this.nettyGameTypeEnum = nettyGameTypeEnum;
        this.serviceId = serviceId;
    }

    public NettyGameTypeEnum getNettyGameTypeEnum() {
        return nettyGameTypeEnum;
    }

    public void setNettyGameTypeEnum(NettyGameTypeEnum nettyGameTypeEnum) {
        this.nettyGameTypeEnum = nettyGameTypeEnum;
    }

    public int getServiceId() {
        return serviceId;
    }

    public void setServiceId(int serviceId) {
        this.serviceId = serviceId;
    }
}

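Why is an RPC context holder object needed?
Before issuing an RPC request, the caller stores basic routing information (the server type and the server id) in this object, and the lower layer reads it to route the request to the right remote server. In an asynchronous call the caller does not wait for the RPC response; it receives the result through the callback interface. When the response arrives, the caller has to be notified, and that is where the context of the original call is needed.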
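The per-thread nature of such a context can be seen in a few lines. The sketch below stores a target server id in a `ThreadLocal` and shows that another thread does not see it. `RpcContextSketch` is a hypothetical simplification of the holder classes in this article; the `clear()` call in a `finally` block is an addition that the original holder does not provide, included because thread-local values otherwise linger on pooled threads.

```java
final class RpcContextSketch {
    // Each thread sees only the value it set itself.
    private static final ThreadLocal<Integer> TARGET_SERVER = new ThreadLocal<>();

    static void set(int serverId) {
        TARGET_SERVER.set(serverId);
    }

    // Returns null when the current thread has not set a target.
    static Integer get() {
        return TARGET_SERVER.get();
    }

    static void clear() {
        TARGET_SERVER.remove();
    }

    public static void main(String[] args) throws InterruptedException {
        set(9001);
        try {
            Thread other = new Thread(() -> System.out.println("other thread sees: " + get()));
            other.start();
            other.join();
            System.out.println("caller sees: " + get());
        } finally {
            clear(); // avoid leaking the context to later work on this thread
        }
    }
}
```

Because the value is bound to the thread, it has to be read on the calling thread before the request is handed to another thread or executor.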

The asynchronous result object, NettyRpcFuture:


package com.twjitm.core.common.service.rpc.client;

import com.twjitm.core.common.config.global.NettyGameServiceConfigService;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcRequestMessage;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcResponseMessage;
import com.twjitm.core.common.service.rpc.service.NettyRpcProxyService;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.core.utils.logs.LoggerUtils;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Result handle for a remote call. {@link NettyRpcFuture} implements {@link Future},
 * so {@code get()} blocks the calling thread. It must therefore not be called on a
 * game thread, i.e. on a thread that belongs to an {@link io.netty.channel.EventLoop}.
 *
 * @author twjitm- [Created on 2018-08-20 11:38]
 * @jdk java version "1.8.0_77"
 */
public class NettyRpcFuture implements Future<Object> {
    private Logger logger = LoggerUtils.getLogger(NettyRpcFuture.class);
    /** Synchronizer that records whether the response has arrived. */
    private Sync sync;
    /** The RPC request message. */
    private NettyRpcRequestMessage request;
    /** The RPC response message. */
    private NettyRpcResponseMessage response;
    /** Time at which the request was created, in milliseconds. */
    private long startTime;
    /** Callbacks that are waiting for the response. */
    private List<NettyAsyncRPCCallback> pendingCallbacks = new ArrayList<NettyAsyncRPCCallback>();
    /** Lock that guards the callback list and the completion check. */
    private ReentrantLock lock = new ReentrantLock();

    public NettyRpcFuture(NettyRpcRequestMessage request) {
        this.sync = new Sync();
        this.request = request;
        this.startTime = System.currentTimeMillis();
    }

    /**
     * Whether the call has completed, determined from the state of the
     * {@link AbstractQueuedSynchronizer}.
     */
    @Override
    public boolean isDone() {
        return sync.isDone();
    }

    /**
     * Returns the result, blocking until the response has arrived.
     */
    @Override
    public Object get() throws InterruptedException, ExecutionException {
        // Block until the synchronizer has been released by done(...).
        sync.acquire(-1);
        if (this.response != null) {
            return this.response.getResult();
        } else {
            return null;
        }
    }

    /**
     * Returns the result, waiting at most the given time for the response.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     */
    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Blocks until the synchronizer is acquired (returns true), the thread is
        // interrupted (throws), or the timeout elapses (returns false).
        boolean success = sync.tryAcquireNanos(-1, unit.toNanos(timeout));
        if (success) {
            // Acquired: the response has arrived.
            if (this.response != null) {
                return this.response.getResult();
            } else {
                return null;
            }
        } else {
            // Note: this throws an unchecked RuntimeException rather than the
            // TimeoutException that the Future contract specifies.
            throw new RuntimeException("TIMEOUT EXCEPTION. REQUEST ID: " + this.request.getRequestId()
                    + ". REQUEST CLASS NAME: " + this.request.getClassName()
                    + ". REQUEST METHOD: " + this.request.getMethodName());
        }
    }

    /**
     * Cancellation of an RPC request is not supported.
     */
    @Override
    public boolean isCancelled() {
        throw new UnsupportedOperationException();
    }

    /**
     * Cancellation of an RPC request is not supported.
     *
     * @param mayInterruptIfRunning whether a running task may be interrupted
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        throw new UnsupportedOperationException();
    }

    /**
     * Called when the RPC response arrives. The response is stored first and the
     * synchronizer is released afterwards, see {@link Sync#release(int)}.
     *
     * @param response the response message
     */
    public void done(NettyRpcResponseMessage response) {
        this.response = response;
        sync.release(1);
        invokeCallbacks();
        long responseTime = System.currentTimeMillis() - startTime;
        // Maximum expected duration of a remote call; slower calls are logged.
        long responseTimeThreshold = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService().getNettyGameServiceConfig().getRpcTimeOut();
        if (responseTime > responseTimeThreshold) {
            logger.warn("SERVICE RESPONSE TIME IS TOO SLOW. REQUEST ID = " + response.getRequestId() + ". RESPONSE TIME = " + responseTime + "ms");
        }
    }

    /**
     * Whether the request has timed out. When the network is poor or the load is
     * high, a response may be delayed; a timeout policy can use this to decide
     * whether the message should be sent again.
     */
    public boolean isTimeout() {
        long responseTime = System.currentTimeMillis() - startTime;
        NettyGameServiceConfigService gameServerConfigService = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService();
        int timeOut = gameServerConfigService.getNettyGameServiceConfig().getRpcFutureDeleteTimeOut();
        return responseTime >= timeOut;
    }

    /**
     * Runs the pending callbacks. One request may have several callbacks, so they
     * are kept in a list, and a {@link ReentrantLock} protects that list against
     * concurrent access.
     */
    private void invokeCallbacks() {
        lock.lock();
        try {
            for (final NettyAsyncRPCCallback callback : pendingCallbacks) {
                runCallback(callback);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds a callback, i.e. a listener for the result of this RPC request.
     *
     * @param callback the callback to register
     * @return this future, to allow chaining
     */
    public NettyRpcFuture addCallback(NettyAsyncRPCCallback callback) {
        lock.lock();
        try {
            if (isDone()) {
                logger.info("REMOTE RESULT ALREADY AVAILABLE. RUNNING CALLBACK NOW.");
                runCallback(callback);
            } else {
                logger.info("WAITING FOR REMOTE RESULT. CALLBACK QUEUED.");
                this.pendingCallbacks.add(callback);
            }
        } finally {
            // Always release the lock.
            lock.unlock();
        }
        return this;
    }

    /**
     * Runs one callback. The callback is executed as a task submitted through
     * {@link NettyRpcProxyService#submit(Runnable)}; depending on whether the
     * response carries an error, the success or the failure method of the
     * {@link NettyAsyncRPCCallback} is invoked, and the callback implements the
     * business logic for each case.
     *
     * @param callback the callback to run
     */
    private void runCallback(final NettyAsyncRPCCallback callback) {
        final NettyRpcResponseMessage res = this.response;
        NettyRpcProxyService nettyRpcProxyService = SpringServiceManager.getSpringLoadService().getNettyRpcProxyService();
        nettyRpcProxyService.submit(() -> {
            if (!res.isError()) {
                callback.success(res.getResult());
            } else {
                callback.fail(new RuntimeException("RESPONSE ERROR", new Throwable(res.getError())));
            }
        });
    }

    /**
     * The synchronizer at the heart of the asynchronous callback mechanism.
     */
    static class Sync extends AbstractQueuedSynchronizer {

        private static final long serialVersionUID = 1L;

        private static final int DONE = 1;
        private static final int PENDING = 0;

        @Override
        protected boolean tryAcquire(int acquires) {
            return getState() == DONE;
        }

        /**
         * Moves the state from pending to done with a CAS, which keeps the
         * transition atomic.
         */
        @Override
        protected boolean tryRelease(int releases) {
            return getState() == PENDING && compareAndSetState(PENDING, DONE);
        }

        public boolean isDone() {
            return getState() == DONE;
        }
    }
}

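This class is central to the design: it waits asynchronously for the remote result and passes that result on to the asynchronous callback interface. It is built on AQS (AbstractQueuedSynchronizer), which is what blocks a caller until the remote response arrives.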
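The AQS technique can be shown in isolation. The sketch below is a stripped-down, hypothetical `AqsFutureSketch`, not the project's class. It deliberately uses AQS in shared mode, following the `BooleanLatch` example in the `AbstractQueuedSynchronizer` Javadoc, so that every waiting thread is released once the result is set; the class above uses exclusive mode instead.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class AqsFutureSketch {
    /** One-shot gate: state 0 means pending, state 1 means done. */
    private static final class Gate extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative value lets the waiter through.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // wake up the waiting threads
        }

        boolean isOpen() {
            return getState() == 1;
        }
    }

    private final Gate gate = new Gate();
    private Object result;

    /** Called by the responder: store the value first, then open the gate. */
    void done(Object value) {
        result = value;
        gate.releaseShared(1);
    }

    boolean isDone() {
        return gate.isOpen();
    }

    /** Blocks until done(...) has been called or the timeout elapses. */
    Object get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (!gate.tryAcquireSharedNanos(1, unit.toNanos(timeout))) {
            throw new TimeoutException("no response within " + timeout + " " + unit);
        }
        return result;
    }

    /** A second thread plays the role of the network layer delivering a response. */
    static Object demo() {
        AqsFutureSketch future = new AqsFutureSketch();
        new Thread(() -> future.done("helloworld1")).start();
        try {
            return future.get(2, TimeUnit.SECONDS);
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Nobody ever completes this future, so get(...) gives up. */
    static boolean timesOut() {
        try {
            new AqsFutureSketch().get(50, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
        System.out.println(timesOut());
    }
}
```

Writing `result` before releasing the gate matters: the state change is what publishes the value to the thread that returns from `get`.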

4.2.2. Dynamic proxies in RPC

First, we define a proxy interface:


package com.twjitm.core.common.service.rpc.client.proxy;

import com.twjitm.core.common.service.rpc.client.NettyRpcFuture;

/**
 * Asynchronous RPC proxy: sends a call and returns a future immediately.
 *
 * @author twjitm - [Created on 2018-08-20 14:45]
 * @jdk java version "1.8.0_77"
 */
public interface INettyAsyncRpcProxy {
    /**
     * @param funcName name of the remote method to invoke
     * @param args     arguments of the remote method
     */
    NettyRpcFuture call(String funcName, Object... args);
}

The proxy object:


package com.twjitm.core.common.service.rpc.client.proxy;

import com.twjitm.core.common.netstack.entity.rpc.NettyRpcRequestMessage;
import com.twjitm.core.common.service.rpc.client.AbstractNettyRpcConnectManager;
import com.twjitm.core.common.service.rpc.client.NettyRpcFuture;
import com.twjitm.core.common.service.rpc.client.NettyRpcContextHolder;
import com.twjitm.core.common.service.rpc.client.NettyRpcContextHolderObject;
import com.twjitm.core.common.service.rpc.network.NettyRpcClient;
import com.twjitm.core.common.service.rpc.service.NettyRpcClientConnectService;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.core.utils.logs.LoggerUtils;
import org.apache.log4j.Logger;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Proxy object: turns a call on a service interface into an RPC request and
 * blocks until the response arrives.
 *
 * @author twjitm- [Created on 2018-08-20 12:15]
 * @jdk java version "1.8.0_77"
 */
public class NettyObjectProxy<T> implements InvocationHandler {
    private Logger logger = LoggerUtils.getLogger(NettyObjectProxy.class);
    private Class<T> clazz;
    private int timeOut;

    public NettyObjectProxy(Class<T> clazz, int timeOut) {
        this.clazz = clazz;
        this.timeOut = timeOut;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Describe the call: which interface, which method, which arguments.
        NettyRpcRequestMessage request = new NettyRpcRequestMessage();
        request.setRequestId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParameters(args);

        if (logger.isDebugEnabled()) {
            logger.debug(method.getName());
            logger.debug(method.getDeclaringClass().getName());
            for (Class<?> parameterType : method.getParameterTypes()) {
                logger.debug(parameterType.getName());
            }
            // args is null when the method takes no parameters.
            if (args != null) {
                for (Object arg : args) {
                    logger.debug(String.valueOf(arg));
                }
            }
        }

        // Look up the client for the server named in the current call context.
        NettyRpcContextHolderObject rpcContextHolderObject = NettyRpcContextHolder.getContext();
        NettyRpcClientConnectService rpcClientConnectService = SpringServiceManager.getSpringLoadService().getNettyRpcClientConnectService();
        AbstractNettyRpcConnectManager abstractRpcConnectManager = rpcClientConnectService.getNettyRpcConnectManager(rpcContextHolderObject.getNettyGameTypeEnum());
        NettyRpcClient rpcClient = abstractRpcConnectManager.getNettyRpcClientByServerId(rpcContextHolderObject.getServiceId());
        NettyRpcFuture rpcFuture = rpcClient.sendRequest(request);
        // Wait for the response, with a limit if a timeout is configured.
        if (timeOut > 0) {
            return rpcFuture.get(timeOut, TimeUnit.MILLISECONDS);
        }
        return rpcFuture.get();
    }
}

The asynchronous proxy implementation:


package com.twjitm.core.common.service.rpc.client.proxy;

import com.twjitm.core.common.factory.NettyRpcRequestFactory;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcRequestMessage;
import com.twjitm.core.common.service.rpc.client.AbstractNettyRpcConnectManager;
import com.twjitm.core.common.service.rpc.client.NettyRpcFuture;
import com.twjitm.core.common.service.rpc.client.NettyRpcContextHolder;
import com.twjitm.core.common.service.rpc.client.NettyRpcContextHolderObject;
import com.twjitm.core.common.service.rpc.network.NettyRpcClient;
import com.twjitm.core.common.service.rpc.service.NettyRpcClientConnectService;
import com.twjitm.core.common.service.rpc.service.NettyRpcProxyService;
import com.twjitm.core.spring.SpringServiceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * Asynchronous RPC proxy for one target interface, created by
 * {@link NettyRpcProxyService#createAsync(Class)}.
 * </p>
 * <p>
 * {@link NettyAsyncRpcProxy} passes the name of the method to call and its
 * arguments to the remote server through a {@link NettyRpcClient}. The local
 * call returns at once with a {@link NettyRpcFuture} that has no result yet.
 * The caller registers callbacks on it; they observe the state of the
 * {@link NettyRpcFuture}, and this is how the asynchronous remote call is realized.
 * </p>
 *
 * @author twjitm- [Created on 2018-08-20 14:46]
 * @jdk java version "1.8.0_77"
 */
public class NettyAsyncRpcProxy<T> implements INettyAsyncRpcProxy {
    private Logger logger = LoggerFactory.getLogger(NettyAsyncRpcProxy.class);
    /**
     * Class of the proxied interface. This must be the interface, not the
     * implementation class, because the proxy pattern used here is the dynamic
     * proxy, also called the interface proxy.
     *
     * @see <a href="https://my.oschina.net/u/3296367/blog/1475258">The dynamic proxy pattern in detail</a>
     */
    private Class<T> clazz;

    public NettyAsyncRpcProxy(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public NettyRpcFuture call(String funcName, Object... args) {
        // Get the context holder object of the current call.
        NettyRpcContextHolderObject rpcContextHolderObject = NettyRpcContextHolder.getContext();

        // Service that manages the RPC client connections.
        NettyRpcClientConnectService rpcClientConnectService =
                SpringServiceManager.getSpringLoadService().getNettyRpcClientConnectService();

        // Get the RPC connect manager for the target server type.
        AbstractNettyRpcConnectManager abstractRpcConnectManager
                = rpcClientConnectService.getNettyRpcConnectManager(
                rpcContextHolderObject.getNettyGameTypeEnum());
        // Get the RPC client for the server id stored in the context.
        NettyRpcClient rpcClient = abstractRpcConnectManager.getNettyRpcClientByServerId(
                rpcContextHolderObject.getServiceId());
        // Get the factory that produces RPC request messages.
        NettyRpcRequestFactory rpcRequestFactory =
                SpringServiceManager.getSpringLoadService().getNettyRpcRequestFactory();
        // Build the RPC request.
        NettyRpcRequestMessage request =
                rpcRequestFactory.createNettyRpcRequestMessage(this.clazz.getName(), funcName, args);
        // Send the message.
        NettyRpcFuture rpcFuture = rpcClient.sendRequest(request);
        logger.info("CALLING REMOTE RPC SERVICE. DONE = " + rpcFuture.isDone());
        return rpcFuture;
    }
}
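The future returned by `call(...)` is meant to be used with callbacks: one registered before the response arrives is queued, and one registered afterwards runs at once. The sketch below illustrates only that rule, under simplifying assumptions: `CallbackFutureSketch` is a hypothetical class, it uses `java.util.function.Consumer` in place of `NettyAsyncRPCCallback`, and it runs callbacks on the calling thread rather than handing them to a thread pool as the article's future does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

final class CallbackFutureSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final List<Consumer<Object>> pending = new ArrayList<>();
    private boolean done;
    private Object result;

    /** Registers a callback; runs it immediately if the result is already there. */
    CallbackFutureSketch addCallback(Consumer<Object> callback) {
        boolean runNow;
        lock.lock();
        try {
            runNow = done;
            if (!runNow) {
                pending.add(callback); // result not here yet: queue it
            }
        } finally {
            lock.unlock();
        }
        if (runNow) {
            callback.accept(result);
        }
        return this;
    }

    /** Delivers the result once and drains the queued callbacks. */
    void done(Object value) {
        List<Consumer<Object>> toRun;
        lock.lock();
        try {
            if (done) {
                return; // ignore duplicate responses
            }
            result = value;
            done = true;
            toRun = new ArrayList<>(pending);
            pending.clear();
        } finally {
            lock.unlock();
        }
        // Run outside the lock so a slow callback cannot block addCallback.
        for (Consumer<Object> callback : toRun) {
            callback.accept(value);
        }
    }

    public static void main(String[] args) {
        CallbackFutureSketch future = new CallbackFutureSketch();
        future.addCallback(r -> System.out.println("queued callback got " + r));
        future.done("helloworld1");
        future.addCallback(r -> System.out.println("late callback got " + r));
    }
}
```

Checking the done flag and adding to the list under the same lock is what guarantees that a callback is either queued or run, never lost between the two.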

The proxy service:


package com.twjitm.core.common.service.rpc.service;

import com.twjitm.core.common.config.global.GlobalConstants;
import com.twjitm.core.common.config.global.NettyGameServiceConfigService;
import com.twjitm.core.common.service.IService;
import com.twjitm.core.common.service.rpc.client.NettyAsyncRPCCallback;
import com.twjitm.core.common.service.rpc.client.NettyRpcFuture;
import com.twjitm.core.common.service.rpc.client.proxy.INettyAsyncRpcProxy;
import com.twjitm.core.common.service.rpc.client.proxy.NettyAsyncRpcProxy;
import com.twjitm.core.common.service.rpc.client.proxy.NettyObjectProxy;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.threads.thread.NettyThreadNameFactory;
import org.springframework.stereotype.Service;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @jdk java version "1.8.0_77"
 */
@Service
public class NettyRpcProxyService implements IService {

    /**
     * Thread pool that executes the asynchronous RPC proxy tasks.
     */
    private static ThreadPoolExecutor threadPoolExecutor;

    /**
     * Creates a dynamic proxy with the JDK API
     * {@link Proxy#newProxyInstance(ClassLoader, Class[], InvocationHandler)},
     * which builds the proxy object in memory at run time. Dynamic proxies are
     * also called JDK proxies or interface proxies.
     * <p>
     * The main differences between static and dynamic proxies:
     * <p>
     * A static proxy exists at compile time; after compilation the proxy class is
     * an actual class file. A dynamic proxy is generated at run time: there is no
     * class file after compilation, the bytecode of the class is generated while
     * the program runs and is loaded into the JVM.
     * <p>
     * Note: no proxy class has to be written by hand, but the target must be
     * exposed through an interface; otherwise a dynamic proxy cannot be used.
     *
     * @param interfaceClass the service interface to proxy
     * @param <T>            type of the service interface
     * @return a proxy that implements {@code interfaceClass}
     */
    @SuppressWarnings("unchecked")
    public <T> T createProxy(Class<T> interfaceClass) {
        NettyGameServiceConfigService gameServerConfigService = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService();
        int timeOut = gameServerConfigService.getNettyGameServiceConfig().getRpcTimeOut();
        // Create the dynamic proxy object.
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new NettyObjectProxy<>(interfaceClass, timeOut)
        );
    }

    /**
     * Creates an asynchronous proxy object.
     *
     * @param interfaceClass the service interface to proxy
     * @param <T>            type of the service interface
     * @return an asynchronous proxy for {@code interfaceClass}
     */
    public <T> INettyAsyncRpcProxy createAsync(Class<T> interfaceClass) {
        return new NettyAsyncRpcProxy<>(interfaceClass);
    }


    @Override
    public String getId() {
        return NettyRpcProxyService.class.getSimpleName();
    }

    @Override
    public void startup() throws Exception {
        NettyGameServiceConfigService gameServerConfigService = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService();
        int threadSize = gameServerConfigService.getNettyGameServiceConfig().getRpcSendProxyThreadSize();

        NettyThreadNameFactory factory = new NettyThreadNameFactory(
                GlobalConstants.Thread.RPC_PROXY_MESSAGE_EXECUTOR, false);
        threadPoolExecutor = new ThreadPoolExecutor(threadSize, threadSize, 600L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(65536), factory);
    }

    @Override
    public void shutdown() throws Exception {
        threadPoolExecutor.shutdown();
    }

    /**
     * Submits a task. Asynchronous RPC handling uses worker threads: the task is
     * placed on the task queue and a pool thread takes it from the queue and runs
     * it. The callbacks registered on the returned future with
     * {@link NettyRpcFuture#addCallback(NettyAsyncRPCCallback)} are executed this
     * way, which is how the result reaches the caller.
     *
     * @param task the task to run
     */
    public void submit(Runnable task) {
        threadPoolExecutor.submit(task);
    }
}
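The JDK dynamic proxy mechanism used by `createProxy` can be tried without any networking. In the sketch below, which is an illustration and not the project's code, the hypothetical `Transport` interface stands in for the RPC client, and the `IHelloWorld` interface is redeclared locally so that the example is self-contained.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

final class DynamicProxySketch {
    /** Local copy of the service interface, so the example compiles on its own. */
    interface IHelloWorld {
        String getHelloWorld(int number);
    }

    /** Hypothetical stand-in for the network layer. */
    interface Transport {
        Object send(String className, String methodName, Object[] args);
    }

    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, Transport transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            // equals, hashCode and toString also reach the handler; answer them
            // locally instead of sending them over the wire.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        return "RpcProxy(" + iface.getName() + ")";
                }
            }
            // args is null for methods without parameters.
            Object[] safeArgs = args == null ? new Object[0] : args;
            return transport.send(method.getDeclaringClass().getName(), method.getName(), safeArgs);
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    /** The fake transport simply echoes what it was asked to call. */
    static String demo() {
        Transport echo = (className, methodName, args) -> methodName + Arrays.toString(args);
        IHelloWorld hello = createProxy(IHelloWorld.class, echo);
        return hello.getHelloWorld(3);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller works against `IHelloWorld` only; everything a real client would serialize (interface name, method name, arguments) is available inside the handler.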

4.3. The RPC client entity


package com.twjitm.core.common.service.rpc.network;

import com.twjitm.core.common.netstack.entity.rpc.NettyRpcRequestMessage;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcResponseMessage;
import com.twjitm.core.common.service.rpc.client.NettyRpcFuture;
import com.twjitm.core.common.service.rpc.server.NettyRpcNodeInfo;
import com.twjitm.core.common.service.rpc.service.NettyRPCFutureService;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.core.utils.logs.LoggerUtils;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.log4j.Logger;

import java.util.concurrent.ExecutorService;

/**
 * Netty RPC client entity.
 *
 * @author twjitm- [Created on 2018-08-20 11:01]
 * @jdk java version "1.8.0_77"
 */
public class NettyRpcClient {

    private Logger logger = LoggerUtils.getLogger(NettyRpcClient.class);
    private NettyRpcClientConnection rpcClientConnection;


    public NettyRpcClient(NettyRpcNodeInfo rpcNodeInfo, ExecutorService threadPool) {
        rpcClientConnection = new NettyRpcClientConnection(this, rpcNodeInfo, threadPool);
    }

    /**
     * Sends a request. This ends up in the writeRequest method of
     * rpcClientConnection, which writes the data through the Netty channel.
     *
     * @param request the request to send
     * @return the future on which the response will be delivered
     */
    public NettyRpcFuture sendRequest(NettyRpcRequestMessage request) {
        // Register the future under the request id before the request is written.
        NettyRpcFuture rpcFuture = new NettyRpcFuture(request);
        NettyRPCFutureService rpcFutureService = SpringServiceManager.getSpringLoadService().getNettyRPCFutureService();
        rpcFutureService.addNettyRPCFuture(request.getRequestId(), rpcFuture);
        // Send the data to the remote server.
        rpcClientConnection.writeRequest(request);
        return rpcFuture;
    }

    public NioSocketChannel getChannel() {
        return rpcClientConnection.getChannel();
    }

    public void close() {
        logger.info("RPC CLIENT CLOSE");
        if (rpcClientConnection != null) {
            rpcClientConnection.close();
        }
    }

    /**
     * Handles an RPC response on the calling side.
     *
     * @param rpcResponse the response received from the remote server
     */
    public void handleRpcResponse(NettyRpcResponseMessage rpcResponse) {
        String requestId = rpcResponse.getRequestId();
        NettyRPCFutureService rpcFutureService = SpringServiceManager.getSpringLoadService().
                getNettyRPCFutureService();
        NettyRpcFuture rpcFuture = rpcFutureService.getNettyRPCFuture(requestId);
        if (rpcFuture != null) {
            boolean removeFlag = rpcFutureService.removeNettyRPCFuture(requestId, rpcFuture);
            if (removeFlag) {
                rpcFuture.done(rpcResponse);
            } else {
                // The future has already been handled and removed, possibly because it timed out.
                logger.error("RPCFUTURE IS REMOVE " + requestId);
            }
        }
    }

    public NettyRpcClientConnection getRpcClientConnection() {
        return rpcClientConnection;
    }

}
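`sendRequest` and `handleRpcResponse` cooperate through the request id: the future is registered before the request is written, and the response finds it again by the same id. A minimal sketch of that bookkeeping follows. The class `PendingRequestsSketch` is hypothetical, it uses `CompletableFuture` in place of `NettyRpcFuture`, and a plain `ConcurrentHashMap` in place of `NettyRPCFutureService`, whose implementation is not shown here.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class PendingRequestsSketch {
    private final ConcurrentMap<String, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Registers a future before the request is written and returns its request id. */
    String register(CompletableFuture<Object> future) {
        String requestId = UUID.randomUUID().toString();
        pending.put(requestId, future);
        return requestId;
    }

    /**
     * Called when a response arrives. Returns false if nobody is waiting any
     * more, for example because the entry was already removed after a timeout.
     */
    boolean complete(String requestId, Object result) {
        // remove(...) is atomic, so only one caller can win for a given id.
        CompletableFuture<Object> future = pending.remove(requestId);
        if (future == null) {
            return false;
        }
        future.complete(result);
        return true;
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRequestsSketch table = new PendingRequestsSketch();
        CompletableFuture<Object> future = new CompletableFuture<>();
        String requestId = table.register(future);
        System.out.println("first response accepted: " + table.complete(requestId, "helloworld1"));
        System.out.println("result: " + future.join());
        System.out.println("duplicate response accepted: " + table.complete(requestId, "again"));
    }
}
```

Removing the entry as part of completion keeps the table from growing and makes a late or duplicate response harmless.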

The RPC connection object:


package com.twjitm.core.common.service.rpc.network;

import com.twjitm.core.common.netstack.entity.rpc.NettyRpcRequestMessage;
import com.twjitm.core.common.service.rpc.server.NettyRpcNodeInfo;
import com.twjitm.core.utils.logs.LoggerUtils;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.log4j.Logger;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;

/**
 * RPC connection entity. It takes the prepared request packets and sends them
 * over a connection: it starts a local Netty client and uses that client to
 * connect to the remote server.
 *
 * @author twjtim- [Created on 2018-08-20 11:01]
 * @jdk java version "1.8.0_77"
 */
public class NettyRpcClientConnection {
    private Logger logger = LoggerUtils.getLogger(NettyRpcClientConnection.class);

    /** Set by the connect task on another thread, hence volatile. */
    private volatile NioSocketChannel channel;

    private ReentrantLock statusLock;
    /**
     * Thread pool used for connecting and reconnecting.
     */
    private ExecutorService threadPool;
    EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
    /**
     * Whether automatic reconnection is enabled.
     */
    private volatile boolean reConnectOn = true;

    private NettyRpcClient nettyRpcClient;
    private NettyRpcNodeInfo nettyRpcNodeInfo;

    public NettyRpcClientConnection(NettyRpcClient nettyRpcClient, NettyRpcNodeInfo nettyRpcNodeInfo, ExecutorService threadPool) {
        if (threadPool == null) {
            throw new IllegalArgumentException("ALL PARAMETERS MUST ACCURATE.");
        }
        this.nettyRpcClient = nettyRpcClient;
        this.nettyRpcNodeInfo = nettyRpcNodeInfo;
        this.threadPool = threadPool;
        this.statusLock = new ReentrantLock();
    }

    /**
     * Opens the connection. This method is important: opening the connection
     * amounts to starting the Netty client. The start-up code is wrapped in
     * {@link NettyRpcServerConnectTask}; the task is submitted to the thread pool,
     * and the {@link Future} returned by submit is then used to block until the
     * task has finished.
     *
     * @return true if the connection is active afterwards
     */
    public boolean open() {
        // Refuse to open a connection that is already active.
        if (isConnected()) {
            throw new IllegalStateException("ALREADY CONNECTED. DISCONNECT FIRST.");
        }
        try {
            InetSocketAddress remotePeer = new InetSocketAddress(nettyRpcNodeInfo.getHost(), nettyRpcNodeInfo.getIntPort());
            logger.info("CONNECT TO REMOTE SERVER. REMOTE PEER = " + remotePeer);
            // Run the connect task on the pool and wait for it to finish.
            Future<?> future = threadPool.submit(new NettyRpcServerConnectTask(nettyRpcNodeInfo, eventLoopGroup, nettyRpcClient));
            future.get();
            // The task has finished; report failure if the channel is still not active.
            if (!isConnected()) {
                return false;
            }
            if (logger.isInfoEnabled()) {
                logger.info("CONNECT SUCCESS.");
            }
            return true;
        } catch (Exception e) {
            logger.error("CONNECT TO REMOTE SERVER FAILED.", e);
            return false;
        }

    }

    public boolean isConnected() {
        if (channel == null) {
            return false;
        }
        return channel.isActive();
    }


    /**
     * Sends one message.
     *
     * @return true if the message was handed to the channel
     */
    public boolean writeRequest(NettyRpcRequestMessage rpcRequestMessage) {
        if (!isConnected() && reConnectOn) {
            tryReConnect();
            if (!isConnected()) {
                return false;
            }
        }
        // Send the message.
        if (channel != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("【SEND】" + rpcRequestMessage);
            }
            channel.writeAndFlush(rpcRequestMessage);
            return true;
        }
        return false;
    }

    public void tryReConnect() {
        statusLock.lock();
        try {
            // Check again under the lock: another thread may have reconnected already.
            if (!isConnected()) {
                Future<?> future = threadPool.submit(new ReConnect());
                future.get();
            }
        } catch (Exception e) {
            logger.error("NETTY RPC CLIENT CONNECTION TRY RECONNECT IS ERROR");
            logger.error(e);
        } finally {
            statusLock.unlock();
        }
    }

    /**
     * Task that performs a reconnect.
     *
     * @author Fancy
     */
    private class ReConnect implements Runnable {

        @Override
        public void run() {
            try {
                open();
            } catch (Exception e) {
                logger.error("RESTART CONNECTION ERROR.", e);
            }
        }
    }

    /**
     * Enables automatic reconnection.
     */
    public void setReconnectOn() {
        this.reConnectOn = true;
    }

    /**
     * Disables automatic reconnection.
     */
    public void setReconnectOff() {
        this.reConnectOn = false;
    }

    public NioSocketChannel getChannel() {
        return channel;
    }

    public void setChannel(NioSocketChannel channel) {
        this.channel = channel;
    }

    public void close() {
        if (channel != null) {
            channel.close();
        }
        eventLoopGroup.shutdownGracefully();
    }

}
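The reconnect-on-write behaviour of `writeRequest` and `tryReConnect` can be reduced to a small pattern: check the connection, reconnect under a lock if needed, and check again inside the lock so that concurrent writers trigger only one attempt. The sketch below shows that pattern without Netty. `ReconnectSketch` and its `connector` are hypothetical; the connector stands in for opening the channel and returns whether the link is up.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

final class ReconnectSketch {
    private final ReentrantLock statusLock = new ReentrantLock();
    private final BooleanSupplier connector; // hypothetical: true means "link is up"
    private volatile boolean connected;
    private volatile boolean reconnectOn = true;

    ReconnectSketch(BooleanSupplier connector) {
        this.connector = connector;
    }

    /** Returns true if the message could be handed to an active connection. */
    boolean write(String message) {
        if (!connected && reconnectOn) {
            tryReconnect();
        }
        // A real implementation would write the message to the channel here.
        return connected;
    }

    void tryReconnect() {
        statusLock.lock();
        try {
            // Re-check under the lock: another writer may have reconnected already.
            if (!connected) {
                connected = connector.getAsBoolean();
            }
        } finally {
            statusLock.unlock();
        }
    }

    /** Simulates a dropped link. */
    void drop() {
        connected = false;
    }

    void setReconnectOff() {
        reconnectOn = false;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        ReconnectSketch connection = new ReconnectSketch(() -> {
            attempts.incrementAndGet();
            return true;
        });
        connection.write("first");  // connects lazily
        connection.write("second"); // reuses the connection
        connection.drop();
        connection.write("third");  // reconnects
        System.out.println("connect attempts: " + attempts.get());
    }
}
```

With automatic reconnection switched off, a write on a dropped link fails immediately instead of blocking on a connect attempt.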

The RPC connect task: every RPC request amounts to a network request, and the network-layer operations that Netty provides are used to carry it out.


package com.twjitm.core.common.service.rpc.network;

import com.twjitm.core.common.handler.rpc.NettyRpcClientServerHandler;
import com.twjitm.core.common.service.rpc.server.NettyRpcNodeInfo;
import com.twjitm.core.initalizer.NettyRpcClientMessageServerInitializer;
import com.twjitm.core.utils.logs.LoggerUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.log4j.Logger;

import java.net.InetSocketAddress;

/**
 * Task that connects to the remote server.
 *
 * @author twjtim- [Created on 2018-08-20 11:11]
 * @jdk java version "1.8.0_77"
 */
public class NettyRpcServerConnectTask implements Runnable {
    private Logger logger = LoggerUtils.getLogger(NettyRpcServerConnectTask.class);

    /**
     * Address to connect to.
     */
    private InetSocketAddress remotePeer;

    private EventLoopGroup eventLoopGroup;
    private NettyRpcClient nettyRpcClient;

    public NettyRpcServerConnectTask(
            NettyRpcNodeInfo nettyRpcNodeInfo,
            EventLoopGroup eventLoopGroup,
            NettyRpcClient nettyRpcClient) {
        this.eventLoopGroup = eventLoopGroup;
        this.nettyRpcClient = nettyRpcClient;
        this.remotePeer = new InetSocketAddress(nettyRpcNodeInfo.getHost(), nettyRpcNodeInfo.getIntPort());
    }

    @Override
    public void run() {
        Bootstrap b = new Bootstrap();
        // Bootstrap keeps a single handler, so only the initializer is registered
        // here; a LoggingHandler, if wanted, belongs in the pipeline that the
        // initializer builds.
        b.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new NettyRpcClientMessageServerInitializer());
        ChannelFuture channelFuture = b.connect(remotePeer);
        channelFuture.addListener((ChannelFutureListener) channelFuture1 -> {
            if (channelFuture1.isSuccess()) {
                logger.info("CONNECT TO REMOTE SERVER. REMOTE PEER = " + remotePeer + " SUCCESS");
                // Give the handler a reference to the client and publish the channel.
                NettyRpcClientServerHandler handler = channelFuture1.channel().pipeline().get(NettyRpcClientServerHandler.class);
                handler.setNettyRpcClient(nettyRpcClient);
                nettyRpcClient.getRpcClientConnection().setChannel((NioSocketChannel) channelFuture1.channel());
            } else {
                logger.debug("CONNECT TO REMOTE SERVER. REMOTE PEER = " + remotePeer + " FAIL");
            }
        });
        try {
            // Block this worker thread until the connect attempt has completed.
            channelFuture.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error(e.toString(), e);
        }

        // The connect attempt has finished.
        logger.debug("CONNECT TO REMOTE SERVER. REMOTE PEER = " + remotePeer);

    }
}

4.5. RPC routing and service discovery

First, we need to start the RPC routing server:


package com.twjitm.core.common.service.rpc.service;

import com.twjitm.core.common.config.global.NettyGameServiceConfig;
import com.twjitm.core.common.config.global.NettyGameServiceConfigService;
import com.twjitm.core.common.factory.thread.NettyRpcHandlerThreadPoolFactory;
import com.twjitm.core.common.service.IService;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.threads.utils.ExecutorUtil;
import org.springframework.stereotype.Service;

/**
 * Created by IntelliJ IDEA.
 * User: 文江 Date: 2018/8/19  Time: 10:29
 * https://blog.csdn.net/baidu_23086307
 */
@Service
public class NettyRemoteRpcHandlerService implements IService {

    /** Thread pool for RPC work; stays null when RPC is disabled. */
    private NettyRpcHandlerThreadPoolFactory rpcHandlerThreadPool;

    @Override
    public String getId() {
        return "NettyRemoteRpcHandlerService";
    }

    @Override
    public void startup() throws Exception {
        NettyGameServiceConfigService config = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService();
        NettyGameServiceConfig gameConfig = config.getNettyGameServiceConfig();
        if (gameConfig.isRpcOpen()) {
            // Start the service: create the executor only when RPC is enabled
            rpcHandlerThreadPool = SpringServiceManager.getSpringLoadService()
                    .getNettyRpcHandlerThreadPoolFactory();
            rpcHandlerThreadPool.createExecutor(
                    gameConfig.getRpcConnectThreadSize(),
                    gameConfig.getRpcSendProxyThreadSize());
        }
    }

    @Override
    public void shutdown() throws Exception {
        // Nothing to stop if RPC was never enabled and no executor was created
        if (rpcHandlerThreadPool != null) {
            ExecutorUtil.shutdownAndAwaitTermination(rpcHandlerThreadPool.getExecutor());
        }
    }

    /** Submits a task to the RPC pool; the task is ignored when RPC is disabled. */
    public void submit(Runnable runnable) {
        if (rpcHandlerThreadPool != null) {
            rpcHandlerThreadPool.getExecutor().submit(runnable);
        }
    }
}
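The service above creates its executor only when RPC is enabled, so both submitting and shutting down have to cope with a pool that was never started. A minimal sketch of that lifecycle using only the JDK follows. `RpcTaskPool` and its method signatures are hypothetical names chosen for illustration and are not part of twjitm-core; a plain fixed thread pool stands in for `NettyRpcHandlerThreadPoolFactory`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the RPC handler pool; names are illustrative only. */
class RpcTaskPool {
    private volatile ExecutorService executor;

    /** Creates the pool only when RPC is enabled, mirroring startup() above. */
    void startup(boolean rpcOpen, int threads) {
        if (rpcOpen) {
            executor = Executors.newFixedThreadPool(threads);
        }
    }

    /** Returns false instead of throwing when the pool was never started. */
    boolean submit(Runnable task) {
        ExecutorService current = executor;
        if (current == null) {
            return false;
        }
        current.execute(task);
        return true;
    }

    /** Stops accepting tasks and waits for running ones; true if fully terminated. */
    boolean shutdown(long timeout, TimeUnit unit) {
        ExecutorService current = executor;
        if (current == null) {
            return true; // nothing was started, so nothing needs stopping
        }
        current.shutdown();
        try {
            return current.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        }
    }
}
```

Returning a boolean from `submit` is a design choice of this sketch: it lets the caller notice that a task was dropped because RPC is switched off, which the silent `if` in the original hides.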

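Next comes the task-scheduling service. It tracks pending RPC requests by request ID and periodically removes the ones that have timed out: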


package com.twjitm.core.common.service.rpc.service;

import com.twjitm.core.common.config.global.GlobalConstants;
import com.twjitm.core.common.config.global.NettyGameServiceConfig;
import com.twjitm.core.common.service.IService;
import com.twjitm.core.common.service.rpc.client.NettyRpcFuture;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.threads.thread.NettyThreadNameFactory;
import com.twjitm.threads.utils.ExecutorUtil;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author EGLS0807 - [Created on 2018-08-20 11:51]
 * @company http://www.g2us.com/
 * @jdk java version "1.8.0_77"
 */
@Service
public class NettyRPCFutureService implements IService {
    /**
     * Scheduled task executor
     */
    private ScheduledExecutorService executorService;

    /** Pending RPC futures, keyed by request ID. */
    private final ConcurrentHashMap<String, NettyRpcFuture> pendingRPC = new ConcurrentHashMap<>();

    @Override
    public String getId() {
        return NettyRPCFutureService.class.getSimpleName();
    }

    @Override
    public void startup() throws Exception {
        NettyThreadNameFactory nettyThreadNameFactory = new
                NettyThreadNameFactory(GlobalConstants.Thread.DETECT_RPC_PEND_ING);

        // TODO: optimize
        NettyGameServiceConfig gameServiceConfig = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService().getNettyGameServiceConfig();
        executorService = Executors.newScheduledThreadPool(gameServiceConfig.getRpcConnectThreadSize(), nettyThreadNameFactory);
        // Once a minute, scan the pending futures and drop the ones that have timed out
        executorService.scheduleAtFixedRate(() -> {
            for (Map.Entry<String, NettyRpcFuture> entry : getPendingRPC().entrySet()) {
                NettyRpcFuture rpcFuture = entry.getValue();
                if (rpcFuture.isTimeout()) {
                    String requestId = entry.getKey();
                    boolean removed = removeNettyRPCFuture(requestId, rpcFuture);
                    if (removed) {
                        // TODO: notify the caller, e.g. by completing the future with a timeout response
                        // rpcFuture.done(rpcResponse);
                    }
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

    private ConcurrentHashMap<String, NettyRpcFuture> getPendingRPC() {
        return pendingRPC;
    }

    @Override
    public void shutdown() throws Exception {
        ExecutorUtil.shutdownAndAwaitTermination(executorService, 60L, TimeUnit.MILLISECONDS);
    }

    public void addNettyRPCFuture(String requestId, NettyRpcFuture rpcFuture) {
        pendingRPC.put(requestId, rpcFuture);
    }

    /** Removes the entry only if it is still mapped to the given future. */
    public boolean removeNettyRPCFuture(String requestId, NettyRpcFuture rpcFuture) {
        return pendingRPC.remove(requestId, rpcFuture);
    }

    public NettyRpcFuture getNettyRPCFuture(String requestId) {
        return pendingRPC.get(requestId);
    }
}
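In the listing above, the timeout branch removes the future from the map, but the call that would complete it is commented out. The following sketch shows one way such a sweep could also fail the expired future so that anything waiting on it is told about the timeout. It is an illustration under assumptions, not the project's code: `PendingCalls` and `Call` are hypothetical names, `CompletableFuture` stands in for `NettyRpcFuture`, and the current time is passed in explicitly to keep the example deterministic.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

/** Hypothetical registry of in-flight calls; CompletableFuture stands in for NettyRpcFuture. */
class PendingCalls {
    /** A pending call plus the instant after which it counts as timed out. */
    private static final class Call {
        final CompletableFuture<String> future = new CompletableFuture<>();
        final long deadlineMillis;

        Call(long deadlineMillis) {
            this.deadlineMillis = deadlineMillis;
        }
    }

    private final ConcurrentHashMap<String, Call> pending = new ConcurrentHashMap<>();

    /** Registers a call and returns the future the caller will wait on. */
    CompletableFuture<String> register(String requestId, long nowMillis, long timeoutMillis) {
        Call call = new Call(nowMillis + timeoutMillis);
        pending.put(requestId, call);
        return call.future;
    }

    /** Called when a response arrives; false if the call is unknown or already expired. */
    boolean complete(String requestId, String result) {
        Call call = pending.remove(requestId);
        return call != null && call.future.complete(result);
    }

    /** One sweep: fails every expired call and returns how many were expired. */
    int sweep(long nowMillis) {
        int expired = 0;
        for (Map.Entry<String, Call> entry : pending.entrySet()) {
            Call call = entry.getValue();
            // remove(key, value) succeeds only if no response removed the entry first
            if (nowMillis >= call.deadlineMillis && pending.remove(entry.getKey(), call)) {
                call.future.completeExceptionally(
                        new TimeoutException("RPC " + entry.getKey() + " timed out"));
                expired++;
            }
        }
        return expired;
    }

    int size() {
        return pending.size();
    }
}
```

In a running service, `sweep(System.currentTimeMillis())` would be the body of the task handed to `scheduleAtFixedRate`, much like the lambda in `startup()` above.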

5. Testing


package com.twjitm.rpc;

import com.twjitm.TestSpring;
import com.twjitm.core.common.enums.NettyGameTypeEnum;
import com.twjitm.core.common.service.rpc.client.NettyRpcContextHolder;
import com.twjitm.core.common.service.rpc.client.NettyRpcContextHolderObject;
import com.twjitm.core.common.service.rpc.service.NettyRpcProxyService;
import com.twjitm.core.service.rpc.service.IHelloWorld;
import com.twjitm.core.spring.SpringServiceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author EGLS0807 - [Created on 2018-08-20 15:16]
 * @company http://www.g2us.com/
 * @jdk java version "1.8.0_77"
 *
 * Synchronous blocking call
 */
public class HelloWorldServiceTest {
    private static final Logger logger = LoggerFactory.getLogger(HelloWorldServiceTest.class);
    private NettyRpcProxyService nettyRpcProxyService;

    public static void main(String[] args) {
        TestSpring.initSpring();
        HelloWorldServiceTest helloServiceTest = new HelloWorldServiceTest();
        helloServiceTest.init();
        helloServiceTest.helloTest1();
        helloServiceTest.setTear();
    }

    private void init() {
        nettyRpcProxyService = SpringServiceManager.getSpringLoadService().getNettyRpcProxyService();
    }

    private void helloTest1() {
        // Create a client-side proxy for the remote interface
        IHelloWorld helloWorld = nettyRpcProxyService.createProxy(IHelloWorld.class);
        // Select the target server (type and ID) for calls made from this thread
        int serverId = 9001;
        NettyRpcContextHolderObject rpcContextHolderObject =
                new NettyRpcContextHolderObject(NettyGameTypeEnum.WORLD, serverId);
        NettyRpcContextHolder.setContextHolder(rpcContextHolderObject);
        // Blocks until the remote result is returned
        String result = helloWorld.getHelloWorld(5);
        logger.info(result);
    }

    public void setTear() {
        if (nettyRpcProxyService != null) {
            try {
                nettyRpcProxyService.shutdown();
            } catch (Exception e) {
                logger.error("Failed to shut down the RPC proxy service", e);
            }
        }
    }
}
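The test relies on two pieces whose implementation is not shown in this section: `createProxy`, which returns an object implementing `IHelloWorld`, and `NettyRpcContextHolder`, which is set before the call to choose the target server. The sketch below shows how such a pair could work, assuming a JDK dynamic proxy and a `ThreadLocal`. Everything in it is hypothetical (`RpcProxySketch`, `Transport`, `TARGET_SERVER`), and the transport is an in-memory callback rather than a Netty channel, so it should be read as an illustration of the pattern and not as the twjitm-core implementation.

```java
import java.lang.reflect.Proxy;

/** Hypothetical sketch: a JDK dynamic proxy that routes calls using a thread-local target. */
class RpcProxySketch {
    /** Stand-in for the network layer: receives the routing target and the call description. */
    interface Transport {
        Object send(int serverId, String service, String method, Object[] args);
    }

    /** Same shape as the service interface used in the test above. */
    interface IHelloWorld {
        String getHelloWorld(int number);
    }

    /** Per-thread routing target, playing the role of NettyRpcContextHolder. */
    static final ThreadLocal<Integer> TARGET_SERVER = new ThreadLocal<>();

    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> api, Transport transport) {
        return (T) Proxy.newProxyInstance(
                api.getClassLoader(),
                new Class<?>[] {api},
                (proxy, method, args) -> {
                    // toString, hashCode and equals are answered locally, not sent remotely
                    if (method.getDeclaringClass() == Object.class) {
                        switch (method.getName()) {
                            case "toString":
                                return "RpcProxy(" + api.getSimpleName() + ")";
                            case "hashCode":
                                return System.identityHashCode(proxy);
                            default:
                                return proxy == args[0]; // equals
                        }
                    }
                    Integer serverId = TARGET_SERVER.get();
                    if (serverId == null) {
                        throw new IllegalStateException("no target server set for this thread");
                    }
                    // Every interface method becomes one request to the selected server
                    return transport.send(serverId, api.getName(), method.getName(), args);
                });
    }
}
```

With this shape, a caller sets `TARGET_SERVER` to a server ID such as 9001, invokes `getHelloWorld(5)` on the proxy, and the handler passes the server ID, interface name, method name and arguments to the transport. Keeping the target in a `ThreadLocal` leaves the service interface free of routing parameters, at the cost of having to set it on the calling thread before each call.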

Because there is a lot of code and space here is limited, the full source is available on GitHub:

https://github.com/twjitm/twjitm-core
