[编织消息框架][netty源码分析]5 EventLoopGroup 实现类NioEventLoopGroup职责与实现

释放双眼,带上耳机,听听看~!

分析NioEventLoopGroup最主有两个疑问

1.next work如何分配NioEventLoop

2.boss group 与child group 是如何协作运行的

 

从EventLoopGroup接口约定通过register方法从channel或promise转换成ChannelFuture对象

next方法就是用来分配NioEventLoop


1
2
3
4
5
6
7
8
9
10
11
1public interface EventLoopGroup extends EventExecutorGroup {
2
3    @Override
4    EventLoop next();
5    
6    ChannelFuture register(Channel channel);
7    ChannelFuture register(ChannelPromise promise);
8    @Deprecated
9    ChannelFuture register(Channel channel, ChannelPromise promise);
10}
11

为了节省篇副,做了代码整理

1.NioEventLoopGroup构造时绑定SelectorProvider.provider(),通过newChild生成单个EventLoop

2.next实现是个环形循环

3.register方法是将channel转换成ChannelFuture

读者如果感兴趣可以在这几个方法打上断点看看


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1public class NioEventLoopGroup extends MultithreadEventLoopGroup {
2    public NioEventLoopGroup(int nThreads, Executor executor) {
3        this(nThreads, executor, SelectorProvider.provider());
4    }
5    @Override
6    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
7        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
8            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
9    }
10    /////////////////////////////GenericEventExecutorChooser实现next//////////////////////////////////
11    @Override
12    public EventExecutor next() {
13        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
14    }
15    
16    /////////////////////////////SingleThreadEventLoop实现register//////////////////////////////////
17
18    @Override
19    public ChannelFuture register(Channel channel) {
20        return register(new DefaultChannelPromise(channel, this));
21    }
22
23    @Override
24    public ChannelFuture register(final ChannelPromise promise) {
25        ObjectUtil.checkNotNull(promise, "promise");
26        promise.channel().unsafe().register(this, promise);
27        return promise;
28    }
29}
30

我们用过程的方式来模拟NioEventLoopGroup使用

如果读者有印象netty server 至少有两组NioEventLoopGroup 一个是boss 另一个是child


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
1public class TestBossChildGroup {
2    static SocketAddress address = new InetSocketAddress("localhost", 8877);
3
4    @Test
5    public void server() throws IOException {
6
7        SelectorProvider bossProvider = SelectorProvider.provider();
8        SelectorProvider childProvider = SelectorProvider.provider();
9
10        int count = 2;
11        AbstractSelector bossSelector = bossProvider.openSelector();
12        AbstractSelector[] childSelectors = new AbstractSelector[count];
13        for (int i = 0; i < count; i++) {
14            childSelectors[i] = childProvider.openSelector();
15        }
16
17        //server绑定访问端口 并向Selector注册OP_ACCEPT
18        ServerSocketChannel serverSocketChannel = bossProvider.openServerSocketChannel();
19        serverSocketChannel.configureBlocking(false);
20        serverSocketChannel.bind(address);
21        serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);  
22        
23        int i = 0;
24        while (true) {
25            int s = bossSelector.select(300);
26            if (s > 0) {
27            Set<SelectionKey> keys = bossSelector.selectedKeys();
28            Iterator<SelectionKey> it = keys.iterator();
29            while (it.hasNext()) {
30                SelectionKey key = it.next();
31                //为什么不用elseIf 因为 key interestOps 是多重叠状态,一次返回多个操作
32                if (key.isAcceptable()) {
33                System.out.println("isAcceptable");
34                //这里比较巧妙,注册OP_READ交给别一个Selector处理
35                key.channel().register(childSelectors[i++ % count], SelectionKey.OP_READ);
36                }
37                //这部分是child eventLoop处理
38                if (key.isConnectable()) {
39                System.out.println("isConnectable");
40                }
41                if (key.isWritable()) {
42                System.out.println("isWritable");
43                }
44                if (key.isReadable()) {
45                System.out.println("isReadable");
46                }
47                key.interestOps(~key.interestOps());
48                it.remove();
49            }
50            }
51        }
52    }
53
54    @Test
55    public void client() throws IOException {
56        SocketChannel clientSocketChannel = SelectorProvider.provider().openSocketChannel();
57        clientSocketChannel.configureBlocking(true);
58        clientSocketChannel.connect(address);
59    }
60}
61

 

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全运维

700元就可买到同事行踪?安全专家:属实!你不知道的还有这些

2016-12-19 15:21:33

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索