Netty源码分析第2章(NioEventLoop)—->第6节: 执行select操作

释放双眼,带上耳机,听听看~!

 

Netty源码分析第二章: NioEventLoop

** **

第六节: 执行select操作

 

分析完了selector的创建和优化的过程, 这一小节分析select相关操作

跟到跟到select操作的入口,NioEventLoop的run方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1protected void run() {
2    for (;;) {
3        try {
4            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
5                case SelectStrategy.CONTINUE:
6                    continue;
7                case SelectStrategy.SELECT:
8                    //轮询io事件(1)
9                    select(wakenUp.getAndSet(false));
10                    if (wakenUp.get()) {
11                        selector.wakeup();
12                    }
13                default:
14            }
15            cancelledKeys = 0;
16            needsToSelectAgain = false;
17            //默认是50
18            final int ioRatio = this.ioRatio;
19            if (ioRatio == 100) {
20                try {
21                    processSelectedKeys();
22                } finally {
23                    runAllTasks();
24                }
25            } else {
26                //记录下开始时间
27                final long ioStartTime = System.nanoTime();
28                try {
29                    //处理轮询到的key(2)
30                    processSelectedKeys();
31                } finally {
32                    //计算耗时
33                    final long ioTime = System.nanoTime() - ioStartTime;
34                    //执行task(3)
35                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
36                }
37            }
38        } catch (Throwable t) {
39            handleLoopException(t);
40        }
41        //代码省略
42    }
43}
44

代码比较长, 其实主要分为三部分:

  1. 轮询io事件

  2. 处理轮询到的key

  3. 执行task

这一小节, 主要剖析第一部分, 轮询io事件

首先switch块中默认会走到SelectStrategy.SELECT中, 执行select(wakenUp.getAndSet(false))方法

参数wakenUp.getAndSet(false)代表当前select操作是未唤醒状态

进入到select(wakenUp.getAndSet(false))方法中:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
1private void select(boolean oldWakenUp) throws IOException {
2    Selector selector = this.selector;
3    try {
4        int selectCnt = 0;
5        //当前系统的纳秒数
6        long currentTimeNanos = System.nanoTime();
7        //截止时间=当前时间+队列第一个任务剩余执行时间
8        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
9        for (;;) {
10            //阻塞时间(毫秒)=(截止时间-当前时间+0.5毫秒)
11            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
12            if (timeoutMillis <= 0) {
13                if (selectCnt == 0) {
14                    selector.selectNow();
15                    selectCnt = 1;
16                }
17                break;
18            }
19            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
20                selector.selectNow();
21                selectCnt = 1;
22                break;
23            }
24            //进行阻塞式的select操作
25            int selectedKeys = selector.select(timeoutMillis);
26            //轮询次数
27            selectCnt ++;
28            //如果轮询到一个事件(selectedKeys != 0), 或者当前select操作需要唤醒(oldWakenUp),
29            //或者在执行select操作时已经被外部线程唤醒(wakenUp.get()),
30            //或者任务队列已经有任务(hasTask), 或者定时任务队列中有任务(hasScheduledTasks())
31            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
32                break;
33            }
34            //省略
35            //记录下当前时间
36            long time = System.nanoTime();
37            //当前时间-开始时间>=超时时间(条件成立, 执行过一次select操作, 条件不成立, 有可能发生空轮询)
38            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
39                //代表已经进行了一次阻塞式select操作, 操作次数重置为1
40                selectCnt = 1;
41            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
42                //省略日志代码
43                //如果空轮询的次数大于一个阈值(512), 解决空轮询的bug
44                rebuildSelector();
45                selector = this.selector;
46                selector.selectNow();
47                selectCnt = 1;
48                break;
49            }
50            currentTimeNanos = time;
51        }
52        //代码省略
53    } catch (CancelledKeyException e) {
54        //省略
55    }
56}
57

首先通过 
long currentTimeNanos = System.nanoTime() 获取系统的纳秒数

继续往下看:


1
2
1long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
2

delayNanos(currentTimeNanos)代表距定时任务中第一个任务剩余多长时间, 这个时间+当前时间代表这次操作不能超过的时间, 因为超过之后定时任务不能严格按照预定时间执行, 其中定时任务队列是已经按照执行时间有小到大排列好的队列, 所以第一个任务则是最近需要执行的任务, selectDeadLineNanos就代表了当前操作不能超过的时间

然后就进入到了无限for循环

for循环中我们关注:


1
2
1long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L
2

 
selectDeadLineNanos – currentTimeNanos+500000L 代表截止时间
-当前时间
+0.5毫秒的调整时间
, 除以
1000000表示将计算的时间转化为毫秒数

最后算出的时间就是
selector操作的阻塞时间
, 并赋值到局部变量的
timeoutMillis中

 

后面有个判断 
if(imeoutMillis<0) 
, 代表当前时间已经超过了最后截止时间
+0.5毫秒
,  
selectCnt == 0 代表没有进行
select操作
, 满足这两个条件
, 则执行
selectNow()之后
, 将
selectCnt赋值为
1之后跳出循环

 

如果没超过截止时间
, 就进行了 
if(hasTasks() && wakenUp.compareAndSet(
false,
true)) 判断

 

这里我们关注
hasTasks()方法
, 这里是判断当前
NioEventLoop所绑定的
taskQueue是否有任务
, 如果有任务
, 则执行
selectNow()之后
, 将
selectCnt赋值为
1之后跳出循环
(跳出循环之后去执行任务队列中的任务
)

 

hasTasks()
方法可以自己跟一下
, 非常简单

如果没有满足上述条件
, 就会执行 
int selectedKeys = selector.select(timeoutMillis) 进行阻塞式轮询
, 并且自增轮询次数
, 而后会进行如下判断
:


1
2
3
4
1if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
2    break;
3}
4

selectedKeys != 0
代表已经有轮询到的事件
, oldWakenUp代表当前
select操作是否需要唤醒
, wakenUp.get()说明已经被外部线程唤醒
, hasTasks()代表任务队列是否有任务
, hasScheduledTasks()代表定时任务队列是否任务
, 满足条件之一
, 就跳出循环

 

 
long time = System.nanoTime() 记录了当前的时间
, 之后有个判断
:

 
if (time – TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) 这里的意思是当前时间
-阻塞时间

方法开始执行的时间

, 这里说明已经完整的执行完成了一个阻塞的
select()操作
, 将
selectCnt设置成
1

 

如果此条件不成立
, 说明没有完整执行
select()操作
, 可能触发了一次空轮询
, 根据前一个
selectCnt++这步我们知道
, 每触发一次空轮询
selectCnt都会自增

之后会进入第二个判断 
SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD 

 

其中
SELECTOR_AUTO_REBUILD_THRESHOLD默认是
512, 这个判断意思就是空轮询的次数如果超过
512次
, 则会认为是发生了
epoll bug, 这样会通过
rebuildSelector()
方法重新构建
selector, 然后将重新构建的
selector赋值到局部变量
selector, 执行一次
selectNow(), 将
selectCnt初始化
1, 跳出循环

 

**
rebuildSelector()
方法中
, 看
netty是如何解决
epoll bug的
:**


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
1public void rebuildSelector() {
2    //是否是由其他线程发起的
3    if (!inEventLoop()) {
4        //如果是其他线程发起的, 将rebuildSelector()封装成任务队列, 由NioEventLoop进行调用
5        execute(new Runnable() {
6            @Override
7            public void run() {
8                rebuildSelector();
9            }
10        });
11        return;
12    }
13    final Selector oldSelector = selector;
14    final Selector newSelector;
15    if (oldSelector == null) {
16        return;
17    }
18    try {
19        //重新创建一个select
20        newSelector = openSelector();
21    } catch (Exception e) {
22        logger.warn(&quot;Failed to create a new Selector.&quot;, e);
23        return;
24    }
25    int nChannels = 0;
26    for (;;) {
27        try {
28            //拿到旧select中所有的key
29            for (SelectionKey key: oldSelector.keys()) {
30                Object a = key.attachment();
31                try {
32                    Object a = key.attachment();
33                    //代码省略
34
35                    //获取key注册的事件
36                    int interestOps = key.interestOps();
37                    //将key注册的事件取消
38                    key.cancel();
39                    //注册到重新创建的新的selector中
40                    SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
41                    //如果channel是NioChannel
42                    if (a instanceof AbstractNioChannel) {
43                        //重新赋值
44                        ((AbstractNioChannel) a).selectionKey = newKey;
45                    }
46                    nChannels ++;
47                } catch (Exception e) {
48                    //代码省略
49                }
50            }
51        } catch (ConcurrentModificationException e) {
52            continue;
53        }
54        break;
55    }
56    selector = newSelector;
57    //代码省略
58}
59

首先会判断是不是当前
NioEventLoop线程执行的
, 如果不是
, 则将构建方法封装成
task由当前
NioEventLoop执行

 
final Selector oldSelector = selector 表示
拿到旧的
selector

然后通过 
newSelector = openSelector() 创建新的
selector

通过
for循环遍历所有注册在
selector中的
key

 
Object a = key.attachment() 是获取
channel, 第一章讲过
, 在注册时
, 将自身作为属性绑定在
key上

for
循环体中
, 通过 
int interestOps = key.interestOps() 获取其注册的事件

key.cancel()
将注册的事件进行取消

 
SelectionKey newKey = key.channel().register(newSelector, interestOps, a) 将
channel以及注册的事件注册在新的
selector中

 
if (a
instanceof AbstractNioChannel) 判断是不是
NioChannel

如果是
NioChannel, 则通过 
((AbstractNioChannel) a).selectionKey = newKey 将自身的属性
selectionKey赋值为新返回的
key

 
selector = newSelector 将自身
NioEventLoop属性
selector赋值为新创建的
newSelector

至此
, 就是
netty解决
epoll bug的步骤
, 其实就是创建一个新的
selector, 将旧
selector中注册的
channel和事件重新注册到新的
selector中
, 然后将自身
selector属性替换成新创建的
selector

 

 

上一节: 优化selector

下一节: 处理IO事件

 

给TA打赏
共{{data.count}}人
人已打赏
安全技术

用node.js从零开始去写一个简单的爬虫

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索