分析NioEventLoopGroup最主有两个疑问
1.next work如何分配NioEventLoop
2.boss group 与child group 是如何协作运行的
从EventLoopGroup接口约定通过register方法从channel或promise转换成ChannelFuture对象
next方法就是用来分配NioEventLoop
1
2
3
4
5
6
7
8
9
10
11 1public interface EventLoopGroup extends EventExecutorGroup {
2
3 @Override
4 EventLoop next();
5
6 ChannelFuture register(Channel channel);
7 ChannelFuture register(ChannelPromise promise);
8 @Deprecated
9 ChannelFuture register(Channel channel, ChannelPromise promise);
10}
11
为了节省篇副,做了代码整理
1.NioEventLoopGroup构造时绑定SelectorProvider.provider(),通过newChild生成单个EventLoop
2.next实现是个环形循环
3.register方法是将channel转换成ChannelFuture
读者如果感兴趣可以在这几个方法打上断点看看
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 1public class NioEventLoopGroup extends MultithreadEventLoopGroup {
2 public NioEventLoopGroup(int nThreads, Executor executor) {
3 this(nThreads, executor, SelectorProvider.provider());
4 }
5 @Override
6 protected EventLoop newChild(Executor executor, Object... args) throws Exception {
7 return new NioEventLoop(this, executor, (SelectorProvider) args[0],
8 ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
9 }
10 /////////////////////////////GenericEventExecutorChooser实现next//////////////////////////////////
11 @Override
12 public EventExecutor next() {
13 return executors[Math.abs(idx.getAndIncrement() % executors.length)];
14 }
15
16 /////////////////////////////SingleThreadEventLoop实现register//////////////////////////////////
17
18 @Override
19 public ChannelFuture register(Channel channel) {
20 return register(new DefaultChannelPromise(channel, this));
21 }
22
23 @Override
24 public ChannelFuture register(final ChannelPromise promise) {
25 ObjectUtil.checkNotNull(promise, "promise");
26 promise.channel().unsafe().register(this, promise);
27 return promise;
28 }
29}
30
我们用过程的方式来模拟NioEventLoopGroup使用
如果读者有印象netty server 至少有两组NioEventLoopGroup 一个是boss 另一个是child
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 1public class TestBossChildGroup {
2 static SocketAddress address = new InetSocketAddress("localhost", 8877);
3
4 @Test
5 public void server() throws IOException {
6
7 SelectorProvider bossProvider = SelectorProvider.provider();
8 SelectorProvider childProvider = SelectorProvider.provider();
9
10 int count = 2;
11 AbstractSelector bossSelector = bossProvider.openSelector();
12 AbstractSelector[] childSelectors = new AbstractSelector[count];
13 for (int i = 0; i < count; i++) {
14 childSelectors[i] = childProvider.openSelector();
15 }
16
17 //server绑定访问端口 并向Selector注册OP_ACCEPT
18 ServerSocketChannel serverSocketChannel = bossProvider.openServerSocketChannel();
19 serverSocketChannel.configureBlocking(false);
20 serverSocketChannel.bind(address);
21 serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);
22
23 int i = 0;
24 while (true) {
25 int s = bossSelector.select(300);
26 if (s > 0) {
27 Set<SelectionKey> keys = bossSelector.selectedKeys();
28 Iterator<SelectionKey> it = keys.iterator();
29 while (it.hasNext()) {
30 SelectionKey key = it.next();
31 //为什么不用elseIf 因为 key interestOps 是多重叠状态,一次返回多个操作
32 if (key.isAcceptable()) {
33 System.out.println("isAcceptable");
34 //这里比较巧妙,注册OP_READ交给别一个Selector处理
35 key.channel().register(childSelectors[i++ % count], SelectionKey.OP_READ);
36 }
37 //这部分是child eventLoop处理
38 if (key.isConnectable()) {
39 System.out.println("isConnectable");
40 }
41 if (key.isWritable()) {
42 System.out.println("isWritable");
43 }
44 if (key.isReadable()) {
45 System.out.println("isReadable");
46 }
47 key.interestOps(~key.interestOps());
48 it.remove();
49 }
50 }
51 }
52 }
53
54 @Test
55 public void client() throws IOException {
56 SocketChannel clientSocketChannel = SelectorProvider.provider().openSocketChannel();
57 clientSocketChannel.configureBlocking(true);
58 clientSocketChannel.connect(address);
59 }
60}
61