[编织消息框架][netty源码分析]4 EventLoop 实现类NioEventLoop职责与实现

释放双眼,带上耳机,听听看~!

NioEventLoop 是jdk nio多路处理实现同修复jdk nio的bug

1.NioEventLoop继承SingleThreadEventLoop 重用单线程处理

2.NioEventLoop是组成 pool EventLoopGroup 基本单元 

总之好多边界判断跟业务经验之类的代码,非常烦碎

 

重要属性


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1public final class NioEventLoop extends SingleThreadEventLoop {  
2    //绑定 selector
3    Selector selector;
4    //优化过的Set集合
5    private SelectedSelectionKeySet selectedKeys;
6    //引用全局 SelectorProvider
7    private final SelectorProvider provider;
8    ///////////////////////////////////////////
9    //为true时执行selector.wakeup()
10    private final AtomicBoolean wakenUp = new AtomicBoolean();
11    //io任务占时比率
12    private volatile int ioRatio = 50;
13    //记录selectionKey撤销次数
14    private int cancelledKeys;
15    //处理selector.selectNow() 标志
16    private boolean needsToSelectAgain;
17}
18

 

替换Selector selectedKeySet字段与重构Selector

优化selectedKeySet集合用的是double cache技术,这种技术在图形渲染处理比较多


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
1    //netty 用到反射加 AccessController技术替换掉 Selector selectedKeySet 字段
2    private Selector openSelector() {
3        final Selector selector = provider.openSelector();
4        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
5
6        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
7            @Override
8            public Object run() {
9                try {
10                    return Class.forName(
11                            "sun.nio.ch.SelectorImpl",
12                            false,
13                            PlatformDependent.getSystemClassLoader());
14                } catch (Throwable cause) {
15                    return cause;
16                }
17            }
18        });
19
20        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
21        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
22            @Override
23            public Object run() {
24                //用到反射技术更改 SelectorImpl 字段
25                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
26                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
27                selectedKeysField.setAccessible(true);
28                publicSelectedKeysField.setAccessible(true);
29
30                selectedKeysField.set(selector, selectedKeySet);
31                publicSelectedKeysField.set(selector, selectedKeySet);
32                return null;
33            }
34        });
35
36        return selector;
37    }
38
39//重新构建Selector
40    private void rebuildSelector0() {
41        final Selector oldSelector = selector;
42        final Selector newSelector;
43
44        if (oldSelector == null) {
45            return;
46        }
47
48       newSelector = openSelector();
49
50        //迁移处理
51        int nChannels = 0;
52        for (SelectionKey key: oldSelector.keys()) {
53            Object a = key.attachment();
54            try {
55                //过滤key是否合法 已处理
56                if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
57                    continue;
58                }
59                int interestOps = key.interestOps();
60                key.cancel();
61                SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
62                if (a instanceof AbstractNioChannel) {
63                    // channel重新绑定SelectionKey
64                    ((AbstractNioChannel) a).selectionKey = newKey;
65                }
66                nChannels ++;
67            } catch (Exception e) {
68                //出错处理 netty认为 socket已关闭
69                if (a instanceof AbstractNioChannel) {
70                    AbstractNioChannel ch = (AbstractNioChannel) a;
71                    ch.unsafe().close(ch.unsafe().voidPromise());
72                } else {
73                    @SuppressWarnings("unchecked")
74                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
75                    invokeChannelUnregistered(task, key, e);
76                }
77            }
78        }
79        selector = newSelector;
80        oldSelector.close();
81     }
82

 

double cache 实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
1final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
2
3        private SelectionKey[] keysA;
4        private int keysASize;
5        private SelectionKey[] keysB;
6        private int keysBSize;
7        private boolean isA = true;
8
9        SelectedSelectionKeySet() {
10            keysA = new SelectionKey[1024];
11            keysB = keysA.clone();
12        }
13
14        @Override
15        public boolean add(SelectionKey o) {
16            if (o == null) {
17                return false;
18            }
19            //是A开关即处理A
20            if (isA) {
21                int size = keysASize;
22                keysA[size ++] = o;
23                keysASize = size;
24                //双倍扩展容量
25                if (size == keysA.length) {
26                    doubleCapacityA();
27                }
28            } else {
29                int size = keysBSize;
30                keysB[size ++] = o;
31                keysBSize = size;
32                if (size == keysB.length) {
33                    doubleCapacityB();
34                }
35            }
36
37            return true;
38        }
39
40        private void doubleCapacityA() {
41            SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1];
42            System.arraycopy(keysA, 0, newKeysA, 0, keysASize);
43            keysA = newKeysA;
44        }
45
46        private void doubleCapacityB() {
47            SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1];
48            System.arraycopy(keysB, 0, newKeysB, 0, keysBSize);
49            keysB = newKeysB;
50        }
51        //获取keys并切换
52        SelectionKey[] flip() {
53            if (isA) {
54                isA = false;
55                keysA[keysASize] = null;
56                keysBSize = 0;
57                return keysA;
58            } else {
59                isA = true;
60                keysB[keysBSize] = null;
61                keysASize = 0;
62                return keysB;
63            }
64        }
65
66        @Override
67        public int size() {
68            return isA?keysASize : keysBSize;
69        }
70    }
71

重载Selector select 逻辑,修复jdk 会产生的 bug


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
1private void select(boolean oldWakenUp) throws IOException {
2        Selector selector = this.selector;
3  
4            int selectCnt = 0;
5            long currentTimeNanos = System.nanoTime();
6            //通过delayNanos计算出 select结束时间
7            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
8            for (;;) {
9                //计算出超时并转换成毫秒,再加上延时固定0.5毫秒
10                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
11                if (timeoutMillis <= 0) {
12                    if (selectCnt == 0) {
13                        selector.selectNow();
14                        selectCnt = 1;
15                    }
16                    break;
17                }
18
19                //如果有非IO任务,优先等侍selector操作
20                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
21                    selector.selectNow();
22                    selectCnt = 1;
23                    break;
24                }
25                //阻塞当前线程
26                int selectedKeys = selector.select(timeoutMillis);
27                selectCnt ++;
28                //有IO,非IO,计划任务,wakenUp状态认为已完成 select 处理
29                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
30                    break;
31                }
32                //如果当前线程中断,netty认为关闭了服务,退出处理
33                if (Thread.interrupted()) {
34                    selectCnt = 1;
35                    break;
36                }
37
38                //相当于下面等价,意思是当前时间大于或等于 (selectDeadLineNanos + 0.5毫秒) selectCnt 重置
39                //currentTimeNanos + (System.nanoTime() -  selectDeadLineNanos - 500000L )   >= currentTimeNanos
40                //System.nanoTime() -  selectDeadLineNanos - 500000L >= 0
41                //System.nanoTime() >= selectDeadLineNanos + 500000L
42                long time = System.nanoTime();
43                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
44                    selectCnt = 1;
45                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
46                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
47                    
48                    // selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD 默认值512,重构selector
49                    rebuildSelector();
50                    selector = this.selector;
51                    selector.selectNow();
52                    selectCnt = 1;
53                    break;
54                }
55                //刷新当前时间
56                currentTimeNanos = time;
57            }
58  
59    }
60

 

分发io与非io任务逻辑实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
1//这部分做了代码整理
2    @Override
3    protected void run() {
4        for (;;) {
5            try {
6                //检查是否有非IO任务同WAKEUP_TASK任务
7                if(!hasTasks()){
8                    continue;
9                }
10                //有任务就触发重写的 select
11                select(wakenUp.getAndSet(false));
12                if (wakenUp.get()) {
13                    selector.wakeup();
14                }
15
16                cancelledKeys = 0;
17                needsToSelectAgain = false;
18                final int ioRatio = this.ioRatio;//默认值50
19                
20               try {
21                    final long ioStartTime = System.nanoTime();
22                    //processSelectedKeys();
23                    //一般会selectedKeys不会为null做了优化处理
24                    if (selectedKeys != null) {
25                        processSelectedKeysOptimized(selectedKeys.flip());
26                    } else {
27                        processSelectedKeysPlain(selector.selectedKeys());
28                    }
29                } finally {
30                    //当ioRatio等于100时,百分百执行非IO全部任务
31                    if (ioRatio == 100) {
32                        runAllTasks();
33                    }else{
34                        final long ioTime = System.nanoTime() - ioStartTime;
35                        //计算时非IO任务超时时间,公式 = 100 - ioRatio 算出非IO比率再跟IO相比 执行过的IO时间 * (非IO:IO)
36                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
37                    }
38                }
39             } catch (Throwable t) {
40                //防止过多失败
41                Thread.sleep(1000);
42            }
43            
44            //处理完任务判断是否结束
45             try {
46                if (isShuttingDown()) {
47                    closeAll();
48                    if (confirmShutdown()) {
49                        return;
50                    }
51                }
52            } catch (Throwable t) {
53                  Thread.sleep(1000);
54            }
55        }
56    }
57    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
58        for (int i = 0;; i ++) {
59            final SelectionKey k = selectedKeys[i];
60            if (k == null) {
61                break;
62            }
63            //依赖外部逻辑清理
64            selectedKeys[i] = null;
65            final Object a = k.attachment();
66
67            //处理SelectedKey
68            if (a instanceof AbstractNioChannel) {
69                processSelectedKey(k, (AbstractNioChannel) a);
70            } else {
71                @SuppressWarnings("unchecked")
72                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
73                processSelectedKey(k, task);
74            }
75            //这里用到比较奇怪的处理,应该是个补丁来的。。。
76             //从资料来源上说:当触发needsToSelectAgain时 channel全是关闭,所以忽略selectedKeys剩余的key,然后再重获取获取selectedKeys
77            // null out entries in the array to allow to have it GC'ed once the Channel close
78            // See https://github.com/netty/netty/issues/2363
79            if (needsToSelectAgain) {
80                for (;;) {
81                    i++;
82                    if (selectedKeys[i] == null) {
83                        break;
84                    }
85                    selectedKeys[i] = null;
86                }
87
88                selectAgain();
89                selectedKeys = this.selectedKeys.flip();
90                i = -1;
91            }
92        }
93    }
94    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
95        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
96        if (!k.isValid()) {
97            final EventLoop eventLoop;
98            try {
99                eventLoop = ch.eventLoop();
100            } catch (Throwable ignored) {
101                return;
102            }
103            //这里忽略情况是 在执行 registerd deregistration 时不能关闭,至于前后顺序无需要太多关心,读者可以进去看看
104            //每个人出现情况不一样,再加上eventLoop不可能为null的,这段代码明显没有经过测试
105            // Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop
106            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
107            // still healthy and should not be closed.
108            // See https://github.com/netty/netty/issues/5125
109            if (eventLoop != this || eventLoop == null) {
110                return;
111            }
112            unsafe.close(unsafe.voidPromise());
113            return;
114        }
115
116        try {
117            int readyOps = k.readyOps();
118            // 如果出现OP_CONNECT 状态必须先完成Connect 才能触发 read or wirte 操作
119            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
120                //清除SelectionKey.OP_CONNECT状态
121                int ops = k.interestOps();
122                ops &= ~SelectionKey.OP_CONNECT;
123                k.interestOps(ops);
124
125                unsafe.finishConnect();
126            }
127            
128            //ByteBuffer 发送出去
129            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
130                 ch.unsafe().forceFlush();
131            }
132
133            //netty将OP_READ,OP_ACCEPT 状态统一执行read操作,那netty如何区分 read accept的呢,后面才分析
134            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
135                unsafe.read();
136            }
137        } catch (CancelledKeyException ignored) {
138            unsafe.close(unsafe.voidPromise());
139        }
140    }
141    
142    //处理任务,失败策略执行注销处理
143    private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
144        try {
145            task.channelReady(k.channel(), k);
146            if (!k.isValid()) {
147                 task.channelUnregistered(k.channel(), null);
148            }
149        } catch (Exception e) {
150            k.cancel();
151            task.channelUnregistered(k.channel(), null);
152        }  
153    }
154

 

总结:

1.防cpu假死,超过一定时间重建Selector迁移SelectionKey

2.用反射技术替换Selector selectedKeySet字段,Set集合用到double cache技术

3.优先处理io任务,剩下时间处理非IO任务,通过ioRatio占比分配执行时间

4.在分发IO任务时做了大量的优化处理,如线程中断,读写IO、链路建立处理优先级,Selector 重建情况等

5.逻辑有时看起来好怪,再加上解决问题是修修补补的没经过优化代码,甚至作者没有经过测试就合并了,这是开源框架的通病

 

给TA打赏
共{{data.count}}人
人已打赏
安全技术

使用bootstrap的栅栏实现五列布局

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索