Netty源码分析第二章: NioEventLoop
第六节: 执行select操作
分析完了selector的创建和优化的过程, 这一小节分析select相关操作
跟到跟到NioEventLoop的run方法:
1
2
3
4
5
6
7
8
9 1protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: //轮询io事件(1)
2 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; //默认是50
3 final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { //记录下开始时间
4 final long ioStartTime = System.nanoTime(); try { //处理轮询到的key(2)
5 processSelectedKeys(); } finally { //计算耗时
6 final long ioTime = System.nanoTime() - ioStartTime; //执行task(3)
7 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } //代码省略
8 } }
9
代码比较长, 其实主要分为三部分:
1.轮询io事件
- 处理轮询到的key
3.执行task
这一小节, 主要剖析第一部分, 轮询io事件
首先switch块中默认会走到SelectStrategy.SELECT中, 执行select(wakenUp.getAndSet(false))方法
参数wakenUp.getAndSet(false)代表当前select操作是未唤醒状态
进入到select(wakenUp.getAndSet(false))方法中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; //当前系统的纳秒数
2 long currentTimeNanos = System.nanoTime(); //截止时间=当前时间+队列第一个任务剩余执行时间
3 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { //阻塞时间(毫秒)=(截止时间-当前时间+0.5毫秒)
4 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } //进行阻塞式的select操作
5 int selectedKeys = selector.select(timeoutMillis); //轮询次数
6 selectCnt ++; //如果轮询到一个事件(selectedKeys != 0), 或者当前select操作需要唤醒(oldWakenUp), //或者在执行select操作时已经被外部线程唤醒(wakenUp.get()), //或者任务队列已经有任务(hasTask), 或者定时任务队列中有任务(hasScheduledTasks())
7 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } //省略 //记录下当前时间
8 long time = System.nanoTime(); //当前时间-开始时间>=超时时间(条件成立, 执行过一次select操作, 条件不成立, 有可能发生空轮询)
9 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { //代表已经进行了一次阻塞式select操作, 操作次数重置为1
10 selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { //省略日志代码 //如果空轮询的次数大于一个阈值(512), 解决空轮询的bug
11 rebuildSelector(); selector = this.selector; selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } //代码省略
12 } catch (CancelledKeyException e) { //省略
13 } }
14
首先通过long currentTimeNanos = System.nanoTime()获取系统的纳秒数
继续往下看:
1
2 1long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
2
delayNanos(currentTimeNanos)代表距定时任务中第一个任务剩余多长时间, 这个时间+当前时间代表这次操作不能超过的时间, 因为超过之后定时任务不能严格按照预定时间执行, 其中定时任务队列是已经按照执行时间有小到大排列好的队列, 所以第一个任务则是最近需要执行的任务, selectDeadLineNanos就代表了当前操作不能超过的时间
然后就进入到了无限for循环
for循环中我们关注:
1
2 1long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L
2
selectDeadLineNanos – currentTimeNanos+500000L
代表截止时间
-当前时间
+0.5毫秒的调整时间
, 除以
1000000表示将计算的时间转化为毫秒数
最后算出的时间就是
selector操作的阻塞时间
, 并赋值到局部变量的
timeoutMillis中
后面有个判断
if(imeoutMillis<0), 代表当前时间已经超过了最后截止时间
+0.5毫秒
, selectCnt == 0代表没有进行
select操作
, 满足这两个条件
, 则执行
selectNow()之后
, 将
selectCnt赋值为
1之后跳出循环
如果没超过截止时间
, 就进行了
if(hasTasks() && wakenUp.compareAndSet(false, true))判断
这里我们只需要关注
hasTasks()方法就行
, 这里是判断当前
NioEventLoop所绑定的
taskQueue是否有任务
, 如果有任务
, 则执行
selectNow()之后
, 将
selectCnt赋值为
1之后跳出循环
(跳出循环之后去执行任务队列中的任务
)
hasTasks()
方法可以自己跟一下
, 非常简单
如果没有满足上述条件
, 就会执行
int selectedKeys = selector.select(timeoutMillis)进行阻塞式轮询
, 并且自增轮询次数
, 而后会进行如下判断
:
1
2 1if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; }
2
selectedKeys != 0
代表已经有轮询到的事件
, oldWakenUp代表当前
select操作是否需要唤醒
, wakenUp.get()说明已经被外部线程唤醒
, hasTasks()代表任务队列是否有任务
, hasScheduledTasks()代表定时任务队列是否任务
, 满足条件之一
, 就跳出循环
long time = System.nanoTime()
记录了当前的时间
, 之后有个判断
:
if (time – TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos)
这里的意思是当前时间
-阻塞时间
方法开始执行的时间
, 这里说明已经完整的执行完成了一个阻塞的
select()操作
, 将
selectCnt设置成
1
如果此条件不成立
, 说明没有完整执行
select()操作
, 可能触发了一次空轮询
, 根据前一个
selectCnt++这步我们知道
, 每触发一次空轮询
selectCnt都会自增
之后会进入第二个判断
SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD
其中
SELECTOR_AUTO_REBUILD_THRESHOLD默认是
512, 这个判断意思就是空轮询的次数如果超过
512次
, 则会认为是发生了
epoll bug, 这样会通过
rebuildSelector()
方法重新构建
selector, 然后将重新构建的
selector赋值到局部变量
selector, 执行一次
selectNow(), 将
selectCnt初始化
1, 跳出循环
rebuildSelector()
方法中
, 看
netty是如何解决
epoll bug的
:
1
2
3
4
5
6
7
8
9
10
11
12
13 1public void rebuildSelector() { //是否是由其他线程发起的
2 if (!inEventLoop()) { //如果是其他线程发起的, 将rebuildSelector()封装成任务队列, 由NioEventLoop进行调用
3 execute(new Runnable() { @Override public void run() { rebuildSelector(); } }); return; } final Selector oldSelector = selector; final Selector newSelector; if (oldSelector == null) { return; } try { //重新创建一个select
4 newSelector = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; } int nChannels = 0; for (;;) { try { //拿到旧select中所有的key
5 for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { Object a = key.attachment(); //代码省略 //获取key注册的事件
6 int interestOps = key.interestOps(); //将key注册的事件取消
7 key.cancel(); //注册到重新创建的新的selector中
8 SelectionKey newKey = key.channel().register(newSelector, interestOps, a); //如果channel是NioChannel
9 if (a instanceof AbstractNioChannel) { //重新赋值
10 ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { //代码省略
11 } } } catch (ConcurrentModificationException e) { continue; } break; } selector = newSelector; //代码省略
12}
13
首先会判断是不是当前
NioEventLoop线程执行的
, 如果不是
, 则将构建方法封装成
task由当前
NioEventLoop执行
final Selector oldSelector = selector
拿到旧的
selector
然后通过
newSelector = openSelector()创建新的
selector
通过
for循环遍历所有注册在
selector中的
key
Object a = key.attachment()
是获取
channel, 第一章讲过
, 在注册时
, 将自身作为属性绑定在
key上
for
循环体中
, 通过
int interestOps = key.interestOps()获取其注册的事件
key.cancel()
将注册的事件进行取消
SelectionKey newKey = key.channel().register(newSelector, interestOps, a)
将
channel以及注册的事件注册在新的
selector中
if (a instanceof AbstractNioChannel)
判断是不是
NioChannel
如果是
NioChannel, 则通过
((AbstractNioChannel) a).selectionKey = newKey将自身的属性
selectionKey赋值为新返回的
key
selector = newSelector
将自身
NioEventLoop属性
selector赋值为新创建的
newSelector
至此
, 就是
netty解决
epoll bug的步骤
, 其实就是创建一个新的
selector, 将旧
selector中注册的
channel和事件重新注册到新的
selector中
, 然后将自身
selector属性替换成新创建的
selector