上一篇文章说到,现在这种每发起请求一次就新建一个client链接,但是如果在并发比较高的情况下,就会造成资源浪费,如果通过client和server进行长期链接,把需要处理的请求存到client里面,并且通过异步的形式返回,便会减少资源浪费。
这里有两个主要的问题,1、如何实现异步返回?2、如何把client和server对应起来?
先看第一个问题,所谓的异步返回,可以以点奶茶为例,当顾客(消费方)向奶茶店(服务端)点了一杯奶茶(发起请求),奶茶店给他一个号码(凭证),然后客户拿着号就去逛街干其他事情,过了一段时间回来问奶茶店有没有好了,此时奶茶店根据他的号码来给他对应的奶茶。
这里就是为什么上篇文章说的请求要带上requestID,这就相当于凭证的标识,而客户端链接可以保存这些请求并且返回一个凭证,当有返回时,把对应的结果塞到凭证里面,客户端可以做自己的事情或者一直等待凭证有结果,这时候结合线程池去做这个处理
第二个问题,需要一个链接池来管理链接和对应的远端关系
看看编码实现
第一个问题最重要的就是如何设计一个凭证,这时候需要用到Future接口来实现自己的凭证,通过一个内部同步器的管理,这个同步器主要是起到自旋改变状态的作用,同步器因为只有一个线程节点,会不断自旋尝试获取锁,获取锁的条件是远端有返回并且改变了同步器的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132 1
2
3/**
4 * @author: lele
5 * @date: 2019/11/21 上午10:54
6 * 实现异步返回
7 */
8public class RpcFuture implements Future<Object> {
9
10 private RpcResponse rpcResponse;
11
12 private RpcRequest rpcRequest;
13
14 /**
15 * 自定义同步器,这里只是用来通过自选改变状态
16 */
17 private Sync sync;
18
19 public RpcFuture(RpcRequest rpcRequest) {
20 this.rpcRequest = rpcRequest;
21 this.sync = new Sync();
22 }
23
24
25 @Override
26 public boolean cancel(boolean mayInterruptIfRunning) {
27 throw new UnsupportedOperationException();
28 }
29
30 @Override
31 public boolean isCancelled() {
32 throw new UnsupportedOperationException();
33 }
34
35 /**
36 * 返回状态是否改变了
37 * @return
38 */
39 @Override
40 public boolean isDone() {
41 return sync.isDone();
42 }
43
44 /**
45 * 赋值并设置同步器锁状态为1
46 * @param response
47 */
48 public void done(RpcResponse response) {
49 this.rpcResponse = response;
50 sync.tryRelease(1);
51
52 }
53
54
55 /**
56 * 自选等待结果,这里一直执行acquire
57 * 直到tryacquire方法return true即state为1
58 * @return
59 * @throws InterruptedException
60 * @throws ExecutionException
61 */
62 @Override
63 public Object get() throws InterruptedException, ExecutionException {
64 sync.acquire(-1);
65 return this.rpcResponse;
66
67 }
68
69 /**
70 * 超时抛异常
71 * @param timeout
72 * @param unit
73 * @return
74 * @throws InterruptedException
75 * @throws ExecutionException
76 * @throws TimeoutException
77 */
78 @Override
79 public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
80 //超时获取
81 boolean success = sync.tryAcquireNanos(-1, unit.toNanos(timeout));
82 return get(success);
83 }
84
85 private Object get(boolean success) {
86 if (success) {
87 return this.rpcResponse;
88 } else {
89 throw new RuntimeException("超时:requestID" + rpcRequest.getRequestId() +
90 " method:" + rpcRequest.getMethodName() + " interface:" + rpcRequest.getInterfaceName()
91 );
92 }
93 }
94
95
96
97 /**
98 * 继承同步器,这里只是用来自旋改变状态,根据state来实现,state初始为0
99 */
100 static class Sync extends AbstractQueuedSynchronizer {
101 /**
102 * 尝试获取锁,如果获取不了,加入同步队列,阻塞自己,只由同步队列的头自旋获取锁
103 * 当状态为1,即有结果返回时可以获取锁进行后续操作,设置result
104 *这里只有一个节点,会不断自选尝试获取锁
105 * @param arg
106 * @return
107 */
108 @Override
109 protected boolean tryAcquire(int arg) {
110 return getState() == 1;
111 }
112
113 /**
114 * 用于远端有返回时,设置状态变更
115 * 从头唤醒同步队列的队头下一个等待的节点,如果下一个节点为空,则从队尾唤醒
116 * @param arg
117 * @return
118 */
119 @Override
120 protected boolean tryRelease(int arg) {
121 //把状态设置为1,给tryAcquire获取锁进行操作
122 return getState() == 0 ? compareAndSetState(0, 1) : true;
123 }
124
125 public boolean isDone() {
126 return getState() == 1;
127 }
128 }
129
130}
131
132
凭证写好了,接下来就写处理凭证的handler类,这个类有一个发送请求的方法,主要是把凭证和对应的标识存储起来,并在有结果返回时设置返回状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 1package com.gdut.rpcstudy.demo.framework.protocol.netty.asyn;
2
3import com.gdut.rpcstudy.demo.framework.URL;
4import com.gdut.rpcstudy.demo.framework.protocol.netty.asyn.RpcFuture;
5import com.gdut.rpcstudy.demo.framework.serialize.tranobject.RpcRequest;
6import com.gdut.rpcstudy.demo.framework.serialize.tranobject.RpcResponse;
7import io.netty.buffer.Unpooled;
8import io.netty.channel.*;
9import lombok.Data;
10import lombok.EqualsAndHashCode;
11
12import java.util.concurrent.ConcurrentHashMap;
13import java.util.concurrent.CountDownLatch;
14
15/**
16 * @author: lele
17 * @date: 2019/11/21 下午4:07
18 * 异步模式下的处理
19 */
20@Data
21@EqualsAndHashCode(callSuper = false)
22public class NettyAsynHandler extends SimpleChannelInboundHandler<RpcResponse> {
23 //key:requestId,value自定义future
24 private ConcurrentHashMap<String, RpcFuture> resultMap = new ConcurrentHashMap<>();
25
26 private volatile Channel channel;
27
28 //对应的远端URL
29 private final URL url;
30
31
32 @Override
33 public void channelActive(ChannelHandlerContext ctx) throws Exception {
34 this.channel = ctx.channel();
35 }
36
37 public void close() {
38 this.channel.close();
39 }
40
41 @Override
42 protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse s) throws Exception {
43 System.out.println("收到结果:" + s);
44 String requestId = s.getRequestId();
45 //设置完成并移除future
46 RpcFuture future = resultMap.get(requestId);
47 if (s != null) {
48 future.done(s);
49 resultMap.remove(requestId);
50 }
51 }
52
53
54 public RpcFuture sendRequest(RpcRequest rpcRequest) {
55 final CountDownLatch latch = new CountDownLatch(1);
56 RpcFuture future = new RpcFuture(rpcRequest);
57 //放到请求列表里面
58 resultMap.put(rpcRequest.getRequestId(), future);
59 //发送请求
60 channel.writeAndFlush(rpcRequest).addListener(new ChannelFutureListener() {
61 @Override
62 public void operationComplete(ChannelFuture channelFuture) throws Exception {
63 System.out.println("发送了消息" + rpcRequest.toString());
64 latch.countDown();
65 }
66 });
67 try {
68 //等待结果
69 latch.await();
70 } catch (InterruptedException e) {
71 e.printStackTrace();
72 }
73 return future;
74 }
75}
76
77
第一个问题的两个比较核心的类就编写到这里。
然后到第二个问题,如何实现一个链接管理池?要为每个服务下的每个server建立一个客户端,则需要先通过注册中心获取地址,再进行后续操作,这里获取每个服务下的server地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 1 public static Map<String,List<URL>> getAllURL(){
2 Map<String,List<URL>> mapList=null;
3 try {
4 List<String> servcieList=client.getChildren().forPath("/");
5 mapList=new HashMap<>(servcieList.size());
6 for (String s : servcieList) {
7 mapList.put(s,getService(s));
8 }
9
10 } catch (Exception e) {
11 e.printStackTrace();
12 }
13 return mapList;
14 }
15
获取地址后,我们可以建立关系了,采用{serviceName:{serverAddress:clienthandler}}的形式进行存储,从client handler存放的就是上面说的客户端处理器,通过这里我们可以通过不同的选择方式挑一个出来发送请求,这里采用的选择方式是轮询。
大概是这样的形式去获取凭证,管理池根据服务名轮询地选取一个客户端,并且把凭证让他处理,然后返回凭证,可这里有个隐患,客户端连接到服务端是需要时间的,再没有可用的客户端情况下,会报空指针异常
1
2
3
4
5
6 1 public RpcFuture sendFuture(String serviceName, RpcRequest request) {
2 NettyAsynHandler handler=ConnectManager.getInstance().getConnectionWithPolling(serviceName);
3 RpcFuture future = handler.sendRequest(request);
4 return future;
5 }
6
这时候我们需要为加入锁和条件队列(Condition),当满足条件时,才可以链接,不然就自旋等待直到条件满足,当客户端链接上u对应的server,我们才可以获取对应的客户端处理器,但是服务有这么多,用一个锁是可以,但是调用不相干的服务也要被锁吗?这时候可以细粒度化我们的锁,为每个服务建立一个锁及其相应的条件队列。这是连接池需要注意的点,下面看看代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281 1
2/**
3 * @author: lele
4 * @date: 2019/11/21 上午11:58
5 * 管理连接池
6 */
7public class ConnectManager {
8
9
10 private Boolean isShutDown = false;
11
12 /**
13 * 客户端链接服务端超时时间
14 */
15 private long connectTimeoutMillis = 5000;
16
17 /**
18 * 自定义6个线程组用于客户端服务
19 */
20 private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(6);
21
22 /**
23 * 存放服务对应的访问数,用于轮询
24 */
25 private Map<String, AtomicInteger> pollingMap = new ConcurrentHashMap<>();
26
27 /**
28 * 对于每个服务都有一个锁,每个锁都有个条件队列,用于控制链接获取
29 */
30 private Map<String, Object[]> serviceCondition = new ConcurrentHashMap<>();
31
32 /**
33 * 存放服务端地址和handler的关系
34 */
35 private Map<String, Map<URL, NettyAsynHandler>> serverClientMap = new ConcurrentHashMap<>();
36
37 /**
38 * 用来初始化客户端
39 */
40 private ThreadPoolExecutor clientBooter = new ThreadPoolExecutor(
41 16, 16, 600, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024)
42 , new BooterThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
43
44 private static class Holder {
45 private static final ConnectManager j = new ConnectManager();
46 }
47
48 private ConnectManager() {
49 //初始化时把所有的url加进去,这里可能没有可用链接,所以需要添加对节点的监听
50 Map<String, List<URL>> allURL = ZkRegister.getAllURL();
51 for (String s : allURL.keySet()) {
52 //为每个服务添加锁和条件队列,通过条件队列控制客户端链接获取
53 ReentrantLock lock =new ReentrantLock();
54 Condition condition = lock.newCondition();
55 serviceCondition.put(s,new Object[]{lock,condition});
56 }
57 addServer(allURL);
58 }
59
60 public static ConnectManager getInstance() {
61
62 return Holder.j;
63 }
64
65
66 /**
67 * 添加该服务对应的链接和handler
68 * @param serviceName
69 * @param url
70 * @param handler
71 */
72 public void addConnection(String serviceName, URL url, NettyAsynHandler handler) {
73 Map<URL, NettyAsynHandler> handlerMap;
74 if (!serverClientMap.containsKey(serviceName)) {
75 handlerMap = new HashMap<>();
76 } else {
77 handlerMap = serverClientMap.get(serviceName);
78 }
79
80 handlerMap.put(url, handler);
81 //添加服务名和对应的url:客户端链接
82 serverClientMap.put(serviceName, handlerMap);
83 //唤醒等待客户端链接的线程
84 signalAvailableHandler(serviceName);
85 }
86
87 /**
88 * 获取对应服务下的handler,通过轮询获取
89 * @param servicName
90 * @return
91 */
92 public NettyAsynHandler getConnectionWithPolling(String servicName) {
93 Map<URL, NettyAsynHandler> urlNettyAsynHandlerMap = serverClientMap.get(servicName);
94 int size = 0;
95 //先尝试获取
96 if (urlNettyAsynHandlerMap != null) {
97 size = urlNettyAsynHandlerMap.size();
98 }
99 //不行就自选等待
100 while (!isShutDown && size <= 0) {
101 try {
102 //自旋等待可用服务出现,因为客户端与服务链接需要一定的时间,如果直接返回会出现空指针异常
103 boolean available = waitingForHandler(servicName);
104 if (available) {
105 urlNettyAsynHandlerMap = serverClientMap.get(servicName);
106 size = urlNettyAsynHandlerMap.size();
107 }
108 } catch (InterruptedException e) {
109 throw new RuntimeException("出错", e);
110 }
111 }
112 //获取对应的访问次数
113 AtomicInteger count = pollingMap.get(servicName);
114 int index = (count.getAndAdd(1) + size) % size;
115
116 Iterator<Map.Entry<URL, NettyAsynHandler>> iterator = urlNettyAsynHandlerMap.entrySet().iterator();
117 //取出相应的handler
118 NettyAsynHandler nettyAsynHandler = null;
119 for (int i = 0; i <= index; i++) {
120 nettyAsynHandler = iterator.next().getValue();
121 }
122 return nettyAsynHandler;
123 }
124
125 /**
126 * 等待一定时间,等handler和相应的server建立建立链接,用条件队列控制
127 * @param serviceName
128 * @return
129 * @throws InterruptedException
130 */
131 private boolean waitingForHandler(String serviceName) throws InterruptedException {
132 Object[] objects = serviceCondition.get(serviceName);
133 ReentrantLock lock = (ReentrantLock) objects[0];
134 lock.lock();
135 Condition condition= (Condition) objects[1];
136 try {
137 return condition.await(this.connectTimeoutMillis, TimeUnit.MILLISECONDS);
138 } finally {
139 lock.unlock();
140 }
141 }
142
143 public void removeURL(URL url) {
144 List<String> list = new ArrayList<>();
145 for (Map.Entry<String, Map<URL, NettyAsynHandler>> map : serverClientMap.entrySet()) {
146 for (Map.Entry<URL, NettyAsynHandler> urlNettyAsynHandlerEntry : map.getValue().entrySet()) {
147 if (urlNettyAsynHandlerEntry.getKey().equals(url)) {
148 urlNettyAsynHandlerEntry.getValue().close();
149 list.add(map.getKey() + "@" + urlNettyAsynHandlerEntry.getKey());
150 }
151 }
152 }
153 for (String s : list) {
154 String[] split = s.split("@");
155 serverClientMap.get(split[0]).remove(split[1]);
156 }
157
158 }
159
160 /**
161 * 释放对应服务的条件队列,代表有客户端链接可用了
162 * @param serviceName
163 */
164 private void signalAvailableHandler(String serviceName) {
165 Object[] objects = serviceCondition.get(serviceName);
166 ReentrantLock lock = (ReentrantLock) objects[0];
167 lock.lock();
168 Condition condition= (Condition) objects[1];
169 try {
170 condition.signalAll();
171 } finally {
172 lock.unlock();
173 }
174 }
175
176 /**
177 * 添加server,并启动对应的服务器
178 * @param allURL
179 */
180 public void addServer(Map<String, List<URL>> allURL) {
181
182 for (String s : allURL.keySet()) {
183 pollingMap.put(s, new AtomicInteger(0));
184 List<URL> urls = allURL.get(s);
185 for (URL url : urls) {
186 //提交创建任务
187 clientBooter.submit(new Runnable() {
188 @Override
189 public void run() {
190 createClient(s, eventLoopGroup, url);
191 }
192 });
193 }
194 }
195 System.out.println("初始化客户端ing");
196 }
197
198 /**
199 * 创建客户端,持久化链接
200 * @param serviceName
201 * @param eventLoopGroup
202 * @param url
203 */
204 public void createClient(String serviceName, EventLoopGroup eventLoopGroup, URL url) {
205 Bootstrap b = new Bootstrap();
206 b.group(eventLoopGroup)
207 .channel(NioSocketChannel.class)
208 .handler((new ChannelInitializer<SocketChannel>() {
209 @Override
210 protected void initChannel(SocketChannel ch) throws Exception {
211 ch.pipeline()
212 //把request实体变为字节
213 .addLast(new RpcEncoder(RpcRequest.class))
214 //把返回的response字节变为对象
215 .addLast(new RpcDecoder(RpcResponse.class))
216 .addLast(new NettyAsynHandler(url));
217 }
218 }));
219
220 ChannelFuture channelFuture = b.connect(url.getHostname(), url.getPort());
221
222 channelFuture.addListener(new ChannelFutureListener() {
223 @Override
224 public void operationComplete(final ChannelFuture channelFuture) throws Exception {
225 //链接成功后的操作,把相应的url地址和客户端链接存入
226 if (channelFuture.isSuccess()) {
227 NettyAsynHandler handler = channelFuture.channel().pipeline().get(NettyAsynHandler.class);
228 addConnection(serviceName, url, handler);
229 }
230 }
231 });
232 }
233
234
235 /**
236 * 关闭方法,关闭每个客户端链接,释放所有锁,关掉创建链接的线程池,和客户端的处理器
237 */
238 public void stop() {
239 isShutDown = true;
240 for (Map<URL, NettyAsynHandler> urlNettyAsynHandlerMap : serverClientMap.values()) {
241 urlNettyAsynHandlerMap.values().forEach(e -> e.close());
242 }
243 for(String s:serviceCondition.keySet()){
244 signalAvailableHandler(s);
245 }
246 clientBooter.shutdown();
247 eventLoopGroup.shutdownGracefully();
248 }
249
250
251 /**
252 * 启动客户端链接的自定义线程工厂
253 */
254 static class BooterThreadFactory implements ThreadFactory {
255
256 private static final AtomicInteger poolNumber = new AtomicInteger(1);
257 private final ThreadGroup group;
258 private final AtomicInteger threadNumber = new AtomicInteger(1);
259 private final String namePrefix;
260
261 BooterThreadFactory() {
262 group = new ThreadGroup("connectManger");
263 group.setDaemon(false);
264 group.setMaxPriority(5);
265 namePrefix = "clientBooter-" +
266 poolNumber.getAndIncrement() +
267 "-thread-";
268 }
269
270 @Override
271 public Thread newThread(Runnable r) {
272 Thread t = new Thread(group, r,
273 namePrefix + threadNumber.getAndIncrement(),
274 0);
275 return t;
276 }
277 }
278
279}
280
281
这两个问题解决好后,接下来对整体框架做修改,首先是接口注解,增加了异步模式,对象工厂根据注解来选择不同的代理方式,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 1@Retention(RetentionPolicy.RUNTIME)
2@Target(ElementType.TYPE)
3
4//用于接口上,name为服务名,zk则在注册服务改为 服务名/ip,服务端通过传来的接口名通过反射获取类,或者通过给spring托管获取其class
5public @interface RpcStudyClient {
6 String name();
7 //结果返回是异步还是同步模式
8 int mode() default sync;
9 int sync=0;
10 int asyn=1;
11
12}
13
14@Data
15@EqualsAndHashCode(callSuper = false)
16public class RpcStudyClientFactoryBean implements FactoryBean<Object> {
17 private Class<?> type;
18
19 @Override
20 public Object getObject() throws Exception {
21 //根据RpcStudeClient的mode字段选择以哪种方式代理
22 RpcStudyClient annotation = type.getAnnotation(RpcStudyClient.class);
23 int mode = annotation.mode();
24 return mode==RpcStudyClient.asyn?ProxyFactory.getAsyncProxy(this.type):ProxyFactory.getProxy(this.type);
25 }
26
27 @Override
28 public Class<?> getObjectType() {
29 return this.type;
30 }
31}
32
33
代理工厂的新增异步代理,这里需要自旋获取结果,不然会导致结果获取不到。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 1 public static <T> T getAsyncProxy(Class interfaceClass) {
2 return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
3 @Override
4 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
5
6 //指定所用协议
7 Protocol protocol = ProtocolFactory.netty();
8 RpcStudyClient annotation = (RpcStudyClient) interfaceClass.getAnnotation(RpcStudyClient.class);
9 String requestId = UUID.randomUUID().toString().replace("-", "");
10 //封装方法参数
11 RpcRequest rpcRequest = new RpcRequest(requestId, interfaceClass.getName(), method.getName(), args, method.getParameterTypes(), annotation.mode());
12 Future<RpcFuture> res = futureTask.submit(new Callable<RpcFuture>() {
13
14 @Override
15 public RpcFuture call() throws Exception {
16 RpcFuture res = protocol.sendFuture(annotation.fetch(), annotation.name(), rpcRequest);
17 //先尝试一次
18 if (res.isDone()) {
19 return res;
20 }
21 //不行就自旋等待
22 while (!res.isDone()) {
23
24 }
25 return res;
26 }
27 ;
28 });
29
30 //发送请求
31 //这里的管理连接池通过服务名去访问zk,获取可用的url
32 return returnResult(res.get());
33 }
34 });
35 }
36
37 /**
38 * 具体的处理异步返回的方法
39 * @param res
40 * @return
41 * @throws ExecutionException
42 * @throws InterruptedException
43 */
44 public static Object returnResult(RpcFuture res) throws ExecutionException, InterruptedException {
45 RpcResponse response = (RpcResponse) res.get();
46 if (response.getError() != null) {
47 throw new RuntimeException(response.getError());
48 } else {
49 return response.getResult();
50 }
51 }
52
请求实体类也增加多一个属性mode,标识是同步还是异步模式,给serverhandler做处理,如果是异步,则不关闭链接,不然会导致客户端发的请求无法接受,同步则关闭。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 1@Data
2@AllArgsConstructor
3public class RpcRequest {
4
5 private String requestId;
6
7 private String interfaceName;
8
9 private String methodName;
10
11 private Object[] params;
12 //防止重载
13 private Class[] paramsTypes;
14 //是否异步
15 private int mode;
16}
17
18
19
20 ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
21 @Override
22 public void operationComplete(ChannelFuture channelFuture) throws Exception {
23 System.out.println("发送了结果"+response);
24//如果是mode为同步,就直接关闭连接
25 if(rpcRequest.getMode()==RpcStudyClient.sync){
26 ctx.channel().close();
27 }
28// 当异步模式时,不关闭链接
29 }
30 });
31
这里还可以为异步请求添加回调处理,慢请求日志记录等等,都可以在自定义的凭证里面下文章。异步请求和连接池大概就到这里,具体代码https://github.com/97lele/rpcstudy/tree/withconcurrent