基于Netty和SpringBoot实现一个轻量级RPC框架-Client端请求响应同步化处理

释放双眼,带上耳机,听听看~!

前提

前置文章:

  • 《基于Netty和SpringBoot实现一个轻量级RPC框架-协议篇》
  • 《基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇》
  • 《基于Netty和SpringBoot实现一个轻量级RPC框架-Client篇》

前一篇文章简单介绍了通过动态代理完成了Client端契约接口调用转换为发送RPC协议请求的功能。这篇文章主要解决一个遗留的技术难题:请求-响应同步化处理。

需要的依赖如下:

  • JDK1.8+
  • Netty:4.1.44.Final
  • SpringBoot:2.2.2.RELEASE

简单分析Netty请求-响应的处理流程

图中已经忽略了编码解码器和其他入站出站处理器,不同颜色的线程代表完全不相同的线程,不同线程之间的处理逻辑是完全异步,也就是Netty IO线程(n-l-g-1)接收到Server端的消息并且解析完成的时候,用户调用线程(u-t-1)无法感知到解析完毕的消息包,那么这里要做的事情就是让用户调用线程(u-t-1)获取到Netty IO线程(n-l-g-1)接收并且解析完成的消息包。

这里可以用一个简单的例子来说明模拟Client端调用线程等待Netty IO线程的处理结果再同步返回的过程。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
1@Slf4j
2public class NettyThreadSyncTest {
3
4    @ToString
5    private static class ResponseFuture {
6
7        private final long beginTimestamp = System.currentTimeMillis();
8        @Getter
9        private final long timeoutMilliseconds;
10        @Getter
11        private final String requestId;
12        @Setter
13        @Getter
14        private volatile boolean sendRequestSucceed = false;
15        @Setter
16        @Getter
17        private volatile Throwable cause;
18        @Getter
19        private volatile Object response;
20
21        private final CountDownLatch latch = new CountDownLatch(1);
22
23        public ResponseFuture(String requestId, long timeoutMilliseconds) {
24            this.requestId = requestId;
25            this.timeoutMilliseconds = timeoutMilliseconds;
26        }
27
28        public boolean timeout() {
29            return System.currentTimeMillis() - beginTimestamp > timeoutMilliseconds;
30        }
31
32        public Object waitResponse(final long timeoutMilliseconds) throws InterruptedException {
33            latch.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
34            return response;
35        }
36
37        public void putResponse(Object response) throws InterruptedException {
38            this.response = response;
39            latch.countDown();
40        }
41    }
42
43    static ExecutorService REQUEST_THREAD;
44    static ExecutorService NETTY_IO_THREAD;
45    static Callable<Object> REQUEST_TASK;
46    static Runnable RESPONSE_TASK;
47
48    static String processBusiness(String name) {
49        return String.format("%s say hello!", name);
50    }
51
52    private static final Map<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap();
53
54    @BeforeClass
55    public static void beforeClass() throws Exception {
56        String requestId = UUID.randomUUID().toString();
57        String requestContent = "throwable";
58        REQUEST_TASK = () -> {
59            try {
60                // 3秒没有得到响应认为超时
61                ResponseFuture responseFuture = new ResponseFuture(requestId, 3000);
62                RESPONSE_FUTURE_TABLE.put(requestId, responseFuture);
63                // 这里忽略发送请求的操作,只打印日志和模拟耗时1秒
64                Thread.sleep(1000);
65                log.info("发送请求成功,请求ID:{},请求内容:{}", requestId, requestContent);
66                // 更新标记属性
67                responseFuture.setSendRequestSucceed(true);
68                // 剩余2秒等待时间 - 这里只是粗略计算
69                return responseFuture.waitResponse(3000 - 1000);
70            } catch (Exception e) {
71                log.info("发送请求失败,请求ID:{},请求内容:{}", requestId, requestContent);
72                throw new RuntimeException(e);
73            }
74        };
75        RESPONSE_TASK = () -> {
76            String responseContent = processBusiness(requestContent);
77            try {
78                ResponseFuture responseFuture = RESPONSE_FUTURE_TABLE.get(requestId);
79                if (null != responseFuture) {
80                    log.warn("处理响应成功,请求ID:{},响应内容:{}", requestId, responseContent);
81                    responseFuture.putResponse(responseContent);
82                } else {
83                    log.warn("请求ID[{}]对应的ResponseFuture不存在,忽略处理", requestId);
84                }
85            } catch (Exception e) {
86                log.info("处理响应失败,请求ID:{},响应内容:{}", requestId, responseContent);
87                throw new RuntimeException(e);
88            }
89        };
90        REQUEST_THREAD = Executors.newSingleThreadExecutor(runnable -> {
91            Thread thread = new Thread(runnable, "REQUEST_THREAD");
92            thread.setDaemon(true);
93            return thread;
94        });
95        NETTY_IO_THREAD = Executors.newSingleThreadExecutor(runnable -> {
96            Thread thread = new Thread(runnable, "NETTY_IO_THREAD");
97            thread.setDaemon(true);
98            return thread;
99        });
100    }
101
102    @Test
103    public void testProcessSync() throws Exception {
104        log.info("异步提交请求处理任务......");
105        Future<Object> future = REQUEST_THREAD.submit(REQUEST_TASK);
106        // 模拟请求耗时
107        Thread.sleep(1500);
108        log.info("异步提交响应处理任务......");
109        NETTY_IO_THREAD.execute(RESPONSE_TASK);
110        // 这里可以设置超时
111        log.info("同步获取请求结果:{}", future.get());
112        Thread.sleep(Long.MAX_VALUE);
113    }
114}
115

执行testProcessSync()方法,控制台输出如下:


1
2
3
4
5
6
12020-01-18 13:17:07 [main] INFO  c.t.client.NettyThreadSyncTest - 异步提交请求处理任务......
22020-01-18 13:17:08 [REQUEST_THREAD] INFO  c.t.client.NettyThreadSyncTest - 发送请求成功,请求ID:71f47e27-c17c-458d-b271-4e74fad33a7b,请求内容:throwable
32020-01-18 13:17:09 [main] INFO  c.t.client.NettyThreadSyncTest - 异步提交响应处理任务......
42020-01-18 13:17:09 [NETTY_IO_THREAD] WARN  c.t.client.NettyThreadSyncTest - 处理响应成功,请求ID:71f47e27-c17c-458d-b271-4e74fad33a7b,响应内容:throwable say hello!
52020-01-18 13:17:09 [main] INFO  c.t.client.NettyThreadSyncTest - 同步获取请求结果:throwable say hello!
6

上面这个例子里面的线程同步处理主要参考主流的Netty框架客户端部分的实现逻辑:RocketMQ(具体是NettyRemotingClient类)以及Redisson(具体是RedisExecutor类),它们就是用这种方式使得异步线程处理转化为同步处理。

Client端请求响应同步化处理

按照前面的例子,首先新增一个ResponseFuture用于承载已发送但未响应的请求:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
1@ToString
2public class ResponseFuture {
3
4    private final long beginTimestamp = System.currentTimeMillis();
5    @Getter
6    private final long timeoutMilliseconds;
7    @Getter
8    private final String requestId;
9    @Setter
10    @Getter
11    private volatile boolean sendRequestSucceed = false;
12    @Setter
13    @Getter
14    private volatile Throwable cause;
15    @Getter
16    private volatile ResponseMessagePacket response;
17
18    private final CountDownLatch latch = new CountDownLatch(1);
19
20    public ResponseFuture(String requestId, long timeoutMilliseconds) {
21        this.requestId = requestId;
22        this.timeoutMilliseconds = timeoutMilliseconds;
23    }
24
25    public boolean timeout() {
26        return System.currentTimeMillis() - beginTimestamp > timeoutMilliseconds;
27    }
28
29    public ResponseMessagePacket waitResponse(final long timeoutMilliseconds) throws InterruptedException {
30        latch.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
31        return response;
32    }
33
34    public void putResponse(ResponseMessagePacket response) throws InterruptedException {
35        this.response = response;
36        latch.countDown();
37    }
38}
39

接着需要新增一个HashMap去缓存这些返送成功但是未得到响应处理的ResponseFuture:


1
2
1Map<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap();
2

这里的KEY选用requestId,而requestId之前已经定义为UUID,确保每个请求不会重复。为了简单起见,目前所有的逻辑都编写在契约代理工厂ContractProxyFactory,添加下面的功能:

  • 添加一个同步发送方法sendRequestSync()处理消息包的发送和同步响应,RequestMessagePacket转换为调用代理目标方法返回值类型的逻辑暂时也编写在此方法中。
  • 添加一个核心线程数量为逻辑核心数量 * 2的线程池用于处理请求。
  • 添加一个单线程的调度线程池用于定时清理那些过期的ResponseFuture,清理方法为scanResponseFutureTable()。

修改后的ContractProxyFactory如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
1@Slf4j
2public class ContractProxyFactory {
3
4    private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor();
5    private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap();
6    static final ConcurrentMap<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap();
7    // 定义请求的最大超时时间为3秒
8    private static final long REQUEST_TIMEOUT_MS = 3000;
9    private static final ExecutorService EXECUTOR;
10    private static final ScheduledExecutorService CLIENT_HOUSE_KEEPER;
11    private static final Serializer SERIALIZER = FastJsonSerializer.X;
12
13
14    @SuppressWarnings("unchecked")
15    public static <T> T ofProxy(Class<T> interfaceKlass) {
16        // 缓存契约接口的代理类实例
17        return (T) CACHE.computeIfAbsent(interfaceKlass, x ->
18                Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> {
19                    RequestArgumentExtractInput input = new RequestArgumentExtractInput();
20                    input.setInterfaceKlass(interfaceKlass);
21                    input.setMethod(method);
22                    RequestArgumentExtractOutput output = EXTRACTOR.extract(input);
23                    // 封装请求参数
24                    RequestMessagePacket packet = new RequestMessagePacket();
25                    packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER);
26                    packet.setVersion(ProtocolConstant.VERSION);
27                    packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber());
28                    packet.setMessageType(MessageType.REQUEST);
29                    packet.setInterfaceName(output.getInterfaceName());
30                    packet.setMethodName(output.getMethodName());
31                    packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0]));
32                    packet.setMethodArguments(args);
33                    Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get();
34                    return sendRequestSync(channel, packet, method.getReturnType());
35                }));
36    }
37
38    /**
39     * 同步发送请求
40     *
41     * @param channel channel
42     * @param packet  packet
43     * @return Object
44     */
45    static Object sendRequestSync(Channel channel, RequestMessagePacket packet, Class<?> returnType) {
46        long beginTimestamp = System.currentTimeMillis();
47        ResponseFuture responseFuture = new ResponseFuture(packet.getSerialNumber(), REQUEST_TIMEOUT_MS);
48        RESPONSE_FUTURE_TABLE.put(packet.getSerialNumber(), responseFuture);
49        try {
50            // 获取到承载响应Packet的Future
51            Future<ResponseMessagePacket> packetFuture = EXECUTOR.submit(() -> {
52                channel.writeAndFlush(packet).addListener((ChannelFutureListener)
53                        future -> responseFuture.setSendRequestSucceed(true));
54                return responseFuture.waitResponse(REQUEST_TIMEOUT_MS - (System.currentTimeMillis() - beginTimestamp));
55            });
56            ResponseMessagePacket responsePacket = packetFuture.get(
57                    REQUEST_TIMEOUT_MS - (System.currentTimeMillis() - beginTimestamp), TimeUnit.MILLISECONDS);
58            if (null == responsePacket) {
59                // 超时导致响应包获取失败
60                throw new SendRequestException(String.format("ResponseMessagePacket获取超时,请求ID:%s", packet.getSerialNumber()));
61            } else {
62                ByteBuf payload = (ByteBuf) responsePacket.getPayload();
63                byte[] bytes = ByteBufferUtils.X.readBytes(payload);
64                return SERIALIZER.decode(bytes, returnType);
65            }
66        } catch (Exception e) {
67            log.error("同步发送请求异常,请求包:{}", JSON.toJSONString(packet), e);
68            if (e instanceof RuntimeException) {
69                throw (RuntimeException) e;
70            } else {
71                throw new SendRequestException(e);
72            }
73        }
74    }
75
76    static void scanResponseFutureTable() {
77        log.info("开始执行ResponseFutureTable清理任务......");
78        Iterator<Map.Entry<String, ResponseFuture>> iterator = RESPONSE_FUTURE_TABLE.entrySet().iterator();
79        while (iterator.hasNext()) {
80            Map.Entry<String, ResponseFuture> entry = iterator.next();
81            ResponseFuture responseFuture = entry.getValue();
82            if (responseFuture.timeout()) {
83                iterator.remove();
84                log.warn("移除过期的请求ResponseFuture,请求ID:{}", entry.getKey());
85            }
86        }
87        log.info("执行ResponseFutureTable清理任务结束......");
88    }
89
90    static {
91        int n = Runtime.getRuntime().availableProcessors();
92        EXECUTOR = new ThreadPoolExecutor(n * 2, n * 2, 0, TimeUnit.SECONDS,
93                new ArrayBlockingQueue<>(50), runnable -> {
94            Thread thread = new Thread(runnable);
95            thread.setDaemon(true);
96            thread.setName("CLIENT_REQUEST_EXECUTOR");
97            return thread;
98        });
99        CLIENT_HOUSE_KEEPER = new ScheduledThreadPoolExecutor(1, runnable -> {
100            Thread thread = new Thread(runnable);
101            thread.setDaemon(true);
102            thread.setName("CLIENT_HOUSE_KEEPER");
103            return thread;
104        });
105        CLIENT_HOUSE_KEEPER.scheduleWithFixedDelay(ContractProxyFactory::scanResponseFutureTable, 5, 5, TimeUnit.SECONDS);
106    }
107}
108

接着添加一个客户端入站处理器,用于通过reuqestId匹配目标ResponseFuture实例,同时设置ResponseFuture实例中的response属性为响应包,同时释放闭锁:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1@Slf4j
2public class ClientHandler extends SimpleChannelInboundHandler<ResponseMessagePacket> {
3
4    @Override
5    protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception {
6        log.info("接收到响应包,内容:{}", JSON.toJSONString(packet));
7        ResponseFuture responseFuture = ContractProxyFactory.RESPONSE_FUTURE_TABLE.get(packet.getSerialNumber());
8        if (null != responseFuture) {
9            responseFuture.putResponse(packet);
10        } else {
11            log.warn("接收响应包查询ResponseFuture不存在,请求ID:{}", packet.getSerialNumber());
12        }
13    }
14}
15

最后,客户端启动类ClientApplication中添加ClientHandler到Netty的处理器流水线中即可:


1
2
3
4
5
6
7
8
9
10
11
12
13
1bootstrap.handler(new ChannelInitializer<SocketChannel>() {
2
3    @Override
4    protected void initChannel(SocketChannel ch) throws Exception {
5        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
6        ch.pipeline().addLast(new LengthFieldPrepender(4));
7        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
8        ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X));
9        ch.pipeline().addLast(new ResponseMessagePacketDecoder());
10        ch.pipeline().addLast(new ClientHandler());
11    }
12});
13

先运行之前- 《基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇》中编写好的ServerApplication,再启动ClientApplication,日志输出如下:


1
2
3
4
5
6
7
8
9
10
11
1// 服务端
22020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO  club.throwable.server.ServerHandler - 服务端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)])
32020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO  club.throwable.server.ServerHandler - 查找目标实现方法成功,目标类:club.throwable.server.contract.DefaultHelloService,宿主类:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
42020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO  club.throwable.server.ServerHandler - 服务端输出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"21d131d26fc74f91b4691e0207826b90","version":1}
5
6// 客户端
72020-01-18 14:32:59 [nioEventLoopGroup-2-1] INFO  club.throwable.client.ClientHandler - 接收到响应包,内容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":{"contiguous":true,"direct":true,"readOnly":false,"readable":true,"writable":false},"serialNumber":"21d131d26fc74f91b4691e0207826b90","version":1}
82020-01-18 14:32:59 [main] INFO  c.throwable.client.ClientApplication - HelloService[throwable]调用结果:"throwable say hello!"
92020-01-18 14:33:04 [CLIENT_HOUSE_KEEPER] INFO  c.t.client.ContractProxyFactory - 开始执行ResponseFutureTable清理任务......
102020-01-18 14:33:04 [CLIENT_HOUSE_KEEPER] WARN  c.t.client.ContractProxyFactory - 移除过期的请求ResponseFuture,请求ID:21d131d26fc74f91b4691e0207826b90
11

可见异步线程模型已经被改造为同步化,现在可以通过契约接口通过RPC同步调用服务端。

小结

Client端的请求-响应同步化处理基本改造完毕,到此为止,一个RPC框架大致已经完成,接下来会对Client端和Server端进行一些改造,让契约相关组件托管到IOC容器,实现契约接口自动注入等等功能。

Demo项目地址:

  • ch0-custom-rpc-protocol

(本文完e-a-20200118 c-2-d)

给TA打赏
共{{data.count}}人
人已打赏
安全经验

google adsense作弊及反作弊技术分析

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索